Python轻应用实现动态注册和Ota升级

前言

本实验将向大家介绍如何使用Python语言在HaaS506 上面实现动态注册和轻应用在线Ota升级

硬件环境

HaaS506 开发板一个,sim卡一张

实现原理

Python 轻应用升级通道采用的物联网平台提供的OTA升级服务,通过这个升级服务,在服务端上传ota包,设备端监听相关的topic,下载升级包,并进行解压和安装。 详细的软件流程如下:

软件流程

代码实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# -*- coding: UTF-8 -*-
import network
import utime as time
import modem
from  linksdk import Device
import ota
from driver import KV

def on_4g_cb(args):
    global g_connect_status
    pdp = args[0]
    netwk_sta = args[1]
    if netwk_sta == 1:
        g_connect_status = True
    else:
        g_connect_status = False


def connect_network():
    global net,on_4g_cb,g_connect_status
    net = network.NetWorkClient()
    g_register_network = False
    if net._stagecode is not None and net._stagecode == 3 and net._subcode == 1:
        g_register_network = True
    else:
        g_register_network = False
    if g_register_network:
        net.on(1,on_4g_cb)
        net.connect(None)
    else:
        print('网络注册失败')
    while True:
        if g_connect_status:
            print('网络连接成功')
            break
        time.sleep_ms(20)


def on_dynreg_cb(data):
    global deviceSecret,device_dyn_resigter_succed
    deviceSecret = data
    device_dyn_resigter_succed = True


# 连接物联网平台
def dyn_register_device(productKey,productSecret,deviceName):
    global on_dynreg_cb,device,deviceSecret
    kv = KV()
    key = '_amp_customer_devicesecret'
    deviceSecret = kv.getStorageSync(key)[key]
    if deviceSecret is not None:
        pass
    else:
        key_info = {
            'productKey': productKey ,
            'productSecret': productSecret ,
            'deviceName': deviceName
        }
        # 动态注册一个设备,获取设备的deviceSecret
        device.register(key_info,on_dynreg_cb)



# 物联网平台连接成功的回调
def on_connect_lk(data):
    global module_name,default_ver,productKey,deviceName,deviceSecret,on_trigger,on_download,on_verify,on_upgrade
    print('***** connect lp succeed****')
    data_handle = {}
    data_handle['device_handle'] = data['handle']

    # 初始化ota服务
    ota.init(data_handle)

    # ota 回调函数注册
    ota.on(1,on_trigger)
    ota.on(2,on_download)
    ota.on(3,on_verify)
    ota.on(4,on_upgrade)

    report_info = {
        "device_handle": data['handle'] ,
        "product_key": productKey ,
        "device_name": deviceName ,
        "module_name": module_name ,
        "version": default_ver
    }

    # 上报本机ota相关信息,上报版本信息返回以后程序返回,知道后台推送ota升级包,才会调用on_trigger函数
    ota.report(report_info)


# 连接物联网平台
def connect_lk(productKey,deviceName,deviceSecret):
    global on_connect_lk,device
    key_info = {
        'region' : 'cn-shanghai' ,
        'productKey': productKey ,
        'deviceName': deviceName ,
        'deviceSecret': deviceSecret ,
        'keepaliveSec': 60
    }
    print(key_info)
    device.on(Device.ON_CONNECT,on_connect_lk)
    # 连接物联网平台, 等待回调函数on_conect 被调用
    device.connect(key_info)

# ota 消息推送的接受函数
def on_trigger(data):
    global info
    # 保存服务端推送的ota信息
    info['url'] = data['url']
    info['length'] = data['length']
    info['module_name'] = data['module_name']
    info['version'] = data['version']
    info['hash'] = data['hash']
    info['hash_type'] = data['hash_type']
    # 开始ota 包下载
    dl_data = {}
    dl_data['url'] = info['url']
    dl_data['store_path'] = info['store_path']
    ota.download(dl_data)

# ota 升级包下载结果回调函数
def on_download(data):
    global info
    if data >= 0:
        print('Ota download succeed')
        # 开始ota包校验
        param = {}
        param['length'] = info['length']
        param['store_path'] = info['store_path']
        param['hash_type'] = info['hash_type']
        param['hash'] = info['hash']
        ota.verify(param)

# ota 升级包校验结果回调函数
def on_verify(data):
    global info
    print(data)
    if data >= 0 :
        print('Ota verify succeed')
        print('Start Upgrade')
        # 开始ota升级
        param = {}
        param['length'] = info['length']
        param['store_path'] = info['store_path']
        param['install_path'] = info['install_path']
        ota.upgrade(param)

# ota 升级包结果回调函数
def on_upgrade(data):
    if data >= 0 :
        print('Ota succeed')

if __name__ == '__main__':
    g_connect_status = False
    net = None
    device = None
    deviceSecret = None
    deviceName = None
    productKey = "xxxx"
    productSecret = "xxxx"
    device_dyn_resigter_succed = False
    # 定义需要升级的模块和版本号
    module_name = 'default'
    default_ver = '1.2.0'
# 定义升级包的下载和安装路径,其中url,hash_type和hash 会通过服务端推送被保存下来
    info = {
        'url': '',
        'store_path': '/data/pyamp/app.zip',
        'install_path': '/data/pyamp/',
        'length': 0,
        'hash_type': '',
        'hash': ''
    }


# 连接网络
connect_network()
# 获取设备的IMEI 作为deviceName 进行动态注册
deviceName = modem.getDevImei()
device = Device()
if deviceName is not None and len(deviceName) > 0 :
    dyn_register_device(productKey,productSecret,deviceName)
else:
    print("获取设备IMEI失败,无法进行动态注册")
while deviceSecret is None:
    time.sleep(0.2)
print('动态注册成功:' + deviceSecret)
connect_lk(productKey,deviceName,deviceSecret)
while True:
    print('当前App版本号:' + default_ver)
    print('等待Ota升级包.....')
    time.sleep(5)

升级包推送

  • 修改以上代码中的productKey和productSecret 替换 ,然后通过IDE推送到设备中运行

  • 修改代码中default_ver 版本号为1.3.0并保存

  • 拔掉串口和pc 的连接

  • 点击IDE的烧录按钮进行打包操作(打包的文件为upgrade/pyamp.zip)

  • 使用物联网平台上传并且推送pyamp.zip 的升级包,具体操作如下

    1. 登录阿里云官网,选择产品阿里云IoT平台->企业物联网平台->控制台->实例概览->公共实例,如下如所知,打开监控运维中的OTA升级服务

      OTA升级服务
    2. 点击升级包列表,选择添加升级包

      • 选择整包升级,输入升级包名称,选择锁需要的升级的产品名称

      • 升级包模块选择 default

      • 升级包版本号轻填写 Python轻应用代码中的版本号: 1.3.0

      • 选择签名算法,点击上传升级包(IDE 工作目录 upgrade 目录下的 pyamp.zip)

      • 选择需要平台验证,并填写升级包描述并确认

      创建升级包
    3. 验证升级包

      在升级包创建完成以后,点击相应升级包操作栏的验证按钮。 * 选择需要被升级的目标轻应用的版本号 * 选择需要验证的设备名称 * 点击确定开始推送升级包

      升级包验证
    4. 等待设备升级成功以后,手动重启设备,新版本的app 执行成功,打印新的版本号: 1.3.0