Unverified commit 1ee80501, authored by LokeZhou, committed by GitHub

Pipeline vehicle (#7432)

* * add vehicle_pressing.py to judge vehicle lane pressing
* add lane_seg_infer.py for lane segmentation
* add pipeline vehicle retrograde:
* add vehicle_retrograde.py to judge vehicle retrograde driving
* add infer_cfg_vehicle_violation.yml to configure vehicle pressing and vehicle retrograde
* add lane_seg_config.yml to configure lane segmentation

* *add ppvehicle_press readme
*add ppvehicle_retrograde.md
*fix lane_seg predictor init

* ppvehicle_press.md ppvehicle_retrograde.md add gif image

* *fix ppvehicle press retrograde readme, test=document_fix
*add ppvehicle press retrograde en readme test=document_fix
*fix ppvehicle retrograde keep left bug

* fix ppvehicle readme,test=document_fix

* fix ppvehicle retrograde en readme,test=document_fix

* ppvehicle_press ppvehicle_retrograde readme add solution test=document_fix

* fix ppvehicle press retrograde readme format test=document_fix
Parent a600f087
crop_thresh: 0.5
visual: True
warmup_frame: 50

DET:
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_ppvehicle.zip
  batch_size: 1

MOT:
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_ppvehicle.zip
  tracker_config: deploy/pipeline/config/tracker_config.yml
  batch_size: 1
  skip_frame_num: -1  # preferably no more than 3
  enable: True

LANE_SEG:
  lane_seg_config: deploy/pipeline/config/lane_seg_config.yml
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip

VEHICLE_PRESSING:
  enable: True

VEHICLE_RETROGRADE:
  frame_len: 8
  sample_freq: 7
  enable: True
  filter_horizontal_flag: False
  deviation: 23
  move_scale: 0.01
  fence_line: []  # [x1,y1,x2,y2] with y2 > y1; leave empty for automatic estimation
@@ -29,3 +29,20 @@ VEHICLE_ATTR:
  color_threshold: 0.5
  type_threshold: 0.5
  enable: False
LANE_SEG:
  lane_seg_config: deploy/pipeline/config/lane_seg_config.yml
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip

VEHICLE_PRESSING:
  enable: False

VEHICLE_RETROGRADE:
  frame_len: 8
  sample_freq: 7
  enable: False
  filter_horizontal_flag: True
  keep_right_flag: True
  deviation: 23
  move_scale: 0.01
  fence_line: []  # [x1,y1,x2,y2] with y2 > y1; leave empty for automatic estimation
type: PLSLaneseg

PLSLaneseg:
  run_mode: 'paddle'
  batch_size: 1
  device: gpu
  min_subgraph_size: 3
  use_dynamic_shape: False
  trt_min_shape: [100,100]
  trt_max_shape: [2000,3000]
  trt_opt_shape: [512,1024]
  trt_calib_mode: False
  cpu_threads: 10
  enable_mkldnn: False                  # whether to enable MKLDNN to speed up CPU inference
  filter_horizontal_flag: True          # whether to filter out horizontal lane lines
  horizontal_filtration_degree: 23
  horizontal_filtering_threshold: 0.25
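For reference, a minimal sketch of how this config is consumed: `LaneSegPredictor` (in lane_seg_infer.py, included below) loads the YAML and uses the top-level `type` key to pick the sub-section that supplies the predictor arguments. The path here is illustrative.

```python
# Minimal sketch: how lane_seg_config.yml is read (mirrors LaneSegPredictor.__init__).
import yaml

with open("deploy/pipeline/config/lane_seg_config.yml") as f:
    cfg = yaml.safe_load(f)

args = cfg[cfg["type"]]  # cfg["type"] == "PLSLaneseg" selects the PLSLaneseg section
print(args["run_mode"], args["device"], args["filter_horizontal_flag"])
```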
@@ -30,7 +30,10 @@ class Result(object):
            'det_action': dict(),
            'cls_action': dict(),
            'vehicleplate': dict(),
            'vehicle_attr': dict(),
            'lanes': dict(),
            'vehicle_press': dict(),
            'vehicle_retrograde': dict()
        }

    def update(self, res, name):
@@ -128,4 +131,4 @@ class DataCollector(object):
        if len(carlp) > 0:
            return carlp[0][0]
        else:
            return None
[English](ppvehicle_press_en.md) | 简体中文

# PP-Vehicle Lane-Pressing Recognition Module

Detecting vehicles that press lane lines is widely used in smart city and intelligent transportation scenarios. PP-Vehicle integrates a lane-pressing recognition module that identifies whether a vehicle illegally presses lane lines.

| Task | Algorithm | Precision | Inference Speed | Download |
|-----------|------|-----------|----------|---------------|
| Vehicle detection/tracking | PP-YOLOE | mAP 63.9 | 38.67 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_ppvehicle.zip) |
| Lane line segmentation | PP-LiteSeg | mIoU 32.69 | 47 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip) |

Notes:
1. The vehicle detection/tracking speed is measured on an NVIDIA T4 with TensorRT FP16 enabled, and includes data preprocessing, model inference, and post-processing.
2. The vehicle detection/tracking model is trained and evaluated on the [VeRi dataset](https://www.v7labs.com/open-datasets/veri-dataset).
3. The lane line segmentation speed is measured on a Tesla P40 with Python inference, and includes data preprocessing, model inference, and post-processing.
4. The lane line segmentation model is trained and evaluated on [BDD100K-LaneSeg](https://bdd-data.berkeley.edu/portal.html#download) and [Apollo Scape](http://apolloscape.auto/lane_segmentation.html#to_dataset_href); the lane segmentation [labels](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip) cover both datasets.
## Usage

### Configuration

The lane-pressing parameters in the [config file](../../config/infer_cfg_ppvehicle.yml) are as follows:
```
VEHICLE_PRESSING:
  enable: True       # whether to enable the function
LANE_SEG:
  lane_seg_config: deploy/pipeline/config/lane_seg_config.yml  # lane line segmentation config file
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip  # model path
```
The lane extraction parameters in the [lane line segmentation config file](../../config/lane_seg.yml) are as follows:
```
type: PLSLaneseg  # segmentation model to use

PLSLaneseg:
  batch_size: 1                     # image batch size
  device: gpu                       # gpu or cpu
  filter_flag: True                 # whether to filter out horizontal lane lines
  horizontal_filtration_degree: 23  # threshold for filtering horizontal lane lines; no filtering is done
                                    # when the difference between the maximum and minimum inclination
                                    # angles of the segmented lines is below this value
  horizontal_filtering_threshold: 0.25  # ratio separating vertical from horizontal lines:
                                        # thr = (min_degree + max_degree) * 0.25; each line is classified
                                        # as vertical or horizontal by comparing its inclination with thr
```
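To make the two thresholds concrete, here is a minimal standalone sketch of the split rule described in the comments above (degrees as input; note that the shipped `LaneSegPredictor.filter_lines` additionally keeps whichever of the two groups covers the larger total line length):

```python
# Minimal sketch of the horizontal-line split rule (angles in degrees).
import numpy as np

def split_by_threshold(angles, degree_gap=23, ratio=0.25):
    angles = np.array(angles)
    max_a, min_a = angles.max(), angles.min()
    if (max_a - min_a) < degree_gap:   # all lines tilt similarly: keep everything
        return angles, np.array([])
    thr = (max_a + min_a) * ratio      # thr = (min_degree + max_degree) * 0.25
    return angles[angles >= thr], angles[angles < thr]  # (vertical, horizontal)

print(split_by_threshold([85, 80, 12]))  # -> ([85, 80], [12]): the 12-degree line is split off
```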
### Commands

1. Download the `vehicle detection/tracking` and `lane line segmentation` inference deployment models from the model zoo and extract them to `./output_inference`. The models are downloaded automatically by default; if you download them manually, set the model directory fields to your own paths.
2. Set `enable: True` under the `VEHICLE_PRESSING` section of the config file to turn the function on.
3. For image input, run the following commands (for more command parameter descriptions, see [Quick Start - Parameter Description](./PPVehicle_QUICK_STARTED.md)):

```bash
# predict a single image
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --image_file=test_image.jpg \
                                   --device=gpu

# predict a folder containing one or more images
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --image_dir=images/ \
                                   --device=gpu
```
4. For video input, run:

```bash
# predict a single video file
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --video_file=test_video.mp4 \
                                   --device=gpu

# predict a folder containing one or more videos
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   --video_dir=test_videos/ \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --device=gpu
```
5. There are two ways to change the model paths:
   - Method 1: edit `./deploy/pipeline/config/infer_cfg_ppvehicle.yml`; the lane line segmentation model path is configured under the `LANE_SEG` section.
   - Method 2: append `-o` on the command line to override the default model paths in the config file:

```bash
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   --video_file=test_video.mp4 \
                                   --device=gpu \
                                   -o VEHICLE_PRESSING.enable=true \
                                      LANE_SEG.model_dir=output_inference
```
The result looks like this:

<div width="1000" align="center">
  <img src="https://raw.githubusercontent.com/LokeZhou/PaddleDetection/develop/deploy/pipeline/docs/images/vehicle_press.gif"/>
</div>

## Solution

1. The lane line segmentation model uses the ultra-lightweight segmentation solution from [PaddleSeg](https://github.com/PaddlePaddle/PaddleSeg). The training [labels](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip) contain 4 classes:
   - 0: background
   - 1: double yellow line
   - 2: solid line
   - 3: dashed line
   Dashed lines are filtered out in the lane-pressing analysis.
2. Lane lines are obtained by clustering the segmentation results, and horizontal lane lines are filtered out by default; to keep them, change the `filter_flag` parameter in the [lane line segmentation config file](../../config/lane_seg.yml).
3. A vehicle is judged to be pressing a lane line if the bottom edge of its detection box intersects a lane line.

**Performance optimization tips**

1. Depending on the camera angle, decide whether horizontal lane lines should be filtered out.
English | [简体中文](ppvehicle_press.md)

# PP-Vehicle Lane-Pressing Recognition Module

Detecting vehicles that press lane lines is widely used in smart city and intelligent transportation scenarios. PP-Vehicle integrates a lane-pressing recognition module that identifies whether a vehicle illegally presses lane lines.

| Task | Algorithm | Precision | Inference Speed | Download |
|-----------|------|-----------|----------|---------------|
| Vehicle detection/tracking | PP-YOLOE | mAP 63.9 | 38.67 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_ppvehicle.zip) |
| Lane line segmentation | PP-LiteSeg | mIoU 32.69 | 47 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip) |

Notes:
1. The vehicle detection/tracking speed is measured on an NVIDIA T4 with TensorRT FP16 enabled, and includes data preprocessing, model inference, and post-processing.
2. The vehicle detection/tracking model is trained and evaluated on the [VeRi dataset](https://www.v7labs.com/open-datasets/veri-dataset).
3. The lane line segmentation speed is measured on a Tesla P40 with Python inference, and includes data preprocessing, model inference, and post-processing.
4. The lane line segmentation model is trained and evaluated on [BDD100K-LaneSeg](https://bdd-data.berkeley.edu/portal.html#download) and [Apollo Scape](http://apolloscape.auto/lane_segmentation.html#to_dataset_href); the lane segmentation labels for both datasets are in [Lane_dataset_label](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip).
## Instructions

### Description of Configuration

The parameters related to vehicle lane pressing in the [config file](../../config/infer_cfg_ppvehicle.yml) are as follows:
```
VEHICLE_PRESSING:
  enable: True       # whether to enable the function
LANE_SEG:
  lane_seg_config: deploy/pipeline/config/lane_seg_config.yml  # lane line segmentation config file
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip  # model path
```
The parameters related to lane line segmentation in the [lane line segmentation config file](../../config/lane_seg.yml) are as follows:
```
type: PLSLaneseg  # segmentation model to use

PLSLaneseg:
  batch_size: 1                     # image batch size
  device: gpu                       # gpu or cpu
  filter_flag: True                 # whether to filter out horizontal lane lines
  horizontal_filtration_degree: 23  # threshold for filtering horizontal lane lines; no filtering is done
                                    # when the difference between the maximum and minimum inclination
                                    # angles of the segmented lines is below this value
  horizontal_filtering_threshold: 0.25  # ratio separating vertical from horizontal lines:
                                        # thr = (min_degree + max_degree) * 0.25; each line is classified
                                        # as vertical or horizontal by comparing its inclination with thr
```
### How to Use

1. Download the `vehicle detection/tracking` and `lane line segmentation` inference deployment models from the model zoo and extract them to `./output_inference`. The models are downloaded automatically by default; if you download them manually, set the model directory fields to your own paths.
2. Set `enable: True` under the `VEHICLE_PRESSING` section of the config file to enable this function.
3. For image input, run the following commands (for more command parameter descriptions, please refer to [QUICK_STARTED - Parameter_Description](./PPVehicle_QUICK_STARTED.md)):

```bash
# For a single image
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --image_file=test_image.jpg \
                                   --device=gpu

# For a folder containing one or more images
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --image_dir=images/ \
                                   --device=gpu
```
4. For video input, run:

```bash
# For a single video
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --video_file=test_video.mp4 \
                                   --device=gpu

# For a folder containing one or more videos
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   --video_dir=test_videos/ \
                                   -o VEHICLE_PRESSING.enable=true \
                                   --device=gpu
```
5. There are two ways to modify the model path:
   - Method 1: set the path of each model in `./deploy/pipeline/config/infer_cfg_ppvehicle.yml`; the lane line segmentation model path is configured under the `LANE_SEG` section.
   - Method 2: directly add `-o` on the command line to override the default model path in the configuration file:

```bash
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   --video_file=test_video.mp4 \
                                   --device=gpu \
                                   -o VEHICLE_PRESSING.enable=true \
                                      LANE_SEG.model_dir=output_inference
```
The result is shown below:

<div width="1000" align="center">
  <img src="https://raw.githubusercontent.com/LokeZhou/PaddleDetection/develop/deploy/pipeline/docs/images/vehicle_press.gif"/>
</div>
## Features of the Solution

1. The lane line segmentation model uses the ultra-lightweight segmentation solution from [PaddleSeg](https://github.com/PaddlePaddle/PaddleSeg). The training [labels](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip) contain four categories:
   - 0: background
   - 1: double yellow line
   - 2: solid line
   - 3: dashed line
   Dashed lines are filtered out in the lane-pressing analysis.
2. Lane lines are obtained by clustering the segmentation results, and horizontal lane lines are filtered out by default. To keep them, modify the `filter_flag` parameter in the [lane line segmentation config file](../../config/lane_seg.yml).
3. A vehicle is judged to be pressing a lane line if the bottom edge of its detection box intersects a lane line, as sketched below.
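A minimal, self-contained sketch of that intersection test (the same bounding-box-overlap plus cross-product straddle check used by `VehiclePressingRecognizer.judge` in vehicle_pressing.py; coordinates are illustrative):

```python
# Minimal sketch: does the vehicle box's bottom edge cross a lane segment?
def segments_intersect(a, b):
    (ax1, ay1, ax2, ay2), (bx1, by1, bx2, by2) = a, b
    # Quick rejection: the segments' bounding boxes must overlap.
    if max(ax1, ax2) < min(bx1, bx2) or min(ax1, ax2) > max(bx1, bx2):
        return False
    if max(ay1, ay2) < min(by1, by2) or min(ay1, ay2) > max(by1, by2):
        return False
    # Straddle test: each segment's endpoints lie on opposite sides of the other.
    cross = lambda ox, oy, px, py, qx, qy: (px - ox) * (qy - oy) - (py - oy) * (qx - ox)
    d1 = cross(ax1, ay1, ax2, ay2, bx1, by1)
    d2 = cross(ax1, ay1, ax2, ay2, bx2, by2)
    d3 = cross(bx1, by1, bx2, by2, ax1, ay1)
    d4 = cross(bx1, by1, bx2, by2, ax2, ay2)
    return d1 * d2 <= 0 and d3 * d4 <= 0

lane = (100, 0, 100, 200)                       # one vertical lane line
xmin, ymin, xmax, ymax = 60, 50, 140, 120       # vehicle detection box
print(segments_intersect(lane, (xmin, ymax, xmax, ymax)))  # True -> pressing
```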
**Performance optimization measures:**

1. Depending on the camera angle, decide whether horizontal lane lines should be filtered out.
[English](ppvehicle_retrograde_en.md) | 简体中文

# PP-Vehicle Vehicle Retrograde Recognition Module

Detecting vehicles driving against the traffic direction (retrograde driving) is widely used in smart city and intelligent transportation scenarios. PP-Vehicle integrates a vehicle retrograde recognition module that identifies whether a vehicle is driving the wrong way.

| Task | Algorithm | Precision | Inference Speed | Download |
|-----------|------|-----------|----------|---------------|
| Vehicle detection/tracking | PP-YOLOE | mAP 63.9 | 38.67 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_ppvehicle.zip) |
| Lane line segmentation | PP-LiteSeg | mIoU 32.69 | 47 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip) |

Notes:
1. The vehicle detection/tracking speed is measured on an NVIDIA T4 with TensorRT FP16 enabled, and includes data preprocessing, model inference, and post-processing.
2. The vehicle detection/tracking model is trained and evaluated on the [VeRi dataset](https://www.v7labs.com/open-datasets/veri-dataset).
3. The lane line segmentation speed is measured on a Tesla P40 with Python inference, and includes data preprocessing, model inference, and post-processing.
4. The lane line segmentation model is trained and evaluated on [BDD100K-LaneSeg](https://bdd-data.berkeley.edu/portal.html#download) and [Apollo Scape](http://apolloscape.auto/lane_segmentation.html#to_dataset_href); the lane segmentation labels for both datasets are in [Lane_dataset_label](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip).
## Usage

### Configuration

The parameters related to the vehicle retrograde recognition module in the [config file](../../config/infer_cfg_ppvehicle.yml) are as follows:
```
LANE_SEG:
  lane_seg_config: deploy/pipeline/config/lane_seg_config.yml  # lane line segmentation config file
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip  # model path
VEHICLE_RETROGRADE:
  frame_len: 8                  # number of sampled frames
  sample_freq: 7                # sampling frequency
  enable: True                  # whether to enable the function
  filter_horizontal_flag: False # whether to filter out vehicles moving horizontally
  keep_right_flag: True         # assumes keep-right traffic; set to False where traffic keeps left
  deviation: 23                 # angle threshold for filtering horizontal vehicles; tracks deviating
                                # by more than this angle are filtered out
  move_scale: 0.01              # threshold for filtering stationary vehicles: a vehicle is considered
                                # moving if its displacement exceeds the image diagonal * move_scale,
                                # otherwise it is treated as stationary
  fence_line: []                # lane centerline coordinates [x1,y1,x2,y2] with y2 > y1; if empty, the
                                # program determines it automatically from the direction of traffic flow
```
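To make the `move_scale` rule concrete, here is a minimal sketch mirroring `VehicleRetrogradeRecognizer.is_move` in vehicle_retrograde.py (the shipped code compares per-axis displacement against width/height times `move_scale`; values are illustrative):

```python
# Minimal sketch of the move_scale filter for stationary vehicles.
def is_move(traj_line, frame_shape, move_scale=0.01):
    x1, y1, x2, y2 = traj_line   # first and last track centers
    h, w = frame_shape[:2]
    return abs(x1 - x2) > w * move_scale or abs(y1 - y2) > h * move_scale

print(is_move([100, 100, 130, 100], (1080, 1920)))  # 30 px > 19.2 px -> True
print(is_move([100, 100, 105, 102], (1080, 1920)))  # too small -> False (stationary)
```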
The lane extraction parameters in the [lane line segmentation config file](../../config/lane_seg.yml) are as follows:
```
type: PLSLaneseg  # segmentation model to use

PLSLaneseg:
  batch_size: 1                     # image batch size
  device: gpu                       # gpu or cpu
  filter_flag: True                 # whether to filter out horizontal lane lines
  horizontal_filtration_degree: 23  # threshold for filtering horizontal lane lines; no filtering is done
                                    # when the difference between the maximum and minimum inclination
                                    # angles of the segmented lines is below this value
  horizontal_filtering_threshold: 0.25  # ratio separating vertical from horizontal lines:
                                        # thr = (min_degree + max_degree) * 0.25; each line is classified
                                        # as vertical or horizontal by comparing its inclination with thr
```
### Commands

1. Download the `vehicle detection/tracking` and `lane line segmentation` inference deployment models from the model zoo and extract them to `./output_inference`. The models are downloaded automatically by default; if you download them manually, set the model directory fields to your own paths.
2. Set `enable: True` under the `VEHICLE_RETROGRADE` section of the config file to turn the function on.
3. Vehicle retrograde recognition requires video input; run:

```bash
# predict a single video file
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_RETROGRADE.enable=true \
                                   --video_file=test_video.mp4 \
                                   --device=gpu

# predict a folder containing one or more videos
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_RETROGRADE.enable=true \
                                   --video_dir=test_videos/ \
                                   --device=gpu
```
4. There are two ways to change the model paths:
   - Method 1: edit `./deploy/pipeline/config/infer_cfg_ppvehicle.yml`; the lane line segmentation model path is configured under the `LANE_SEG` section.
   - Method 2: append `-o` on the command line to override the default model paths in the config file:

```bash
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   --video_file=test_video.mp4 \
                                   --device=gpu \
                                   -o LANE_SEG.model_dir=output_inference/ \
                                      VEHICLE_RETROGRADE.enable=true
```
The result looks like this:

<div width="1000" align="center">
  <img src="https://raw.githubusercontent.com/LokeZhou/PaddleDetection/develop/deploy/pipeline/docs/images/vehicle_retrograde.gif"/>
</div>

**Note:**

- The lane centerline is determined automatically only when two vehicles travel in opposite directions within the sampled video segment; once determined, it is fixed and no longer updated.
- Because of the camera angle and the 2D view, the automatically determined centerline can be inaccurate; you can enter the centerline coordinates manually in the config file.

## Solution

1. Within each sampled video segment, a vehicle is judged retrograde from the position of the lane centerline and the vehicle's trajectory. The decision flow is:

<div width="1000" align="center">
  <img src="https://raw.githubusercontent.com/LokeZhou/PaddleDetection/develop/deploy/pipeline/docs/images/vehicle_retrograde.png"/>
</div>
2. The lane line segmentation model uses the ultra-lightweight segmentation solution from [PaddleSeg](https://github.com/PaddlePaddle/PaddleSeg). The training [labels](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip) contain 4 classes:
   - 0: background
   - 1: double yellow line
   - 2: solid line
   - 3: dashed line
   Dashed lines are filtered out in the retrograde analysis.
3. Lane lines are obtained by clustering the segmentation results, and horizontal lane lines are filtered out by default; to keep them, change the `filter_flag` parameter in the [lane line segmentation config file](../../config/lane_seg.yml).
4. Vehicles moving horizontally are filtered out by default in the retrograde judgment; to keep them, change the `filter_horizontal_flag` parameter in the [config file](../../config/infer_cfg_ppvehicle.yml).
5. The retrograde judgment assumes keep-right traffic by default; to change this, set the `keep_right_flag` parameter in the [config file](../../config/infer_cfg_ppvehicle.yml).

**Performance optimization tips**

1. Depending on the camera angle, decide whether horizontal lane lines and horizontally moving vehicles should be filtered out.
2. The lane centerline can be entered manually.
English | [简体中文](ppvehicle_retrograde.md)

# PP-Vehicle Vehicle Retrograde Recognition Module

Detecting vehicles driving against the traffic direction (retrograde driving) is widely used in smart city and intelligent transportation scenarios. PP-Vehicle integrates a vehicle retrograde recognition module that identifies whether a vehicle is driving the wrong way.

| Task | Algorithm | Precision | Inference Speed | Download |
|-----------|------|-----------|----------|---------------|
| Vehicle detection/tracking | PP-YOLOE | mAP 63.9 | 38.67 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_ppvehicle.zip) |
| Lane line segmentation | PP-LiteSeg | mIoU 32.69 | 47 ms | [Inference deployment model](https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip) |

Notes:
1. The vehicle detection/tracking speed is measured on an NVIDIA T4 with TensorRT FP16 enabled, and includes data preprocessing, model inference, and post-processing.
2. The vehicle detection/tracking model is trained and evaluated on the [VeRi dataset](https://www.v7labs.com/open-datasets/veri-dataset).
3. The lane line segmentation speed is measured on a Tesla P40 with Python inference, and includes data preprocessing, model inference, and post-processing.
4. The lane line segmentation model is trained and evaluated on [BDD100K-LaneSeg](https://bdd-data.berkeley.edu/portal.html#download) and [Apollo Scape](http://apolloscape.auto/lane_segmentation.html#to_dataset_href); the lane segmentation labels for both datasets are in [Lane_dataset_label](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip).
## Instructions

### Description of Configuration

The parameters related to vehicle retrograde recognition in the [config file](../../config/infer_cfg_ppvehicle.yml) are as follows:
```
LANE_SEG:
  lane_seg_config: deploy/pipeline/config/lane_seg_config.yml  # lane line segmentation config file
  model_dir: https://bj.bcebos.com/v1/paddledet/models/pipeline/pp_lite_stdc2_bdd100k.zip  # model path
VEHICLE_RETROGRADE:
  frame_len: 8                  # number of sampled frames
  sample_freq: 7                # sampling frequency
  enable: True                  # whether to enable the function
  filter_horizontal_flag: False # whether to filter out vehicles moving horizontally
  keep_right_flag: True         # assumes keep-right traffic; set to False where traffic keeps left
  deviation: 23                 # angle threshold for filtering horizontal vehicles; tracks deviating
                                # by more than this angle are filtered out
  move_scale: 0.01              # threshold for filtering stationary vehicles: a vehicle is considered
                                # moving if its displacement exceeds the image diagonal * move_scale,
                                # otherwise it is treated as stationary
  fence_line: []                # lane centerline coordinates [x1,y1,x2,y2] with y2 > y1; if empty, the
                                # program determines it automatically from the direction of traffic flow
```
The parameters related to lane line segmentation in the [lane line segmentation config file](../../config/lane_seg.yml) are as follows:
```
type: PLSLaneseg  # segmentation model to use

PLSLaneseg:
  batch_size: 1                     # image batch size
  device: gpu                       # gpu or cpu
  filter_flag: True                 # whether to filter out horizontal lane lines
  horizontal_filtration_degree: 23  # threshold for filtering horizontal lane lines; no filtering is done
                                    # when the difference between the maximum and minimum inclination
                                    # angles of the segmented lines is below this value
  horizontal_filtering_threshold: 0.25  # ratio separating vertical from horizontal lines:
                                        # thr = (min_degree + max_degree) * 0.25; each line is classified
                                        # as vertical or horizontal by comparing its inclination with thr
```
### How to Use

1. Download the `vehicle detection/tracking` and `lane line segmentation` inference deployment models from the model zoo and extract them to `./output_inference`. The models are downloaded automatically by default; if you download them manually, set the model directory fields to your own paths.
2. Set `enable: True` under the `VEHICLE_RETROGRADE` section of the config file to enable this function.
3. Vehicle retrograde recognition requires video input; run:

```bash
# For a single video
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_RETROGRADE.enable=true \
                                   --video_file=test_video.mp4 \
                                   --device=gpu

# For a folder containing one or more videos
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   -o VEHICLE_RETROGRADE.enable=true \
                                   --video_dir=test_videos/ \
                                   --device=gpu
```
4. There are two ways to modify the model path:
   - Method 1: set the path of each model in `./deploy/pipeline/config/infer_cfg_ppvehicle.yml`; the lane line segmentation model path is configured under the `LANE_SEG` section.
   - Method 2: directly add `-o` on the command line to override the default model path in the configuration file:

```bash
python deploy/pipeline/pipeline.py --config deploy/pipeline/config/infer_cfg_ppvehicle.yml \
                                   --video_file=test_video.mp4 \
                                   --device=gpu \
                                   -o LANE_SEG.model_dir=output_inference/ \
                                      VEHICLE_RETROGRADE.enable=true
```
The result is shown below:

<div width="1000" align="center">
  <img src="https://raw.githubusercontent.com/LokeZhou/PaddleDetection/develop/deploy/pipeline/docs/images/vehicle_retrograde.gif"/>
</div>

**Note:**

- The lane centerline is determined automatically only when two vehicles travel in opposite directions within the sampled video segment; once determined, it is fixed and no longer updated.
- Because of the camera angle and the 2D view, the automatically determined centerline can be inaccurate; you can enter the centerline coordinates manually in the configuration file.
## Features of the Solution

1. Within each sampled video segment, a vehicle is judged retrograde from the position of the lane centerline and the vehicle's trajectory. The decision flow is:

<div width="1000" align="center">
  <img src="https://raw.githubusercontent.com/LokeZhou/PaddleDetection/develop/deploy/pipeline/docs/images/vehicle_retrograde_en.png"/>
</div>

2. The lane line segmentation model uses the ultra-lightweight segmentation solution from [PaddleSeg](https://github.com/PaddlePaddle/PaddleSeg). The training [labels](https://bj.bcebos.com/v1/paddledet/data/mot/bdd100k/lane_dataset_label.zip) contain four categories:
   - 0: background
   - 1: double yellow line
   - 2: solid line
   - 3: dashed line
   Dashed lines are filtered out in the retrograde analysis.
3. Lane lines are obtained by clustering the segmentation results, and horizontal lane lines are filtered out by default. To keep them, modify the `filter_flag` parameter in the [lane line segmentation config file](../../config/lane_seg.yml).
4. Vehicles moving horizontally are filtered out by default in the retrograde judgment. To keep them, modify the `filter_horizontal_flag` parameter in the [config file](../../config/infer_cfg_ppvehicle.yml).
5. The retrograde judgment assumes keep-right traffic by default. To change this, modify the `keep_right_flag` parameter in the [config file](../../config/infer_cfg_ppvehicle.yml); the underlying side test is sketched below.
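A minimal sketch of that side test: the sign of the 2D cross product tells which side of the fence (center) line a point lies on, as in `VehicleRetrogradeRecognizer.judge_retrograde` in vehicle_retrograde.py. A track whose start and end points both fall on the side that conflicts with its travel direction is flagged. Note that with image coordinates the y-axis points down, so "left/right" is relative to the directed fence line; values are illustrative.

```python
# Minimal sketch: which side of the fence line does a point lie on?
def side_of_line(fence, px, py):
    x1, y1, x2, y2 = fence
    # > 0 and < 0 distinguish the two sides of the directed fence line; 0 is on the line.
    return (x2 - x1) * (py - y1) - (px - x1) * (y2 - y1)

fence = (500, 0, 500, 1000)           # vertical centerline
print(side_of_line(fence, 300, 400))  # positive -> one side
print(side_of_line(fence, 700, 400))  # negative -> the other side
```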
**Performance optimization measures:**

1. Depending on the camera angle, decide whether horizontal lane lines and horizontally moving vehicles should be filtered out.
2. The lane centerline can be entered manually.
@@ -66,7 +66,10 @@ class PipeTimer(Times):
            'det_action': Times(),
            'cls_action': Times(),
            'vehicle_attr': Times(),
            'vehicleplate': Times(),
            'lanes': Times(),
            'vehicle_press': Times(),
            'vehicle_retrograde': Times()
        }
        self.img_num = 0
        self.track_num = 0
@@ -123,24 +126,19 @@ class PipeTimer(Times):
        dic['img_num'] = self.img_num
        return dic


class PushStream(object):
    def __init__(self, pushurl="rtsp://127.0.0.1:8554/"):
        self.command = ""
        # set the push URL as needed
        self.pushurl = pushurl

    def initcmd(self, fps, width, height):
        self.command = [
            'ffmpeg', '-y', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt',
            'bgr24', '-s', "{}x{}".format(width, height), '-r', str(fps), '-i',
            '-', '-pix_fmt', 'yuv420p', '-f', 'rtsp', self.pushurl
        ]
        self.pipe = sp.Popen(self.command, stdin=sp.PIPE)
@@ -264,4 +262,4 @@ def parse_mot_keypoint(input, coord_size):
        skeleton.append(refine_keypoint_coordinary(kpts, bbox, coord_size))
    parsed_skeleton_with_mot["mot_id"] = ids
    parsed_skeleton_with_mot["skeleton"] = skeleton
    return parsed_skeleton_with_mot
@@ -40,7 +40,7 @@ from python.infer import Detector, DetectorPicoDet
from python.keypoint_infer import KeyPointDetector
from python.keypoint_postprocess import translate_to_ori_images
from python.preprocess import decode_image, ShortSizeScale
from python.visualize import visualize_box_mask, visualize_attr, visualize_pose, \
    visualize_action, visualize_vehicleplate, visualize_vehiclepress, \
    visualize_lane, visualize_vehicle_retrograde

from pptracking.python.mot_sde_infer import SDE_Detector
from pptracking.python.mot.visualize import plot_tracking_dict
@@ -55,6 +55,9 @@ from pphuman.mtmct import mtmct_process
from ppvehicle.vehicle_plate import PlateRecognizer
from ppvehicle.vehicle_attr import VehicleAttr
from ppvehicle.vehicle_pressing import VehiclePressingRecognizer
from ppvehicle.vehicle_retrograde import VehicleRetrogradeRecognizer
from ppvehicle.lane_seg_infer import LaneSegPredictor

from download import auto_download_model
@@ -305,6 +308,18 @@ class PipePredictor(object):
        if self.with_vehicle_attr:
            print('Vehicle Attribute Recognition enabled')

        self.with_vehicle_press = cfg.get(
            'VEHICLE_PRESSING', False)['enable'] if cfg.get('VEHICLE_PRESSING',
                                                            False) else False
        if self.with_vehicle_press:
            print('Vehicle Pressing Recognition enabled')

        self.with_vehicle_retrograde = cfg.get(
            'VEHICLE_RETROGRADE', False)['enable'] if cfg.get(
                'VEHICLE_RETROGRADE', False) else False
        if self.with_vehicle_retrograde:
            print('Vehicle Retrograde Recognition enabled')

        self.modebase = {
            "framebased": False,
            "videobased": False,
@@ -322,6 +337,8 @@ class PipePredictor(object):
            "REID": "idbased",
            "VEHICLE_PLATE": "idbased",
            "VEHICLE_ATTR": "idbased",
            "VEHICLE_PRESSING": "idbased",
            "VEHICLE_RETROGRADE": "idbased",
        }

        self.is_video = is_video
@@ -367,7 +384,20 @@ class PipePredictor(object):
                self.vehicle_attr_predictor = VehicleAttr.init_with_cfg(
                    args, vehicleattr_cfg)

            if self.with_vehicle_press:
                vehiclepress_cfg = self.cfg['VEHICLE_PRESSING']
                basemode = self.basemode['VEHICLE_PRESSING']
                self.modebase[basemode] = True
                self.vehicle_press_predictor = VehiclePressingRecognizer(
                    vehiclepress_cfg)

            if self.with_vehicle_press or self.with_vehicle_retrograde:
                laneseg_cfg = self.cfg['LANE_SEG']
                self.laneseg_predictor = LaneSegPredictor(
                    laneseg_cfg['lane_seg_config'], laneseg_cfg['model_dir'])

        if not is_video:
            det_cfg = self.cfg['DET']
            model_dir = det_cfg['model_dir']
            batch_size = det_cfg['batch_size']
@@ -437,6 +467,13 @@ class PipePredictor(object):
                self.modebase[basemode] = True
                self.reid_predictor = ReID.init_with_cfg(args, reid_cfg)

            if self.with_vehicle_retrograde:
                vehicleretrograde_cfg = self.cfg['VEHICLE_RETROGRADE']
                basemode = self.basemode['VEHICLE_RETROGRADE']
                self.modebase[basemode] = True
                self.vehicle_retrograde_predictor = VehicleRetrogradeRecognizer(
                    vehicleretrograde_cfg)

            if self.with_mot or self.modebase["idbased"] or self.modebase[
                    "skeletonbased"]:
                mot_cfg = self.cfg['MOT']
@@ -474,7 +511,7 @@ class PipePredictor(object):
                args, video_action_cfg)

    def set_file_name(self, path):
        if type(path) == int:
            self.file_name = path
        elif path is not None:
            self.file_name = os.path.split(path)[-1]
@@ -499,7 +536,7 @@ class PipePredictor(object):
        # det -> attr
        batch_loop_cnt = math.ceil(
            float(len(input)) / self.det_predictor.batch_size)
        self.warmup_frame = min(10, len(input) // 2) - 1
        for i in range(batch_loop_cnt):
            start_index = i * self.det_predictor.batch_size
            end_index = min((i + 1) * self.det_predictor.batch_size, len(input))
@@ -569,6 +606,24 @@ class PipePredictor(object):
                vehicleplate_res = {'vehicleplate': platelicenses}
                self.pipeline_res.update(vehicleplate_res, 'vehicleplate')

            if self.with_vehicle_press:
                vehicle_press_res_list = []
                if i > self.warmup_frame:
                    self.pipe_timer.module_time['vehicle_press'].start()

                lanes, direction = self.laneseg_predictor.run(batch_input)
                if len(lanes) == 0:
                    print(" no lanes!")
                    continue

                lanes_res = {'output': lanes, 'direction': direction}
                self.pipeline_res.update(lanes_res, 'lanes')

                vehicle_press_res_list = self.vehicle_press_predictor.run(
                    lanes, det_res)
                vehiclepress_res = {'output': vehicle_press_res_list}
                self.pipeline_res.update(vehiclepress_res, 'vehicle_press')

            self.pipe_timer.img_num += len(batch_input)
            if i > self.warmup_frame:
                self.pipe_timer.total_time.end()
@@ -578,7 +633,7 @@ class PipePredictor(object):
    def capturevideo(self, capture, queue):
        frame_id = 0
        while (1):
            if queue.full():
                time.sleep(0.1)
            else:
@@ -608,13 +663,15 @@ class PipePredictor(object):
            pushstream = PushStream(pushurl)
            pushstream.initcmd(fps, width, height)
        elif self.cfg['visual']:
            video_out_name = 'output' if (
                self.file_name is None or
                type(self.file_name) == int) else self.file_name
            if type(video_file) == str and "rtsp" in video_file:
                video_out_name = video_out_name + "_t" + str(thread_idx).zfill(
                    2) + "_rtsp"
            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)
            out_path = os.path.join(self.output_dir, video_out_name + ".mp4")
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
@@ -662,16 +719,16 @@ class PipePredictor(object):
        object_in_region_info = {
        }  # store info for vehicle parking in region
        illegal_parking_dict = None
        cars_count = 0
        retrograde_traj_len = 0

        framequeue = queue.Queue(10)

        thread = threading.Thread(
            target=self.capturevideo, args=(capture, framequeue))
        thread.start()
        time.sleep(1)

        while (not framequeue.empty()):
            if frame_id % 10 == 0:
                print('Thread: {}; frame id: {}'.format(thread_idx, frame_id))
@@ -741,10 +798,10 @@ class PipePredictor(object):
                    self.pipe_timer.total_time.end()
                    if self.cfg['visual']:
                        _, _, fps = self.pipe_timer.get_total_time()
                        im = self.visualize_video(frame_rgb, mot_res, frame_id,
                                                  fps, entrance, records,
                                                  center_traj)  # visualize
                        if len(self.pushurl) > 0:
                            pushstream.pipe.stdin.write(im.tobytes())
                        else:
                            writer.write(im)
@@ -789,6 +846,39 @@ class PipePredictor(object):
                        self.pipe_timer.module_time['vehicle_attr'].end()
                    self.pipeline_res.update(attr_res, 'vehicle_attr')

            if self.with_vehicle_press or self.with_vehicle_retrograde:

                if frame_id == 0 or cars_count == 0 or cars_count > len(
                        mot_res['boxes']):

                    if frame_id > self.warmup_frame:
                        self.pipe_timer.module_time['lanes'].start()

                    lanes, directions = self.laneseg_predictor.run(
                        [copy.deepcopy(frame_rgb)])
                    lanes_res = {'output': lanes, 'directions': directions}

                    if frame_id > self.warmup_frame:
                        self.pipe_timer.module_time['lanes'].end()

                    if frame_id == 0 or (len(lanes) > 0 and frame_id > 0):
                        self.pipeline_res.update(lanes_res, 'lanes')
                        cars_count = len(mot_res['boxes'])

                if self.with_vehicle_press:
                    if frame_id > self.warmup_frame:
                        self.pipe_timer.module_time['vehicle_press'].start()

                    press_lane = copy.deepcopy(self.pipeline_res.get('lanes'))
                    if press_lane is None:
                        continue

                    vehicle_press_res_list = self.vehicle_press_predictor.mot_run(
                        press_lane, mot_res['boxes'])
                    vehiclepress_res = {'output': vehicle_press_res_list}

                    if frame_id > self.warmup_frame:
                        self.pipe_timer.module_time['vehicle_press'].end()

                    self.pipeline_res.update(vehiclepress_res, 'vehicle_press')

            if self.with_idbased_detaction:
                if frame_id > self.warmup_frame:
                    self.pipe_timer.module_time['det_action'].start()
@@ -905,6 +995,69 @@ class PipePredictor(object):
                        video_action_imgs.clear()  # next clip

            if self.with_vehicle_retrograde:
                # get the params
                frame_len = self.cfg["VEHICLE_RETROGRADE"]["frame_len"]
                sample_freq = self.cfg["VEHICLE_RETROGRADE"]["sample_freq"]

                if sample_freq * frame_len > frame_count:  # video is too short
                    sample_freq = int(frame_count / frame_len)

                # filter the warmup frames
                if frame_id > self.warmup_frame:
                    self.pipe_timer.module_time['vehicle_retrograde'].start()

                if frame_id % sample_freq == 0:
                    frame_mot_res = copy.deepcopy(self.pipeline_res.get('mot'))
                    self.vehicle_retrograde_predictor.update_center_traj(
                        frame_mot_res, max_len=frame_len)
                    retrograde_traj_len = retrograde_traj_len + 1

                # the number of collected frames is enough to predict
                if retrograde_traj_len == frame_len:
                    retrograde_mot_res = copy.deepcopy(
                        self.pipeline_res.get('mot'))
                    retrograde_lanes = copy.deepcopy(
                        self.pipeline_res.get('lanes'))
                    frame_shape = frame_rgb.shape

                    if retrograde_lanes is None:
                        continue
                    retrograde_res, fence_line = self.vehicle_retrograde_predictor.mot_run(
                        lanes_res=retrograde_lanes,
                        det_res=retrograde_mot_res,
                        frame_shape=frame_shape)

                    retrograde_res_update = self.pipeline_res.get(
                        'vehicle_retrograde')
                    if retrograde_res_update is not None:
                        retrograde_res_update = retrograde_res_update['output']
                        if retrograde_res is not None:
                            for retrograde_res_id in retrograde_res:
                                if retrograde_res_id not in retrograde_res_update:
                                    retrograde_res_update.append(
                                        retrograde_res_id)
                    else:
                        retrograde_res_update = []

                    retrograde_res_dict = {
                        'output': retrograde_res_update,
                        "fence_line": fence_line,
                    }

                    if retrograde_res is not None and len(retrograde_res) > 0:
                        print("retrograde res:", retrograde_res)

                    self.pipeline_res.update(retrograde_res_dict,
                                             'vehicle_retrograde')

                    if frame_id > self.warmup_frame:
                        self.pipe_timer.module_time['vehicle_retrograde'].end()

                    retrograde_traj_len = 0

            self.collector.append(frame_id, self.pipeline_res)

            if frame_id > self.warmup_frame:
@@ -920,7 +1073,7 @@ class PipePredictor(object):
                                          entrance, records, center_traj,
                                          self.illegal_parking_time != -1,
                                          illegal_parking_dict)  # visualize
                if len(self.pushurl) > 0:
                    pushstream.pipe.stdin.write(im.tobytes())
                else:
                    writer.write(im)
@@ -928,7 +1081,8 @@ class PipePredictor(object):
                    cv2.imshow('Paddle-Pipeline', im)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

        if self.cfg['visual'] and len(self.pushurl) == 0:
            writer.release()
            print('save result to {}'.format(out_path))
@@ -945,6 +1099,7 @@ class PipePredictor(object):
                          illegal_parking_dict=None):
        image = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        mot_res = copy.deepcopy(result.get('mot'))

        if mot_res is not None:
            ids = mot_res['boxes'][:, 0]
            scores = mot_res['boxes'][:, 2]
@@ -997,6 +1152,20 @@ class PipePredictor(object):
            image = visualize_attr(image, vehicle_attr_res, boxes)
            image = np.array(image)

        lanes_res = result.get('lanes')
        if lanes_res is not None:
            lanes = lanes_res['output'][0]
            image = visualize_lane(image, lanes)
            image = np.array(image)

        vehiclepress_res = result.get('vehicle_press')
        if vehiclepress_res is not None:
            press_vehicle = vehiclepress_res['output']
            if len(press_vehicle) > 0:
                image = visualize_vehiclepress(
                    image, press_vehicle, threshold=self.cfg['crop_thresh'])
                image = np.array(image)

        if mot_res is not None:
            vehicleplate = False
            plates = []
@@ -1036,6 +1205,13 @@ class PipePredictor(object):
                video_action_score=video_action_score,
                video_action_text="Fight")

        vehicle_retrograde_res = result.get('vehicle_retrograde')
        if vehicle_retrograde_res is not None:
            mot_retrograde_res = copy.deepcopy(result.get('mot'))
            image = visualize_vehicle_retrograde(image, mot_retrograde_res,
                                                 vehicle_retrograde_res)
            image = np.array(image)

        visual_helper_for_display = []
        action_to_display = []
@@ -1067,6 +1243,8 @@ class PipePredictor(object):
        human_attr_res = result.get('attr')
        vehicle_attr_res = result.get('vehicle_attr')
        vehicleplate_res = result.get('vehicleplate')
        lanes_res = result.get('lanes')
        vehiclepress_res = result.get('vehicle_press')

        for i, (im_file, im) in enumerate(zip(im_files, images)):
            if det_res is not None:
@@ -1094,6 +1272,16 @@ class PipePredictor(object):
                    det_res_i['boxes'][:, 4:6] = det_res_i[
                        'boxes'][:, 4:6] - det_res_i['boxes'][:, 2:4]
                    im = visualize_vehicleplate(im, plates, det_res_i['boxes'])

            if vehiclepress_res is not None:
                press_vehicle = vehiclepress_res['output'][i]
                if len(press_vehicle) > 0:
                    im = visualize_vehiclepress(
                        im, press_vehicle, threshold=self.cfg['crop_thresh'])
                    im = np.ascontiguousarray(np.copy(im))

            if lanes_res is not None:
                lanes = lanes_res['output'][i]
                im = visualize_lane(im, lanes)
                im = np.ascontiguousarray(np.copy(im))

            img_name = os.path.split(im_file)[-1]
            if not os.path.exists(self.output_dir):
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import codecs
import math
import os

import yaml
import numpy as np
import cv2
from sklearn.cluster import DBSCAN

from pptracking.python.det_infer import load_predictor
class LaneSegPredictor:
    def __init__(self, lane_seg_config, model_dir):
        """
        Prepare for prediction.
        For the usage and docs of Paddle Inference, please refer to
        https://paddleinference.paddlepaddle.org.cn/product_introduction/summary.html
        """
        if not os.path.exists(lane_seg_config):
            raise ValueError("Cannot find: {}".format(lane_seg_config))

        args = yaml.safe_load(open(lane_seg_config))
        self.model_dir = model_dir
        self.args = args[args['type']]

        self.shape = None
        self.filter_horizontal_flag = self.args['filter_horizontal_flag']
        self.horizontal_filtration_degree = self.args[
            'horizontal_filtration_degree']
        self.horizontal_filtering_threshold = self.args[
            'horizontal_filtering_threshold']

        try:
            self.predictor, _ = load_predictor(
                model_dir=self.model_dir,
                run_mode=self.args['run_mode'],
                batch_size=self.args['batch_size'],
                device=self.args['device'],
                min_subgraph_size=self.args['min_subgraph_size'],
                use_dynamic_shape=self.args['use_dynamic_shape'],
                trt_min_shape=self.args['trt_min_shape'],
                trt_max_shape=self.args['trt_max_shape'],
                trt_opt_shape=self.args['trt_opt_shape'],
                trt_calib_mode=self.args['trt_calib_mode'],
                cpu_threads=self.args['cpu_threads'],
                enable_mkldnn=self.args['enable_mkldnn'])
        except Exception as e:
            print(str(e))
            exit()
    def run(self, img):
        input_names = self.predictor.get_input_names()
        input_handle = self.predictor.get_input_handle(input_names[0])
        output_names = self.predictor.get_output_names()
        output_handle = self.predictor.get_output_handle(output_names[0])

        img = np.array(img)
        self.shape = img.shape[1:3]
        img = self.normalize(img)
        img = np.transpose(img, (0, 3, 1, 2))

        input_handle.reshape(img.shape)
        input_handle.copy_from_cpu(img)

        self.predictor.run()

        results = output_handle.copy_to_cpu()
        results = self.postprocess(results)

        return self.get_line(results)
    def normalize(self, im, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
        mean = np.array(mean)[np.newaxis, np.newaxis, :]
        std = np.array(std)[np.newaxis, np.newaxis, :]
        im = im.astype(np.float32, copy=False) / 255.0
        im -= mean
        im /= std
        return im

    def postprocess(self, pred):
        # class ids: 0 background, 1 double yellow line, 2 solid line, 3 dashed line
        pred = np.argmax(pred, axis=1)
        # drop dashed lines, then binarize the remaining lane pixels for HoughLinesP
        pred[pred == 3] = 0
        pred[pred > 0] = 255
        return pred
    def get_line(self, results):
        lines = []
        directions = []
        for i in range(results.shape[0]):
            line, direction = self.hough_line(np.uint8(results[i]))
            lines.append(line)
            directions.append(direction)
        return lines, directions

    def get_distance(self, array_1, array_2):
        lon_a = array_1[0]
        lat_a = array_1[1]
        lon_b = array_2[0]
        lat_b = array_2[1]

        s = pow(pow((lat_b - lat_a), 2) + pow((lon_b - lon_a), 2), 0.5)
        return s

    def get_angle(self, array):
        x1, y1, x2, y2 = array
        a_x = x2 - x1
        a_y = y2 - y1
        angle1 = math.atan2(a_y, a_x)
        angle1 = int(angle1 * 180 / math.pi)
        if angle1 > 90:
            angle1 = 180 - angle1
        return angle1

    def get_proportion(self, lines):
        proportion = 0.0
        h, w = self.shape
        for line in lines:
            x1, y1, x2, y2 = line
            length = abs(y2 - y1) / h + abs(x2 - x1) / w
            proportion = proportion + length
        return proportion
    def line_cluster(self, linesP):
        points = []
        for i in range(0, len(linesP)):
            l = linesP[i]
            x_center = (float(
                (max(l[2], l[0]) - min(l[2], l[0]))) / 2.0 + min(l[2], l[0]))
            y_center = (float(
                (max(l[3], l[1]) - min(l[3], l[1]))) / 2.0 + min(l[3], l[1]))
            points.append([x_center, y_center])

        dbscan = DBSCAN(
            eps=50, min_samples=2, metric=self.get_distance).fit(points)

        labels = dbscan.labels_
        n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
        cluster_list = list([] for i in range(n_clusters_))
        if linesP is not None:
            for i in range(0, len(linesP)):
                if labels[i] == -1:
                    continue
                l = linesP[i]
                x1, y1, x2, y2 = l
                if y2 >= y1:
                    cluster_list[labels[i]].append([x1, y1, x2, y2])
                else:
                    ll = [x2, y2, x1, y1]
                    cluster_list[labels[i]].append(ll)
        return cluster_list

    def hough_line(self,
                   binary_img,
                   min_line=50,
                   min_line_points=50,
                   max_line_gap=10):
        linesP = cv2.HoughLinesP(binary_img, 1, np.pi / 180, min_line, None,
                                 min_line_points, max_line_gap)
        if linesP is None:
            return [], None

        coarse_cluster_list = self.line_cluster(linesP[:, 0])
        filter_lines_output, direction = self.filter_lines(coarse_cluster_list)
        return filter_lines_output, direction
    def filter_lines(self, coarse_cluster_list):
        lines = []
        angles = []
        for i in range(len(coarse_cluster_list)):
            if len(coarse_cluster_list[i]) == 0:
                continue
            coarse_cluster_list[i] = np.array(coarse_cluster_list[i])
            distance = abs(coarse_cluster_list[i][:, 3] - coarse_cluster_list[i]
                           [:, 1]) + abs(coarse_cluster_list[i][:, 2] -
                                         coarse_cluster_list[i][:, 0])
            l = coarse_cluster_list[i][np.argmax(distance)]
            angles.append(self.get_angle(l))
            lines.append(l)

        if len(lines) == 0:
            return [], None
        if not self.filter_horizontal_flag:
            return lines, None

        # filter horizontal roads
        angles = np.array(angles)
        max_angle, min_angle = np.max(angles), np.min(angles)
        if (max_angle - min_angle) < self.horizontal_filtration_degree:
            return lines, np.mean(angles)

        thr_angle = (
            max_angle + min_angle) * self.horizontal_filtering_threshold
        lines = np.array(lines)

        min_angle_line = lines[np.where(angles < thr_angle)]
        max_angle_line = lines[np.where(angles >= thr_angle)]

        max_angle_line_pro = self.get_proportion(max_angle_line)
        min_angle_line_pro = self.get_proportion(min_angle_line)

        if max_angle_line_pro >= min_angle_line_pro:
            angle_list = angles[np.where(angles >= thr_angle)]
            return max_angle_line, np.mean(angle_list)
        else:
            angle_list = angles[np.where(angles < thr_angle)]
            return min_angle_line, np.mean(angle_list)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import math
class VehiclePressingRecognizer(object):
    def __init__(self, cfg):
        self.cfg = cfg

    def judge(self, Ax1, Ay1, Ax2, Ay2, Bx1, By1, Bx2, By2):
        # Segment intersection test: a quick bounding-box overlap check first,
        # then a straddle test via 2D cross products (each segment's endpoints
        # must lie on opposite sides of the other segment).
        if (max(Ax1, Ax2) >= min(Bx1, Bx2) and min(Ax1, Ax2) <= max(Bx1, Bx2)) and \
           (max(Ay1, Ay2) >= min(By1, By2) and min(Ay1, Ay2) <= max(By1, By2)):
            if ((Bx1 - Ax1) * (Ay2 - Ay1) - (By1 - Ay1) * (Ax2 - Ax1)) * ((Bx2 - Ax1) * (Ay2 - Ay1) - (By2 - Ay1) * (Ax2 - Ax1)) <= 0 \
               and ((Ax1 - Bx1) * (By2 - By1) - (Ay1 - By1) * (Bx2 - Bx1)) * ((Ax2 - Bx1) * (By2 - By1) - (Ay2 - By1) * (Bx2 - Bx1)) <= 0:
                return True
            else:
                return False
        else:
            return False

    def is_intersect(self, line, bbox):
        # A vehicle presses the line if the bottom edge of its bbox crosses it.
        Ax1, Ay1, Ax2, Ay2 = line
        xmin, ymin, xmax, ymax = bbox
        bottom = self.judge(Ax1, Ay1, Ax2, Ay2, xmin, ymax, xmax, ymax)
        return bottom
    def run(self, lanes, det_res):
        intersect_bbox_list = []
        start_idx, boxes_num_i = 0, 0

        for i in range(len(lanes)):
            lane = lanes[i]
            if det_res is not None:
                det_res_i = {}
                boxes_num_i = det_res['boxes_num'][i]
                det_res_i['boxes'] = det_res['boxes'][start_idx:start_idx +
                                                      boxes_num_i, :]
                intersect_bbox = []
                for line in lane:
                    for bbox in det_res_i['boxes']:
                        if self.is_intersect(line, bbox[2:]):
                            intersect_bbox.append(bbox)
                intersect_bbox_list.append(intersect_bbox)
                start_idx += boxes_num_i

        return intersect_bbox_list
    def mot_run(self, lanes, det_res):
        intersect_bbox_list = []
        if det_res is None:
            return intersect_bbox_list
        lanes_res = lanes['output']

        for i in range(len(lanes_res)):
            lane = lanes_res[i]
            for line in lane:
                for bbox in det_res:
                    if self.is_intersect(line, bbox[3:]):
                        intersect_bbox_list.append(bbox)
        return intersect_bbox_list
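

# --- Illustrative usage sketch (not part of the original file) ---
# mot_run expects MOT-format boxes [id, class, score, x1, y1, x2, y2] and the
# lane dict the pipeline builds from LaneSegPredictor.run. Values are made up.
if __name__ == '__main__':
    recognizer = VehiclePressingRecognizer(cfg={})
    lanes = {'output': [[[100, 0, 100, 200]]]}   # one vertical lane line
    boxes = [[1, 0, 0.9, 60, 50, 140, 120]]      # bottom edge crosses the lane
    print(recognizer.mot_run(lanes, boxes))      # -> [[1, 0, 0.9, 60, 50, 140, 120]]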
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import math
from collections import defaultdict, deque


class VehicleRetrogradeRecognizer(object):
    def __init__(self, cfg):
        self.cfg = cfg
        self.filter_horizontal_flag = self.cfg['filter_horizontal_flag']
        self.deviation = self.cfg['deviation']
        self.move_scale = self.cfg['move_scale']
        self.keep_right_flag = self.cfg['keep_right_flag']
        self.center_traj_retrograde = [{}]  # retrograde recognizer record use
        self.fence_line = None if len(self.cfg[
            'fence_line']) == 0 else self.cfg['fence_line']
    def update_center_traj(self, mot_res, max_len):
        if mot_res is not None:
            ids = mot_res['boxes'][:, 0]
            scores = mot_res['boxes'][:, 2]
            boxes = mot_res['boxes'][:, 3:]
            boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
            boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
        else:
            boxes = np.zeros([0, 4])
            ids = np.zeros([0])
            scores = np.zeros([0])

        # single class, still need to be defaultdict type for plotting
        num_classes = 1
        online_tlwhs = defaultdict(list)
        online_scores = defaultdict(list)
        online_ids = defaultdict(list)
        online_tlwhs[0] = boxes
        online_ids[0] = ids

        if mot_res is not None:
            for cls_id in range(num_classes):
                tlwhs = online_tlwhs[cls_id]
                obj_ids = online_ids[cls_id]
                for i, tlwh in enumerate(tlwhs):
                    x1, y1, w, h = tlwh
                    center = tuple(map(int, (x1 + w / 2., y1 + h)))
                    obj_id = int(obj_ids[i])
                    if self.center_traj_retrograde is not None:
                        if obj_id not in self.center_traj_retrograde[cls_id]:
                            self.center_traj_retrograde[cls_id][obj_id] = deque(
                                maxlen=max_len)
                        self.center_traj_retrograde[cls_id][obj_id].append(
                            center)
    def get_angle(self, array):
        x1, y1, x2, y2 = array
        a_x = x2 - x1
        a_y = y2 - y1
        angle1 = math.atan2(a_y, a_x)
        angle1 = int(angle1 * 180 / math.pi)

        a_x = x2 - x1 if y2 >= y1 else x1 - x2
        a_y = y2 - y1 if y2 >= y1 else y1 - y2
        angle2 = math.atan2(a_y, a_x)
        angle2 = int(angle2 * 180 / math.pi)
        if angle2 > 90:
            angle2 = 180 - angle2
        return angle1, angle2
    def is_move(self, array, frame_shape):
        x1, y1, x2, y2 = array
        h, w, _ = frame_shape

        if abs(x1 - x2) > w * self.move_scale or abs(y1 -
                                                     y2) > h * self.move_scale:
            return True
        else:
            return False

    def get_distance_point2line(self, point, line):
        line_point1, line_point2 = np.array(line[0:2]), np.array(line[2:])
        vec1 = line_point1 - point
        vec2 = line_point2 - point
        distance = np.abs(np.cross(vec1, vec2)) / np.linalg.norm(line_point1 -
                                                                 line_point2)
        return distance
    def driving_direction(self, line1, line2, is_init=False):
        # The sign of the cross product between line1's direction vector and
        # the vector from line1's start to line2's start decides the side.
        x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
        x2, y2 = line2[0] - line1[0], line2[1] - line1[1]

        result = x1 * y2 - x2 * y1
        distance = self.get_distance_point2line([x2, y2], line1)

        if result < 0:
            result = 1
        elif result == 0:
            # degenerate case: the point lies on the line; callers expect a
            # (direction, distance) pair, so return the distance as well
            if line2[3] >= line2[1]:
                return -1, distance
            else:
                return 1, distance
        else:
            result = -1

        return result, distance
    def get_long_fence_line(self, h, w, line):
        # Extend the chosen lane segment to a full-length line clipped to the
        # image borders.
        x1, y1, x2, y2 = line
        if x1 == x2:
            return [x1, 0, x1, h]
        if y1 == y2:
            return [0, y1, w, y1]
        k = (y2 - y1) / (x2 - x1)
        b = y1 - k * x1

        if k == 1 and b == 0:
            return [0, 0, w, h]
        if k == -1 and b == 0:
            return [w, 0, h, h]

        top = [-b / k, 0]
        left = [0, b]
        right = [w, k * w + b]
        bottom = [(h - b) / k, h]
        candidate = np.array([top, left, right, bottom])
        flag = np.array([0, 0, 0, 0])
        if top[0] >= 0 and top[0] <= w:
            flag[0] = 1
        if left[1] > 0 and left[1] <= h:
            flag[1] = 1
        if right[1] > 0 and right[1] <= h:
            flag[2] = 1
        if bottom[0] > 0 and bottom[0] < w:
            flag[3] = 1
        ind = np.where(flag == 1)
        candidate = candidate[ind]
        candidate_sort = candidate[candidate[:, 1].argsort()]
        return [
            int(candidate_sort[0][0]), int(candidate_sort[0][1]),
            int(candidate_sort[1][0]), int(candidate_sort[1][1])
        ]
def init_fence_line(self, lanes, pos_dir_traj, neg_dir_traj, frame_shape):
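        """Pick, among the detected lane lines, the one that best separates
        the two observed driving directions, and extend it across the
        frame to serve as the fence line."""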
fence_lines_candidate = None
h, w, _ = frame_shape
abs_distance = h * h + w * w
        for lane in lanes[0]:
            pos_dir_distance = h * h + w * w
            neg_dir_distance = h * h + w * w
            pos_dir = 0
            neg_dir = 0
            for traj_line in pos_dir_traj:
                dir_result, distance = self.driving_direction(
                    lane, traj_line['traj_line'])
                if dir_result > 0:
                    pos_dir_distance = min(pos_dir_distance, distance)
                    pos_dir = 1
                else:
                    neg_dir_distance = min(neg_dir_distance, distance)
                    neg_dir = 1
            # trajectories of the same driving direction already straddle
            # this lane, so it cannot be the dividing line
            if pos_dir > 0 and neg_dir > 0:
                continue
            for traj_line in neg_dir_traj:
                dir_result, distance = self.driving_direction(
                    lane, traj_line['traj_line'])
                if dir_result > 0:
                    pos_dir_distance = min(pos_dir_distance, distance)
                    pos_dir = 1
                else:
                    neg_dir_distance = min(neg_dir_distance, distance)
                    neg_dir = 1
            # prefer the lane whose distances to the two directions are
            # the most balanced
            if pos_dir > 0 and neg_dir > 0:
                diff_dir_distance = abs(pos_dir_distance - neg_dir_distance)
                if diff_dir_distance < abs_distance:
                    fence_lines_candidate = lane
                    abs_distance = diff_dir_distance
if fence_lines_candidate is None:
return None
fence_lines_candidate = self.get_long_fence_line(h, w,
fence_lines_candidate)
return fence_lines_candidate
def judge_retrograde(self, traj_line):
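        """Return True if both trajectory endpoints lie on the side of the
        fence line expected from the vehicle's vertical driving direction
        and keep_right_flag, i.e. the vehicle is NOT retrograde."""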
        line1 = self.fence_line
        x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
        line2 = traj_line['traj_line']

        def point_dir(px, py):
            # side of the fence line the point lies on (+1 / -1)
            cross = x1 * (py - line1[1]) - (px - line1[0]) * y1
            if cross < 0:
                return 1
            if cross == 0:
                return -1 if line2[3] >= line2[1] else 1
            return -1

        start_point_dir = point_dir(line2[0], line2[1])
        end_point_dir = point_dir(line2[2], line2[3])
        # the expected side depends on the vertical driving direction and
        # on whether traffic keeps right or left
        if self.keep_right_flag:
            driver_dir = -1 if (line2[3] - line2[1]) >= 0 else 1
        else:
            driver_dir = -1 if (line2[3] - line2[1]) <= 0 else 1
        return start_point_dir == driver_dir and start_point_dir == end_point_dir
def mot_run(self, lanes_res, det_res, frame_shape):
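        """Per-frame entry point: drop static trajectories and (with
        filter_horizontal_flag) those deviating too far from the lane
        direction; initialize the fence line once both driving directions
        have been seen, then return (retrograde track ids, fence_line)."""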
det = det_res['boxes']
directions = lanes_res['directions']
lanes = lanes_res['output']
if len(directions) > 0:
direction = directions[0]
else:
return [], self.fence_line
if len(det) == 0:
return [], self.fence_line
traj_lines = []
pos_dir_traj = []
neg_dir_traj = []
for i in range(len(det)):
class_id = int(det[i][1])
mot_id = int(det[i][0])
traj_i = self.center_traj_retrograde[class_id][mot_id]
if len(traj_i) < 2:
continue
traj_line = {
'index': i,
'mot_id': mot_id,
'traj_line':
[traj_i[0][0], traj_i[0][1], traj_i[-1][0], traj_i[-1][1]]
}
if not self.is_move(traj_line['traj_line'], frame_shape):
continue
angle, angle_deviation = self.get_angle(traj_line['traj_line'])
if direction is not None and self.filter_horizontal_flag:
if abs(angle_deviation - direction) > self.deviation:
continue
traj_line['angle'] = angle
traj_lines.append(traj_line)
if self.fence_line is None:
if angle >= 0:
pos_dir_traj.append(traj_line)
else:
neg_dir_traj.append(traj_line)
if len(traj_lines) == 0:
return [], self.fence_line
if self.fence_line is None:
if len(pos_dir_traj) < 1 or len(neg_dir_traj) < 1:
return [], None
self.fence_line = self.init_fence_line(lanes, pos_dir_traj,
neg_dir_traj, frame_shape)
return [], self.fence_line
else:
retrograde_list = []
for traj_line in traj_lines:
                if not self.judge_retrograde(traj_line):
retrograde_list.append(det[traj_line['index']][0])
return retrograde_list, self.fence_line
...@@ -442,3 +442,138 @@ def visualize_vehicleplate(im, results, boxes=None):
                text_scale, (0, 255, 255),
                thickness=text_thickness)
    return im
def draw_press_box_lanes(im, np_boxes, labels, threshold=0.5):
"""
Args:
im (PIL.Image.Image): PIL image
        np_boxes (np.ndarray): shape [N, 6], N: number of boxes,
                               matrix element: [class, score, x_min, y_min, x_max, y_max]
labels (list): labels:['class1', ..., 'classn']
threshold (float): threshold of box
Returns:
im (PIL.Image.Image): visualized image
"""
if isinstance(im, str):
im = Image.open(im).convert('RGB')
elif isinstance(im, np.ndarray):
im = Image.fromarray(im)
draw_thickness = min(im.size) // 320
draw = ImageDraw.Draw(im)
clsid2color = {}
color_list = get_color_map_list(len(labels))
if np_boxes.shape[1] == 7:
np_boxes = np_boxes[:, 1:]
expect_boxes = (np_boxes[:, 1] > threshold) & (np_boxes[:, 0] > -1)
np_boxes = np_boxes[expect_boxes, :]
for dt in np_boxes:
clsid, bbox, score = int(dt[0]), dt[2:], dt[1]
if clsid not in clsid2color:
clsid2color[clsid] = color_list[clsid]
color = tuple(clsid2color[clsid])
if len(bbox) == 4:
xmin, ymin, xmax, ymax = bbox
# draw bbox
draw.line(
[(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin),
(xmin, ymin)],
width=draw_thickness,
fill=(0, 0, 255))
elif len(bbox) == 8:
x1, y1, x2, y2, x3, y3, x4, y4 = bbox
draw.line(
[(x1, y1), (x2, y2), (x3, y3), (x4, y4), (x1, y1)],
width=2,
fill=color)
            xmin = min(x1, x2, x3, x4)
            ymin = min(y1, y2, y3, y4)
            # ymax is needed below when drawing the label
            ymax = max(y1, y2, y3, y4)
# draw label
text = "{}".format(labels[clsid])
tw, th = draw.textsize(text)
draw.rectangle(
[(xmin + 1, ymax - th), (xmin + tw + 1, ymax)], fill=color)
draw.text((xmin + 1, ymax - th), text, fill=(0, 0, 255))
return im
def visualize_vehiclepress(im, results, threshold=0.5):
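    """Draw lane-pressing violation boxes; `results` rows are
    [class, score, x_min, y_min, x_max, y_max] and `threshold` filters
    low-score boxes."""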
results = np.array(results)
labels = ['violation']
im = draw_press_box_lanes(im, results, labels, threshold=threshold)
return im
def visualize_lane(im, lanes):
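    """Draw detected lane segments, each given as [x1, y1, x2, y2]."""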
if isinstance(im, str):
im = Image.open(im).convert('RGB')
elif isinstance(im, np.ndarray):
im = Image.fromarray(im)
draw_thickness = min(im.size) // 320
draw = ImageDraw.Draw(im)
if len(lanes) > 0:
for lane in lanes:
draw.line(
[(lane[0], lane[1]), (lane[2], lane[3])],
width=draw_thickness,
fill=(0, 0, 255))
return im
def visualize_vehicle_retrograde(im, mot_res, vehicle_retrograde_res):
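    """Draw the fence line (black) and mark retrograde vehicles with green
    boxes, given MOT results and the retrograde recognizer output."""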
if isinstance(im, str):
im = Image.open(im).convert('RGB')
elif isinstance(im, np.ndarray):
im = Image.fromarray(im)
draw_thickness = min(im.size) // 320
draw = ImageDraw.Draw(im)
lane = vehicle_retrograde_res['fence_line']
if lane is not None:
draw.line(
[(lane[0], lane[1]), (lane[2], lane[3])],
width=draw_thickness,
fill=(0, 0, 0))
mot_id = vehicle_retrograde_res['output']
if mot_id is None or len(mot_id) == 0:
return im
if mot_res is None:
return im
np_boxes = mot_res['boxes']
if np_boxes is not None:
for dt in np_boxes:
if dt[0] not in mot_id:
continue
bbox = dt[3:]
if len(bbox) == 4:
xmin, ymin, xmax, ymax = bbox
# draw bbox
draw.line(
[(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin),
(xmin, ymin)],
width=draw_thickness,
fill=(0, 255, 0))
# draw label
text = "retrograde"
tw, th = draw.textsize(text)
draw.rectangle(
[(xmax + 1, ymin - th), (xmax + tw + 1, ymin)],
fill=(0, 255, 0))
draw.text((xmax + 1, ymin - th), text, fill=(0, 255, 0))
return im