Commit 7ec3ec11 authored by Double_V, committed by GitHub

Add VOT models (#4257)

* First version for VOT models.
* Include SiamFC and ATOM.
* A unified architecture for ATOM and Siamese-series models.
Parent 87e87ae7
[submodule "pytracking/pysot-toolkit"]
path = pytracking/pysot-toolkit
url = https://github.com/StrangerZhang/pysot-toolkit.git
# tracking: Single Object Tracking Framework
## Introduction to Object Tracking
tracking is a visual object tracking (VOT) library built on Baidu's deep learning framework Paddle. Its overall design follows [pytracking](https://github.com/visionml/pytracking), whose excellent architecture makes it easy to integrate other trackers such as SiamFC, SiamRPN, and SiamMask into a single framework for unified experiments and comparisons.
tracking currently covers the mainstream object tracking models, including SiamFC, SiamRPN, SiamMask, and ATOM. It aims to provide developers with a set of convenient and efficient deep learning tracking algorithms based on PaddlePaddle, and more models will be added over time.
## Code Directory Structure
```
ltr  model training code
└─ actors  takes input data and produces the training objective
└─ admin  manages data paths, etc.
└─ data  multi-threaded data loading and preprocessing
└─ dataset  training dataset readers
└─ models  model definitions
└─ train_settings  training configurations
└─ trainers  model trainers
└─ run_training.py  training entry script
pytracking  tracking code
└─ admin  manages data paths, model locations, etc.
└─ features  feature extraction
└─ libs  common tracking operations
└─ parameter  tracker parameter settings
└─ tracker  trackers
└─ utils  plotting and other utilities
└─ pysot-toolkit  evaluation dataset loading and metric computation
└─ eval_benchmark.py  tracker evaluation entry script
└─ visualize_results_on_benchmark.ipynb  visualization of tracking results
```
## Getting Started
### Data Preparation
The training and test sets for object tracking are different, and the best models today are usually trained on several training datasets. Commonly used training datasets include:
- [VID](http://bvisionweb1.cs.unc.edu/ilsvrc2015/ILSVRC2015_VID.tar.gz)
- [Microsoft COCO 2014](http://cocodataset.org/#download)
- [LaSOT](https://drive.google.com/file/d/1O2DLxPP8M4Pn4-XCttCJUW3A29tDIeNa/view)
After downloading and extracting, organize the datasets as follows:
```
/Datasets/
└─ ILSVRC2015_VID/
└─ train2014/
└─ LaSOTBenchmark/
```
Note: these datasets are large, so reserve enough disk space. Training SiamFC only requires downloading the VID dataset, while training ATOM requires all three datasets above.
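The following snippet is an illustrative check (the root path `/Datasets` and folder names follow the layout above; adjust them to your setup) that the expected directories exist before training:
```python
import os

# Expected dataset root and sub-folders, following the layout shown above.
DATASET_ROOT = '/Datasets'
expected = ['ILSVRC2015_VID', 'train2014', 'LaSOTBenchmark']

for name in expected:
    path = os.path.join(DATASET_ROOT, name)
    print('{:20s} {}'.format(name, 'ok' if os.path.isdir(path) else 'MISSING'))
```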
## Quick Start
The working environment for tracking:
- python3
- PaddlePaddle 1.7
### Install Dependencies
1. Install Paddle. Version 1.7 is required; if your version is older, please upgrade to Paddle 1.7 (a quick sanity check of the environment is shown after this list).
```bash
pip install paddlepaddle==1.7.0
```
2. Install third-party libraries; anaconda is recommended.
```bash
# (optional) 0. It is strongly recommended to create a new conda environment. After installing anaconda, run:
# conda create -n paddle1.7-py3.6 python=3.6
# conda activate paddle1.7-py3.6
cd tracking
pip install -r requirements.txt
# (optional) 1. Recommended: fast reading of jpeg files
apt-get install libturbojpeg
# (optional) 2. Recommended: process control
apt-get install build-essential libcap-dev
pip install python-prctl
```
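After installing, a quick sanity check of the environment can be run (illustrative; `fluid.install_check.run_check()` is assumed to be available in Paddle 1.7):
```python
import paddle
import paddle.fluid as fluid

# The installed Paddle version should be 1.7.x.
print(paddle.__version__)

# Run Paddle's built-in installation check (assumed available in 1.7).
fluid.install_check.run_check()

# Optional: verify the fast jpeg reader if libturbojpeg was installed.
try:
    import jpeg4py  # noqa: F401
    print('jpeg4py available')
except Exception as e:
    print('jpeg4py not available:', e)
```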
### Download Pretrained Backbones
Before training, prepare the pretrained backbone models for SiamRPN, SiamMask, and ATOM.
We provide the ATOM ResNet18 and ResNet50 backbone models. A tarball containing all pretrained models can be downloaded from [here](https://paddlemodels.bj.bcebos.com/paddle_track/vot/pretrained_models.tar).
Extracting the tarball produces a `pretrained_models` folder with the following structure:
```
/pretrained_models/
└─ atom
└─ atom_resnet18.pdparams
└─ atom_resnet50.pdparams
└─ backbone
└─ ResNet18.pdparams
└─ ResNet50.pdparams
```
The /pretrained_models/backbone/ folder contains the ResNet18 and ResNet50 models pretrained on ImageNet.
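The snippet below is an illustrative way to verify the download; it assumes Paddle 1.7's `fluid.dygraph.load_dygraph`, which takes the checkpoint path prefix without the `.pdparams` suffix:
```python
import paddle.fluid as fluid

with fluid.dygraph.guard():
    # Load the ResNet18 backbone state dict and print a few parameter names.
    param_dict, _ = fluid.dygraph.load_dygraph('pretrained_models/backbone/ResNet18')
    print(len(param_dict), 'parameters')
    print(list(param_dict.keys())[:5])
```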
### Set Training Parameters
Before starting training, set the dataset paths used by tracking and the path where trained models will be saved. These parameters are configured in ltr/admin/local.py.
First, generate the local.py file.
```bash
# go to the repository root
cd tracking
# generate the local.py file; it does not need to be regenerated for later training runs
python -c "from ltr.admin.environment import create_default_local_file; create_default_local_file()"
```
Next, set workspace_dir (where trained models are saved), backbone_dir (the backbone model path), the dataset paths, and so on. Paths of datasets that are not used can be left unset.
```
# edit ltr/admin/local.py with your usual editor
# for example: vim ltr/admin/local.py
# where
# workspace_dir = './checkpoints'  # where trained models are saved
# backbone_dir = Your BACKBONE_PATH  # not needed when training SiamFC
# then set the paths of the training datasets you use, such as VID, LaSOT, COCO, e.g.:
# imagenet_dir = '/Datasets/ILSVRC2015/'  # path of the VID training set
```
Training SiamFC only requires setting workspace_dir and imagenet_dir, for example:
```python
self.workspace_dir = './checkpoints'
self.imagenet_dir = '/Datasets/ILSVRC2015/'
```
Training ATOM additionally requires the COCO and LaSOT dataset paths besides workspace_dir and imagenet_dir, for example:
```python
self.workspace_dir = './checkpoints'
self.lasot_dir = '/Datasets/LaSOTBenchmark/'
self.coco_dir = '/Datasets/train2014/'
self.imagenet_dir = '/Datasets/ILSVRC2015/'
```
In addition, training ATOM requires the GOT-10k and LaSOT dataset split files, which can be prepared as follows:
```bash
cd ltr/data_specs/
wget https://paddlemodels.cdn.bcebos.com/paddle_track/vot/got10k_lasot_split.tar
tar xvf got10k_lasot_split.tar
```
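A minimal check (illustrative; the file names below are the ones referenced by the dataset loaders in this repository, and the tarball may contain additional LaSOT split files) that the extraction produced the expected GOT-10k split lists:
```python
import os

SPEC_DIR = 'ltr/data_specs'  # run from the repository root
for name in ['got10k_train_split.txt',
             'got10k_val_split.txt',
             'got10k_prohibited_for_VOT.txt']:
    path = os.path.join(SPEC_DIR, name)
    print('{:35s} {}'.format(name, 'ok' if os.path.isfile(path) else 'MISSING'))
```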
### Start Training
```bash
# go to the training code directory
cd ltr
# train ATOM ResNet18
python run_training.py bbreg atom_res18_vid_lasot_coco
# train ATOM ResNet50
python run_training.py bbreg atom_res50_vid_lasot_coco
# train SiamFC
python run_training.py siamfc siamfc_alexnet_vid
```
## Model Evaluation
Trained models are evaluated with the [pysot-toolkit](https://github.com/StrangerZhang/pysot-toolkit) toolkit, which provides evaluation APIs for several single object tracking benchmarks. We recommend downloading the test datasets from the links provided by pysot-toolkit.
Once the test data is ready, clone and set up the pysot-toolkit evaluation module with the following commands:
```bash
cd pytracking
git clone https://github.com/StrangerZhang/pysot-toolkit.git
mv pysot-toolkit pysot_toolkit
cd pysot_toolkit
pip install -r requirements.txt
cd pysot/utils/
python setup.py build_ext --inplace
```
### Prepare the Test Dataset
Prepare the VOT2018 dataset as described by pysot-toolkit and place it under the /Datasets folder.
### Set Up the Evaluation Environment
Next, set up the evaluation environment:
```bash
# generate the local.py file; set the test dataset, the model to be tested, and the results directory in it
python -c "from pytracking.admin.environment import create_default_local_file; create_default_local_file()"
# edit pytracking/pysot_toolkit/local.py with your usual editor
# for example: vim pytracking/pysot_toolkit/local.py
# set settings.dataset_path to the test dataset path and settings.network_path to the path of the trained model parameters
```
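After editing, the file might look like the sketch below (only `dataset_path` and `network_path` are named above; the class layout mirrors ltr/admin/local.py and the exact attribute set is an assumption):
```python
# pytracking/pysot_toolkit/local.py (illustrative)
class EnvironmentSettings:
    def __init__(self):
        self.dataset_path = '/Datasets/'          # folder containing VOT2018
        self.network_path = './checkpoints/ltr/'  # folder containing trained model checkpoints
```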
### Prepare Test Data and Models
Prepare the VOT2018 dataset as described by pysot-toolkit and place it in the folder given by settings.dataset_path, or point settings.dataset_path at wherever your test dataset is stored.
Copy your trained model to `NETWORK_PATH`, or create a symbolic link, e.g.
```bash
ln -s tracking/ltr/Logs/checkpoints/ltr/bbreg/atom_res18_vid_lasot_coco $NETWORK_PATH/bbreg
```
### Run the Tests
Test the ATOM model:
```bash
# evaluate the ATOM model on VOT2018
# -d VOT2018: evaluate on the VOT2018 dataset
# -tr bbreg.atom_res18_vid_lasot_coco: the model to evaluate, matching the training settings
# -te atom.default_vot: load the hyper-parameter file pytracking/parameter/atom/default_vot.py
# -e 40: evaluate the model from epoch 40; you can also pass 'range(1, 50, 1)' to test the models from epoch 1 to epoch 50
python eval_benchmark.py -d VOT2018 -tr bbreg.atom_res18_vid_lasot_coco -te atom.default_vot -e 40
```
Test SiamFC:
```bash
# test SiamFC on VOT2018
python eval_benchmark.py -d VOT2018 -tr siamfc.siamfc_alexnet_vid -te siamfc.default -e 'range(1, 50, 1)'
```
## Visualizing Tracking Results
After evaluating on a benchmark, you can inspect failure cases by visualizing the tracker's results. We provide the following way to visualize tracking results:
```bash
cd pytracking
# start jupyter notebook and watch the terminal output for the token
jupyter notebook --ip 0.0.0.0 --port 8888
```
In your browser, open the server's IP address followed by the port number; if running locally, open
`http://localhost:8888`. If a token is required, check the terminal output of the `jupyter notebook --ip 0.0.0.0 --port 8888` command.
Once the page loads, open `visualize_results_on_benchmark.ipynb` to visualize the results.
## Results
| Dataset | Model | Backbone | Paper Result | Our Result | Model |
| :-------: | :-------: | :---: | :---: | :---------: |:---------: |
|VOT2018| ATOM | Res18 | EAO: 0.401 | 0.399 | [model]() |
|VOT2018| SiamFC | AlexNet | EAO: 0.188 | 0.211 | [model]() |
## Citations and References
SiamFC **[[Paper]](https://arxiv.org/pdf/1606.09549.pdf) [[Code]](https://www.robots.ox.ac.uk/~luca/siamese-fc.html)**
@inproceedings{bertinetto2016fully,
title={Fully-convolutional siamese networks for object tracking},
author={Bertinetto, Luca and Valmadre, Jack and Henriques, Joao F and Vedaldi, Andrea and Torr, Philip HS},
booktitle={European conference on computer vision},
pages={850--865},
year={2016},
organization={Springer}
}
ATOM **[[Paper]](https://arxiv.org/pdf/1811.07628.pdf) [[Raw results]](https://drive.google.com/drive/folders/1MdJtsgr34iJesAgL7Y_VelP8RvQm_IG_) [[Models]](https://drive.google.com/open?id=1EsNSQr25qfXHYLqjZaVZElbGdUg-nyzd) [[Training Code]](https://github.com/visionml/pytracking/blob/master/ltr/README.md#ATOM) [[Tracker Code]](https://github.com/visionml/pytracking/blob/master/pytracking/README.md#ATOM)**
@inproceedings{danelljan2019atom,
title={Atom: Accurate tracking by overlap maximization},
author={Danelljan, Martin and Bhat, Goutam and Khan, Fahad Shahbaz and Felsberg, Michael},
booktitle={Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition},
pages={4660--4669},
year={2019}
}
DiMP **[[Paper]](https://arxiv.org/pdf/1904.07220v1.pdf) [[Raw results]](https://drive.google.com/drive/folders/15mpUAJmzxemnOC6gmvMTCDJ-0v6hxJ7y) [[Models]](https://drive.google.com/open?id=1YEJySjhFokyQ6zgQg6vFAnzEFi1Onq7G) [[Training Code]](https://github.com/visionml/pytracking/blob/master/ltr/README.md#DiMP) [[Tracker Code]](https://github.com/visionml/pytracking/blob/master/pytracking/README.md#DiMP)**
@inproceedings{bhat2019learning,
title={Learning discriminative model prediction for tracking},
author={Bhat, Goutam and Danelljan, Martin and Gool, Luc Van and Timofte, Radu},
booktitle={Proceedings of the IEEE International Conference on Computer Vision},
pages={6182--6191},
year={2019}
}
ECO **[[Paper]](https://arxiv.org/pdf/1611.09224.pdf) [[Models]](https://drive.google.com/open?id=1aWC4waLv_te-BULoy0k-n_zS-ONms21S) [[Tracker Code]](https://github.com/visionml/pytracking/blob/master/pytracking/README.md#ECO)**
@inproceedings{danelljan2017eco,
title={Eco: Efficient convolution operators for tracking},
author={Danelljan, Martin and Bhat, Goutam and Shahbaz Khan, Fahad and Felsberg, Michael},
booktitle={Proceedings of the IEEE conference on computer vision and pattern recognition},
pages={6638--6646},
year={2017}
}
from .base_actor import BaseActor
from .bbreg import AtomActor
from .siamfc import SiamFCActor
from pytracking.libs import TensorDict
class BaseActor:
""" Base class for actor. The actor class handles the passing of the data through the network
and calculation the loss"""
def __init__(self, net, objective):
"""
args:
net - The network to train
objective - The loss function
"""
self.net = net
self.objective = objective
def train(self):
""" Set whether the network is in train mode.
args:
mode (True) - Bool specifying whether in training mode.
"""
self.net.train()
def eval(self):
""" Set network to eval mode"""
self.net.eval()
from . import BaseActor
import paddle.fluid as fluid
class AtomActor(BaseActor):
""" Actor for training the IoU-Net in ATOM"""
def __call__(self, data):
"""
args:
data - The input data, should contain the fields 'train_images', 'test_images', 'train_anno',
'test_proposals' and 'proposal_iou'.
returns:
loss - the training loss
states - dict containing detailed losses
"""
# Run network to obtain IoU prediction for each proposal in 'test_proposals'
iou_pred = self.net(data['train_images'], data['test_images'],
data['train_anno'], data['test_proposals'])
iou_pred = fluid.layers.reshape(iou_pred, [-1, iou_pred.shape[2]])
iou_gt = fluid.layers.reshape(data['proposal_iou'],
[-1, data['proposal_iou'].shape[2]])
# Compute loss
loss = self.objective(iou_pred, iou_gt)
loss = fluid.layers.mean(loss)
# Use scale loss if exists
scale_loss = getattr(self.net, "scale_loss", None)
if callable(scale_loss):
loss = scale_loss(loss)
# Return training stats
stats = {'Loss/total': loss.numpy(), 'Loss/iou': loss.numpy()}
return loss, stats
import numpy as np
import paddle.fluid as fluid
from . import BaseActor
class SiamFCActor(BaseActor):
""" Actor for training the IoU-Net in ATOM"""
def __init__(self, net, objective, batch_size, shape, radius, stride):
super().__init__(net, objective)
self.label_mask, self.label_weights = self._creat_gt_mask(
batch_size, shape, radius, stride)
def _creat_gt_mask(self, batch_size, shape, radius, stride):
h, w = shape
y = np.arange(h, dtype=np.float32) - (h - 1) / 2.
x = np.arange(w, dtype=np.float32) - (w - 1) / 2.
y, x = np.meshgrid(y, x)
dist = np.sqrt(x**2 + y**2)
mask = np.zeros((h, w))
mask[dist <= radius / stride] = 1
mask = mask[np.newaxis, :, :]
weights = np.ones_like(mask)
weights[mask == 1] = 0.5 / np.sum(mask == 1)
weights[mask == 0] = 0.5 / np.sum(mask == 0)
mask = np.repeat(mask, batch_size, axis=0)[:, np.newaxis, :, :]
weights = np.repeat(weights, batch_size, axis=0)[:, np.newaxis, :, :]
weights = fluid.dygraph.to_variable(weights.astype(np.float32))
mask = fluid.dygraph.to_variable(mask.astype(np.float32))
return mask, weights
def __call__(self, data):
# Run network to obtain the response map of the search region
target_estimations = self.net(data['train_images'], data['test_images'])
# weighted loss
loss_mat = fluid.layers.sigmoid_cross_entropy_with_logits(
target_estimations, self.label_mask, normalize=False)
loss = fluid.layers.elementwise_mul(loss_mat, self.label_weights)
loss = fluid.layers.reduce_sum(loss) / loss.shape[0]
# Return training stats
stats = {'Loss/total': loss.numpy(), 'Loss/center': loss.numpy()}
return loss, stats
import importlib
import os
from collections import OrderedDict
def create_default_local_file():
path = os.path.join(os.path.dirname(__file__), 'local.py')
empty_str = '\'\''
default_settings = OrderedDict({
'workspace_dir': empty_str,
'tensorboard_dir': 'self.workspace_dir + \'/tensorboard/\'',
'backbone_dir': empty_str,
'lasot_dir': empty_str,
'got10k_dir': empty_str,
'trackingnet_dir': empty_str,
'coco_dir': empty_str,
'imagenet_dir': empty_str,
'imagenetdet_dir': empty_str
})
comment = {
'workspace_dir': 'Base directory for saving network checkpoints.',
'tensorboard_dir': 'Directory for tensorboard files.'
}
with open(path, 'w') as f:
f.write('class EnvironmentSettings:\n')
f.write(' def __init__(self):\n')
for attr, attr_val in default_settings.items():
comment_str = None
if attr in comment:
comment_str = comment[attr]
if comment_str is None:
f.write(' self.{} = {}\n'.format(attr, attr_val))
else:
f.write(' self.{} = {} # {}\n'.format(attr, attr_val,
comment_str))
def env_settings():
env_module_name = 'ltr.admin.local'
try:
env_module = importlib.import_module(env_module_name)
return env_module.EnvironmentSettings()
except:
env_file = os.path.join(os.path.dirname(__file__), 'local.py')
create_default_local_file()
raise RuntimeError(
'YOU HAVE NOT SETUP YOUR local.py!!!\n Go to "{}" and set all the paths you need. Then try to run again.'.
format(env_file))
from functools import wraps
import importlib
def model_constructor(f):
""" Wraps the function 'f' which returns the network. An extra field 'constructor' is added to the network returned
by 'f'. This field contains an instance of the 'NetConstructor' class, which contains the information needed to
re-construct the network, such as the name of the function 'f', the function arguments etc. Thus, the network can
be easily constructed from a saved checkpoint by calling NetConstructor.get() function.
"""
@wraps(f)
def f_wrapper(*args, **kwds):
net_constr = NetConstructor(f.__name__, f.__module__, args, kwds)
output = f(*args, **kwds)
if isinstance(output, (tuple, list)):
# Assume first argument is the network
output[0].constructor = net_constr
else:
output.constructor = net_constr
return output
return f_wrapper
class NetConstructor:
""" Class to construct networks. Takes as input the function name (e.g. atom_resnet18), the name of the module
which contains the network function (e.g. ltr.models.bbreg.atom) and the arguments for the network
function. The class object can then be stored along with the network weights to re-construct the network."""
def __init__(self, fun_name, fun_module, args, kwds):
"""
args:
fun_name - The function which returns the network
fun_module - the module which contains the network function
args - arguments which are passed to the network function
kwds - arguments which are passed to the network function
"""
self.fun_name = fun_name
self.fun_module = fun_module
self.args = args
self.kwds = kwds
def get(self):
""" Rebuild the network by calling the network function with the correct arguments. """
net_module = importlib.import_module(self.fun_module)
net_fun = getattr(net_module, self.fun_name)
return net_fun(*self.args, **self.kwds)
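if __name__ == '__main__':
    # Illustrative usage sketch (not part of the original module; run this file as
    # a script to try it). A factory decorated with @model_constructor returns a
    # network carrying a `constructor` attribute that records the factory name,
    # module, and arguments, so the same network can be rebuilt later, e.g. when
    # restoring from a checkpoint, via net.constructor.get().
    @model_constructor
    def toy_net(hidden_dim=8):
        class ToyNet:
            def __init__(self, hidden_dim):
                self.hidden_dim = hidden_dim
        return ToyNet(hidden_dim)

    net = toy_net(hidden_dim=16)
    rebuilt = net.constructor.get()  # re-imports this module and calls toy_net(hidden_dim=16)
    print(type(rebuilt).__name__, rebuilt.hidden_dim)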
from ltr.admin.environment import env_settings
class Settings:
""" Training settings, e.g. the paths to datasets and networks."""
def __init__(self):
self.set_default()
def set_default(self):
self.env = env_settings()
self.use_gpu = True
class StatValue:
def __init__(self):
self.clear()
def reset(self):
self.val = 0
def clear(self):
self.reset()
self.history = []
def update(self, val):
self.val = val
self.history.append(self.val)
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.clear()
self.has_new_data = False
def reset(self):
self.avg = 0
self.val = 0
self.sum = 0
self.count = 0
def clear(self):
self.reset()
self.history = []
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def new_epoch(self):
if self.count > 0:
self.history.append(self.avg)
self.reset()
self.has_new_data = True
else:
self.has_new_data = False
def topk_accuracy(output, target, topk=(1, )):
"""Computes the precision@k for the specified values of k"""
single_input = not isinstance(topk, (tuple, list))
if single_input:
topk = (topk, )
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)[0]
res.append(correct_k * 100.0 / batch_size)
if single_input:
return res[0]
return res
import os
from collections import OrderedDict
from tensorboardX import SummaryWriter
class TensorboardWriter:
def __init__(self, directory, loader_names):
self.directory = directory
self.writer = OrderedDict({
name: SummaryWriter(os.path.join(self.directory, name))
for name in loader_names
})
def write_info(self, module_name, script_name, description):
tb_info_writer = SummaryWriter(os.path.join(self.directory, 'info'))
tb_info_writer.add_text('Module_name', module_name)
tb_info_writer.add_text('Script_name', script_name)
tb_info_writer.add_text('Description', description)
tb_info_writer.close()
def write_epoch(self, stats: OrderedDict, epoch: int, ind=-1):
for loader_name, loader_stats in stats.items():
if loader_stats is None:
continue
for var_name, val in loader_stats.items():
if hasattr(val, 'history') and getattr(val, 'has_new_data',
True):
self.writer[loader_name].add_scalar(var_name,
val.history[ind], epoch)
from .loader import LTRLoader
import jpeg4py
import cv2 as cv
import lmdb
import numpy as np
def default_image_loader(path):
"""The default image loader, reads the image from the given path. It first tries to use the jpeg4py_loader,
but reverts to the opencv_loader if the former is not available."""
if default_image_loader.use_jpeg4py is None:
# Try using jpeg4py
im = jpeg4py_loader(path)
if im is None:
default_image_loader.use_jpeg4py = False
print('Using opencv_loader instead.')
else:
default_image_loader.use_jpeg4py = True
return im
if default_image_loader.use_jpeg4py:
return jpeg4py_loader(path)
return opencv_loader(path)
default_image_loader.use_jpeg4py = None
def jpeg4py_loader(path):
""" Image reading using jpeg4py (https://github.com/ajkxyz/jpeg4py)"""
try:
return jpeg4py.JPEG(path).decode()
except Exception as e:
print('ERROR: Could not read image "{}"'.format(path))
print(e)
return None
def opencv_loader(path):
""" Read image using opencv's imread function and returns it in rgb format"""
try:
im = cv.imread(path, cv.IMREAD_COLOR)
# convert to rgb and return
return cv.cvtColor(im, cv.COLOR_BGR2RGB)
except Exception as e:
print('ERROR: Could not read image "{}"'.format(path))
print(e)
return None
def lmdb_loader(path, lmdb_path=None):
try:
if lmdb_loader.txn is None:
db = lmdb.open(lmdb_path, readonly=True, map_size=int(300e9))
lmdb_loader.txn = db.begin(write=False)
img_buffer = lmdb_loader.txn.get(path.encode())
img_buffer = np.frombuffer(img_buffer, np.uint8)
return cv.imdecode(img_buffer, cv.IMREAD_COLOR)
except Exception as e:
print('ERROR: Could not read image "{}"'.format(path))
print(e)
return None
lmdb_loader.txn = None
import os
import signal
import sys
import dataflow as df
import numpy as np
# handle terminate reader process, do not print stack frame
def _reader_quit(signum, frame):
print("Reader process exit.")
sys.exit()
def _term_group(sig_num, frame):
print('pid {} terminated, terminate group '
'{}...'.format(os.getpid(), os.getpgrp()))
os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
signal.signal(signal.SIGTERM, _reader_quit)
signal.signal(signal.SIGINT, _term_group)
class LTRLoader(df.DataFlow):
"""
Data loader. Combines a dataset and a sampler, and provides
single- or multi-process iterators over the dataset.
Note: an additional option stack_dim is available to
select along which dimension the data should be stacked to form a batch.
Arguments:
dataset (Dataset): dataset from which to load the data.
batch_size (int, optional): how many samples per batch to load
(default: 1).
shuffle (bool, optional): set to ``True`` to have the data reshuffled
at every epoch (default: False).
sampler (Sampler, optional): defines the strategy to draw samples from
the dataset. If specified, ``shuffle`` must be False.
batch_sampler (Sampler, optional): like sampler, but returns a batch of
indices at a time. Mutually exclusive with batch_size, shuffle,
sampler, and drop_last.
num_workers (int, optional): how many subprocesses to use for data
loading. 0 means that the data will be loaded in the main process.
(default: 0)
collate_fn (callable, optional): merges a list of samples to form a mini-batch.
stack_dim (int): Dimension along which to stack to form the batch. (default: 0)
pin_memory (bool, optional): If ``True``, the data loader will copy tensors
into CUDA pinned memory before returning them.
drop_last (bool, optional): set to ``True`` to drop the last incomplete batch,
if the dataset size is not divisible by the batch size. If ``False`` and
the size of dataset is not divisible by the batch size, then the last batch
will be smaller. (default: False)
timeout (numeric, optional): if positive, the timeout value for collecting a batch
from workers. Should always be non-negative. (default: 0)
worker_init_fn (callable, optional): If not None, this will be called on each
worker subprocess with the worker id (an int in ``[0, num_workers - 1]``) as
input, after seeding and before data loading. (default: None)
.. warning:: If ``spawn`` start method is used, :attr:`worker_init_fn` cannot be an
unpicklable object, e.g., a lambda function.
"""
__initialized = False
def __init__(self,
name,
dataset,
training=True,
batch_size=1,
shuffle=False,
sampler=None,
batch_sampler=None,
num_workers=0,
epoch_interval=1,
collate_fn=None,
stack_dim=0,
pin_memory=False,
drop_last=False,
timeout=0,
worker_init_fn=None):
super().__init__()
ds = df.RepeatedData(dataset, -1)
ds = df.MultiProcessRunnerZMQ(ds, num_proc=num_workers, hwm=300)
# ds = df.MultiThreadRunner(lambda: ds, num_prefetch=1024, num_thread=num_workers)
ds = df.BatchData(ds, batch_size)
self.ds = ds
self.name = name
self.training = training
self.epoch_interval = epoch_interval
self.stack_dim = stack_dim
self.batches_per_epoch = len(dataset) // batch_size
def __len__(self):
return self.batches_per_epoch
def __iter__(self):
if not self.__initialized:
self.reset_state()
self.__initialized = True
for d in self.ds:
if self.stack_dim > 0:
for k, v in d.items():
if len(v.shape) >= self.stack_dim + 1:
d[k] = np.swapaxes(v, 0, self.stack_dim)
yield d
def reset_state(self):
self.ds.reset_state()
import numpy as np
from ltr.data import transforms
import ltr.data.processing_utils as prutils
from pytracking.libs import TensorDict
class BaseProcessing:
""" Base class for Processing. Processing class is used to process the data returned by a dataset, before passing it
through the network. For example, it can be used to crop a search region around the object, apply various data
augmentations, etc."""
def __init__(self,
transform=transforms.ToArray(),
train_transform=None,
test_transform=None,
joint_transform=None):
"""
args:
transform - The set of transformations to be applied on the images. Used only if train_transform or
test_transform is None.
train_transform - The set of transformations to be applied on the train images. If None, the 'transform'
argument is used instead.
test_transform - The set of transformations to be applied on the test images. If None, the 'transform'
argument is used instead.
joint_transform - The set of transformations to be applied 'jointly' on the train and test images. For
example, it can be used to convert both test and train images to grayscale.
"""
self.transform = {
'train': transform if train_transform is None else train_transform,
'test': transform if test_transform is None else test_transform,
'joint': joint_transform
}
def __call__(self, data: TensorDict):
raise NotImplementedError
class SiamFCProcessing(BaseProcessing):
def __init__(self,
search_area_factor,
output_sz,
center_jitter_factor,
scale_jitter_factor,
mode='pair',
scale_type='context',
border_type='meanpad',
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.search_area_factor = search_area_factor
self.output_sz = output_sz
self.center_jitter_factor = center_jitter_factor
self.scale_jitter_factor = scale_jitter_factor
self.mode = mode
self.scale_type = scale_type
self.border_type = border_type
def _get_jittered_box(self, box, mode, rng):
jittered_size = box[2:4] * np.exp(
rng.randn(2) * self.scale_jitter_factor[mode])
max_offset = (np.sqrt(jittered_size.prod()) *
self.center_jitter_factor[mode])
jittered_center = box[0:2] + 0.5 * box[2:4] + max_offset * (rng.rand(2)
- 0.5)
return np.concatenate(
(jittered_center - 0.5 * jittered_size, jittered_size), axis=0)
def __call__(self, data: TensorDict, rng=None):
# Apply joint transforms
if self.transform['joint'] is not None:
num_train_images = len(data['train_images'])
all_images = data['train_images'] + data['test_images']
all_images_trans = self.transform['joint'](*all_images)
data['train_images'] = all_images_trans[:num_train_images]
data['test_images'] = all_images_trans[num_train_images:]
for s in ['train', 'test']:
assert self.mode == 'sequence' or len(data[s + '_images']) == 1, \
"In pair mode, num train/test frames must be 1"
# Add a uniform noise to the center pos
jittered_anno = [
self._get_jittered_box(a, s, rng) for a in data[s + '_anno']
]
# Crop image region centered at jittered_anno box
try:
crops, boxes = prutils.jittered_center_crop(
data[s + '_images'],
jittered_anno,
data[s + '_anno'],
self.search_area_factor[s],
self.output_sz[s],
scale_type=self.scale_type,
border_type=self.border_type)
except Exception as e:
print('{}, anno: {}'.format(data['dataset'], data[s + '_anno']))
raise e
# Apply transforms
data[s + '_images'] = [self.transform[s](x) for x in crops]
data[s + '_anno'] = boxes
# Prepare output
if self.mode == 'sequence':
data = data.apply(prutils.stack_tensors)
else:
data = data.apply(lambda x: x[0] if isinstance(x, list) else x)
return data
class ATOMProcessing(BaseProcessing):
""" The processing class used for training ATOM. The images are processed in the following way.
First, the target bounding box is jittered by adding some noise. Next, a square region (called search region )
centered at the jittered target center, and of area search_area_factor^2 times the area of the jittered box is
cropped from the image. The reason for jittering the target box is to avoid learning the bias that the target is
always at the center of the search region. The search region is then resized to a fixed size given by the
argument output_sz. A set of proposals are then generated for the test images by jittering the ground truth box.
"""
def __init__(self,
search_area_factor,
output_sz,
center_jitter_factor,
scale_jitter_factor,
proposal_params,
mode='pair',
*args,
**kwargs):
"""
args:
search_area_factor - The size of the search region relative to the target size.
output_sz - An integer, denoting the size to which the search region is resized. The search region is always
square.
center_jitter_factor - A dict containing the amount of jittering to be applied to the target center before
extracting the search region. See _get_jittered_box for how the jittering is done.
scale_jitter_factor - A dict containing the amount of jittering to be applied to the target size before
extracting the search region. See _get_jittered_box for how the jittering is done.
proposal_params - Arguments for the proposal generation process. See _generate_proposals for details.
mode - Either 'pair' or 'sequence'. If mode='sequence', then output has an extra dimension for frames
"""
super().__init__(*args, **kwargs)
self.search_area_factor = search_area_factor
self.output_sz = output_sz
self.center_jitter_factor = center_jitter_factor
self.scale_jitter_factor = scale_jitter_factor
self.proposal_params = proposal_params
self.mode = mode
def _get_jittered_box(self, box, mode, rng):
""" Jitter the input box
args:
box - input bounding box
mode - string 'train' or 'test' indicating train or test data
returns:
Variable - jittered box
"""
jittered_size = box[2:4] * np.exp(
rng.randn(2) * self.scale_jitter_factor[mode])
max_offset = (np.sqrt(jittered_size.prod()) *
self.center_jitter_factor[mode])
jittered_center = box[0:2] + 0.5 * box[2:4] + max_offset * (rng.rand(2)
- 0.5)
return np.concatenate(
(jittered_center - 0.5 * jittered_size, jittered_size), axis=0)
def _generate_proposals(self, box, rng):
""" Generates proposals by adding noise to the input box
args:
box - input box
returns:
array - Array of shape (num_proposals, 4) containing proposals
array - Array of shape (num_proposals,) containing IoU overlap of each proposal with the input box. The
IoU is mapped to [-1, 1]
"""
# Generate proposals
num_proposals = self.proposal_params['boxes_per_frame']
proposals = np.zeros((num_proposals, 4))
gt_iou = np.zeros(num_proposals)
for i in range(num_proposals):
proposals[i, :], gt_iou[i] = prutils.perturb_box(
box,
min_iou=self.proposal_params['min_iou'],
sigma_factor=self.proposal_params['sigma_factor'],
rng=rng)
# Map to [-1, 1]
gt_iou = gt_iou * 2 - 1
return proposals, gt_iou
def __call__(self, data: TensorDict, rng=None):
"""
args:
data - The input data, should contain the following fields:
'train_images' -
'test_images' -
'train_anno' -
'test_anno' -
returns:
TensorDict - output data block with following fields:
'train_images' -
'test_images' -
'train_anno' -
'test_anno' -
'test_proposals'-
'proposal_iou' -
"""
# Apply joint transforms
if self.transform['joint'] is not None:
num_train_images = len(data['train_images'])
all_images = data['train_images'] + data['test_images']
all_images_trans = self.transform['joint'](*all_images)
data['train_images'] = all_images_trans[:num_train_images]
data['test_images'] = all_images_trans[num_train_images:]
for s in ['train', 'test']:
assert self.mode == 'sequence' or len(data[s + '_images']) == 1, \
"In pair mode, num train/test frames must be 1"
# Add a uniform noise to the center pos
jittered_anno = [
self._get_jittered_box(a, s, rng) for a in data[s + '_anno']
]
# Crop image region centered at jittered_anno box
try:
crops, boxes = prutils.jittered_center_crop(
data[s + '_images'], jittered_anno, data[s + '_anno'],
self.search_area_factor, self.output_sz)
except Exception as e:
print('{}, anno: {}'.format(data['dataset'], data[s + '_anno']))
raise e
# Apply transforms
data[s + '_images'] = [self.transform[s](x) for x in crops]
data[s + '_anno'] = boxes
# Generate proposals
frame2_proposals, gt_iou = zip(
* [self._generate_proposals(a, rng) for a in data['test_anno']])
data['test_proposals'] = list(frame2_proposals)
data['proposal_iou'] = list(gt_iou)
# Prepare output
if self.mode == 'sequence':
data = data.apply(prutils.stack_tensors)
else:
data = data.apply(lambda x: x[0] if isinstance(x, list) else x)
return data
import math
import numpy as np
import cv2 as cv
def stack_tensors(x):
if isinstance(x, list) and isinstance(x[0], np.ndarray):
return np.stack(x)
return x
def sample_target(im,
target_bb,
search_area_factor,
output_sz=None,
scale_type='original',
border_type='replicate'):
""" Extracts a square crop centered at target_bb box, of area search_area_factor^2 times target_bb area
args:
im - cv image
target_bb - target box [x, y, w, h]
search_area_factor - Ratio of crop size to target size
output_sz - (float) Size to which the extracted crop is resized (always square). If None, no resizing is done.
returns:
cv image - extracted crop
float - the factor by which the crop has been resized to make the crop size equal output_size
"""
x, y, w, h = target_bb.tolist()
# Crop image
if scale_type == 'original':
crop_sz = math.ceil(math.sqrt(w * h) * search_area_factor)
elif scale_type == 'context':
# some context is added into the target_size
# now, the search factor is respect to the "target + context"
# when search_factor = 1, output_size = 127
# when search_factor = 2, output_size = 255
context = (w + h) / 2
base_size = math.sqrt(
(w + context) * (h + context)) # corresponds to 127 in crop
crop_sz = math.ceil(search_area_factor * base_size)
else:
raise NotImplementedError
if crop_sz < 1:
raise Exception('Too small bounding box. w: {}, h: {}'.format(w, h))
x1 = round(x + 0.5 * w - crop_sz * 0.5)
x2 = x1 + crop_sz
y1 = round(y + 0.5 * h - crop_sz * 0.5)
y2 = y1 + crop_sz
x1_pad = max(0, -x1)
x2_pad = max(x2 - im.shape[1] + 1, 0)
y1_pad = max(0, -y1)
y2_pad = max(y2 - im.shape[0] + 1, 0)
# Crop target
im_crop = im[y1 + y1_pad:y2 - y2_pad, x1 + x1_pad:x2 - x2_pad, :]
# Pad
if border_type == 'replicate':
im_crop_padded = cv.copyMakeBorder(im_crop, y1_pad, y2_pad, x1_pad,
x2_pad, cv.BORDER_REPLICATE)
elif border_type == 'zeropad':
im_crop_padded = cv.copyMakeBorder(im_crop, y1_pad, y2_pad, x1_pad,
x2_pad, cv.BORDER_CONSTANT)
elif border_type == 'meanpad':
avg_chans = np.array(
[np.mean(im[:, :, 0]), np.mean(im[:, :, 1]), np.mean(im[:, :, 2])])
im_crop_padded = cv.copyMakeBorder(
im_crop,
y1_pad,
y2_pad,
x1_pad,
x2_pad,
cv.BORDER_CONSTANT,
value=avg_chans)
else:
raise NotImplementedError
if output_sz is not None:
resize_factor = output_sz / crop_sz
return cv.resize(im_crop_padded, (output_sz, output_sz)), resize_factor
else:
return im_crop_padded, 1.0
def transform_image_to_crop(box_in: np.ndarray,
box_extract: np.ndarray,
resize_factor: float,
crop_sz: np.ndarray) -> np.ndarray:
""" Transform the box co-ordinates from the original image co-ordinates to the co-ordinates of the cropped image
args:
box_in - the box for which the co-ordinates are to be transformed
box_extract - the box about which the image crop has been extracted.
resize_factor - the ratio between the original image scale and the scale of the image crop
crop_sz - size of the cropped image
returns:
array - transformed co-ordinates of box_in
"""
box_extract_center = box_extract[0:2] + 0.5 * box_extract[2:4]
box_in_center = box_in[0:2] + 0.5 * box_in[2:4]
box_out_center = (crop_sz - 1) / 2 + (box_in_center - box_extract_center
) * resize_factor
box_out_wh = box_in[2:4] * resize_factor
box_out = np.concatenate((box_out_center - 0.5 * box_out_wh, box_out_wh))
return box_out
def centered_crop(frames, anno, area_factor, output_sz):
crops_resize_factors = [
sample_target(f, a, area_factor, output_sz)
for f, a in zip(frames, anno)
]
frames_crop, resize_factors = zip(*crops_resize_factors)
crop_sz = np.array([output_sz, output_sz], 'int')
# find the bb location in the crop
anno_crop = [
transform_image_to_crop(a, a, rf, crop_sz)
for a, rf in zip(anno, resize_factors)
]
return frames_crop, anno_crop
def jittered_center_crop(frames,
box_extract,
box_gt,
search_area_factor,
output_sz,
scale_type='original',
border_type='replicate'):
""" For each frame in frames, extracts a square crop centered at box_extract, of area search_area_factor^2
times box_extract area. The extracted crops are then resized to output_sz. Further, the co-ordinates of the box
box_gt are transformed to the image crop co-ordinates
args:
frames - list of frames
box_extract - list of boxes of same length as frames. The crops are extracted using anno_extract
box_gt - list of boxes of same length as frames. The co-ordinates of these boxes are transformed from
image co-ordinates to the crop co-ordinates
search_area_factor - The area of the extracted crop is search_area_factor^2 times box_extract area
output_sz - The size to which the extracted crops are resized
returns:
list - list of image crops
list - box_gt location in the crop co-ordinates
"""
crops_resize_factors = [
sample_target(
f,
a,
search_area_factor,
output_sz,
scale_type=scale_type,
border_type=border_type) for f, a in zip(frames, box_extract)
]
frames_crop, resize_factors = zip(*crops_resize_factors)
crop_sz = np.array([output_sz, output_sz], 'int')
# find the bb location in the crop
box_crop = [
transform_image_to_crop(a_gt, a_ex, rf, crop_sz)
for a_gt, a_ex, rf in zip(box_gt, box_extract, resize_factors)
]
return frames_crop, box_crop
def iou(reference, proposals):
"""Compute the IoU between a reference box with multiple proposal boxes.
args:
reference - Tensor of shape (1, 4).
proposals - Tensor of shape (num_proposals, 4)
returns:
array - shape (num_proposals,) containing IoU of reference box with each proposal box.
"""
# Intersection box
tl = np.maximum(reference[:, :2], proposals[:, :2])
br = np.minimum(reference[:, :2] + reference[:, 2:],
proposals[:, :2] + proposals[:, 2:])
sz = np.clip(br - tl, 0, np.inf)
# Area
intersection = np.prod(sz, axis=1)
union = np.prod(
reference[:, 2:], axis=1) + np.prod(
proposals[:, 2:], axis=1) - intersection
return intersection / union
def rand_uniform(a, b, rng=None, shape=1):
""" sample numbers uniformly between a and b.
args:
a - lower bound
b - upper bound
shape - shape of the output tensor
returns:
array
"""
rand = np.random.rand if rng is None else rng.rand
return (b - a) * rand(shape) + a
def perturb_box(box, min_iou=0.5, sigma_factor=0.1, rng=None):
""" Perturb the input box by adding gaussian noise to the co-ordinates
args:
box - input box
min_iou - minimum IoU overlap between input box and the perturbed box
sigma_factor - amount of perturbation, relative to the box size. Can be either a single element, or a list of
sigma_factors, in which case one of them will be uniformly sampled. Further, each of the
sigma_factor element can be either a float, or a tensor
of shape (4,) specifying the sigma_factor per co-ordinate
returns:
array - the perturbed box
"""
if rng is None:
rng = np.random
if isinstance(sigma_factor, list):
# If list, sample one sigma_factor as current sigma factor
c_sigma_factor = rng.choice(sigma_factor)
else:
c_sigma_factor = sigma_factor
if not isinstance(c_sigma_factor, np.ndarray):
c_sigma_factor = c_sigma_factor * np.ones(4)
perturb_factor = np.sqrt(box[2] * box[3]) * c_sigma_factor
# multiple tries to ensure that the perturbed box has iou > min_iou with the input box
for i_ in range(100):
c_x = box[0] + 0.5 * box[2]
c_y = box[1] + 0.5 * box[3]
c_x_per = rng.normal(c_x, perturb_factor[0])
c_y_per = rng.normal(c_y, perturb_factor[1])
w_per = rng.normal(box[2], perturb_factor[2])
h_per = rng.normal(box[3], perturb_factor[3])
if w_per <= 1:
w_per = box[2] * rand_uniform(0.15, 0.5, rng)[0]
if h_per <= 1:
h_per = box[3] * rand_uniform(0.15, 0.5, rng)[0]
box_per = np.round(
np.array(
[c_x_per - 0.5 * w_per, c_y_per - 0.5 * h_per, w_per, h_per]))
if box_per[2] <= 1:
box_per[2] = box[2] * rand_uniform(0.15, 0.5, rng)
if box_per[3] <= 1:
box_per[3] = box[3] * rand_uniform(0.15, 0.5, rng)
box_iou = iou(np.reshape(box, (1, 4)), np.reshape(box_per, (1, 4)))
# if there is sufficient overlap, return
if box_iou > min_iou:
return box_per, box_iou
# else reduce the perturb factor
perturb_factor *= 0.9
return box_per, box_iou
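if __name__ == '__main__':
    # Illustrative self-test (not part of the original module; run this file as a
    # script to try it). Build a dummy image and box, extract a search-area crop
    # with sample_target, and draw one perturbed proposal with its IoU target, as
    # done when generating ATOM training proposals.
    rng = np.random.RandomState(0)
    im = (rng.rand(240, 320, 3) * 255).astype(np.uint8)
    box = np.array([100., 60., 64., 48.])  # [x, y, w, h]

    crop, resize_factor = sample_target(im, box, search_area_factor=2.0, output_sz=127)
    print('crop:', crop.shape, 'resize factor:', resize_factor)

    proposal, prop_iou = perturb_box(box, min_iou=0.5, sigma_factor=0.1, rng=rng)
    print('proposal:', proposal, 'iou:', prop_iou)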
import numpy as np
import dataflow as df
from pytracking.libs import TensorDict
def no_processing(data, rng=None):
return data
class ATOMSampler(df.RNGDataFlow):
""" Class responsible for sampling frames from training sequences to form batches. Each training sample is a
tuple consisting of i) a train frame, used to obtain the modulation vector, and ii) a set of test frames on which
the IoU prediction loss is calculated.
The sampling is done in the following ways. First a dataset is selected at random. Next, a sequence is selected
from that dataset. A 'train frame' is then sampled randomly from the sequence. Next, depending on the
frame_sample_mode, the required number of test frames are sampled randomly, either from the range
[train_frame_id - max_gap, train_frame_id + max_gap] in the 'default' mode, or from [train_frame_id, train_frame_id + max_gap]
in the 'causal' mode. Only the frames in which the target is visible are sampled, and if enough visible frames are
not found, the 'max_gap' is incremented.
The sampled frames are then passed through the input 'processing' function for the necessary processing.
"""
def __init__(self,
datasets,
p_datasets,
samples_per_epoch,
max_gap,
num_test_frames=1,
processing=no_processing,
frame_sample_mode='default'):
"""
args:
datasets - List of datasets to be used for training
p_datasets - List containing the probabilities by which each dataset will be sampled
samples_per_epoch - Number of training samples per epoch
max_gap - Maximum gap, in frame numbers, between the train (reference) frame and the test frames.
num_test_frames - Number of test frames used for calculating the IoU prediction loss.
processing - An instance of Processing class which performs the necessary processing of the data.
frame_sample_mode - Either 'default' or 'causal'. If 'causal', then the test frames are sampled in a causal
manner.
"""
self.datasets = datasets
# If p not provided, sample uniformly from all videos
if p_datasets is None:
p_datasets = [1 for d in self.datasets]
# Normalize
p_total = sum(p_datasets)
self.p_datasets = [x / p_total for x in p_datasets]
self.samples_per_epoch = samples_per_epoch
self.max_gap = max_gap
self.num_test_frames = num_test_frames
self.num_train_frames = 1 # Only a single train frame allowed
self.processing = processing
self.frame_sample_mode = frame_sample_mode
def __len__(self):
return self.samples_per_epoch
def _sample_visible_ids(self, visible, num_ids=1, min_id=None, max_id=None):
""" Samples num_ids frames between min_id and max_id for which target is visible
args:
visible - 1d Tensor indicating whether target is visible for each frame
num_ids - number of frames to be samples
min_id - Minimum allowed frame number
max_id - Maximum allowed frame number
returns:
list - List of sampled frame numbers. None if not sufficient visible frames could be found.
"""
if min_id is None or min_id < 0:
min_id = 0
if max_id is None or max_id > len(visible):
max_id = len(visible)
valid_ids = [i for i in range(min_id, max_id) if visible[i]]
# No visible ids
if len(valid_ids) == 0:
return None
inds = self.rng.choice(
range(len(valid_ids)), size=num_ids, replace=True)
ids = [valid_ids[ii] for ii in inds]
# return random.choices(valid_ids, k=num_ids)
return ids
def __iter__(self):
"""
yields:
TensorDict - dict containing all the data blocks
"""
# Select a dataset
# dataset = self.rng.choices(self.datasets, self.p_datasets)[0]
dataset_idx = self.rng.choice(
range(len(self.datasets)), p=self.p_datasets, replace=False)
dataset = self.datasets[dataset_idx]
is_video_dataset = dataset.is_video_sequence()
min_visible_frames = 2 * (self.num_test_frames + self.num_train_frames)
enough_visible_frames = False
# Sample a sequence with enough visible frames and get anno for the same
while not enough_visible_frames:
seq_id = self.rng.randint(0, dataset.get_num_sequences() - 1)
anno, visible = dataset.get_sequence_info(seq_id)
num_visible = np.sum(visible.astype('int64'))
enough_visible_frames = not is_video_dataset or (
num_visible > min_visible_frames and len(visible) >= 20)
if is_video_dataset:
train_frame_ids = None
test_frame_ids = None
gap_increase = 0
if self.frame_sample_mode == 'default':
# Sample frame numbers
while test_frame_ids is None:
train_frame_ids = self._sample_visible_ids(
visible, num_ids=self.num_train_frames)
test_frame_ids = self._sample_visible_ids(
visible,
min_id=train_frame_ids[0] - self.max_gap - gap_increase,
max_id=train_frame_ids[0] + self.max_gap + gap_increase,
num_ids=self.num_test_frames)
gap_increase += 5 # Increase gap until a frame is found
elif self.frame_sample_mode == 'causal':
# Sample frame numbers in a causal manner, i.e. test_frame_ids > train_frame_ids
while test_frame_ids is None:
base_frame_id = self._sample_visible_ids(
visible,
num_ids=1,
min_id=self.num_train_frames - 1,
max_id=len(visible) - self.num_test_frames)
prev_frame_ids = self._sample_visible_ids(
visible,
num_ids=self.num_train_frames - 1,
min_id=base_frame_id[0] - self.max_gap - gap_increase,
max_id=base_frame_id[0])
if prev_frame_ids is None:
gap_increase += 5
continue
train_frame_ids = base_frame_id + prev_frame_ids
test_frame_ids = self._sample_visible_ids(
visible,
min_id=train_frame_ids[0] + 1,
max_id=train_frame_ids[0] + self.max_gap + gap_increase,
num_ids=self.num_test_frames)
gap_increase += 5 # Increase gap until a frame is found
else:
raise ValueError('Unknown frame_sample_mode.')
else:
train_frame_ids = [1] * self.num_train_frames
test_frame_ids = [1] * self.num_test_frames
# Get frames
train_frames, train_anno, _ = dataset.get_frames(seq_id,
train_frame_ids, anno)
test_frames, test_anno, _ = dataset.get_frames(seq_id, test_frame_ids,
anno)
# Prepare data
data = TensorDict({
'train_images': train_frames,
'train_anno': train_anno,
'test_images': test_frames,
'test_anno': test_anno,
'dataset': dataset.get_name()
})
# Send for processing
yield self.processing(data, rng=self.rng)
import random
import numpy as np
import math
import cv2 as cv
from paddle.fluid import layers
from pytracking.libs.paddle_utils import PTensor
class Transform:
""" Class for applying various image transformations."""
def __call__(self, *args):
rand_params = self.roll()
if rand_params is None:
rand_params = ()
elif not isinstance(rand_params, tuple):
rand_params = (rand_params, )
output = [self.transform(img, *rand_params) for img in args]
if len(output) == 1:
return output[0]
return output
def roll(self):
return None
def transform(self, img, *args):
"""Must be deterministic"""
raise NotImplementedError
class Compose:
"""Composes several transforms together.
Args:
transforms (list of ``Transform`` objects): list of transforms to compose.
"""
def __init__(self, transforms):
self.transforms = transforms
def __call__(self, *args):
for t in self.transforms:
if not isinstance(args, tuple):
args = (args, )
args = t(*args)
return args
def __repr__(self):
format_string = self.__class__.__name__ + '('
for t in self.transforms:
format_string += '\n'
format_string += ' {0}'.format(t)
format_string += '\n)'
return format_string
class Normalize(object):
"""Normalize an tensor image with mean and standard deviation.
Given mean: ``(M1,...,Mn)`` and std: ``(S1,..,Sn)`` for ``n`` channels, this transform
will normalize each channel of the input i.e.
``input[channel] = (input[channel] - mean[channel]) / std[channel]``
Args:
mean (sequence): Sequence of means for each channel.
std (sequence): Sequence of standard deviations for each channel.
"""
def __init__(self, mean, std):
self.mean = np.reshape(mean, [-1, 1, 1])
self.std = np.reshape(std, [-1, 1, 1])
def __call__(self, tensor):
"""
Args:
tensor (Tensor): Tensor image of size (C, H, W) to be normalized.
Returns:
Tensor: Normalized Tensor image.
"""
return (tensor - self.mean) / self.std
class ToArray(Transform):
""" Transpose image and jitter brightness"""
def __init__(self, brightness_jitter=0.0):
self.brightness_jitter = brightness_jitter
def __call__(self, img):
img = img.transpose((2, 0, 1))
return img.astype('float32') / 255.
class ToArrayAndJitter(Transform):
""" Transpose image and jitter brightness"""
def __init__(self, brightness_jitter=0.0):
self.brightness_jitter = brightness_jitter
def roll(self):
return np.random.uniform(
max(0, 1 - self.brightness_jitter), 1 + self.brightness_jitter)
def transform(self, img, brightness_factor):
# handle numpy array
img = img.transpose((2, 0, 1))
# backward compatibility
return np.clip(
img.astype('float32') * brightness_factor / 255.0, 0.0, 1.0)
class ToGrayscale(Transform):
"""Converts image to grayscale with probability"""
def __init__(self, probability=0.5):
self.probability = probability
self.color_weights = np.array(
[0.2989, 0.5870, 0.1140], dtype=np.float32)
def roll(self):
return random.random() < self.probability
def transform(self, img, do_grayscale):
if do_grayscale:
if isinstance(img, PTensor):
raise NotImplementedError('Implement paddle variant.')
img_gray = cv.cvtColor(img, cv.COLOR_RGB2GRAY)
return np.stack([img_gray, img_gray, img_gray], axis=2)
# return np.repeat(np.sum(img * self.color_weights, axis=2, keepdims=True).astype(np.uint8), 3, axis=2)
return img
class RandomHorizontalFlip(Transform):
"""Horizontally flip the given NumPy Image randomly with a probability p."""
def __init__(self, probability=0.5):
self.probability = probability
def roll(self):
return random.random() < self.probability
def transform(self, img, do_flip):
if do_flip:
if isinstance(img, PTensor):
return layers.reverse(img, 2)
return np.fliplr(img).copy()
return img
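if __name__ == '__main__':
    # Illustrative sketch (not part of the original module; run this file as a
    # script to try it). Compose a typical training transform: random horizontal
    # flip on the HWC uint8 image, conversion to a CHW float array in [0, 1] with
    # brightness jitter, and channel-wise normalization.
    transform_train = Compose([
        RandomHorizontalFlip(probability=0.5),
        ToArrayAndJitter(brightness_jitter=0.2),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    dummy = (np.random.rand(255, 255, 3) * 255).astype(np.uint8)
    out = transform_train(dummy)
    print(out.shape)  # (3, 255, 255)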
from .lasot import Lasot
from .got10k import Got10k
from .tracking_net import TrackingNet
from .imagenetvid import ImagenetVID
from .coco_seq import MSCOCOSeq
from .vot import VOT
from .youtube_vos import VOS
from .youtube_bb import YoutubeBB
from ltr.data.image_loader import default_image_loader
class BaseDataset(object):
""" Base class for datasets """
def __init__(self, root, image_loader=default_image_loader):
"""
args:
root - The root path to the dataset
image_loader (jpeg4py_loader) - The function to read the images. jpeg4py (https://github.com/ajkxyz/jpeg4py)
is used by default.
"""
if root == '':
raise Exception(
'The dataset path is not setup. Check your "ltr/admin/local.py".'
)
self.root = root
self.image_loader = image_loader
self.sequence_list = [] # Contains the list of sequences.
def __len__(self):
""" Returns size of the dataset
returns:
int - number of samples in the dataset
"""
return self.get_num_sequences()
def __getitem__(self, index):
""" Not to be used! Check get_frames() instead.
"""
return None
def is_video_sequence(self):
""" Returns whether the dataset is a video dataset or an image dataset
returns:
bool - True if a video dataset
"""
return True
def get_name(self):
""" Name of the dataset
returns:
string - Name of the dataset
"""
raise NotImplementedError
def get_num_sequences(self):
""" Number of sequences in a dataset
returns:
int - number of sequences in the dataset."""
return len(self.sequence_list)
def get_sequence_info(self, seq_id):
""" Returns information about a particular sequences,
args:
seq_id - index of the sequence
returns:
Tensor - Annotation for the sequence. A 2d tensor of shape (num_frames, 4).
Format [top_left_x, top_left_y, width, height]
Tensor - 1d Tensor specifying whether target is present (=1) for each frame. shape (num_frames,)
"""
raise NotImplementedError
def get_frames(self, seq_id, frame_ids, anno=None):
""" Get a set of frames from a particular sequence
args:
seq_id - index of sequence
frame_ids - a list of frame numbers
anno(None) - The annotation for the sequence (see get_sequence_info). If None, they will be loaded.
returns:
list - List of frames corresponding to frame_ids
list - List of annotations (tensor of shape (4,)) for each frame
dict - A dict containing meta information about the sequence, e.g. class of the target object.
"""
raise NotImplementedError
import os
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
from pycocotools.coco import COCO
from collections import OrderedDict
from ltr.admin.environment import env_settings
import numpy as np
class MSCOCOSeq(BaseDataset):
""" The COCO dataset. COCO is an image dataset. Thus, we treat each image as a sequence of length 1.
Publication:
Microsoft COCO: Common Objects in Context.
Tsung-Yi Lin, Michael Maire, Serge J. Belongie, Lubomir D. Bourdev, Ross B. Girshick, James Hays, Pietro Perona,
Deva Ramanan, Piotr Dollar and C. Lawrence Zitnick
ECCV, 2014
https://arxiv.org/pdf/1405.0312.pdf
Download the images along with annotations from http://cocodataset.org/#download. The root folder should be
organized as follows.
- coco_root
- annotations
- instances_train2014.json
- images
- train2014
Note: You also have to install the coco pythonAPI from https://github.com/cocodataset/cocoapi.
"""
def __init__(self,
root=None,
filter=None,
image_loader=default_image_loader):
root = env_settings().coco_dir if root is None else root
super().__init__(root, image_loader)
self.filter = filter
# self.img_pth = os.path.join(root, 'train2014/')
self.img_pth = os.path.join(root, 'train2017/')
# self.anno_path = os.path.join(root, 'annotations/instances_train2014.json')
self.anno_path = os.path.join(root,
'annotations/instances_train2017.json')
# Load the COCO set.
self.coco_set = COCO(self.anno_path)
self.cats = self.coco_set.cats
self.sequence_list = self._get_sequence_list()
def _get_sequence_list(self):
ann_list = list(self.coco_set.anns.keys())
seq_list = []
print('COCO before: {}'.format(len(ann_list)))
for a in ann_list:
if self.coco_set.anns[a]['iscrowd'] == 0:
box = self.coco_set.anns[a]['bbox']
box = np.reshape(np.array(box), (1, 4))
target_visible = (box[:, 2] > 0) & (box[:, 3] > 0)
if self.filter:
target_large = (box[:, 2] * box[:, 3] > 30 * 30)
ratio = box[:, 2] / box[:, 3]
target_reasonable_ratio = (10 > ratio) & (ratio > 0.1)
target_visible = target_visible & target_large & target_reasonable_ratio
if target_visible:
seq_list.append(a)
print('COCO after: {}'.format(len(seq_list)))
return seq_list
def is_video_sequence(self):
return False
def get_name(self):
return 'coco'
def get_num_sequences(self):
return len(self.sequence_list)
def get_sequence_info(self, seq_id):
anno = self._get_anno(seq_id)
target_visible = (anno[:, 2] > 0) & (anno[:, 3] > 0)
return anno, target_visible
def _get_anno(self, seq_id):
anno = self.coco_set.anns[self.sequence_list[seq_id]]['bbox']
return np.reshape(np.array(anno), (1, 4))
def _get_frames(self, seq_id):
path = self.coco_set.loadImgs(
[self.coco_set.anns[self.sequence_list[seq_id]]['image_id']])[0][
'file_name']
img = self.image_loader(os.path.join(self.img_pth, path))
return img
def get_meta_info(self, seq_id):
try:
cat_dict_current = self.cats[self.coco_set.anns[self.sequence_list[
seq_id]]['category_id']]
object_meta = OrderedDict({
'object_class': cat_dict_current['name'],
'motion_class': None,
'major_class': cat_dict_current['supercategory'],
'root_class': None,
'motion_adverb': None
})
except:
object_meta = OrderedDict({
'object_class': None,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return object_meta
def get_frames(self, seq_id=None, frame_ids=None, anno=None):
# COCO is an image dataset. Thus we replicate the image denoted by seq_id len(frame_ids) times, and return a
# list containing these replicated images.
frame = self._get_frames(seq_id)
frame_list = [frame.copy() for _ in frame_ids]
if anno is None:
anno = self._get_anno(seq_id)
anno_frames = [anno.copy()[0, :] for _ in frame_ids]
object_meta = self.get_meta_info(seq_id)
return frame_list, anno_frames, object_meta
import os
import os.path
import numpy as np
import csv
import pandas
from collections import OrderedDict
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
from ltr.admin.environment import env_settings
class Got10k(BaseDataset):
""" GOT-10k dataset.
Publication:
GOT-10k: A Large High-Diversity Benchmark for Generic Object Tracking in the Wild
Lianghua Huang, Xin Zhao, and Kaiqi Huang
arXiv:1810.11981, 2018
https://arxiv.org/pdf/1810.11981.pdf
Download dataset from http://got-10k.aitestunion.com/downloads
"""
def __init__(self,
root=None,
filter=None,
image_loader=default_image_loader,
split=None,
seq_ids=None):
"""
args:
root - path to the got-10k training data. Note: This should point to the 'train' folder inside GOT-10k
image_loader (jpeg4py_loader) - The function to read the images. jpeg4py (https://github.com/ajkxyz/jpeg4py)
is used by default.
split - 'train' or 'val'. Note: The validation split here is a subset of the official got-10k train split,
NOT the official got-10k validation split. To use the official validation split, provide that as
the root folder instead.
seq_ids - List containing the ids of the videos to be used for training. Note: Only one of 'split' or 'seq_ids'
options can be used at the same time.
"""
root = env_settings().got10k_dir if root is None else root
super().__init__(root, image_loader)
# all folders inside the root
self.sequence_list = self._get_sequence_list()
if split == 'vot-train':
ltr_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '..')
with open(
os.path.join(ltr_path, 'data_specs',
'got10k_prohibited_for_VOT.txt')) as f:
prohibited = [l.strip() for l in f.readlines()]
print('GOT10K before: {}'.format(len(self.sequence_list)))
self.sequence_list = [
x for x in self.sequence_list if x not in prohibited
]
print('GOT10K after: {}'.format(len(self.sequence_list)))
else:
# seq_id is the index of the folder inside the got10k root path
if split is not None:
if seq_ids is not None:
raise ValueError('Cannot set both split_name and seq_ids.')
ltr_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '..')
if split == 'train':
file_path = os.path.join(ltr_path, 'data_specs',
'got10k_train_split.txt')
elif split == 'val':
file_path = os.path.join(ltr_path, 'data_specs',
'got10k_val_split.txt')
else:
raise ValueError('Unknown split name.')
seq_ids = pandas.read_csv(
file_path, header=None, squeeze=True,
dtype=np.int64).values.tolist()
elif seq_ids is None:
seq_ids = list(range(0, len(self.sequence_list)))
# self.seq_ids = seq_ids
self.sequence_list = [self.sequence_list[i] for i in seq_ids]
self.sequence_meta_info = self._load_meta_info()
self.filter = filter
def get_name(self):
return 'got10k'
def _load_meta_info(self):
sequence_meta_info = {
s: self._read_meta(os.path.join(self.root, s))
for s in self.sequence_list
}
return sequence_meta_info
def _read_meta(self, seq_path):
try:
with open(os.path.join(seq_path, 'meta_info.ini')) as f:
meta_info = f.readlines()
object_meta = OrderedDict({
'object_class': meta_info[5].split(': ')[-1][:-1],
'motion_class': meta_info[6].split(': ')[-1][:-1],
'major_class': meta_info[7].split(': ')[-1][:-1],
'root_class': meta_info[8].split(': ')[-1][:-1],
'motion_adverb': meta_info[9].split(': ')[-1][:-1]
})
except:
object_meta = OrderedDict({
'object_class': None,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return object_meta
def _get_sequence_list(self):
with open(os.path.join(self.root, 'list.txt')) as f:
# dir_names = f.readlines()
dir_list = list(csv.reader(f))
dir_list = [dir_name[0] for dir_name in dir_list]
return dir_list
def _read_anno(self, seq_path):
anno_file = os.path.join(seq_path, "groundtruth.txt")
gt = pandas.read_csv(
anno_file,
delimiter=',',
header=None,
dtype=np.float32,
na_filter=False,
low_memory=False).values
return np.array(gt)
def _read_target_visible(self, seq_path, anno):
# Read full occlusion and out_of_view
occlusion_file = os.path.join(seq_path, "absence.label")
cover_file = os.path.join(seq_path, "cover.label")
with open(occlusion_file, 'r', newline='') as f:
occlusion = np.array([int(v[0]) for v in csv.reader(f)], 'byte')
with open(cover_file, 'r', newline='') as f:
cover = np.array([int(v[0]) for v in csv.reader(f)], 'byte')
target_visible = ~occlusion & (cover > 0) & (anno[:, 2] > 0) & (
anno[:, 3] > 0)
return target_visible
def _get_sequence_path(self, seq_id):
return os.path.join(self.root, self.sequence_list[seq_id])
def get_sequence_info(self, seq_id):
seq_path = self._get_sequence_path(seq_id)
anno = self._read_anno(seq_path)
target_visible = self._read_target_visible(seq_path, anno)
if self.filter:
target_large = (anno[:, 2] * anno[:, 3] > 30 * 30)
ratio = anno[:, 2] / anno[:, 3]
target_reasonable_ratio = (10 > ratio) & (ratio > 0.1)
target_visible = target_visible & target_large & target_reasonable_ratio
return anno, target_visible
def _get_frame_path(self, seq_path, frame_id):
return os.path.join(
seq_path, '{:08}.jpg'.format(frame_id + 1)) # frames start from 1
def _get_frame(self, seq_path, frame_id):
return self.image_loader(self._get_frame_path(seq_path, frame_id))
def get_frames(self, seq_id, frame_ids, anno=None):
seq_path = self._get_sequence_path(seq_id)
obj_meta = self.sequence_meta_info[self.sequence_list[seq_id]]
frame_list = [self._get_frame(seq_path, f_id) for f_id in frame_ids]
if anno is None:
anno = self._read_anno(seq_path)
# Return as list of tensors
anno_frames = [anno[f_id, :] for f_id in frame_ids]
return frame_list, anno_frames, obj_meta
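# --- Usage sketch (not part of the original file) ----------------------------
# Got10k is restricted either by a named split ('train'/'val', both subsets of
# the official train set) or by an explicit list of sequence ids, never both.
# The root is assumed to come from env_settings().got10k_dir.
if __name__ == '__main__':
    val_set = Got10k(split='val')              # uses data_specs/got10k_val_split.txt
    subset = Got10k(seq_ids=list(range(100)))  # first 100 folders listed in list.txt
    print(val_set.get_name(), len(val_set.sequence_list), len(subset.sequence_list))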
import os
import numpy as np
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
import xml.etree.ElementTree as ET
import json
from collections import OrderedDict
import nltk
from nltk.corpus import wordnet
from ltr.admin.environment import env_settings
def get_target_to_image_ratio(seq):
anno = np.array(seq['anno'])
img_sz = np.array(seq['image_size'])
return np.sqrt(anno[0, 2:4].prod() / (img_sz.prod()))
class ImagenetVID(BaseDataset):
""" Imagenet VID dataset.
Publication:
ImageNet Large Scale Visual Recognition Challenge
Olga Russakovsky, Jia Deng, Hao Su, Jonathan Krause, Sanjeev Satheesh, Sean Ma, Zhiheng Huang, Andrej Karpathy,
Aditya Khosla, Michael Bernstein, Alexander C. Berg and Li Fei-Fei
IJCV, 2015
https://arxiv.org/pdf/1409.0575.pdf
Download the dataset from http://image-net.org/
"""
def __init__(self,
root=None,
filter=None,
image_loader=default_image_loader,
min_length=0,
max_target_area=1):
"""
args:
root - path to the imagenet vid dataset.
image_loader (jpeg4py_loader) - The function to read the images. jpeg4py (https://github.com/ajkxyz/jpeg4py)
is used by default.
min_length - Minimum allowed sequence length.
max_target_area - max allowed ratio between target area and image area. Can be used to filter out targets
which cover complete image.
"""
root = env_settings().imagenet_dir if root is None else root
super().__init__(root, image_loader)
cache_file = os.path.join(root, 'cache.json')
if os.path.isfile(cache_file):
# If available, load the pre-processed cache file containing meta-info for each sequence
with open(cache_file, 'r') as f:
sequence_list_dict = json.load(f)
self.sequence_list = sequence_list_dict
else:
# Else process the imagenet annotations and generate the cache file
self.sequence_list = self._process_anno(root)
with open(cache_file, 'w') as f:
json.dump(self.sequence_list, f)
# Filter the sequences based on min_length and max_target_area in the first frame
self.sequence_list = [
x for x in self.sequence_list
if len(x['anno']) >= min_length and get_target_to_image_ratio(x) <
max_target_area
]
self.filter = filter
def get_name(self):
return 'imagenetvid'
def get_num_sequences(self):
return len(self.sequence_list)
def get_sequence_info(self, seq_id):
anno = np.array(self.sequence_list[seq_id]['anno'])
target_visible = np.array(self.sequence_list[seq_id]['target_visible'],
'bool')
target_visible = target_visible & (anno[:, 2] > 0) & (anno[:, 3] > 0)
if self.filter is not None:
target_large = (anno[:, 2] * anno[:, 3] > 30 * 30)
ratio = anno[:, 2] / anno[:, 3]
target_reasonable_ratio = (10 > ratio) & (ratio > 0.1)
target_visible = target_visible & target_reasonable_ratio & target_large
return anno, target_visible
def _get_frame(self, sequence, frame_id):
set_name = 'ILSVRC2015_VID_train_{:04d}'.format(sequence['set_id'])
vid_name = 'ILSVRC2015_train_{:08d}'.format(sequence['vid_id'])
frame_number = frame_id + sequence['start_frame']
frame_path = os.path.join(self.root, 'Data', 'VID', 'train', set_name,
vid_name, '{:06d}.JPEG'.format(frame_number))
# frame_path = os.path.join(self.root, 'Data', 'VID', 'train', vid_name,
# '{:06d}.jpg'.format(frame_number))
return self.image_loader(frame_path)
def get_frames(self, seq_id, frame_ids, anno=None):
sequence = self.sequence_list[seq_id]
frame_list = [self._get_frame(sequence, f) for f in frame_ids]
if anno is None:
anno = sequence['anno']
# Return as list of tensors
anno_frames = [anno[f_id, :] for f_id in frame_ids]
# added the class info to the meta info
object_meta = OrderedDict({
'object_class': sequence['class_name'],
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return frame_list, anno_frames, object_meta
def _process_anno(self, root):
# Builds individual tracklets
base_vid_anno_path = os.path.join(root, 'Annotations', 'VID', 'train')
all_sequences = []
# for set in sorted(os.listdir(base_vid_anno_path)):
for set in sorted([
'ILSVRC2015_VID_train_0000', 'ILSVRC2015_VID_train_0001',
'ILSVRC2015_VID_train_0002', 'ILSVRC2015_VID_train_0003'
]):
set_id = int(set.split('_')[-1])
for vid in sorted(
os.listdir(os.path.join(base_vid_anno_path, set))):
vid_id = int(vid.split('_')[-1])
anno_files = sorted(
os.listdir(os.path.join(base_vid_anno_path, set, vid)))
frame1_anno = ET.parse(
os.path.join(base_vid_anno_path, set, vid, anno_files[0]))
image_size = [
int(frame1_anno.find('size/width').text),
int(frame1_anno.find('size/height').text)
]
objects = [
ET.ElementTree(file=os.path.join(base_vid_anno_path, set,
vid, f)).findall('object')
for f in anno_files
]
tracklets = {}
# Find all tracklets along with start frame
for f_id, all_targets in enumerate(objects):
for target in all_targets:
tracklet_id = target.find('trackid').text
if tracklet_id not in tracklets:
tracklets[tracklet_id] = f_id
for tracklet_id, tracklet_start in tracklets.items():
tracklet_anno = []
target_visible = []
class_name = None
for f_id in range(tracklet_start, len(objects)):
found = False
for target in objects[f_id]:
if target.find('trackid').text == tracklet_id:
if not class_name:
class_name_id = target.find('name').text
class_name = class_name_id
# class_name = self._get_class_name_from_id(class_name_id)
x1 = int(target.find('bndbox/xmin').text)
y1 = int(target.find('bndbox/ymin').text)
x2 = int(target.find('bndbox/xmax').text)
y2 = int(target.find('bndbox/ymax').text)
tracklet_anno.append([x1, y1, x2 - x1, y2 - y1])
target_visible.append(
target.find('occluded').text == '0')
found = True
break
if not found:
break
new_sequence = {
'set_id': set_id,
'vid_id': vid_id,
'class_name': class_name,
'start_frame': tracklet_start,
'anno': tracklet_anno,
'target_visible': target_visible,
'image_size': image_size
}
all_sequences.append(new_sequence)
return all_sequences
import os
import os.path
import numpy as np
import pandas
import csv
from collections import OrderedDict
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
from ltr.admin.environment import env_settings
class Lasot(BaseDataset):
""" LaSOT dataset.
Publication:
LaSOT: A High-quality Benchmark for Large-scale Single Object Tracking
Heng Fan, Liting Lin, Fan Yang, Peng Chu, Ge Deng, Sijia Yu, Hexin Bai, Yong Xu, Chunyuan Liao and Haibin Ling
CVPR, 2019
https://arxiv.org/pdf/1809.07845.pdf
Download the dataset from https://cis.temple.edu/lasot/download.html
"""
def __init__(self,
root=None,
filter=None,
image_loader=default_image_loader,
vid_ids=None,
split=None):
"""
args:
root - path to the lasot dataset.
image_loader (jpeg4py_loader) - The function to read the images. jpeg4py (https://github.com/ajkxyz/jpeg4py)
is used by default.
vid_ids - List containing the ids of the videos (1 - 20) used for training. If vid_ids = [1, 3, 5], then the
videos with subscripts -1, -3, and -5 from each class will be used for training.
split - If split='train', the official train split (protocol-II) is used for training. Note: Only one of
vid_ids or split option can be used at a time.
"""
root = env_settings().lasot_dir if root is None else root
super().__init__(root, image_loader)
self.sequence_list = self._build_sequence_list(vid_ids, split)
self.filter = filter
def _build_sequence_list(self, vid_ids=None, split=None):
if split is not None:
if vid_ids is not None:
raise ValueError('Cannot set both split_name and vid_ids.')
ltr_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '..')
if split == 'train':
file_path = os.path.join(ltr_path, 'data_specs',
'lasot_train_split.txt')
else:
raise ValueError('Unknown split name.')
sequence_list = pandas.read_csv(
file_path, header=None, squeeze=True).values.tolist()
elif vid_ids is not None:
sequence_list = [
c + '-' + str(v) for c in self.class_list for v in vid_ids
]
else:
raise ValueError('Set either split_name or vid_ids.')
return sequence_list
def get_name(self):
return 'lasot'
def get_num_sequences(self):
return len(self.sequence_list)
def _read_anno(self, seq_path):
anno_file = os.path.join(seq_path, "groundtruth.txt")
gt = pandas.read_csv(
anno_file,
delimiter=',',
header=None,
dtype=np.float32,
na_filter=False,
low_memory=False).values
return np.array(gt)
def _read_target_visible(self, seq_path, anno):
# Read full occlusion and out_of_view
occlusion_file = os.path.join(seq_path, "full_occlusion.txt")
out_of_view_file = os.path.join(seq_path, "out_of_view.txt")
with open(occlusion_file, 'r', newline='') as f:
occlusion = np.array([int(v) for v in list(csv.reader(f))[0]],
'byte')
with open(out_of_view_file, 'r') as f:
out_of_view = np.array([int(v) for v in list(csv.reader(f))[0]],
'byte')
target_visible = ~occlusion & ~out_of_view & (anno[:, 2] > 0) & (
anno[:, 3] > 0)
return target_visible
def _get_sequence_path(self, seq_id):
seq_name = self.sequence_list[seq_id]
class_name = seq_name.split('-')[0]
vid_id = seq_name.split('-')[1]
return os.path.join(self.root, class_name, class_name + '-' + vid_id)
def get_sequence_info(self, seq_id):
seq_path = self._get_sequence_path(seq_id)
anno = self._read_anno(seq_path)
target_visible = self._read_target_visible(seq_path, anno)
if self.filter is not None:
target_large = (anno[:, 2] * anno[:, 3] > 30 * 30)
ratio = anno[:, 2] / anno[:, 3]
target_reasonable_ratio = (10 > ratio) & (ratio > 0.1)
target_visible = target_visible & target_reasonable_ratio & target_large
return anno, target_visible
def _get_frame_path(self, seq_path, frame_id):
return os.path.join(
seq_path, 'img',
'{:08}.jpg'.format(frame_id + 1)) # frames start from 1
def _get_frame(self, seq_path, frame_id):
return self.image_loader(self._get_frame_path(seq_path, frame_id))
def _get_class(self, seq_path):
obj_class = seq_path.split('/')[-2]
return obj_class
def get_frames(self, seq_id, frame_ids, anno=None):
seq_path = self._get_sequence_path(seq_id)
obj_class = self._get_class(seq_path)
frame_list = [self._get_frame(seq_path, f_id) for f_id in frame_ids]
if anno is None:
anno = self._read_anno(seq_path)
# Return as list of tensors
anno_frames = [anno[f_id, :] for f_id in frame_ids]
object_meta = OrderedDict({
'object_class': obj_class,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return frame_list, anno_frames, object_meta
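# --- Usage sketch (not part of the original file) ----------------------------
# Lasot is built either from the official protocol-II train split or from a
# list of per-class video ids, never both. The root is assumed to come from
# env_settings().lasot_dir configured in ltr/admin/local.py.
if __name__ == '__main__':
    train_set = Lasot(split='train')   # uses data_specs/lasot_train_split.txt
    frames, annos, meta = train_set.get_frames(0, [0, 10, 20])
    print(meta['object_class'], annos[0])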
import os
import os.path
import numpy as np
import pandas
from collections import OrderedDict
from ltr.data.image_loader import default_image_loader
from .base_dataset import BaseDataset
from ltr.admin.environment import env_settings
def list_sequences(root, set_ids):
""" Lists all the videos in the input set_ids. Returns a list of tuples (set_id, video_name)
args:
root: Root directory to TrackingNet
set_ids: Sets (0-11) which are to be used
returns:
list - list of tuples (set_id, video_name) containing the set_id and video_name for each sequence
"""
sequence_list = []
for s in set_ids:
anno_dir = os.path.join(root, "TRAIN_" + str(s), "anno")
sequences_cur_set = [(s, os.path.splitext(f)[0])
for f in os.listdir(anno_dir)
if f.endswith('.txt')]
sequence_list += sequences_cur_set
return sequence_list
class TrackingNet(BaseDataset):
""" TrackingNet dataset.
Publication:
TrackingNet: A Large-Scale Dataset and Benchmark for Object Tracking in the Wild.
Matthias Mueller,Adel Bibi, Silvio Giancola, Salman Al-Subaihi and Bernard Ghanem
ECCV, 2018
https://ivul.kaust.edu.sa/Documents/Publications/2018/TrackingNet%20A%20Large%20Scale%20Dataset%20and%20Benchmark%20for%20Object%20Tracking%20in%20the%20Wild.pdf
Download the dataset using the toolkit https://github.com/SilvioGiancola/TrackingNet-devkit.
"""
def __init__(self,
root=None,
image_loader=default_image_loader,
set_ids=None):
"""
args:
root - The path to the TrackingNet folder, containing the training sets.
image_loader (jpeg4py_loader) - The function to read the images. jpeg4py (https://github.com/ajkxyz/jpeg4py)
is used by default.
set_ids (None) - List containing the ids of the TrackingNet sets to be used for training. If None, all the
sets (0 - 11) will be used.
"""
root = env_settings().trackingnet_dir if root is None else root
super().__init__(root, image_loader)
if set_ids is None:
set_ids = [i for i in range(12)]
self.set_ids = set_ids
# Keep a list of all videos. Sequence list is a list of tuples (set_id, video_name) containing the set_id and
# video_name for each sequence
self.sequence_list = list_sequences(self.root, self.set_ids)
def get_name(self):
return 'trackingnet'
def _read_anno(self, seq_id):
set_id = self.sequence_list[seq_id][0]
vid_name = self.sequence_list[seq_id][1]
anno_file = os.path.join(self.root, "TRAIN_" + str(set_id), "anno",
vid_name + ".txt")
gt = pandas.read_csv(
anno_file,
delimiter=',',
header=None,
dtype=np.float32,
na_filter=False,
low_memory=False).values
return np.array(gt)
def get_sequence_info(self, seq_id):
anno = self._read_anno(seq_id)
target_visible = (anno[:, 2] > 0) & (anno[:, 3] > 0)
return anno, target_visible
def _get_frame(self, seq_id, frame_id):
set_id = self.sequence_list[seq_id][0]
vid_name = self.sequence_list[seq_id][1]
frame_path = os.path.join(self.root, "TRAIN_" + str(set_id), "frames",
vid_name, str(frame_id) + ".jpg")
return self.image_loader(frame_path)
def get_frames(self, seq_id, frame_ids, anno=None):
frame_list = [self._get_frame(seq_id, f) for f in frame_ids]
if anno is None:
anno = self._read_anno(seq_id)
# Return as list of tensors
anno_frames = [anno[f_id, :] for f_id in frame_ids]
object_meta = OrderedDict({
'object_class': None,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return frame_list, anno_frames, object_meta
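# --- Usage sketch (not part of the original file) ----------------------------
# Each TrackingNet sequence is addressed by a (set_id, video_name) tuple and
# set_ids selects which TRAIN_<i> chunks are used. The root is assumed to come
# from env_settings().trackingnet_dir.
if __name__ == '__main__':
    dataset = TrackingNet(set_ids=[0, 1])   # only TRAIN_0 and TRAIN_1
    frames, annos, meta = dataset.get_frames(0, [0, 5, 10])
    print(dataset.get_name(), len(dataset.sequence_list), annos[0])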
import os
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
import numpy as np
import cv2 as cv
from collections import OrderedDict
from ltr.admin.environment import env_settings
def get_axis_aligned_bbox(region):
region = np.array(region)
if len(region.shape) == 3:
# region (1,4,2)
region = np.array([
region[0][0][0], region[0][0][1], region[0][1][0], region[0][1][1],
region[0][2][0], region[0][2][1], region[0][3][0], region[0][3][1]
])
cx = np.mean(region[0::2])
cy = np.mean(region[1::2])
x1 = min(region[0::2])
x2 = max(region[0::2])
y1 = min(region[1::2])
y2 = max(region[1::2])
A1 = np.linalg.norm(region[0:2] - region[2:4]) * np.linalg.norm(region[
2:4] - region[4:6])
A2 = (x2 - x1) * (y2 - y1)
s = np.sqrt(A1 / A2)
w = s * (x2 - x1) + 1
h = s * (y2 - y1) + 1
x11 = cx - w // 2
y11 = cy - h // 2
return x11, y11, w, h
class VOT(BaseDataset):
def __init__(self, root=None, image_loader=default_image_loader):
# root = env_settings().vot_dir if root is None else root
assert root is not None
super().__init__(root, image_loader)
self.sequence_list = self._get_sequence_list()
self.ann = self._get_annotations()
def _get_sequence_list(self):
seq_list = []
for d in os.listdir(self.root):
if os.path.isdir(os.path.join(self.root, d)):
seq_list.append(d)
return sorted(seq_list)
def _get_annotations(self):
ann = {}
for seq in self.sequence_list:
ann[seq] = {'bbox': [], 'rbb': []}
with open(os.path.join(self.root, seq, 'groundtruth.txt')) as f:
lines = [l.strip().split(',') for l in f.readlines()]
for l in lines:
vs = [float(v) for v in l]
if len(vs) == 4:
polys = [
vs[0], vs[1] + vs[3] - 1, vs[0], vs[1],
vs[0] + vs[2] - 1, vs[1], vs[0] + vs[2] - 1,
vs[1] + vs[3] - 1
]
else:
polys = vs
box = get_axis_aligned_bbox(polys)
rbb = cv.minAreaRect(
np.int0(np.array(polys).reshape((-1, 2))))
# assume small rotation angle, switch height, width
if rbb[2] < -45:
angle = rbb[2] + 90
height = rbb[1][0]
width = rbb[1][1]
else:
angle = rbb[2]
height = rbb[1][1]
width = rbb[1][0]
rbb = [rbb[0][0], rbb[0][1], width, height, angle]
ann[seq]['bbox'].append(box)
ann[seq]['rbb'].append(rbb)
return ann
def is_video_sequence(self):
return True
def get_name(self):
return 'vot'
def get_num_sequences(self):
return len(self.sequence_list)
def get_sequence_info(self, seq_id):
anno = self._get_anno(seq_id)
target_visible = (anno[:, 2] > 0) & (anno[:, 3] > 0)
return anno, target_visible
def _get_anno(self, seq_id):
anno = self.ann[self.sequence_list[seq_id]]['bbox']
return np.reshape(np.array(anno), (-1, 4))
def get_meta_info(self, seq_id):
object_meta = OrderedDict({
'object_class': None,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return object_meta
def _get_sequence_path(self, seq_id):
return os.path.join(self.root, self.sequence_list[seq_id])
def _get_frame_path(self, seq_path, frame_id):
return os.path.join(
seq_path, 'color',
'{:08}.jpg'.format(frame_id + 1)) # frames start from 1
def _get_frame(self, seq_path, frame_id):
return self.image_loader(self._get_frame_path(seq_path, frame_id))
def get_frames(self, seq_id=None, frame_ids=None, anno=None):
seq_path = self._get_sequence_path(seq_id)
frame_list = [self._get_frame(seq_path, f_id) for f_id in frame_ids]
if anno is None:
anno = self._get_anno(seq_id)
anno_frames = [anno[f_id, :] for f_id in frame_ids]
object_meta = self.get_meta_info(seq_id)
return frame_list, anno_frames, object_meta
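# --- Worked example (not part of the original file) ---------------------------
# groundtruth.txt lines hold either 4 values (x, y, w, h) or 8 polygon
# coordinates; _get_annotations() expands the former to a polygon and maps both
# through get_axis_aligned_bbox(). For an axis-aligned polygon A1 == A2, so the
# scale factor s is 1 and the original width/height are recovered; for a rotated
# polygon s < 1 shrinks the enclosing box so its area matches the rotated one.
if __name__ == '__main__':
    # Polygon of the box x=10, y=20, w=30, h=40 in the order produced above
    # (bottom-left, top-left, top-right, bottom-right).
    poly = [10, 59, 10, 20, 39, 20, 39, 59]
    print(get_axis_aligned_bbox(poly))   # approximately (9.5, 19.5, 30.0, 40.0)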
import os
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
import xml.etree.ElementTree as ET
import json
import pickle
from collections import OrderedDict
import numpy as np
import nltk
from nltk.corpus import wordnet
from ltr.admin.environment import env_settings
def get_target_to_image_ratio(seq):
anno = np.array(seq['anno'])
img_sz = np.array(seq['image_size'])
return np.sqrt(anno[0, 2:4].prod() / (img_sz.prod()))
class YoutubeBB(BaseDataset):
""" YoutubeBB dataset.
Publication:
ImageNet Large Scale Visual Recognition Challenge
Olga Russakovsky, Jia Deng, Hao Su, Jonathan Krause, Sanjeev Satheesh, Sean Ma, Zhiheng Huang, Andrej Karpathy,
Aditya Khosla, Michael Bernstein, Alexander C. Berg and Li Fei-Fei
IJCV, 2015
https://arxiv.org/pdf/1409.0575.pdf
Download the dataset from http://image-net.org/
"""
def __init__(self,
root=None,
filter=None,
image_loader=default_image_loader,
min_length=0,
max_target_area=1):
"""
args:
            root - path to the youtube-bb dataset root (the folder holding the video frames and ytb_meta.pickle).
image_loader (jpeg4py_loader) - The function to read the images. jpeg4py (https://github.com/ajkxyz/jpeg4py)
is used by default.
min_length - Minimum allowed sequence length.
max_target_area - max allowed ratio between target area and image area. Can be used to filter out targets
which cover complete image.
"""
super().__init__(root, image_loader)
meta_file = os.path.join(root, 'ytb_meta.pickle')
with open(meta_file, 'rb') as f:
meta = pickle.load(f)
sequence_list = []
for video_name, video_info in meta:
if 'ILSVRC' not in video_name:
seq_info = {}
for trkid in video_info:
if len(video_info[trkid]['img']) > 2:
seq_info['video_name'] = video_name
seq_info['anno'] = video_info[trkid]['box']
seq_info['img_paths'] = video_info[trkid]['img']
sequence_list.append(seq_info)
print('num_sequences: {}'.format(len(sequence_list)))
self.sequence_list = sequence_list
# Filter the sequences based on min_length and max_target_area in the first frame
# self.sequence_list = [x for x in self.sequence_list if len(x['anno']) >= min_length and
# get_target_to_image_ratio(x) < max_target_area]
self.filter = filter
def get_name(self):
return 'youtubebb'
def get_num_sequences(self):
return len(self.sequence_list)
def get_sequence_info(self, seq_id):
anno = np.array(self.sequence_list[seq_id]['anno'])
target_visible = (anno[:, 2] > 0) & (anno[:, 3] > 0)
if self.filter is not None:
target_large = (anno[:, 2] * anno[:, 3] > 30 * 30)
            target_reasonable_area = (anno[:, 2] * anno[:, 3] < 500 * 500)
            ratio = anno[:, 2] / anno[:, 3]
            target_reasonable_ratio = (10 > ratio) & (ratio > 0.1)
            target_visible = target_visible & target_reasonable_ratio & target_large & target_reasonable_area
return anno, target_visible
def _get_frame(self, sequence, frame_id):
frame_path = os.path.join(self.root, sequence['video_name'],
sequence['img_paths'][frame_id] + '.jpg')
return self.image_loader(frame_path)
def get_frames(self, seq_id, frame_ids, anno=None):
sequence = self.sequence_list[seq_id]
frame_list = [self._get_frame(sequence, f) for f in frame_ids]
if anno is None:
anno = sequence['anno']
# Return as list of tensors
anno_frames = [anno[f_id, :] for f_id in frame_ids]
# added the class info to the meta info
object_meta = OrderedDict({
'object_class': None,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return frame_list, anno_frames, object_meta
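# --- Layout sketch (an assumption inferred from the parsing code above) -------
# ytb_meta.pickle is expected to yield (video_name, video_info) pairs, where
# video_info maps a track id to {'img': [frame_name, ...], 'box': [[x, y, w, h], ...]}
# and the frames live at <root>/<video_name>/<frame_name>.jpg. The snippet below
# only writes a toy meta file of that shape; every value in it is made up.
if __name__ == '__main__':
    import pickle
    toy_meta = [('video_0001', {'0': {'img': ['000000', '000030', '000060'],
                                      'box': [[10, 20, 30, 40]] * 3}})]
    with open('/tmp/ytb_meta.pickle', 'wb') as f:
        pickle.dump(toy_meta, f)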
import os
from .base_dataset import BaseDataset
from ltr.data.image_loader import default_image_loader
import numpy as np
import cv2 as cv
import json
from collections import OrderedDict
from ltr.admin.environment import env_settings
def get_axis_aligned_bbox(region):
region = np.array(region)
if len(region.shape) == 3:
# region (1,4,2)
region = np.array([
region[0][0][0], region[0][0][1], region[0][1][0], region[0][1][1],
region[0][2][0], region[0][2][1], region[0][3][0], region[0][3][1]
])
cx = np.mean(region[0::2])
cy = np.mean(region[1::2])
x1 = min(region[0::2])
x2 = max(region[0::2])
y1 = min(region[1::2])
y2 = max(region[1::2])
A1 = np.linalg.norm(region[0:2] - region[2:4]) * np.linalg.norm(region[
2:4] - region[4:6])
A2 = (x2 - x1) * (y2 - y1)
s = np.sqrt(A1 / A2)
    if np.isnan(s):  # 0/0 from a degenerate polygon yields nan
x11, y11, w, h = 0, 0, 0, 0
else:
w = s * (x2 - x1) + 1
h = s * (y2 - y1) + 1
x11 = cx - w // 2
y11 = cy - h // 2
return x11, y11, w, h
class VOS(BaseDataset):
def __init__(self, root=None, image_loader=default_image_loader):
# root = env_settings().vot_dir if root is None else root
assert root is not None
super().__init__(root, image_loader)
with open(os.path.join(self.root, 'meta.json')) as f:
self.meta = json.load(f)['videos']
self.sequence_list = self._get_sequence_list()
self.ann = self._get_annotations()
def _get_sequence_list(self):
seq_list = []
videos = self.meta.keys()
for v in videos:
objs = self.meta[v]['objects'].keys()
for o in objs:
if "rotate_box" in self.meta[v]['objects'][o]:
seq_list.append((v, o))
assert len(seq_list) > 0
return seq_list
def _get_annotations(self):
ann = {}
for seq in self.sequence_list:
ann[seq] = {'bbox': [], 'rbb': []}
polygons = self.meta[seq[0]]['objects'][seq[1]]['rotate_box']
for vs in polygons:
if len(vs) == 4:
polys = [
vs[0], vs[1] + vs[3] - 1, vs[0], vs[1],
vs[0] + vs[2] - 1, vs[1], vs[0] + vs[2] - 1,
vs[1] + vs[3] - 1
]
else:
polys = vs
if not np.all(polys == 0):
box = get_axis_aligned_bbox(polys)
rbb = cv.minAreaRect(
np.int0(np.array(polys).reshape((-1, 2))))
else:
box = np.array([0, 0, 0, 0])
rbb = ((0, 0), (0, 0), 0)
if box[2] * box[3] > 500 * 500:
print(box)
# assume small rotation angle, switch height, width
if rbb[2] < -45:
angle = rbb[2] + 90
height = rbb[1][0]
width = rbb[1][1]
else:
angle = rbb[2]
height = rbb[1][1]
width = rbb[1][0]
rbb = [rbb[0][0], rbb[0][1], width, height, angle]
ann[seq]['bbox'].append(box)
ann[seq]['rbb'].append(rbb)
return ann
def is_video_sequence(self):
return True
def get_name(self):
return 'vot'
def get_num_sequences(self):
return len(self.sequence_list)
def get_sequence_info(self, seq_id):
anno = self._get_anno(seq_id)
target_visible = (anno[:, 2] > 0) & (anno[:, 3] > 0)
target_large = (anno[:, 2] * anno[:, 3] > 30 * 30)
        target_reasonable_area = (anno[:, 2] * anno[:, 3] < 500 * 500)
        return anno, target_visible & target_large & target_reasonable_area
def _get_anno(self, seq_id):
anno = self.ann[self.sequence_list[seq_id]]['bbox']
return np.reshape(np.array(anno), (-1, 4))
def get_meta_info(self, seq_id):
object_meta = OrderedDict({
'object_class': None,
'motion_class': None,
'major_class': None,
'root_class': None,
'motion_adverb': None
})
return object_meta
def _get_frame_path(self, seq_id, frame_id):
v, o = self.sequence_list[seq_id]
frame_name = self.meta[v]['objects'][o]['frames'][frame_id]
return os.path.join(self.root, 'JPEGImages', v,
                            '{}.jpg'.format(frame_name))  # frame names come directly from meta.json
def _get_frame(self, seq_id, frame_id):
return self.image_loader(self._get_frame_path(seq_id, frame_id))
def get_frames(self, seq_id=None, frame_ids=None, anno=None):
frame_list = [self._get_frame(seq_id, f_id) for f_id in frame_ids]
if anno is None:
anno = self._get_anno(seq_id)
anno_frames = [anno[f_id, :] for f_id in frame_ids]
object_meta = self.get_meta_info(seq_id)
return frame_list, anno_frames, object_meta
import os
import paddle.fluid as fluid
import paddle.fluid.dygraph.nn as nn
from ltr.admin.environment import env_settings
CURRENT_DIR = os.path.dirname(__file__)
def weight_init():
init = fluid.initializer.MSRAInitializer(uniform=False)
param = fluid.ParamAttr(initializer=init)
return param
def norm_weight_init(constant=1.0):
init = fluid.initializer.ConstantInitializer(constant)
param = fluid.ParamAttr(initializer=init)
return param
def norm_bias_init():
init = fluid.initializer.ConstantInitializer(value=0.)
param = fluid.ParamAttr(initializer=init)
return param
class ConvBNLayer(fluid.dygraph.Layer):
def __init__(self,
in_channels,
out_channels,
filter_size,
stride=1,
groups=1,
bn_init_constant=1.0,
is_test=False):
super(ConvBNLayer, self).__init__()
self.conv = nn.Conv2D(
num_channels=in_channels,
filter_size=filter_size,
num_filters=out_channels,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
bias_attr=False,
param_attr=weight_init())
self.bn = nn.BatchNorm(
out_channels,
param_attr=norm_weight_init(bn_init_constant),
bias_attr=norm_bias_init(),
act=None,
momentum=0.9,
use_global_stats=is_test)
def forward(self, inputs):
res = self.conv(inputs)
self.conv_res = res
res = self.bn(res)
return res
class BasicBlock(fluid.dygraph.Layer):
expansion = 1
def __init__(self,
in_channels,
out_channels,
stride=1,
is_downsample=None,
is_test=False):
super(BasicBlock, self).__init__()
self.expansion = 1
self.conv_bn1 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
filter_size=3,
stride=stride,
groups=1,
is_test=is_test)
self.conv_bn2 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
filter_size=3,
stride=1,
groups=1,
is_test=is_test)
self.is_downsample = is_downsample
if self.is_downsample:
self.downsample = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
filter_size=1,
stride=stride,
is_test=is_test)
self.stride = stride
def forward(self, inputs):
identity = inputs
res = self.conv_bn1(inputs)
res = fluid.layers.relu(res)
res = self.conv_bn2(res)
if self.is_downsample:
identity = self.downsample(identity)
res += identity
res = fluid.layers.relu(res)
return res
class Bottleneck(fluid.dygraph.Layer):
expansion = 4
def __init__(self,
in_channels,
out_channels,
stride=1,
is_downsample=None,
base_width=64,
dilation=1,
groups=1,
is_test=False):
super(Bottleneck, self).__init__()
width = int(out_channels * (base_width / 64.)) * groups
self.conv_bn1 = ConvBNLayer(
in_channels=in_channels,
filter_size=1,
out_channels=width,
groups=1,
is_test=is_test)
self.conv_bn2 = ConvBNLayer(
in_channels=width,
filter_size=3,
out_channels=width,
stride=stride,
groups=groups,
is_test=is_test)
self.conv_bn3 = ConvBNLayer(
in_channels=width,
filter_size=1,
out_channels=out_channels * self.expansion,
bn_init_constant=0.,
is_test=is_test)
self.is_downsample = is_downsample
if self.is_downsample:
self.downsample = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels * self.expansion,
filter_size=1,
stride=stride,
is_test=is_test)
self.stride = stride
def forward(self, inputs):
identify = inputs
out = self.conv_bn1(inputs)
out = fluid.layers.relu(out)
out = self.conv_bn2(out)
out = fluid.layers.relu(out)
out = self.conv_bn3(out)
if self.is_downsample:
identify = self.downsample(inputs)
out += identify
out = fluid.layers.relu(out)
return out
class ResNet(fluid.dygraph.Layer):
def __init__(self,
name,
Block,
layers,
num_classes=1000,
groups=1,
is_test=False):
"""
:param name: str, namescope
        :param layers: int, number of layers in the network (18, 34, 50, 101 or 152)
:param num_classes: int, the dimension of final output
:param groups: int, default is 1
"""
super(ResNet, self).__init__(name_scope=name)
support_layers = [18, 34, 50, 101, 152]
assert layers in support_layers, \
"support layer can only be one of [18, 34, 50, 101, 152]"
self.layers = layers
if layers == 18:
depths = [2, 2, 2, 2]
elif layers == 50 or layers == 34:
depths = [3, 4, 6, 3]
elif layers == 101:
depths = [3, 4, 23, 3]
elif layers == 152:
depths = [3, 8, 36, 3]
strides = [1, 2, 2, 2]
num_filters = [64, 128, 256, 512]
self.in_channels = 64
self.dilation = 1
self.groups = groups
self.conv_bn_init = ConvBNLayer(
3,
out_channels=self.in_channels,
filter_size=7,
stride=2,
is_test=is_test)
block_collect = []
downsample = None
for i in range(len(depths)):
# collect layers in each block
_block = []
stride = strides[i]
out_channel = num_filters[i]
if stride != 1 or self.in_channels != num_filters[
i] * Block.expansion:
downsample = True
bottleneck_block = self.add_sublayer(
"block{}_0".format(i),
Block(
self.in_channels,
out_channel,
stride=stride,
is_downsample=downsample,
is_test=is_test))
downsample = False
_block.append(bottleneck_block)
self.in_channels = num_filters[i] * Block.expansion
for j in range(1, depths[i]):
bottleneck_block = self.add_sublayer(
"block{}_{}".format(i, j),
Block(
self.in_channels, out_channel, is_test=is_test))
_block.append(bottleneck_block)
# collect blocks
block_collect.append(_block)
self.block_collect = block_collect
self.maxpool = nn.Pool2D(
pool_size=3, pool_stride=2, pool_padding=1, pool_type="max")
self.global_pool = nn.Pool2D(pool_type='avg', global_pooling=True)
self.fc = nn.Linear(
input_dim=512 * Block.expansion, output_dim=num_classes)
def _add_output_and_check(self, name, x, outputs, output_layers):
if name in output_layers:
outputs[name] = x
return len(output_layers) == len(outputs)
def forward(self, inputs, feat_layers):
out = {}
res = self.conv_bn_init(inputs)
res = fluid.layers.relu(res)
res = self.maxpool(res)
# out['conv_init'] = res
for i in range(len(self.block_collect)):
for layer in self.block_collect[i]:
res = layer(res)
name = 'block{}'.format(i)
if name in feat_layers:
out[name] = res
if len(out) == len(feat_layers):
return out
res = self.global_pool(res)
B, C, _, _ = res.shape
res = fluid.layers.reshape(res, [B, C])
res = self.fc(res)
out['fc'] = res
return out
def resnet18(name, is_test=False, pretrained=False):
net = ResNet(name, Block=BasicBlock, layers=18, is_test=is_test)
if pretrained:
params_path = os.path.join(env_settings().backbone_dir, 'ResNet18')
print("=> loading backbone model from '{}'".format(params_path))
params, _ = fluid.load_dygraph(params_path)
net.load_dict(params)
print("Done")
return net
def resnet50(name, is_test=False, pretrained=False):
net = ResNet(name, Block=Bottleneck, layers=50, is_test=is_test)
if pretrained:
params_path = os.path.join(env_settings().backbone_dir, 'ResNet50')
print("=> loading backbone model from '{}'".format(params_path))
params, _ = fluid.load_dygraph(params_path)
net.load_dict(params)
print("Done")
return net
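# --- Usage sketch (not part of the original file) ----------------------------
# The backbone returns a dict of intermediate features keyed 'block0'...'block3'
# (plus 'fc' when no requested layer triggers an early exit); ATOM consumes
# 'block1' and 'block2'. The input size and the dygraph guard are assumptions
# of this sketch.
if __name__ == '__main__':
    import numpy as np
    with fluid.dygraph.guard():
        net = resnet18('ResNet18', is_test=True, pretrained=False)
        x = fluid.dygraph.to_variable(
            np.random.uniform(-1, 1, [1, 3, 224, 224]).astype(np.float32))
        feats = net(x, ['block1', 'block2'])
        print({k: v.shape for k, v in feats.items()})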
from collections import OrderedDict
from paddle import fluid
from paddle.fluid.dygraph import nn
class SFC_AlexNet(fluid.dygraph.Layer):
def __init__(self, name, is_test):
super(SFC_AlexNet, self).__init__()
self.is_test = is_test
self.layer_init()
def layer_init(self):
# for conv1
self.conv1 = nn.Conv2D(
num_channels=3,
num_filters=96,
filter_size=11,
stride=2,
padding=0,
groups=1,
param_attr=self.weight_init(),
bias_attr=self.bias_init())
self.bn1 = nn.BatchNorm(
num_channels=96,
is_test=self.is_test,
param_attr=self.norm_weight_init(),
bias_attr=self.bias_init(),
use_global_stats=self.is_test)
self.pool1 = nn.Pool2D(
pool_size=3, pool_type="max", pool_stride=2, pool_padding=0)
# for conv2
self.conv2 = nn.Conv2D(
num_channels=96,
num_filters=256,
filter_size=5,
stride=1,
padding=0,
groups=2,
param_attr=self.weight_init(),
bias_attr=self.bias_init())
self.bn2 = nn.BatchNorm(
num_channels=256,
is_test=self.is_test,
param_attr=self.norm_weight_init(),
bias_attr=self.bias_init(),
use_global_stats=self.is_test)
self.pool2 = nn.Pool2D(
pool_size=3, pool_type="max", pool_stride=2, pool_padding=0)
# for conv3
self.conv3 = nn.Conv2D(
num_channels=256,
num_filters=384,
filter_size=3,
stride=1,
padding=0,
groups=1,
param_attr=self.weight_init(),
bias_attr=self.bias_init())
self.bn3 = nn.BatchNorm(
num_channels=384,
is_test=self.is_test,
param_attr=self.norm_weight_init(),
bias_attr=self.bias_init(),
use_global_stats=self.is_test)
# for conv4
self.conv4 = nn.Conv2D(
num_channels=384,
num_filters=384,
filter_size=3,
stride=1,
padding=0,
groups=2,
param_attr=self.weight_init(),
bias_attr=self.bias_init())
self.bn4 = nn.BatchNorm(
num_channels=384,
is_test=self.is_test,
param_attr=self.norm_weight_init(),
bias_attr=self.bias_init(),
use_global_stats=self.is_test)
# for conv5
self.conv5 = nn.Conv2D(
num_channels=384,
num_filters=256,
filter_size=3,
stride=1,
padding=0,
groups=2,
param_attr=self.weight_init(),
bias_attr=self.bias_init())
def _add_output_and_check(self, name, x, outputs, output_layers):
if name in output_layers:
outputs[name] = x
return len(output_layers) == len(outputs)
def forward(self, inputs, output_layers):
outputs = OrderedDict()
out1 = self.conv1(inputs)
out1 = self.bn1(out1)
out1 = fluid.layers.relu(out1)
if self._add_output_and_check('conv1', out1, outputs, output_layers):
return outputs
out1 = self.pool1(out1)
out2 = self.conv2(out1)
out2 = self.bn2(out2)
out2 = fluid.layers.relu(out2)
if self._add_output_and_check('conv2', out2, outputs, output_layers):
return outputs
out2 = self.pool2(out2)
out3 = self.conv3(out2)
out3 = self.bn3(out3)
out3 = fluid.layers.relu(out3)
if self._add_output_and_check('conv3', out3, outputs, output_layers):
return outputs
out4 = self.conv4(out3)
out4 = self.bn4(out4)
out4 = fluid.layers.relu(out4)
if self._add_output_and_check('conv4', out4, outputs, output_layers):
return outputs
out5 = self.conv5(out4)
if self._add_output_and_check('conv5', out5, outputs, output_layers):
return outputs
return outputs
def norm_weight_init(self):
init = fluid.initializer.ConstantInitializer(1.0)
param = fluid.ParamAttr(initializer=init)
return param
def weight_init(self):
init = fluid.initializer.MSRAInitializer(uniform=False)
param = fluid.ParamAttr(initializer=init)
return param
def bias_init(self):
init = fluid.initializer.ConstantInitializer(value=0.)
param = fluid.ParamAttr(initializer=init)
return param
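# --- Usage sketch (not part of the original file) ----------------------------
# The AlexNet backbone returns an OrderedDict holding only the requested layers;
# SiamFC consumes 'conv5'. A 127x127 exemplar crop (the usual SiamFC setting,
# assumed here) yields a 6x6 embedding with 256 channels.
if __name__ == '__main__':
    import numpy as np
    with fluid.dygraph.guard():
        net = SFC_AlexNet('AlexNet', is_test=True)
        z = fluid.dygraph.to_variable(
            np.random.uniform(-1, 1, [1, 3, 127, 127]).astype(np.float32))
        feats = net(z, ['conv5'])
        print(feats['conv5'].shape)   # [1, 256, 6, 6]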
from .atom_iou_net import AtomIouNet
import paddle
import paddle.fluid as fluid
import paddle.fluid.dygraph as dygraph
import os.path as osp
import sys
CURRENT_DIR = osp.dirname(__file__)
sys.path.append(osp.join(CURRENT_DIR, '..', '..', '..'))
from ltr.models.backbone.resnet import resnet50, resnet18
from ltr.models.bbreg.atom_iou_net import AtomIouNet
class ATOMnet(dygraph.layers.Layer):
def __init__(self,
name,
feature_extractor,
bb_regressor,
bb_regressor_layer,
extractor_grad=True):
"""
:param feature_extractor: backbone
:param bb_regressor: IOUnet
:param bb_regressor_layer: list, which layer is used in IOUnet,
:param extractor_grad: default is True
"""
super(ATOMnet, self).__init__(name)
self.feature_extractor = feature_extractor
self.bb_regressor = bb_regressor
self.bb_regressor_layer = bb_regressor_layer
layers_gt = ['block0', 'block1', 'block2', 'block3', 'fc']
if bb_regressor_layer is not None:
for key in bb_regressor_layer:
assert key in layers_gt
else:
raise ValueError("bb_regressor_layer can only be one of :",
layers_gt)
def forward(self, train_imgs, test_imgs, train_bb, test_proposals):
num_sequences = train_imgs.shape[-4]
num_train_images = train_imgs.shape[0] if len(
train_imgs.shape) == 5 else 1
num_test_images = test_imgs.shape[0] if len(test_imgs.shape) == 5 else 1
if len(train_imgs.shape) == 5:
train_imgs = fluid.layers.reshape(
train_imgs, [-1, *list(train_imgs.shape)[-3:]])
test_imgs = fluid.layers.reshape(test_imgs,
[-1, *list(test_imgs.shape)[-3:]])
train_feat = self.extract_backbone_features(train_imgs)
test_feat = self.extract_backbone_features(test_imgs)
        # For clarity, send the features to bb_regressor in sequence form, i.e. [sequence, batch, feature, row, col]
train_feat_iou = [
fluid.layers.reshape(feat, (num_train_images, num_sequences,
*feat.shape[-3:]))
for feat in train_feat.values()
]
test_feat_iou = [
fluid.layers.reshape(feat, (num_test_images, num_sequences,
*feat.shape[-3:]))
for feat in test_feat.values()
]
# Obtain iou prediction
iou_pred = self.bb_regressor(train_feat_iou, test_feat_iou, train_bb,
test_proposals)
return iou_pred
def extract_backbone_features(self, im, layers=None):
if layers is None:
layers = self.bb_regressor_layer
return self.feature_extractor(im, layers)
def extract_features(self, im, layers):
return self.feature_extractor(im, layers)
def atom_resnet18(iou_input_dim=(256, 256),
iou_inter_dim=(256, 256),
backbone_pretrained=True,
backbone_is_test=False,
iounet_is_test=False):
backbone = resnet18(
'ResNet18', is_test=backbone_is_test, pretrained=backbone_pretrained)
iou_predictor = AtomIouNet(
'IOUnet',
pred_input_dim=iou_input_dim,
pred_inter_dim=iou_inter_dim,
is_test=iounet_is_test)
model = ATOMnet(
'ATOM',
feature_extractor=backbone,
bb_regressor=iou_predictor,
bb_regressor_layer=['block1', 'block2'],
extractor_grad=False)
return model
def atom_resnet50(iou_input_dim=(256, 256),
iou_inter_dim=(256, 256),
backbone_pretrained=True,
backbone_is_test=False,
iounet_is_test=False):
backbone = resnet50(
'ResNet50', is_test=backbone_is_test, pretrained=backbone_pretrained)
iou_predictor = AtomIouNet(
'IOUnet',
input_dim=(512, 1024),
pred_input_dim=iou_input_dim,
pred_inter_dim=iou_inter_dim,
is_test=iounet_is_test)
model = ATOMnet(
'ATOM',
feature_extractor=backbone,
bb_regressor=iou_predictor,
bb_regressor_layer=['block1', 'block2'],
extractor_grad=False)
return model
if __name__ == '__main__':
import numpy as np
a = np.random.uniform(-1, 1, [1, 3, 144, 144]).astype(np.float32)
b = np.random.uniform(-1, 1, [1, 3, 144, 144]).astype(np.float32)
bbox = [[3, 4, 10, 11]]
proposal_bbox = [[4, 5, 11, 12] * 16]
bbox = np.reshape(np.array(bbox), [1, 1, 4]).astype(np.float32)
proposal_bbox = np.reshape(np.array(proposal_bbox),
[1, 16, 4]).astype(np.float32)
with fluid.dygraph.guard():
a_pd = fluid.dygraph.to_variable(a)
b_pd = fluid.dygraph.to_variable(b)
bbox_pd = fluid.dygraph.to_variable(bbox)
proposal_bbox_pd = fluid.dygraph.to_variable(proposal_bbox)
model = atom_resnet50()
res = model(a_pd, b_pd, bbox_pd, proposal_bbox_pd)
params = model.state_dict()
for v in params:
print(v)
"""
The implementation of the ATOM IoU network.
"""
import paddle
import paddle.fluid as fluid
import paddle.fluid.dygraph.nn as nn
import numpy as np
import os.path as osp
import sys
CURRENT_DIR = osp.dirname(__file__)
sys.path.append(osp.join(CURRENT_DIR, '..', '..', '..'))
def weight_init():
init = fluid.initializer.MSRAInitializer(uniform=False)
param = fluid.ParamAttr(initializer=init)
return param
def bias_init():
init = fluid.initializer.ConstantInitializer(value=0.)
param = fluid.ParamAttr(initializer=init)
return param
def norm_weight_init():
# init = fluid.initializer.ConstantInitializer(1.0)
init = fluid.initializer.Uniform(low=0., high=1.)
param = fluid.ParamAttr(initializer=init)
return param
def norm_bias_init():
init = fluid.initializer.ConstantInitializer(value=0.)
param = fluid.ParamAttr(initializer=init)
return param
class ConvBNReluLayer(fluid.dygraph.Layer):
def __init__(self,
in_channels,
out_channels,
filter_size,
stride=1,
groups=1,
padding=1,
is_test=False):
super(ConvBNReluLayer, self).__init__()
self.conv = nn.Conv2D(
num_channels=in_channels,
filter_size=filter_size,
num_filters=out_channels,
stride=stride,
padding=padding,
groups=groups,
bias_attr=bias_init(),
param_attr=weight_init())
self.bn = nn.BatchNorm(
out_channels,
param_attr=norm_weight_init(),
bias_attr=norm_bias_init(),
act=None,
momentum=0.9,
use_global_stats=is_test)
def forward(self, inputs):
res = self.conv(inputs)
self.conv_res = res
res = self.bn(res)
res = fluid.layers.relu(res)
return res
class FCBNReluLayer(fluid.dygraph.Layer):
def __init__(self,
in_channels,
out_channels,
in_size,
is_bias=True,
is_bn=True,
is_relu=True,
is_test=False):
super(FCBNReluLayer, self).__init__()
self.is_bn = is_bn
self.is_relu = is_relu
if is_bias:
bias_init = fluid.ParamAttr(
initializer=fluid.initializer.ConstantInitializer(0.))
else:
bias_init = False
self.linear = nn.Linear(
in_channels * in_size * in_size, out_channels, bias_attr=bias_init)
self.bn = nn.BatchNorm(
out_channels,
param_attr=norm_weight_init(),
bias_attr=norm_bias_init(),
act=None,
momentum=0.9,
use_global_stats=is_test)
def forward(self, x):
x = fluid.layers.reshape(x, [x.shape[0], -1])
x = self.linear(x)
if self.is_bn:
x = self.bn(x)
if self.is_relu:
x = fluid.layers.relu(x)
return x
class AtomIouNet(fluid.dygraph.Layer):
def __init__(self,
name,
input_dim=(128, 256),
pred_input_dim=(256, 256),
pred_inter_dim=(256, 256),
is_test=False):
super(AtomIouNet, self).__init__(name)
self.name = self.full_name()
self.conv3_1r = ConvBNReluLayer(
input_dim[0], 128, filter_size=3, stride=1, is_test=is_test)
self.conv3_1t = ConvBNReluLayer(
input_dim[0], 256, filter_size=3, stride=1, is_test=is_test)
self.conv3_2t = ConvBNReluLayer(
256, pred_input_dim[0], filter_size=3, stride=1, is_test=is_test)
self.fc3_1r = ConvBNReluLayer(
128, 256, filter_size=3, stride=1, padding=0, is_test=is_test)
self.conv4_1r = ConvBNReluLayer(
input_dim[1], 256, filter_size=3, stride=1, is_test=is_test)
self.conv4_1t = ConvBNReluLayer(
input_dim[1], 256, filter_size=3, stride=1, is_test=is_test)
self.conv4_2t = ConvBNReluLayer(
256, pred_input_dim[1], filter_size=3, stride=1, is_test=is_test)
self.fc34_3r = ConvBNReluLayer(
512,
pred_input_dim[0],
filter_size=1,
stride=1,
padding=0,
is_test=is_test)
self.fc34_4r = ConvBNReluLayer(
512,
pred_input_dim[1],
filter_size=1,
stride=1,
padding=0,
is_test=is_test)
self.fc3_rt = FCBNReluLayer(
pred_input_dim[0], pred_inter_dim[0], in_size=5, is_test=is_test)
self.fc4_rt = FCBNReluLayer(
pred_input_dim[1], pred_inter_dim[1], in_size=3, is_test=is_test)
bias_init = fluid.initializer.ConstantInitializer(0.)
self.iou_predictor = nn.Linear(
pred_inter_dim[0] + pred_inter_dim[1], 1, bias_attr=bias_init)
self.outs = {}
def predict_iou(self, filter, feat2, proposals):
"""
        Predicts IoU for the given proposals.
        :param filter: Modulation vectors for the targets, as returned by get_filter. Dims (batch, feature_dim).
        :param feat2: IoU features (from get_iou_feat) for the test images. Dims (batch, feature_dim, H, W).
        :param proposals: Proposal boxes for which the IoU will be predicted (batch, num_proposals, 4).
        :return: IoU prediction of shape (batch, num_proposals).
"""
fc34_3_r, fc34_4_r = filter
c3_t, c4_t = feat2
batch_size = c3_t.shape[0]
# Modulation
c3_t_att = c3_t * fluid.layers.reshape(fc34_3_r, [batch_size, -1, 1, 1])
c4_t_att = c4_t * fluid.layers.reshape(fc34_4_r, [batch_size, -1, 1, 1])
# add batch roi nums
num_proposals_per_batch = proposals.shape[1]
batch_roi_nums = np.array([num_proposals_per_batch] *
batch_size).astype(np.int64)
batch_roi_nums = fluid.dygraph.to_variable(batch_roi_nums)
# input proposals2 is in format xywh, convert it to x0y0x1y1 format
proposals_xyxy = fluid.layers.concat(
[
proposals[:, :, 0:2],
proposals[:, :, 0:2] + proposals[:, :, 2:4]
],
axis=2)
roi2 = fluid.layers.reshape(proposals_xyxy, [-1, 4])
roi2.stop_gradient = False
roi3t = fluid.layers.prroi_pool(
c3_t_att, roi2, 1 / 8., 5, 5, batch_roi_nums=batch_roi_nums)
roi4t = fluid.layers.prroi_pool(
c4_t_att, roi2, 1 / 16., 3, 3, batch_roi_nums=batch_roi_nums)
fc3_rt = self.fc3_rt(roi3t)
fc4_rt = self.fc4_rt(roi4t)
fc34_rt_cat = fluid.layers.concat([fc3_rt, fc4_rt], axis=1)
iou_pred = self.iou_predictor(fc34_rt_cat)
iou_pred = fluid.layers.reshape(iou_pred,
[batch_size, num_proposals_per_batch])
return iou_pred
def forward(self, feat1, feat2, bb1, proposals2):
"""Runs the ATOM IoUNet during training operation.
This forward pass is mainly used for training. Call the individual functions during tracking instead.
args:
feat1: Variable, Features from the reference frames (4 or 5 dims).
feat2: Variable, Features from the test frames (4 or 5 dims).
            bb1: Target boxes (x, y, w, h) in image coords in the reference samples. Dims (images, sequences, 4).
proposals2: Proposal boxes for which the IoU will be predicted (images, sequences, num_proposals, 4)."""
assert len(feat1[0].shape) == 5, 'Expect 5 dimensional feat1'
num_test_images = feat2[0].shape[0]
batch_size = feat2[0].shape[1]
# Extract first train sample
feat1 = [f[0] for f in feat1]
bb1 = bb1[0]
# Get modulation vector
modulation = self.get_filter(feat1, bb1)
feat2 = [
fluid.layers.reshape(f,
(batch_size * num_test_images, *f.shape[-3:]))
for f in feat2
]
iou_feat = self.get_iou_feat(feat2)
new_modulation = []
for i in range(0, len(modulation)):
tmp = modulation[i]
tmp = fluid.layers.reshape(tmp, [1, batch_size, -1])
tmp = fluid.layers.expand(tmp, [num_test_images, 1, 1])
tmp = fluid.layers.reshape(tmp, [batch_size * num_test_images, -1])
new_modulation.append(tmp)
proposals2 = fluid.layers.reshape(
proposals2, [batch_size * num_test_images, -1, 4])
pred_iou = self.predict_iou(new_modulation, iou_feat, proposals2)
pred_iou = fluid.layers.reshape(pred_iou,
[num_test_images, batch_size, -1])
return pred_iou
def get_filter(self, feat1, bb1):
"""
get modulation feature [feature1, feature2] for the targets
:param feat1: variable, Backbone features from reference images. shapes (batch, feature_dim, H, W).
:param bb1: variable, Target boxes (x,y,w,h) in image coords in the reference samples. shapes (batch, 4).
:return:
"""
feat3_r, feat4_r = feat1
c3_r = self.conv3_1r(feat3_r)
# Add batch_index to rois
batch_size = bb1.shape[0]
batch_roi_nums = np.array([1] * batch_size).astype(np.int64)
batch_roi_nums = fluid.dygraph.to_variable(batch_roi_nums)
# input bb is in format xywh, convert it to x0y0x1y1 format
roi1 = fluid.layers.concat(
[bb1[:, 0:2], bb1[:, 0:2] + bb1[:, 2:4]], axis=1)
roi1.stop_gradient = False
roi3r = fluid.layers.prroi_pool(c3_r, roi1, 1 / 8., 3, 3,
batch_roi_nums)
c4_r = self.conv4_1r(feat4_r)
roi4r = fluid.layers.prroi_pool(c4_r, roi1, 1 / 16., 1, 1,
batch_roi_nums)
fc3_r = self.fc3_1r(roi3r)
# Concatenate
fc34_r = fluid.layers.concat([fc3_r, roi4r], axis=1)
fc34_3_r = self.fc34_3r(fc34_r)
fc34_4_r = self.fc34_4r(fc34_r)
return fc34_3_r, fc34_4_r
def get_iou_feat(self, feat2):
"""
Get IoU prediction features from a 4 or 5 dimensional backbone input.
        :param feat2: variable, Backbone features from the test images. [feature1, feature2]
:return: features, variable
"""
feat3_t, feat4_t = feat2
c3_t = self.conv3_2t(self.conv3_1t(feat3_t))
c4_t = self.conv4_2t(self.conv4_1t(feat4_t))
return c3_t, c4_t
def atom_iounet(name,
input_dim=(128, 256),
pred_input_dim=(256, 256),
pred_inter_dim=(256, 256)):
return AtomIouNet(
name,
input_dim=input_dim,
pred_input_dim=pred_input_dim,
pred_inter_dim=pred_inter_dim)
def test_paddle_iounet():
a = np.random.uniform(-1, 1, [1, 1, 512, 18, 18]).astype(np.float32)
b = np.random.uniform(-1, 1, [1, 1, 1024, 9, 9]).astype(np.float32)
bbox = [[3, 4, 10, 11]]
proposal_bbox = [[4, 5, 11, 12] * 16]
bbox = np.reshape(np.array(bbox), [1, 1, 4]).astype(np.float32)
proposal_bbox = np.reshape(np.array(proposal_bbox),
[1, 16, 4]).astype(np.float32)
with fluid.dygraph.guard():
a_pd = fluid.dygraph.to_variable(a)
b_pd = fluid.dygraph.to_variable(b)
bbox_pd = fluid.dygraph.to_variable(bbox)
proposal_bbox_pd = fluid.dygraph.to_variable(proposal_bbox)
feat1 = [a_pd, b_pd]
feat2 = [a_pd, b_pd]
model = AtomIouNet('IOUNet', input_dim=(512, 1024))
res = model(feat1, feat2, bbox_pd, proposal_bbox_pd)
print(res.shape)
params = model.state_dict()
for v in params:
print(v, '\t', params[v].shape)
print(len(params))
if __name__ == '__main__':
test_paddle_iounet()
from .target_estimator_net import SiamFCEstimator
from paddle import fluid
from paddle.fluid import dygraph
import ltr.models.siamese.target_estimator_net as tgt_estimator
class SiamNet(dygraph.layers.Layer):
def __init__(self,
name,
feature_extractor,
target_estimator,
target_estimator_layer,
extractor_grad=True):
"""
:param feature_extractor: backbone
:param target_estimator: headers
:param target_estimator_layer: list, which layer is used in header,
:param extractor_grad: default is True
"""
super(SiamNet, self).__init__(name)
self.feature_extractor = feature_extractor
self.target_estimator = target_estimator
self.target_estimator_layer = target_estimator_layer
def forward(self, train_imgs, test_imgs):
# extract backbone features
if len(train_imgs.shape) == 5:
train_imgs = fluid.layers.reshape(
train_imgs, [-1, *list(train_imgs.shape)[-3:]])
test_imgs = fluid.layers.reshape(test_imgs,
[-1, *list(test_imgs.shape)[-3:]])
train_feat = self.extract_backbone_features(train_imgs)
test_feat = self.extract_backbone_features(test_imgs)
train_feat = [feat for feat in train_feat.values()]
test_feat = [feat for feat in test_feat.values()]
# Obtain target estimation
targets = self.target_estimator(train_feat, test_feat)
return targets
def extract_backbone_features(self, im, layers=None):
if layers is None:
layers = self.target_estimator_layer
return self.feature_extractor(im, layers)
def extract_features(self, im, layers):
return self.feature_extractor(im, layers)
def siamfc_alexnet(backbone_pretrained=False,
backbone_is_test=False,
estimator_is_test=False):
from ltr.models.backbone.sfc_alexnet import SFC_AlexNet
backbone_net = SFC_AlexNet('AlexNet', is_test=backbone_is_test)
target_estimator = tgt_estimator.SiamFCEstimator('CenterEstimator')
model = SiamNet(
'SiamFC',
backbone_net,
target_estimator,
['conv5'], )
return model
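# --- Usage sketch (not part of the original file) ----------------------------
# The SiamFC model cross-correlates an exemplar (train) crop with an instance
# (test) crop and returns a response map. The 127/255 crop sizes below are the
# common SiamFC choices and are assumptions of this sketch; with them the
# response map is 17x17.
if __name__ == '__main__':
    import numpy as np
    with fluid.dygraph.guard():
        model = siamfc_alexnet()
        z = fluid.dygraph.to_variable(
            np.random.uniform(-1, 1, [1, 3, 127, 127]).astype(np.float32))
        x = fluid.dygraph.to_variable(
            np.random.uniform(-1, 1, [1, 3, 255, 255]).astype(np.float32))
        print(model(z, x).shape)   # [1, 1, 17, 17]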
from paddle import fluid
from paddle.fluid import dygraph
from paddle.fluid.dygraph import nn
from pytracking.libs.Fconv2d import Conv2D
class SiamFCEstimator(dygraph.layers.Layer):
def __init__(self, name):
super().__init__(name)
init_w = fluid.ParamAttr(
name="a_weight",
initializer=fluid.initializer.ConstantInitializer(0.001),
learning_rate=0.,
trainable=False)
init_b = fluid.ParamAttr(
name="a_bias",
initializer=fluid.initializer.ConstantInitializer(0.),
trainable=True)
self.adjust_conv = nn.Conv2D(
1, 1, 1, 1, 0, param_attr=init_w, bias_attr=init_b)
def forward(self, exemplar, instance):
exemplar_f = self.get_reference(exemplar)
instance_f = self.get_search_feat(instance)
score_map = self.estimate(exemplar_f, instance_f)
return score_map
def get_reference(self, feat):
# remove list warp
return feat[0]
def get_search_feat(self, feat):
# remove list warp
return feat[0]
def estimate(self, exemplar, instance):
shape = instance.shape
instance = fluid.layers.reshape(
instance, shape=[1, -1, shape[2], shape[3]])
cross_conv = Conv2D(stride=1, padding=0, dilation=1, groups=shape[0])
score_map = cross_conv(instance, exemplar)
score_map = fluid.layers.transpose(score_map, [1, 0, 2, 3])
score_map = self.adjust_conv(score_map)
return score_map
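# --- Note and usage sketch (not part of the original file) --------------------
# estimate() folds the batch dimension of the instance features into channels
# and sets groups = batch, so one grouped convolution cross-correlates every
# exemplar with its own search region; the result is transposed back to
# [batch, 1, H, W] and rescaled by the 1x1 adjust_conv. The feature shapes below
# match the AlexNet backbone and are assumptions of this sketch.
if __name__ == '__main__':
    import numpy as np
    with fluid.dygraph.guard():
        estimator = SiamFCEstimator('CenterEstimator')
        z_feat = fluid.dygraph.to_variable(
            np.random.uniform(-1, 1, [2, 256, 6, 6]).astype(np.float32))
        x_feat = fluid.dygraph.to_variable(
            np.random.uniform(-1, 1, [2, 256, 22, 22]).astype(np.float32))
        print(estimator([z_feat], [x_feat]).shape)   # [2, 1, 17, 17]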
import os
import sys
import argparse
import importlib
import multiprocessing
import paddle
import cv2 as cv
env_path = os.path.join(os.path.dirname(__file__), '..')
if env_path not in sys.path:
sys.path.append(env_path)
import ltr.admin.settings as ws_settings
def run_training(train_module, train_name):
"""Run a train scripts in train_settings.
args:
train_module: Name of module in the "train_settings/" folder.
train_name: Name of the train settings file.
"""
# set single threads in opencv
cv.setNumThreads(0)
print('Training: {} {}'.format(train_module, train_name))
settings = ws_settings.Settings()
if settings.env.workspace_dir == '':
raise Exception('Setup your workspace_dir in "ltr/admin/local.py".')
settings.module_name = train_module
settings.script_name = train_name
settings.project_path = 'ltr/{}/{}'.format(train_module, train_name)
expr_module = importlib.import_module('ltr.train_settings.{}.{}'.format(
train_module, train_name))
expr_func = getattr(expr_module, 'run')
expr_func(settings)
def main():
parser = argparse.ArgumentParser(
        description='Run a training script from train_settings.')
parser.add_argument(
'train_module',
type=str,
help='Name of module in the "train_settings/" folder.')
parser.add_argument(
'train_name', type=str, help='Name of the train settings file.')
args = parser.parse_args()
run_training(args.train_module, args.train_name)
if __name__ == '__main__':
multiprocessing.set_start_method('spawn', force=True)
main()
import paddle.fluid as fluid
import paddle.fluid.dygraph as dygraph
import ltr.actors as actors
import ltr.data.transforms as dltransforms
from ltr.data import processing, sampler, loader
from ltr.dataset import ImagenetVID, MSCOCOSeq, Lasot, Got10k
from ltr.models.bbreg.atom import atom_resnet50, atom_resnet18
from ltr.trainers import LTRTrainer
def run(settings):
# Most common settings are assigned in the settings struct
settings.description = 'ATOM IoUNet with ResNet18 backbone and trained with vid, lasot, coco.'
settings.print_interval = 1 # How often to print loss and other info
settings.batch_size = 64 # Batch size
settings.num_workers = 4 # Number of workers for image loading
settings.normalize_mean = [0.485, 0.456, 0.406
] # Normalize mean (default ImageNet values)
settings.normalize_std = [0.229, 0.224,
0.225] # Normalize std (default ImageNet values)
settings.search_area_factor = 5.0 # Image patch size relative to target size
settings.feature_sz = 18 # Size of feature map
settings.output_sz = settings.feature_sz * 16 # Size of input image patches
# Settings for the image sample and proposal generation
settings.center_jitter_factor = {'train': 0, 'test': 4.5}
settings.scale_jitter_factor = {'train': 0, 'test': 0.5}
settings.proposal_params = {
'min_iou': 0.1,
'boxes_per_frame': 16,
'sigma_factor': [0.01, 0.05, 0.1, 0.2, 0.3]
}
# Train datasets
vid_train = ImagenetVID()
lasot_train = Lasot(split='train')
coco_train = MSCOCOSeq()
# Validation datasets
got10k_val = Got10k(split='val')
# The joint augmentation transform, that is applied to the pairs jointly
transform_joint = dltransforms.ToGrayscale(probability=0.05)
# The augmentation transform applied to the training set (individually to each image in the pair)
transform_train = dltransforms.Compose([
dltransforms.ToArrayAndJitter(0.2), dltransforms.Normalize(
mean=settings.normalize_mean, std=settings.normalize_std)
])
# The augmentation transform applied to the validation set (individually to each image in the pair)
transform_val = dltransforms.Compose([
dltransforms.ToArray(), dltransforms.Normalize(
mean=settings.normalize_mean, std=settings.normalize_std)
])
# Data processing to do on the training pairs
data_processing_train = processing.ATOMProcessing(
search_area_factor=settings.search_area_factor,
output_sz=settings.output_sz,
center_jitter_factor=settings.center_jitter_factor,
scale_jitter_factor=settings.scale_jitter_factor,
mode='sequence',
proposal_params=settings.proposal_params,
transform=transform_train,
joint_transform=transform_joint)
# Data processing to do on the validation pairs
data_processing_val = processing.ATOMProcessing(
search_area_factor=settings.search_area_factor,
output_sz=settings.output_sz,
center_jitter_factor=settings.center_jitter_factor,
scale_jitter_factor=settings.scale_jitter_factor,
mode='sequence',
proposal_params=settings.proposal_params,
transform=transform_val,
joint_transform=transform_joint)
# The sampler for training
dataset_train = sampler.ATOMSampler(
[vid_train, lasot_train, coco_train], [1, 1, 1],
samples_per_epoch=1000 * settings.batch_size,
max_gap=50,
processing=data_processing_train)
# The loader for training
train_loader = loader.LTRLoader(
'train',
dataset_train,
training=True,
batch_size=settings.batch_size,
num_workers=4,
stack_dim=1)
# The sampler for validation
dataset_val = sampler.ATOMSampler(
[got10k_val], [1, ],
samples_per_epoch=500 * settings.batch_size,
max_gap=50,
processing=data_processing_val)
# The loader for validation
val_loader = loader.LTRLoader(
'val',
dataset_val,
training=False,
batch_size=settings.batch_size,
epoch_interval=5,
num_workers=4,
stack_dim=1)
# create the network, objective, optimizer, learning-rate scheduler and trainer
with dygraph.guard():
# Create network
net = atom_resnet18(backbone_pretrained=True)
# Freeze backbone
state_dicts = net.state_dict()
for k in state_dicts.keys():
if 'feature_extractor' in k and "running" not in k:
state_dicts[k].stop_gradient = True
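# Note: with the backbone frozen here and only net.bb_regressor.parameters()
# passed to the optimizer below, training updates only the IoU-prediction
# (bounding-box regression) branch.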
# Set objective
objective = fluid.layers.square_error_cost
# Create actor, which wraps network and objective
actor = actors.AtomActor(net=net, objective=objective)
# Set to training mode
actor.train()
# define the optimizer and learning-rate schedule
gamma = 0.2
lr = 1e-3
lr_scheduler = fluid.dygraph.PiecewiseDecay(
[15, 30, 45],
values=[lr, lr * gamma, lr * gamma * gamma],
step=1000,
begin=0)
optimizer = fluid.optimizer.Adam(
parameter_list=net.bb_regressor.parameters(),
learning_rate=lr_scheduler)
trainer = LTRTrainer(actor, [train_loader, val_loader], optimizer,
settings, lr_scheduler)
trainer.train(40, load_latest=False, fail_safe=False)
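For readers unfamiliar with ATOM-style training, the `proposal_params` set earlier in this file control how candidate boxes are sampled around the ground truth during data processing. The sketch below is an illustration under that assumption, not the repo's `ATOMProcessing` implementation: the target box is jittered with Gaussian noise scaled by a randomly chosen `sigma_factor`, candidates with IoU below `min_iou` are rejected, and the IoU values serve as regression targets.

```python
import numpy as np

def iou_xywh(a, b):
    """IoU of two boxes given as [x, y, w, h]."""
    ax2, ay2 = a[0] + a[2], a[1] + a[3]
    bx2, by2 = b[0] + b[2], b[1] + b[3]
    iw = max(0.0, min(ax2, bx2) - max(a[0], b[0]))
    ih = max(0.0, min(ay2, by2) - max(a[1], b[1]))
    inter = iw * ih
    union = a[2] * a[3] + b[2] * b[3] - inter
    return inter / union if union > 0 else 0.0

def sample_proposals(gt, boxes_per_frame=16, min_iou=0.1,
                     sigma_factors=(0.01, 0.05, 0.1, 0.2, 0.3), seed=0):
    """Hypothetical proposal sampler mirroring the settings above."""
    rng = np.random.default_rng(seed)
    gt = np.asarray(gt, dtype=np.float32)
    proposals, targets = [], []
    while len(proposals) < boxes_per_frame:
        s = rng.choice(sigma_factors)
        # jitter center and size proportionally to the target size
        sigma = s * np.array([gt[2], gt[3], gt[2], gt[3]], dtype=np.float32)
        cand = gt + rng.normal(0.0, sigma).astype(np.float32)
        cand[2:] = np.maximum(cand[2:], 1.0)  # keep width/height positive
        overlap = iou_xywh(gt, cand)
        if overlap >= min_iou:
            proposals.append(cand)
            targets.append(overlap)  # IoU of proposal vs. ground truth
    return np.stack(proposals), np.asarray(targets, dtype=np.float32)
```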
from .base_trainer import BaseTrainer
from .ltr_trainer import LTRTrainer
(2 collapsed diffs not shown.)
import importlib
import os
class EnvSettings:
def __init__(self):
pytracking_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
self.results_path = '{}/tracking_results/'.format(pytracking_path)
self.network_path = '{}/networks/'.format(pytracking_path)
self.dataset_path = '{}/benchmark_datasets/'.format(pytracking_path)
def create_default_local_file():
comment = {'results_path': 'Where to store tracking results',
'dataset_path': 'Where benchmark datasets are stored',
'network_path': 'Where tracking networks are stored.'}
path = os.path.join(os.path.dirname(__file__), 'local.py')
with open(path, 'w') as f:
settings = EnvSettings()
f.write('from pytracking.admin.environment import EnvSettings\n\n')
f.write('def local_env_settings():\n')
f.write(' settings = EnvSettings()\n\n')
f.write(' # Set your local paths here.\n\n')
for attr in dir(settings):
comment_str = None
if attr in comment:
comment_str = comment[attr]
attr_val = getattr(settings, attr)
if not attr.startswith('__') and not callable(attr_val):
if comment_str is None:
f.write(' settings.{} = \'{}\'\n'.format(attr, attr_val))
else:
f.write(' settings.{} = \'{}\' # {}\n'.format(attr, attr_val, comment_str))
f.write('\n return settings\n\n')
def env_settings():
env_module_name = 'pytracking.admin.local'
try:
env_module = importlib.import_module(env_module_name)
return env_module.local_env_settings()
except:
env_file = os.path.join(os.path.dirname(__file__), 'local.py')
# Create a default file
create_default_local_file()
raise RuntimeError('YOU HAVE NOT SETUP YOUR local.py!!!\n Go to "{}" and set all the paths you need. '
'Then try to run again.'.format(env_file))
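For reference, the `local.py` that `create_default_local_file()` writes looks like the following, with `<pytracking_path>` standing for the absolute path of the `pytracking/` directory on your machine:

```python
from pytracking.admin.environment import EnvSettings

def local_env_settings():
    settings = EnvSettings()

    # Set your local paths here.

    settings.dataset_path = '<pytracking_path>/benchmark_datasets/'  # Where benchmark datasets are stored
    settings.network_path = '<pytracking_path>/networks/'  # Where tracking networks are stored.
    settings.results_path = '<pytracking_path>/tracking_results/'  # Where to store tracking results

    return settings
```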
(9 collapsed diffs not shown.)
from .tensorlist import TensorList
from .tensordict import TensorDict
(8 collapsed diffs not shown.)
from .atom import ATOM
def get_tracker_class():
return ATOM
(2 collapsed diffs not shown.)
from .siamfc import SiamFC
def get_tracker_class():
return SiamFC
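These one-line `__init__` files expose a `get_tracker_class()` hook per tracker package; presumably the evaluation entry points resolve a tracker by name along these lines (an illustrative sketch, not the repo's exact loader code):

```python
import importlib

def load_tracker_class(name):
    """Return the tracker class for a package under pytracking/tracker, e.g. 'atom' or 'siamfc'."""
    module = importlib.import_module('pytracking.tracker.{}'.format(name))
    return module.get_tracker_class()
```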
(1 collapsed diff not shown.)
# from .evaluation import *
from .params import *
(3 collapsed diffs not shown.)