首先上代码
#--------------------------
# Dynamic quantization with onnxruntime's quantize_dynamic.
# NOTE(review): the original heading labelled this "QAT quantization", but
# nothing in this snippet trains a model - it only calls quantize_dynamic.
import onnx
from onnxruntime.quantization import quantize_dynamic, QuantType
# Path of the float32 model to quantize.
model_fp32 = 'hrnet_sim.onnx'
# Path the quantized model is written to.
model_quant = 'hrnet.quant.onnx'
# First attempt: weight_type is not passed, so the library default is used.
quantized_model = quantize_dynamic(model_fp32, model_quant)
量化完发现报错
NOT_IMPLEMENTED : Could not find an implementation for ConvInteger(10) node with name 'Conv_0_quant'
查阅资料后发现,原因是 quantize_dynamic 默认的 weight_type 是 QInt8,而 onnxruntime 找不到与之匹配的 ConvInteger 实现(即上面的报错)。
解决办法是把weight_type改成QUInt8:
# Second attempt: same call as before, with the weight type set explicitly to QUInt8.
quantized_model = quantize_dynamic(model_fp32, model_quant,weight_type=QuantType.QUInt8)
静态量化参考链接:
https://blog.csdn.net/m0_63642362/article/details/124741589
这里我们直接量化,代码如下:
import os
import numpy as np
import cv2
from PIL import Image
from paddle.vision.transforms import Compose, Resize, CenterCrop, Normalize
from onnxruntime.quantization import CalibrationDataReader, QuantFormat, quantize_static, QuantType, CalibrationMethod
from onnxruntime import InferenceSession, get_available_providers
# Model paths: the float32 source model and the destination of the
# statically quantized model.
model_fp32 = 'hrnet_coco_w32_256x192.onnx'
model_quant_static = 'hrnet_quant.onnx'
# Data pre-processing pipeline built from paddle.vision transforms.
# Steps: resize -> center crop -> dtype conversion -> transpose -> normalize -> add a dimension.
# NOTE(review): in the visible code val_transforms is referenced only by the
# commented-out list comprehension further down; the calibration samples that
# are actually built go through process() instead.
'''
缩放 -> 中心裁切 -> 类型转换 -> 转置 -> 归一化 -> 添加维度
'''
# Per-channel mean and std handed to Normalize below.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
val_transforms = Compose(
[
Resize(256, interpolation="bilinear"),
CenterCrop(224),
# Convert to a float32 array, move axis 2 to the front and scale by 1/255.
lambda x: np.asarray(x, dtype='float32').transpose(2, 0, 1) / 255.0,
Normalize(mean, std),
# Prepend an axis of length 1.
lambda x: x[None, ...]
]
)
# Image data used for calibration.
# Steps: read image -> pre-process -> assemble the data dictionary.
'''
读取图像 -> 预处理 -> 组成数据字典
'''
# Directory the calibration images are read from.
img_dir = 'data/person/'
# NOTE(review): img_num is only used by the commented-out code below.
img_num = 775
# datas = [
# val_transforms(
# Image.open(os.path.join(img_dir, img)).convert('RGB')
# ) for img in os.listdir(img_dir)[:img_num]
# ]
# Calibration samples; filled by the loading loop that follows process().
datas = []
# Data pre-processing helpers
def _get_3rd_point(a, b):
    """Return the third point needed to define an affine transform.

    An affine matrix is determined by three point pairs. Given the 2D
    points ``a`` and ``b``, the third point is obtained by rotating the
    vector ``a - b`` by 90 degrees anticlockwise around ``b``.

    Args:
        a (np.ndarray | list | tuple): point (x, y).
        b (np.ndarray | list | tuple): point (x, y).

    Returns:
        np.ndarray: The 3rd point.
    """
    assert len(a) == 2
    assert len(b) == 2
    # Coerce to arrays so plain lists/tuples work as well; subtracting two
    # Python lists would raise TypeError.
    a = np.asarray(a)
    b = np.asarray(b)
    direction = a - b
    # (dx, dy) rotated by 90 degrees is (-dy, dx).
    third_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)
    return third_pt
