提交 11a6023b 编写于 作者: J jinouxiang.jox 提交者: skylarCai

[Detail]

Issue id: #1763 add security_monitoring_system demo
Solution: when megnetic sensor is triggered,buzzer will make sound and camera will caputure image and upload to the iot platform.we can monitor the system on mobile app.
上级 95adac82
# 防盗监控系统
  
下图是本案例的四步导学。
<div align="center">
<img src=./../../../images/security_monitoring_system/导学流程.png width=80%/>
</div>
## 简介
### 背景
&emsp;&emsp;
日常生活中,在夜间休息或家中无人时,如果在家中存放了贵重物品,需要做好家中贵重物品的防盗工作。当门窗或存放贵重物品的柜子、抽屉被打开时,防盗报警系统需要能够及时报警,并抓拍到入侵者的图像信息。
&emsp;&emsp;
本案例采用M5Stack Core2开发板,结合干簧管传感器、蜂鸣器、摄像头等传感器,搭建了一套低成本、易实现的家庭防盗监控系统,基于阿里云物联网平台,可在手机app上进行远程监控,动态控制系统运行。
## 准备
1. [M5Stack Core2开发板](https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C84FAF561DF6A00000001&dataId=800C84FAF561DF6A) 一套
2. M5Stack Unit CAM摄像头 一个
3. 连接线 一根
4. [干簧管传感器](https://haas.iot.aliyun.com/solution/detail/hardware?versionId=800C4C5091EC2DE500000001&dataId=800C4C5091EC2DE5) 一个
5. 公对母、公对公杜邦线 若干
涉及到的硬件购买链接如下,仅供参考,不负责商家发货的品质保障等问题!
| 名称 | 数量 | 参考链接 |
| --- | --- | --- |
| M5Stack Core2开发版 | 1 | [M5Stack Core2](https://item.taobao.com/item.htm?spm=a1z10.5-c-s.w4002-22404213529.17.732749d8usCqYX&id=625561056791) |
| microUSB数据线 | 1 | M5Stack Core2开发套件自带 |
| 摄像头 | 1 | [M5Stack Unit CAM摄像头](https://item.taobao.com/item.htm?spm=a1z10.5-c-s.w4002-22404213529.29.698e2d4844EBZF&id=643872470244) |
| 连接线 | 1条 | [M5Stack GROVE连接线](https://item.taobao.com/item.htm?spm=a1z10.5-c-s.w4002-22404213529.11.6b6d5f86B5IYMF&id=610410604759) 请选用10cm长即可 |
| 干簧管传感器 | 1 | [干簧管传感器](https://detail.tmall.com/item.htm?spm=a1z10.3-b-s.w4011-16538328900.35.2fc03d6cnWnN4k&id=41210762746&rn=e2e0e6902441d7885abaed9cc81ee1ed&abbucket=11) |
| 公对母杜邦线 | 若干 | [杜邦线](https://detail.tmall.com/item.htm?spm=a230r.1.14.22.b10c6663PCvNOq&id=41254478179&ns=1&abbucket=7&skuId=3211140814458) 请选用10cm长即可 |
&emsp;&emsp;
硬件连线图如下图所示:
<div align="center">
<img src=./../../../images/security_monitoring_system/接线图.png width=90%/>
</div>
<br>
## 云端开发
&emsp;&emsp;
整个过程包含以下4个步骤:
1. 开通公共实例
2. 创建产品(设备模型)
3. 定义产品功能(物模型)
4. 创建设备及获取三元组
### 开通公共实例
&emsp;&emsp;
对于第一次使用物联网平台的开发者,需要开通实例以使用物联网平台的功能。这里可以使用免费的公共实例进行开发。
&emsp;&emsp;
[物联网平台](https://iot.console.aliyun.com/lk/summary/new)中,左上角选择“华东2-上海”,点击“公共实例”,即可开通。开通后点击“公共实例”,即可进入[控制台](https://iot.console.aliyun.com/lk/summary/new)进行产品创建。
![防盗报警系统物联网平台.png](./../../../images/security_monitoring_system/防盗报警系统物联网平台.png)
### 创建产品(设备模型)
&emsp;&emsp;
进入[公共实例控制台](https://iot.console.aliyun.com/lk/summary/new),点击“创建产品”按钮,即可进入[新建产品页面](https://iot.console.aliyun.com/product)
![防盗报警系统空产品页.png](./../../../images/security_monitoring_system/防盗报警系统空产品页.png)
&emsp;&emsp;
进入[新建产品页面](https://iot.console.aliyun.com/product),设定“产品名称”,这里我们命名为“**防盗报警系统**”,开发者也可以根据自己的喜好来命名。在“所属品类”中,选择“自定义品类”。
&emsp;&emsp;
产品的节点类型选择“直连设备”,数据格式选择“ICA标准数据格式”,检验类型和认证方式选择默认设定即可。开发者可根据自己的需求在“产品描述”页面添加针对此产品的描述。
&emsp;&emsp;
对于 M5StackCore2 等搭载 Wi-Fi 的设备而言,联网方式选择“Wi-Fi”。
![防盗报警系统新建产品.png](./../../../images/security_monitoring_system/防盗报警系统新建产品.png)
&emsp;&emsp;
点击“确认”按钮,即可完成产品创建。
![防盗报警系统完成创建产品.png](./../../../images/security_monitoring_system/防盗报警系统完成创建产品.png)
&emsp;&emsp;
点击“前往定义物模型”
![防盗报警系统尚未添加任何功能.png](./../../../images/security_monitoring_system/防盗报警系统尚未添加任何功能.png)
### 定义产品功能(物模型)
&emsp;&emsp;
开发者可以使用准备好的[物模型文件](./link_platform/model.zip)来进行快速导入。点击左上角“快速导入”,选择物模型文件并上传,就能够生成案例对应的物模型。
![防盗报警系统快速导入.png](./../../../images/security_monitoring_system/防盗报警系统快速导入.png)
&emsp;&emsp;
生成后的效果如下:
![防盗报警系统导入完成.png](./../../../images/security_monitoring_system/防盗报警系统导入完成.png)
&emsp;&emsp;
定义好物模型后,需要发布物模型上线,并发布产品,以使变更生效。
![防盗报警系统发布物模型.png](./../../../images/security_monitoring_system/防盗报警系统发布物模型.png)
![防盗报警系统发布产品.png](./../../../images/security_monitoring_system/防盗报警系统发布产品.png)
&emsp;&emsp;
产品及其物模型创建完成后就可以创建这个产品的设备了。
### 创建设备及获取三元组
&emsp;&emsp;
点击左侧栏中“设备“,在筛选框中选择要添加设备的产品,点击“添加设备”。这里这里我们命名为“**test_device**”,开发者也可以根据自己的喜好来命名。
![防盗报警系统添加设备.png](./../../../images/security_monitoring_system/防盗报警系统添加设备.png)
&emsp;&emsp;
开发者也可以选择“批量添加”,一次性添加多个设备,并生成随机的DeviceName。
![防盗报警系统批量添加.png](./../../../images/security_monitoring_system/防盗报警系统批量添加.png)
&emsp;&emsp;
生成的设备如下。
![防盗报警系统设备列表.png](./../../../images/security_monitoring_system/防盗报警系统设备列表.png)
&emsp;&emsp;
点击前往“查看”按钮,就可以看到此设备的详细信息了。
![防盗报警系统设备详情.png](./../../../images/security_monitoring_system/防盗报警系统设备详情.png)
&emsp;&emsp;
点击右上角的“查看”按钮,就能看到设备的三元组信息了。
三元组是物联网设备端和物联网云端设备相关联的唯一标识符,在设备端连接云端的时候会使用三元组信息和云端进行鉴权,鉴权通过之后云端会认为设备已激活并上线。
![防盗报警系统设备证书.png](./../../../images/security_monitoring_system/防盗报警系统设备证书.png)
&emsp;&emsp;
再次前往物联网平台的设备信息页面,若设备运行正确,此时应该可以看到设备名右侧的状态由“未激活”变为“在线”。
选中“实时刷新”,可以看到数据实时从设备上报到物联网平台。设备上云成功。
![防盗报警系统物模型数据.png](./../../../images/security_monitoring_system/防盗报警系统物模型数据.png)
## 物联网应用开发
&emsp;&emsp;
IoT Studio 提供了应用快速开发的能力,可以很方便地与物联网平台进行联动。本节的开发工作也将围绕 IoT Studio展开。
### 1. 新建“普通项目”
&emsp;&emsp;
打开IoT Studio官网,在项目管理中新建一个空白项目,如下图所示,将此项目命名为“**防盗报警系统**”,开发者也可以根据自己的喜好来命名。
![防盗报警系统IS项目管理.png](./../../../images/security_monitoring_system/防盗报警系统IS项目管理.png)
![防盗报警系统IS新建项目.png](./../../../images/security_monitoring_system/防盗报警系统IS新建项目.png)
### 2. 关联产品
&emsp;&emsp;
为了使本项目能够获取到目标设备的属性信息,我们首先需要将该项目和我们在前一节创建的产品“防盗报警系统”绑定。
&emsp;&emsp;
在项目控制台,点击左侧的“产品”,点击“关联物联网平台产品”。此时可以看见我们创建的“防盗报警系统”。点击选中,并勾选“关联产品同时关联其下所有设备”,以便该项目可以访问到所有设备的属性信息。
![防盗报警系统IS关联产品.png](./../../../images/security_monitoring_system/防盗报警系统IS关联产品.png)
### 3. 创建“移动应用”
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/防盗报警系统IS创建移动应用.png)
### 4. “移动应用”开发
&emsp;&emsp;
从左侧组件栏中选择防盗报警系统app需要的组件,将其拖入中间的窗口,按自己的喜好进行布局。本案例中,选择了时钟、实时曲线、开关、指示灯、图片、文字等组件。具体配置过程如下:
&emsp;&emsp;
- 配置图表组件
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/移动应用-配置图表组件.jpg)
- 配置开关组件
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/移动应用-配置设备报警开关.jpg)
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/移动应用-配置设备属性开关.jpg)
- 配置指示灯组件
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/移动应用-指示灯.jpg)
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/移动应用-状态指示灯.jpg)
- 配置图片组件
![防盗报警系统IS创建移动应用.png](./../../../images/security_monitoring_system/移动应用-图片配置.jpg)
## HaaS Cloud平台配置
&emsp;&emsp;
由于需要用到HaaS聚合服务中工具服务能力,获取图片url,因此需要在HaaS Cloud平台绑定物联网平台产品。
&emsp;&emsp;
进入[HaaS官网](https://haas.iot.aliyun.com/)
使用阿里云账号登陆后,点击右上角“控制台”。
![{name_img_static_HaaS官网}](./../../../images/security_monitoring_system/haas官网.jpg)
&emsp;&emsp;
进入HaaS Cloud平台的控制台页面:
在左侧导航栏“设备管理”下,可以看到“产品”和“设备”两个选项,点击“产品”,点击“绑定物联网平台产品”,在右侧弹出的标签页中勾选我们刚刚创建的“**防盗报警系统**”,点击确定完成关联。
![{name_img_step22}](./../../../images/security_monitoring_system/防盗报警系统HaaS控制台绑定产品.png)
&emsp;&emsp;
在左侧导航栏“设备管理”下,点击“设备”,选择关联的产品,即可看到关联的设备列表。此时可以看到“**test_device**”出现在了列表中,证明关联成功。
![防盗报警系统HaaS控制台绑定产品.png](./../../../images/security_monitoring_system/防盗报警系统HaaS控制台绑定设备.png)
## 设备端开发
### 开发环境准备
&emsp;&emsp;
在进行下一步之前请确保ESP32中已经烧录了HaaS Python固件并且其开发环境已经搭建完毕。详情请参考[M5StackCore2开发环境](../../../startup/M5StackCore2_startup.md)的说明。
<br>
### 创建解决方案
&emsp;&emsp;
如下图所示,打开VS Code之后在新建一个基于helloworld的python工程,设定好工程名称(“security_monitoring_system”)及工作区路径之后,硬件类型选择"m5stackcore2",点击”立即创建“,创建一个Python轻应用的解决方案。
<div align="center">
<img src=./../../../images/security_monitoring_system/创建工程.png width=100%/>
</div>
&emsp;&emsp;
[防盗报警系统代码](./code/)文件下的所有脚本进行复制到“security_monitoring_system”工程目录下,然后进行如下设定完成设备端代码的开发。
> Python脚本的详细说明请参考脚本内嵌的文字注释
1. **修改路由器名称及密码**
&emsp;&emsp;
修改security_monitoring_system工程里main.py中SSID和PWD的值为开发者实际要连接的路由器的名称及密码(请注意名称和密码都需要放在''符号中间)。
```python
# Wi-Fi SSID和Password设置
SSID='Your-AP-SSID'
PWD='Your-AP-Password'
```
&emsp;&emsp;
修改完成之后connect_wifi函数就会连接开发者自己设定的路由器。
2. **修改设备的三元组信息**
&emsp;&emsp;
按照创建产品、设备步骤获取三元组信息,填入main.py中:
```python
# HaaS设备三元组
productKey = "Your-ProductKey"
deviceName = "Your-devicename"
deviceSecret = "Your-deviceSecret"
```
&emsp;&emsp;
最后点击VSCode中HaaS Studio工程底部的部署运行或串口,选择对应的端口编号烧录程序。
&emsp;&emsp;
推送此脚本到M5StackCore2之后,通过PC端串口调试工具可以看到设备端打印如下日志。日志的含义为:
* “link platform connected” 代表成功连接到云端平台。
* "upload--->"之后跟着的为设备端向云端发送的请求信息,其中"eventName"为"saveImageTest",表示调用云端增值服务中的获取图片url信息功能。
* "download <----"之后跟着的是收到云端回复的识别结果信息:
- result:url是否获取成功。(success:成功,failed:失败)
- imageUrl:返回的图片url链接地址
- commandName:saveImageTestReply表明该消息是saveImageTest服务的返回信息。
```json
establish tcp connection with server(host='xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com', port=[443])
tcp_connect: can only connect from state CLOSED
success to establish tcp, fd=55
link platform connected
megnetic detector ...
upload--->{'qos': 1, 'payload': '{"version": "1.0", "id": 1, "params": {"ext": "{\\"fileName\\": \\"test.jpg\\", \\"enlarge\\": 1, \\"filePosition\\": \\"lp\\", \\"fileId\\": \\"xxxxx\\"}", "eventType": "haas.faas", "eventName": "saveImageTest", "argInt": 1}}', 'topic': '/sys/xxx/xxxxx/thing/event/hli_event/post'}
recognize time : 1629
download <----{'msg_id': 140514731, 'service_id': 'hli_async_service', 'params': '{"ext":"{\\"imageUrl\\":\\"http://vibktprfx-prod-prod-aic-vc-cn-shanghai.oss-cn-shanghai.aliyuncs.com/xxx/22-05-24-01/xxx.png?Expires=xxx&OSSAccessKeyId=xxx&Signature=xxx\\",
\\"result\\":\\"success\\"}","commandName":"saveImageTestReply","commandType":"haas.faas","argInt":1}', 'code': 0, 'params_len': 360}
```
## 端云连调
### 云端查看设备上报结果
&emsp;&emsp;
此时如果拉开抽屉,触发干簧管传感器,蜂鸣器会发出响声,app页面的报警指示灯会变成红色,同时摄像头上传抓拍到的图像。
<div align="center">
<img src=./../../../images/security_monitoring_system/移动应用-app截图.jpg width=100%/>
</div>
&emsp;&emsp;
接着通过云端设置报警开关状态为“关闭”,也可以手机扫码设置,此时在VSCode可以看到如下log显示。之后如果再次出发干簧管传感器,app页面的指示灯保持绿色,同时蜂鸣器也不会报警。
日志的含义为:
* “magnetic_switch set value : 0” 表示收到平台下发的设置报警开关消息。
- 0:表示关闭报警开关;
- 1:表示打开报警开关;
* "upload props--->"之后跟着的为设备端向物联网平台发送的属性信息,其中包括四个属性:
- PictureURL:返回的图片url链接地址,用于在移动应用中展示抓拍的图像。
- alarm_status:当前设备报警状态。
- 0:设备未报警,对应移动应用app中设备报警状态指示灯为**绿色**
- 1:设备报警,对应移动应用app中设备报警状态指示灯为**红色**
- magnetic_switch:设备报警开关。
- 0:关闭报警开关,此时若干簧管传感器触发,也不会产生报警。
- 1:打开报警开关,此时若干簧管传感器触发,则会报警,蜂鸣器发出声音,抓拍入侵者图像,上报平台。
- alarm_count:设备报警触发次数,用于移动应用app统计展示。
```log
magnetic_switch set value : 0
upload props-->{'params': '{"PictureURL": "", "alarm_status": 0, "magnetic_switch": 0, "alarm_count": 1}'}
```
&emsp;&emsp;
扫描二维码后,打开监控app,此时在手机端就可以看到如下监控曲线,当传感器触发时,可以看到摄像头抓拍到的图像。
&emsp;&emsp;
注意:这里的二维码有效期24小时,如需发布长期使用,请在阿里云[域名注册](https://wanwang.aliyun.com/)申请独立域名后在Iot Studio配置才可。
<div align="center">
<img src=./../../../images/security_monitoring_system/移动应用-app手机端.jpg width=50%/>
</div>
### 设备端效果
&emsp;&emsp;
将开发板、摄像头、蜂鸣器、干簧管传感器、磁铁等传感器固定好,当抽屉被拉开时,由于干簧管传感器与磁铁分离,产生信号,屏幕上会打印"WARNING"字样,蜂鸣器发出声音,能够有效震慑入侵者。
<div align="center">
<img src=./../../../images/security_monitoring_system/演示.gif width=80%/>
</div>
\ No newline at end of file
{
"name": "m5stackcore2",
"version": "1.0.0",
"io": {
"ir": {
"type": "GPIO",
"port": 19,
"dir": "input",
"pull": "pullup"
},
"buzzer": {
"type": "PWM",
"port": 27
}
},
"debugLevel": "ERROR",
"repl": "disable"
}
from driver import PWM
class BUZZER(object):
def __init__(self, pwmObj,data=None):
self.pwmObj = None
if not isinstance(pwmObj, PWM):
raise ValueError("parameter is not an PWM object")
self.pwmObj = pwmObj
if data is not None:
self.setOptionDuty(data)
def setOptionDuty(self,data):
if self.pwmObj is None:
raise ValueError("invalid PWM object")
self.pwmObj.setOption(data)
def start(self,data):
if self.pwmObj is None:
raise ValueError("invalid PWM object")
self.setOptionDuty(data)
def close(self,data):
if self.pwmObj is None:
raise ValueError("invalid PWM object")
self.setOptionDuty(data)
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
'''
@File : cloudAI.py
@Description: 云端AI
@Author : jiangyu
@version : 1.0
'''
from aliyunIoT import Device
import utime # 延时函数在utime库中
import ujson as json
class CloudAI :
def __gesture_cb(self, dict) :
'''
Reply list :
handGestureReply : 手势识别
'''
gesture = 'NA'
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
score = ext_dict['score']
if score > 0.4 :
gesture = ext_dict['type']
print("recognize hand gesture : " + gesture)
self.__cb('handGestureReply', gesture)
def __action_cb(self, dict) :
'''
Reply list :
actionReply : 动作识别
'''
action = 'NA'
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
i = 0
elements_list = ext_dict['elements']
while (i < len(elements_list)) :
g_score = elements_list[i]['score']
label = elements_list[i]['label']
if g_score > 0.5:
print("recognize action: " + label)
action = label
break
i += 1
self.__cb('recognizeActionReply', action)
def __license_plate_cb(self, dict) :
plateNumber = 'NA'
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
g_confidence = ext_dict['confidence']
if g_confidence > 0.7 :
plateNumber = ext_dict['plateNumber']
print('detect: ' + plateNumber)
self.__cb('ocrCarNoReply', plateNumber)
def __fruits_cb(self, dict) :
fruit_name = 'NA'
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
i = 0
fruits_list = ext_dict['fruitList']
while (i < len(fruits_list)) :
g_score = fruits_list[i]['score']
fruit_name = fruits_list[i]['name']
if g_score > 0.6:
print('detect: ' + fruit_name)
i += 1
self.__cb('detectFruitsReply', fruit_name)
def __pedestrian_cb(self, dict) :
detected = False
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
i = 0
data = ext_dict['data']
data_dict = json.loads(data)
elements_list = data_dict['elements']
while (i < len(elements_list)) :
g_score = elements_list[i]['score']
if g_score > 0.6:
print('Pedestrian Detected')
detected = True
i += 1
self.__cb('DetectPedestrianReply', detected)
def __businesscard_cb(self, dict) :
card_info = {}
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
card_info['name'] = ext_dict['name']
print("name : " + card_info['name'])
if card_info['name'] == '' :
card_info['name'] = 'unknown'
phoneNumbers_list = ext_dict['cellPhoneNumbers']
print("phoneNumbers : ")
print(phoneNumbers_list)
if len(phoneNumbers_list) :
card_info['phoneNumbers'] = phoneNumbers_list[0]
else :
card_info['phoneNumbers'] = 'unknown'
email_list = ext_dict['emails']
print("email_list: ")
print(email_list)
if len(email_list) :
card_info['email'] = email_list[0]
else :
card_info['email'] = 'unknown'
self.__cb('recognizeBusinessCardReply', card_info)
def __rubblish_cb(self, dict) :
name = 'NA'
if dict != None:
ext = dict['ext']
extDict = json.loads(ext)
result = extDict['result']
if result == 'success':
i = 0
elements = extDict['elements']
while (i < len(elements)) :
gScore = elements[i]['categoryScore']
if gScore > 0.8:
name = elements[i]['category']
print('detect: ' + name)
break
i += 1
self.__cb('classifyingRubbishReply', name)
def __object_cb(self, dict) :
name = 'NA'
if dict != None:
ext = dict['ext']
extDict = json.loads(ext)
result = extDict['result']
if result == 'success':
i = 0
elements = extDict['elements']
while (i < len(elements)) :
gScore = elements[i]['score']
if gScore > 0.25:
name = elements[i]['type']
print('detect: ' + name)
break
i += 1
self.__cb('detectObjectReply', name)
def __vehicletype_cb(self, dict) :
name = 'NA'
detect = False
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
i = 0
item_list = ext_dict['items']
name = 'NA'
while (i < len(item_list)) :
g_score = item_list[i]['score']
name = item_list[i]['name']
# 这里可以修改识别的可信度,目前设置返回可信度大于85%才认为识别正确
if g_score > 0.85 and name != 'others':
print('detect: ' + name)
detect = True
self.__cb('recognizeVehicleReply', name)
break
i += 1
if detect == False:
self.__cb('recognizeVehicleReply', 'NA')
def __vehiclelogo_cb(self, dict) :
num = 0
if dict != None:
ext = dict['ext']
ext_dict = json.loads(ext)
result = ext_dict['result']
if result == 'success':
item_list = ext_dict['elements']
num = len(item_list)
if num > 0:
print('detect: ' + str(num) + ' vehicle')
detected = True
if detected == False:
print('do not detect!')
self.__cb('recognizeLogoReply', num)
def __imageurl_cb(self, dict) :
picurl = 'NA'
if dict != None:
ext = dict['ext']
extDict = json.loads(ext)
picurl = extDict['imageUrl']
print(picurl)
print(dict)
self.__cb('saveImageTestReply', picurl)
def __cb_lk_service(self, data):
self.g_lk_service = True
print('download <----' + str(data))
if data != None :
params = data['params']
params_dict = json.loads(params)
command = params_dict['commandName']
#print(' command <----' + str(params_dict['commandName']) + str(params_dict))
if command == 'handGestureReply' :
self.__gesture_cb(params_dict)
elif command == 'recognizeActionReply' :
self.__action_cb(params_dict)
elif command == 'ocrCarNoReply' :
self.__license_plate_cb(params_dict)
elif command == 'DetectPedestrianReply' :
self.__pedestrian_cb(params_dict)
elif command == 'detectFruitsReply' :
self.__fruits_cb(params_dict)
elif command == 'recognizeBusinessCardReply' :
self.__businesscard_cb(params_dict)
elif command == 'classifyingRubbishReply' :
self.__rubblish_cb(params_dict)
elif command == 'detectObjectReply' :
self.__object_cb(params_dict)
elif command == 'recognizeVehicleReply' :
self.__vehicletype_cb(params_dict)
elif command == 'recognizeLogoReply' :
self.__vehiclelogo_cb(params_dict)
elif command == 'saveImageTestReply' :
self.__imageurl_cb(params_dict)
else :
print('unknown command reply')
def __cb_lk_connect(self, data):
print('link platform connected')
self.g_lk_connect = True
# 接收云端下发的属性设置
# request格式: {"code": 0, "params_len": 17, "msg_id": 1828542828, "params": "{\"door_status\":0}"}
def __on_props(self, request):
try:
props = eval(request['params'])
magnetic_switch = props['magnetic_switch']
print("magnetic_switch set value : " + str(magnetic_switch))
self.__cb('magnetic_switch', magnetic_switch)
except Exception as e:
print(e)
def __connect_iot(self) :
self.cloud_device = Device()
self.cloud_device.on(Device.ON_CONNECT, self.__cb_lk_connect)
self.cloud_device.on(Device.ON_SERVICE, self.__cb_lk_service)
self.cloud_device.on(Device.ON_PROPS, self.__on_props)
self.cloud_device.connect(self.__dev_info)
while True:
if self.g_lk_connect:
break
def __init__(self, dev_info, callback) :
self.__dev_info = dev_info
self.__cb = callback
self.g_lk_connect = False
self.g_lk_service = False
self.__connect_iot()
def getDevice(self) :
return self.cloud_device
def __upload_request(self, command, frame, enlarge = 1) :
# 上传图片到LP
fileName = 'test.jpg'
start = utime.ticks_ms()
fileid = self.cloud_device.uploadContent(fileName, frame, None)
if fileid != None:
ext = { 'filePosition':'lp', 'fileName': fileName, 'fileId': fileid, 'enlarge':enlarge}
ext_str = json.dumps(ext)
all_params = {'id': 1, 'version': '1.0', 'params': { 'eventType': 'haas.faas', 'eventName': command, 'argInt': 1, 'ext': ext_str }}
all_params_str = json.dumps(all_params)
#print(all_params_str)
upload_file = {
'topic': '/sys/' + self.__dev_info['productKey'] + '/' + self.__dev_info['deviceName'] + '/thing/event/hli_event/post',
'qos': 1,
'payload': all_params_str
}
# 上传完成通知HaaS聚合平台
print('upload--->' + str(upload_file))
self.g_lk_service = False
self.cloud_device.publish(upload_file)
i = 0
while (self.g_lk_service == False and i < 200) :
utime.sleep_ms(10)
i = i + 1
continue
else:
print('filedid is none, upload content fail')
time_diff = utime.ticks_diff(utime.ticks_ms(), start)
print('recognize time : %d' % time_diff)
def recognizeGesture(self, frame) :
self.__upload_request('handGesture', frame)
def recognizeAction(self, frame) :
self.__upload_request('recognizeAction', frame, 2)
def recognizeLicensePlate(self, frame) :
self.__upload_request('ocrCarNo', frame)
def detectPedestrian(self, frame) :
self.__upload_request('detectPedestrian', frame)
def detectFruits(self, frame) :
self.__upload_request('detectFruits', frame)
def recognizeBussinessCard(self, frame) :
self.__upload_request('recognizeBusinessCard', frame)
def recognizeVehicleType(self, frame) :
self.__upload_request('recognizeVehicle', frame)
def detectVehicleCongestion(self, frame) :
self.__upload_request('vehicleCongestionDetect', frame)
def classifyRubbish(self, frame) :
self.__upload_request('classifyingRubbish', frame)
def detectObject(self, frame) :
self.__upload_request('detectObject', frame)
def saveImageTest(self, frame) :
self.__upload_request('saveImageTest', frame)
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
'''
@File : irdistance.py
@Description: 红外传感器驱动
@Author : 风裁
@version : 1.0
'''
from driver import GPIO
class IRDISTANCE(object):
def __init__(self, gpioObj):
self.gpioObj = None
if not isinstance(gpioObj, GPIO):
raise ValueError("parameter is not a GPIO object")
self.gpioObj = gpioObj
def objectDetection(self):
if self.gpioObj is None:
raise ValueError("invalid GPIO object")
value = self.gpioObj.read()
return value
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
'''
@File : main.py
@Author : aoyi
@version : 1.0
@Description: security_monitoring_system案例 - 防盗报警系统
board.json - 硬件资源配置文件
'''
from buzzer import BUZZER
import irdistance
from driver import PWM,GPIO
import time
from aliyunIoT import Device # iot组件是连接阿里云物联网平台的组件
import json
import display # 显示库
import network # 网络库
import ucamera # 摄像头库
import _thread # 线程库
from cloudAI import *
# 物联网平台连接标志位
iot_connected = False
wlan = None
# 三元组信息
productKey = "Your-ProductKey"
deviceName = "Your-devicename"
deviceSecret = "Your-deviceSecret"
# 物联网设备实例
device = None
# Wi-Fi SSID和Password设置
wifiSsid = "Your-AP-SSID"
wifiPassword = "Your-AP-Password"
# 警报开关以及时间段控制(大于等于alarm_start 或者小于等于alarm_end )
alarm_switch = 1
alarm_status = 0
alarm_count = 0
alarm_pic_url = ''
alarming = False
FLAG_ALARM_STATUS = "alarm_status"
FLAG_ALARM_COUNT = "alarm_count"
FLAG_ALARM_SWITCH = "magnetic_switch"
UP_PICTURE_URL = "PictureURL"
cloud_key_info = {
'region' : 'cn-shanghai' ,
'productKey': productKey ,
'deviceName': deviceName,
'deviceSecret': deviceSecret ,
'keepaliveSec': 60
}
# 显示线程函数
def displayThread():
# 引用全局变量
global disp, gFrame, alarming
while True:
# 采集摄像头画面
gFrame = ucamera.capture()
if gFrame != None:
# 显示图像
disp.image(0, 20, gFrame, 0)
time.sleep(0.1)
if alarming:
# 设置显示字体
disp.font(disp.FONT_DejaVu40)
# 显示文字
disp.text(10, 50, 'WARNING...', disp.RED)
# 等待Wi-Fi成功连接到路由器
def get_wifi_status():
global wlan
wifi_connected = False
wlan.active(True) #激活界面
wlan.scan() #扫描接入点
# 连接到指定的路由器(路由器名称为wifiSsid, 密码为:wifiPassword)
wlan.connect(wifiSsid, wifiPassword)
while True:
wifi_connected = wlan.isconnected() # 获取Wi-Fi连接路由器的状态信息
if wifi_connected: # Wi-Fi连接成功则退出while循环
break
else:
time.sleep(0.5)
print("wifi_connected:", wifi_connected)
ifconfig = wlan.ifconfig() #获取接口的IP/netmask/gw/DNS地址
print(ifconfig)
time.sleep(0.5)
# 物联网平台连接成功的回调函数
def on_connect(data):
global iot_connected
iot_connected = True
def recognize_cb(commandReply, result) :
global alarm_pic_url, gStartRecognize, alarm_switch
if commandReply == 'saveImageTestReply' and gStartRecognize == True:
if result != 'NA' :
alarm_pic_url = result
print('imageurl: ' + result)
utime.sleep(2)
elif commandReply == 'magnetic_switch':
alarm_switch = result
else :
print('main: unknown command reply')
post_default_value()
gStartRecognize = False
print('识别结束')
def post_props(data):
global engine, device
if isinstance(data,dict):
data = {'params': json.dumps(data)}
print('upload props-->' + str(data))
ret = engine.getDevice().postProps(data)
#ret = device.postProps(data)
return ret
def post_default_value():
global alarm_switch,alarm_status,alarm_count,alarm_pic_url,gFrame,FLAG_ALARM_SWITCH,FLAG_ALARM_STATUS,UP_PICTURE_URL
value = {FLAG_ALARM_SWITCH : alarm_switch,
FLAG_ALARM_STATUS : alarm_status,
FLAG_ALARM_COUNT : alarm_count,
UP_PICTURE_URL : alarm_pic_url}
post_props(value)
if __name__ == '__main__':
global disp, gFrame, engine, gStartRecognize
# 创建lcd display对象
disp = display.TFT()
gFrame = None
gStartRecognize = False
wlan = network.WLAN(network.STA_IF) #创建WLAN对象
get_wifi_status()
#初始化cloudai
engine = CloudAI(cloud_key_info, recognize_cb)
# 初始化摄像头
ucamera.init('uart', 33, 32)
ucamera.setProp(ucamera.SET_FRAME_SIZE, ucamera.SIZE_320X240)
# 初始化蜂鸣器
pwmObj = PWM()
pwmObj.open("buzzer")
pwm_init_data = {'freq':2000, 'duty': 0}
buzzer = BUZZER(pwmObj,data=pwm_init_data)
# 初始化干簧管传感器
gpioirDev = GPIO()
gpioirDev.open("ir")
gIrDev = irdistance.IRDISTANCE(gpioirDev)
print("megnetic detector ...")
try:
# 启动显示线程
_thread.start_new_thread(displayThread, ())
except:
print("Error: unable to start thread")
while True:
if alarm_switch == 1 and gIrDev.objectDetection() == 1:
wait_cnt = 0
if alarming == False:
print('human detected, start buzzer')
pwm_start_data = {'freq':2000, 'duty': 5}
buzzer.start(pwm_start_data)
alarm_count = alarm_count + 1
alarming = True
alarm_status = 1
post_default_value()
if gFrame != None:
gStartRecognize = True
engine.saveImageTest(gFrame)
while gStartRecognize == True and wait_cnt < 50:
time.sleep(0.1)
wait_cnt = wait_cnt + 1
post_default_value()
if alarm_switch == 0 or gIrDev.objectDetection() == 0:
if alarming:
print('close buzzer')
pwm_start_data = {'freq':2000, 'duty': 0}
buzzer.start(pwm_start_data)
alarming = False
alarm_status = 0
post_default_value()
time.sleep(0.1)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册