0
点赞
收藏
分享

微信扫一扫

【onnx量化】hrnet模型并转rknn3588部署


首先上代码

#--------------------------
# QAT quantization  QAT量化
import onnx
from onnxruntime.quantization import quantize_dynamic, QuantType
 
model_fp32 = 'hrnet_sim.onnx'
model_quant = 'hrnet.quant.onnx'
quantized_model = quantize_dynamic(model_fp32, model_quant)

量化完发现报错

NOT_IMPLEMENTED : Could not find an implementation for ConvInteger(10) node with name 'Conv_0_quant'

百度发现答案

【onnx量化】hrnet模型并转rknn3588部署_Image


我们把weight_type改成QUInt8

【onnx量化】hrnet模型并转rknn3588部署_数据读取_02

quantized_model = quantize_dynamic(model_fp32, model_quant,weight_type=QuantType.QUInt8)

静态量化参考链接
https://blog.csdn.net/m0_63642362/article/details/124741589? 这里我们直接量化,代码如下

import os
import numpy as np
import cv2
from PIL import Image
from paddle.vision.transforms import Compose, Resize, CenterCrop, Normalize
from onnxruntime.quantization import CalibrationDataReader, QuantFormat, quantize_static, QuantType, CalibrationMethod
from onnxruntime import InferenceSession, get_available_providers
 
# 模型路径
model_fp32 = 'hrnet_coco_w32_256x192.onnx'
model_quant_static = 'hrnet_quant.onnx'
 
# 数据预处理
'''
    缩放 -> 中心裁切 -> 类型转换 -> 转置 -> 归一化 -> 添加维度
'''
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
val_transforms = Compose(
    [
        Resize(256, interpolation="bilinear"),
        CenterCrop(224),
        lambda x: np.asarray(x, dtype='float32').transpose(2, 0, 1) / 255.0,
        Normalize(mean, std),
        lambda x: x[None, ...]
    ]
)
 
# 用于校准的图像数据
'''
    读取图像 -> 预处理 -> 组成数据字典
'''
img_dir = 'data/person/'
img_num = 775
# datas = [
#     val_transforms(
#         Image.open(os.path.join(img_dir, img)).convert('RGB')
#     ) for img in os.listdir(img_dir)[:img_num]
# ]
datas = []
#数据预处理
def _get_3rd_point(a, b):
    """To calculate the affine matrix, three pairs of points are required. This
    function is used to get the 3rd point, given 2D points a & b.
    The 3rd point is defined by rotating vector `a - b` by 90 degrees
    anticlockwise, using b as the rotation center.
    Args:
        a (np.ndarray): point(x,y)
        b (np.ndarray): point(x,y)
    Returns:
        np.ndarray: The 3rd point.
    """
    assert len(a) == 2
    assert len(b) == 2
    direction = a - b
    third_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)
 
    return third_pt
def rotate_point(pt, angle_rad):
    """Rotate a point by an angle.
    Args:
        pt (list[float]): 2 dimensional point to be rotated
        angle_rad (float): rotation angle by radian
    Returns:
        list[float]: Rotated point.
    """
    assert len(pt) == 2
    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
    new_x = pt[0] * cs - pt[1] * sn
    new_y = pt[0] * sn + pt[1] * cs
    rotated_pt = [new_x, new_y]
 
    return rotated_pt
def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=(0., 0.),
                         inv=False):
    """Get the affine transform matrix, given the center/scale/rot/output_size.
    Args:
        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        rot (float): Rotation angle (degree).
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        shift (0-100%): Shift translation ratio wrt the width/height.
            Default (0., 0.).
        inv (bool): Option to inverse the affine transform direction.
            (inv=False: src->dst or inv=True: dst->src)
    Returns:
        np.ndarray: The transform matrix.
    """
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2
    assert len(shift) == 2
 
    # pixel_std is 200.
    scale_tmp = scale * 200.0
 
    shift = np.array(shift)
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]
 
    rot_rad = np.pi * rot / 180
    src_dir = rotate_point([0., src_w * -0.5], rot_rad)
    dst_dir = np.array([0., dst_w * -0.5])
 
    src = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    src[2, :] = _get_3rd_point(src[0, :], src[1, :])
 
    dst = np.zeros((3, 2), dtype=np.float32)
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])
 
    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
 
    return trans
def bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):
    """Transform the bbox format from (x,y,w,h) into (center, scale)
    Args:
        bbox (ndarray): Single bbox in (x, y, w, h)
        aspect_ratio (float): The expected bbox aspect ratio (w over h)
        padding (float): Bbox padding factor that will be multilied to scale.
            Default: 1.0
        pixel_std (float): The scale normalization factor. Default: 200.0
    Returns:
        tuple: A tuple containing center and scale.
        - np.ndarray[float32](2,): Center of the bbox (x, y).
        - np.ndarray[float32](2,): Scale of the bbox w & h.
    """
 
    x, y, w, h = bbox[:4]
    center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)
 
    if w > aspect_ratio * h:
        h = w * 1.0 / aspect_ratio
    elif w < aspect_ratio * h:
        w = h * aspect_ratio
 
    scale = np.array([w, h], dtype=np.float32) / pixel_std
    scale = scale * padding
 
    return center, scale
