1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 | # -*- coding: UTF-8 -*-
#完整的Python轻应用升级包含以下几个步骤:
# 1. 联网
# 2. 连接物联网平台,等待连接成功的回调
# 3. 初始化ota服务
# 4. 上报版本信息,等待ota升级触发的函数回调
# 5. 下载升级包
# 6. 校验升级包
# 7. 安装升级包
from linksdk import Device
import time
import ota
import netmgr as nm
# wifi 状态标志位
wifi_connected = False
# wifi热点信息
ssid = 'xxxx'
pwd = 'xxxx'
# 定义需要升级的模块和版本号
module_name = 'default'
default_ver = '1.2.0'
# 三元组信息
productKey = "xxxxxxxx"
deviceName = "xxxxxxxx"
deviceSecret = "xxxxxxxx"
# 定义升级包的下载和安装路径,其中url,hash_type和hash 会通过服务端推送被保存下来
info = {
'url': '',
'store_path': '/data/pyamp/app.zip',
'install_path': '/data/pyamp/',
'length': 0,
'hash_type': '',
'hash': ''
}
def on_wifi_connected(status):
global wifi_connected
wifi_connected = True
# 连接wifi网络
def connect_wifi(ssid,pwd):
global wifi_connected,on_wifi_connected
nm.init()
wifi_connected = nm.getStatus()
if not wifi_connected:
nm.register_call_back(1,on_wifi_connected)
nm.connect(ssid,pwd)
while True :
if wifi_connected:
break
else:
print('Waiting for wifi connected')
time.sleep(1)
if nm.getStatus():
print('DeviceIP:' + nm.getInfo()['IP'])
else:
print('DeviceIP:get failed')
print("ConnectWifi finished")
# 物联网平台连接成功的回调
def on_connect(data):
global module_name,default_ver,productKey,deviceName,deviceSecret,on_trigger,on_download,on_verify,on_upgrade
print('***** connect lp succeed****')
data_handle = {}
data_handle['device_handle'] = data['handle']
# 初始化ota服务
ota.init(data_handle)
# ota 回调函数注册
ota.on(1,on_trigger)
ota.on(2,on_download)
ota.on(3,on_verify)
ota.on(4,on_upgrade)
report_info = {
"device_handle": data['handle'] ,
"product_key": productKey ,
"device_name": deviceName ,
"module_name": module_name ,
"version": default_ver
}
# 上报本机ota相关信息,上报版本信息返回以后程序返回,知道后台推送ota升级包,才会调用on_trigger函数
ota.report(report_info)
# ota 消息推送的接受函数
def on_trigger(data):
global info
# 保存服务端推送的ota信息
info['url'] = data['url']
info['length'] = data['length']
info['module_name'] = data['module_name']
info['version'] = data['version']
info['hash'] = data['hash']
info['hash_type'] = data['hash_type']
# 开始ota 包下载
dl_data = {}
dl_data['url'] = info['url']
dl_data['store_path'] = info['store_path']
ota.download(dl_data)
# ota 升级包下载结果回调函数
def on_download(data):
global info
if data >= 0:
print('Ota download succeed')
# 开始ota包校验
param = {}
param['length'] = info['length']
param['store_path'] = info['store_path']
param['hash_type'] = info['hash_type']
param['hash'] = info['hash']
ota.verify(param)
# ota 升级包校验结果回调函数
def on_verify(data):
global info
print(data)
if data >= 0 :
print('Ota verify succeed')
print('Start Upgrade')
# 开始ota升级
param = {}
param['length'] = info['length']
param['store_path'] = info['store_path']
param['install_path'] = info['install_path']
ota.upgrade(param)
# ota 升级包结果回调函数
def on_upgrade(data):
if data >= 0 :
print('Ota succeed')
# 连接物联网平台
def connect_lk(productKey,deviceName,deviceSecret):
global on_connect
device = Device()
key_info = {
'region' : 'cn-shanghai' ,
'productKey': productKey ,
'deviceName': deviceName ,
'deviceSecret': deviceSecret ,
'keepaliveSec': 60
}
device.on(Device.ON_CONNECT,on_connect)
# 连接物联网平台, 等待回调函数on_conect 被调用
device.connect(key_info)
# 保持程序不退出
while True:
time.sleep(5)
if __name__ == '__main__':
connect_wifi(ssid,pwd)
connect_lk(productKey,deviceName,deviceSecret)
|