跳到主要内容
版本:最新版本(unreleased)

Python插件

⭐Python加载器插件文档

上新!!!新增 GRPC模式,区别于之前的CGO调用Wrapper方式

Python Language Wrapper CGo模式:

img

Python Language Wrapper GRPC模式:

img

👉👉👉快速查看

背景

  1. 之前的wrapper.py 由C项目 实现了 wrapper接口实现。

    CGO 模式下: aiges_c_python_wrapper编译成libwrapper.so,由aiges统一加载。 Grpc 模式下: aiges 和 python wrapper.py之间使用grpc通信

  2. 之前如果python用户需要实现推理插件, 只需要参考 wrapper.py实现对应接口后,即可实现python推理。

  3. 当用户实现wrapper.py后, 无法直接调试运行,且不太了解aiges如何调用wrapper.py以及传递到 wrapper.py对应的参数是什么类型都非常疑惑,造成python版本的AI推理插件集成方式并不那么pythonic。

新版wrapper.py集成方式优化目标

  1. 用户可以定义AI能力输入的数据字段,控制字段列表。

  2. 用户可以按需定义AI能力输出的字段列表。

  3. 平台工具可以通过wrapper.py自动导出用户schema并配置到webgate,对用户屏蔽schema概念。

  4. 平台工具可以提供用户直接运行wrapper.py,并按照平台真实加载wrapper.py方式传递对应参数,方便用户在任何环境快速Debug,发现一些基础问题。

  5. 尽可能简化用户输入,并且在有限的用户输入下,获取平台需要的信息。

  6. 尽可能提升python能力的推理效率和性能。

wrapper.py 新设计

img_1.png

  1. 提供 python sdk: python sdk将发布到 pypi,方便用户随时安装和更新。

  2. 为什么? 新wrapper要求用户 实现 Wrapper 类,并将原有 函数式 wrapper开头的函数放入到 Wrapper (类方法|对象方法?待讨论 todo)中去。用户实现的Wrapper类必须继承WrapperBase类,并且wrapperInitwrapperFiniwrapperOnceExecwrapperError等函数在WrapperBase类被声明为类方法@classmethod,未实现则会抛出NotImplementedError错误。

  3. 用户在Wrapper类中除了要实现原有的wrapperInitWrapperExec等实现之外,需要额外定义能力的输入和输出,最终生成的HTTP接口基于此信息生成。

  4. 将用户的请求响应对象转为实现Buffer protocol的对象,然后使用memoryview建立视图来减少内存数据的copy,增加wrapper.py的推理效率。

为什么

  • 我们希望用户只需要定义关键的实现,而不必关心背后wrapper.py如何被调用的细节,这块背后逻辑其实是复杂的,我们不希望在wrapper.py中让用户过多的定义一些平台预先要求的设定,我们希望在SDK的基类中实现定义好这些默认行为,比如wrapper.py真实调用顺序 为 WrapperInit -> WrapperExec -> WrapperFin

  • 基类中定义行为的好处是,用户继承基类并实现必要方法后,可以直接运行,并且调试拿到结果。

  • 至于为什么希望用户通过继承WrapperBase类来实现 Wrapper类中,是因为可以在基类行为中做一些更Pythonic的魔法,从而简化用户的输入。

    新版本Python加载器插件

WrapperBase类

新版Python加载器插件最大的改变是引入了WrapperBase类,用户实现的Wrapper类必须继承WrapperBase类,并且wrapperInitwrapperFiniwrapperOnceExecwrapperError等函数在WrapperBase类被声明为类方法@classmethod,未实现则会抛出NotImplementedError错误

快速开始你的第一个wrapper.py

下面介绍一个调用三方API的Python加载器插件的实现过程来帮助您理解整个过程。

