add more examples for HaaS Python

# Python应用升级
<img src=./../../../images/Python应用升级/Python应用升级-导学步骤.png width=80%/>
## 1、简介
### 1.1、背景
智能物联网进展至今,要想形成竞争力,必须要有一套闭环的端云一体解决方案,IoT硬件端有HaaS开发板,可使用Python开发,做到一端开发多端应用,应用开发完成后要迭代,就需要升级,为解决应用升级问题,需要一套应用分发系统来帮助HaaS设备实现端云一体的闭环。本案例就是介绍如何使用HaaS Cloud平台帮助开发者实现应用升级功能的实践。
### 1.2、设计方案
HaaS Cloud平台为开发者提供了升级的通道,并提供应用升级相关的产品\设\应\版本可视化操作页面,开发者只需界面点点就可以实现应用升级全流程。
2、HaaS Cloud控制台创建测试设备分组、创建应用、创建应用版本。
4、设备端接收并解析HaaS Cloud控制台推送的升级消息。
<img src=./../../../images/Python应用升级/Python应用升级-整体流程图.png width=80%/>
### 1.3、准备
1. M5Stack Core2开发板 一套
2. microUSB数据线 一条
| 名称 | 数量 | 参考链接 |
| --- | --- | --- |
| M5Stack Core2开发版 | 1 | [M5Stack Core2](https://item.taobao.com/item.htm?spm=a1z10.5-c-s.w4002-22404213529.17.732749d8usCqYX&id=625561056791) |
| microUSB数据线 | 1 | M5Stack Core2开发套件自带 |
## 2、注册设备、创建测试设备分组
### 2.1、注册阿里云账号
进入阿里云官网,[注册阿里云账号](https://account.aliyun.com/register/qr_register.htm) 。注册完成后登录官网。如果您已经有阿里云账号,直接登录即可。
### 2.2、注册设备
<img src=./../../../images/Python应用升级/Python应用升级_物联网平台.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_空产品页.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_新建产品.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_添加设备.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_批量添加.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_设备列表.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_设备详情.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级_设备证书.png width=80%/>
### 2.3、关联产品到HaaS Cloud控制台
进入[HaaS Cloud控制台](https://haas.iot.aliyun.com/welcome),将物联网平台上创建的产品关联到这里,步骤如下图所示。
<img src=./../../../images/Python应用升级/Python应用升级_HaaS控制台绑定产品.png width=80%/>
HaaS Cloud控制台上查看关联产品下的设备,如下图所示
<img src=./../../../images/Python应用升级/Python应用升级_HaaS控制台查看设备.png width=80%/>
### 2.4、创建测试设备分组
<img src=./../../../images/Python应用升级/Python应用升级-创建测试分组.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级-测试分组创建完成.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级-添加分组设备.png width=80%/>
## 3、设备端开发
### 3.1、开发环境准备
### 3.2、创建解决方案
如下图所示,在Haas Studio中创建项目。先选择左侧的“开发板型号”(本文选的是M5Stack-Core2),再从右侧的案例中选择“Python应用升级”案例点击“立即创建”即可。
<img src=./../../../images/Python应用升级/HaaS_Studio_创建工程示范.png width=80%/>
### 3.3、设备端代码修改
> Python脚本的详细说明请参考脚本内嵌的文字注释
1. **修改路由器名称及密码**
# Wi-Fi SSID和Password设置
2. **修改设备的三元组信息**
# HaaS设备三元组
ProductKey = "Your-ProductKey"
DeviceName = "Your-DeviceName"
DeviceSecret = "Your-DeviceSecret"
3. **修改应用配置文件manifest.json**
2、关键字appName对应的值,只能使用英文字母、数字,长度不能超过16个字符,在HaaS Cloud平台全局范围内唯一;
3、关键字appid对应的值,必须为16位的数字,且开头为88001000xxxxpppp,该appid在HaaS Cloud平台全局范围内唯一;
## 4、运行设备
### 4.1、推送脚本到固件
<img src=./../../../images/Python应用升级/Python应用升级-程序推送到固件.png width=80%/>
### 4.2、设备本地查看
程序启动后,会去连接网络,注册和监听HaaS Cloud服务推送的消息。详细log如下所示:
==== python execute bootpy ====
==== python file check /data/pyamp/main.py ====
==== python execute from /data/pyamp/main.py ====
Python App Upgrade Program
"appName": "python9000",
"version": "0.0.1",
"appid": "8800100099990000"
Wi-Fi is connecting...
Wi-Fi is connecting...
Wi-Fi is connecting...
Wi-Fi is connected
IP: x.x.x.x
NTP start
NTP done
establish tcp connection with server(host='$YourProductKey.iot-as-mqtt.cn-shanghai.aliyuncs.com', port=[443])
tcp_connect: can only connect from state CLOSED
success to establish tcp, fd=54
link platform connected
#订阅HaaS Cloud Python应用升级服务
subscribed topic success: /sys/$YourProductKey/$YourDeviceName/thing/service/hli_service_upgrade_push
## 5、制作升级包和推送升级
### 5.1、本地制作Python应用升级包
# 下面提到的$appName、$appid值必须跟工程根目录manifest.json中一致。
cd testUpgrade
tar cvzf $appName-$appid-$verison.tar.gz *
### 5.2、创建Python应用
进入[HaaS Cloud控制台](https://haas.iot.aliyun.com/welcome),在左侧菜单栏进入”升级服务-应用升级“,创建应用。
<img src=./../../../images/Python应用升级/Python应用升级-创建Python应用.png width=80%/>
<div align="center">
<img src=./../../../images/Python应用升级/Python应用升级-创建应用步骤.png width=80%/>
### 5.3、创建升级版本
<img src=./../../../images/Python应用升级/Python应用升级-我的应用列表.png width=80%/>
<div align="center">
<img src=./../../../images/Python应用升级/Python应用升级-创建版本步骤.png width=80%/>
<div align="center">
<img src=./../../../images/Python应用升级/Python应用升级-应用版本列表.png width=80%/>
### 5.4、创建测试任务
<img src=./../../../images/Python应用升级/Python应用升级-开始测试.png width=80%/>
<img src=./../../../images/Python应用升级/Python应用升级-新建测试任务.png width=80%/>
### 5.5、推送升级
<img src=./../../../images/Python应用升级/Python应用升级-推送应用.png width=80%/>
{'msg_id': 1657023114, 'service_id': 'hli_service_upgrade_push', 'params': '{"app":[{"size":1269,"appId":"8800100099990000","rollingBack":0,"version":"0.0.2","url":"https://iotx-haas-xxx.gz?Expires=xxx&OSSAccessKeyId=xxx&Signature=xxx","md5":"8366bb1b8ad9766335b733b226ad178c","needRestart":0}],"appUpdateCount":1,"errorCode":"200","needUpdateFirmware":0,"haasTraceId":"0bc1747b16546803930783531e538d"}', 'code': 0, 'params_len': 490}
local version[0.0.1] low,upgrade to [0.0.2].
upgrade_result_cb needUpgrade
Reset Device to upgarde app: 3
Reset Device to upgarde app: 2
Reset Device to upgarde app: 1
==== python execute bootpy ====
App is being upgraded. It will take about 10 seconds.
App upgrade finished.
==== python file check /data/pyamp/main.py ====
==== python execute from /data/pyamp/main.py ====
"appName": "python9000",
"version": "0.0.2",
"appid": "8800100099990000"
new version upgrade success.
new version upgrade success.
### 5.6、再次推送升级入口
HaaS Cloud控制台左侧菜单”升级服务-应用升级“,点击列表中右侧”版本列表“按钮,在版本列表对应行的最后点击”更多“中点击”测试"按钮即可进入推送升级页面。
<img src=./../../../images/Python应用升级/Python应用升级-开始测试.png width=80%/>
### 5.7、删除版本
1. HaaS Cloud控制台左侧菜单”升级服务-应用升级“,点击列表中右侧”版本列表“按钮,在版本列表对应行的最后点击”更多“中点击”废弃"按钮。
2. HaaS Cloud控制台左侧菜单”升级服务-应用升级“,点击列表中右侧”版本列表“按钮,在版本列表对应行的最后点击”更多“中点击”删除"按钮。
3. 重复本文[5.3、创建升级版本]和[5.4、创建测试任务]即可再次创建升级版本
## 后记
"name": "m5stackcore2",
"version": "1.0.0",
"io": {
"debugLevel": "ERROR",
"repl": "disable"
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
@File : main.py
@Author : 杭漂
@version : 1.0
@Description: 通过跟HaaS Cloud控制台中“升级服务-应用升级”配合,实现设备端python应用的升级。
HaaS Cloud控制台网址:https://haas.iot.aliyun.com/welcome
import network # 网络库
import utime # 延时函数在utime库中
import time # 获取时间搓
import sntp # 网络时间同步库
import ujson as json
import kv
import machine
from upgrade import *
# Wi-Fi SSID和Password设置
ProductKey = "Your-ProductKey"
DeviceName = "Your-DeviceName"
DeviceSecret = "Your-DeviceSecret"
key_info = {
'region' : 'cn-shanghai' ,
'productKey': ProductKey ,
'deviceName': DeviceName ,
'deviceSecret': DeviceSecret ,
'keepaliveSec': 60
app_info = {
def connect_wifi(ssid, pwd):
global g_wifi_connected
while True:
try :
# 初始化网络
wlan = network.WLAN(network.STA_IF)
wlan.connect(ssid, pwd)
while True:
print('Wi-Fi is connecting...')
# 网络连接成功后,更新显示字符
if (wlan.isconnected() == True):
g_wifi_connected = True
print('Wi-Fi is connected')
ip = wlan.ifconfig()[0]
print('IP: %s' %ip)
# NTP时间更新,如果更新不成功,将不能进行识别
print('NTP start')
print('NTP done')
print("OSError except")
print("unknow except")
def query_upgrade_result_cb(data):
global engine
print("upgrade_result_cb needUpgrade")
app_info['localAppVersion'] = data['localAppVersion']
app_info['appNewVersion'] = data['version']
app_info['appId'] = data['appId']
resetTime = 3
while True:
print("Reset Device to upgarde app:",resetTime)
resetTime = resetTime - 1
if resetTime == 0:
def get_app_info():
file_name = r'/data/pyamp/manifest.json'
# 以只读方式打开文件
f = open(file_name)
# 一次读取整个文件
content = f.read()
app_dict = json.loads(content)
app_info['localAppVersion'] = app_dict['version']
app_info['appId'] = app_dict['appid']
def main() :
# 全局变量
global engine,g_wifi_connected
g_wifi_connected = False
# 读取本地配置
# 连接网络
connect_wifi(SSID, PWD)
while True:
if g_wifi_connected == True:
# 链接阿里云物联网平台
engine = Upgrade(key_info,app_info,query_upgrade_result_cb)
while True:
if __name__ == '__main__':
print("Python App Upgrade Program")
"appName": "python9000",
"version": "0.0.1",
"appid": "8800100099990000"
"name": "m5stackcore2",
"version": "1.0.0",
"io": {
"debugLevel": "ERROR",
"repl": "disable"
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
@File : main.py
@Author : 杭漂
@version : 1.0
@Description: 通过跟HaaS Cloud控制台中“升级服务-应用升级”配合,实现设备端python应用的升级,该文件为针对本案例测试应用升级使用。
import utime
import ujson as json
# manifest.json文件注意事项:
# 1、该文件内不要添加任何注释;
# 2、关键字appName对应的值,只能使用英文字母、数字,长度不能超过16个字符,在HaaS Cloud平台全局范围内唯一;
# 3、关键字appid对应的值,必须为16位的数字,且开头为88001000xxxxpppp,该appid在HaaS Cloud平台全局范围内唯一;
# 4、关键字verison对应的值,只能使用数字和'.',必须是3段式,例如0.0.1,在相同appName下唯一;
# 下面提到的$appName、$appid、$verison值必须跟升级包中manifest.json中一致。
# cd /${YourProjectPath}/src/code/testUpgrade/
# tar cvzf $appName-$appid-$verison.tar.gz *
# Python升级包标准名字举例:python001-8800100099991000-0.0.9.tar.gz
app_info = {
def get_app_info():
file_name = r'/data/pyamp/manifest.json'
# 以只读方式打开文件
f = open(file_name)
# 一次读取整个文件
content = f.read()
app_dict = json.loads(content)
app_info['localAppVersion'] = app_dict['version']
app_info['appId'] = app_dict['appid']
if __name__ == '__main__':
while True:
print("new version upgrade success.")
"appName": "python9000",
"version": "0.0.2",
"appid": "8800100099990000"
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
@File : upgrade.py
@Description: python应用升级能力实现
@Author : 杭漂
@version : 1.0
from aliyunIoT import Device
import ujson as json
class Upgrade :
def compareVersion(self,oldVersion,newVersion):
old_list = oldVersion.split('.')
new_list = newVersion.split('.')
if (len(new_list) == 3) and (len(old_list) ==3) :
if int(new_list[0]) > int(old_list[0]):
return 1
elif (int(new_list[0]) == int(old_list[0])) and (int(new_list[1]) > int(old_list[1])):
return 1
elif (int(new_list[0]) == int(old_list[0])) and (int(new_list[1]) == int(old_list[1])) and (int(new_list[2]) > int(old_list[2])):
return 1
return 0
print("oldVersion(%s) or newVersion(%s) format mismatch,must x.x.x" % (oldVersion,newVersion))
return 0
def __cb_lk_service(self, data):
print('service payload data ---->\n' + str(data))
if data != None:
params = data['params']
if (data['service_id'].find("hli_service_upgrade_push") != -1):
params_dict = json.loads(params)
if params_dict['errorCode'] == '200':
app_list = params_dict['app']
i = 0
while (i < len(app_list)) :
appId = app_list[i]['appId']
if(appId == self.__app_info['appId']):
version = app_list[i]['version']
if app_list[i]['rollingBack'] == 0 :
url = app_list[i]['url']
version = app_list[i]['version']
needUpgrade = self.compareVersion(self.__app_info['localAppVersion'],version)
if needUpgrade == 1:
print("local version[%s] low,upgrade to [%s]." % (self.__app_info['localAppVersion'],version))
data_info = {'appId':appId,'url':url,'version':version,'localAppVersion':self.__app_info['localAppVersion']}
i += 1
else :
print("receive upgrade notice failed[%s]." % (params['errorCode']))
def __pub_query_upgrade_event(self,app_info) :
self.__app_info = app_info
appInfo1 = {'appId':app_info['appId'],'version':app_info['localAppVersion']}
allParams = {'id': 1, 'version': '1.0', 'params': { 'mac':app_info['mac'], 'ip': app_info['ip'],'userScenario':'online', 'vendorId': 'HaaSPython','buildType':'eng','isSecurityOn':0 ,'firmwareVersion':'0.0.1','app':[appInfo1]}}
all_params_str = json.dumps(allParams)
query_upgrade_topic = '/sys/' + self.__dev_info['productKey'] + '/' + self.__dev_info['deviceName'] + '/thing/event/hli_event_upgrade_query/post'
topic_info = {
'topic': query_upgrade_topic,
'qos': 1,
'payload': all_params_str
print('Topic发布成功:%s' % (query_upgrade_topic))
def __pub_upgrade_result_event(self,app_info,argInt) :
self.__app_info = app_info
ext = {'appId':app_info['appId'],'appNewVersion':app_info['appNewVersion'],'appOldVersion':app_info['localAppVersion']}
ext_str = json.dumps(ext)
# argInt,200:成功。404:包下载失败。405:包安装失败
allParams = {'id': 1, 'version': '1.0', 'params': \
{ 'eventType':'haas.upgrade', 'eventName': 'app.result','argInt':argInt, 'ext': ext_str}}
all_params_str = json.dumps(allParams)
query_upgrade_topic = '/sys/' + self.__dev_info['productKey'] + '/' + self.__dev_info['deviceName'] + '/thing/event/hli_event/post'
topic_info = {
'topic': query_upgrade_topic,
'qos': 1,
'payload': all_params_str
print('Topic发布成功:%s' % (query_upgrade_topic))
def __sub_upgrade_push_service(self) :
upgrade_topic = '/sys/' + self.__dev_info['productKey'] + '/' + self.__dev_info['deviceName'] + '/thing/service/hli_service_upgrade_push'
sub_topic = {
'topic': upgrade_topic,
'qos': 1
ret = self.device.subscribe(sub_topic)
if ret == 0 :
print("subscribed topic success: %s" % (upgrade_topic))
else :
print("subscribed topic failed: %s" % (upgrade_topic))
def __cb_lk_connect(self, data):
print('link platform connected')
self.g_lk_connect = True
def __connect_iot(self) :
self.device = Device()
self.device.on(Device.ON_CONNECT, self.__cb_lk_connect)
self.device.on(Device.ON_SERVICE, self.__cb_lk_service)
while True:
if self.g_lk_connect:
def __init__(self, dev_info,app_info,callback) :
self.__dev_info = dev_info
self.__app_info = app_info
self.__cb = callback
self.g_lk_connect = False
......@@ -28,8 +28,8 @@ deviceName = "设备名称"
deviceSecret = "设备密钥"
# Wi-Fi SSID和Password设置
wifi_ssid = "请填写您的路由器名称"
wifi_password = "请填写您的路由器密码"
wifiSsid = "请填写您的路由器名称"
wifiPassword = "请填写您的路由器密码"
# 回调函数状态
on_request = False
"name": "haas100",
"version": "1.0.0",
"io": {
"epaper": {
"type": "SPI",
"port": 3,
"mode": "master",
"freq": 14500000
"epaper_dc": {
"type": "GPIO",
"port": 17,
"dir": "output",
"pull": "pulldown"
"epaper_cs": {
"type": "GPIO",
"port": 5,
"dir": "output",
"pull": "pullup"
"epaper_rst": {
"type": "GPIO",
"port": 16,
"dir": "output",
"pull": "pullup"
"epaper_busy": {
"type": "GPIO",
"port": 4,
"dir": "input",
"pull": "pullup"
"debugLevel": "ERROR",
"repl": "disable"
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
@File : main.py
@Author : zeta.zz
@version : 1.0
@Description: price_tag - 电子价签
board.json - 硬件资源配置文件
from showPriceData import ShowData
from machine import Pin
from driver import GPIO
import network
import time
import kv
import gc
from aliyunIoT import Device # iot组件是连接阿里云物联网平台的组件
import json
import binascii
# 物联网平台连接标志位
iot_connected = False
wlan = None
# 三元组信息
productKey = "Your-ProductKey"
deviceName = "Your-devicename"
deviceSecret = "Your-deviceSecret"
# 物联网设备实例
device = None
# Wi-Fi SSID和Password设置
wifiSsid = "Your-AP-SSID"
wifiPassword = "Your-AP-Password"
# 定义电子价签物联网模型参数
sal_update = 0
sal_pic = ''
sal_offerta = '30'
sal_price = '108.50'
sal_name = 'AliOS-Things'
arrayBuf = bytearray(b'')
updating = 0
# 等待Wi-Fi成功连接到路由器
def get_wifi_status():
global wlan
wifi_connected = False
wlan.active(True) #激活界面
wlan.scan() #扫描接入点
#print("start to connect ", wifiSsid)
# 连接到指定的路由器(路由器名称为wifiSsid, 密码为:wifiPassword)
wlan.connect(wifiSsid, wifiPassword)
while True:
wifi_connected = wlan.isconnected() # 获取Wi-Fi连接路由器的状态信息
if wifi_connected: # Wi-Fi连接成功则退出while循环
print("wifi_connected:", wifi_connected)
ifconfig = wlan.ifconfig() #获取接口的IP/netmask/gw/DNS地址
# 物联网平台连接成功的回调函数
def on_connect(data):
global iot_connected
iot_connected = True
# 设置props 事件接收函数(当云平台向设备下发属性时)
def on_props(request):
# global post_default_value
global arrayBuf, sal_update, sal_name, sal_price, sal_offerta, sal_pic, updating
props = eval(request['params'])
# 获取 sal_name 参数
if 'sal_name' in props.keys():
sal_name = props['sal_name']
# 获取 sal_price 参数
if 'sal_price' in props.keys():
sal_price = props['sal_price']
# 获取 sal_offerta 参数
if 'sal_offerta' in props.keys():
sal_offerta = props['sal_offerta']
# 获取 sal_pic 参数
if 'sal_pic' in props.keys():
sal_pic = props['sal_pic']
# print(sal_pic)
print("recevied sal pic")
if 'sal_update' in props.keys():
sal_update = props['sal_update']
# 判断是否需要更新
if sal_update == 1:
if len(sal_pic) % 2 == 0:
picBuf = binascii.unhexlify(sal_pic)
# 改变显示图形buf
arrayBuf = bytearray(picBuf)
# print(arrayBuf)
# 刷新屏幕
# 墨水屏刷新慢,加入保护
if updating == 0:
updating = 1
priceTagObj.show(name=sal_name, sel='$ '+sal_price, offerta='-'+sal_offerta+'%', byteBuf=arrayBuf)
updating = 0
# 上传更新设置, 图片更新后才会更新
sal_update = 0
except Exception as e:
def post_props(data):
global device
if isinstance(data,dict):
data = {'params': json.dumps(data)}
ret = device.postProps(data)
return ret
def connect_lk(productKey, deviceName, deviceSecret):
global device, iot_connected
key_info = {
'region': 'cn-shanghai',
'productKey': productKey,
'deviceName': deviceName,
'deviceSecret': deviceSecret,
'keepaliveSec': 60
# 将三元组信息设置到iot组件中
device = Device()
# 设定连接到物联网平台的回调函数,如果连接物联网平台成功,则调用on_connect函数
device.on(Device.ON_CONNECT, on_connect)
# 配置收到云端属性控制指令的回调函数
# 如果收到物联网平台发送的属性控制消息,则调用on_props函数
device.on(Device.ON_PROPS, on_props)
# 启动连接阿里云物联网平台过程
# 等待设备成功连接到物联网平台
while True:
if iot_connected:
kv.set('app_upgrade_channel', 'disable')
print('sleep for 1 s')
# 初始化物联网平台更新状态
def post_default_value():
# global sal_update
value = {'sal_update' : sal_update}
value = {'sal_name' : sal_name}
value = {'sal_price' : sal_price}
value = {'sal_offerta' : sal_offerta}
if __name__ == '__main__':
wlan = network.WLAN(network.STA_IF) #创建WLAN对象
connect_lk(productKey, deviceName, deviceSecret)
# 初始化墨水屏
# epaperDC = GPIO()
# epaperDC.open("epaper_dc")
# epaperCS = GPIO()
# epaperCS.open("epaper_cs")
# epaperRST = GPIO()
# epaperRST.open("epaper_rst")
# epaperBUSY = GPIO()
# epaperBUSY.open("epaper_busy")
# priceTagObj = ShowData(14500000, epaperDC, epaperCS, epaperRST, epaperBUSY)
# priceTagObj = ShowData(14500000, Pin(18), Pin(23), Pin(17), Pin(5), Pin(16), Pin(4))
priceTagObj = ShowData(14500000, Pin(17), Pin(2), Pin(16), Pin(4))
# 初始化显示
updating = 1
priceTagObj.show(name=sal_name, sel='$ '+sal_price, offerta='-'+sal_offerta+'%')
updating = 0
# buf test code
# apple_pic = b'ffffffffffffffffffff1fffffffffffffffffffff1fffffffffffffffffffff1fffffffffffffffffffff1fffffffffffffffffffff1fffffffffffff1ffeffff1fffffffffffff03f8ffff1fffffffffffff00f8ffff1fffffffffc77fe0f1ffff1fffffffffc71ffcf1ffff1fffffffff8f1ffef1ffff1fffffffff0f8ffff1ffff1fffffffff1f87fff1ffff1fffffffff3fc6fff9ffff1fffffffff3fe2fff8ffff1fffffffff7fe0fff8ffff1fffffffff7ff0fff8ffff1fffffffff7ff07ffcffff1ffffffffffff03ffcffff1ffffffffffff01ffeffff1ffffffffffff00fffffff1ffffffffffff083ffffff1ffffffffffd01c0ffffff1fffffff3fc00080ffffff1fffffff0f000100feffff1fffffff0706080afcffff1fffffffc33f807ff8ffff1fffffffe37fc0fff8ffff1ffffffff1ffe0fff1ffff1ffffffff1ffe0fff1ffff1ffffffff8fff1ffe3ffff1ffffffff8fff1ffe3ffff1ffffffff8fff1ffe3ffff1ffffffff8fff1ffe3ffff1ffffffff9fff1fff3ffff1ffffffff0ffe0fff1ffff1fffff3ff0ffe0ff81ffff1fffff0fe07fc0ff00feff1fffff07851f043f08fcff1fffffc307000e007cf8ff1fffffe11f801f00fff0ff1ffffff13fe0ffc0fff1ff1ffffff8fff0ffe1ffe3ff1ffffff8fff0fff1ffe3ff1ffffff8fff9ffe1ffe3ff1ffffff8fff1fff3ffe3ff1ffffff8fff8fff1ffe3ff1ffffff8fff0ffe1ffe3ff1ffffff8fff0ffe1ffe3ff1ffffff17fe0ffc0fff1ff1fffffe17fc07fc0fff0ff1fffffc31f863f0c7ff8ff1fffff07020e040e08fcff1fffff0f801f003f00feff1fffff3fe07fc07f80ffff1ffffffff8fff1ffe3ffff1ffffffff8fff1ffe3ffff1ffffffff8fff1ffe3ffff1ffffffff8fff1fff3ffff1ffffffff1ffe0fff1ffff1ffffffff1ffe0fff1ffff1fffffffe17fc0fff8ffff1fffffffc33f847ff8ffff1fffffff07060e16fcffff1fffffff0f001f00feffff1fffffff3fc07f80ffffff1ffffffffff1fff1ffffff1ffffffffff1fff1ffffff1ffffffffff1fff1ffffff1ffffffffff9fff3ffffff1ffffffffff1fff1ffffff1ffffffffff1fff1ffffff1ffffffffff1fff1ffffff1fffffffffe1fff0ffffff1fffffffffe3fff8ffffff1fffffffff873ffcffffff1fffffffff070ffcffffff1fffffffff0f00ffffffff1fffffffff3f80ffffffff1fffffffffffe9ffffffff1fffffffffffffffffffff1fffffffffffffffffffff1fffffffffffffffffffff1fffffffffffffffffffff1fffffffffffffffffffff1f'
# picBuf = binascii.unhexlify(apple_pic)
# arrayBuf = bytearray(picBuf)
# priceTagObj.show(name=sal_name, sel='$ '+sal_price, offerta='-'+sal_offerta+'%', byteBuf=arrayBuf)
# gc.collect()
"""ESP2in9b v.2."""
from time import sleep
from machine import SPI, Pin
# from driver import SPI
from xglcd_font import XglcdFont
from esp2in9bv2 import Display
# #############################################################################
# ############### HaaS Python price tag ############################
# #############################################################################
# ##| |####| |##| OFFERTA |########
# ##| product pic |####| ¥ 100.00 (sal) |##| -30% |########
# ##| (url) |####| (name) |##| (offerta) |########
# #############################################################################
# ---85pixel--- ---135pixel--- ---76pixel---
class ShowData():
# init hw params
def __init__(self, baudrate, dc, cs, rst, busy):
self.BAUDRATE = baudrate
# self.SCK = sck
# self.MOSI = mosi
self.DC = dc
self.CS = cs
self.RST = rst
self.BUSY = busy
# self.epaperSPI = SPI()
# self.epaperSPI.open("epaper")
def show(self, name='AliOS-Things', sel='$ 108.50', offerta='-30%', byteBuf=bytearray(b'')):
# init epaper
spi = SPI(2, baudrate=self.BAUDRATE, sck=Pin(18), mosi=Pin(23))
display = Display(spi, dc=self.DC, cs=self.CS, rst=self.RST, busy=self.BUSY)
# display = Display(spi=self.epaperSPI, dc=self.DC, cs=self.CS, rst=self.RST, busy=self.BUSY)
# start draw Banner USE red color
display.fill_rectangle(100, 0, 28, 296, red=True)
# load fonts and draw Banner font
unispace = XglcdFont('data/pyamp/fonts/Unispace12x24.c', 12, 24)
text_width = unispace.measure_text("HaaS Python price tag")
# print(text_width)
display.draw_text(100, (296-text_width)//2, "HaaS Python price tag", unispace, red=True, invert=True, rotate=90)
# start draw offerta Banner use black color
display.fill_rectangle(0, 220, 100, 76)
# load fonts and draw banner fonts
ArcadePix = XglcdFont('data/pyamp/fonts/ArcadePix9x11.c', 9, 11)
text_width = ArcadePix.measure_text("OFFERTA")
# print(text_width)
display.draw_text(70, (220+(76-text_width)//2), "OFFERTA", ArcadePix, invert=True, rotate=90)
# start draw Split line
display.fill_rectangle(30, 85, 5, 135)
# load fonts
# change name to ArcadePix
# wendy = XglcdFont('fonts/Wendy7x8.c', 7, 8)
# text_width = wendy.measure_text(name)
# print(text_width)
# display.draw_text(13, (220-text_width-4), name, wendy, rotate=90)
text_width = ArcadePix.measure_text(name)
# print(text_width)
display.draw_text(13, (220-text_width-4), name, ArcadePix, rotate=90)
# start load Product pic
# this pic can update by cloud push
if byteBuf != bytearray(b''):
display.draw_bitmap(0, 0, 85, 85, bytebuf=byteBuf, rotate=90)
display.draw_bitmap(0, 0, 85, 85, path="data/pyamp/images/alios.mono", rotate=90)
# start load price
# this price can update by cloud push
espresso = XglcdFont('data/pyamp/fonts/EspressoDolce18x24.c', 18, 24)
text_width = espresso.measure_text(sel)
# print(text_width)
display.draw_text(50, 110, sel, espresso, rotate=90)
# start load offerta price
espresso = XglcdFont('data/pyamp/fonts/EspressoDolce18x24.c', 18, 24)
text_width = espresso.measure_text(offerta)
# print(text_width)
display.draw_text(30, 230, offerta, espresso, invert=True, rotate=90)
# display show
# sleep(10)
# display.sleep()
# display.clear()
def close(self):
display = Display(self.epaperSPI, dc=self.DC, cs=self.CS, rst=self.RST, busy=self.BUSY)
print('clean and close SPI')
# -*- coding: utf-8 -*-
"""Utility to convert images to MONO_HMSB format."""
from PIL import Image
from os import path
import sys
def error(msg):
"""Display error and exit."""
print (msg)
def set_bit(value, index, high=True):
"""Set the index:th bit of value.
value (int): Number that will have bit modified.
index (int): Index of bit to set.
high (bool): True (default) = set bit high, False = set bit low
mask = 1 << index
value &= ~mask
if high:
value |= mask
return value
def write_bin(f, pixel_list, width):
"""Save image in MONO_HMSB format."""
index = 0
list_bytes = []
image_byte = 0
windex = 0
for pix in pixel_list:
image_byte = set_bit(image_byte, index, pix > 0)
index += 1
windex += 1
if index > 7 or windex >= width:
image_byte = 0
index = 0
if windex >= width:
windex = 0
if __name__ == '__main__':
args = sys.argv
if len(args) != 2:
error('Please specify input file: ./img2monohmsb.py test.png')
in_path = args[1]
if not path.exists(in_path):
error('File Not Found: ' + in_path)
filename, ext = path.splitext(in_path)
out_path = filename + '.mono'
img = Image.open(in_path).convert('1') # Open and covert to monochrome
pixels = list(img.getdata())
with open(out_path, 'wb') as f:
write_bin(f, pixels, img.width)
print('Saved: ' + out_path)
from os import path
import sys
import binascii
if __name__ == '__main__':
args = sys.argv
if len(args) != 2:
error('Please specify input file: ./img2monohmsb.py test.png')
in_path = args[1]
if not path.exists(in_path):
error('File Not Found: ' + in_path)
array_size = 85*85
with open(in_path, "rb") as f:
buf = bytearray(f.read(array_size))
zztest = binascii.hexlify(buf)
# -*- coding: utf-8 -*-
"""Utility to convert images to MONO_HMSB format."""
from PIL import Image
from os import path
import sys
import binascii
def error(msg):
"""Display error and exit."""
print (msg)
def set_bit(value, index, high=True):
"""Set the index:th bit of value.
value (int): Number that will have bit modified.
index (int): Index of bit to set.
high (bool): True (default) = set bit high, False = set bit low
mask = 1 << index
value &= ~mask
if high:
value |= mask
return value
def write_bin(f, pixel_list, width):
"""Save image in MONO_HMSB format."""
index = 0
list_bytes = []
image_byte = 0
windex = 0
for pix in pixel_list:
image_byte = set_bit(image_byte, index, pix > 0)
index += 1
windex += 1
if index > 7 or windex >= width:
image_byte = 0
index = 0
if windex >= width:
windex = 0
if __name__ == '__main__':
args = sys.argv
if len(args) != 2:
error('Please specify input file: ./img2monohmsb.py test.png')
in_path = args[1]
if not path.exists(in_path):
error('File Not Found: ' + in_path)
filename, ext = path.splitext(in_path)
out_path = filename + '.mono'
img = Image.open(in_path).convert('1') # Open and covert to monochrome
pixels = list(img.getdata())
with open(out_path, 'wb') as f:
write_bin(f, pixels, img.width)
print('Saved: ' + out_path)
array_size = 85*85
with open(out_path, "rb") as f:
buf = bytearray(f.read(array_size))
zztest = binascii.hexlify(buf)
"""XGLCD Font Utility."""
from math import floor
from framebuf import FrameBuffer, MONO_VLSB
class XglcdFont(object):
"""Font data in X-GLCD format.
letters: A bytearray of letters (columns consist of bytes)
width: Maximum pixel width of font
height: Pixel height of font
start_letter: ASCII number of first letter
height_bytes: How many bytes comprises letter height
Font files can be generated with the free version of MikroElektronika
GLCD Font Creator: www.mikroe.com/glcd-font-creator
The font file must be in X-GLCD 'C' format.
To save text files from this font creator program in Win7 or higher
you must use XP compatibility mode or you can just use the clipboard.
def __init__(self, path, width, height, start_letter=32, letter_count=96):
"""Constructor for X-GLCD Font object.
path (string): Full path of font file
width (int): Maximum width in pixels of each letter
height (int): Height in pixels of each letter
start_letter (int): First ACII letter. Default is 32.
letter_count (int): Total number of letters. Default is 96.
self.width = width
self.height = height
self.start_letter = start_letter
self.letter_count = letter_count
self.bytes_per_letter = (floor(
(self.height - 1) / 8) + 1) * self.width + 1
def __load_xglcd_font(self, path):
"""Load X-GLCD font data from text file.
path (string): Full path of font file.
bytes_per_letter = self.bytes_per_letter
# Buffer to hold letter byte values
self.letters = bytearray(bytes_per_letter * self.letter_count)
mv = memoryview(self.letters)
offset = 0
with open(path, 'r') as f:
for line in f:
# Skip lines that do not start with hex values
line = line.strip()
if len(line) == 0 or line[0:2] != '0x':
# Remove comments
comment = line.find('//')
if comment != -1:
line = line[0:comment].strip()
# Remove trailing commas
if line.endswith(','):
line = line[0:len(line) - 1]
# Convert hex strings to bytearray and insert in to letters
mv[offset: offset + bytes_per_letter] = bytearray(
int(b, 16) for b in line.split(','))
offset += bytes_per_letter
def get_letter(self, letter, invert=False, rotate=0):
"""Convert letter byte data to pixels.
letter (string): Letter to return (must exist within font).
invert (bool): True = white text, False (Default) black text.
rotate (int): rotation (default: 0)
(FrameBuffer): Pixel data in MONO_VLSB.
(int, int): Letter width and height.
# Get index of letter
letter_ord = ord(letter) - self.start_letter
# Confirm font contains letter
if letter_ord >= self.letter_count:
print('Font does not contain character: ' + letter)
return b'', 0, 0
bytes_per_letter = self.bytes_per_letter
offset = letter_ord * bytes_per_letter
mv = memoryview(self.letters[offset:offset + bytes_per_letter])
# Get width of letter (specified by first byte)
width = mv[0]
height = self.height
byte_height = (height - 1) // 8 + 1 # Support fonts up to 5 bytes high
if byte_height > 6:
print("Error: maximum font byte height equals 6.")
return b'', 0, 0
array_size = width * byte_height
ba = bytearray(mv[1:array_size + 1])
# Set inversion and re-order bytes if height > 1 byte
pos = 0
ba2 = bytearray(array_size)
if invert is True: # 0 bit is black/red so inverted is default
for i in range(0, array_size, byte_height):
ba2[pos] = ba[i]
if byte_height > 1:
ba2[pos + width] = ba[i + 1]
if byte_height > 2:
ba2[pos + width * 2] = ba[i + 2]
if byte_height > 3:
ba2[pos + width * 3] = ba[i + 3]
if byte_height > 4:
ba2[pos + width * 4] = ba[i + 4]
if byte_height > 5:
ba2[pos + width * 5] = ba[i + 5]
pos += 1
else: # Use XOR to negate inversion
for i in range(0, array_size, byte_height):
ba2[pos] = ba[i] ^ 0xFF
if byte_height > 1:
ba2[pos + width] = ba[i + 1] ^ 0xFF
if byte_height > 2:
ba2[pos + width * 2] = ba[i + 2] ^ 0xFF
if byte_height > 3:
ba2[pos + width * 3] = ba[i + 3] ^ 0xFF
if byte_height > 4:
ba2[pos + width * 4] = ba[i + 4] ^ 0xFF
if byte_height > 5:
ba2[pos + width * 5] = ba[i + 5] ^ 0xFF
pos += 1
fb = FrameBuffer(ba2, width, height, MONO_VLSB)
if rotate == 0: # 0 degrees
return fb, width, height
elif rotate == 90: # 90 degrees
byte_width = (width - 1) // 8 + 1
adj_size = height * byte_width
fb2 = FrameBuffer(bytearray(adj_size), height, width, MONO_VLSB)
for y in range(height):
for x in range(width):
fb2.pixel(y, x, fb.pixel(x, (height - 1) - y))
return fb2, height, width
elif rotate == 180: # 180 degrees
fb2 = FrameBuffer(bytearray(array_size), width, height, MONO_VLSB)
for y in range(height):
for x in range(width):
fb2.pixel(x, y,
fb.pixel((width - 1) - x, (height - 1) - y))
return fb2, width, height
elif rotate == 270: # 270 degrees
byte_width = (width - 1) // 8 + 1
adj_size = height * byte_width
fb2 = FrameBuffer(bytearray(adj_size), height, width, MONO_VLSB)
for y in range(height):
for x in range(width):
fb2.pixel(y, x, fb.pixel((width - 1) - x, y))
return fb2, height, width
def measure_text(self, text, spacing=1):
"""Measure length of text string in pixels.
text (string): Text string to measure
spacing (optional int): Pixel spacing between letters. Default: 1.
int: length of text
length = 0
for letter in text:
# Get index of letter
letter_ord = ord(letter) - self.start_letter
offset = letter_ord * self.bytes_per_letter
# Add length of letter and spacing
length += self.letters[offset] + spacing
return length
......@@ -58,9 +58,12 @@ def connect_4g_network():
return 0
######## 物联网平台相关代码 ############
productKey = "gyd2HK4GEJQ"
deviceName = "azz6ldcavPDjR4Luvq4t"
deviceName = "5cbe23fe7f5995d639fa079187d0f645"
# Wi-Fi SSID和Password设置
# 三元组信息
productKey = "产品密钥"
deviceName = "设备名字"
deviceSecret = "设备密钥"
device = None
zzio606Obj = None
switchList = ["switch0", "switch1", "switch2", "switch3", "switch4", "switch5"]
"name": "01Studio",
"version": "1.0.0",
"io": {
"ir": {
"type": "GPIO",
"port": "D10",
"dir": "input",
"pull": "pullup"
"trig": {
"type": "GPIO",
"port": "A3",
"dir": "output",
"pull": "pullup"
"echo": {
"type": "GPIO",
"port": "A5",
"dir": "input",
"pull": "pullup"
"servo": {
"type": "PWM",
"port": "B5"
from time import sleep_us,ticks_us
from driver import GPIO
class HCSR04():
def __init__(self,trigObj,echoObj):
self.trig = None
self.echo = None
if not isinstance(trigObj, GPIO):
raise ValueError("parameter is not a GPIO object")
if not isinstance(echoObj, GPIO):
raise ValueError("parameter is not a GPIO object")
self.trig = trigObj
self.echo = echoObj
def measureDistance(self):
while self.echo.read() == 0:
if self.echo.read() == 1:
ts=ticks_us() #开始时间
while self.echo.read() == 1: #等待脉冲高电平结束
te=ticks_us() #结束时间
tc=te-ts #回响时间(单位us,1us=1*10^(-6)s)
distance=(tc*170)/10000 #距离计算 (单位为:cm)
return distance
# -*- coding: UTF-8 -*-
from ulinksdk.aliyunIoT import Device # iot组件是连接阿里云物联网平台的组件
import network # Wi-Fi功能所在库
import ujson # json字串解析库
import utime # 延时API所在组件
from driver import GPIO # GPIO类,用于控制微处理器的输入输出功能
import hcsr04 # hcsr04超声波传感器类
from micropython import const
disDev = 0
echoDev = 0
trigDev = 0
# 安全距离,单位是5厘米
# 物联网平台连接标志位
iot_connected = False
wlan = None
# 三元组信息
productKey = "产品密钥"
deviceName = "设备名称"
deviceSecret = "设备密钥"
# 物联网设备实例
device = None
# Wi-Fi SSID和Password设置
wifiSsid = "路由器名称"
wifiPassword = "路由器密码"
# 等待Wi-Fi成功连接到路由器
def get_wifi_status():
global wlan
wifi_connected = False
wlan.active(True) # 激活界面
wlan.scan() # 扫描接入点
wlan.disconnect() # 断开Wi-Fi
#print("start to connect ", wifiSsid)
# 连接到指定的路由器(路由器名称为wifiSsid, 密码为:wifiPassword)
wlan.connect(wifiSsid, wifiPassword)
while True:
wifi_connected = wlan.isconnected() # 获取Wi-Fi连接路由器的状态信息
if wifi_connected: # Wi-Fi连接成功则退出while循环
print("wifi_connected:", wifi_connected)
ifconfig = wlan.ifconfig() # 获取接口的IP/netmask/gw/DNS地址
# 物联网平台连接成功的回调函数
def on_connect(data):
global iot_connected
iot_connected = True
# 设置props 事件接收函数(当云平台向设备下发属性时)
def on_props(request):
def connect_lk(productKey, deviceName, deviceSecret):
global device, iot_connected
key_info = {
'region': 'cn-shanghai',
'productKey': productKey,
'deviceName': deviceName,
'deviceSecret': deviceSecret,
'keepaliveSec': 60
# 将三元组信息设置到iot组件中
device = Device()
# 设定连接到物联网平台的回调函数,如果连接物联网平台成功,则调用on_connect函数
device.on(Device.ON_CONNECT, on_connect)
# 配置收到云端属性控制指令的回调函数,如果收到物联网平台发送的属性控制消息,则调用on_props函数
device.on(Device.ON_PROPS, on_props)
# 启动连接阿里云物联网平台过程
# 等待设备成功连接到物联网平台
if iot_connected:
print('sleep for 1 s')
print('sleep for 2s')
def door_check_init():
global disDev, echoDev, trigDev
echoDev = GPIO()
trigDev = GPIO()
disDev = hcsr04.HCSR04(trigDev, echoDev)
def door_detector():
global disDev, echoDev, trigDev
lastOpened = 0
while True: # 无限循环
distance = disDev.measureDistance()
print('distance = ', distance)
if(distance > ALARM_DISTANCE):
thisOpened = 1
thisOpened = 0
if(lastOpened != thisOpened):
print("door status changed: ", thisOpened)
# 生成上报到物联网平台的属性值字串,此处的属性标识符"door_opened"必须和物联网平台的属性一致
# "door_opened" - 表示入户门开关状态
upload_data = {'params': ujson.dumps({
'door_opened': thisOpened,
# 上传状态到物联网平台
lastOpened = thisOpened
utime.sleep(1) # 打印完之后休眠1秒
if __name__ == '__main__':
nic = network.LAN()
connect_lk(productKey, deviceName, deviceSecret)
"name": "01Studio",
"version": "1.0.0",
"io": {
"fire": {
"type": "ADC",
"port": "A0",
"sampling": 12000000
"led": {
"type": "GPIO",
"port": "A1",
"dir": "output",
"pull": "pullup"
"debugLevel": "ERROR",
"repl": "disable"
from driver import ADC
class Fire(object):
def __init__(self, adcObj):
self.adcObj = None
if not isinstance(adcObj, ADC):
raise ValueError("parameter is not an ADC object")
self.adcObj = adcObj
def getVoltage(self):
if self.adcObj is None:
raise ValueError("invalid ADC object")
value = self.adcObj.readVoltage()
return value
# -*- encoding: utf-8 -*-
from ulinksdk.aliyunIoT import Device # iot组件是连接阿里云物联网平台的组件
import network
import utime # 延时API所在组件
import ujson # json字串解析库
from driver import GPIO # LED需使用GPIO进行控制
from driver import ADC # ADC类,通过微处理器的ADC模块读取ADC通道输入电压
import fire # fire火焰传感器类
adcDev = 0 # ADC通道对象
ledDev = 0 # 报警LED对象
alarm_on = 0 # 记录报警状态
# 物联网平台连接标志位
iot_connected = False
# 物联网设备实例
device = None
# 三元组信息
# 三元组信息
productKey = "产品密钥"
deviceName = "设备名字"
deviceSecret = "设备密钥"
# Wi-Fi SSID和Password设置
wifiSsid = "请填写您的路由器名称"
wifiPassword = "请填写您的路由器密码"
def alarm_control(on):
global ledDev
if on == 0:
ledDev.write(0) # GPIO写入0,执行灭灯操作
ledDev.write(1) # GPIO写入 1,执行亮灯报警动作
# 等待Wi-Fi成功连接到路由器
def get_wifi_status():
global wifiSsid, wifiPassword
print("start to connect ", wifiSsid)
nm.connect(wifiSsid, wifiPassword)
while True:
wifi_connected = nm.getStatus() # 获取Wi-Fi连接路由器的状态信息
if wifi_connected == 5: # Wi-Fi连接成功则退出while循环
info = nm.getInfo()
print("wifi 连接成功:")
print(" SSID:", info["ssid"])
print(" IP:", info["ip"])
print(" MAC:", info["mac"])
print(" RSSI:", info["rssi"])
print("wifi 连接失败")
print('sleep for 1s')
# 物联网平台连接成功的回调函数
def on_connect(data):
global iot_connected
iot_connected = True
# 设置props 事件接收函数(当云平台向设备下发属性时)
def on_props(request):
global alarm_on, device
# {"alarmState":1} or {"alarmState":0}
payload = ujson.loads(request['params'])
# 获取dict状态字段 注意要验证键存在 否则会抛出异常
if "alarmState" in payload.keys():
alarm_on = payload["alarmState"]
if (alarm_on):
# 根据云端设置的报警灯状态改变本地LED状态
# 要将更改后的状态同步上报到云平台
prop = ujson.dumps({
'alarmState': alarm_on,
upload_data = {'params': prop}
# 上报本地报警灯状态到云端
# 连接物联网平台
def connect_lk(productKey, deviceName, deviceSecret):
global device, iot_connected
key_info = {
'region': 'cn-shanghai',
'productKey': productKey,
'deviceName': deviceName,
'deviceSecret': deviceSecret,
'keepaliveSec': 60
# 将三元组信息设置到iot组件中
device = Device()
# 设定连接到物联网平台的回调函数,如果连接物联网平台成功,则调用on_connect函数
device.on(Device.ON_CONNECT, on_connect)
# 配置收到云端属性控制指令的回调函数,如果收到物联网平台发送的属性控制消息,则调用on_props函数
device.on(Device.ON_PROPS, on_props)
# 启动连接阿里云物联网平台过程
# 等待设备成功连接到物联网平台
if iot_connected:
print('sleep for 1 s')
print('sleep for 2s')
# 上传火焰传感器检测电压信息和报警信息到物联网平台
def upload_fire_detector_state():
global device, alarm_on
fireVoltage = 0
# 无限循环
while True:
fireVoltage = fireDev.getVoltage() # 获取电压值
print("The fire status Voltage ",fireVoltage)
# 生成上报到物联网平台的属性值字串,此处的属性标识符"fireVoltage"和"alarmState"必须和物联网平台的属性一致
# "fireVoltage" - 代表燃气传感器测量到的电压值
# "alarmState" - 代表报警灯的当前状态
upload_data = {'params': ujson.dumps({
'fireVoltage': fireVoltage,
'alarmState': alarm_on
# 上传火焰传感器检测电压信息和报警信息到物联网平台
# 每2秒钟上报一次
if __name__ == '__main__':
global fireDev
alarm_on = 0
# 硬件初始化。。。。。。。。。。。。。。。。。。
# 初始化 ADC
adcDev = ADC()
fireDev = fire.Fire(adcDev)
# 初始化LED所连接GPIO
ledDev = GPIO()
alarm_control(alarm_on) # 关闭报警灯
# 请替换物联网平台申请到的产品和设备信息,可以参考README
nic = network.LAN()
connect_lk(productKey, deviceName, deviceSecret)
"name": "01Studio",
"version": "1.0.0",
"io": {
"sht3x": {
"type": "I2C",
"port": 1,
"addrWidth": 7,
"freq": 400000,
"mode": "master",
"devAddr": 68
"led_g": {
"type": "GPIO",
"port": "A15",
"dir": "output",
"pull": "pullup"
"led_b": {
"type": "GPIO",
"port": "G8",
"dir": "output",
"pull": "pullup"
"debugLevel": "ERROR",
"repl": "disable"
# -*- encoding: utf-8 -*-
@File : main.py
@Description: 温湿度上云
@Author : ethan.lcz
@version : 1.0
from ulinksdk.aliyunIoT import Device # iot组件是连接阿里云物联网平台的组件
import network # Wi-Fi功能所在库
import utime # 延时API所在组件
from driver import I2C # I2C总线驱动库
from driver import GPIO # ESP32和使用GPIO控制LED
import sht3x # SHT3X-DIS温湿度传感器驱动库
import ujson # json字串解析库
# 空调和加湿器状态变量
airconditioner = 0
humidifier = 0
airconditioner_value = 0
humidifier_value = 0
# 物联网平台连接标志位
iot_connected = False
# 三元组信息
productKey = "产品密钥"
deviceName = "设备名字"
deviceSecret = "设备密钥"
# Wi-Fi SSID和Password设置
wifiSsid = "请填写您的路由器名称"
wifiPassword = "请填写您的路由器密码"
# 物联网平台连接标志位
iot_connected = False
wlan = None
# 物联网设备实例
device = None
i2cObj = None
humitureDev = None
# 等待Wi-Fi成功连接到路由器
def get_wifi_status():
global wlan
wifi_connected = False
wlan = network.WLAN(network.STA_IF) #创建WLAN对象
wifi_connected = wlan.isconnected() # 获取Wi-Fi连接路由器的状态信息
if not wifi_connected:
wlan.active(True) #激活界面
wlan.scan() #扫描接入点
#print("start to connect ", wifiSsid)
wlan.connect(wifiSsid, wifiPassword) # 连接到指定的路由器(路由器名称为wifiSsid, 密码为:wifiPassword)
while True:
wifi_connected = wlan.isconnected() # 获取Wi-Fi连接路由器的状态信息
if wifi_connected: # Wi-Fi连接成功则退出while循环
print("wifi_connected:", wifi_connected)
ifconfig = wlan.ifconfig() #获取接口的IP/netmask/gw/DNS地址
# 通过温湿度传感器读取温湿度信息
def get_temp_humi():
global humitureDev
# 如果需要同时获取温湿度信息,可以呼叫getTempHumidity,实例代码如下:
humniture = humitureDev.getTempHumidity() # 获取温湿度传感器测量到的温湿度值
temperature = humniture[0] # get_temp_humidity返回的字典中的第一个值为温度值
humidity = humniture[1] # get_temp_humidity返回的字典中的第二个值为相对湿度值
temperature = humitureDev.getTemperature() # 获取温度测量结果
# print("The temperature is: %.1f" % temperature)
humidity = humitureDev.getHumidity() # 获取相对湿度测量结果
# print("The humidity is: %d" % humidity)
return temperature, humidity # 返回读取到的温度值和相对湿度值
# 物联网平台连接成功的回调函数
def on_connect(data):
global iot_connected
iot_connected = True
# 设置props 事件接收函数(当云平台向设备下发属性时)
def on_props(request):
global airconditioner, humidifier, airconditioner_value, humidifier_value
# {"airconditioner":1} or {"humidifier":1} or {"airconditioner":1, "humidifier":1}
payload = ujson.loads(request['params'])
# print (payload)
# 获取dict状态字段 注意要验证键存在 否则会抛出异常
if "airconditioner" in payload.keys():
airconditioner_value = payload["airconditioner"]
if (airconditioner_value):
if "humidifier" in payload.keys():
humidifier_value = payload["humidifier"]
if (humidifier_value):
# print(airconditioner_value, humidifier_value)
airconditioner.write(airconditioner_value) # 控制空调开关
humidifier.write(humidifier_value) # 控制加湿器开关
# 要将更改后的状态同步上报到云平台
prop = ujson.dumps({
'airconditioner': airconditioner_value,
'humidifier': humidifier_value,
upload_data = {'params': prop}
# 上报空调和加湿器属性到云端
def connect_lk(productKey, deviceName, deviceSecret):
global device, iot_connected
key_info = {
'region': 'cn-shanghai',
'productKey': productKey,
'deviceName': deviceName,
'deviceSecret': deviceSecret,
'keepaliveSec': 60
# 将三元组信息设置到iot组件中
device = Device()
# 设定连接到物联网平台的回调函数,如果连接物联网平台成功,则调用on_connect函数
device.on(Device.ON_CONNECT, on_connect)
# 配置收到云端属性控制指令的回调函数,如果收到物联网平台发送的属性控制消息,则调用on_props函数
device.on(Device.ON_PROPS, on_props)
# 启动连接阿里云物联网平台过程
# 等待设备成功连接到物联网平台
if iot_connected:
print('sleep for 1 s')
print('sleep for 2s')
# 上传温度信息和湿度信息到物联网平台
def upload_temperature_and_Humidity():
global device
while True:
data = get_temp_humi() # 读取温度信息和湿度信息
# 生成上报到物联网平台的属性值字串
prop = ujson.dumps({
'CurrentTemperature': data[0],
'CurrentHumidity': data[1]
print('uploading data: ', prop)
upload_data = {'params': prop}
# 上传温度和湿度信息到物联网平台
if __name__ == '__main__':
# 硬件初始化
i2cObj = I2C()
i2cObj.open("sht3x") # 按照board.json中名为"sht3x"的设备节点的配置参数(主设备I2C端口号,从设备地址,总线频率等)初始化I2C类型设备对象
humitureDev = sht3x.SHT3X(i2cObj) # 初始化SHT3X-DIS传感器
# 初始化 GPIO
airconditioner = GPIO()
humidifier = GPIO()
humidifier.open('led_g') # 加湿器使用board.json中led_g节点定义的GPIO,对应esp32外接的的绿灯
airconditioner.open('led_b') # 空调使用board.json中led_b节点定义的GPIO,对应esp32外接的上的蓝灯
# 请替换物联网平台申请到的产品和设备信息,可以参考文章:https://blog.csdn.net/HaaSTech/article/details/114360517
nic = network.LAN()
connect_lk(productKey, deviceName, deviceSecret)
"name": "01Studio",
"version": "1.0.0",
"io": {
"hcho": {
"type": "ADC",
"port": "A1",
"sampling": 12000000
"debugLevel": "ERROR",
"repl": "disable"
\ No newline at end of file
