提交 d3906886 编写于 作者: W wzhiyuan-nit 提交者: yilu.myl

BugID:1844:Add the source code of esp32 worst-case.

[Detail]
Issue description:

Rootcause:

Solution:

Requirement description:

Solution:

[Verified Cases]
Build Pass: <solution list>
Test Pass: <test case list>
Signed-off-by: Nwzhiyuan-nit <wuming.wzy@alibaba-inc.com>
上级 b59e5f91
# WORST-CASE
## 1、简介
### 1.1、背景知识
&emsp;&emsp;
开发者在进行硬件选型时通常会遇到这样的问题,即硬件板卡能支持多少外围设备的扩展?系统资源又能支持几路线程的稳定运行?为帮助开发者更为高效的选择合适的物联网硬件设备,我们将针对不同的物联网开发板分别提供了worst case测试案例。本篇文档为esp32s开发板的worst case
### 1.2、准备
&emsp;&emsp;
本案例打造需要如下硬件:
- ESP32开发板一台
- mfrc522 RFID传感器模块一个
- RFID卡片若干
- 磁控传感器
- Servo舵机一个
- 电机转数传感器
- 步进电机及驱动板
- 光敏电阻传感器
- 温湿度传感器
- GPS
- mpu6050(3轴加速度+3轴陀螺仪)
&emsp;&emsp;
ESP32各开发板和外围传感器硬件接线请参考下表。
<table>
<tr>
<td align="center">硬件</td>
<td align="center">步进电机驱动板</td>
<td align="center">mfrc522 RFID传感器</td>
<td align="center">磁控传感器</td>
<td align="center">Servo舵机</td>
<td align="center">电机转数传感器</td>
<td align="center">光敏电阻传感器</td>
<td align="center">温湿度传感器</td>
<td align="center">mpu6050</td>
<td align="center">GPS</td>
<td align="center">ESP32开发板</td>
<td align="center">电源转接板</td>
</tr>
<tr>
<td align="center" rowspan="16">端口标识</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
<td align="center">GND</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">VCC</td>
<td align="center">VCC</td>
<td align="center">-</td>
<td align="center">VCC</td>
<td align="center">VCC</td>
<td align="center">VCC</td>
<td align="center">VCC</td>
<td align="center">VCC</td>
<td align="center">-</td>
<td align="center">3.3V</td>
</tr>
<tr>
<td align="center">VCC</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">VCC</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">5V</td>
</tr>
<tr>
<td align="center">IN1</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P04</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">IN2</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P00</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">IN3</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P02</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">IN4</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P15</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">RST</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P25</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">SPI</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">SPI3</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">DO</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P13</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">PWM</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P27</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">OUT</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">P26</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">AO</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">ADC0</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">I2C</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">I2C0</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">I2C</td>
<td align="center">-</td>
<td align="center">I2C0</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">-</td>
<td align="center">UART</td>
<td align="center">UART2</td>
<td align="center">-</td>
</tr>
<tr>
<td align="center" rowspan="1">硬件说明</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C4A7B94BEA8CA00000001&dataId=800C4A7B94BEA8CA" target="_blank">步进电机驱动器详情</td>
<td align="center" >-</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C4C5091EC2DE500000001&dataId=800C4C5091EC2DE5" target="_blank">磁控开关详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C5A3F64C2F88300000001&dataId=800C5A3F64C2F883&s=" target="_blank">舵机详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C24C1F05E458A00000001&dataId=800C24C1F05E458A" target="_blank">电机转数传感器详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C57784852F32C00000002&dataId=800C57784852F32C" target="_blank">光敏电阻传感详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800CD7F0F2F57FF800000002&dataId=800CD7F0F2F57FF8&s=" target="_blank">温湿度传感器详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C38A51341E22F00000001&dataId=800C38A51341E22F" target="_blank">MPU6050(加速度计&陀螺仪)详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C7C08112B0A0700000002&dataId=800C7C08112B0A07&s=" target="_blank">GPS模块详情</td>
<td> <a align="center" href="https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C0A5C37AADCDB00000001&dataId=800C0A5C37AADCDB" target="_blank">ESP32开发板详情</td>
<td align="center" >-</td>
</tr>
</table>
&emsp;&emsp;
下图是以NodeMCU-32S为例的硬件连线图:
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_硬件连线图.png
width=100%/>
</div>
## 2、物联网平台开发
&emsp;&emsp;
整个过程包含以下4个步骤:
1. 开通公共实例
2. 创建产品(设备模型)
3. 定义产品功能(物模型)
4. 创建设备及获取三元组
### 2.1、开通公共实例
&emsp;&emsp;
对于第一次使用物联网平台的读者,需要开通实例以使用物联网平台的功能。这里可以使用免费的公共实例进行开发。
&emsp;&emsp;
[物联网平台](https://iot.console.aliyun.com/lk/summary/new)中,左上角选择“华东2-上海”,点击“公共实例”,即可开通。开通后点击“公共实例”,即可进入[控制台](https://iot.console.aliyun.com/lk/summary/new)进行产品创建。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_物联网平台.png
width=100%/>
</div>
### 2.2、创建产品(设备模型)
&emsp;&emsp;
进入[公共实例控制台](https://iot.console.aliyun.com/lk/summary/new),点击“创建产品”按钮,即可进入[新建产品页面](https://iot.console.aliyun.com/product)
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_空产品页.png
width=100%/>
</div>
&emsp;&emsp;
进入[新建产品页面](https://iot.console.aliyun.com/product),设定“产品名称”,这里我们命名为“**WORST-CASE**”,读者也可以根据自己的喜好来命名。在“所属品类”中,选择“自定义品类”。
&emsp;&emsp;
产品的节点类型选择“直连设备”,数据格式选择“ICA标准数据格式”,检验类型和认证方式选择默认设定即可。开发者可根据自己的需求在“产品描述”页面添加针对此产品的描述。
&emsp;&emsp;
对于 ESP32 等搭载 Wi-Fi 的设备而言,联网方式选择“Wi-Fi”。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_新建产品.png
width=100%/>
</div>
&emsp;&emsp;
点击“确认”按钮,即可完成产品创建。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_完成创建产品.png
width=100%/>
</div>
&emsp;&emsp;
点击“前往定义物模型”
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_尚未添加任何功能.png
width=100%/>
</div>
### 2.3、定义产品功能(物模型)
&emsp;&emsp;
进入功能定义,点击编辑草稿,选择添加自定义功能,依次添加“地理位置”“加速度”“角速度”“入侵”“告警”五个功能项,如下所示:
&emsp;&emsp;
生成后的效果如下:
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_添加功能.png
width=100%/>
</div>
&emsp;&emsp;
定义好物模型后,需要发布物模型上线,并发布产品,以使变更生效。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_发布物模型.png
width=100%/>
</div>
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_发布产品.png
width=100%/>
</div>
&emsp;&emsp;
产品及其物模型创建完成后就可以创建这个产品的设备了。
### 2.4、创建设备及获取三元组
&emsp;&emsp;
点击左侧栏中“设备“,在筛选框中选择要添加设备的产品,点击“添加设备”。这里这里我们命名为“**ability_test**”,开发者也可以根据自己的喜好来命名。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_添加设备.png
width=100%/>
</div>
&emsp;&emsp;
开发者也可以选择“批量添加”,一次性添加多个设备,并生成随机的DeviceName。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_批量添加.png
width=100%/>
</div>
&emsp;&emsp;
生成的设备如下。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_设备列表.png
width=100%/>
</div>
&emsp;&emsp;
点击前往“查看”按钮,就可以看到此设备的详细信息了。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_设备详情.png
width=100%/>
</div>
&emsp;&emsp;
点击右上角的“查看”按钮,就能看到设备的三元组信息了。
三元组是物联网设备端和物联网云端设备相关联的唯一标识符,在设备端连接云端的时候会使用三元组信息和云端进行鉴权,鉴权通过之后云端会认为设备已激活并上线。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_设备证书.png
width=100%/>
</div>
&emsp;&emsp;
再次前往物联网平台的设备信息页面,若设备运行正确,此时应该可以看到设备名右侧的状态由“未激活”变为“在线”。
选中“实时刷新”,可以看到数据实时从设备上报到物联网平台。设备上云成功。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_物模型数据.png
width=100%/>
</div>
## 3、规则引擎
### 3.1、场景联动设定
&emsp;&emsp;
点击规则引擎->场景联动,点击创建规则。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_创建规则0.png width=100%/>
</div>
&emsp;&emsp;
进行规则设定,如下所示,云端检测到“入侵”,那么发送告警使能信息到设备通知设备打开告警。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_编辑规则0.png width=100%/>
</div>
&emsp;&emsp;
点击规则引擎->场景联动,点击创建规则。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_创建规则1.png width=100%/>
</div>
&emsp;&emsp;
进行规则设定,如下所示,云端未检测到“入侵”,那么就发送告警解除信息到设备通知设备关闭告警。
<div align="center">
<img src=./../../../images/worst_case/esp32s/ESP32_WORST_CASE_编辑规则1.png width=100%/>
</div>
## 4、设备端开发
### 4.1、开发环境
&emsp;&emsp;
在进行下一步之前请确保ESP32开发环境已经搭建完毕。详情请参考[esp32开发环境](../../../startup/ESP32_startup.md)的说明。
### 4.2、创建解决方案
&emsp;&emsp;
如下图所示,在Haas Studio中创建项目。先选择左侧的“开发板型号”再从右侧的案例中选择“WORST-CASE”案例点击“立即创建”即可。
<div align="center">
<img src=./../../../images/HaaS_Studio_创建工程示范.png width=100%/>
</div>
<br>
1. **填写Wi-Fi名称及密码**
&emsp;&emsp;
在main.py中,填写可用的Wi-Fi名称及密码。
``` python
# wifi连接的的ssid和pwd定义
wifiSsid = "请填写您的路由器名称"
wifiPassword = "请填写您的路由器密码"
```
2. **修改设备端三元组**
&emsp;&emsp;
在main.py中,填写创建的设备三元组信息。关于设备三元组的获取,请参考[创建设备及获取三元组](./README.md "创建设备及获取三元组")中的步骤。
``` python
# 三元组信息
productKey = "产品key"
deviceName = "设备名称"
deviceSecret = "设备密钥"
```
{
"name": "esp32",
"version": "1.0.0",
"io": {
"uln2003_a": {
"type": "GPIO",
"port": 4,
"dir": "output",
"pull": "pushpull"
},
"uln2003_a_": {
"type": "GPIO",
"port": 0,
"dir": "output",
"pull": "pushpull"
},
"uln2003_b": {
"type": "GPIO",
"port": 2,
"dir": "output",
"pull": "pushpull"
},
"uln2003_b_": {
"type": "GPIO",
"port": 15,
"dir": "output",
"pull": "pushpull"
},
"magnetron": {
"type": "GPIO",
"port": 13,
"dir": "input",
"pull": "pullup"
},
"servo": {
"type": "PWM",
"port": 27
},
"photoresistor": {
"type": "ADC",
"port": 0,
"atten": 2,
"width": 3,
"sampling": 12000000
},
"sht3x": {
"type": "I2C",
"port": 0,
"addrWidth": 7,
"freq": 400000,
"mode": "master",
"devAddr": 68
},
"mpu6050": {
"type": "I2C",
"port": 0,
"addrWidth": 7,
"freq": 400000,
"mode": "master",
"devAddr": 104
},
"gnss": {
"type": "UART",
"port": 2,
"dataWidth": 8,
"baudRate": 9600,
"stopBits": 1,
"flowControl": "disable",
"parity": "none"
},
"motorspeed": {
"type": "GPIO",
"port": 26,
"dir": "irq",
"pull": "pullup",
"intMode": "falling"
},
"mfrc522_sda": {
"type": "GPIO",
"port": 5,
"dir": "output",
"pull": "pullup"
},
"mfrc522_rst":{
"type":"GPIO",
"port": 25,
"dir": "output",
"pull":"pullup"
},
"mfrc522": {
"type": "SPI",
"port": 3,
"mode": "master",
"freq": 2000000
}
},
"debugLevel": "ERROR",
"repl": "disable"
}
from driver import UART
from micropyGNSS import MicropyGNSS
class Gnss(object):
def __init__(self, uartObj):
self.uartObj = None
if not isinstance(uartObj, UART):
raise ValueError("parameter is not a GPIO object")
# 初始化定位模组串口
self.uartObj = uartObj
self.gnss = MicropyGNSS(location_formatting='dd')
def getLocation(self):
if self.uartObj is None:
raise ValueError("invalid UART object")
# 创建定位信息解析器
sentence = bytearray(100)
recvsize = self.uartObj.read(sentence)
if(recvsize):
print(sentence)
# 解析地理位置信息
for c in sentence:
self.gnss.update(chr(c))
print(self.gnss.longitude, self.gnss.latitude, self.gnss.altitude)
return self.gnss
from driver import GPIO
class MagnetronSensor():
def __init__(self, gpioObj):
self.gpioObj = None
if not isinstance(gpioObj, GPIO):
raise ValueError("parameter is not a GPIO object")
self.gpioObj = gpioObj
def isMagnetic(self):
if self.gpioObj is None:
raise ValueError("invalid GPIO object")
value = self.gpioObj.read()
return value
# -*- encoding: utf-8 -*-
from aliyunIoT import Device # iot组件是连接阿里云物联网平台的组件
from driver import GPIO
from driver import TIMER # 定时器类
from driver import PWM # PWM类
from driver import ADC # ADC类
from driver import I2C # I2C类
from driver import UART # UART类
from driver import SPI # SPI类
from mpu6050 import MPU6050 # 加速度计陀螺仪类
import uln2003 # 引入ULN2003步进电机驱动库
import ujson # json字串解析库
import utime # 延时函数在utime库中
import servo # 舵机控制库
import sht3x # 温湿度传感器库
import gnss # 定位模块gnss传感器库
import network # Wi-Fi功能所在库
import magnetronSensor # 磁控管开关传感器库
import photoresistor # 光敏电阻传感器库
import motorspeed # 马达测速传感器库
import mfrc522 # 门禁卡驱动库
import _thread # 线程库
# 主线程超时时间
CASE_TIMEOUT_S = 3600
# 步进电机控制量
en_uln2003 = False
# 舵机控制量
en_servo = False
# 步进电机转数计数
motor_cnt = 0
# 步进电机转动圈数
uln2003_cnt = 0
# 舵机摆动次数
servo_cnt = 0
# 磁控管检测计数
magnetron_cnt = 0
# 入侵检测
invasion = 0
# 加速度
ax = 0
ay = 0
az = 0
# 陀螺仪
gx = 0
gy = 0
gz = 0
# GPS
longitude = None
latitude = None
altitude = None
# 物联网平台连接标志位
iot_connected = False
wlan = None
# 三元组信息
productKey = "产品key"
deviceName = "设备名称"
deviceSecret = "设备密钥"
# 物联网设备实例
device = None
# Wi-Fi SSID和Password设置
wifiSsid = "请填写您的路由器名称"
wifiPassword = "请填写您的路由器密码"
# 传感器设备类
magnetronDev = None
gpioDev = None
pwmDev = None
i2cDev = None
i2cDev1 = None
adcDev = None
sht3xDev = None
mpu6050Dev = None
uartDev = None
gnssDev = None
servoDev = None
photoresistorDev = None
gpioDev1 = None
motorspeedDev = None
spiDev = None
sdaGpioDev = None
rstGpioDev = None
mfrc522Dev = None
uln2003Dev = None
# 设备实例
A = None
A_ = None
B = None
B_ = None
# 等待Wi-Fi成功连接到路由器
def get_wifi_status():
global wlan
wifi_connected = False
wlan.active(True) #激活界面
wlan.scan() #扫描接入点
# 连接到指定的路由器(路由器名称为wifiSsid, 密码为:wifiPassword)
wlan.connect(wifiSsid, wifiPassword)
while True:
wifi_connected = wlan.isconnected() # 获取Wi-Fi连接路由器的状态信息
if wifi_connected: # Wi-Fi连接成功则退出while循环
break
else:
utime.sleep(0.5)
print("wifi_connected:", wifi_connected)
ifconfig = wlan.ifconfig() # 获取接口的IP/netmask/gw/DNS地址
print(ifconfig)
utime.sleep(0.5)
# 物联网平台连接成功的回调函数
def on_connect(data):
global iot_connected
iot_connected = True
# 设置props事件接收函数(当云平台向设备下发属性时)
def on_props(request):
payload = ujson.loads(request['params'])
# print("payload:%s"%payload)
if ("en_alert" in payload.keys()):
alert = payload["en_alert"]
if (alert):
print("alarm: invasion ...\n")
def connect_lk(productKey, deviceName, deviceSecret):
global device, iot_connected
key_info = {
'region': 'cn-shanghai',
'productKey': productKey,
'deviceName': deviceName,
'deviceSecret': deviceSecret,
'keepaliveSec': 60
}
# 将三元组信息设置到iot组件中
device = Device()
# 设定连接到物联网平台的回调函数,如果连接物联网平台成功,则调用on_connect函数
device.on(Device.ON_CONNECT, on_connect)
# 配置收到云端属性控制指令的回调函数,如果收到物联网平台发送的属性控制消息,则调用on_props函数
device.on(Device.ON_PROPS, on_props)
# 启动连接阿里云物联网平台过程
device.connect(key_info)
# 等待设备成功连接到物联网平台
while(True):
if iot_connected:
print('物联网平台连接成功')
break
else:
print('sleep for 1 s')
utime.sleep(1)
print('sleep for 2s')
utime.sleep(2)
# 设备初始化
def timer_init():
global timerObj
timerObj = TIMER(0)
timerObj.open(mode=timerObj.PERIODIC, period=1000, callback=timer_cb)
timerObj.start()
def uln2003_init():
global uln2003Dev
global A, A_, B, B_
A = GPIO()
A.open("uln2003_a")
A_ = GPIO()
A_.open("uln2003_a_")
B = GPIO()
B.open("uln2003_b")
B_ = GPIO()
B_.open("uln2003_b_")
uln2003Dev = uln2003.ULN2003(A, A_, B, B_)
def magnetron_init():
global gpioDev, magnetronDev
gpioDev = GPIO()
gpioDev.open("magnetron")
magnetronDev = magnetronSensor.MagnetronSensor(gpioDev)
def motorspeed_init():
global gpioDev1, motorspeedDev
gpioDev1 = GPIO()
gpioDev1.open("motorspeed")
motorspeedDev = motorspeed.MOTORSPEED(gpioDev1, motorspeed_handler)
def servo_init():
global pwmDev, servoDev
pwmDev = PWM()
pwmDev.open("servo")
servoDev = servo.SERVO(pwmDev)
def photoresistor_init():
global adcDev, photoresistorDev
adcDev = ADC()
adcDev.open("photoresistor")
photoresistorDev = photoresistor.PHOTORESISTOR(adcDev)
def sht3x_init():
global i2cDev, sht3xDev
i2cDev = I2C()
i2cDev.open("sht3x")
sht3xDev = sht3x.SHT3X(i2cDev)
def mpu6050_init():
global i2cDev1, mpu6050Dev
i2cDev1 = I2C()
i2cDev1.open("mpu6050")
mpu6050Dev = MPU6050(i2cDev1)
def gnss_init():
global gnssDev, uartDev
uartDev = UART()
uartDev.open('gnss')
gnssDev = gnss.Gnss(uartDev)
def mfrc522_init():
global spiDev, sdaGpioDev, rstGpioDev, mfrc522Dev
spiDev = SPI()
spiDev.open("mfrc522")
sdaGpioDev = GPIO()
sdaGpioDev.open("mfrc522_sda")
rstGpioDev = GPIO()
rstGpioDev.open("mfrc522_rst")
mfrc522Dev = mfrc522.MFRC522(spiDev, sdaGpioDev, rstGpioDev)
# 关闭设备
def timer_deinit():
global timerObj
timerObj.stop()
timerObj.close()
def uln2003_deinit():
global A, A_, B, B_
A.close()
A_.close()
B.close()
B_.close()
def magnetron_deinit():
global gpioDev
gpioDev.close()
def motorspeed_deinit():
global gpioDev1
gpioDev1.close()
def servo_deinit():
global pwmDev
pwmDev.close()
def photoresistor_deinit():
global adcDev
adcDev.close()
def sht3x_deinit():
global i2cDev
i2cDev.close()
def mpu6050_deinit():
global i2cDev1
i2cDev1.close()
def gnss_deinit():
global uartDev
uartDev.close()
def mfrc522_deinit():
global spiDev, sdaGpioDev, rstGpioDev
spiDev.close()
sdaGpioDev.close()
rstGpioDev.close()
# 步进电机控制
def uln2003_ctrl(cmd = 'stop', step = 0, speed = 4):
global uln2003Dev
step_tmp = step
if (cmd is 'stop'): # 停止
uln2003Dev.motorStop()
return
while (step_tmp > 0):
if (cmd is 'cw'): # 顺时针转动
uln2003Dev.motorCw(speed)
elif (cmd is 'ccw'): # 逆时针转动
uln2003Dev.motorCcw(speed)
step_tmp -= 1
# 定时器回调函数
def timer_cb(args):
global ax, ay, az, gx, gy, gz
global longitude, latitude, altitude
print("timer run ...\n")
(ax, ay, az) = mpu6050_read("A")
(gx, gy, gz) = mpu6050_read("G")
location = gnss_read()
if(location):
# print(location.longitude, location.latitude, location.altitude)
longitude = location.longitude[0]
latitude = location.latitude[0]
altitude = location.altitude
# 磁体检测,0-检测到磁体,1-未检测到磁体;
def get_magnetron_status():
global magnetronDev
status = magnetronDev.isMagnetic()
return status
# 舵机控制
def servo_control(data):
global servoDev
servoDev.setOptionSero(data)
# 读取液位数据
def get_lightness():
global photoresistorDev
return photoresistorDev.getLightness()
# 温湿度读取
def sht3x_read(param):
global sht3xDev
if (param is "H"):
data = sht3xDev.getHumidity()
elif(param is "T"):
data = sht3xDev.getTemperature()
else:
raise ValueError("sht3x_read param error.")
return data
# 加速度/角速度读取
def mpu6050_read(param):
global mpu6050Dev
if (param is "A"):
data = mpu6050Dev.getAcceleration()
elif(param is "G"):
data = mpu6050Dev.getGyroscope()
else:
raise ValueError("mpu6050_read param error.")
return data
# 位置信息读取
def gnss_read():
global gnssDev
return gnssDev.getLocation()
# 马达转数计数回调
def motorspeed_handler(data):
global motor_cnt
motor_cnt += 1
print('motor speed cnts:', motor_cnt)
# 门禁控制线程
def mfrc522_th():
global mfrc522Dev
while (True):
rfid_card = mfrc522Dev.readCardID()
utime.sleep(1)
print("rfid_card:", rfid_card)
# 电机控制线程
def motor_th():
global en_servo, en_uln2003
global uln2003_cnt, servo_cnt
servo_control(0) # 舵机复位
utime.sleep(1)
while (True):
if (en_servo):
print("run servo ...\n")
servo_cnt += 1
servo_control(-90) # 舵机旋转-90度
utime.sleep(3)
servo_control(90) # 舵机旋转90度
utime.sleep(1)
en_servo = False
if (en_uln2003):
print("run motor ...\n")
uln2003_cnt += 1
# 驱动步进电机走512步,即电机转动一周
uln2003_ctrl('cw', 512, 4)
en_uln2003 = False
print("servo_cnt=", servo_cnt)
print("uln2003_cnt=", uln2003_cnt)
utime.sleep_ms(500)
def control_th():
global invasion, en_uln2003, en_servo
global magnetron_cnt
while (True):
if (not get_magnetron_status()): # 获取磁控管传感器的状态,0-检测到入侵,1-未检测入侵
invasion = 1
magnetron_cnt += 1
else:
invasion = 0
if (en_uln2003 is False):
lightness = get_lightness()
if (lightness < 1000):
en_uln2003 = True
print("lightness: ", lightness)
if (en_servo is False):
humidity = sht3x_read("H")
temp = sht3x_read("T")
if (humidity > 70.0 or humidity < 30.0 or temp > 30.0 or temp < 15.0):
en_servo = True # 开启舵机打开温湿度均衡系统
print("temperature:", temp, "humidity:", humidity)
utime.sleep(1)
def worst_case_main():
global wlan, invasion
global productKey, deviceName, deviceSecret
global ax, ay, az, gx, gy, gz
global longitude, latitude, altitude
wlan = network.WLAN(network.STA_IF) #创建WLAN对象
# 请替换物联网平台申请到的产品和设备信息,可以参考文章:https://blog.csdn.net/HaaSTech/article/details/114360517
get_wifi_status()
connect_lk(productKey, deviceName, deviceSecret)
timer_init()
motorspeed_init()
uln2003_init()
magnetron_init()
servo_init()
photoresistor_init()
sht3x_init()
mpu6050_init()
gnss_init()
mfrc522_init()
try:
# 启动门禁线程
th0_id = _thread.start_new_thread(mfrc522_th, ())
# 启动电机驱动线程
th1_id = _thread.start_new_thread(motor_th, ())
# 启动控制线程
th2_id = _thread.start_new_thread(control_th, ())
except:
raise ValueError("start thread failed.")
start = utime.time()
while True:
now = utime.time()
if ((now - start) > CASE_TIMEOUT_S):
break
print("upload...\n")
upload_data = {
'params': ujson.dumps({
'GeoLocation': {
'Longitude': longitude,
'Latitude': latitude,
'Altitude': altitude,
'CoordinateSystem': 1
},
"accel": {
'ax':ax,
'ay':ay,
'az':az
},
"gyro": {
'gx':gx,
'gy':gy,
'gz':gz
},
"invasion": invasion
})
}
# 上报数据至物联网平台
device.postProps(upload_data)
utime.sleep(2)
_thread.stop_thread(th0_id)
_thread.stop_thread(th1_id)
_thread.stop_thread(th2_id)
mfrc522_deinit()
gnss_deinit()
mpu6050_deinit()
sht3x_deinit()
photoresistor_deinit()
servo_deinit()
magnetron_deinit()
uln2003_deinit()
motorspeed_init()
timer_deinit()
if __name__ == '__main__':
worst_case_main()
from driver import SPI,GPIO
class AuthenticationError(Exception):
pass
class StatusNotSuccessError(Exception):
pass
class MFRC522:
KEY = [0xFF,0xFF,0xFF,0xFF,0xFF,0xFF]
BLOCK_ADDRS = [8, 9, 10]
MAX_LEN = 16
PCD_IDLE = 0x00
PCD_AUTHENT = 0x0E
PCD_RECEIVE = 0x08
PCD_TRANSMIT = 0x04
PCD_TRANSCEIVE = 0x0C
PCD_RESETPHASE = 0x0F
PCD_CALCCRC = 0x03
PICC_REQIDL = 0x26
PICC_REQALL = 0x52
PICC_ANTICOLL = 0x93
PICC_SElECTTAG = 0x93
PICC_AUTHENT1A = 0x60
PICC_AUTHENT1B = 0x61
PICC_READ = 0x30
PICC_WRITE = 0xA0
PICC_DECREMENT = 0xC0
PICC_INCREMENT = 0xC1
PICC_RESTORE = 0xC2
PICC_TRANSFER = 0xB0
PICC_HALT = 0x50
MI_OK = 0
MI_NOTAGERR = 1
MI_ERR = 2
Reserved00 = 0x00
CommandReg = 0x01
CommIEnReg = 0x02
DivlEnReg = 0x03
CommIrqReg = 0x04
DivIrqReg = 0x05
ErrorReg = 0x06
Status1Reg = 0x07
Status2Reg = 0x08
FIFODataReg = 0x09
FIFOLevelReg = 0x0A
WaterLevelReg = 0x0B
ControlReg = 0x0C
BitFramingReg = 0x0D
CollReg = 0x0E
Reserved01 = 0x0F
Reserved10 = 0x10
ModeReg = 0x11
TxModeReg = 0x12
RxModeReg = 0x13
TxControlReg = 0x14
TxAutoReg = 0x15
TxSelReg = 0x16
RxSelReg = 0x17
RxThresholdReg = 0x18
DemodReg = 0x19
Reserved11 = 0x1A
Reserved12 = 0x1B
MifareReg = 0x1C
Reserved13 = 0x1D
Reserved14 = 0x1E
SerialSpeedReg = 0x1F
Reserved20 = 0x20
CRCResultRegM = 0x21
CRCResultRegL = 0x22
Reserved21 = 0x23
ModWidthReg = 0x24
Reserved22 = 0x25
RFCfgReg = 0x26
GsNReg = 0x27
CWGsPReg = 0x28
ModGsPReg = 0x29
TModeReg = 0x2A
TPrescalerReg = 0x2B
TReloadRegH = 0x2C
TReloadRegL = 0x2D
TCounterValueRegH = 0x2E
TCounterValueRegL = 0x2F
Reserved30 = 0x30
TestSel1Reg = 0x31
TestSel2Reg = 0x32
TestPinEnReg = 0x33
TestPinValueReg = 0x34
TestBusReg = 0x35
AutoTestReg = 0x36
VersionReg = 0x37
AnalogTestReg = 0x38
TestDAC1Reg = 0x39
TestDAC2Reg = 0x3A
TestADCReg = 0x3B
Reserved31 = 0x3C
Reserved32 = 0x3D
Reserved33 = 0x3E
Reserved34 = 0x3F
serNum = []
def __init__(self, spiDev, select_gpio, reset_gpio):
"""
Sets up an instance of the MFRC522 Reader over SPI
"""
self._spiDev = None
if not isinstance(spiDev, SPI):
raise ValueError("parameter is not an SPI object")
if not isinstance(select_gpio, GPIO):
raise ValueError("parameter is not an GPIO object")
if not isinstance(reset_gpio, GPIO):
raise ValueError("parameter is not an GPIO object")
self._spiDev = spiDev
self.reset_pin = reset_gpio
self.select_pin = select_gpio
self.reset_pin.write(1)
self.MFRC522_Init()
def MFRC522_Reset(self):
self.Write_MFRC522(self.CommandReg, self.PCD_RESETPHASE)
def Write_MFRC522(self, addr, val):
self.select_pin.write(0)
self._spiDev.write(b'%c' % int(0xff & ((addr << 1) & 0x7e)))
self._spiDev.write(b'%c' % int(0xff & val))
self.select_pin.write(1)
def Read_MFRC522(self, addr):
self.select_pin.write(0)
self._spiDev.write(b'%c' % int(0xff & (((addr << 1) & 0x7e) | 0x80)))
buf = bytearray(1)
self._spiDev.read(buf)
self.select_pin.write(1)
return buf[0]
def Close_MFRC522(self):
self._spiDev.close()
def SetBitMask(self, reg, mask):
tmp = self.Read_MFRC522(reg)
self.Write_MFRC522(reg, tmp | mask)
def ClearBitMask(self, reg, mask):
tmp = self.Read_MFRC522(reg)
self.Write_MFRC522(reg, tmp & (~mask))
def AntennaOn(self):
temp = self.Read_MFRC522(self.TxControlReg)
if (~(temp & 0x03)):
self.SetBitMask(self.TxControlReg, 0x03)
def AntennaOff(self):
self.ClearBitMask(self.TxControlReg, 0x03)
def MFRC522_ToCard(self, command, sendData):
backData = []
backLen = 0
status = self.MI_ERR
irqEn = 0x00
waitIRq = 0x00
lastBits = None
n = 0
if command == self.PCD_AUTHENT:
irqEn = 0x12
waitIRq = 0x10
if command == self.PCD_TRANSCEIVE:
irqEn = 0x77
waitIRq = 0x30
self.Write_MFRC522(self.CommIEnReg, irqEn | 0x80)
self.ClearBitMask(self.CommIrqReg, 0x80)
self.SetBitMask(self.FIFOLevelReg, 0x80)
self.Write_MFRC522(self.CommandReg, self.PCD_IDLE)
for i in range(len(sendData)):
self.Write_MFRC522(self.FIFODataReg, sendData[i])
self.Write_MFRC522(self.CommandReg, command)
if command == self.PCD_TRANSCEIVE:
self.SetBitMask(self.BitFramingReg, 0x80)
i = 2000
while True:
n = self.Read_MFRC522(self.CommIrqReg)
i -= 1
if ~((i != 0) and ~(n & 0x01) and ~(n & waitIRq)):
break
self.ClearBitMask(self.BitFramingReg, 0x80)
if i != 0:
if (self.Read_MFRC522(self.ErrorReg) & 0x1B) == 0x00:
status = self.MI_OK
if n & irqEn & 0x01:
status = self.MI_NOTAGERR
if command == self.PCD_TRANSCEIVE:
n = self.Read_MFRC522(self.FIFOLevelReg)
lastBits = self.Read_MFRC522(self.ControlReg) & 0x07
if lastBits != 0:
backLen = (n - 1) * 8 + lastBits
else:
backLen = n * 8
if n == 0:
n = 1
if n > self.MAX_LEN:
n = self.MAX_LEN
for i in range(n):
backData.append(self.Read_MFRC522(self.FIFODataReg))
else:
status = self.MI_ERR
return (status, backData, backLen)
def MFRC522_Request(self, reqMode):
status = None
backBits = None
TagType = []
self.Write_MFRC522(self.BitFramingReg, 0x07)
TagType.append(reqMode)
(status, backData, backBits) = self.MFRC522_ToCard(self.PCD_TRANSCEIVE, TagType)
if ((status != self.MI_OK) | (backBits != 0x10)):
status = self.MI_ERR
return (status, backBits)
def MFRC522_Anticoll(self):
backData = []
serNumCheck = 0
serNum = []
self.Write_MFRC522(self.BitFramingReg, 0x00)
serNum.append(self.PICC_ANTICOLL)
serNum.append(0x20)
(status, backData, backBits) = self.MFRC522_ToCard(self.PCD_TRANSCEIVE, serNum)
if (status == self.MI_OK):
i = 0
if len(backData) == 5:
for i in range(4):
serNumCheck = serNumCheck ^ backData[i]
if serNumCheck != backData[4]:
status = self.MI_ERR
else:
status = self.MI_ERR
return (status, backData)
def CalulateCRC(self, pIndata):
self.ClearBitMask(self.DivIrqReg, 0x04)
self.SetBitMask(self.FIFOLevelReg, 0x80)
for i in range(len(pIndata)):
self.Write_MFRC522(self.FIFODataReg, pIndata[i])
self.Write_MFRC522(self.CommandReg, self.PCD_CALCCRC)
i = 0xFF
while True:
n = self.Read_MFRC522(self.DivIrqReg)
i -= 1
if not ((i != 0) and not (n & 0x04)):
break
pOutData = []
pOutData.append(self.Read_MFRC522(self.CRCResultRegL))
pOutData.append(self.Read_MFRC522(self.CRCResultRegM))
return pOutData
def MFRC522_SelectTag(self, serNum):
backData = []
buf = []
buf.append(self.PICC_SElECTTAG)
buf.append(0x70)
for i in range(5):
buf.append(serNum[i])
pOut = self.CalulateCRC(buf)
buf.append(pOut[0])
buf.append(pOut[1])
(status, backData, backLen) = self.MFRC522_ToCard(self.PCD_TRANSCEIVE, buf)
if (status == self.MI_OK) and (backLen == 0x18):
print("Size: " + str(backData[0]))
return backData[0]
else:
return 0
def MFRC522_Auth(self, authMode, BlockAddr, Sectorkey, serNum):
buff = []
# First byte should be the authMode (A or B)
buff.append(authMode)
# Second byte is the trailerBlock (usually 7)
buff.append(BlockAddr)
# Now we need to append the authKey which usually is 6 bytes of 0xFF
for i in range(len(Sectorkey)):
buff.append(Sectorkey[i])
# Next we append the first 4 bytes of the UID
for i in range(4):
buff.append(serNum[i])
# Now we start the authentication itself
(status, backData, backLen) = self.MFRC522_ToCard(self.PCD_AUTHENT, buff)
# Check if an error occurred
if not (status == self.MI_OK):
raise AuthenticationError("AUTH ERROR!!")
if not (self.Read_MFRC522(self.Status2Reg) & 0x08) != 0:
raise AuthenticationError("AUTH ERROR(status2reg & 0x08) != 0")
# Return the status
return status
def MFRC522_StopCrypto1(self):
self.ClearBitMask(self.Status2Reg, 0x08)
def MFRC522_Read(self, blockAddr):
recvData = []
recvData.append(self.PICC_READ)
recvData.append(blockAddr)
pOut = self.CalulateCRC(recvData)
recvData.append(pOut[0])
recvData.append(pOut[1])
(status, backData, backLen) = self.MFRC522_ToCard(self.PCD_TRANSCEIVE, recvData)
if not (status == self.MI_OK):
raise StatusNotSuccessError("Error while reading!")
if len(backData) == 16:
print("Sector " + str(blockAddr) + " " + str(backData))
return backData
else:
return None
def MFRC522_Write(self, blockAddr, writeData):
buff = []
buff.append(self.PICC_WRITE)
buff.append(blockAddr)
crc = self.CalulateCRC(buff)
buff.append(crc[0])
buff.append(crc[1])
(status, backData, backLen) = self.MFRC522_ToCard(self.PCD_TRANSCEIVE, buff)
if not (status == self.MI_OK) or not (backLen == 4) or not ((backData[0] & 0x0F) == 0x0A):
status = self.MI_ERR
print("%s backdata &0x0F == 0x0A %s" % (backLen, backData[0] & 0x0F))
if status == self.MI_OK:
buf = []
for i in range(16):
buf.append(writeData[i])
crc = self.CalulateCRC(buf)
buf.append(crc[0])
buf.append(crc[1])
(status, backData, backLen) = self.MFRC522_ToCard(self.PCD_TRANSCEIVE, buf)
if not (status == self.MI_OK) or not (backLen == 4) or not ((backData[0] & 0x0F) == 0x0A):
raise StatusNotSuccessError("Error while writing")
if status == self.MI_OK:
print("Data written")
def MFRC522_DumpClassic1K(self, key, uid):
for i in range(64):
status = self.MFRC522_Auth(self.PICC_AUTHENT1A, i, key, uid)
# Check if authenticated
if status == self.MI_OK:
self.MFRC522_Read(i)
else:
raise AuthenticationError("Authentication error")
def MFRC522_Init(self):
self.MFRC522_Reset()
self.Write_MFRC522(self.TModeReg, 0x8D)
self.Write_MFRC522(self.TPrescalerReg, 0x3E)
self.Write_MFRC522(self.TReloadRegL, 30)
self.Write_MFRC522(self.TReloadRegH, 0)
self.Write_MFRC522(self.TxAutoReg, 0x40)
self.Write_MFRC522(self.ModeReg, 0x3D)
self.AntennaOn()
def readText(self):
id, text = self.read_no_block()
while not id:
id, text = self.read_no_block()
return id, text
def readCardID(self):
id = self.read_id_no_block()
while not id:
id = self.read_id_no_block()
return id
def read_id_no_block(self):
(status, TagType) = self.MFRC522_Request(self.PICC_REQIDL)
if status != self.MI_OK:
return None
(status, uid) = self.MFRC522_Anticoll()
if status != self.MI_OK:
return None
return self.uid_to_num(uid)
def read_no_block(self):
(status, TagType) = self.MFRC522_Request(self.PICC_REQIDL)
if status != self.MI_OK:
return None, None
(status, uid) = self.MFRC522_Anticoll()
if status != self.MI_OK:
return None, None
id = self.uid_to_num(uid)
self.MFRC522_SelectTag(uid)
status = self.MFRC522_Auth(self.PICC_AUTHENT1A, 11, self.KEY, uid)
data = []
text_read = ''
if status == self.MI_OK:
for block_num in self.BLOCK_ADDRS:
block = self.MFRC522_Read(block_num)
if block:
data += block
if data:
text_read = ''.join(chr(i) for i in data)
self.MFRC522_StopCrypto1()
return id, text_read
def write(self, text):
id, text_in = self.write_no_block(text)
while not id:
id, text_in = self.write_no_block(text)
return id, text_in
def write_no_block(self, text):
(status, TagType) = self.MFRC522_Request(self.PICC_REQIDL)
if status != self.MI_OK:
return None, None
(status, uid) = self.MFRC522_Anticoll()
if status != self.MI_OK:
return None, None
id = self.uid_to_num(uid)
self.MFRC522_SelectTag(uid)
status = self.MFRC522_Auth(self.PICC_AUTHENT1A, 11, self.KEY, uid)
self.MFRC522_Read(11)
if status == self.MI_OK:
data = bytearray()
data.extend(bytearray(text.ljust(len(self.BLOCK_ADDRS) * 16).encode('ascii')))
i = 0
for block_num in self.BLOCK_ADDRS:
self.MFRC522_Write(block_num, data[(i*16):(i+1)*16])
i += 1
self.MFRC522_StopCrypto1()
return id, text[0:(len(self.BLOCK_ADDRS) * 16)]
def uid_to_num(self, uid):
n = 0
for i in range(0, 5):
n = n * 256 + uid[i]
return n
"""
Copyright (C) 2015-2021 Alibaba Group Holding Limited
HaaS Python driver for motorspeed
Author: HaaS
Date: 2022/05/15
"""
from driver import GPIO
class MOTORSPEED(object):
def __init__(self, gpioObj, func=None):
self.gpioObj = None
if not isinstance(gpioObj, GPIO):
raise ValueError("parameter is not a GPIO object")
self.gpioObj = gpioObj
if func != None:
self.gpioObj.on(func)
def objectDetection(self):
if self.gpioObj is None:
raise ValueError("invalid GPIO object")
value = self.gpioObj.read()
return value
"""
Copyright (C) 2015-2021 Alibaba Group Holding Limited
HaaS Python's driver for MPU6050
Author: HaaS Python Team
Date: 2022/02/02
"""
from micropython import const
from driver import I2C
from utime import sleep_ms
import math
MPU_SELF_TESTX_REG = const(0X0D) #自检寄存器X
MPU_SELF_TESTY_REG = const(0X0E) #自检寄存器Y
MPU_SELF_TESTZ_REG = const(0X0F) #自检寄存器Z
MPU_SELF_TESTA_REG = const(0X10) #自检寄存器A
MPU_SAMPLE_RATE_REG = const(0X19) #采样频率分频器
MPU_CFG_REG = const(0X1A) #配置寄存器
MPU_GYRO_CFG_REG = const(0X1B) #陀螺仪配置寄存器
MPU_ACCEL_CFG_REG = const(0X1C) #加速度计配置寄存器
MPU_MOTION_DET_REG = const(0X1F) #运动检测阀值设置寄存器
MPU_FIFO_EN_REG = const(0X23) #FIFO使能寄存器
MPU_I2CMST_CTRL_REG = const(0X24) #IIC主机控制寄存器
MPU_I2CSLV0_ADDR_REG = const(0X25) #IIC从机0器件地址寄存器
MPU_I2CSLV0_REG = const(0X26) #IIC从机0数据地址寄存器
MPU_I2CSLV0_CTRL_REG = const(0X27) #IIC从机0控制寄存器
MPU_I2CSLV1_ADDR_REG = const(0X28) #IIC从机1器件地址寄存器
MPU_I2CSLV1_REG = const(0X29) #IIC从机1数据地址寄存器
MPU_I2CSLV1_CTRL_REG = const(0X2A) #IIC从机1控制寄存器
MPU_I2CSLV2_ADDR_REG = const(0X2B) #IIC从机2器件地址寄存器
MPU_I2CSLV2_REG = const(0X2C) #IIC从机2数据地址寄存器
MPU_I2CSLV2_CTRL_REG = const(0X2D) #IIC从机2控制寄存器
MPU_I2CSLV3_ADDR_REG = const(0X2E) #IIC从机3器件地址寄存器
MPU_I2CSLV3_REG = const(0X2F) #IC从机3数据地址寄存器
MPU_I2CSLV3_CTRL_REG = const(0X30) #IIC从机3控制寄存器
MPU_I2CSLV4_ADDR_REG = const(0X31) #IIC从机4器件地址寄存器
MPU_I2CSLV4_REG = const(0X32) #IIC从机4数据地址寄存器
MPU_I2CSLV4_DO_REG = const(0X33) #IIC从机4写数据寄存器
MPU_I2CSLV4_CTRL_REG = const(0X34) #IIC从机4控制寄存器
MPU_I2CSLV4_DI_REG = const(0X35) #IIC从机4读数据寄存器
MPU_I2CMST_STA_REG = const(0X36) #IIC主机状态寄存器
MPU_INTBP_CFG_REG = const(0X37) #中断/旁路设置寄存器
MPU_INT_EN_REG = const(0X38) #中断使能寄存器
MPU_INT_STA_REG = const(0X3A) #中断状态寄存器
MPU_ACCEL_XOUTH_REG = const(0X3B) #加速度值,X轴高8位寄存器
MPU_ACCEL_XOUTL_REG = const(0X3C) #速度值,X轴低8位寄存器
MPU_ACCEL_YOUTH_REG = const(0X3D) #加速度值,Y轴高8位寄存器
MPU_ACCEL_YOUTL_REG = const(0X3E) #加速度值,Y轴低8位寄存器
MPU_ACCEL_ZOUTH_REG = const(0X3F) #加速度值,Z轴高8位寄存器
MPU_ACCEL_ZOUTL_REG = const(0X40) #加速度值,Z轴低8位寄存器
MPU_TEMP_OUTH_REG = const(0X41) #温度值高八位寄存器
MPU_TEMP_OUTL_REG = const(0X42) #温度值低8位寄存器
MPU_GYRO_XOUTH_REG = const(0X43) #陀螺仪值,X轴高8位寄存器
MPU_GYRO_XOUTL_REG = const(0X44) #陀螺仪值,X轴低8位寄存器
MPU_GYRO_YOUTH_REG = const(0X45) #陀螺仪值,Y轴高8位寄存器
MPU_GYRO_YOUTL_REG = const(0X46) #陀螺仪值,Y轴低8位寄存器
MPU_GYRO_ZOUTH_REG = const(0X47) #陀螺仪值,Z轴高8位寄存器
MPU_GYRO_ZOUTL_REG = const(0X48) #陀螺仪值,Z轴低8位寄存器
MPU_I2CSLV0_DO_REG = const(0X63) #IIC从机0数据寄存器
MPU_I2CSLV1_DO_REG = const(0X64) #IIC从机1数据寄存器
MPU_I2CSLV2_DO_REG = const(0X65) #IIC从机2数据寄存器
MPU_I2CSLV3_DO_REG = const(0X66) #IIC从机3数据寄存器
MPU_I2CMST_DELAY_REG = const(0X67) #IIC主机延时管理寄存器
MPU_SIGPATH_RST_REG = const(0X68) #信号通道复位寄存器
MPU_MDETECT_CTRL_REG = const(0X69) #运动检测控制寄存器
MPU_USER_CTRL_REG = const(0X6A) #用户控制寄存器
MPU_PWR_MGMT1_REG = const(0X6B) #电源管理寄存器1
MPU_PWR_MGMT2_REG = const(0X6C) #电源管理寄存器2
MPU_FIFO_CNTH_REG = const(0X72) #FIFO计数寄存器高八位
MPU_FIFO_CNTL_REG = const(0X73) #FIFO计数寄存器低八位
MPU_FIFO_RW_REG = const(0X74) #FIFO读写寄存器
MPU_DEVICE_ID_REG = const(0X75) #器件ID寄存器
MPU_DEV_ID = const(0x68)
mpu6050_dict = {'temp': 0.0, 'gyroX': 0, 'gyroY': 0, 'gyroZ': 0, 'accX': 0, 'accY': 0, 'accZ': 0}
class MPU6050Error(Exception):
def __init__(self, value=0, msg="mpu6050 common error"):
self.value = value
self.msg = msg
def __str__(self):
return "Error code:%d, Error message: %s" % (self.value, str(self.msg))
__repr__ = __str__
class MPU6050(object):
"""
This class implements mpu6050 chip's defs.
"""
def __init__(self, i2cDev):
self._i2cDev = None
if not isinstance(i2cDev, I2C):
raise ValueError("parameter is not an I2C object")
# make MPU6050's internal object points to i2cDev
self._i2cDev = i2cDev
# 初始化MPU6050传感器
r = self.init()
if r != 0:
raise ValueError("MPU6050 init error")
def i2c_write_byte(self, addr, value):
Reg = bytearray([addr, value])
self._i2cDev.write(Reg)
#print("--> write addr " + str(addr) + ", value = " + str(value))
def i2c_read_byte(self, addr):
Reg = bytearray([addr])
self._i2cDev.write(Reg)
tmp = bytearray(1)
self._i2cDev.read(tmp)
#print("<-- read addr " + str(addr) + ", value = " + str(tmp[0]))
return tmp[0]
def i2c_read_len(self, addr, len):
reg = bytearray([addr])
data = bytearray(len)
self._i2cDev.write(reg)
sleep_ms(20)
self._i2cDev.read(data)
# print("--> read " + str(len) + " bytes from addr " + str(addr) + ", " + str(len) + " bytes value = " + str(data))
return data
# 设置MPU6050陀螺仪传感器满量程范围
# fsr:0,±250dps;1,±500dps;2,±1000dps;3,±2000dps
# 返回值:0,设置成功
# 其他,设置失败
def setGyroFsr(self, fsr):
return self.i2c_write_byte(MPU_GYRO_CFG_REG, fsr << 3) # 设置陀螺仪满量程范围
# 设置MPU6050加速度传感器满量程范围
# fsr:0,±2g;1,±4g;2,±8g;3,±16g
# 返回值:0,设置成功
# 其他,设置失败
def setAccelFsr(self, fsr):
return self.i2c_write_byte(MPU_ACCEL_CFG_REG, fsr << 3) # 设置加速度传感器满量程范围
# 设置MPU6050的数字低通滤波器
# lpf:数字低通滤波频率(Hz)
# 返回值:0,设置成功
# 其他,设置失败
def setLPF(self, lpf):
if (lpf >= 188):
data = 1
elif (lpf >= 98):
data = 2
elif (lpf >= 42):
data = 3
elif (lpf >= 20):
data = 4
elif (lpf >= 10):
data = 5
else:
data = 6
return self.i2c_write_byte(MPU_CFG_REG, data) # 设置数字低通滤波器
# 设置MPU6050的采样率(假定Fs=1KHz)
# rate:4~1000(Hz)
# 返回值:0,设置成功
# 其他,设置失败
def setRate(self, rate):
if (rate > 1000):
rate = 1000
if (rate < 4):
rate = 4
data = 1000 // rate - 1
self.i2c_write_byte(MPU_SAMPLE_RATE_REG, data) # 设置数字低通滤波器
return self.setLPF(rate / 2) # 自动设置LPF为采样率的一半
# 得到温度值
# 返回值:温度值
def getTemperature(self):
buf = bytearray(2)
buf = self.i2c_read_len(MPU_TEMP_OUTH_REG, 2)
raw = (buf[0] << 8) | buf[1]
if (raw > (1 << 15)):
raw = raw - (1<<16)
temp = 36.53 + (raw) / 340
return round(temp, 2)
# 得到陀螺仪值(原始值)
# gx,gy,gz:陀螺仪x,y,z轴的原始读数(带符号)
# 返回值:0,成功
# 其他,错误代码
def getGyroscope(self):
buf = bytearray(6)
buf = self.i2c_read_len(MPU_GYRO_XOUTH_REG, 6)
gx = (buf[0] << 8) | buf[1]
gy = (buf[2] << 8) | buf[3]
gz = (buf[4] << 8) | buf[5]
if (gx > (1 << 15)):
gx = gx - (1<<16)
if (gy > (1 << 15)):
gy = gy - (1<<16)
if (gz > (1 << 15)):
gz = gz - (1<<16)
return (gx, gy, gz)
# 得到加速度值(原始值)
# gx,gy,gz:陀螺仪x,y,z轴的原始读数(带符号)
# 返回值:0,成功
# 其他,错误代码
def getAcceleration(self):
buf = bytearray(6)
buf = self.i2c_read_len(MPU_ACCEL_XOUTH_REG, 6)
ax = (buf[0] << 8) | buf[1]
ay = (buf[2] << 8) | buf[3]
az = (buf[4] << 8) | buf[5]
if (ax > (1 << 15)):
ax = ax - (1<<16)
if (ay > (1 << 15)):
ay = ay - (1<<16)
if (az > (1 << 15)) :
az = az - (1<<16)
return (ax, ay, az)
def getData(self):
global mpu6050_dict
mpu6050_dict['temp'] = self.getTemperature()
arr = self.getGyroscope()
mpu6050_dict['gyroX'] = arr[0]
mpu6050_dict['gyroY'] = arr[1]
mpu6050_dict['gyroZ'] = arr[2]
brr = self.getAcceleration()
mpu6050_dict['accX'] = brr[0]
mpu6050_dict['accY'] = brr[1]
mpu6050_dict['accZ'] = brr[2]
return mpu6050_dict
# 初始化MPU6050
# 返回值:0,成功
# 其他,错误代码
def init(self):
device_id = 0
self.i2c_write_byte(MPU_PWR_MGMT1_REG, 0X80) # 复位MPU6050
sleep_ms(200)
self.i2c_write_byte(MPU_PWR_MGMT1_REG, 0X00) # 唤醒MPU6050
self.setGyroFsr(3) # 陀螺仪传感器,±2000dps
self.setAccelFsr(0) # 加速度传感器,±2g
self.setRate(50) # 设置采样率50Hz
self.i2c_write_byte(MPU_INT_EN_REG, 0X00) # 关闭所有中断
self.i2c_write_byte(MPU_USER_CTRL_REG, 0X00) # I2C主模式关闭
self.i2c_write_byte(MPU_FIFO_EN_REG, 0X00) # 关闭FIFO
self.i2c_write_byte(MPU_INTBP_CFG_REG, 0X80) # INT引脚低电平有效
device_id = self.i2c_read_byte(MPU_DEVICE_ID_REG)
if (device_id == MPU_DEV_ID):
# 器件ID正确
self.i2c_write_byte(MPU_PWR_MGMT1_REG, 0X01) # 设置CLKSEL,PLL X轴为参考
self.i2c_write_byte(MPU_PWR_MGMT2_REG, 0X00) # 加速度与陀螺仪都工作
self.setRate(50) # 设置采样率为50Hz
return 0
else:
return 1
from driver import ADC
class PHOTORESISTOR(object):
def __init__(self, adcObj):
self.adcObj = None
if not isinstance(adcObj, ADC):
raise ValueError("parameter is not an ADC object")
self.adcObj = adcObj
def getLightness(self):
if self.adcObj is None:
raise ValueError("invalid ADC object")
value = self.adcObj.readVoltage()
return value
"""
HaaSPython PWM driver for servo
舵机传感器驱动
"""
from driver import PWM
class SERVO(object):
def __init__(self, pwmObj):
self.pwmObj = None
if not isinstance(pwmObj, PWM):
raise ValueError("parameter is not an PWM object")
self.pwmObj = pwmObj
def setOptionSero(self,data):
if self.pwmObj is None:
raise ValueError("invalid PWM object")
data_r = {'freq':50, 'duty': int(((data+90)*2/180+0.5)/20*100)}
self.pwmObj.setOption(data_r)
def close(self):
if self.pwmObj is None:
raise ValueError("invalid PWM object")
self.pwmObj.close()
"""
Copyright (C) 2015-2021 Alibaba Group Holding Limited
MicroPython's driver for CHT8305
Author: HaaS
Date: 2021/09/14
"""
from micropython import const
import utime
from driver import I2C
'''
# sht3x commands definations
# read serial number: CMD_READ_SERIALNBR 0x3780
# read status register: CMD_READ_STATUS 0xF32D
# clear status register: CMD_CLEAR_STATUS 0x3041
# enabled heater: CMD_HEATER_ENABLE 0x306D
# disable heater: CMD_HEATER_DISABLE 0x3066
# soft reset: CMD_SOFT_RESET 0x30A2
# accelerated response time: CMD_ART 0x2B32
# break, stop periodic data acquisition mode: CMD_BREAK 0x3093
# measurement: polling, high repeatability: CMD_MEAS_POLLING_H 0x2400
# measurement: polling, medium repeatability: CMD_MEAS_POLLING_M 0x240B
# measurement: polling, low repeatability: CMD_MEAS_POLLING_L 0x2416
'''
class SHT3X(object):
# i2cDev should be an I2C object and it should be opened before __init__ is called
def __init__(self, i2cDev):
self._i2cDev = None
if not isinstance(i2cDev, I2C):
raise ValueError("parameter is not an I2C object")
# make AHB21B's internal object points to _i2cDev
self._i2cDev = i2cDev
self.start()
def start(self):
# make sure AHB21B's internal object is valid before I2C operation
if self._i2cDev is None:
raise ValueError("invalid I2C object")
# send clear status register command - 0x3041 - CMD_CLEAR_STATUS
cmd = bytearray(2)
cmd[0] = 0x30
cmd[1] = 0x41
self._i2cDev.write(cmd)
# wait for 20ms
utime.sleep_ms(20)
return 0
def getTempHumidity(self):
if self._i2cDev is None:
raise ValueError("invalid I2C object")
tempHumidity = [-1, 2]
# start measurement: polling, medium repeatability - 0x240B - CMD_MEAS_POLLING_M
# if you want to adjust measure repeatability, you can send the following commands:
# high repeatability: 0x2400 - CMD_MEAS_POLLING_H
# low repeatability: 0x2416 - CMD_MEAS_POLLING_L
cmd = bytearray(2)
cmd[0] = 0x24
cmd[1] = 0x0b
self._i2cDev.write(cmd)
# must wait for a little before the measurement finished
utime.sleep_ms(20)
dataBuffer = bytearray(6)
# read the measurement result
self._i2cDev.read(dataBuffer)
# print(dataBuffer)
# calculate real temperature and humidity according to SHT3X-DIS' data sheet
temp = (dataBuffer[0]<<8) | dataBuffer[1]
humi = (dataBuffer[3]<<8) | dataBuffer[4]
tempHumidity[1] = humi * 0.0015259022
tempHumidity[0] = -45.0 + (temp) * 175.0 / (0xFFFF - 1)
return tempHumidity
def getTemperature(self):
data = self.getTempHumidity()
return data[0]
def getHumidity(self):
data = self.getTempHumidity()
return data[1]
def stop(self):
if self._i2cDev is None:
raise ValueError("invalid I2C object")
# stop periodic data acquisition mode
cmd = bytearray(3)
cmd[0] = 0x30
cmd[1] = 0x93
self._i2cDev.write(cmd)
# wait for a little while
utime.sleep_ms(20)
self._i2cDev = None
return 0
def __del__(self):
print('sht3x __del__')
"""
Copyright (C) 2015-2021 Alibaba Group Holding Limited
MicroPython's driver for ULN2003
Author: HaaS
Date: 2022/03/15
"""
from driver import GPIO
from utime import sleep_ms
from micropython import const
import math
class ULN2003(object):
"""
This class implements uln2003 chip's defs.
"""
def __init__(self, a, a_, b, b_):
self._a = None
self._a_ = None
self._b = None
self._b_ = None
if not isinstance(a, GPIO):
raise ValueError("parameter is not an GPIO object")
if not isinstance(a_, GPIO):
raise ValueError("parameter is not an GPIO object")
if not isinstance(b, GPIO):
raise ValueError("parameter is not an GPIO object")
if not isinstance(b_, GPIO):
raise ValueError("parameter is not an GPIO object")
# make ULN2003's internal object points to gpio
self._a = a
self._a_ = a_
self._b = b
self._b_ = b_
def motorCw(self, speed=4):
self._a.write(1)
self._a_.write(0)
self._b.write(0)
self._b_.write(0)
sleep_ms(speed)
self._a.write(0)
self._a_.write(1)
self._b.write(0)
self._b_.write(0)
sleep_ms(speed)
self._a.write(0)
self._a_.write(0)
self._b.write(1)
self._b_.write(0)
sleep_ms(speed)
self._a.write(0)
self._a_.write(0)
self._b.write(0)
self._b_.write(1)
sleep_ms(speed)
def motorCcw(self, speed=4):
self._a.write(0)
self._a_.write(0)
self._b.write(0)
self._b_.write(1)
sleep_ms(speed)
self._a.write(0)
self._a_.write(0)
self._b.write(1)
self._b_.write(0)
sleep_ms(speed)
self._a.write(0)
self._a_.write(1)
self._b.write(0)
self._b_.write(0)
sleep_ms(speed)
self._a.write(1)
self._a_.write(0)
self._b.write(0)
self._b_.write(0)
sleep_ms(speed)
def motorStop(self):
self._a.write(0)
self._a_.write(0)
self._b.write(0)
self._b_.write(0)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册