def rotate_point(pt, angle_rad):
    """Rotate a 2D point around the origin.

    Args:
        pt (list[float]): 2 dimensional point to be rotated.
        angle_rad (float): rotation angle in radians.

    Returns:
        list[float]: Rotated point ``[x', y']``.
    """
    assert len(pt) == 2
    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
    x, y = pt
    # Standard 2D rotation matrix applied to (x, y).
    return [x * cs - y * sn, x * sn + y * cs]
def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=(0., 0.),
                         inv=False,
                         pixel_std=200.0):
    """Get the affine transform matrix, given the center/scale/rot/output_size.

    Three source points (box center, a point offset from it along the
    rotated "up" direction, and a third derived point) are mapped onto the
    corresponding three points of the output canvas, and the matrix is
    solved with ``cv2.getAffineTransform``.

    Args:
        center (np.ndarray[2, ] | list(2,)): Center of the bounding box (x, y).
        scale (np.ndarray[2, ] | list(2,)): Scale of the bounding box
            wrt [width, height].
        rot (float): Rotation angle (degree).
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        shift (0-100%): Shift translation ratio wrt the width/height.
            Default (0., 0.).
        inv (bool): Option to inverse the affine transform direction.
            (inv=False: src->dst or inv=True: dst->src)
        pixel_std (float): Factor ``scale`` is multiplied by to get back to
            pixels. Default: 200.0 (the value previously hard-coded here).

    Returns:
        np.ndarray: The transform matrix.
    """
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2
    assert len(shift) == 2

    # Coerce to arrays so list/tuple arguments work too; ``list * float``
    # would raise TypeError.
    center = np.asarray(center)
    shift = np.asarray(shift)
    scale_tmp = np.asarray(scale) * pixel_std

    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    # Direction vectors of half the box/canvas width, pointing along -y.
    src_dir = rotate_point([0., src_w * -0.5], rot_rad)
    dst_dir = np.array([0., dst_w * -0.5])

    offset = scale_tmp * shift

    src = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + offset
    src[1, :] = center + src_dir + offset
    src[2, :] = _get_3rd_point(src[0, :], src[1, :])

    dst = np.zeros((3, 2), dtype=np.float32)
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    return trans
def bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):
    """Transform the bbox format from (x, y, w, h) into (center, scale).

    The box is first grown along its shorter side so that ``w / h`` equals
    ``aspect_ratio``; the center is left unchanged.

    Args:
        bbox (ndarray | sequence): Single bbox in (x, y, w, h). Only the
            first four entries are read.
        aspect_ratio (float): The expected bbox aspect ratio (w over h).
        padding (float): Bbox padding factor that will be multiplied to
            scale. Default: 1.0
        pixel_std (float): The scale normalization factor. Default: 200.0

    Returns:
        tuple: A tuple containing center and scale.
        - np.ndarray[float32](2,): Center of the bbox (x, y).
        - np.ndarray[float32](2,): Scale of the bbox w & h.
    """
    x, y, w, h = bbox[:4]
    center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)

    # Enlarge whichever side is too short for the requested aspect ratio.
    if w > aspect_ratio * h:
        h = w * 1.0 / aspect_ratio
    elif w < aspect_ratio * h:
        w = h * aspect_ratio

    scale = np.array([w, h], dtype=np.float32) / pixel_std
    scale = scale * padding
    return center, scale
##############################
def process(img):
    """Turn a BGR image into a normalized, channel-first float32 array.

    Steps: BGR -> RGB, treat the whole image as the bounding box, warp it
    with an affine transform to 192 x 256 (width x height), move the channel
    axis to the front, scale to [0, 1] and normalize each channel with the
    mean/std constants below.

    Args:
        img (np.ndarray): image in BGR channel order with shape
            (height, width, 3), e.g. as returned by ``cv2.imread``.

    Returns:
        np.ndarray: float32 array of shape (3, 256, 192).
    """
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # hwc rgb

    image_size = [192, 256]  # (width, height) handed to cv2.warpAffine
    aspect_ratio = image_size[0] / image_size[1]
    padding = 1.25
    pixel_std = 200

    # The whole image is used as the bounding box (x, y, w, h).
    img_height, img_width = img.shape[0], img.shape[1]
    bbox = [0, 0, img_width, img_height]
    center, scale = bbox_xywh2cs(bbox, aspect_ratio, padding, pixel_std)

    trans = get_affine_transform(center, scale, 0, image_size)
    img = cv2.warpAffine(
        img,
        trans, (int(image_size[0]), int(image_size[1])),
        flags=cv2.INTER_LINEAR)

    img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # chw rgb

    # Per-channel normalization, broadcast over the spatial axes.
    channel_mean = np.array([0.485, 0.456, 0.406],
                            dtype=np.float32).reshape(3, 1, 1)
    channel_std = np.array([0.229, 0.224, 0.225],
                           dtype=np.float32).reshape(3, 1, 1)
    return (img / 255.0 - channel_mean) / channel_std