准备项目

  1. 安装或者更新aiges sdk库 (该sdk用于辅助wrapper.py本地调试)

  2. 使用 aiges 快速生成你的python项目

    python -m aiges create -n  "project"

    该命令生成一个 "project" 文件夹,并包含 wrapper.py 的半成品。

  3. 添加项目内依赖,完善wrapper.py并且本地调试通过

  4. 将wrapper.py 构建为docker镜像,并发布到 athena_serving框架。

  5. 访问你的AI HTTP API... Enjoy...

完成本地调试

❗ 提前注意
  • 实现Wrapper类时,必须继承WrapperBase类。

  • 运行中用到的参数,可以选择将变量声明为类变量,实例变量同样可选。为了模拟AIservice传递参数,在Wrapper类中声明一个类成员config用于初始化,上线后选择注释即可,在本例中如下

    class Wrapper(WrapperBase):
    requrl, http_method, http_uri = None, None, None
    # music
    access_key_music, access_secret_music = None, None
    # humming
    access_key_humming, access_secret_humming = None, None

    config = {}
    config = {
    "requrl" : ...,
    "http_method" : ...,
    "http_uri" : ...,
    "access_key_music" : ...,
    "access_secret_music" : ...,
    "access_key_humming" : ...,
    "access_secret_humming" : ...
    }
  • wrapperOnceExec函数执行返回的类型是Response对象,而不是通常表示执行状态错误码的int类型,意味着无论结果正常与否,均需实例化Response对象并返回。

    res = Response()
    • 未出现异常时,Response对象是是由一个或多个ResponseData对象构成的列表,其中ResponseData类有keydatalentypestatus五个成员变量
      l = ResponseData()
      l.key = "output_text"
      l.status = 3
      l.len = len(r.text.encode())
      l.data = r.text
      l.type = 0
      res.list = [l]
      # multi data: res.list = [l1, l2, l3]
      return res
    • 出现异常时,直接调用Response对象的response_err方法返回错误码
      return res.response_err(ERROR_CODE)
继承WrapperBase类完成Wrapper类的构建
  1. wrapperInit用于初始化加载器执行过程中用到的变量,参数从字典变量config中读入

    def wrapperInit(cls, config: {}) -> int:
    print("Initializing ..")
    config = config

    Wrapper.requrl, Wrapper.http_method, Wrapper.http_uri = config['requrl'], config['http_method'], config['http_uri']
    Wrapper.access_key_music, Wrapper.access_secret_music = config['access_key_music'], config['access_secret_music']
    Wrapper.access_key_humming, Wrapper.access_secret_humming = config['access_key_humming'], config['access_secret_humming']

    print('----------Finish Init--------------')

    return 0
  2. wrapperError将会返回错误码代表的含义,在本例中如下

    def wrapperError(cls, ret: int) -> str:
    if ret == 1001:
    return "识别无结果"
    elif ret == 2000:
    return "录音失败,可能是设备权限问题"
    elif ret == 2001:
    return "初始化错误或者初始化超时"
    elif ret == 2002:
    return "处理metadata错误"
    elif ret == 2004:
    return "无法生成指纹(有可能是静音)"
    elif ret == 2005:
    return "超时"
    elif ret == 3000:
    return "服务端错误"
    elif ret == 3001:
    return "Access Key不存在或错误"
    elif ret == 3002:
    return "HTTP内容非法"
    elif ret == 3003:
    return "请求数超出限制(需要升级账号)"
    elif ret == 3006:
    return "参数非法"
    elif ret == 3014:
    return "签名非法"
    elif ret == 3015:
    return "QPS超出限制(需要升级账号)"
    else:
    return f"User Defined Error: {ret}"
  3. wrapperFini用于处理一些加载器插件的堆区指针的回收工作,对于Python语言,通常不需要实现:

    def wrapperFini() -> int:
    logging.info('Wrapper finished.')
    return 0
  4. wrapperOnceExec的执行由鉴权发送HTTP请求接收响应数据构成

    def wrapperOnceExec(self, params: {}, reqData: DataListCls) -> Response:
    ......
    # 鉴权
    data_mode = params['mode']

    access_key = Wrapper.access_key_music if data_mode == 'music' else Wrapper.access_key_humming
    access_secret = Wrapper.access_secret_music if data_mode == 'music' else Wrapper.access_secret_humming

    src = reqData.list[0].data# binary files
    sample_bytes = reqData.list[0].len
    signature_version, data_type = '1', 'audio'

    timestamp = time.time()
    res = Response()

    string_to_sign = Wrapper.http_method + '\n' \
    + Wrapper.http_uri + '\n' \
    + access_key + '\n' \
    + data_type + '\n' \
    + signature_version + '\n' \
    + str(timestamp)
    sign = base64.b64encode(hmac.new(access_secret.encode('ascii'), string_to_sign.encode('ascii'),digestmod=hashlib.sha1).digest()).decode('ascii')

    if sign is None:
    return res.response_err(5014)

    # 发送http请求
    files = {'sample': src}
    data = {
    'access_key': access_key,
    'sample_bytes': sample_bytes,
    'timestamp': str(timestamp),
    'signature': sign,
    'data_type': data_type,
    'signature_version': signature_version
    }

    try:
    r = requests.post(Wrapper.requrl, files=files, data=data, timeout=5)
    except requests.exceptions.ConnectTimeout:
    return res.response_err(4408)
    if r is None:
    return res.response_err(4003)

    if r.status_code != 200:
    return res.response_err(4000 + r.status_code)

    # 接受响应数据
    pattern = re.compile('"code":\d+')
    error_code = re.findall(pattern, r.text)
    error_code = error_code[0].split(':')[-1]

    if int(error_code):
    return self.response_err(int(error_code))
    else:
    r.encoding = 'utf-8'
    l = ResponseData()

    l.key = "output_text"
    l.type = 0
    l.status = 3
    l.data = r.text
    l.len = len(r.text.encode())
    res.list = [l]
    return res

