diff --git a/.gitignore b/.gitignore index 784978471ee703ed47795cc619d4a800cb19226f..206b85709a4bb3bc17007e816faf24de5ba9e366 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.pyc +.idea *.DS_Store diff --git a/plsc/__init__.py b/plsc/__init__.py index a4341c9e293aaf6368658e6ee5d6147811abd483..6ada55ac481bcb1bfb1ac1badb64080ebe44d6e0 100644 --- a/plsc/__init__.py +++ b/plsc/__init__.py @@ -13,5 +13,6 @@ # limitations under the License. from .entry import Entry +from .version import plsc_version as __version__ __all__ = ['Entry'] diff --git a/plsc/config.py b/plsc/config.py index 8b1acbf703b0f3506909b9dc54995fc437ba9606..3f6bd9f2807ad263c3aacc9e6e779cd50e93a54f 100644 --- a/plsc/config.py +++ b/plsc/config.py @@ -35,9 +35,9 @@ config.warmup_epochs = 0 config.loss_type = "dist_arcface" config.num_classes = 85742 -config.image_shape = (3,112,112) +config.image_shape = (3, 112, 112) config.margin = 0.5 config.scale = 64.0 config.lr = 0.1 -config.lr_steps = (100000,160000,220000) +config.lr_steps = (100000, 160000, 220000) config.emb_dim = 512 diff --git a/plsc/entry.py b/plsc/entry.py index efddb44bc73fd45cc09d87091dbdc25bdcdc8d40..2d38335060adebf75a3a1509e27643b7cf2d3e89 100644 --- a/plsc/entry.py +++ b/plsc/entry.py @@ -12,36 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import print_function from __future__ import division -import os -import sys -import time -import argparse -import numpy as np +from __future__ import print_function + +import errno +import json +import logging import math -import pickle -import subprocess +import os import shutil -import logging +import subprocess +import sys import tempfile +import time +import numpy as np import paddle import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler import sklearn +from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy +from paddle.fluid.optimizer import Optimizer +from paddle.fluid.transpiler.details.program_utils import program_to_code + from . import config -from .models import resnet +from .models import DistributedClassificationOptimizer from .models import base_model -from .models.dist_algo import DistributedClassificationOptimizer +from .models import resnet +from .utils import jpeg_reader as reader from .utils.learning_rate import lr_warmup +from .utils.parameter_converter import ParameterConverter from .utils.verification import evaluate -from .utils import jpeg_reader as reader -from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy -import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.transpiler.details.program_utils import program_to_code -import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler -from paddle.fluid.optimizer import Optimizer - logging.basicConfig( level=logging.INFO, @@ -59,9 +61,6 @@ class Entry(object): """ Check the validation of parameters. """ - assert os.getenv("PADDLE_TRAINERS_NUM") is not None, \ - "Please start script using paddle.distributed.launch module." - supported_types = ["softmax", "arcface", "dist_softmax", "dist_arcface"] assert self.loss_type in supported_types, \ @@ -70,7 +69,8 @@ class Entry(object): if self.loss_type in ["dist_softmax", "dist_arcface"]: assert self.num_trainers > 1, \ - "At least 2 trainers are required to use distributed fc-layer." 
+ "At least 2 trainers are required for distributed fc-layer. " \ + "You can start your job using paddle.distributed.launch module." def __init__(self): self.config = config.config @@ -89,6 +89,7 @@ class Entry(object): self.model = None self.train_reader = None self.test_reader = None + self.predict_reader = None self.train_program = fluid.Program() self.startup_program = fluid.Program() @@ -97,7 +98,15 @@ class Entry(object): self.fs_name = None self.fs_ugi = None - self.fs_dir = None + self.fs_dir_for_save = None + self.fs_checkpoint_dir = None + + self.param_attr = None + self.bias_attr = None + + self.has_run_train = False # Whether has run training or not + self.test_initialized = False + self.train_pass_id = -1 self.use_fp16 = False self.fp16_user_dict = None @@ -150,13 +159,13 @@ class Entry(object): def set_mixed_precision(self, use_fp16, - init_loss_scaling = 1.0, - incr_every_n_steps = 2000, - decr_every_n_nan_or_inf = 2, - incr_ratio = 2.0, - decr_ratio = 0.5, - use_dynamic_loss_scaling = True, - amp_lists = None): + init_loss_scaling=1.0, + incr_every_n_steps=2000, + decr_every_n_nan_or_inf=2, + incr_ratio=2.0, + decr_ratio=0.5, + use_dynamic_loss_scaling=True, + amp_lists=None): """ Whether to use mixed precision training. """ @@ -178,7 +187,11 @@ class Entry(object): self.global_test_batch_size = batch_size * self.num_trainers logger.info("Set test batch size to {}.".format(batch_size)) - def set_hdfs_info(self, fs_name, fs_ugi, directory): + def set_hdfs_info(self, + fs_name, + fs_ugi, + fs_dir_for_save=None, + fs_checkpoint_dir=None): """ Set the info to download from or upload to hdfs filesystems. If the information is provided, we will download pretrained @@ -187,11 +200,13 @@ class Entry(object): """ self.fs_name = fs_name self.fs_ugi = fs_ugi - self.fs_dir = directory + self.fs_dir_for_save = fs_dir_for_save + self.fs_checkpoint_dir = fs_checkpoint_dir logger.info("HDFS Info:") logger.info("\tfs_name: {}".format(fs_name)) logger.info("\tfs_ugi: {}".format(fs_ugi)) - logger.info("\tremote directory: {}".format(directory)) + logger.info("\tfs dir for save: {}".format(self.fs_dir_for_save)) + logger.info("\tfs checkpoint dir: {}".format(self.fs_checkpoint_dir)) def set_model_save_dir(self, directory): """ @@ -207,7 +222,7 @@ class Entry(object): Whether to calcuate acc1 and acc5 during training. """ self.calc_train_acc = calc - logger.info("Calcuating acc1 and acc5 during training: {}.".format( + logger.info("Calculating acc1 and acc5 during training: {}.".format( calc)) def set_dataset_dir(self, directory): @@ -237,8 +252,8 @@ class Entry(object): """ Set the size of the last hidding layer before the distributed fc-layer. 
""" - self.emb_size = size - logger.info("Set emb_size to {}.".format(size)) + self.emb_dim = size + logger.info("Set emb_dim to {}.".format(size)) def set_model(self, model): """ @@ -270,13 +285,13 @@ class Entry(object): self.warmup_epochs = num logger.info("Set warmup_epochs to {}.".format(num)) - def set_loss_type(self, type): + def set_loss_type(self, loss_type): supported_types = ["dist_softmax", "dist_arcface", "softmax", "arcface"] - if not type in supported_types: + if loss_type not in supported_types: raise ValueError("All supported loss types: {}".format( supported_types)) - self.loss_type = type - logger.info("Set loss_type to {}.".format(type)) + self.loss_type = loss_type + logger.info("Set loss_type to {}.".format(loss_type)) def set_image_shape(self, shape): if not isinstance(shape, (list, tuple)): @@ -286,9 +301,21 @@ class Entry(object): def set_optimizer(self, optimizer): if not isinstance(optimizer, Optimizer): - raise ValueError("Optimizer must be type of Optimizer") + raise ValueError("Optimizer must be of type Optimizer") self.optimizer = optimizer - logger.info("User manually set optimizer") + logger.info("User manually set optimizer.") + + def set_with_test(self, with_test): + self.with_test = with_test + logger.info("Set with_test to {}.".format(with_test)) + + def set_distfc_attr(self, param_attr=None, bias_attr=None): + self.param_attr = param_attr + logger.info("Set param_attr for distfc to {}.".format(self.param_attr)) + if self.bias_attr: + self.bias_attr = bias_attr + logger.info( + "Set bias_attr for distfc to {}.".format(self.bias_attr)) def _get_optimizer(self): if not self.optimizer: @@ -310,7 +337,10 @@ class Entry(object): logger.info("lr_step: {}".format(lr)) if self.warmup_epochs: lr_val = lr_warmup(fluid.layers.piecewise_decay(boundaries=bd, - values=lr), warmup_steps, start_lr, base_lr) + values=lr), + warmup_steps, + start_lr, + base_lr) else: lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr) @@ -321,25 +351,30 @@ class Entry(object): if self.loss_type in ["dist_softmax", "dist_arcface"]: self.optimizer = DistributedClassificationOptimizer( - self.optimizer, global_batch_size, use_fp16=self.use_fp16, + self.optimizer, + self.train_batch_size, + use_fp16=self.use_fp16, loss_type=self.loss_type, fp16_user_dict=self.fp16_user_dict) elif self.use_fp16: self.optimizer = fluid.contrib.mixed_precision.decorate( - optimizer=optimizer, + optimizer=self.optimizer, init_loss_scaling=self.fp16_user_dict['init_loss_scaling'], incr_every_n_steps=self.fp16_user_dict['incr_every_n_steps'], - decr_every_n_nan_or_inf=self.fp16_user_dict['decr_every_n_nan_or_inf'], + decr_every_n_nan_or_inf=self.fp16_user_dict[ + 'decr_every_n_nan_or_inf'], incr_ratio=self.fp16_user_dict['incr_ratio'], decr_ratio=self.fp16_user_dict['decr_ratio'], - use_dynamic_loss_scaling=self.fp16_user_dict['use_dynamic_loss_scaling'], + use_dynamic_loss_scaling=self.fp16_user_dict[ + 'use_dynamic_loss_scaling'], amp_lists=self.fp16_user_dict['amp_lists'] - ) + ) return self.optimizer def build_program(self, is_train=True, - use_parallel_test=False): + use_parallel_test=False, + dist_strategy=None): model_name = self.model_name assert not (is_train and use_parallel_test), \ "is_train and use_parallel_test cannot be set simultaneously." 
@@ -357,18 +392,23 @@ class Entry(object): with fluid.program_guard(main_program, startup_program): with fluid.unique_name.guard(): image = fluid.layers.data(name='image', - shape=image_shape, dtype='float32') + shape=image_shape, + dtype='float32') label = fluid.layers.data(name='label', - shape=[1], dtype='int64') - - emb, loss, prob = model.get_output( - input=image, - label=label, - is_train=is_train, - num_classes=self.num_classes, - loss_type=self.loss_type, - margin=self.margin, - scale=self.scale) + shape=[1], + dtype='int64') + + emb, loss, prob = model.get_output(input=image, + label=label, + num_ranks=num_trainers, + rank_id=trainer_id, + is_train=is_train, + num_classes=self.num_classes, + loss_type=self.loss_type, + param_attr=self.param_attr, + bias_attr=self.bias_attr, + margin=self.margin, + scale=self.scale) acc1 = None acc5 = None @@ -377,78 +417,93 @@ class Entry(object): if self.calc_train_acc: shard_prob = loss._get_info("shard_prob") - prob_all = fluid.layers.collective._c_allgather(shard_prob, - nranks=num_trainers, use_calc_stream=True) - prob_list = fluid.layers.split(prob_all, dim=0, + prob_all = fluid.layers.collective._c_allgather( + shard_prob, + nranks=num_trainers, + use_calc_stream=True) + prob_list = fluid.layers.split( + prob_all, + dim=0, num_or_sections=num_trainers) prob = fluid.layers.concat(prob_list, axis=1) - label_all = fluid.layers.collective._c_allgather(label, - nranks=num_trainers, use_calc_stream=True) - acc1 = fluid.layers.accuracy(input=prob, label=label_all, k=1) - acc5 = fluid.layers.accuracy(input=prob, label=label_all, k=5) + label_all = fluid.layers.collective._c_allgather( + label, + nranks=num_trainers, + use_calc_stream=True) + acc1 = fluid.layers.accuracy(input=prob, + label=label_all, + k=1) + acc5 = fluid.layers.accuracy(input=prob, + label=label_all, + k=5) else: if self.calc_train_acc: - acc1 = fluid.layers.accuracy(input=prob, label=label, k=1) - acc5 = fluid.layers.accuracy(input=prob, label=label, k=5) + acc1 = fluid.layers.accuracy(input=prob, + label=label, + k=1) + acc5 = fluid.layers.accuracy(input=prob, + label=label, + k=5) optimizer = None if is_train: # initialize optimizer optimizer = self._get_optimizer() - dist_optimizer = self.fleet.distributed_optimizer( - optimizer, strategy=self.strategy) - dist_optimizer.minimize(loss) + if self.num_trainers > 1: + dist_optimizer = fleet.distributed_optimizer( + optimizer, strategy=dist_strategy) + dist_optimizer.minimize(loss) + else: # single card training + optimizer.minimize(loss) if "dist" in self.loss_type or self.use_fp16: optimizer = optimizer._optimizer elif use_parallel_test: - emb = fluid.layers.collective._c_allgather(emb, - nranks=num_trainers, use_calc_stream=True) + emb = fluid.layers.collective._c_allgather( + emb, + nranks=num_trainers, + use_calc_stream=True) return emb, loss, acc1, acc5, optimizer - - def get_files_from_hdfs(self, local_dir): + def get_files_from_hdfs(self): + assert self.fs_checkpoint_dir, \ + logger.error("Please set the fs_checkpoint_dir paramerters for " + "set_hdfs_info to get models from hdfs.") + self.fs_checkpoint_dir = os.path.join(self.fs_checkpoint_dir, '*') cmd = "hadoop fs -D fs.default.name=" cmd += self.fs_name + " " cmd += "-D hadoop.job.ugi=" cmd += self.fs_ugi + " " - cmd += "-get " + self.fs_dir - cmd += " " + local_dir + cmd += "-get " + self.fs_checkpoint_dir + cmd += " " + self.checkpoint_dir logger.info("hdfs download cmd: {}".format(cmd)) cmd = cmd.split(' ') process = subprocess.Popen(cmd, - stdout=sys.stdout, - 
stderr=subprocess.STDOUT) + stdout=sys.stdout, + stderr=subprocess.STDOUT) process.wait() def put_files_to_hdfs(self, local_dir): + assert self.fs_dir_for_save, \ + logger.error("Please set fs_dir_for_save paramerter " + "for set_hdfs_info to save models to hdfs.") cmd = "hadoop fs -D fs.default.name=" cmd += self.fs_name + " " cmd += "-D hadoop.job.ugi=" cmd += self.fs_ugi + " " cmd += "-put " + local_dir - cmd += " " + self.fs_dir + cmd += " " + self.fs_dir_for_save logger.info("hdfs upload cmd: {}".format(cmd)) cmd = cmd.split(' ') process = subprocess.Popen(cmd, - stdout=sys.stdout, - stderr=subprocess.STDOUT) + stdout=sys.stdout, + stderr=subprocess.STDOUT) process.wait() - def preprocess_distributed_params(self, - local_dir): + def process_distributed_params(self, local_dir): local_dir = os.path.abspath(local_dir) output_dir = tempfile.mkdtemp() - cmd = sys.executable + ' -m plsc.utils.process_distfc_parameter ' - cmd += "--nranks {} ".format(self.num_trainers) - cmd += "--num_classes {} ".format(self.num_classes) - cmd += "--pretrained_model_dir {} ".format(local_dir) - cmd += "--output_dir {}".format(output_dir) - cmd = cmd.split(' ') - logger.info("Distributed parameters processing cmd: {}".format(cmd)) - process = subprocess.Popen(cmd, - stdout=sys.stdout, - stderr=subprocess.STDOUT) - process.wait() + converter = ParameterConverter(local_dir, output_dir, self.num_trainers) + converter.process() for file in os.listdir(local_dir): if "dist@" in file and "@rank@" in file: @@ -477,7 +532,6 @@ class Entry(object): outputs={'Out': var}, attrs={'use_calc_stream': True}) - def load_checkpoint(self, executor, main_program, @@ -493,30 +547,28 @@ class Entry(object): if os.path.exists(checkpoint_dir): logger.info("Local dir {} exists, we'll overwrite it.".format( checkpoint_dir)) - shutil.rmtree(checkpoint_dir) - os.makedirs(checkpoint_dir) - - # sync all trainers to avoid loading checkpoints before - # parameters are downloaded - file_name = os.path.join(checkpoint_dir, '.lock') - if self.trainer_id == 0: - self.get_files_from_hdfs(checkpoint_dir) - with open(file_name, 'w') as f: - pass - time.sleep(10) - os.remove(file_name) - else: - while True: - if not os.path.exists(file_name): - time.sleep(1) - else: - break + # sync all trainers to avoid loading checkpoints before + # parameters are downloaded + file_name = os.path.join(checkpoint_dir, '.lock') + if self.trainer_id == 0: + self.get_files_from_hdfs() + with open(file_name, 'w') as f: + pass + time.sleep(10) + os.remove(file_name) + else: + while True: + if not os.path.exists(file_name): + time.sleep(1) + else: + break + # Preporcess distributed parameters. 
file_name = os.path.join(checkpoint_dir, '.lock') distributed = self.loss_type in ["dist_softmax", "dist_arcface"] if load_for_train and self.trainer_id == 0 and distributed: - self.preprocess_distributed_params(checkpoint_dir) + self.process_distributed_params(checkpoint_dir) with open(file_name, 'w') as f: pass time.sleep(10) @@ -532,11 +584,13 @@ class Entry(object): def if_exist(var): has_var = os.path.exists(os.path.join(checkpoint_dir, var.name)) if has_var: - print('var: %s found' % (var.name)) + logger.info('var: %s found' % (var.name)) return has_var - fluid.io.load_vars(executor, checkpoint_dir, predicate=if_exist, - main_program=main_program) + fluid.io.load_vars(executor, + checkpoint_dir, + predicate=if_exist, + main_program=main_program) def convert_for_prediction(self): model_name = self.model_name @@ -545,19 +599,20 @@ class Entry(object): model = self.model if model is None: model = resnet.__dict__[model_name](emb_dim=self.emb_dim) - main_program = self.train_program + main_program = self.predict_program startup_program = self.startup_program with fluid.program_guard(main_program, startup_program): with fluid.unique_name.guard(): image = fluid.layers.data(name='image', - shape=image_shape, dtype='float32') + shape=image_shape, + dtype='float32') label = fluid.layers.data(name='label', - shape=[1], dtype='int64') + shape=[1], + dtype='int64') - emb = model.build_network( - input=image, - label=label, - is_train=False) + emb = model.build_network(input=image, + label=label, + is_train=False) gpu_id = int(os.getenv("FLAGS_selected_gpus", 0)) place = fluid.CUDAPlace(gpu_id) @@ -565,8 +620,9 @@ class Entry(object): exe.run(startup_program) assert self.checkpoint_dir, "No checkpoint found for converting." - self.load_checkpoint(executor=exe, main_program=main_program, - load_for_train=False) + self.load_checkpoint(executor=exe, + main_program=main_program, + load_for_train=False) assert self.model_save_dir, \ "Does not set model_save_dir for inference model converting." @@ -582,6 +638,16 @@ class Entry(object): if self.fs_name: self.put_files_to_hdfs(self.model_save_dir) + def _set_info(self, key, value): + if not hasattr(self, '_info'): + self._info = {} + self._info[key] = value + + def _get_info(self, key): + if hasattr(self, '_info') and key in self._info: + return self._info[key] + return None + def predict(self): model_name = self.model_name image_shape = [int(m) for m in self.image_shape] @@ -594,14 +660,15 @@ class Entry(object): with fluid.program_guard(main_program, startup_program): with fluid.unique_name.guard(): image = fluid.layers.data(name='image', - shape=image_shape, dtype='float32') + shape=image_shape, + dtype='float32') label = fluid.layers.data(name='label', - shape=[1], dtype='int64') + shape=[1], + dtype='int64') - emb = model.build_network( - input=image, - label=label, - is_train=False) + emb = model.build_network(input=image, + label=label, + is_train=False) gpu_id = int(os.getenv("FLAGS_selected_gpus", 0)) place = fluid.CUDAPlace(gpu_id) @@ -609,104 +676,77 @@ class Entry(object): exe.run(startup_program) assert self.checkpoint_dir, "No checkpoint found for predicting." 
- self.load_checkpoint(executor=exe, main_program=main_program, - load_for_train=False) - - if self.train_reader is None: - predict_reader = paddle.batch(reader.arc_train( - self.dataset_dir, self.num_classes), - batch_size=self.train_batch_size) + self.load_checkpoint(executor=exe, + main_program=main_program, + load_for_train=False) + + if self.predict_reader is None: + predict_reader = paddle.batch(reader.arc_train(self.dataset_dir, + self.num_classes), + batch_size=self.train_batch_size) else: - predict_reader = self.train_reader + predict_reader = self.predict_reader feeder = fluid.DataFeeder(place=place, - feed_list=['image', 'label'], program=main_program) + feed_list=['image', 'label'], + program=main_program) fetch_list = [emb.name] for data in predict_reader(): - emb = exe.run(main_program, feed=feeder.feed(data), - fetch_list=fetch_list, use_program_cache=True) + emb = exe.run(main_program, + feed=feeder.feed(data), + fetch_list=fetch_list, + use_program_cache=True) print("emb: ", emb) - def test(self, pass_id=0): - self._check() - + def _run_test(self, + exe, + test_list, + test_name_list, + feeder, + fetch_list): trainer_id = self.trainer_id - num_trainers = self.num_trainers - worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") - current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") - - emb, loss, acc1, acc5, _ = self.build_program( - False, self.num_trainers > 1) - - config = dist_transpiler.DistributeTranspilerConfig() - config.mode = "collective" - config.collective_mode = "grad_allreduce" - t = dist_transpiler.DistributeTranspiler(config=config) - t.transpile( - trainer_id=trainer_id, - trainers=worker_endpoints, - startup_program=self.startup_program, - program=self.test_program, - current_endpoint=current_endpoint) - - gpu_id = int(os.getenv("FLAGS_selected_gpus", 0)) - place = fluid.CUDAPlace(gpu_id) - exe = fluid.Executor(place) - exe.run(self.startup_program) - - test_list, test_name_list = reader.test( - self.dataset_dir, self.val_targets) - test_program = self.test_program - #test_program = test_program._prune(emb) - - assert self.checkpoint_dir, "No checkpoint found for test." 
- self.load_checkpoint(executor=exe, main_program=test_program, - load_for_train=False) - - feeder = fluid.DataFeeder(place=place, - feed_list=['image', 'label'], program=test_program) - fetch_list = [emb.name] real_test_batch_size = self.global_test_batch_size - - test_start = time.time() for i in range(len(test_list)): data_list, issame_list = test_list[i] embeddings_list = [] - for j in xrange(len(data_list)): + for j in range(len(data_list)): data = data_list[j] embeddings = None parallel_test_steps = data.shape[0] // real_test_batch_size - beg = 0 - end = 0 for idx in range(parallel_test_steps): start = idx * real_test_batch_size offset = trainer_id * self.test_batch_size begin = start + offset end = begin + self.test_batch_size _data = [] - for k in xrange(begin, end): + for k in range(begin, end): _data.append((data[k], 0)) assert len(_data) == self.test_batch_size - [_embeddings] = exe.run(test_program, - fetch_list = fetch_list, feed=feeder.feed(_data), - use_program_cache=True) + [_embeddings] = exe.run(self.test_program, + fetch_list=fetch_list, + feed=feeder.feed(_data), + use_program_cache=True) if embeddings is None: - embeddings = np.zeros((data.shape[0], _embeddings.shape[1])) - embeddings[start:start+real_test_batch_size, :] = _embeddings[:, :] + embeddings = np.zeros((data.shape[0], + _embeddings.shape[1])) + end = start + real_test_batch_size + embeddings[start:end, :] = _embeddings[:, :] beg = parallel_test_steps * real_test_batch_size while beg < data.shape[0]: end = min(beg + self.test_batch_size, data.shape[0]) count = end - beg _data = [] - for k in xrange(end - self.test_batch_size, end): + for k in range(end - self.test_batch_size, end): _data.append((data[k], 0)) - [_embeddings] = exe.run(test_program, - fetch_list = fetch_list, feed=feeder.feed(_data), - use_program_cache=True) - _embeddings = _embeddings[0:self.test_batch_size,:] - embeddings[beg:end, :] = _embeddings[(self.test_batch_size-count):, :] + [_embeddings] = exe.run(self.test_program, + fetch_list=fetch_list, + feed=feeder.feed(_data), + use_program_cache=True) + _embeddings = _embeddings[0:self.test_batch_size, :] + embeddings[beg:end, :] = _embeddings[ + (self.test_batch_size - count):, :] beg = end embeddings_list.append(embeddings) @@ -719,44 +759,140 @@ class Entry(object): embeddings = embeddings_list[0] + embeddings_list[1] embeddings = sklearn.preprocessing.normalize(embeddings) - _, _, accuracy, val, val_std, far = evaluate(embeddings, issame_list, nrof_folds=10) + _, _, accuracy, val, val_std, far = evaluate(embeddings, + issame_list, + nrof_folds=10) acc, std = np.mean(accuracy), np.std(accuracy) - print('[%s][%d]XNorm: %f' % (test_name_list[i], pass_id, xnorm)) - print('[%s][%d]Accuracy-Flip: %1.5f+-%1.5f' % (test_name_list[i], pass_id, acc, std)) + if self.train_pass_id >= 0: + logger.info('[{}][{}]XNorm: {:.5f}'.format(test_name_list[i], + self.train_pass_id, + xnorm)) + logger.info('[{}][{}]Accuracy-Flip: {:.5f}+-{:.5f}'.format( + test_name_list[i], + self.train_pass_id, + acc, + std)) + else: + logger.info('[{}]XNorm: {:.5f}'.format(test_name_list[i], + xnorm)) + logger.info('[{}]Accuracy-Flip: {:.5f}+-{:.5f}'.format( + test_name_list[i], + acc, + std)) sys.stdout.flush() + + def test(self): + self._check() + + trainer_id = self.trainer_id + num_trainers = self.num_trainers + + # if the test program is not built, which means that is the first time + # to call the test method, we will first build the test program and + # add ops to broadcast bn-related parameters from trainer 0 to 
other + # trainers for distributed tests. + if not self.test_initialized: + emb, loss, _, _, _ = self.build_program(False, + self.num_trainers > 1) + emb_name = emb.name + assert self._get_info(emb_name) is None + self._set_info('emb_name', emb.name) + + if num_trainers > 1 and self.has_run_train: + self._append_broadcast_ops(self.test_program) + + if num_trainers > 1 and not self.has_run_train: + worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") + current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") + + config = dist_transpiler.DistributeTranspilerConfig() + config.mode = "collective" + config.collective_mode = "grad_allreduce" + t = dist_transpiler.DistributeTranspiler(config=config) + t.transpile(trainer_id=trainer_id, + trainers=worker_endpoints, + startup_program=self.startup_program, + program=self.test_program, + current_endpoint=current_endpoint) + else: + emb_name = self._get_info('emb_name') + + gpu_id = int(os.getenv("FLAGS_selected_gpus", 0)) + place = fluid.CUDAPlace(gpu_id) + exe = fluid.Executor(place) + if not self.has_run_train: + exe.run(self.startup_program) + + if not self.test_reader: + test_reader = reader.test + else: + test_reader = self.test_reader + if not self.test_initialized: + test_list, test_name_list = test_reader(self.dataset_dir, + self.val_targets) + assert self._get_info('test_list') is None + assert self._get_info('test_name_list') is None + self._set_info('test_list', test_list) + self._set_info('test_name_list', test_name_list) + else: + test_list = self._get_info('test_list') + test_name_list = self._get_info('test_name_list') + + test_program = self.test_program + + if not self.has_run_train: + assert self.checkpoint_dir, "No checkpoint found for test." + self.load_checkpoint(executor=exe, + main_program=test_program, + load_for_train=False) + + feeder = fluid.DataFeeder(place=place, + feed_list=['image', 'label'], + program=test_program) + fetch_list = [emb_name] + + self.test_initialized = True + + test_start = time.time() + self._run_test(exe, + test_list, + test_name_list, + feeder, + fetch_list) test_end = time.time() - print("test time: {}".format(test_end - test_start)) + logger.info("test time: {:.4f}".format(test_end - test_start)) def train(self): self._check() + self.has_run_train = True trainer_id = self.trainer_id num_trainers = self.num_trainers - role = role_maker.PaddleCloudRoleMaker(is_collective=True) - fleet.init(role) - strategy = DistributedStrategy() - strategy.mode = "collective" - strategy.collective_mode = "grad_allreduce" - self.fleet = fleet - self.strategy = strategy - - train_emb, train_loss, train_acc1, train_acc5, optimizer = \ - self.build_program(True, False) - if self.with_test: - test_emb, test_loss, test_acc1, test_acc5, _ = \ - self.build_program(False, self.num_trainers > 1) - test_list, test_name_list = reader.test( - self.dataset_dir, self.val_targets) - test_program = self.test_program - self._append_broadcast_ops(test_program) - + strategy = None + if num_trainers > 1: + role = role_maker.PaddleCloudRoleMaker(is_collective=True) + fleet.init(role) + strategy = DistributedStrategy() + strategy.mode = "collective" + strategy.collective_mode = "grad_allreduce" + + emb, loss, acc1, acc5, optimizer = self.build_program( + True, + False, + dist_strategy=strategy) + global_lr = optimizer._global_learning_rate( program=self.train_program) + + if num_trainers > 1: + origin_prog = fleet._origin_program + train_prog = fleet.main_program + else: + origin_prog = self.train_program + train_prog = 
self.train_program - origin_prog = fleet._origin_program - train_prog = fleet.main_program if trainer_id == 0: with open('start.program', 'w') as fout: program_to_code(self.startup_program, fout, True) @@ -764,20 +900,12 @@ class Entry(object): program_to_code(train_prog, fout, True) with open('origin.program', 'w') as fout: program_to_code(origin_prog, fout, True) - with open('test.program', 'w') as fout: - program_to_code(test_program, fout, True) gpu_id = int(os.getenv("FLAGS_selected_gpus", 0)) place = fluid.CUDAPlace(gpu_id) exe = fluid.Executor(place) exe.run(self.startup_program) - if self.with_test: - test_feeder = fluid.DataFeeder(place=place, - feed_list=['image', 'label'], program=test_program) - fetch_list_test = [test_emb.name] - real_test_batch_size = self.global_test_batch_size - if self.checkpoint_dir: load_checkpoint = True else: @@ -793,31 +921,38 @@ class Entry(object): train_reader = self.train_reader feeder = fluid.DataFeeder(place=place, - feed_list=['image', 'label'], program=origin_prog) - + feed_list=['image', 'label'], + program=origin_prog) + if self.calc_train_acc: - fetch_list = [train_loss.name, global_lr.name, - train_acc1.name, train_acc5.name] + fetch_list = [loss.name, global_lr.name, + acc1.name, acc5.name] else: - fetch_list = [train_loss.name, global_lr.name] - + fetch_list = [loss.name, global_lr.name] + local_time = 0.0 nsamples = 0 inspect_steps = 200 global_batch_size = self.global_train_batch_size for pass_id in range(self.train_epochs): + self.train_pass_id = pass_id train_info = [[], [], [], []] local_train_info = [[], [], [], []] for batch_id, data in enumerate(train_reader()): nsamples += global_batch_size t1 = time.time() + acc1 = None + acc5 = None if self.calc_train_acc: loss, lr, acc1, acc5 = exe.run(train_prog, - feed=feeder.feed(data), fetch_list=fetch_list, - use_program_cache=True) + feed=feeder.feed(data), + fetch_list=fetch_list, + use_program_cache=True) else: - loss, lr = exe.run(train_prog, feed=feeder.feed(data), - fetch_list=fetch_list, use_program_cache=True) + loss, lr = exe.run(train_prog, + feed=feeder.feed(data), + fetch_list=fetch_list, + use_program_cache=True) t2 = time.time() period = t2 - t1 local_time += period @@ -828,83 +963,37 @@ class Entry(object): if batch_id % inspect_steps == 0: avg_loss = np.mean(local_train_info[0]) avg_lr = np.mean(local_train_info[1]) + speed = nsamples / local_time if self.calc_train_acc: - logger.info("Pass:%d batch:%d lr:%f loss:%f qps:%.2f " - "acc1:%.4f acc5:%.4f" % (pass_id, batch_id, avg_lr, - avg_loss, nsamples / local_time, acc1, acc5)) + logger.info("Pass:{} batch:%d lr:{:.8f} loss:{:.6f} " + "qps:{:.2f} acc1:{:.6f} acc5:{:.6f}".format( + pass_id, + batch_id, + avg_lr, + avg_loss, + speed, + acc1, + acc5)) else: - logger.info("Pass:%d batch:%d lr:%f loss:%f qps:%.2f" %( - pass_id, batch_id, avg_lr, avg_loss, - nsamples / local_time)) + logger.info("Pass:{} batch:{} lr:{:.8f} loss:{:.6f} " + "qps:{:.2f}".format(pass_id, + batch_id, + avg_lr, + avg_loss, + speed)) local_time = 0 nsamples = 0 local_train_info = [[], [], [], []] train_loss = np.array(train_info[0]).mean() - print("End pass {0}, train_loss {1}".format(pass_id, train_loss)) + logger.info("End pass {}, train_loss {:.6f}".format(pass_id, + train_loss)) sys.stdout.flush() if self.with_test: - test_start = time.time() - for i in xrange(len(test_list)): - data_list, issame_list = test_list[i] - embeddings_list = [] - for j in xrange(len(data_list)): - data = data_list[j] - embeddings = None - parallel_test_steps = 
data.shape[0] // real_test_batch_size - beg = 0 - end = 0 - for idx in range(parallel_test_steps): - start = idx * real_test_batch_size - offset = trainer_id * self.test_batch_size - begin = start + offset - end = begin + self.test_batch_size - _data = [] - for k in xrange(begin, end): - _data.append((data[k], 0)) - assert len(_data) == self.test_batch_size - [_embeddings] = exe.run(test_program, - fetch_list = fetch_list_test, feed=test_feeder.feed(_data), - use_program_cache=True) - if embeddings is None: - embeddings = np.zeros((data.shape[0], _embeddings.shape[1])) - embeddings[start:start+real_test_batch_size, :] = _embeddings[:, :] - beg = parallel_test_steps * real_test_batch_size - - while beg < data.shape[0]: - end = min(beg + self.test_batch_size, data.shape[0]) - count = end - beg - _data = [] - for k in xrange(end - self.test_batch_size, end): - _data.append((data[k], 0)) - [_embeddings] = exe.run(test_program, - fetch_list = fetch_list_test, feed=test_feeder.feed(_data), - use_program_cache=True) - _embeddings = _embeddings[0:self.test_batch_size,:] - embeddings[beg:end, :] = _embeddings[(self.test_batch_size-count):, :] - beg = end - embeddings_list.append(embeddings) - - xnorm = 0.0 - xnorm_cnt = 0 - for embed in embeddings_list: - xnorm += np.sqrt((embed * embed).sum(axis=1)).sum(axis=0) - xnorm_cnt += embed.shape[0] - xnorm /= xnorm_cnt - - embeddings = embeddings_list[0] + embeddings_list[1] - embeddings = sklearn.preprocessing.normalize(embeddings) - _, _, accuracy, val, val_std, far = evaluate(embeddings, issame_list, nrof_folds=10) - acc, std = np.mean(accuracy), np.std(accuracy) - - print('[%s][%d]XNorm: %f' % (test_name_list[i], pass_id, xnorm)) - print('[%s][%d]Accuracy-Flip: %1.5f+-%1.5f' % (test_name_list[i], pass_id, acc, std)) - sys.stdout.flush() - test_end = time.time() - print("test time: {}".format(test_end - test_start)) - - #save model + self.test() + + # save model if self.model_save_dir: model_save_dir = os.path.join( self.model_save_dir, str(pass_id)) @@ -919,27 +1008,30 @@ class Entry(object): pass if trainer_id == 0: fluid.io.save_persistables(exe, - model_save_dir, - origin_prog) + model_save_dir, + origin_prog) else: def save_var(var): to_save = "dist@" in var.name and '@rank@' in var.name return to_save and var.persistable - fluid.io.save_vars(exe, model_save_dir, - origin_prog, predicate=save_var) - #save training info + fluid.io.save_vars(exe, + model_save_dir, + origin_prog, + predicate=save_var) + + # save training info if self.model_save_dir and trainer_id == 0: config_file = os.path.join( - self.model_save_dir, str(pass_id), 'meta.pickle') + self.model_save_dir, str(pass_id), 'meta.json') train_info = dict() train_info["pretrain_nranks"] = self.num_trainers train_info["emb_dim"] = self.emb_dim train_info['num_classes'] = self.num_classes - with open(config_file, 'wb') as f: - pickle.dump(train_info, f) + with open(config_file, 'w') as f: + json.dump(train_info, f) - #upload model + # upload model if self.model_save_dir and self.fs_name and trainer_id == 0: self.put_files_to_hdfs(self.model_save_dir) diff --git a/plsc/models/__init__.py b/plsc/models/__init__.py index fa8c619f86ccd2a99e4966cad09e9910b3665fc1..0b2c3179e6a6089ccecfd5cbd062a3cfa55b15ab 100644 --- a/plsc/models/__init__.py +++ b/plsc/models/__init__.py @@ -12,11 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from . import resnet -from .resnet import * from . import base_model +from . import dist_algo +from . 
import resnet from .base_model import * +from .dist_algo import * +from .resnet import * __all__ = [] __all__ += resnet.__all__ __all__ += base_model.__all__ +__all__ += dist_algo.__all__ diff --git a/plsc/models/base_model.py b/plsc/models/base_model.py index 5416ecb559d5090b3d775ad0c0bc11b5de2c84c7..b13a8adb4f96a1bc8c55f66cbf5526188c2bc47b 100644 --- a/plsc/models/base_model.py +++ b/plsc/models/base_model.py @@ -13,14 +13,11 @@ # limitations under the License. import math -import os -import numpy as np -import paddle import paddle.fluid as fluid from paddle.fluid import unique_name -from . import dist_algo +from . import dist_algo __all__ = ["BaseModel"] @@ -32,21 +29,24 @@ class BaseModel(object): which constructs the custom model. And we will add the distributed fc layer for you automatically. """ + def __init__(self): super(BaseModel, self).__init__() def build_network(self, input, label, is_train=True): """ - Construct the custom model, and we will add the - distributed fc layer for you automatically. + Construct the custom model, and we will add the distributed fc layer + at the end of your model automatically. """ raise NotImplementedError( - "You must implement this method in your sub class.") + "You must implement this method in your subclass.") def get_output(self, input, label, num_classes, + num_ranks=1, + rank_id=0, is_train=True, param_attr=None, bias_attr=None, @@ -55,6 +55,20 @@ class BaseModel(object): scale=64.0): """ Add the distributed fc layer for the custom model. + + Params: + input: input for the model + label: label for the input + num_classes: number of classes for the classifier + num_ranks: number of trainers, i.e., GPUs + rank_id: id for the current trainer, from 0 to num_ranks - 1 + is_train: build the network for training or not + param_attr: param_attr for the weight parameter of fc + bias_attr: bias_attr for the weight parameter for fc + loss_type: loss type to use, one of dist_softmax, softmax, arcface + and dist_arcface + margin: the margin parameter for arcface and dist_arcface + scale: the scale parameter for arcface and dist_arcface """ supported_loss_types = ["dist_softmax", "dist_arcface", "softmax", "arcface"] @@ -62,67 +76,75 @@ class BaseModel(object): "Supported loss types: {}, but given: {}".format( supported_loss_types, loss_type) - nranks = int(os.getenv("PADDLE_TRAINERS_NUM", 1)) - rank_id = int(os.getenv("PADDLE_TRAINER_ID", 0)) - emb = self.build_network(input, label, is_train) + prob = None + loss = None if loss_type == "softmax": - loss, prob = self.fc_classify(emb, - label, - num_classes, - param_attr, - bias_attr) + loss, prob = BaseModel._fc_classify(emb, + label, + num_classes, + param_attr, + bias_attr) elif loss_type == "arcface": - loss, prob = self.arcface(emb, - label, - num_classes, - param_attr, - margin, - scale) + loss, prob = BaseModel._arcface(emb, + label, + num_classes, + param_attr, + margin, + scale) elif loss_type == "dist_arcface": - loss = dist_algo._distributed_arcface_classify( - x=emb, label=label, class_num=num_classes, - nranks=nranks, rank_id=rank_id, margin=margin, - logit_scale=scale, param_attr=param_attr) - prob = None + loss = dist_algo.distributed_arcface_classify(x=emb, + label=label, + class_num=num_classes, + nranks=num_ranks, + rank_id=rank_id, + margin=margin, + logit_scale=scale, + param_attr=param_attr) elif loss_type == "dist_softmax": - loss = dist_algo._distributed_softmax_classify( - x=emb, label=label, class_num=num_classes, - nranks=nranks, rank_id=rank_id, param_attr=param_attr, - 
use_bias=True, bias_attr=bias_attr) - prob = None + loss = dist_algo.distributed_softmax_classify(x=emb, + label=label, + class_num=num_classes, + nranks=num_ranks, + rank_id=rank_id, + param_attr=param_attr, + use_bias=True, + bias_attr=bias_attr) return emb, loss, prob - def fc_classify(self, input, label, out_dim, param_attr, bias_attr): + @staticmethod + def _fc_classify(input, label, out_dim, param_attr, bias_attr): if param_attr is None: - stdv = 1.0 / math.sqrt(input.shape[1] * 1.0) - param_attr=fluid.param_attr.ParamAttr( - initializer=fluid.initializer.Uniform(-stdv, stdv)) - + stddev = 1.0 / math.sqrt(input.shape[1] * 1.0) + param_attr = fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform(-stddev, stddev)) + out = fluid.layers.fc(input=input, size=out_dim, param_attr=param_attr, bias_attr=bias_attr) - loss, prob = fluid.layers.softmax_with_cross_entropy(logits=out, - label=label, return_softmax=True) + loss, prob = fluid.layers.softmax_with_cross_entropy( + logits=out, + label=label, + return_softmax=True) avg_loss = fluid.layers.mean(x=loss) return avg_loss, prob - def arcface(self, input, label, out_dim, param_attr, margin, scale): + @staticmethod + def _arcface(input, label, out_dim, param_attr, margin, scale): input_norm = fluid.layers.sqrt( fluid.layers.reduce_sum(fluid.layers.square(input), dim=1)) input = fluid.layers.elementwise_div(input, input_norm, axis=0) if param_attr is None: - param_attr=fluid.param_attr.ParamAttr( - initializer=fluid.initializer.Xavier( - uniform=False, fan_in=0.0)) + param_attr = fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Xavier(uniform=False, fan_in=0.0)) weight = fluid.layers.create_parameter( - shape=[input.shape[1], out_dim], - dtype='float32', - name=unique_name.generate('final_fc_w'), - attr=param_attr) + shape=[input.shape[1], out_dim], + dtype='float32', + name=unique_name.generate('final_fc_w'), + attr=param_attr) weight_norm = fluid.layers.sqrt( fluid.layers.reduce_sum(fluid.layers.square(weight), dim=0)) @@ -137,10 +159,11 @@ class BaseModel(object): logit = fluid.layers.scale(target_cos, scale=scale) loss, prob = fluid.layers.softmax_with_cross_entropy( - logits=logit, label=label, return_softmax=True) + logits=logit, + label=label, + return_softmax=True) avg_loss = fluid.layers.mean(x=loss) one_hot.stop_gradient = True return avg_loss, prob - diff --git a/plsc/models/dist_algo.py b/plsc/models/dist_algo.py index 1d12b689ad20ad82c2845eacff9a0f808e55a699..a2d1af089833a688c7cd161d9ba72157bb6fef94 100644 --- a/plsc/models/dist_algo.py +++ b/plsc/models/dist_algo.py @@ -12,32 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from __future__ import division from __future__ import print_function + import math import logging -from six.moves import reduce import paddle.fluid as fluid -from paddle.fluid.layer_helper import LayerHelper -from paddle.fluid.framework import Variable, default_startup_program -from paddle.fluid.param_attr import ParamAttr -from paddle.fluid.initializer import Normal, Constant +import paddle.fluid.layers.collective as collective import paddle.fluid.layers.nn as nn import paddle.fluid.layers.ops as ops import paddle.fluid.layers as layers -import paddle.fluid.layers.collective as collective from paddle.fluid.optimizer import Optimizer import paddle.fluid.unique_name as unique_name +from paddle.fluid.framework import Variable, default_startup_program +from paddle.fluid.initializer import Normal +from paddle.fluid.layer_helper import LayerHelper +from paddle.fluid.optimizer import Optimizer +from paddle.fluid.param_attr import ParamAttr from ..utils.fp16_utils import rewrite_program, update_role_var_grad from ..utils.fp16_utils import update_loss_scaling, move_optimize_ops_back from ..utils.fp16_lists import AutoMixedPrecisionLists +from six.moves import reduce + +__all__ = ['distributed_arcface_classify', 'distributed_softmax_classify', + 'DistributedClassificationOptimizer'] class DistributedClassificationOptimizer(Optimizer): - ''' - A optimizer wrapper to generate backward network for distributed + """ + An optimizer wrapper to generate backward network for distributed classification training of model parallelism. - ''' + """ def init_fp16_params(self, loss_type, fp16_user_dict): # set default value for fp16_params_dict fp16_params_dict = dict() @@ -261,15 +267,15 @@ class DistributedClassificationOptimizer(Optimizer): }) def insert_commom_backward_op(self, - block, - index, - shard_logit, - shard_prob, - shard_label, - shard_dim, - op_role_key, - backward_role, - loss_backward_role): + block, + index, + shard_logit, + shard_prob, + shard_label, + shard_dim, + op_role_key, + backward_role, + loss_backward_role): ''' insert backward ops when not using mixed precision training. common use in all lose type. 
@@ -421,10 +427,10 @@ class DistributedClassificationOptimizer(Optimizer): class DistributedClassifier(object): - ''' + """ Tookit for distributed classification, in which the parameter of the last full-connected layer is distributed to all trainers - ''' + """ def __init__(self, nclasses, nranks, rank_id, layer_helper): self.nclasses = nclasses @@ -446,29 +452,29 @@ class DistributedClassifier(object): dtype, in_dim, param_attr=None, + use_bias=True, bias_attr=None, - transpose_weight=False, - use_bias=True): + transpose_weight=False): if param_attr is None: - stdv = math.sqrt(2.0 / (in_dim + self.nclasses)) - param_attr = ParamAttr(initializer=Normal(scale=stdv)) + stddev = math.sqrt(2.0 / (in_dim + self.nclasses)) + param_attr = ParamAttr(initializer=Normal(scale=stddev)) weight_shape = [self.shard_dim, in_dim ] if transpose_weight else [in_dim, self.shard_dim] weight = self._layer_helper.create_parameter( shape=weight_shape, dtype=dtype, attr=param_attr, is_bias=False) - # avoid distributed parameter allreduce gradients + + # avoid allreducing gradients for distributed parameters weight.is_distributed = True - # avoid distributed parameter broadcasting in startup program + # avoid broadcasting distributed parameters in startup program default_startup_program().global_block().vars[ weight.name].is_distributed = True bias = None if use_bias: - bias = self._layer_helper.create_parameter( - shape=[self.shard_dim], - attr=bias_attr, - dtype=dtype, - is_bias=True) + bias = self._layer_helper.create_parameter(shape=[self.shard_dim], + attr=bias_attr, + dtype=dtype, + is_bias=True) bias.is_distributed = True default_startup_program().global_block().vars[ bias.name].is_distributed = True @@ -505,12 +511,11 @@ class DistributedClassifier(object): use_bias=True, bias_attr=None): flatten_dim = reduce(lambda a, b: a * b, x.shape[1:], 1) - weight, bias = self.create_parameter( - dtype=x.dtype, - in_dim=flatten_dim, - param_attr=param_attr, - bias_attr=bias_attr, - use_bias=use_bias) + weight, bias = self.create_parameter(dtype=x.dtype, + in_dim=flatten_dim, + param_attr=param_attr, + bias_attr=bias_attr, + use_bias=use_bias) x_all = collective._c_allgather( x, nranks=self.nranks, use_calc_stream=True) @@ -551,11 +556,10 @@ class DistributedClassifier(object): reference: ArcFace. 
https://arxiv.org/abs/1801.07698 ''' flatten_dim = reduce(lambda a, b: a * b, x.shape[1:], 1) - weight, bias = self.create_parameter( - dtype=x.dtype, - in_dim=flatten_dim, - param_attr=param_attr, - use_bias=False) + weight, bias = self.create_parameter(dtype=x.dtype, + in_dim=flatten_dim, + param_attr=param_attr, + use_bias=False) # normalize x x_l2 = ops.sqrt(nn.reduce_sum(ops.square(x), dim=1)) @@ -566,12 +570,11 @@ class DistributedClassifier(object): label_all = collective._c_allgather( label, nranks=self.nranks, use_calc_stream=True) label_all.stop_gradient = True - shard_label = nn.shard_index( - label_all, - index_num=self.nclasses, - nshards=self.nranks, - shard_id=self.rank_id, - ignore_value=-1) + shard_label = nn.shard_index(label_all, + index_num=self.nclasses, + nshards=self.nranks, + shard_id=self.rank_id, + ignore_value=-1) # TODO check necessary shard_label.stop_gradient = True @@ -605,16 +608,16 @@ class DistributedClassifier(object): return avg_loss -def _distributed_softmax_classify(x, - label, - class_num, - nranks, - rank_id, - param_attr=None, - use_bias=True, - bias_attr=None, - name=None): - ''' +def distributed_softmax_classify(x, + label, + class_num, + nranks, + rank_id, + param_attr=None, + use_bias=True, + bias_attr=None, + name=None): + """ Classification layer with FC, softmax and cross entropy calculation of distibuted version in case of too large number of classes. @@ -652,26 +655,29 @@ def _distributed_softmax_classify(x, class_num=1000, nranks=8, rank_id=0) - ''' + """ if name is None: name = 'dist@softmax@rank@%05d' % rank_id helper = LayerHelper(name, **locals()) classifier = DistributedClassifier(class_num, nranks, rank_id, helper) - return classifier.softmax_classify(x, label, param_attr, use_bias, + return classifier.softmax_classify(x, + label, + param_attr, + use_bias, bias_attr) -def _distributed_arcface_classify(x, - label, - class_num, - nranks, - rank_id, - margin=0.5, - logit_scale=64.0, - param_attr=None, - name=None): - ''' +def distributed_arcface_classify(x, + label, + class_num, + nranks, + rank_id, + margin=0.5, + logit_scale=64.0, + param_attr=None, + name=None): + """ Classification layer with ArcFace loss of distibuted version in case of too large number of classes. the equation is @@ -719,14 +725,13 @@ def _distributed_arcface_classify(x, class_num=1000, nranks=8, rank_id=0) - ''' + """ if name is None: name = 'dist@arcface@rank@%05d' % rank_id helper = LayerHelper(name, **locals()) classifier = DistributedClassifier(class_num, nranks, rank_id, helper) - return classifier.arcface_classify( - x=x, - label=label, - margin=margin, - logit_scale=logit_scale, - param_attr=param_attr) + return classifier.arcface_classify(x=x, + label=label, + margin=margin, + logit_scale=logit_scale, + param_attr=param_attr) diff --git a/plsc/models/resnet.py b/plsc/models/resnet.py index df856513d7639db3b7bac0d4fed32c44d6b1e5aa..30857f2377008526a028eb8713449d27a9bf8ba8 100644 --- a/plsc/models/resnet.py +++ b/plsc/models/resnet.py @@ -12,14 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import paddle import paddle.fluid as fluid -import math -import os -import numpy as np -from paddle.fluid import unique_name -from .base_model import BaseModel +from .base_model import BaseModel __all__ = ["ResNet", "ResNet50", "ResNet101", "ResNet152"] @@ -33,12 +28,13 @@ class ResNet(BaseModel): def build_network(self, input, label, - is_train): + is_train=True): layers = self.layers supported_layers = [50, 101, 152] assert layers in supported_layers, \ "supported layers {}, but given {}".format(supported_layers, layers) + depth = None if layers == 50: depth = [3, 4, 14, 3] elif layers == 101: @@ -59,21 +55,26 @@ class ResNet(BaseModel): stride=2 if i == 0 else 1, is_train=is_train) - bn = fluid.layers.batch_norm(input=conv, act=None, epsilon=2e-05, - is_test=False if is_train else True) - drop = fluid.layers.dropout(x=bn, dropout_prob=0.4, - dropout_implementation='upscale_in_train', - is_test=False if is_train else True) + bn = fluid.layers.batch_norm(input=conv, + act=None, + epsilon=2e-05, + is_test=False if is_train else True) + drop = fluid.layers.dropout(x=bn, + dropout_prob=0.4, + dropout_implementation='upscale_in_train', + is_test=False if is_train else True) fc = fluid.layers.fc( input=drop, size=self.emb_dim, - act=None, param_attr=fluid.param_attr.ParamAttr( - initializer=fluid.initializer.Xavier(uniform=False, fan_in=0.0)), + initializer=fluid.initializer.Xavier(uniform=False, + fan_in=0.0)), bias_attr=fluid.param_attr.ParamAttr( initializer=fluid.initializer.ConstantInitializer())) - emb = fluid.layers.batch_norm(input=fc, act=None, epsilon=2e-05, - is_test=False if is_train else True) + emb = fluid.layers.batch_norm(input=fc, + act=None, + epsilon=2e-05, + is_test=False if is_train else True) return emb def conv_bn_layer(self, @@ -92,51 +93,79 @@ class ResNet(BaseModel): stride=stride, padding=pad, groups=groups, - act=None, param_attr=fluid.param_attr.ParamAttr( initializer=fluid.initializer.Xavier( uniform=False, fan_in=0.0)), bias_attr=False) if act == 'prelu': - bn = fluid.layers.batch_norm(input=conv, act=None, epsilon=2e-05, - momentum=0.9, is_test=False if is_train else True) - return fluid.layers.prelu(bn, mode="all", + bn = fluid.layers.batch_norm(input=conv, + act=None, + epsilon=2e-05, + momentum=0.9, + is_test=False if is_train else True) + return fluid.layers.prelu( + bn, + mode="all", param_attr=fluid.param_attr.ParamAttr( initializer=fluid.initializer.Constant(0.25))) else: - return fluid.layers.batch_norm(input=conv, act=act, epsilon=2e-05, - is_test=False if is_train else True) + return fluid.layers.batch_norm(input=conv, + act=act, + epsilon=2e-05, + is_test=False if is_train else True) def shortcut(self, input, ch_out, stride, is_train): ch_in = input.shape[1] if ch_in != ch_out or stride != 1: - return self.conv_bn_layer(input, ch_out, 1, stride, - is_train=is_train) + return self.conv_bn_layer(input, + ch_out, + 1, + stride, + is_train=is_train) else: return input def bottleneck_block(self, input, num_filters, stride, is_train): if self.layers < 101: - bn1 = fluid.layers.batch_norm(input=input, act=None, epsilon=2e-05, - is_test=False if is_train else True) - conv1 = self.conv_bn_layer( - input=bn1, num_filters=num_filters, filter_size=3, pad=1, - act='prelu', is_train=is_train) - conv2 = self.conv_bn_layer( - input=conv1, num_filters=num_filters, filter_size=3, - stride=stride, pad=1, act=None, is_train=is_train) + bn1 = fluid.layers.batch_norm(input=input, + act=None, + epsilon=2e-05, + is_test=False if is_train else True) + conv1 = 
self.conv_bn_layer(input=bn1, + num_filters=num_filters, + filter_size=3, + pad=1, + act='prelu', + is_train=is_train) + conv2 = self.conv_bn_layer(input=conv1, + num_filters=num_filters, + filter_size=3, + stride=stride, + pad=1, + is_train=is_train) else: - bn0 = fluid.layers.batch_norm(input=input, act=None, epsilon=2e-05, - is_test=False if is_train else True) - conv0 = self.conv_bn_layer( - input=bn0, num_filters=num_filters/4, filter_size=1, pad=0, - act='prelu', is_train=is_train) - conv1 = self.conv_bn_layer( - input=conv0, num_filters=num_filters/4, filter_size=3, pad=1, - act='prelu', is_train=is_train) - conv2 = self.conv_bn_layer( - input=conv1, num_filters=num_filters, filter_size=1, - stride=stride, pad=0, act=None, is_train=is_train) + bn0 = fluid.layers.batch_norm(input=input, + act=None, + epsilon=2e-05, + is_test=False if is_train else True) + conv0 = self.conv_bn_layer(input=bn0, + num_filters=num_filters / 4, + filter_size=1, + pad=0, + act='prelu', + is_train=is_train) + conv1 = self.conv_bn_layer(input=conv0, + num_filters=num_filters / 4, + filter_size=3, + pad=1, + act='prelu', + is_train=is_train) + conv2 = self.conv_bn_layer(input=conv1, + num_filters=num_filters, + filter_size=1, + stride=stride, + pad=0, + is_train=is_train) short = self.shortcut(input, num_filters, stride, is_train=is_train) return fluid.layers.elementwise_add(x=short, y=conv2, act=None) diff --git a/plsc/utils/__init__.py b/plsc/utils/__init__.py index e67f93465c86ba5dcca0b21925589cceaeb27b42..d0c32e26092f6ea25771279418582a24ea449ab2 100644 --- a/plsc/utils/__init__.py +++ b/plsc/utils/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/plsc/utils/base64_reader.py b/plsc/utils/base64_reader.py index cabbdda41992f65d582ce6d0233b2da969f97c91..820694eca080293c571960b341d89cc81345976f 100644 --- a/plsc/utils/base64_reader.py +++ b/plsc/utils/base64_reader.py @@ -1,16 +1,34 @@ -import os -import math -import random -import pickle +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import base64 import functools +import math +import os +import pickle +import random + import numpy as np import paddle +import six from PIL import Image, ImageEnhance + try: from StringIO import StringIO except ImportError: from io import StringIO +from io import BytesIO random.seed(0) @@ -18,7 +36,6 @@ DATA_DIM = 112 THREAD = 8 BUF_SIZE = 10240 - img_mean = np.array([127.5, 127.5, 127.5]).reshape((3, 1, 1)) img_std = np.array([128.0, 128.0, 128.0]).reshape((3, 1, 1)) @@ -97,13 +114,13 @@ def RandomResizedCrop(img, size): return img -def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]): +def random_crop(img, size, scale=(0.08, 1.0), ratio=(3. / 4., 4. / 3.)): aspect_ratio = math.sqrt(random.uniform(*ratio)) w = 1. * aspect_ratio h = 1. 
/ aspect_ratio - bound = min((float(img.size[0]) / img.size[1]) / (w**2), - (float(img.size[1]) / img.size[0]) / (h**2)) + bound = min((float(img.size[0]) / img.size[1]) / (w ** 2), + (float(img.size[1]) / img.size[0]) / (h ** 2)) scale_max = min(scale[1], bound) scale_min = min(scale[0], bound) @@ -150,12 +167,12 @@ def distort_color(img): return img -def process_image_imagepath(sample, - class_dim, - color_jitter, - rotate, - rand_mirror, - normalize): +def process_image(sample, + class_dim, + color_jitter, + rotate, + rand_mirror, + normalize): img_data = base64.b64decode(sample[0]) img = Image.open(StringIO(img_data)) @@ -185,49 +202,62 @@ def process_image_imagepath(sample, return img, sample[1] -def arc_iterator(file_list, +def arc_iterator(data_dir, + file_list, class_dim, color_jitter=False, rotate=False, rand_mirror=False, normalize=False): trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) - trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + def reader(): with open(file_list, 'r') as f: flist = f.readlines() - assert len(flist) % trainer_count == 0, \ - "Number of files should be divisible by trainer count, " \ - "run base64 file preprocessing tool first." - num_files_per_trainer = len(flist) // trainer_count - start = num_files_per_trainer * trainer_id - end = start + num_files_per_trainer - flist = flist[start:end] - - for file in flist: - with open(file, 'r') as f: - for line in f.xreadlines(): - line = line.strip().split('\t') - image, label = line[0], line[1] - yield image, label - - mapper = functools.partial(process_image_imagepath, - class_dim=class_dim, color_jitter=color_jitter, rotate=rotate, - rand_mirror=rand_mirror, normalize=normalize) + assert len(flist) == num_trainers, \ + "Please use process_base64_files.py to pre-process the dataset." 
+ file = flist[trainer_id] + file = os.path.join(data_dir, file) + + with open(file, 'r') as f: + if six.PY2: + for line in f.xreadlines(): + line = line.strip().split('\t') + image, label = line[0], line[1] + yield image, label + else: + for line in f: + line = line.strip().split('\t') + image, label = line[0], line[1] + yield image, label + + mapper = functools.partial(process_image, + class_dim=class_dim, + color_jitter=color_jitter, + rotate=rotate, + rand_mirror=rand_mirror, + normalize=normalize) return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE) def load_bin(path, image_size): - bins, issame_list = pickle.load(open(path, 'rb')) + if six.PY2: + bins, issame_list = pickle.load(open(path, 'rb')) + else: + bins, issame_list = pickle.load(open(path, 'rb'), encoding='bytes') data_list = [] for flip in [0, 1]: - data = np.empty((len(issame_list)*2, 3, image_size[0], image_size[1])) + data = np.empty((len(issame_list) * 2, 3, image_size[0], image_size[1])) data_list.append(data) - for i in xrange(len(issame_list)*2): + for i in range(len(issame_list) * 2): _bin = bins[i] - if not isinstance(_bin, basestring): - _bin = _bin.tostring() - img_ori = Image.open(StringIO(_bin)) + if six.PY2: + if not isinstance(_bin, six.string_types): + _bin = _bin.tostring() + img_ori = Image.open(StringIO(_bin)) + else: + img_ori = Image.open(BytesIO(_bin)) for flip in [0, 1]: img = img_ori.copy() if flip == 1: @@ -241,13 +271,18 @@ def load_bin(path, image_size): if i % 1000 == 0: print('loading bin', i) print(data_list[0].shape) - return (data_list, issame_list) - - -def train(data_dir, file_list, num_classes): - file_path = os.path.join(data_dir, file_list) - return arc_iterator(file_path, class_dim=num_classes, color_jitter=False, - rotate=False, rand_mirror=True, normalize=True) + return data_list, issame_list + + +def train(data_dir, num_classes): + file_path = os.path.join(data_dir, 'file_list.txt') + return arc_iterator(data_dir, + file_path, + class_dim=num_classes, + color_jitter=False, + rotate=False, + rand_mirror=True, + normalize=True) def test(data_dir, datasets): diff --git a/plsc/utils/jpeg_reader.py b/plsc/utils/jpeg_reader.py index 6d68872cd712b8f711a74f5a17faf9c0fcd1da4a..f8bb850387cb139131d093e3a2ffc1e423b62a46 100644 --- a/plsc/utils/jpeg_reader.py +++ b/plsc/utils/jpeg_reader.py @@ -1,16 +1,19 @@ -import os +import functools import math -import random +import os import pickle -import functools +import random + import numpy as np import paddle import six from PIL import Image, ImageEnhance + try: from StringIO import StringIO except ImportError: from io import StringIO +from io import BytesIO random.seed(0) @@ -123,13 +126,13 @@ def RandomResizedCrop(img, size): return img -def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]): +def random_crop(img, size, scale=(0.08, 1.0), ratio=(3. / 4., 4. / 3.)): aspect_ratio = math.sqrt(random.uniform(*ratio)) w = 1. * aspect_ratio h = 1. 
/ aspect_ratio - bound = min((float(img.size[0]) / img.size[1]) / (w**2), - (float(img.size[1]) / img.size[0]) / (h**2)) + bound = min((float(img.size[0]) / img.size[1]) / (w ** 2), + (float(img.size[1]) / img.size[0]) / (h ** 2)) scale_max = min(scale[1], bound) scale_min = min(scale[0], bound) @@ -222,28 +225,37 @@ def arc_iterator(data, def reader(): if shuffle: random.shuffle(data) - for j in xrange(len(data)): + for j in range(len(data)): path, label = data[j] path = os.path.join(data_dir, path) yield path, label - mapper = functools.partial(process_image_imagepath, class_dim=class_dim, - color_jitter=color_jitter, rotate=rotate, - rand_mirror=rand_mirror, normalize=normalize) + mapper = functools.partial(process_image_imagepath, + class_dim=class_dim, + color_jitter=color_jitter, + rotate=rotate, + rand_mirror=rand_mirror, + normalize=normalize) return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE) def load_bin(path, image_size): - bins, issame_list = pickle.load(open(path, 'rb')) + if six.PY2: + bins, issame_list = pickle.load(open(path, 'rb')) + else: + bins, issame_list = pickle.load(open(path, 'rb'), encoding='bytes') data_list = [] for flip in [0, 1]: - data = np.empty((len(issame_list)*2, 3, image_size[0], image_size[1])) + data = np.empty((len(issame_list) * 2, 3, image_size[0], image_size[1])) data_list.append(data) - for i in range(len(issame_list)*2): + for i in range(len(issame_list) * 2): _bin = bins[i] - if not isinstance(_bin, six.string_types): - _bin = _bin.tostring() - img_ori = Image.open(StringIO(_bin)) + if six.PY2: + if not isinstance(_bin, six.string_types): + _bin = _bin.tostring() + img_ori = Image.open(StringIO(_bin)) + else: + img_ori = Image.open(BytesIO(_bin)) for flip in [0, 1]: img = img_ori.copy() if flip == 1: @@ -257,14 +269,19 @@ def load_bin(path, image_size): if i % 1000 == 0: print('loading bin', i) print(data_list[0].shape) - return (data_list, issame_list) + return data_list, issame_list def arc_train(data_dir, class_dim): train_image_list = get_train_image_list(data_dir) - return arc_iterator(train_image_list, shuffle=True, class_dim=class_dim, - data_dir=data_dir, color_jitter=False, rotate=False, rand_mirror=True, - normalize=True) + return arc_iterator(train_image_list, + shuffle=True, + class_dim=class_dim, + data_dir=data_dir, + color_jitter=False, + rotate=False, + rand_mirror=True, + normalize=True) def test(data_dir, datasets): diff --git a/plsc/utils/parameter_converter.py b/plsc/utils/parameter_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..f3bfac899793b111d3a1201a7a5516d18e298158 --- /dev/null +++ b/plsc/utils/parameter_converter.py @@ -0,0 +1,583 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import json +import logging +import os +import shutil +from functools import cmp_to_key + +import paddle.fluid as fluid + +logging.basicConfig( + level=logging.INFO, + format='[%(levelname)s %(asctime)s line:%(lineno)d] %(message)s', + datefmt='%d %b %Y %H:%M:%S') +logger = logging.getLogger() + + +class ParameterConverter(object): + """ + Tool to convert pre-trained distributed fc parameters for inference or + fine-tuning. Note that the number of ranks or GPUs for inference or + fine-tuning can be different from that for pre-training. + """ + + def __init__(self, model_dir, output_dir, num_trainers): + super(ParameterConverter, self).__init__() + self.model_dir = model_dir + self.output_dir = output_dir + self.pretrain_nranks = -1 + self.emb_dim = -1 + self.num_classes = -1 + self.nranks = num_trainers + + self.load_config() + + def load_config(self): + """ + Load config file which contains the following information for + pre-training: + 1. pretrain_nranks (int): number of ranks for pre-training; + 2. emb_dim (int): embedding dim for pre-training; + 3. num_classes (int): number of classes for classification. + """ + meta_file = os.path.join(self.model_dir, 'meta.json') + if not os.path.exists(meta_file): + logger.error("Meta file does not exist, make sure your pre-trained " + "models are legal.") + exit() + + with open(meta_file, 'r') as handle: + config = json.load(handle) + + self.pretrain_nranks = config['pretrain_nranks'] + assert self.pretrain_nranks > 0 + self.emb_dim = config['emb_dim'] + assert self.emb_dim > 0 + self.num_classes = config['num_classes'] + assert self.num_classes > 0 + + logger.info("Parameters for pre-training: pretrain_nranks ({}), " + "emb_dim ({}), and num_classes ({}).".format( + self.pretrain_nranks, + self.emb_dim, + self.num_classes)) + logger.debug("Parameters for inference or fine-tuning: " + "nranks ({}).".format(self.nranks)) + + def find_var_names(self): + """ + Find all names of pre-trained parameters for the distributed fc layer, + e.g., dist@softmax@rank@00000.w_0, dist@softmax@rank@00000.b_0 etc. + We assume that names of distributed fc related parameters start with the + prefix dist@ and have @rank@ in their names. 
+ """ + var_names = [] + model_dir = os.path.abspath(self.model_dir) + if not os.path.exists(model_dir): + logger.error("The directory for pre-trained model ({}) does not " + "exist, please check it.".format(model_dir)) + exit() + logger.info("The directory for pre-trained model: {}".format(model_dir)) + for file in os.listdir(model_dir): + if 'dist@' in file and '@rank@' in file: + var_names.append(file) + assert len(var_names) > 0, \ + logger.error("No distributed fc parameters found.") + logger.info("Number of distributed fc parameters: {}.".format( + len(var_names))) + logger.info("Distributed fc parameters: {}.".format(var_names)) + return var_names + + def split_load_and_save(self, + name_index, + param_names, + save_rank_id, + remainder, + as_bias, + train_nshards, + train_nranks, + nshards, + dtype="float32"): + var2 = None + advance = False + emb_dim = self.emb_dim + main_program = fluid.Program() + startup_program = fluid.Program() + num_classes = self.num_classes + + load_var_name = param_names[name_index] + save_var_name_list = load_var_name.split('.') + save_var_name_list[0] = save_var_name_list[0].split('@') + save_var_name_list[0][-1] = "%05d" % save_rank_id + save_var_name_list[0] = '@'.join(save_var_name_list[0]) + save_var_name = '.'.join(save_var_name_list) + + last_train_nshards = num_classes - (train_nranks - 1) * train_nshards + + with fluid.program_guard(main_program, startup_program): + if name_index == train_nranks - 1: + var_dim = last_train_nshards + else: + var_dim = train_nshards + + shape = [var_dim] if as_bias else [emb_dim, var_dim] + var = fluid.layers.create_parameter(shape, + dtype=dtype, + name=load_var_name) + + if as_bias: + var = fluid.layers.slice(var, + axes=[0], + starts=[var.shape[0] - remainder], + ends=[var.shape[0]]) + else: + var = fluid.layers.split(var, + [var.shape[1] - remainder, + remainder], + dim=1)[1] + + save_var_dim = nshards + if remainder < nshards: + if name_index == train_nranks - 1: + save_var_dim = remainder + else: + name_index += 1 + advance = True + load_var_name = param_names[name_index] + + if name_index == train_nranks - 1: + var_dim = last_train_nshards + else: + var_dim = train_nshards + shape = [var_dim] if as_bias else [emb_dim, var_dim] + var2 = fluid.layers.create_parameter(shape, + dtype=dtype, + name=load_var_name) + + if remainder + var_dim < nshards: + # The last train rank + save_var_dim = remainder + var_dim + else: + remainder = remainder + var_dim - nshards + elif remainder == nshards: + if name_index == train_nranks - 2: + remainder = last_train_nshards + advance = True + elif name_index < train_nranks - 2: + remainder = train_nshards + advance = True + else: + remainder = remainder - nshards + if var2 is not None: + var = fluid.layers.concat([var, var2], axis=0 if as_bias else 1) + + shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim] + to_save_var = fluid.layers.create_parameter( + shape, + dtype=dtype, + name=save_var_name + '_temp') + if save_var_dim != nshards: # get last dim + if as_bias: + temp_var = fluid.layers.slice( + var, + axes=[0], + starts=[var.shape[0] - save_var_dim], + ends=[var.shape[0]]) + else: + temp_var = fluid.layers.split( + var, + [var.shape[1] - save_var_dim, save_var_dim], + dim=1)[1] + fluid.layers.assign(temp_var, to_save_var) + else: + if as_bias: + temp_var = fluid.layers.slice(var, + axes=[0], + starts=[0], + ends=[nshards]) + else: + temp_var = fluid.layers.split( + var, + [nshards, var.shape[1] - nshards], + dim=1)[0] + fluid.layers.assign(temp_var, to_save_var) 
+ + def expected_var(var): + has_var = os.path.exists(os.path.join(self.model_dir, var.name)) + if has_var: + return True + return False + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_program) + fluid.io.load_vars(exe, + dirname=self.model_dir, + predicate=expected_var, + main_program=main_program) + exe.run(main_program) + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + fluid.io.save_vars(exe, + self.output_dir, + vars=[to_save_var], + main_program=main_program) + srcfile = os.path.join(self.output_dir, to_save_var.name) + dstfile = os.path.join(self.output_dir, save_var_name) + shutil.move(srcfile, dstfile) + return remainder, advance + + def split_parameters(self, param_names, as_bias): + """ + Split parameters whose names are in param_names. + Params: + param_names: list of names of parameters to split + as_bias: whether parameters to split are as bias or not + """ + num_classes = self.num_classes + train_nranks = self.pretrain_nranks + nranks = self.nranks + + train_nshards = (num_classes + train_nranks - 1) // train_nranks + nshards = (num_classes + nranks - 1) // nranks + + save_rank_id = 0 + # remainder dim that is not split in a var + remainder_var_dim = train_nshards + name_index = 0 # index of name of pre-trained parameter to process + for save_rank_id in range(nranks): + assert name_index < train_nranks + remainder_var_dim, advance = self.split_load_and_save( + name_index, + param_names, + save_rank_id, + remainder_var_dim, + as_bias, + train_nshards, + train_nranks, + nshards) + name_index += 1 if advance else 0 + processed_var_count = name_index + 1 + + assert processed_var_count == train_nranks, \ + logger.error("Number of pre-trained parameters processed ({}) is " + "not equal to the number of ranks ({}) for " + "pre-training.".format(processed_var_count, + train_nranks)) + assert save_rank_id == nranks - 1, \ + logger.error("Number of saved parameters ({}) is not equal to the " + "number of ranks ({}) for inference or " + "fine-tuning.".format(save_rank_id + 1, nranks)) + + def split_distfc_parameters(self, + weight_param_names, + weight_velocity_param_names, + bias_param_names, + bias_velocity_param_names): + """ + Split each distributed fc-related parameter according to number of ranks + for inference or fine-tuning. 
+ + Params: + weight_param_names: list of names of weight parameters + bias_param_names: list of names of bias parameters + """ + self.split_parameters(weight_param_names, as_bias=False) + self.split_parameters(weight_velocity_param_names, as_bias=False) + if len(bias_param_names) != 0: + self.split_parameters(bias_param_names, as_bias=True) + self.split_parameters(bias_velocity_param_names, as_bias=True) + + def concat_load_and_save(self, + name_index, + param_names, + save_rank_id, + remainder, + as_bias, + train_nshards, + train_nranks, + nshards, + dtype="float32"): + advance = 0 + emb_dim = self.emb_dim + main_program = fluid.Program() + startup_program = fluid.Program() + num_classes = self.num_classes + + load_var_name = param_names[name_index] + save_var_name_list = load_var_name.split('.') + save_var_name_list[0] = save_var_name_list[0].split('@') + save_var_name_list[0][-1] = "%05d" % save_rank_id + save_var_name_list[0] = '@'.join(save_var_name_list[0]) + save_var_name = '.'.join(save_var_name_list) + + last_train_nshards = num_classes - (train_nranks - 1) * train_nshards + + with fluid.program_guard(main_program, startup_program): + if name_index == train_nranks - 1: + var_dim = last_train_nshards + else: + var_dim = train_nshards + + shape = [var_dim] if as_bias else [emb_dim, var_dim] + var = fluid.layers.create_parameter(shape, + dtype=dtype, + name=load_var_name) + + if as_bias: + var = fluid.layers.slice(var, + axes=[0], + starts=[var.shape[0] - remainder], + ends=[var.shape[0]]) + else: + var = fluid.layers.split(var, + [var.shape[1] - remainder, + remainder], + dim=1)[1] + to_concat_var_list = [var] + while remainder < nshards and name_index < train_nranks - 1: + name_index += 1 + advance += 1 + load_var_name = param_names[name_index] + if name_index == train_nranks - 1: + var_dim = last_train_nshards + else: + var_dim = train_nshards + shape = [var_dim] if as_bias else [emb_dim, var_dim] + var = fluid.layers.create_parameter(shape, + dtype=dtype, + name=load_var_name) + + to_concat_var_list.append(var) + remainder += var_dim + if len(to_concat_var_list) > 1: + var = fluid.layers.concat(to_concat_var_list, + axis=0 if as_bias else 1) + save_var_dim = nshards + if remainder > nshards: + if as_bias: + var = fluid.layers.slice(var, + axes=[0], + starts=[0], + ends=[nshards]) + else: + var = fluid.layers.split( + var, + [nshards, var.shape[1] - nshards], + dim=1)[0] + remainder = remainder - nshards + elif remainder == nshards: + if name_index == train_nranks - 2: + # advance += 1 if len(to_concat_var_list) > 1 else 0 + # to avoid duplicate add + # name_index += 1 if len(to_concat_var_list) > 1 else 0 + # to avoid duplicate add + advance += 1 + name_index += 1 + remainder = last_train_nshards + elif name_index < train_nranks - 2: + # advance += 1 if len(to_concat_var_list) > 1 else 0 + # to avoid duplicate add + # name_index += 1 if len(to_concat_var_list) > 1 else 0 + # to avoid duplicate add + advance += 1 + name_index += 1 + remainder = train_nshards + else: + save_var_dim = remainder + + shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim] + to_save_var = fluid.layers.create_parameter( + shape, + dtype=dtype, + name=save_var_name + '_temp') + + fluid.layers.assign(var, to_save_var) + + def expected_var(var): + has_var = os.path.exists(os.path.join(self.model_dir, var.name)) + if has_var: + return True + return False + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_program) + fluid.io.load_vars(exe, + dirname=self.model_dir, + 
predicate=expected_var, + main_program=main_program) + exe.run(main_program) + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + fluid.io.save_vars(exe, + self.output_dir, + vars=[to_save_var], + main_program=main_program) + srcfile = os.path.join(self.output_dir, to_save_var.name) + dstfile = os.path.join(self.output_dir, save_var_name) + shutil.move(srcfile, dstfile) + return remainder, advance + + def concat_parameters(self, param_names, as_bias): + """ + Concat parameters whose names are in param_names. + Params: + param_names: list of names of parameters to concat + as_bias: whether parameters to split are as bias or not + """ + num_classes = self.num_classes + train_nranks = self.pretrain_nranks + nranks = self.nranks + + train_nshards = (num_classes + train_nranks - 1) // train_nranks + nshards = (num_classes + nranks - 1) // nranks + + save_rank_id = 0 + remainder_dim = train_nshards # remainder dim that is not concated + name_index = 0 # index of name of pre-trained parameter to process + for save_rank_id in range(nranks): + assert name_index < train_nranks + remainder_dim, advance = self.concat_load_and_save(name_index, + param_names, + save_rank_id, + remainder_dim, + as_bias, + train_nshards, + train_nranks, + nshards) + name_index += advance + processed_var_count = name_index + 1 + + assert processed_var_count == train_nranks, \ + logger.error("Number of pre-trained parameters processed ({}) is " + "not equal to the number of ranks ({}) for " + "pre-training.".format(processed_var_count, + train_nranks)) + assert save_rank_id == nranks - 1, \ + logger.error("Number of saved parameters ({}) is not equal to the " + "number of ranks ({}) for inference or " + "fine-tuning.".format(save_rank_id + 1, nranks)) + + def concat_distfc_parameters(self, + weight_param_names, + weight_velocity_param_names, + bias_param_names, + bias_velocity_param_names): + """ + Concat distributed fc-related parameters according to number of ranks + for inference or finetuning. + + Params: + weight_param_names: list of names of weight parameters + weight_velocity_param_names: list of names of weight velocity + parameters + bias_param_names: list of names of bias parameters + bias_velocity_param_names: list of names of bias velocity parameters + """ + self.concat_parameters(weight_param_names, as_bias=False) + self.concat_parameters(weight_velocity_param_names, as_bias=False) + if len(bias_param_names) != 0: + self.concat_parameters(bias_param_names, as_bias=True) + self.concat_parameters(bias_velocity_param_names, as_bias=True) + + def process(self): + self.load_config() + var_names = self.find_var_names() + weight_param_names = [name for name in var_names + if '.w' in name and 'velocity' not in name] + weight_velocity_param_names = [name for name in var_names + if '.w' in name and 'velocity' in name] + bias_param_names = [name for name in var_names + if '.b' in name and 'velocity' not in name] + bias_velocity_param_names = [name for name in var_names + if '.b' in name and 'velocity' in name] + + def parameter_name_compare(x, y): + """ + Compare two parameter names depend on their rank id. + A parameter name is like dist_softmax_rank_00000.w_0, + where 00000 is the rank id. 
+ """ + rank_id_x = int(x.split('.')[0].split('@')[-1]) + rank_id_y = int(y.split('.')[0].split('@')[-1]) + if rank_id_x < rank_id_y: + return -1 + elif rank_id_x == rank_id_y: + return 0 + else: + return 1 + + weight_param_names.sort(key=cmp_to_key(parameter_name_compare)) + weight_velocity_param_names.sort( + key=cmp_to_key(parameter_name_compare)) + bias_param_names.sort(key=cmp_to_key(parameter_name_compare)) + bias_velocity_param_names.sort(key=cmp_to_key(parameter_name_compare)) + + assert len(weight_param_names) == self.pretrain_nranks, \ + logger.error( + "Number of distributed fc-related weight parameters ({}) " + "should be equal to the number of ranks ({}) for " + "pre-training.".format(len(weight_param_names), + self.pretrain_nranks)) + assert len(weight_velocity_param_names) == self.pretrain_nranks, \ + logger.error( + "Number of distributed fc-related weight parameters ({}) " + "should be equal to the number of ranks ({}) for " + "pre-training.".format(len(weight_velocity_param_names), + self.pretrain_nranks)) + assert (len(bias_param_names) == 0 or + len(bias_param_names) == self.pretrain_nranks), \ + logger.error( + "Number of distributed fc-related bias parameters ({}) " + "should be 0 or equal to the number of ranks ({}) for " + "pre-training.".format(len(bias_param_names), + self.pretrain_nranks)) + assert (len(bias_velocity_param_names) == 0 or + len(bias_velocity_param_names) == self.pretrain_nranks), \ + logger.error( + "Number of distributed fc-related bias parameters ({}) " + "should be 0 or equal to the number of ranks ({}) for " + "pre-training.".format(len(bias_velocity_param_names), + self.pretrain_nranks)) + + pretrain_nranks = self.pretrain_nranks + nranks = self.nranks + if pretrain_nranks == nranks: + logger.info( + "Pre-training and inference (or fine-tuning) have the same " + "number of ranks, nothing to do.") + elif pretrain_nranks < nranks: + self.split_distfc_parameters(weight_param_names, + weight_velocity_param_names, + bias_param_names, + bias_velocity_param_names) + else: + self.concat_distfc_parameters(weight_param_names, + weight_velocity_param_names, + bias_param_names, + bias_velocity_param_names) + + logger.info("Done.") + + +if __name__ == "__main__": + converter = ParameterConverter('./trained_model', + "./trained_model_temp", + 8) + converter.process() diff --git a/plsc/utils/process_distfc_parameter.py b/plsc/utils/process_distfc_parameter.py deleted file mode 100644 index 9ddd495cf986fa3cd86c6c482756fd4d63100f1d..0000000000000000000000000000000000000000 --- a/plsc/utils/process_distfc_parameter.py +++ /dev/null @@ -1,572 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from __future__ import print_function - -import warnings -import os -import six -import logging -import argparse -import shutil -import pickle - -import numpy as np - -import paddle -import paddle.fluid as fluid -from paddle.fluid.transpiler.details import program_to_code - - -logging.basicConfig(level=logging.INFO, - format='[%(levelname)s %(asctime)s line:%(lineno)d] %(message)s', - datefmt='%d %b %Y %H:%M:%S') -logger = logging.getLogger() - - -parser = argparse.ArgumentParser(description=""" - Tool to convert pretrained distributed fc parameters for inference. - - Note that the number of ranks or GPUs for inference can be different - from that for pretraining.""") - -parser.add_argument("--name_feature", - type=str, - default="@rank@", - help="Feature for names of distributed fc parameters. " - "For example, by default the name for the " - "distributed fc weight parameter is like " - "dist@xxx@rank@id.w_0 where xxx is softmax or arcface " - "depending on the loss types used and rank_id is the " - "rank_id generating this parameter, and hence the " - "feature cloud be @rank@.") -parser.add_argument("--pretrain_nranks", - type=int, - default=-1, - help="Number of ranks (GPUs) for pre-training.") -parser.add_argument("--nranks", - type=int, - required=True, - help="Number of ranks (GPUs) for inference or finetuning.") -parser.add_argument("--num_classes", - type=int, - default=-1, - help="Number of classes for classification.") -parser.add_argument("--emb_dim", - type=int, - default=-1, - help="Embedding dim.") -parser.add_argument("--pretrained_model_dir", - type=str, - required=True, - default=None, - help="Directory for pretrained model.") -parser.add_argument("--output_dir", - type=str, - required=True, - default=None, - help="Directory for output.") -args = parser.parse_args() - - -def load_config(args): - """ - Load config file which contains the following information for pretraining: - 1. pretrain_nranks (int): number of ranks for pretraining; - 2. emb_dim (int): embedding dim for pretraining; - 3. num_classes (int): number of classes for classification. 
- """ - meta_file = os.path.join(args.pretrained_model_dir, 'meta.pickle') - if not os.path.exists(meta_file): - if args.pretrain_nranks < 0 or args.emb_dim < 0 or args.num_classes < 0: - logger.error("Meta file does not exist, you have to set " - "'--pretrain_nranks', '--emb_dim' and '--num_classes " - "parameters manually.") - exit() - logger.debug("Meta file does not exist, make sure you have correctly " - "set --pretrain_nranks ({}), --emb_dim ({}) and " - "--num_classes ({}) parameters manually.".format( - args.pretrain_nranks, args.emb_dim, args.num_classes)) - else: - with open(meta_file, 'rb') as handle: - config = pickle.load(handle) - if args.pretrain_nranks < 0: - args.pretrain_nranks = config['pretrain_nranks'] - elif args.pretrain_nranks != config['pretrain_nranks']: - logger.error("The --pretrain_nranks ({}) parameter you set is not " - "equal to that ({}) for pretraining, please check " - "it.".format(args.pretrain_nranks, - config['pretrain_nranks'])) - exit() - if args.emb_dim < 0: - args.emb_dim = config['emb_dim'] - elif args.emb_dim != config['emb_dim']: - logger.error("The --emb_dim ({}) parameter you set is not equal to " - "that ({}) for pretraining, please check it.".format( - args.emb_dim, config['emb_dim'])) - exit() - if args.num_classes < 0: - args.num_classes = config['num_classes'] - elif args.num_classes != config['num_classes']: - logger.error("The --num_classes ({}) parameter you set is not equal" - " to that ({}) for pretraining, please check " - "it.".format(args.emb_dim, config['emb_dim'])) - exit() - logger.debug("Parameters for pretraining: pretrain_nranks ({}), emb_dim " - "({}), and num_classes ({}).".format(args.pretrain_nranks, - args.emb_dim, args.num_classes)) - logger.debug("Parameters for inference or finetuning: nranks ({}).".format( - args.nranks)) - - -def find_distfc_var_names(args): - """ - Find all names of pretrained distfc-related parameters, - e.g., dist_softmax_rank_00000.w_0, dist_softmax_rank_00000.b_0 etc. - We assume that names of distfc-related parameters start with the - prefix 'dist'. 
- """ - var_names = [] - model_dir = os.path.abspath(args.pretrained_model_dir) - if not os.path.exists(model_dir): - logger.error("The directory for pretrained model ({}) does not exist, " - "please check it.".format(model_dir)) - exit() - logger.info("The directory for pretrained model: {}".format(model_dir)) - args.pretrained_model_dir = model_dir - for file in os.listdir(model_dir): - if args.name_feature in file: - var_names.append(file) - assert len(var_names) > 0, \ - logger.error("No distributed fc parameters found.") - logger.info("Number of distributed fc parameters: {}.".format( - len(var_names))) - logger.debug("Distributed fc parameters: {}.".format(var_names)) - return var_names - - -def split_load_and_save(args, - name_index, - param_names, - save_rank_id, - remainder, - as_bias, - train_nshards, - train_nranks, - nshards, - dtype="float32"): - var2 = None - advance = False - emb_dim = args.emb_dim - main_program = fluid.Program() - startup_program = fluid.Program() - - load_var_name = param_names[name_index] - save_var_name_list = load_var_name.split('.') - save_var_name_list[0] = save_var_name_list[0].split('@') - save_var_name_list[0][-1] = "%05d" % save_rank_id - save_var_name_list[0] = '@'.join(save_var_name_list[0]) - save_var_name = '.'.join(save_var_name_list) - - last_train_nshards = args.num_classes - (train_nranks - 1) * train_nshards - - with fluid.program_guard(main_program, startup_program): - if name_index == train_nranks - 1: - var_dim = last_train_nshards - else: - var_dim = train_nshards - - shape = [var_dim] if as_bias else [emb_dim, var_dim] - var = fluid.layers.create_parameter(shape, dtype=dtype, - name=load_var_name) - - if as_bias: - var = fluid.layers.slice(var, axes=[0], - starts=[var.shape[0] - remainder], ends=[var.shape[0]]) - else: - var = fluid.layers.split(var, [var.shape[1] - remainder, remainder], - dim=1)[1] - - save_var_dim = nshards - if remainder < nshards: - if name_index == train_nranks - 1: - save_var_dim = remainder - else: - name_index += 1 - advance = True - load_var_name = param_names[name_index] - - if name_index == train_nranks - 1: - var_dim = last_train_nshards - else: - var_dim = train_nshards - shape = [var_dim] if as_bias else [emb_dim, var_dim] - var2 = fluid.layers.create_parameter(shape, dtype=dtype, - name=load_var_name) - - if remainder + var_dim < nshards: - # The last train rank - save_var_dim = remainder + var_dim - else: - remainder = remainder + var_dim - nshards - elif remainder == nshards: - if name_index == train_nranks - 2: - remainder = last_train_nshards - advance = True - elif name_index < train_nranks - 2: - remainder = train_nshards - advance = True - else: - remainder = remainder - nshards - if var2 is not None: - var = fluid.layers.concat([var, var2], axis=0 if as_bias else 1) - - shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim] - to_save_var = fluid.layers.create_parameter(shape, dtype=dtype, - name=save_var_name + '_temp') - if save_var_dim != nshards: # get last dim - if as_bias: - temp_var = fluid.layers.slice(var, axes=[0], - starts=[var.shape[0] - save_var_dim], ends=[var.shape[0]]) - else: - temp_var = fluid.layers.split(var, - [var.shape[1] - save_var_dim, save_var_dim], dim=1)[1] - fluid.layers.assign(temp_var, to_save_var) - else: - if as_bias: - temp_var = fluid.layers.slice(var, axes=[0], starts=[0], - ends=[nshards]) - else: - temp_var = fluid.layers.split(var, - [nshards, var.shape[1] - nshards], dim=1)[0] - fluid.layers.assign(temp_var, to_save_var) - - def 
expected_var(var): - has_var = os.path.exists(os.path.join(args.pretrained_model_dir, - var.name)) - if has_var: - return True - return False - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_program) - fluid.io.load_vars(exe, dirname=args.pretrained_model_dir, - predicate=expected_var, main_program=main_program) - exe.run(main_program) - if not os.path.exists(args.output_dir): - os.makedirs(args.output_dir) - fluid.io.save_vars(exe, args.output_dir, vars=[to_save_var], - main_program=main_program) - srcfile = os.path.join(args.output_dir, to_save_var.name) - dstfile = os.path.join(args.output_dir, save_var_name) - shutil.move(srcfile, dstfile) - return remainder, advance - - -def split_parameters(args, param_names, as_bias): - """ - Split parameters whose names are in param_names. - Params: - args: command line paramters - param_names: list of names of parameters to split - as_bias: whether parameters to split are as bias or not - """ - num_classes = args.num_classes - train_nranks = args.pretrain_nranks - nranks = args.nranks - - train_nshards = (num_classes + train_nranks - 1) // train_nranks - nshards = (num_classes + nranks - 1) // nranks # for inference of finetuning - - save_rank_id = 0 - remainder_var_dim = train_nshards # remainder dim that is not split in a var - name_index = 0 # index of name of pretrained parameter to process - for save_rank_id in range(nranks): - assert name_index < train_nranks - remainder_var_dim, advance = split_load_and_save(args, name_index, - param_names, save_rank_id, remainder_var_dim, as_bias, - train_nshards, train_nranks, nshards) - name_index += 1 if advance else 0 - processed_var_count = name_index + 1 - - assert processed_var_count == train_nranks, logger.error("Number of " - "pretrained parameters processed ({}) is not equal to the number of " - "ranks ({}) for pretraining.".format(processed_var_count, train_nranks)) - assert save_rank_id == nranks - 1, logger.error("Number of saved parameters" - " ({}) is not equal to the number of ranks ({}) for inference or " - "finetuning.".format(save_rank_id + 1, nranks)) - - -def split_distfc_parameters(args, - weight_param_names, - weight_velocity_param_names, - bias_param_names, - bias_velocity_param_names): - """ - Split each distributed fc-related parameter according to number of ranks - for inference or finetuning. 
- - Params: - args: command line paramters - weight_param_names: list of names of weight parameters - bias_param_names: list of names of bias parameters - """ - split_parameters(args, weight_param_names, as_bias=False) - split_parameters(args, weight_velocity_param_names, as_bias=False) - if len(bias_param_names) != 0: - split_parameters(args, bias_param_names, as_bias=True) - split_parameters(args, bias_velocity_param_names, as_bias=True) - - -def concat_load_and_save(args, - name_index, - param_names, - save_rank_id, - remainder, - as_bias, - train_nshards, - train_nranks, - nshards, - dtype="float32"): - advance = 0 - orig_nshards = nshards - emb_dim = args.emb_dim - main_program = fluid.Program() - startup_program = fluid.Program() - - load_var_name = param_names[name_index] - save_var_name_list = load_var_name.split('.') - save_var_name_list[0] = save_var_name_list[0].split('@') - save_var_name_list[0][-1] = "%05d" % save_rank_id - save_var_name_list[0] = '@'.join(save_var_name_list[0]) - save_var_name = '.'.join(save_var_name_list) - - last_train_nshards = args.num_classes - (train_nranks - 1) * train_nshards - - with fluid.program_guard(main_program, startup_program): - if name_index == train_nranks - 1: - var_dim = last_train_nshards - else: - var_dim = train_nshards - - shape = [var_dim] if as_bias else [emb_dim, var_dim] - var = fluid.layers.create_parameter(shape, dtype=dtype, - name=load_var_name) - - if as_bias: - var = fluid.layers.slice(var, axes=[0], - starts=[var.shape[0] - remainder], ends=[var.shape[0]]) - else: - var = fluid.layers.split(var, [var.shape[1] - remainder, remainder], - dim=1)[1] - to_concat_var_list = [var] - while remainder < nshards and name_index < train_nranks - 1: - name_index += 1 - advance += 1 - load_var_name = param_names[name_index] - if name_index == train_nranks - 1: - var_dim = last_train_nshards - else: - var_dim = train_nshards - shape = [var_dim] if as_bias else [emb_dim, var_dim] - var = fluid.layers.create_parameter(shape, dtype=dtype, - name=load_var_name) - - to_concat_var_list.append(var) - remainder += var_dim - if len(to_concat_var_list) > 1: - var = fluid.layers.concat( - to_concat_var_list, axis=0 if as_bias else 1) - save_var_dim = nshards - if remainder > nshards: - if as_bias: - var = fluid.layers.slice(var, axes=[0], starts=[0], - ends=[nshards]) - else: - var = fluid.layers.split(var, - [nshards, var.shape[1] - nshards], dim=1)[0] - remainder = remainder - nshards - elif remainder == nshards: - if name_index == train_nranks - 2: - #advance += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add - #name_index += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add - advance += 1 - name_index += 1 - remainder = last_train_nshards - elif name_index < train_nranks - 2: - #advance += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add - #name_index += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add - advance += 1 - name_index += 1 - remainder = train_nshards - else: - save_var_dim = remainder - - shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim] - to_save_var = fluid.layers.create_parameter(shape, dtype=dtype, - name=save_var_name + '_temp') - - fluid.layers.assign(var, to_save_var) - - def expected_var(var): - has_var = os.path.exists(os.path.join(args.pretrained_model_dir, - var.name)) - if has_var: - return True - return False - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_program) - fluid.io.load_vars(exe, 
dirname=args.pretrained_model_dir, - predicate=expected_var, main_program=main_program) - exe.run(main_program) - if not os.path.exists(args.output_dir): - os.makedirs(args.output_dir) - fluid.io.save_vars(exe, args.output_dir, vars=[to_save_var], - main_program=main_program) - srcfile = os.path.join(args.output_dir, to_save_var.name) - dstfile = os.path.join(args.output_dir, save_var_name) - shutil.move(srcfile, dstfile) - return remainder, advance - - -def concat_parameters(args, param_names, as_bias): - """ - Concat parameters whose names are in param_names. - Params: - args: command line paramters - param_names: list of names of parameters to concat - as_bias: whether parameters to split are as bias or not - """ - num_classes = args.num_classes - train_nranks = args.pretrain_nranks - nranks = args.nranks - - train_nshards = (num_classes + train_nranks - 1) // train_nranks - nshards = (num_classes + nranks - 1) // nranks # for inference of finetuning - - save_rank_id = 0 - remainder_dim = train_nshards # remainder dim that is not concatted - name_index = 0 # index of name of pretrained parameter to process - for save_rank_id in range(nranks): - assert name_index < train_nranks - remainder_dim, advance = concat_load_and_save(args, - name_index, param_names, save_rank_id, remainder_dim, - as_bias, train_nshards, train_nranks, nshards) - name_index += advance - processed_var_count = name_index + 1 - - assert processed_var_count == train_nranks, logger.error("Number of " - "pretrained parameters processed ({}) is not equal to the number of " - "ranks ({}) for pretraining.".format(processed_var_count, train_nranks)) - assert save_rank_id == nranks - 1, logger.error("Number of saved parameters" - " ({}) is not equal to the number of ranks ({}) for inference or " - "finetuning.".format(save_rank_id + 1, nranks)) - - -def concat_distfc_parameters(args, - weight_param_names, - weight_velocity_param_names, - bias_param_names, - bias_velocity_param_names): - """ - Concat distributed fc-related parameters according to number of ranks - for inference or finetuning. - - Params: - args: command line paramters - weight_param_names: list of names of weight parameters - bias_param_names: list of names of bias parameters - """ - concat_parameters(args, weight_param_names, as_bias=False) - concat_parameters(args, weight_velocity_param_names, as_bias=False) - if len(bias_param_names) != 0: - concat_parameters(args, bias_param_names, as_bias=True) - concat_parameters(args, bias_velocity_param_names, as_bias=True) - - -def parameter_name_compare(x, y): - """ - Compare two parameter names depend on their rank id. - A parameter name is like dist_softmax_rank_00000.w_0, - where 00000 is the rank id. 
- """ - rank_id_x = int(x.split('.')[0].split('@')[-1]) - rank_id_y = int(y.split('.')[0].split('@')[-1]) - if rank_id_x < rank_id_y: - return -1 - elif rank_id_x == rank_id_y: - return 0 - else: - return 1 - - -def main(): - global args - load_config(args) - - var_names = find_distfc_var_names(args) - weight_param_names = [name for name in var_names - if '.w' in name and 'velocity' not in name] - weight_velocity_param_names = [name for name in var_names - if '.w' in name and 'velocity' in name] - bias_param_names = [name for name in var_names - if '.b' in name and 'velocity' not in name] - bias_velocity_param_names = [name for name in var_names - if '.b' in name and 'velocity' in name] - - weight_param_names.sort(parameter_name_compare) - weight_velocity_param_names.sort(parameter_name_compare) - bias_param_names.sort(parameter_name_compare) - bias_velocity_param_names.sort(parameter_name_compare) - assert len(weight_param_names) == args.pretrain_nranks, \ - logger.error("Number of distributed fc-related weight parameters ({}) " - "should be equal to the number of ranks ({}) for " - "pretraining.".format(len(weight_param_names), - args.pretrain_nranks)) - assert len(weight_velocity_param_names) == args.pretrain_nranks, \ - logger.error("Number of distributed fc-related weight parameters ({}) " - "should be equal to the number of ranks ({}) for " - "pretraining.".format(len(weight_velocity_param_names), - args.pretrain_nranks)) - assert len(bias_param_names) == 0 or \ - len(bias_param_names) == args.pretrain_nranks, logger.error("Number of " - "distributed fc-related bias parameters ({}) should be 0 or equal " - "to the number of ranks ({}) for pretraining.".format( - len(bias_param_names), args.pretrain_nranks)) - assert len(bias_velocity_param_names) == 0 or \ - len(bias_velocity_param_names) == args.pretrain_nranks, logger.error("Number of " - "distributed fc-related bias parameters ({}) should be 0 or equal " - "to the number of ranks ({}) for pretraining.".format( - len(bias_velocity_param_names), args.pretrain_nranks)) - - pretrain_nranks = args.pretrain_nranks - nranks = args.nranks - if pretrain_nranks == nranks: - logger.info("Pre-training and inference (or finetuning) have the same " - "number of ranks, nothing to do.") - elif pretrain_nranks < nranks: - split_distfc_parameters(args, weight_param_names, - weight_velocity_param_names, bias_param_names, - bias_velocity_param_names) - else: - concat_distfc_parameters(args, weight_param_names, - weight_velocity_param_names, bias_param_names, - bias_velocity_param_names) - - logger.info("Done.") - - -if __name__ == "__main__": - main() diff --git a/plsc/version.py b/plsc/version.py index 0b618a61b046c996d9fbcc91b04f917e7473b2be..a6a2ac64ff2d4bca3f6fa071fcc15337d67ef36c 100644 --- a/plsc/version.py +++ b/plsc/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. """ PLSC version string """ -plsc_version = "0.1.0" +plsc_version = "0.0.0" diff --git a/tools/process_base64_files.py b/tools/process_base64_files.py index 8aa0ad4c9422d0a2951f5b73470debf333af28eb..783afca57ee4ba954e4398e62820702e7b0a3339 100644 --- a/tools/process_base64_files.py +++ b/tools/process_base64_files.py @@ -12,29 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from __future__ import print_function from __future__ import division +from __future__ import print_function -import os import argparse -import random -import time -import math import logging +import math +import os +import random import sqlite3 import tempfile -import six +import time +import six logging.basicConfig(level=logging.INFO, - format='[%(levelname)s %(asctime)s line:%(lineno)d] %(message)s', - datefmt='%d %b %Y %H:%M:%S') + format='[%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d %b %Y %H:%M:%S') logger = logging.getLogger() - parser = argparse.ArgumentParser(description=""" Tool to preprocess dataset in base64 format.""") - + """ We assume that the directory of dataset contains a file-list file, and one or more data files. Each line of the file-list file represents a data file. @@ -111,9 +110,9 @@ class Base64Preprocessor(object): line = line.strip() file_path = os.path.join(self.data_dir, line) with open(file_path, 'r') as df: - for line in df.xreadlines(): - line = line.strip().split('\t') - self.insert_to_db(cnt, line) + for line_local in df.xreadlines(): + line_local = line_local.strip().split('\t') + self.insert_to_db(cnt, line_local) cnt += 1 os.remove(file_path) else: @@ -121,9 +120,9 @@ class Base64Preprocessor(object): line = line.strip() file_path = os.path.join(self.data_dir, line) with open(file_path, 'r') as df: - for line in df: - line = line.strip().split('\t') - self.insert_to_db(cnt, line) + for line_local in df: + line_local = line_local.strip().split('\t') + self.insert_to_db(cnt, line_local) cnt += 1 os.remove(file_path) @@ -143,19 +142,20 @@ class Base64Preprocessor(object): start_time = time.time() - lines_per_rank = int(math.ceil(num/nranks)) + lines_per_rank = int(math.ceil(num / nranks)) total_num = lines_per_rank * nranks index = index + index[0:total_num - num] assert len(index) == total_num for rank in range(nranks): start = rank * lines_per_rank - end = (rank + 1) * lines_per_rank # exclusive + end = (rank + 1) * lines_per_rank # exclusive f_handler = open(os.path.join(self.data_dir, - ".tmp_" + str(rank)), 'w') + ".tmp_" + str(rank)), 'w') for i in range(start, end): idx = index[i] - sql_cmd = "SELECT DATA, LABEL FROM DATASET WHERE ID={};".format(idx) + sql_cmd = "SELECT DATA, LABEL FROM DATASET WHERE ID={};".format( + idx) cursor = self.cursor.execute(sql_cmd) for result in cursor: data = result[0] @@ -174,7 +174,7 @@ class Base64Preprocessor(object): line += '\n' f_t.writelines(line) os.rename(os.path.join(data_dir, ".tmp_" + str(rank)), - os.path.join(data_dir, "base64_rank_{}".format(rank))) + os.path.join(data_dir, "base64_rank_{}".format(rank))) os.remove(file_list) os.rename(temp_file_list, file_list) @@ -183,21 +183,16 @@ class Base64Preprocessor(object): def close_db(self): self.conn.close() self.tempfile.close() - + os.remove(self.sqlite3_file) + def main(): global args - + obj = Base64Preprocessor(args.data_dir, args.file_list, args.nranks) obj.shuffle_files() obj.close_db() - #data_dir = args.data_dir - #file_list = args.file_list - #nranks = args.nranks - #names, file_num_map, num = get_image_info(data_dir, file_list) - # - if __name__ == "__main__": main()
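Note on the new plsc/utils/parameter_converter.py introduced above: its process() method drives the whole conversion — when the number of pre-training ranks equals the target number of ranks the parameters are left untouched, when it is smaller each shard is split across more ranks, and when it is larger shards are concatenated onto fewer ranks. The sketch below only illustrates that shard arithmetic and the intended call pattern; the class count and the directory names are placeholder values for illustration and are not taken from this patch.

# Illustrative sketch, not part of the patch: mirrors the ceiling-division
# shard sizes used in split_parameters()/concat_parameters() and the usage
# shown in the __main__ block of parameter_converter.py.
from plsc.utils.parameter_converter import ParameterConverter

num_classes = 100000        # placeholder; the real value is read from meta.json
pretrain_nranks = 8         # ranks (GPUs) used for pre-training, from meta.json
nranks = 4                  # ranks for inference or fine-tuning

# Per-rank shard sizes, computed exactly as in the converter:
train_nshards = (num_classes + pretrain_nranks - 1) // pretrain_nranks
nshards = (num_classes + nranks - 1) // nranks
# pretrain_nranks == nranks -> parameters are left as-is
# pretrain_nranks <  nranks -> split_distfc_parameters() is used
# pretrain_nranks >  nranks -> concat_distfc_parameters() is used

# Typical invocation (paths are placeholders); model_dir must contain
# meta.json and the dist@...@rank@* parameter files saved by pre-training.
converter = ParameterConverter(model_dir='./trained_model',
                               output_dir='./trained_model_temp',
                               num_trainers=nranks)
converter.process()

Because both split_parameters() and concat_parameters() use the same ceiling division, every converted rank ends up holding ceil(num_classes / nranks) classes, except possibly the last rank, which holds the remainder.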