# Build the calibration set: one pre-processed sample per readable image in
# img_dir. The listing is sorted so the sample order is reproducible.
filelist = sorted(os.listdir(img_dir))
for file in filelist:
    img = cv2.imread(os.path.join(img_dir, file))
    if img is None:
        # cv2.imread returns None (it does not raise) when a file cannot be
        # read or decoded; skip it instead of crashing inside process().
        print('skip unreadable file: ' + file)
        continue
    img = process(img)
    # Wrapped in a one-element list so that np.concatenate(..., 0) in
    # batch_reader joins samples along a leading axis.
    datas.append([img])
###############################
# Batched data reader
def batch_reader(datas, batch_size, input_name='inputs'):
    """Yield model feed dicts built from ``datas`` in groups of ``batch_size``.

    With ``batch_size == 1`` every item is yielded unchanged. Otherwise items
    are joined with ``np.concatenate(..., 0)``; the final batch may hold
    fewer than ``batch_size`` items.

    Args:
        datas (iterable): samples to batch. Each sample must be something
            ``np.concatenate`` accepts along axis 0.
        batch_size (int): number of samples per yielded batch, >= 1.
        input_name (str): key used in the yielded dict. It has to match the
            input name of the model being fed. Default: 'inputs'.

    Yields:
        dict: ``{input_name: batch}``.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1 (raised when the
            generator is first advanced).
    """
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1, got %r' % (batch_size,))

    pending = []
    for data in datas:
        if batch_size == 1:
            yield {input_name: data}
            continue
        pending.append(data)
        if len(pending) == batch_size:
            yield {input_name: np.concatenate(pending, 0)}
            pending = []
    # Flush the incomplete last batch, if any.
    if pending:
        yield {input_name: np.concatenate(pending, 0)}
# Calibration data reader
class DataReader(CalibrationDataReader):
    """Calibration data reader backed by ``batch_reader``.

    It is essentially an iterator: each ``get_next`` call returns a dict of
    the form ``{input 1: data 1, ..., input n: data n}`` that maps every
    model input to its pre-processed data, and ``None`` once the data is
    exhausted.
    """

    def __init__(self, datas, batch_size, input_name=None):
        """Create the reader.

        Args:
            datas: samples handed to ``batch_reader``.
            batch_size (int): batch size handed to ``batch_reader``.
            input_name (str | None): when given, the values of every feed
                dict are re-keyed to this name so the key matches the
                model's input name. ``None`` keeps the feed dicts unchanged.
        """
        self.datas = batch_reader(datas, batch_size)
        self.input_name = input_name

    def get_next(self):
        """Return the next feed dict, or ``None`` when no data is left."""
        feed = next(self.datas, None)
        if feed is None or self.input_name is None:
            return feed
        # Re-key the entries produced by batch_reader to the requested name.
        return {self.input_name: value for value in feed.values()}