##############################
def process(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # hwc rgb
    image_size=[192,256]
    aspect_ratio = image_size[0] / image_size[1]
    img_height = img.shape[0]
    img_width = img.shape[1]
    bbox = [0,0,img_width,img_height,0.9]
    padding=1.25
    pixel_std=200
    center, scale = bbox_xywh2cs(
        bbox,
        aspect_ratio,
        padding,
        pixel_std)
    trans = get_affine_transform(center, scale, 0, image_size)
    img = cv2.warpAffine(#图像仿射变换输出为(192,256)
        img,
        trans, (int(image_size[0]), int(image_size[1])),
        flags=cv2.INTER_LINEAR)
    img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # chw rgb
    img[0, ...] = ((img[0, ...] / 255.0) - 0.485) / 0.229
    img[1, ...] = ((img[1, ...] / 255.0) - 0.456) / 0.224
    img[2, ...] = ((img[2, ...] / 255.0) - 0.406) / 0.225

    return img
filelist = os.listdir(img_dir)
for file in filelist:
    img = cv2.imread(img_dir+file)
    img = process(img)
    datas.append([img])
###############################
 
# 数据批次读取器
def batch_reader(datas, batch_size):
    _datas = []
    length = len(datas)
    for i, data in enumerate(datas):
        if batch_size==1:
            yield {'inputs': data}
        elif (i+1) % batch_size==0:
            _datas.append(data)
            yield {'inputs': np.concatenate(_datas, 0)}
            _datas = []
        elif i<length-1:
            _datas.append(data)
        else:
            _datas.append(data)
            yield {'inputs': np.concatenate(_datas, 0)}
 
# 构建校准数据读取器
'''
    实质是一个迭代器
    get_next 方法返回一个如下样式的字典
    {
        输入 1: 数据 1, 
        ...
        输入 n: 数据 n
    }
    记录了模型的各个输入和其对应的经过预处理后的数据
'''
class DataReader(CalibrationDataReader):
    def __init__(self, datas, batch_size):
        self.datas = batch_reader(datas, batch_size)
 
    def get_next(self):
        return next(self.datas, None)
 
# 实例化一个校准数据读取器
# data_reader = DataReader(datas, 32)
data_reader = DataReader(datas, 1)

# 静态量化
quantize_static(
    model_input=model_fp32, # 输入模型
    model_output=model_quant_static, # 输出模型
    calibration_data_reader=data_reader, # 校准数据读取器
    quant_format= QuantFormat.QDQ, # 量化格式 QDQ / QOperator
    # activation_type=QuantType.QInt8, # 激活类型 Int8 / UInt8
    activation_type=QuantType.QInt8, # 激活类型 Int8 / UInt8
    weight_type=QuantType.QInt8, # 参数类型 Int8 / UInt8
    calibrate_method=CalibrationMethod.MinMax, # 数据校准方法 MinMax / Entropy / Percentile
    optimize_model=True # 是否优化模型
)

出现报错

onnx Invalid Feed Input Name:inputs

打开模型我们发现输入的名称为input.1

【onnx量化】hrnet模型并转rknn3588部署_数据_03


那就改一下推理的输入dict的键值

【onnx量化】hrnet模型并转rknn3588部署_Image_04


改完就跑成功啦,模型也在资源上传给大家,有需要的自取。

如果对与模型的精度有进一步的要求,我们可以将量化参数改为U8U8,同时
方法采用Entropy

activation_type=QuantType.QUInt8, # 激活类型 Int8 / UInt8
weight_type=QuantType.QUInt8, # 参数类型 Int8 / UInt8
calibrate_method=CalibrationMethod.Entropy, # 数据校准方法 MinMax / Entropy

量化完之后我们对他转rknn,这里注意,为了使得rknn的精度和onnx保持一致,我们要将optimization_level=2,默认是3.

from rknn.api import RKNN
import cv2
import numpy as np
# ONNX_MODEL = 'hrnet_coco_w32_256x192.onnx'
# ONNX_MODEL = 'action.onnx'
ONNX_MODEL = 'hrnet_quant_new.onnx'
RKNN_MODEL = 'hrnet_quant_new.rknn'
IMG_PATH = "1.png"
if __name__ == '__main__':
 
    # Create RKNN object
    rknn = RKNN(verbose=True)
    mean = [123.675, 116.28, 103.53]
    std= [58.395, 57.12, 57.375]
    # pre-process config
    print('--> config model')
    rknn.config(target_platform='rk3588',mean_values=[mean], std_values=[std],quant_img_RGB2BGR=True,optimization_level=2)
    # 如果quant_img_RGB2BGR=True优先做RGB转换再做减均值
    print('done')
 
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model  failed!')
        exit(ret)
    print('done')
 
    # Build model
    print('--> Building model')
    # ret = rknn.build(do_quantization=True, dataset='dataset.txt')  # ,pre_compile=True
    ret = rknn.build(do_quantization=False)  # ,pre_compile=True
    if ret != 0:
        print('Build failed!')
        exit(ret)
    print('done')
    # Export rknn model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export hrnet_w32_macaque_256x192-f7e9e04f_20210407_sim.rknn failed!')
        exit(ret)
    print('done')
    ##########################################精度测试
    rknn.release()

这里注意,在rknn里面onnx的opset_version也对模型有一定的影响,我们这里是11可以在转之前改成12.

import onnx
from onnx import version_converter, helper

# import onnxruntime
# help(onnx)

# Preprocessing: load the model to be converted.
model_path = 'hrnet_quant.onnx'
original_model = onnx.load(model_path)
original_model.opset_import[0].version = 12
# original_model.ir_version = 6

onnx.save(original_model, "hrnet_quant_new.onnx")
print(original_model.opset_import[0].version)


举报

相关推荐

0 条评论