# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os import imghdr import cv2 from paddle import fluid def initial_logger(): FORMAT = '%(asctime)s-%(levelname)s: %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) logger = logging.getLogger(__name__) return logger import importlib def create_module(module_str): tmpss = module_str.split(",") assert len(tmpss) == 2, "Error formate\ of the module path: {}".format(module_str) module_name, function_name = tmpss[0], tmpss[1] somemodule = importlib.import_module(module_name, __package__) function = getattr(somemodule, function_name) return function def get_check_global_params(mode): check_params = ['use_gpu', 'max_text_length', 'image_shape',\ 'image_shape', 'character_type', 'loss_type'] if mode == "train_eval": check_params = check_params + [\ 'train_batch_size_per_card', 'test_batch_size_per_card'] elif mode == "test": check_params = check_params + ['test_batch_size_per_card'] return check_params def get_check_reader_params(mode): check_params = [] if mode == "train_eval": check_params = ['TrainReader', 'EvalReader'] elif mode == "test": check_params = ['TestReader'] return check_params def get_image_file_list(img_file): imgs_lists = [] if img_file is None or not os.path.exists(img_file): raise Exception("not found any img file in {}".format(img_file)) img_end = {'jpg', 'bmp', 'png', 'jpeg', 'rgb', 'tif', 'tiff', 'gif', 'GIF'} if os.path.isfile(img_file) and imghdr.what(img_file) in img_end: imgs_lists.append(img_file) elif os.path.isdir(img_file): for single_file in os.listdir(img_file): file_path = os.path.join(img_file, single_file) if imghdr.what(file_path) in img_end: imgs_lists.append(file_path) if len(imgs_lists) == 0: raise Exception("not found any img file in {}".format(img_file)) return imgs_lists def check_and_read_gif(img_path): if os.path.basename(img_path)[-3:] in ['gif', 'GIF']: gif = cv2.VideoCapture(img_path) ret, frame = gif.read() if not ret: logging.info("Cannot read {}. This gif image maybe corrupted.") return None, False if len(frame.shape) == 2 or frame.shape[-1] == 1: frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) imgvalue = frame[:, :, ::-1] return imgvalue, True return None, False def create_multi_devices_program(program, loss_var_name): build_strategy = fluid.BuildStrategy() build_strategy.memory_optimize = False build_strategy.enable_inplace = True exec_strategy = fluid.ExecutionStrategy() exec_strategy.num_iteration_per_drop_scope = 1 compile_program = fluid.CompiledProgram(program).with_data_parallel( loss_name=loss_var_name, build_strategy=build_strategy, exec_strategy=exec_strategy) return compile_program