# Instantiate a calibration data reader (batch size 1).
# Alternative with a batch size of 32:
# data_reader = DataReader(datas, 32)
data_reader = DataReader(datas, 1)
# Static quantization
# NOTE(review): optimize_model may not be accepted by every onnxruntime
# release - confirm against the installed version.
quantize_static(
model_input=model_fp32, # input model
model_output=model_quant_static, # output model
calibration_data_reader=data_reader, # calibration data reader
quant_format= QuantFormat.QDQ, # quantization format: QDQ / QOperator
activation_type=QuantType.QInt8, # activation type: Int8 / UInt8
weight_type=QuantType.QInt8, # weight type: Int8 / UInt8
calibrate_method=CalibrationMethod.MinMax, # calibration method: MinMax / Entropy / Percentile
optimize_model=True # whether to optimize the model
)
出现报错
onnx Invalid Feed Input Name:inputs
打开模型我们发现,输入的名称是 input.1,而不是代码里写的 inputs。
那就把 batch_reader 里输入 dict 的键名从 'inputs' 改成 'input.1'。
改完就跑成功啦,模型也在资源上传给大家,有需要的自取。
如果对于模型的精度有进一步的要求,我们可以将量化类型改为U8U8(激活和权重都使用QUInt8),同时校准方法采用Entropy:
activation_type=QuantType.QUInt8, # 激活类型 Int8 / UInt8
weight_type=QuantType.QUInt8, # 参数类型 Int8 / UInt8
calibrate_method=CalibrationMethod.Entropy, # 数据校准方法 MinMax / Entropy
量化完之后我们把它转成rknn。这里注意,为了使得rknn的精度和onnx保持一致,我们要将optimization_level设置为2(默认是3)。
import sys

from rknn.api import RKNN
import cv2
import numpy as np

# ONNX_MODEL = 'hrnet_coco_w32_256x192.onnx'
# ONNX_MODEL = 'action.onnx'
ONNX_MODEL = 'hrnet_quant_new.onnx'
RKNN_MODEL = 'hrnet_quant_new.rknn'
IMG_PATH = "1.png"


def main():
    """Convert ONNX_MODEL to RKNN_MODEL.

    Returns:
        int: 0 on success, otherwise the non-zero code returned by the
        RKNN step that failed.
    """
    # Create RKNN object
    rknn = RKNN(verbose=True)
    try:
        # Equal to (0.485, 0.456, 0.406) * 255 and (0.229, 0.224, 0.225) * 255.
        mean = [123.675, 116.28, 103.53]
        std = [58.395, 57.12, 57.375]

        # pre-process config
        print('--> config model')
        rknn.config(target_platform='rk3588',
                    mean_values=[mean],
                    std_values=[std],
                    quant_img_RGB2BGR=True,
                    optimization_level=2)
        # Author's note: with quant_img_RGB2BGR=True the RGB conversion is
        # done first, then the mean subtraction.
        print('done')

        print('--> Loading model')
        ret = rknn.load_onnx(model=ONNX_MODEL)
        if ret != 0:
            print('Load model failed!')
            return ret
        print('done')

        # Build model
        print('--> Building model')
        # ret = rknn.build(do_quantization=True, dataset='dataset.txt')
        ret = rknn.build(do_quantization=False)
        if ret != 0:
            print('Build failed!')
            return ret
        print('done')

        # Export rknn model
        print('--> Export RKNN model')
        ret = rknn.export_rknn(RKNN_MODEL)
        if ret != 0:
            # Report the file that was actually being exported.
            print('Export {} failed!'.format(RKNN_MODEL))
            return ret
        print('done')

        # (an accuracy test would go here)
        return 0
    finally:
        # Release the RKNN object on success and on every failure path.
        rknn.release()


if __name__ == '__main__':
    sys.exit(main())
这里注意,在rknn里面onnx的opset_version也对模型有一定的影响。我们这里是11,可以在转rknn之前用下面的脚本改成12;脚本保存的 hrnet_quant_new.onnx 就是上面转换脚本里的 ONNX_MODEL。
import onnx
from onnx import version_converter, helper

# Opset version to declare for the default ONNX operator set.
TARGET_OPSET = 12

# Preprocessing: load the model to be converted.
model_path = 'hrnet_quant.onnx'
original_model = onnx.load(model_path)

# Pick the opset entry of the default ONNX domain explicitly instead of
# assuming it is the first one: a model may declare several opset_import
# entries, one per operator domain.
default_opset = None
for opset in original_model.opset_import:
    if opset.domain in ('', 'ai.onnx'):
        default_opset = opset
        break
if default_opset is None:
    raise ValueError(
        'no opset_import entry for the default ONNX domain in ' + model_path)

# NOTE: this only rewrites the declared version number; the nodes themselves
# are not converted (version_converter is imported above but not used).
default_opset.version = TARGET_OPSET
# original_model.ir_version = 6
onnx.save(original_model, "hrnet_quant_new.onnx")
print(default_opset.version)