本地调试模拟传入数据✔️

  • 额外声明用户请求用户响应两个类

    class UserRequest(object):
    '''
    定义请求类:
    params: params 开头的属性代表最终HTTP协议中的功能参数parameters部分, 对应的是xtest.toml中的parameter字段
    params Field支持 StringParamField,
    NumberParamField,BooleanParamField,IntegerParamField,每个字段均支持枚举
    params 属性多用于协议中的控制字段,请求body字段不属于params范畴

    input: input字段多用与请求数据段,即body部分,当前支持 ImageBodyField、 StringBodyField和AudioBodyField
    '''
    params1 = StringParamField(key="mode", enums=["music", "humming"], value='humming')

    input1 = AudioBodyField(key="data", path="/home/wrapper/test.wav")

    class UserResponse(object):
    '''
    定义响应类:
    accepts: accepts代表响应中包含哪些字段, 以及数据类型

    input: input字段多用与请求数据段,即body部分,当前支持 ImageBodyField, StringBodyField, 和AudioBodyField
    '''
    accept1 = StringBodyField(key="ouput_text")
  • 实例化用户请求和用户响应对象

    class Wrapper(WrapperBase):
    # 实例化用户请求类和用户响应类
    requestCls = UserRequest()
    responseCls = UserResponse()
    ......
  • 声明main函数,实例化Wrapper对象,运行程序

    if __name__ == '__main__':
    m = Wrapper()
    m.schema()
    m.run()

Appendix

  • 安装和更新

    使用pip指令完成aiges库的安装和更新

    # 安装aiges
    pip install aiges -i https://pypi.python.org/simple
    # 更新aiges
    pip install --upgrade aiges -i https://pypi.python.org/simple
  • 在执行的过程中,错误需要尽可能早捕获,错误码也要和第三方平台区别开来,即使是默认的HTTP错误码也要有辨别也好,方便定位错误。

  • 调用三方API的Python加载器插件完整实现可以参考