# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import random
import time
import multiprocessing
import logging

import numpy as np
import cv2

from . import nonlocal_video_io
from .reader_utils import DataReader

logger = logging.getLogger(__name__)


class NonlocalReader(DataReader):
    """
    Data reader for the Kinetics dataset, which reads mp4 files and decodes
    them into numpy arrays. This is for the nonlocal neural network model.
    cfg: num_classes
         num_reader_threads
         image_mean
         image_std
         batch_size
         list
         crop_size
         sample_rate
         video_length
         jitter_scales
         Test only cfg: num_test_clips
                        use_multi_crop
    """

    def __init__(self, name, phase, cfg):
        self.name = name
        self.phase = phase
        self.cfg = cfg

    def create_reader(self):
        cfg = self.cfg
        assert cfg['num_reader_threads'] >= 1, \
            "number of reader threads ({}) should be a positive integer".format(
                cfg['num_reader_threads'])
        if cfg['num_reader_threads'] == 1:
            reader_func = make_reader
        else:
            reader_func = make_multi_reader

        dataset_args = {}
        dataset_args['image_mean'] = cfg['image_mean']
        dataset_args['image_std'] = cfg['image_std']
        dataset_args['crop_size'] = cfg['crop_size']
        dataset_args['sample_rate'] = cfg['sample_rate']
        dataset_args['video_length'] = cfg['video_length']
        dataset_args['min_size'] = cfg['jitter_scales'][0]
        dataset_args['max_size'] = cfg['jitter_scales'][1]
        dataset_args['num_reader_threads'] = cfg['num_reader_threads']

        if self.phase == 'train':
            sample_times = 1
            return reader_func(cfg['list'], cfg['batch_size'], sample_times,
                               True, True, **dataset_args)
        elif self.phase == 'valid':
            sample_times = 1
            return reader_func(cfg['list'], cfg['batch_size'], sample_times,
                               False, False, **dataset_args)
        elif self.phase == 'test':
            sample_times = cfg['num_test_clips']
            if cfg['use_multi_crop'] == 1:
                sample_times = int(sample_times / 3)
            if cfg['use_multi_crop'] == 2:
                sample_times = int(sample_times / 6)
            return reader_func(cfg['list'], cfg['batch_size'], sample_times,
                               False, False, **dataset_args)
        else:
            logger.info('Not implemented')
            raise NotImplementedError(
                'phase {} is not supported'.format(self.phase))


def apply_resize(rgbdata, min_size, max_size):
    """Resize every frame so that its short side equals a scale drawn from
    [min_size, max_size), keeping the aspect ratio."""
    length, height, width, channel = rgbdata.shape
    ratio = 1.0
    # generate a random scale between [min_size, max_size]
    if min_size == max_size:
        side_length = min_size
    else:
        side_length = np.random.randint(min_size, max_size)
    if height > width:
        ratio = float(side_length) / float(width)
    else:
        ratio = float(side_length) / float(height)
    out_height = int(height * ratio)
    out_width = int(width * ratio)
    outdata = np.zeros(
        (length, out_height, out_width, channel), dtype=rgbdata.dtype)
    for i in range(length):
        outdata[i] = cv2.resize(rgbdata[i], (out_width, out_height))
    return outdata
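
# Illustrative note (not from the original source): apply_resize keeps the
# aspect ratio while scaling the short side of every frame to the jittered
# target length. For example, with jitter_scales = [256, 320] and a 240x320
# (HxW) clip, side_length is drawn from [256, 320); a draw of 256 gives
# ratio = 256 / 240, so each frame becomes roughly 256x341 before the
# crop_mirror_transform step below crops it to crop_size x crop_size.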


def crop_mirror_transform(rgbdata,
                          mean,
                          std,
                          cropsize=224,
                          use_mirror=True,
                          center_crop=False,
                          spatial_pos=-1):
    """Crop a [channel, length, height, width] clip to cropsize x cropsize,
    optionally mirror it horizontally, then subtract the mean and divide by
    the std."""
    channel, length, height, width = rgbdata.shape
    assert height >= cropsize, "crop size should not be larger than video height"
    assert width >= cropsize, "crop size should not be larger than video width"
    # crop to the target spatial size
    if center_crop:
        h_off = int((height - cropsize) / 2)
        w_off = int((width - cropsize) / 2)
        if spatial_pos >= 0:
            now_pos = spatial_pos % 3
            if h_off > 0:
                h_off = h_off * now_pos
            else:
                w_off = w_off * now_pos
    else:
        h_off = np.random.randint(0, height - cropsize)
        w_off = np.random.randint(0, width - cropsize)
    outdata = np.zeros(
        (channel, length, cropsize, cropsize), dtype=rgbdata.dtype)
    outdata[:, :, :, :] = rgbdata[:, :, h_off:h_off + cropsize,
                                  w_off:w_off + cropsize]

    # apply mirror
    mirror_indicator = (np.random.rand() > 0.5)
    mirror_me = use_mirror and mirror_indicator
    if spatial_pos > 0:
        mirror_me = (int(spatial_pos / 3) > 0)
    if mirror_me:
        outdata = outdata[:, :, :, ::-1]

    # subtract mean and divide by std
    outdata = outdata.astype(np.float32)
    outdata = (outdata - mean) / std
    return outdata


def make_reader(filelist, batch_size, sample_times, is_training, shuffle,
                **dataset_args):
    """Single-process reader: returns a generator that yields lists of
    batch_size (rgbdata, label) tuples."""
    # should add sample_times param
    fl = open(filelist).readlines()
    fl = [line.strip() for line in fl if line.strip() != '']

    if shuffle:
        random.shuffle(fl)

    def reader():
        batch_out = []
        for line in fl:
            line_items = line.split(' ')
            fn = line_items[0]
            label = int(line_items[1])
            if len(line_items) > 2:
                start_frm = int(line_items[2])
                spatial_pos = int(line_items[3])
                in_sample_times = sample_times
            else:
                start_frm = -1
                spatial_pos = -1
                in_sample_times = 1
            label = np.array([label]).astype(np.int64)
            # 1. get rgb data for a fixed length of frames
            try:
                rgbdata = nonlocal_video_io.video_fast_get_frame(
                    fn,
                    sampling_rate=dataset_args['sample_rate'],
                    length=dataset_args['video_length'],
                    start_frm=start_frm,
                    sample_times=in_sample_times)
            except Exception:
                logger.info(
                    'Error when loading {}, just skip this file'.format(fn))
                continue
            # preprocessing
            # 2. resize to a random scale in [min_size, max_size] when
            #    training, or to cfg.TEST.SCALE at inference
            min_size = dataset_args['min_size']
            max_size = dataset_args['max_size']
            rgbdata = apply_resize(rgbdata, min_size, max_size)
            # transform [length, height, width, channel] to
            # [channel, length, height, width]
            rgbdata = np.transpose(rgbdata, [3, 0, 1, 2])
            # 3. crop, mirror and normalize
            rgbdata = crop_mirror_transform(
                rgbdata,
                mean=dataset_args['image_mean'],
                std=dataset_args['image_std'],
                cropsize=dataset_args['crop_size'],
                use_mirror=is_training,
                center_crop=(not is_training),
                spatial_pos=spatial_pos)

            batch_out.append((rgbdata, label))
            if len(batch_out) == batch_size:
                yield batch_out
                batch_out = []

    return reader
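
# Illustrative note (not from the original source): each batch yielded by
# make_reader above and make_multi_reader below is a Python list of
# batch_size (rgbdata, label) tuples, where rgbdata is a float32 array of
# shape [channel, num_frames, crop_size, crop_size] (typically 3 color
# channels) and label is an int64 array of shape [1].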


def make_multi_reader(filelist, batch_size, sample_times, is_training, shuffle,
                      **dataset_args):
    """Multi-process reader: splits the file list across num_reader_threads
    worker processes that decode videos and push batches into a shared
    queue."""
    fl = open(filelist).readlines()
    fl = [line.strip() for line in fl if line.strip() != '']

    if shuffle:
        random.shuffle(fl)

    n = dataset_args['num_reader_threads']
    queue_size = 20
    reader_lists = [None] * n
    file_num = int(len(fl) // n)
    for i in range(n):
        if i < len(reader_lists) - 1:
            tmp_list = fl[i * file_num:(i + 1) * file_num]
        else:
            tmp_list = fl[i * file_num:]
        reader_lists[i] = tmp_list

    def read_into_queue(flq, queue):
        batch_out = []
        for line in flq:
            line_items = line.split(' ')
            fn = line_items[0]
            label = int(line_items[1])
            if len(line_items) > 2:
                start_frm = int(line_items[2])
                spatial_pos = int(line_items[3])
                in_sample_times = sample_times
            else:
                start_frm = -1
                spatial_pos = -1
                in_sample_times = 1
            label = np.array([label]).astype(np.int64)
            # 1. get rgb data for a fixed length of frames
            try:
                rgbdata = nonlocal_video_io.video_fast_get_frame(
                    fn,
                    sampling_rate=dataset_args['sample_rate'],
                    length=dataset_args['video_length'],
                    start_frm=start_frm,
                    sample_times=in_sample_times)
            except Exception:
                logger.info(
                    'Error when loading {}, just skip this file'.format(fn))
                continue
            # preprocessing
            # 2. resize to a random scale in [min_size, max_size] when
            #    training, or to cfg.TEST.SCALE at inference
            min_size = dataset_args['min_size']
            max_size = dataset_args['max_size']
            rgbdata = apply_resize(rgbdata, min_size, max_size)
            # transform [length, height, width, channel] to
            # [channel, length, height, width]
            rgbdata = np.transpose(rgbdata, [3, 0, 1, 2])
            # 3. crop, mirror and normalize
            rgbdata = crop_mirror_transform(
                rgbdata,
                mean=dataset_args['image_mean'],
                std=dataset_args['image_std'],
                cropsize=dataset_args['crop_size'],
                use_mirror=is_training,
                center_crop=(not is_training),
                spatial_pos=spatial_pos)

            batch_out.append((rgbdata, label))
            if len(batch_out) == batch_size:
                queue.put(batch_out)
                batch_out = []
        # signal that this worker has finished its share of the file list
        queue.put(None)

    def queue_reader():
        queue = multiprocessing.Queue(queue_size)
        p_list = [None] * len(reader_lists)
        for i in range(len(reader_lists)):
            reader_list = reader_lists[i]
            p_list[i] = multiprocessing.Process(
                target=read_into_queue, args=(reader_list, queue))
            p_list[i].start()

        reader_num = len(reader_lists)
        finish_num = 0
        while finish_num < reader_num:
            sample = queue.get()
            if sample is None:
                finish_num += 1
            else:
                yield sample

        for i in range(len(p_list)):
            p_list[i].terminate()
            p_list[i].join()

    return queue_reader
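
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). The
# config values below are placeholders; the keys expected by NonlocalReader
# are listed in its docstring, and 'list' must point to a file of
# "<video path> <label>" lines (optionally "<video path> <label> <start_frm>
# <spatial_pos>") readable by nonlocal_video_io.
#
#     import numpy as np
#     from .nonlocal_reader import NonlocalReader
#
#     cfg = {
#         'num_classes': 400,
#         'num_reader_threads': 1,
#         'image_mean': np.zeros([3, 1, 1, 1], dtype=np.float32),
#         'image_std': np.ones([3, 1, 1, 1], dtype=np.float32),
#         'batch_size': 4,
#         'list': 'data/trainlist.txt',
#         'crop_size': 224,
#         'sample_rate': 8,
#         'video_length': 8,
#         'jitter_scales': [256, 320],
#     }
#     train_reader = NonlocalReader('NONLOCAL', 'train', cfg).create_reader()
#     for batch in train_reader():
#         rgbdata, label = batch[0]
#         # rgbdata: [channel, num_frames, crop_size, crop_size] float32
#         break
# ---------------------------------------------------------------------------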