Unverified Commit c56ceffc authored by lilong12, committed by GitHub

fix the compatibility issue between PY2 and PY3 (#21)

1. be compatible with PY3
2. reformat code.
Parent a36148cf
......@@ -13,5 +13,6 @@
# limitations under the License.
from .entry import Entry
from .version import plsc_version as __version__
__all__ = ['Entry']
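For orientation, Entry is the package's main user-facing class (defined in the entry module below). A minimal end-to-end sketch of how it is typically driven, using only setters that appear in this commit; the dataset path is a hypothetical placeholder:

    from plsc import Entry

    ins = Entry()
    ins.set_dataset_dir('./train_data')  # hypothetical path
    ins.set_loss_type('dist_arcface')    # one of the four supported types
    ins.train()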
......@@ -35,9 +35,9 @@ config.warmup_epochs = 0
config.loss_type = "dist_arcface"
config.num_classes = 85742
config.image_shape = (3,112,112)
config.image_shape = (3, 112, 112)
config.margin = 0.5
config.scale = 64.0
config.lr = 0.1
config.lr_steps = (100000,160000,220000)
config.lr_steps = (100000, 160000, 220000)
config.emb_dim = 512
......@@ -12,36 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
import os
import sys
import time
import argparse
import numpy as np
from __future__ import print_function
import errno
import json
import logging
import math
import pickle
import subprocess
import os
import shutil
import logging
import subprocess
import sys
import tempfile
import time
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler
import sklearn
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.transpiler.details.program_utils import program_to_code
from . import config
from .models import resnet
from .models import DistributedClassificationOptimizer
from .models import base_model
from .models.dist_algo import DistributedClassificationOptimizer
from .models import resnet
from .utils import jpeg_reader as reader
from .utils.learning_rate import lr_warmup
from .utils.parameter_converter import ParameterConverter
from .utils.verification import evaluate
from .utils import jpeg_reader as reader
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.transpiler.details.program_utils import program_to_code
import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler
from paddle.fluid.optimizer import Optimizer
logging.basicConfig(
level=logging.INFO,
......@@ -59,9 +61,6 @@ class Entry(object):
"""
Check the validation of parameters.
"""
assert os.getenv("PADDLE_TRAINERS_NUM") is not None, \
"Please start script using paddle.distributed.launch module."
supported_types = ["softmax", "arcface",
"dist_softmax", "dist_arcface"]
assert self.loss_type in supported_types, \
......@@ -70,7 +69,8 @@ class Entry(object):
if self.loss_type in ["dist_softmax", "dist_arcface"]:
assert self.num_trainers > 1, \
"At least 2 trainers are required to use distributed fc-layer."
"At least 2 trainers are required for distributed fc-layer. " \
"You can start your job using paddle.distributed.launch module."
def __init__(self):
self.config = config.config
......@@ -89,6 +89,7 @@ class Entry(object):
self.model = None
self.train_reader = None
self.test_reader = None
self.predict_reader = None
self.train_program = fluid.Program()
self.startup_program = fluid.Program()
......@@ -97,7 +98,15 @@ class Entry(object):
self.fs_name = None
self.fs_ugi = None
self.fs_dir = None
self.fs_dir_for_save = None
self.fs_checkpoint_dir = None
self.param_attr = None
self.bias_attr = None
self.has_run_train = False # Whether has run training or not
self.test_initialized = False
self.train_pass_id = -1
self.use_fp16 = False
self.fp16_user_dict = None
......@@ -150,13 +159,13 @@ class Entry(object):
def set_mixed_precision(self,
use_fp16,
init_loss_scaling = 1.0,
incr_every_n_steps = 2000,
decr_every_n_nan_or_inf = 2,
incr_ratio = 2.0,
decr_ratio = 0.5,
use_dynamic_loss_scaling = True,
amp_lists = None):
init_loss_scaling=1.0,
incr_every_n_steps=2000,
decr_every_n_nan_or_inf=2,
incr_ratio=2.0,
decr_ratio=0.5,
use_dynamic_loss_scaling=True,
amp_lists=None):
"""
Whether to use mixed precision training.
"""
......@@ -178,7 +187,11 @@ class Entry(object):
self.global_test_batch_size = batch_size * self.num_trainers
logger.info("Set test batch size to {}.".format(batch_size))
def set_hdfs_info(self, fs_name, fs_ugi, directory):
def set_hdfs_info(self,
fs_name,
fs_ugi,
fs_dir_for_save=None,
fs_checkpoint_dir=None):
"""
Set the info to download from or upload to hdfs filesystems.
If the information is provided, we will download pretrained
......@@ -187,11 +200,13 @@ class Entry(object):
"""
self.fs_name = fs_name
self.fs_ugi = fs_ugi
self.fs_dir = directory
self.fs_dir_for_save = fs_dir_for_save
self.fs_checkpoint_dir = fs_checkpoint_dir
logger.info("HDFS Info:")
logger.info("\tfs_name: {}".format(fs_name))
logger.info("\tfs_ugi: {}".format(fs_ugi))
logger.info("\tremote directory: {}".format(directory))
logger.info("\tfs dir for save: {}".format(self.fs_dir_for_save))
logger.info("\tfs checkpoint dir: {}".format(self.fs_checkpoint_dir))
def set_model_save_dir(self, directory):
"""
......@@ -207,7 +222,7 @@ class Entry(object):
Whether to calculate acc1 and acc5 during training.
"""
self.calc_train_acc = calc
logger.info("Calcuating acc1 and acc5 during training: {}.".format(
logger.info("Calculating acc1 and acc5 during training: {}.".format(
calc))
def set_dataset_dir(self, directory):
......@@ -237,8 +252,8 @@ class Entry(object):
"""
Set the size of the last hidden layer before the distributed fc-layer.
"""
self.emb_size = size
logger.info("Set emb_size to {}.".format(size))
self.emb_dim = size
logger.info("Set emb_dim to {}.".format(size))
def set_model(self, model):
"""
......@@ -270,13 +285,13 @@ class Entry(object):
self.warmup_epochs = num
logger.info("Set warmup_epochs to {}.".format(num))
def set_loss_type(self, type):
def set_loss_type(self, loss_type):
supported_types = ["dist_softmax", "dist_arcface", "softmax", "arcface"]
if not type in supported_types:
if loss_type not in supported_types:
raise ValueError("All supported loss types: {}".format(
supported_types))
self.loss_type = type
logger.info("Set loss_type to {}.".format(type))
self.loss_type = loss_type
logger.info("Set loss_type to {}.".format(loss_type))
def set_image_shape(self, shape):
if not isinstance(shape, (list, tuple)):
......@@ -286,9 +301,21 @@ class Entry(object):
def set_optimizer(self, optimizer):
if not isinstance(optimizer, Optimizer):
raise ValueError("Optimizer must be type of Optimizer")
raise ValueError("Optimizer must be of type Optimizer")
self.optimizer = optimizer
logger.info("User manually set optimizer")
logger.info("User manually set optimizer.")
def set_with_test(self, with_test):
self.with_test = with_test
logger.info("Set with_test to {}.".format(with_test))
def set_distfc_attr(self, param_attr=None, bias_attr=None):
self.param_attr = param_attr
logger.info("Set param_attr for distfc to {}.".format(self.param_attr))
if bias_attr:
self.bias_attr = bias_attr
logger.info(
"Set bias_attr for distfc to {}.".format(self.bias_attr))
def _get_optimizer(self):
if not self.optimizer:
......@@ -310,7 +337,10 @@ class Entry(object):
logger.info("lr_step: {}".format(lr))
if self.warmup_epochs:
lr_val = lr_warmup(fluid.layers.piecewise_decay(boundaries=bd,
values=lr), warmup_steps, start_lr, base_lr)
values=lr),
warmup_steps,
start_lr,
base_lr)
else:
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
......@@ -321,25 +351,30 @@ class Entry(object):
if self.loss_type in ["dist_softmax", "dist_arcface"]:
self.optimizer = DistributedClassificationOptimizer(
self.optimizer, global_batch_size, use_fp16=self.use_fp16,
self.optimizer,
self.train_batch_size,
use_fp16=self.use_fp16,
loss_type=self.loss_type,
fp16_user_dict=self.fp16_user_dict)
elif self.use_fp16:
self.optimizer = fluid.contrib.mixed_precision.decorate(
optimizer=optimizer,
optimizer=self.optimizer,
init_loss_scaling=self.fp16_user_dict['init_loss_scaling'],
incr_every_n_steps=self.fp16_user_dict['incr_every_n_steps'],
decr_every_n_nan_or_inf=self.fp16_user_dict['decr_every_n_nan_or_inf'],
decr_every_n_nan_or_inf=self.fp16_user_dict[
'decr_every_n_nan_or_inf'],
incr_ratio=self.fp16_user_dict['incr_ratio'],
decr_ratio=self.fp16_user_dict['decr_ratio'],
use_dynamic_loss_scaling=self.fp16_user_dict['use_dynamic_loss_scaling'],
use_dynamic_loss_scaling=self.fp16_user_dict[
'use_dynamic_loss_scaling'],
amp_lists=self.fp16_user_dict['amp_lists']
)
)
return self.optimizer
def build_program(self,
is_train=True,
use_parallel_test=False):
use_parallel_test=False,
dist_strategy=None):
model_name = self.model_name
assert not (is_train and use_parallel_test), \
"is_train and use_parallel_test cannot be set simultaneously."
......@@ -357,18 +392,23 @@ class Entry(object):
with fluid.program_guard(main_program, startup_program):
with fluid.unique_name.guard():
image = fluid.layers.data(name='image',
shape=image_shape, dtype='float32')
shape=image_shape,
dtype='float32')
label = fluid.layers.data(name='label',
shape=[1], dtype='int64')
emb, loss, prob = model.get_output(
input=image,
label=label,
is_train=is_train,
num_classes=self.num_classes,
loss_type=self.loss_type,
margin=self.margin,
scale=self.scale)
shape=[1],
dtype='int64')
emb, loss, prob = model.get_output(input=image,
label=label,
num_ranks=num_trainers,
rank_id=trainer_id,
is_train=is_train,
num_classes=self.num_classes,
loss_type=self.loss_type,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
margin=self.margin,
scale=self.scale)
acc1 = None
acc5 = None
......@@ -377,78 +417,93 @@ class Entry(object):
if self.calc_train_acc:
shard_prob = loss._get_info("shard_prob")
prob_all = fluid.layers.collective._c_allgather(shard_prob,
nranks=num_trainers, use_calc_stream=True)
prob_list = fluid.layers.split(prob_all, dim=0,
prob_all = fluid.layers.collective._c_allgather(
shard_prob,
nranks=num_trainers,
use_calc_stream=True)
prob_list = fluid.layers.split(
prob_all,
dim=0,
num_or_sections=num_trainers)
prob = fluid.layers.concat(prob_list, axis=1)
label_all = fluid.layers.collective._c_allgather(label,
nranks=num_trainers, use_calc_stream=True)
acc1 = fluid.layers.accuracy(input=prob, label=label_all, k=1)
acc5 = fluid.layers.accuracy(input=prob, label=label_all, k=5)
label_all = fluid.layers.collective._c_allgather(
label,
nranks=num_trainers,
use_calc_stream=True)
acc1 = fluid.layers.accuracy(input=prob,
label=label_all,
k=1)
acc5 = fluid.layers.accuracy(input=prob,
label=label_all,
k=5)
else:
if self.calc_train_acc:
acc1 = fluid.layers.accuracy(input=prob, label=label, k=1)
acc5 = fluid.layers.accuracy(input=prob, label=label, k=5)
acc1 = fluid.layers.accuracy(input=prob,
label=label,
k=1)
acc5 = fluid.layers.accuracy(input=prob,
label=label,
k=5)
optimizer = None
if is_train:
# initialize optimizer
optimizer = self._get_optimizer()
dist_optimizer = self.fleet.distributed_optimizer(
optimizer, strategy=self.strategy)
dist_optimizer.minimize(loss)
if self.num_trainers > 1:
dist_optimizer = fleet.distributed_optimizer(
optimizer, strategy=dist_strategy)
dist_optimizer.minimize(loss)
else: # single card training
optimizer.minimize(loss)
if "dist" in self.loss_type or self.use_fp16:
optimizer = optimizer._optimizer
elif use_parallel_test:
emb = fluid.layers.collective._c_allgather(emb,
nranks=num_trainers, use_calc_stream=True)
emb = fluid.layers.collective._c_allgather(
emb,
nranks=num_trainers,
use_calc_stream=True)
return emb, loss, acc1, acc5, optimizer
def get_files_from_hdfs(self, local_dir):
def get_files_from_hdfs(self):
assert self.fs_checkpoint_dir, \
logger.error("Please set the fs_checkpoint_dir paramerters for "
"set_hdfs_info to get models from hdfs.")
self.fs_checkpoint_dir = os.path.join(self.fs_checkpoint_dir, '*')
cmd = "hadoop fs -D fs.default.name="
cmd += self.fs_name + " "
cmd += "-D hadoop.job.ugi="
cmd += self.fs_ugi + " "
cmd += "-get " + self.fs_dir
cmd += " " + local_dir
cmd += "-get " + self.fs_checkpoint_dir
cmd += " " + self.checkpoint_dir
logger.info("hdfs download cmd: {}".format(cmd))
cmd = cmd.split(' ')
process = subprocess.Popen(cmd,
stdout=sys.stdout,
stderr=subprocess.STDOUT)
stdout=sys.stdout,
stderr=subprocess.STDOUT)
process.wait()
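Equivalently, the command assembled above amounts to something like the following (values hypothetical; note the string is split on single spaces, so none of the paths may contain spaces):

    import subprocess
    import sys

    # Hypothetical endpoint and credentials, mirroring get_files_from_hdfs.
    cmd = ["hadoop", "fs",
           "-D", "fs.default.name=hdfs://example-host:9000",
           "-D", "hadoop.job.ugi=username,password",
           "-get", "/user/plsc/ckpt/*", "./checkpoint"]
    subprocess.check_call(cmd, stdout=sys.stdout, stderr=subprocess.STDOUT)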
def put_files_to_hdfs(self, local_dir):
assert self.fs_dir_for_save, \
logger.error("Please set fs_dir_for_save paramerter "
"for set_hdfs_info to save models to hdfs.")
cmd = "hadoop fs -D fs.default.name="
cmd += self.fs_name + " "
cmd += "-D hadoop.job.ugi="
cmd += self.fs_ugi + " "
cmd += "-put " + local_dir
cmd += " " + self.fs_dir
cmd += " " + self.fs_dir_for_save
logger.info("hdfs upload cmd: {}".format(cmd))
cmd = cmd.split(' ')
process = subprocess.Popen(cmd,
stdout=sys.stdout,
stderr=subprocess.STDOUT)
stdout=sys.stdout,
stderr=subprocess.STDOUT)
process.wait()
def preprocess_distributed_params(self,
local_dir):
def process_distributed_params(self, local_dir):
local_dir = os.path.abspath(local_dir)
output_dir = tempfile.mkdtemp()
cmd = sys.executable + ' -m plsc.utils.process_distfc_parameter '
cmd += "--nranks {} ".format(self.num_trainers)
cmd += "--num_classes {} ".format(self.num_classes)
cmd += "--pretrained_model_dir {} ".format(local_dir)
cmd += "--output_dir {}".format(output_dir)
cmd = cmd.split(' ')
logger.info("Distributed parameters processing cmd: {}".format(cmd))
process = subprocess.Popen(cmd,
stdout=sys.stdout,
stderr=subprocess.STDOUT)
process.wait()
converter = ParameterConverter(local_dir, output_dir, self.num_trainers)
converter.process()
for file in os.listdir(local_dir):
if "dist@" in file and "@rank@" in file:
......@@ -477,7 +532,6 @@ class Entry(object):
outputs={'Out': var},
attrs={'use_calc_stream': True})
def load_checkpoint(self,
executor,
main_program,
......@@ -493,30 +547,28 @@ class Entry(object):
if os.path.exists(checkpoint_dir):
logger.info("Local dir {} exists, we'll overwrite it.".format(
checkpoint_dir))
shutil.rmtree(checkpoint_dir)
os.makedirs(checkpoint_dir)
# sync all trainers to avoid loading checkpoints before
# parameters are downloaded
file_name = os.path.join(checkpoint_dir, '.lock')
if self.trainer_id == 0:
self.get_files_from_hdfs(checkpoint_dir)
with open(file_name, 'w') as f:
pass
time.sleep(10)
os.remove(file_name)
else:
while True:
if not os.path.exists(file_name):
time.sleep(1)
else:
break
# sync all trainers to avoid loading checkpoints before
# parameters are downloaded
file_name = os.path.join(checkpoint_dir, '.lock')
if self.trainer_id == 0:
self.get_files_from_hdfs()
with open(file_name, 'w') as f:
pass
time.sleep(10)
os.remove(file_name)
else:
while True:
if not os.path.exists(file_name):
time.sleep(1)
else:
break
# Preprocess distributed parameters.
file_name = os.path.join(checkpoint_dir, '.lock')
distributed = self.loss_type in ["dist_softmax", "dist_arcface"]
if load_for_train and self.trainer_id == 0 and distributed:
self.preprocess_distributed_params(checkpoint_dir)
self.process_distributed_params(checkpoint_dir)
with open(file_name, 'w') as f:
pass
time.sleep(10)
......@@ -532,11 +584,13 @@ class Entry(object):
def if_exist(var):
has_var = os.path.exists(os.path.join(checkpoint_dir, var.name))
if has_var:
print('var: %s found' % (var.name))
logger.info('var: %s found' % (var.name))
return has_var
fluid.io.load_vars(executor, checkpoint_dir, predicate=if_exist,
main_program=main_program)
fluid.io.load_vars(executor,
checkpoint_dir,
predicate=if_exist,
main_program=main_program)
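Both the checkpoint download and the parameter preprocessing above synchronize trainers with the same lock-file trick; a minimal standalone sketch of that barrier (the function and argument names are hypothetical):

    import os
    import time

    def lock_file_barrier(lock_file, is_leader, work):
        # Leader does the work, then signals the others via a lock file.
        if is_leader:
            work()
            open(lock_file, 'w').close()  # signal completion
            time.sleep(10)                # let followers observe the file
            os.remove(lock_file)
        else:
            while not os.path.exists(lock_file):
                time.sleep(1)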
def convert_for_prediction(self):
model_name = self.model_name
......@@ -545,19 +599,20 @@ class Entry(object):
model = self.model
if model is None:
model = resnet.__dict__[model_name](emb_dim=self.emb_dim)
main_program = self.train_program
main_program = self.predict_program
startup_program = self.startup_program
with fluid.program_guard(main_program, startup_program):
with fluid.unique_name.guard():
image = fluid.layers.data(name='image',
shape=image_shape, dtype='float32')
shape=image_shape,
dtype='float32')
label = fluid.layers.data(name='label',
shape=[1], dtype='int64')
shape=[1],
dtype='int64')
emb = model.build_network(
input=image,
label=label,
is_train=False)
emb = model.build_network(input=image,
label=label,
is_train=False)
gpu_id = int(os.getenv("FLAGS_selected_gpus", 0))
place = fluid.CUDAPlace(gpu_id)
......@@ -565,8 +620,9 @@ class Entry(object):
exe.run(startup_program)
assert self.checkpoint_dir, "No checkpoint found for converting."
self.load_checkpoint(executor=exe, main_program=main_program,
load_for_train=False)
self.load_checkpoint(executor=exe,
main_program=main_program,
load_for_train=False)
assert self.model_save_dir, \
"Does not set model_save_dir for inference model converting."
......@@ -582,6 +638,16 @@ class Entry(object):
if self.fs_name:
self.put_files_to_hdfs(self.model_save_dir)
def _set_info(self, key, value):
if not hasattr(self, '_info'):
self._info = {}
self._info[key] = value
def _get_info(self, key):
if hasattr(self, '_info') and key in self._info:
return self._info[key]
return None
def predict(self):
model_name = self.model_name
image_shape = [int(m) for m in self.image_shape]
......@@ -594,14 +660,15 @@ class Entry(object):
with fluid.program_guard(main_program, startup_program):
with fluid.unique_name.guard():
image = fluid.layers.data(name='image',
shape=image_shape, dtype='float32')
shape=image_shape,
dtype='float32')
label = fluid.layers.data(name='label',
shape=[1], dtype='int64')
shape=[1],
dtype='int64')
emb = model.build_network(
input=image,
label=label,
is_train=False)
emb = model.build_network(input=image,
label=label,
is_train=False)
gpu_id = int(os.getenv("FLAGS_selected_gpus", 0))
place = fluid.CUDAPlace(gpu_id)
......@@ -609,104 +676,77 @@ class Entry(object):
exe.run(startup_program)
assert self.checkpoint_dir, "No checkpoint found for predicting."
self.load_checkpoint(executor=exe, main_program=main_program,
load_for_train=False)
if self.train_reader is None:
predict_reader = paddle.batch(reader.arc_train(
self.dataset_dir, self.num_classes),
batch_size=self.train_batch_size)
self.load_checkpoint(executor=exe,
main_program=main_program,
load_for_train=False)
if self.predict_reader is None:
predict_reader = paddle.batch(reader.arc_train(self.dataset_dir,
self.num_classes),
batch_size=self.train_batch_size)
else:
predict_reader = self.train_reader
predict_reader = self.predict_reader
feeder = fluid.DataFeeder(place=place,
feed_list=['image', 'label'], program=main_program)
feed_list=['image', 'label'],
program=main_program)
fetch_list = [emb.name]
for data in predict_reader():
emb = exe.run(main_program, feed=feeder.feed(data),
fetch_list=fetch_list, use_program_cache=True)
emb = exe.run(main_program,
feed=feeder.feed(data),
fetch_list=fetch_list,
use_program_cache=True)
print("emb: ", emb)
def test(self, pass_id=0):
self._check()
def _run_test(self,
exe,
test_list,
test_name_list,
feeder,
fetch_list):
trainer_id = self.trainer_id
num_trainers = self.num_trainers
worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
emb, loss, acc1, acc5, _ = self.build_program(
False, self.num_trainers > 1)
config = dist_transpiler.DistributeTranspilerConfig()
config.mode = "collective"
config.collective_mode = "grad_allreduce"
t = dist_transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id=trainer_id,
trainers=worker_endpoints,
startup_program=self.startup_program,
program=self.test_program,
current_endpoint=current_endpoint)
gpu_id = int(os.getenv("FLAGS_selected_gpus", 0))
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
exe.run(self.startup_program)
test_list, test_name_list = reader.test(
self.dataset_dir, self.val_targets)
test_program = self.test_program
#test_program = test_program._prune(emb)
assert self.checkpoint_dir, "No checkpoint found for test."
self.load_checkpoint(executor=exe, main_program=test_program,
load_for_train=False)
feeder = fluid.DataFeeder(place=place,
feed_list=['image', 'label'], program=test_program)
fetch_list = [emb.name]
real_test_batch_size = self.global_test_batch_size
test_start = time.time()
for i in range(len(test_list)):
data_list, issame_list = test_list[i]
embeddings_list = []
for j in xrange(len(data_list)):
for j in range(len(data_list)):
data = data_list[j]
embeddings = None
parallel_test_steps = data.shape[0] // real_test_batch_size
beg = 0
end = 0
for idx in range(parallel_test_steps):
start = idx * real_test_batch_size
offset = trainer_id * self.test_batch_size
begin = start + offset
end = begin + self.test_batch_size
_data = []
for k in xrange(begin, end):
for k in range(begin, end):
_data.append((data[k], 0))
assert len(_data) == self.test_batch_size
[_embeddings] = exe.run(test_program,
fetch_list = fetch_list, feed=feeder.feed(_data),
use_program_cache=True)
[_embeddings] = exe.run(self.test_program,
fetch_list=fetch_list,
feed=feeder.feed(_data),
use_program_cache=True)
if embeddings is None:
embeddings = np.zeros((data.shape[0], _embeddings.shape[1]))
embeddings[start:start+real_test_batch_size, :] = _embeddings[:, :]
embeddings = np.zeros((data.shape[0],
_embeddings.shape[1]))
end = start + real_test_batch_size
embeddings[start:end, :] = _embeddings[:, :]
beg = parallel_test_steps * real_test_batch_size
while beg < data.shape[0]:
end = min(beg + self.test_batch_size, data.shape[0])
count = end - beg
_data = []
for k in xrange(end - self.test_batch_size, end):
for k in range(end - self.test_batch_size, end):
_data.append((data[k], 0))
[_embeddings] = exe.run(test_program,
fetch_list = fetch_list, feed=feeder.feed(_data),
use_program_cache=True)
_embeddings = _embeddings[0:self.test_batch_size,:]
embeddings[beg:end, :] = _embeddings[(self.test_batch_size-count):, :]
[_embeddings] = exe.run(self.test_program,
fetch_list=fetch_list,
feed=feeder.feed(_data),
use_program_cache=True)
_embeddings = _embeddings[0:self.test_batch_size, :]
embeddings[beg:end, :] = _embeddings[
(self.test_batch_size - count):, :]
beg = end
embeddings_list.append(embeddings)
......@@ -719,44 +759,140 @@ class Entry(object):
embeddings = embeddings_list[0] + embeddings_list[1]
embeddings = sklearn.preprocessing.normalize(embeddings)
_, _, accuracy, val, val_std, far = evaluate(embeddings, issame_list, nrof_folds=10)
_, _, accuracy, val, val_std, far = evaluate(embeddings,
issame_list,
nrof_folds=10)
acc, std = np.mean(accuracy), np.std(accuracy)
print('[%s][%d]XNorm: %f' % (test_name_list[i], pass_id, xnorm))
print('[%s][%d]Accuracy-Flip: %1.5f+-%1.5f' % (test_name_list[i], pass_id, acc, std))
if self.train_pass_id >= 0:
logger.info('[{}][{}]XNorm: {:.5f}'.format(test_name_list[i],
self.train_pass_id,
xnorm))
logger.info('[{}][{}]Accuracy-Flip: {:.5f}+-{:.5f}'.format(
test_name_list[i],
self.train_pass_id,
acc,
std))
else:
logger.info('[{}]XNorm: {:.5f}'.format(test_name_list[i],
xnorm))
logger.info('[{}]Accuracy-Flip: {:.5f}+-{:.5f}'.format(
test_name_list[i],
acc,
std))
sys.stdout.flush()
def test(self):
self._check()
trainer_id = self.trainer_id
num_trainers = self.num_trainers
# If the test program has not been built, this is the first call to the
# test method; build the test program and add ops to broadcast
# bn-related parameters from trainer 0 to the other trainers for
# distributed tests.
if not self.test_initialized:
emb, loss, _, _, _ = self.build_program(False,
self.num_trainers > 1)
emb_name = emb.name
assert self._get_info(emb_name) is None
self._set_info('emb_name', emb.name)
if num_trainers > 1 and self.has_run_train:
self._append_broadcast_ops(self.test_program)
if num_trainers > 1 and not self.has_run_train:
worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
config = dist_transpiler.DistributeTranspilerConfig()
config.mode = "collective"
config.collective_mode = "grad_allreduce"
t = dist_transpiler.DistributeTranspiler(config=config)
t.transpile(trainer_id=trainer_id,
trainers=worker_endpoints,
startup_program=self.startup_program,
program=self.test_program,
current_endpoint=current_endpoint)
else:
emb_name = self._get_info('emb_name')
gpu_id = int(os.getenv("FLAGS_selected_gpus", 0))
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
if not self.has_run_train:
exe.run(self.startup_program)
if not self.test_reader:
test_reader = reader.test
else:
test_reader = self.test_reader
if not self.test_initialized:
test_list, test_name_list = test_reader(self.dataset_dir,
self.val_targets)
assert self._get_info('test_list') is None
assert self._get_info('test_name_list') is None
self._set_info('test_list', test_list)
self._set_info('test_name_list', test_name_list)
else:
test_list = self._get_info('test_list')
test_name_list = self._get_info('test_name_list')
test_program = self.test_program
if not self.has_run_train:
assert self.checkpoint_dir, "No checkpoint found for test."
self.load_checkpoint(executor=exe,
main_program=test_program,
load_for_train=False)
feeder = fluid.DataFeeder(place=place,
feed_list=['image', 'label'],
program=test_program)
fetch_list = [emb_name]
self.test_initialized = True
test_start = time.time()
self._run_test(exe,
test_list,
test_name_list,
feeder,
fetch_list)
test_end = time.time()
print("test time: {}".format(test_end - test_start))
logger.info("test time: {:.4f}".format(test_end - test_start))
def train(self):
self._check()
self.has_run_train = True
trainer_id = self.trainer_id
num_trainers = self.num_trainers
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = DistributedStrategy()
strategy.mode = "collective"
strategy.collective_mode = "grad_allreduce"
self.fleet = fleet
self.strategy = strategy
train_emb, train_loss, train_acc1, train_acc5, optimizer = \
self.build_program(True, False)
if self.with_test:
test_emb, test_loss, test_acc1, test_acc5, _ = \
self.build_program(False, self.num_trainers > 1)
test_list, test_name_list = reader.test(
self.dataset_dir, self.val_targets)
test_program = self.test_program
self._append_broadcast_ops(test_program)
strategy = None
if num_trainers > 1:
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = DistributedStrategy()
strategy.mode = "collective"
strategy.collective_mode = "grad_allreduce"
emb, loss, acc1, acc5, optimizer = self.build_program(
True,
False,
dist_strategy=strategy)
global_lr = optimizer._global_learning_rate(
program=self.train_program)
if num_trainers > 1:
origin_prog = fleet._origin_program
train_prog = fleet.main_program
else:
origin_prog = self.train_program
train_prog = self.train_program
origin_prog = fleet._origin_program
train_prog = fleet.main_program
if trainer_id == 0:
with open('start.program', 'w') as fout:
program_to_code(self.startup_program, fout, True)
......@@ -764,20 +900,12 @@ class Entry(object):
program_to_code(train_prog, fout, True)
with open('origin.program', 'w') as fout:
program_to_code(origin_prog, fout, True)
with open('test.program', 'w') as fout:
program_to_code(test_program, fout, True)
gpu_id = int(os.getenv("FLAGS_selected_gpus", 0))
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
exe.run(self.startup_program)
if self.with_test:
test_feeder = fluid.DataFeeder(place=place,
feed_list=['image', 'label'], program=test_program)
fetch_list_test = [test_emb.name]
real_test_batch_size = self.global_test_batch_size
if self.checkpoint_dir:
load_checkpoint = True
else:
......@@ -793,31 +921,38 @@ class Entry(object):
train_reader = self.train_reader
feeder = fluid.DataFeeder(place=place,
feed_list=['image', 'label'], program=origin_prog)
feed_list=['image', 'label'],
program=origin_prog)
if self.calc_train_acc:
fetch_list = [train_loss.name, global_lr.name,
train_acc1.name, train_acc5.name]
fetch_list = [loss.name, global_lr.name,
acc1.name, acc5.name]
else:
fetch_list = [train_loss.name, global_lr.name]
fetch_list = [loss.name, global_lr.name]
local_time = 0.0
nsamples = 0
inspect_steps = 200
global_batch_size = self.global_train_batch_size
for pass_id in range(self.train_epochs):
self.train_pass_id = pass_id
train_info = [[], [], [], []]
local_train_info = [[], [], [], []]
for batch_id, data in enumerate(train_reader()):
nsamples += global_batch_size
t1 = time.time()
acc1 = None
acc5 = None
if self.calc_train_acc:
loss, lr, acc1, acc5 = exe.run(train_prog,
feed=feeder.feed(data), fetch_list=fetch_list,
use_program_cache=True)
feed=feeder.feed(data),
fetch_list=fetch_list,
use_program_cache=True)
else:
loss, lr = exe.run(train_prog, feed=feeder.feed(data),
fetch_list=fetch_list, use_program_cache=True)
loss, lr = exe.run(train_prog,
feed=feeder.feed(data),
fetch_list=fetch_list,
use_program_cache=True)
t2 = time.time()
period = t2 - t1
local_time += period
......@@ -828,83 +963,37 @@ class Entry(object):
if batch_id % inspect_steps == 0:
avg_loss = np.mean(local_train_info[0])
avg_lr = np.mean(local_train_info[1])
speed = nsamples / local_time
if self.calc_train_acc:
logger.info("Pass:%d batch:%d lr:%f loss:%f qps:%.2f "
"acc1:%.4f acc5:%.4f" % (pass_id, batch_id, avg_lr,
avg_loss, nsamples / local_time, acc1, acc5))
logger.info("Pass:{} batch:%d lr:{:.8f} loss:{:.6f} "
"qps:{:.2f} acc1:{:.6f} acc5:{:.6f}".format(
pass_id,
batch_id,
avg_lr,
avg_loss,
speed,
acc1,
acc5))
else:
logger.info("Pass:%d batch:%d lr:%f loss:%f qps:%.2f" %(
pass_id, batch_id, avg_lr, avg_loss,
nsamples / local_time))
logger.info("Pass:{} batch:{} lr:{:.8f} loss:{:.6f} "
"qps:{:.2f}".format(pass_id,
batch_id,
avg_lr,
avg_loss,
speed))
local_time = 0
nsamples = 0
local_train_info = [[], [], [], []]
train_loss = np.array(train_info[0]).mean()
print("End pass {0}, train_loss {1}".format(pass_id, train_loss))
logger.info("End pass {}, train_loss {:.6f}".format(pass_id,
train_loss))
sys.stdout.flush()
if self.with_test:
test_start = time.time()
for i in xrange(len(test_list)):
data_list, issame_list = test_list[i]
embeddings_list = []
for j in xrange(len(data_list)):
data = data_list[j]
embeddings = None
parallel_test_steps = data.shape[0] // real_test_batch_size
beg = 0
end = 0
for idx in range(parallel_test_steps):
start = idx * real_test_batch_size
offset = trainer_id * self.test_batch_size
begin = start + offset
end = begin + self.test_batch_size
_data = []
for k in xrange(begin, end):
_data.append((data[k], 0))
assert len(_data) == self.test_batch_size
[_embeddings] = exe.run(test_program,
fetch_list = fetch_list_test, feed=test_feeder.feed(_data),
use_program_cache=True)
if embeddings is None:
embeddings = np.zeros((data.shape[0], _embeddings.shape[1]))
embeddings[start:start+real_test_batch_size, :] = _embeddings[:, :]
beg = parallel_test_steps * real_test_batch_size
while beg < data.shape[0]:
end = min(beg + self.test_batch_size, data.shape[0])
count = end - beg
_data = []
for k in xrange(end - self.test_batch_size, end):
_data.append((data[k], 0))
[_embeddings] = exe.run(test_program,
fetch_list = fetch_list_test, feed=test_feeder.feed(_data),
use_program_cache=True)
_embeddings = _embeddings[0:self.test_batch_size,:]
embeddings[beg:end, :] = _embeddings[(self.test_batch_size-count):, :]
beg = end
embeddings_list.append(embeddings)
xnorm = 0.0
xnorm_cnt = 0
for embed in embeddings_list:
xnorm += np.sqrt((embed * embed).sum(axis=1)).sum(axis=0)
xnorm_cnt += embed.shape[0]
xnorm /= xnorm_cnt
embeddings = embeddings_list[0] + embeddings_list[1]
embeddings = sklearn.preprocessing.normalize(embeddings)
_, _, accuracy, val, val_std, far = evaluate(embeddings, issame_list, nrof_folds=10)
acc, std = np.mean(accuracy), np.std(accuracy)
print('[%s][%d]XNorm: %f' % (test_name_list[i], pass_id, xnorm))
print('[%s][%d]Accuracy-Flip: %1.5f+-%1.5f' % (test_name_list[i], pass_id, acc, std))
sys.stdout.flush()
test_end = time.time()
print("test time: {}".format(test_end - test_start))
#save model
self.test()
# save model
if self.model_save_dir:
model_save_dir = os.path.join(
self.model_save_dir, str(pass_id))
......@@ -919,27 +1008,30 @@ class Entry(object):
pass
if trainer_id == 0:
fluid.io.save_persistables(exe,
model_save_dir,
origin_prog)
model_save_dir,
origin_prog)
else:
def save_var(var):
to_save = "dist@" in var.name and '@rank@' in var.name
return to_save and var.persistable
fluid.io.save_vars(exe, model_save_dir,
origin_prog, predicate=save_var)
#save training info
fluid.io.save_vars(exe,
model_save_dir,
origin_prog,
predicate=save_var)
# save training info
if self.model_save_dir and trainer_id == 0:
config_file = os.path.join(
self.model_save_dir, str(pass_id), 'meta.pickle')
self.model_save_dir, str(pass_id), 'meta.json')
train_info = dict()
train_info["pretrain_nranks"] = self.num_trainers
train_info["emb_dim"] = self.emb_dim
train_info['num_classes'] = self.num_classes
with open(config_file, 'wb') as f:
pickle.dump(train_info, f)
with open(config_file, 'w') as f:
json.dump(train_info, f)
#upload model
# upload model
if self.model_save_dir and self.fs_name and trainer_id == 0:
self.put_files_to_hdfs(self.model_save_dir)
......@@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from . import resnet
from .resnet import *
from . import base_model
from . import dist_algo
from . import resnet
from .base_model import *
from .dist_algo import *
from .resnet import *
__all__ = []
__all__ += resnet.__all__
__all__ += base_model.__all__
__all__ += dist_algo.__all__
......@@ -13,14 +13,11 @@
# limitations under the License.
import math
import os
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.fluid import unique_name
from . import dist_algo
from . import dist_algo
__all__ = ["BaseModel"]
......@@ -32,21 +29,24 @@ class BaseModel(object):
which constructs the custom model, and the distributed
fc layer will be added for you automatically.
"""
def __init__(self):
super(BaseModel, self).__init__()
def build_network(self, input, label, is_train=True):
"""
Construct the custom model, and we will add the
distributed fc layer for you automatically.
Construct the custom model, and we will add the distributed fc layer
at the end of your model automatically.
"""
raise NotImplementedError(
"You must implement this method in your sub class.")
"You must implement this method in your subclass.")
def get_output(self,
input,
label,
num_classes,
num_ranks=1,
rank_id=0,
is_train=True,
param_attr=None,
bias_attr=None,
......@@ -55,6 +55,20 @@ class BaseModel(object):
scale=64.0):
"""
Add the distributed fc layer for the custom model.
Params:
input: input for the model
label: label for the input
num_classes: number of classes for the classifier
num_ranks: number of trainers, i.e., GPUs
rank_id: id for the current trainer, from 0 to num_ranks - 1
is_train: build the network for training or not
param_attr: param_attr for the weight parameter of fc
bias_attr: bias_attr for the bias parameter of fc
loss_type: loss type to use, one of dist_softmax, softmax, arcface
and dist_arcface
margin: the margin parameter for arcface and dist_arcface
scale: the scale parameter for arcface and dist_arcface
"""
supported_loss_types = ["dist_softmax", "dist_arcface",
"softmax", "arcface"]
......@@ -62,67 +76,75 @@ class BaseModel(object):
"Supported loss types: {}, but given: {}".format(
supported_loss_types, loss_type)
nranks = int(os.getenv("PADDLE_TRAINERS_NUM", 1))
rank_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
emb = self.build_network(input, label, is_train)
prob = None
loss = None
if loss_type == "softmax":
loss, prob = self.fc_classify(emb,
label,
num_classes,
param_attr,
bias_attr)
loss, prob = BaseModel._fc_classify(emb,
label,
num_classes,
param_attr,
bias_attr)
elif loss_type == "arcface":
loss, prob = self.arcface(emb,
label,
num_classes,
param_attr,
margin,
scale)
loss, prob = BaseModel._arcface(emb,
label,
num_classes,
param_attr,
margin,
scale)
elif loss_type == "dist_arcface":
loss = dist_algo._distributed_arcface_classify(
x=emb, label=label, class_num=num_classes,
nranks=nranks, rank_id=rank_id, margin=margin,
logit_scale=scale, param_attr=param_attr)
prob = None
loss = dist_algo.distributed_arcface_classify(x=emb,
label=label,
class_num=num_classes,
nranks=num_ranks,
rank_id=rank_id,
margin=margin,
logit_scale=scale,
param_attr=param_attr)
elif loss_type == "dist_softmax":
loss = dist_algo._distributed_softmax_classify(
x=emb, label=label, class_num=num_classes,
nranks=nranks, rank_id=rank_id, param_attr=param_attr,
use_bias=True, bias_attr=bias_attr)
prob = None
loss = dist_algo.distributed_softmax_classify(x=emb,
label=label,
class_num=num_classes,
nranks=num_ranks,
rank_id=rank_id,
param_attr=param_attr,
use_bias=True,
bias_attr=bias_attr)
return emb, loss, prob
def fc_classify(self, input, label, out_dim, param_attr, bias_attr):
@staticmethod
def _fc_classify(input, label, out_dim, param_attr, bias_attr):
if param_attr is None:
stdv = 1.0 / math.sqrt(input.shape[1] * 1.0)
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv, stdv))
stddev = 1.0 / math.sqrt(input.shape[1] * 1.0)
param_attr = fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stddev, stddev))
out = fluid.layers.fc(input=input,
size=out_dim,
param_attr=param_attr,
bias_attr=bias_attr)
loss, prob = fluid.layers.softmax_with_cross_entropy(logits=out,
label=label, return_softmax=True)
loss, prob = fluid.layers.softmax_with_cross_entropy(
logits=out,
label=label,
return_softmax=True)
avg_loss = fluid.layers.mean(x=loss)
return avg_loss, prob
def arcface(self, input, label, out_dim, param_attr, margin, scale):
@staticmethod
def _arcface(input, label, out_dim, param_attr, margin, scale):
input_norm = fluid.layers.sqrt(
fluid.layers.reduce_sum(fluid.layers.square(input), dim=1))
input = fluid.layers.elementwise_div(input, input_norm, axis=0)
if param_attr is None:
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Xavier(
uniform=False, fan_in=0.0))
param_attr = fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Xavier(uniform=False, fan_in=0.0))
weight = fluid.layers.create_parameter(
shape=[input.shape[1], out_dim],
dtype='float32',
name=unique_name.generate('final_fc_w'),
attr=param_attr)
shape=[input.shape[1], out_dim],
dtype='float32',
name=unique_name.generate('final_fc_w'),
attr=param_attr)
weight_norm = fluid.layers.sqrt(
fluid.layers.reduce_sum(fluid.layers.square(weight), dim=0))
......@@ -137,10 +159,11 @@ class BaseModel(object):
logit = fluid.layers.scale(target_cos, scale=scale)
loss, prob = fluid.layers.softmax_with_cross_entropy(
logits=logit, label=label, return_softmax=True)
logits=logit,
label=label,
return_softmax=True)
avg_loss = fluid.layers.mean(x=loss)
one_hot.stop_gradient = True
return avg_loss, prob
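For reference, the target-logit manipulation above implements the ArcFace objective (https://arxiv.org/abs/1801.07698), with m the `margin` and s the `scale` arguments:

    L = -\log \frac{e^{s\cos(\theta_{y_i}+m)}}
                   {e^{s\cos(\theta_{y_i}+m)} + \sum_{j\neq y_i} e^{s\cos\theta_j}}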
......@@ -12,32 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import math
import logging
from six.moves import reduce
import paddle.fluid as fluid
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.framework import Variable, default_startup_program
from paddle.fluid.param_attr import ParamAttr
from paddle.fluid.initializer import Normal, Constant
import paddle.fluid.layers.collective as collective
import paddle.fluid.layers.nn as nn
import paddle.fluid.layers.ops as ops
import paddle.fluid.layers as layers
import paddle.fluid.layers.collective as collective
from paddle.fluid.optimizer import Optimizer
import paddle.fluid.unique_name as unique_name
from paddle.fluid.framework import Variable, default_startup_program
from paddle.fluid.initializer import Normal
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.param_attr import ParamAttr
from ..utils.fp16_utils import rewrite_program, update_role_var_grad
from ..utils.fp16_utils import update_loss_scaling, move_optimize_ops_back
from ..utils.fp16_lists import AutoMixedPrecisionLists
from six.moves import reduce
__all__ = ['distributed_arcface_classify', 'distributed_softmax_classify',
'DistributedClassificationOptimizer']
class DistributedClassificationOptimizer(Optimizer):
'''
A optimizer wrapper to generate backward network for distributed
"""
An optimizer wrapper to generate backward network for distributed
classification training of model parallelism.
'''
"""
def init_fp16_params(self, loss_type, fp16_user_dict):
# set default value for fp16_params_dict
fp16_params_dict = dict()
......@@ -261,15 +267,15 @@ class DistributedClassificationOptimizer(Optimizer):
})
def insert_commom_backward_op(self,
block,
index,
shard_logit,
shard_prob,
shard_label,
shard_dim,
op_role_key,
backward_role,
loss_backward_role):
block,
index,
shard_logit,
shard_prob,
shard_label,
shard_dim,
op_role_key,
backward_role,
loss_backward_role):
'''
Insert backward ops when not using mixed precision training.
Common to all loss types.
......@@ -421,10 +427,10 @@ class DistributedClassificationOptimizer(Optimizer):
class DistributedClassifier(object):
'''
"""
Toolkit for distributed classification, in which the parameters of the last
fully-connected layer are distributed across all trainers
'''
"""
def __init__(self, nclasses, nranks, rank_id, layer_helper):
self.nclasses = nclasses
......@@ -446,29 +452,29 @@ class DistributedClassifier(object):
dtype,
in_dim,
param_attr=None,
use_bias=True,
bias_attr=None,
transpose_weight=False,
use_bias=True):
transpose_weight=False):
if param_attr is None:
stdv = math.sqrt(2.0 / (in_dim + self.nclasses))
param_attr = ParamAttr(initializer=Normal(scale=stdv))
stddev = math.sqrt(2.0 / (in_dim + self.nclasses))
param_attr = ParamAttr(initializer=Normal(scale=stddev))
weight_shape = [self.shard_dim, in_dim
] if transpose_weight else [in_dim, self.shard_dim]
weight = self._layer_helper.create_parameter(
shape=weight_shape, dtype=dtype, attr=param_attr, is_bias=False)
# avoid distributed parameter allreduce gradients
# avoid allreducing gradients for distributed parameters
weight.is_distributed = True
# avoid distributed parameter broadcasting in startup program
# avoid broadcasting distributed parameters in startup program
default_startup_program().global_block().vars[
weight.name].is_distributed = True
bias = None
if use_bias:
bias = self._layer_helper.create_parameter(
shape=[self.shard_dim],
attr=bias_attr,
dtype=dtype,
is_bias=True)
bias = self._layer_helper.create_parameter(shape=[self.shard_dim],
attr=bias_attr,
dtype=dtype,
is_bias=True)
bias.is_distributed = True
default_startup_program().global_block().vars[
bias.name].is_distributed = True
......@@ -505,12 +511,11 @@ class DistributedClassifier(object):
use_bias=True,
bias_attr=None):
flatten_dim = reduce(lambda a, b: a * b, x.shape[1:], 1)
weight, bias = self.create_parameter(
dtype=x.dtype,
in_dim=flatten_dim,
param_attr=param_attr,
bias_attr=bias_attr,
use_bias=use_bias)
weight, bias = self.create_parameter(dtype=x.dtype,
in_dim=flatten_dim,
param_attr=param_attr,
bias_attr=bias_attr,
use_bias=use_bias)
x_all = collective._c_allgather(
x, nranks=self.nranks, use_calc_stream=True)
......@@ -551,11 +556,10 @@ class DistributedClassifier(object):
reference: ArcFace. https://arxiv.org/abs/1801.07698
'''
flatten_dim = reduce(lambda a, b: a * b, x.shape[1:], 1)
weight, bias = self.create_parameter(
dtype=x.dtype,
in_dim=flatten_dim,
param_attr=param_attr,
use_bias=False)
weight, bias = self.create_parameter(dtype=x.dtype,
in_dim=flatten_dim,
param_attr=param_attr,
use_bias=False)
# normalize x
x_l2 = ops.sqrt(nn.reduce_sum(ops.square(x), dim=1))
......@@ -566,12 +570,11 @@ class DistributedClassifier(object):
label_all = collective._c_allgather(
label, nranks=self.nranks, use_calc_stream=True)
label_all.stop_gradient = True
shard_label = nn.shard_index(
label_all,
index_num=self.nclasses,
nshards=self.nranks,
shard_id=self.rank_id,
ignore_value=-1)
shard_label = nn.shard_index(label_all,
index_num=self.nclasses,
nshards=self.nranks,
shard_id=self.rank_id,
ignore_value=-1)
# TODO check necessary
shard_label.stop_gradient = True
......@@ -605,16 +608,16 @@ class DistributedClassifier(object):
return avg_loss
def _distributed_softmax_classify(x,
label,
class_num,
nranks,
rank_id,
param_attr=None,
use_bias=True,
bias_attr=None,
name=None):
'''
def distributed_softmax_classify(x,
label,
class_num,
nranks,
rank_id,
param_attr=None,
use_bias=True,
bias_attr=None,
name=None):
"""
Distributed version of a classification layer with FC, softmax and cross
entropy calculation, for cases with a very large number of classes.
......@@ -652,26 +655,29 @@ def _distributed_softmax_classify(x,
class_num=1000,
nranks=8,
rank_id=0)
'''
"""
if name is None:
name = 'dist@softmax@rank@%05d' % rank_id
helper = LayerHelper(name, **locals())
classifier = DistributedClassifier(class_num, nranks, rank_id, helper)
return classifier.softmax_classify(x, label, param_attr, use_bias,
return classifier.softmax_classify(x,
label,
param_attr,
use_bias,
bias_attr)
def _distributed_arcface_classify(x,
label,
class_num,
nranks,
rank_id,
margin=0.5,
logit_scale=64.0,
param_attr=None,
name=None):
'''
def distributed_arcface_classify(x,
label,
class_num,
nranks,
rank_id,
margin=0.5,
logit_scale=64.0,
param_attr=None,
name=None):
"""
Distributed version of a classification layer with ArcFace loss, for cases
with a very large number of classes. The equation is
......@@ -719,14 +725,13 @@ def _distributed_arcface_classify(x,
class_num=1000,
nranks=8,
rank_id=0)
'''
"""
if name is None:
name = 'dist@arcface@rank@%05d' % rank_id
helper = LayerHelper(name, **locals())
classifier = DistributedClassifier(class_num, nranks, rank_id, helper)
return classifier.arcface_classify(
x=x,
label=label,
margin=margin,
logit_scale=logit_scale,
param_attr=param_attr)
return classifier.arcface_classify(x=x,
label=label,
margin=margin,
logit_scale=logit_scale,
param_attr=param_attr)
......@@ -12,14 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid as fluid
import math
import os
import numpy as np
from paddle.fluid import unique_name
from .base_model import BaseModel
from .base_model import BaseModel
__all__ = ["ResNet", "ResNet50", "ResNet101", "ResNet152"]
......@@ -33,12 +28,13 @@ class ResNet(BaseModel):
def build_network(self,
input,
label,
is_train):
is_train=True):
layers = self.layers
supported_layers = [50, 101, 152]
assert layers in supported_layers, \
"supported layers {}, but given {}".format(supported_layers, layers)
depth = None
if layers == 50:
depth = [3, 4, 14, 3]
elif layers == 101:
......@@ -59,21 +55,26 @@ class ResNet(BaseModel):
stride=2 if i == 0 else 1,
is_train=is_train)
bn = fluid.layers.batch_norm(input=conv, act=None, epsilon=2e-05,
is_test=False if is_train else True)
drop = fluid.layers.dropout(x=bn, dropout_prob=0.4,
dropout_implementation='upscale_in_train',
is_test=False if is_train else True)
bn = fluid.layers.batch_norm(input=conv,
act=None,
epsilon=2e-05,
is_test=False if is_train else True)
drop = fluid.layers.dropout(x=bn,
dropout_prob=0.4,
dropout_implementation='upscale_in_train',
is_test=False if is_train else True)
fc = fluid.layers.fc(
input=drop,
size=self.emb_dim,
act=None,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Xavier(uniform=False, fan_in=0.0)),
initializer=fluid.initializer.Xavier(uniform=False,
fan_in=0.0)),
bias_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.ConstantInitializer()))
emb = fluid.layers.batch_norm(input=fc, act=None, epsilon=2e-05,
is_test=False if is_train else True)
emb = fluid.layers.batch_norm(input=fc,
act=None,
epsilon=2e-05,
is_test=False if is_train else True)
return emb
def conv_bn_layer(self,
......@@ -92,51 +93,79 @@ class ResNet(BaseModel):
stride=stride,
padding=pad,
groups=groups,
act=None,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Xavier(
uniform=False, fan_in=0.0)),
bias_attr=False)
if act == 'prelu':
bn = fluid.layers.batch_norm(input=conv, act=None, epsilon=2e-05,
momentum=0.9, is_test=False if is_train else True)
return fluid.layers.prelu(bn, mode="all",
bn = fluid.layers.batch_norm(input=conv,
act=None,
epsilon=2e-05,
momentum=0.9,
is_test=False if is_train else True)
return fluid.layers.prelu(
bn,
mode="all",
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Constant(0.25)))
else:
return fluid.layers.batch_norm(input=conv, act=act, epsilon=2e-05,
is_test=False if is_train else True)
return fluid.layers.batch_norm(input=conv,
act=act,
epsilon=2e-05,
is_test=False if is_train else True)
def shortcut(self, input, ch_out, stride, is_train):
ch_in = input.shape[1]
if ch_in != ch_out or stride != 1:
return self.conv_bn_layer(input, ch_out, 1, stride,
is_train=is_train)
return self.conv_bn_layer(input,
ch_out,
1,
stride,
is_train=is_train)
else:
return input
def bottleneck_block(self, input, num_filters, stride, is_train):
if self.layers < 101:
bn1 = fluid.layers.batch_norm(input=input, act=None, epsilon=2e-05,
is_test=False if is_train else True)
conv1 = self.conv_bn_layer(
input=bn1, num_filters=num_filters, filter_size=3, pad=1,
act='prelu', is_train=is_train)
conv2 = self.conv_bn_layer(
input=conv1, num_filters=num_filters, filter_size=3,
stride=stride, pad=1, act=None, is_train=is_train)
bn1 = fluid.layers.batch_norm(input=input,
act=None,
epsilon=2e-05,
is_test=False if is_train else True)
conv1 = self.conv_bn_layer(input=bn1,
num_filters=num_filters,
filter_size=3,
pad=1,
act='prelu',
is_train=is_train)
conv2 = self.conv_bn_layer(input=conv1,
num_filters=num_filters,
filter_size=3,
stride=stride,
pad=1,
is_train=is_train)
else:
bn0 = fluid.layers.batch_norm(input=input, act=None, epsilon=2e-05,
is_test=False if is_train else True)
conv0 = self.conv_bn_layer(
input=bn0, num_filters=num_filters/4, filter_size=1, pad=0,
act='prelu', is_train=is_train)
conv1 = self.conv_bn_layer(
input=conv0, num_filters=num_filters/4, filter_size=3, pad=1,
act='prelu', is_train=is_train)
conv2 = self.conv_bn_layer(
input=conv1, num_filters=num_filters, filter_size=1,
stride=stride, pad=0, act=None, is_train=is_train)
bn0 = fluid.layers.batch_norm(input=input,
act=None,
epsilon=2e-05,
is_test=False if is_train else True)
conv0 = self.conv_bn_layer(input=bn0,
num_filters=num_filters / 4,
filter_size=1,
pad=0,
act='prelu',
is_train=is_train)
conv1 = self.conv_bn_layer(input=conv0,
num_filters=num_filters / 4,
filter_size=3,
pad=1,
act='prelu',
is_train=is_train)
conv2 = self.conv_bn_layer(input=conv1,
num_filters=num_filters,
filter_size=1,
stride=stride,
pad=0,
is_train=is_train)
short = self.shortcut(input, num_filters, stride, is_train=is_train)
return fluid.layers.elementwise_add(x=short, y=conv2, act=None)
......@@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import math
import random
import pickle
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import functools
import math
import os
import pickle
import random
import numpy as np
import paddle
import six
from PIL import Image, ImageEnhance
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import BytesIO
random.seed(0)
......@@ -18,7 +36,6 @@ DATA_DIM = 112
THREAD = 8
BUF_SIZE = 10240
img_mean = np.array([127.5, 127.5, 127.5]).reshape((3, 1, 1))
img_std = np.array([128.0, 128.0, 128.0]).reshape((3, 1, 1))
......@@ -97,13 +114,13 @@ def RandomResizedCrop(img, size):
return img
def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]):
def random_crop(img, size, scale=(0.08, 1.0), ratio=(3. / 4., 4. / 3.)):
aspect_ratio = math.sqrt(random.uniform(*ratio))
w = 1. * aspect_ratio
h = 1. / aspect_ratio
bound = min((float(img.size[0]) / img.size[1]) / (w**2),
(float(img.size[1]) / img.size[0]) / (h**2))
bound = min((float(img.size[0]) / img.size[1]) / (w ** 2),
(float(img.size[1]) / img.size[0]) / (h ** 2))
scale_max = min(scale[1], bound)
scale_min = min(scale[0], bound)
......@@ -150,12 +167,12 @@ def distort_color(img):
return img
def process_image_imagepath(sample,
class_dim,
color_jitter,
rotate,
rand_mirror,
normalize):
def process_image(sample,
class_dim,
color_jitter,
rotate,
rand_mirror,
normalize):
img_data = base64.b64decode(sample[0])
img = Image.open(StringIO(img_data))
......@@ -185,49 +202,62 @@ def process_image_imagepath(sample,
return img, sample[1]
def arc_iterator(file_list,
def arc_iterator(data_dir,
file_list,
class_dim,
color_jitter=False,
rotate=False,
rand_mirror=False,
normalize=False):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
def reader():
with open(file_list, 'r') as f:
flist = f.readlines()
assert len(flist) % trainer_count == 0, \
"Number of files should be divisible by trainer count, " \
"run base64 file preprocessing tool first."
num_files_per_trainer = len(flist) // trainer_count
start = num_files_per_trainer * trainer_id
end = start + num_files_per_trainer
flist = flist[start:end]
for file in flist:
with open(file, 'r') as f:
for line in f.xreadlines():
line = line.strip().split('\t')
image, label = line[0], line[1]
yield image, label
mapper = functools.partial(process_image_imagepath,
class_dim=class_dim, color_jitter=color_jitter, rotate=rotate,
rand_mirror=rand_mirror, normalize=normalize)
assert len(flist) == num_trainers, \
"Please use process_base64_files.py to pre-process the dataset."
file = flist[trainer_id]
file = os.path.join(data_dir, file)
with open(file, 'r') as f:
if six.PY2:
for line in f.xreadlines():
line = line.strip().split('\t')
image, label = line[0], line[1]
yield image, label
else:
for line in f:
line = line.strip().split('\t')
image, label = line[0], line[1]
yield image, label
mapper = functools.partial(process_image,
class_dim=class_dim,
color_jitter=color_jitter,
rotate=rotate,
rand_mirror=rand_mirror,
normalize=normalize)
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
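As an aside, the six.PY2 branch above exists only because f.xreadlines() was removed in Python 3; iterating over the file object directly is lazy on both versions, so the two branches could be collapsed into one loop, e.g.:

    def iter_samples(path):
        # Plain file iteration works unchanged on PY2 and PY3.
        with open(path, 'r') as f:
            for line in f:
                image, label = line.strip().split('\t')[:2]
                yield image, label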
def load_bin(path, image_size):
bins, issame_list = pickle.load(open(path, 'rb'))
if six.PY2:
bins, issame_list = pickle.load(open(path, 'rb'))
else:
bins, issame_list = pickle.load(open(path, 'rb'), encoding='bytes')
data_list = []
for flip in [0, 1]:
data = np.empty((len(issame_list)*2, 3, image_size[0], image_size[1]))
data = np.empty((len(issame_list) * 2, 3, image_size[0], image_size[1]))
data_list.append(data)
for i in xrange(len(issame_list)*2):
for i in range(len(issame_list) * 2):
_bin = bins[i]
if not isinstance(_bin, basestring):
_bin = _bin.tostring()
img_ori = Image.open(StringIO(_bin))
if six.PY2:
if not isinstance(_bin, six.string_types):
_bin = _bin.tostring()
img_ori = Image.open(StringIO(_bin))
else:
img_ori = Image.open(BytesIO(_bin))
for flip in [0, 1]:
img = img_ori.copy()
if flip == 1:
......@@ -241,13 +271,18 @@ def load_bin(path, image_size):
if i % 1000 == 0:
print('loading bin', i)
print(data_list[0].shape)
return (data_list, issame_list)
def train(data_dir, file_list, num_classes):
file_path = os.path.join(data_dir, file_list)
return arc_iterator(file_path, class_dim=num_classes, color_jitter=False,
rotate=False, rand_mirror=True, normalize=True)
return data_list, issame_list
def train(data_dir, num_classes):
file_path = os.path.join(data_dir, 'file_list.txt')
return arc_iterator(data_dir,
file_path,
class_dim=num_classes,
color_jitter=False,
rotate=False,
rand_mirror=True,
normalize=True)
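The version branch in load_bin above follows the same pattern: Python 3 must pass encoding='bytes' to unpickle files written under Python 2. A small helper capturing it (the function name is hypothetical):

    import pickle
    import six

    def load_py2_pickle(path):
        # PY3 needs encoding='bytes' to read PY2 pickles; PY2 does not.
        with open(path, 'rb') as f:
            if six.PY2:
                return pickle.load(f)
            return pickle.load(f, encoding='bytes')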
def test(data_dir, datasets):
import os
import functools
import math
import random
import os
import pickle
import functools
import random
import numpy as np
import paddle
import six
from PIL import Image, ImageEnhance
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import BytesIO
random.seed(0)
......@@ -123,13 +126,13 @@ def RandomResizedCrop(img, size):
return img
def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]):
def random_crop(img, size, scale=(0.08, 1.0), ratio=(3. / 4., 4. / 3.)):
aspect_ratio = math.sqrt(random.uniform(*ratio))
w = 1. * aspect_ratio
h = 1. / aspect_ratio
bound = min((float(img.size[0]) / img.size[1]) / (w**2),
(float(img.size[1]) / img.size[0]) / (h**2))
bound = min((float(img.size[0]) / img.size[1]) / (w ** 2),
(float(img.size[1]) / img.size[0]) / (h ** 2))
scale_max = min(scale[1], bound)
scale_min = min(scale[0], bound)
@@ -222,28 +225,37 @@ def arc_iterator(data,
def reader():
if shuffle:
random.shuffle(data)
for j in range(len(data)):
path, label = data[j]
path = os.path.join(data_dir, path)
yield path, label
mapper = functools.partial(process_image_imagepath,
                           class_dim=class_dim,
                           color_jitter=color_jitter,
                           rotate=rotate,
                           rand_mirror=rand_mirror,
                           normalize=normalize)
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
def load_bin(path, image_size):
if six.PY2:
    bins, issame_list = pickle.load(open(path, 'rb'))
else:
    bins, issame_list = pickle.load(open(path, 'rb'), encoding='bytes')
data_list = []
for flip in [0, 1]:
data = np.empty((len(issame_list) * 2, 3, image_size[0], image_size[1]))
data_list.append(data)
for i in range(len(issame_list) * 2):
_bin = bins[i]
if six.PY2:
    if not isinstance(_bin, six.string_types):
        _bin = _bin.tostring()
    img_ori = Image.open(StringIO(_bin))
else:
    img_ori = Image.open(BytesIO(_bin))
for flip in [0, 1]:
img = img_ori.copy()
if flip == 1:
@@ -257,14 +269,19 @@ def load_bin(path, image_size):
if i % 1000 == 0:
print('loading bin', i)
print(data_list[0].shape)
return data_list, issame_list
def arc_train(data_dir, class_dim):
train_image_list = get_train_image_list(data_dir)
return arc_iterator(train_image_list,
                    shuffle=True,
                    class_dim=class_dim,
                    data_dir=data_dir,
                    color_jitter=False,
                    rotate=False,
                    rand_mirror=True,
                    normalize=True)
def test(data_dir, datasets):
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import json
import logging
import os
import shutil
from functools import cmp_to_key
import paddle.fluid as fluid
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s %(asctime)s line:%(lineno)d] %(message)s',
datefmt='%d %b %Y %H:%M:%S')
logger = logging.getLogger()
class ParameterConverter(object):
"""
Tool to convert pre-trained distributed fc parameters for inference or
fine-tuning. Note that the number of ranks or GPUs for inference or
fine-tuning can be different from that for pre-training.
"""
def __init__(self, model_dir, output_dir, num_trainers):
super(ParameterConverter, self).__init__()
self.model_dir = model_dir
self.output_dir = output_dir
self.pretrain_nranks = -1
self.emb_dim = -1
self.num_classes = -1
self.nranks = num_trainers
self.load_config()
def load_config(self):
"""
Load config file which contains the following information for
pre-training:
1. pretrain_nranks (int): number of ranks for pre-training;
2. emb_dim (int): embedding dim for pre-training;
3. num_classes (int): number of classes for classification.
"""
meta_file = os.path.join(self.model_dir, 'meta.json')
if not os.path.exists(meta_file):
logger.error("Meta file does not exist, make sure your pre-trained "
"models are legal.")
exit()
with open(meta_file, 'r') as handle:
config = json.load(handle)
self.pretrain_nranks = config['pretrain_nranks']
assert self.pretrain_nranks > 0
self.emb_dim = config['emb_dim']
assert self.emb_dim > 0
self.num_classes = config['num_classes']
assert self.num_classes > 0
logger.info("Parameters for pre-training: pretrain_nranks ({}), "
"emb_dim ({}), and num_classes ({}).".format(
self.pretrain_nranks,
self.emb_dim,
self.num_classes))
logger.debug("Parameters for inference or fine-tuning: "
"nranks ({}).".format(self.nranks))
def find_var_names(self):
"""
Find all names of pre-trained parameters for the distributed fc layer,
e.g., dist@softmax@rank@00000.w_0, dist@softmax@rank@00000.b_0 etc.
We assume that names of distributed fc related parameters start with the
prefix dist@ and have @rank@ in their names.
"""
var_names = []
model_dir = os.path.abspath(self.model_dir)
if not os.path.exists(model_dir):
logger.error("The directory for pre-trained model ({}) does not "
"exist, please check it.".format(model_dir))
exit()
logger.info("The directory for pre-trained model: {}".format(model_dir))
for file in os.listdir(model_dir):
if 'dist@' in file and '@rank@' in file:
var_names.append(file)
assert len(var_names) > 0, \
logger.error("No distributed fc parameters found.")
logger.info("Number of distributed fc parameters: {}.".format(
len(var_names)))
logger.info("Distributed fc parameters: {}.".format(var_names))
return var_names
def split_load_and_save(self,
name_index,
param_names,
save_rank_id,
remainder,
as_bias,
train_nshards,
train_nranks,
nshards,
dtype="float32"):
var2 = None
advance = False
emb_dim = self.emb_dim
main_program = fluid.Program()
startup_program = fluid.Program()
num_classes = self.num_classes
load_var_name = param_names[name_index]
save_var_name_list = load_var_name.split('.')
save_var_name_list[0] = save_var_name_list[0].split('@')
save_var_name_list[0][-1] = "%05d" % save_rank_id
save_var_name_list[0] = '@'.join(save_var_name_list[0])
save_var_name = '.'.join(save_var_name_list)
last_train_nshards = num_classes - (train_nranks - 1) * train_nshards
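# num_classes is rarely divisible by train_nranks, so the last
# pre-training rank may hold fewer classes; last_train_nshards above
# is the width of that final shard.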
with fluid.program_guard(main_program, startup_program):
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var = fluid.layers.create_parameter(shape,
dtype=dtype,
name=load_var_name)
if as_bias:
var = fluid.layers.slice(var,
axes=[0],
starts=[var.shape[0] - remainder],
ends=[var.shape[0]])
else:
var = fluid.layers.split(var,
[var.shape[1] - remainder,
remainder],
dim=1)[1]
save_var_dim = nshards
if remainder < nshards:
if name_index == train_nranks - 1:
save_var_dim = remainder
else:
name_index += 1
advance = True
load_var_name = param_names[name_index]
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var2 = fluid.layers.create_parameter(shape,
dtype=dtype,
name=load_var_name)
if remainder + var_dim < nshards:
# The last train rank
save_var_dim = remainder + var_dim
else:
remainder = remainder + var_dim - nshards
elif remainder == nshards:
if name_index == train_nranks - 2:
remainder = last_train_nshards
advance = True
elif name_index < train_nranks - 2:
remainder = train_nshards
advance = True
else:
remainder = remainder - nshards
if var2 is not None:
var = fluid.layers.concat([var, var2], axis=0 if as_bias else 1)
shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim]
to_save_var = fluid.layers.create_parameter(
shape,
dtype=dtype,
name=save_var_name + '_temp')
if save_var_dim != nshards: # get last dim
if as_bias:
temp_var = fluid.layers.slice(
var,
axes=[0],
starts=[var.shape[0] - save_var_dim],
ends=[var.shape[0]])
else:
temp_var = fluid.layers.split(
var,
[var.shape[1] - save_var_dim, save_var_dim],
dim=1)[1]
fluid.layers.assign(temp_var, to_save_var)
else:
if as_bias:
temp_var = fluid.layers.slice(var,
axes=[0],
starts=[0],
ends=[nshards])
else:
temp_var = fluid.layers.split(
var,
[nshards, var.shape[1] - nshards],
dim=1)[0]
fluid.layers.assign(temp_var, to_save_var)
def expected_var(var):
has_var = os.path.exists(os.path.join(self.model_dir, var.name))
if has_var:
return True
return False
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
fluid.io.load_vars(exe,
dirname=self.model_dir,
predicate=expected_var,
main_program=main_program)
exe.run(main_program)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
fluid.io.save_vars(exe,
self.output_dir,
vars=[to_save_var],
main_program=main_program)
srcfile = os.path.join(self.output_dir, to_save_var.name)
dstfile = os.path.join(self.output_dir, save_var_name)
shutil.move(srcfile, dstfile)
return remainder, advance
def split_parameters(self, param_names, as_bias):
"""
Split parameters whose names are in param_names.
Params:
param_names: list of names of parameters to split
as_bias: whether the parameters to split are bias parameters
"""
num_classes = self.num_classes
train_nranks = self.pretrain_nranks
nranks = self.nranks
train_nshards = (num_classes + train_nranks - 1) // train_nranks
nshards = (num_classes + nranks - 1) // nranks
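# nshards is a ceiling division so every class lands on some rank.
# E.g., with num_classes = 85742 and nranks = 8 (illustrative values):
# nshards = (85742 + 7) // 8 = 10718, and the last rank keeps
# 85742 - 7 * 10718 = 10716 classes.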
save_rank_id = 0
# remainder dim that is not split in a var
remainder_var_dim = train_nshards
name_index = 0 # index of name of pre-trained parameter to process
for save_rank_id in range(nranks):
assert name_index < train_nranks
remainder_var_dim, advance = self.split_load_and_save(
name_index,
param_names,
save_rank_id,
remainder_var_dim,
as_bias,
train_nshards,
train_nranks,
nshards)
name_index += 1 if advance else 0
processed_var_count = name_index + 1
assert processed_var_count == train_nranks, \
logger.error("Number of pre-trained parameters processed ({}) is "
"not equal to the number of ranks ({}) for "
"pre-training.".format(processed_var_count,
train_nranks))
assert save_rank_id == nranks - 1, \
logger.error("Number of saved parameters ({}) is not equal to the "
"number of ranks ({}) for inference or "
"fine-tuning.".format(save_rank_id + 1, nranks))
def split_distfc_parameters(self,
weight_param_names,
weight_velocity_param_names,
bias_param_names,
bias_velocity_param_names):
"""
Split each distributed fc-related parameter according to number of ranks
for inference or fine-tuning.
Params:
weight_param_names: list of names of weight parameters
weight_velocity_param_names: list of names of weight velocity
    parameters
bias_param_names: list of names of bias parameters
bias_velocity_param_names: list of names of bias velocity parameters
"""
self.split_parameters(weight_param_names, as_bias=False)
self.split_parameters(weight_velocity_param_names, as_bias=False)
if len(bias_param_names) != 0:
self.split_parameters(bias_param_names, as_bias=True)
self.split_parameters(bias_velocity_param_names, as_bias=True)
def concat_load_and_save(self,
name_index,
param_names,
save_rank_id,
remainder,
as_bias,
train_nshards,
train_nranks,
nshards,
dtype="float32"):
advance = 0
emb_dim = self.emb_dim
main_program = fluid.Program()
startup_program = fluid.Program()
num_classes = self.num_classes
load_var_name = param_names[name_index]
save_var_name_list = load_var_name.split('.')
save_var_name_list[0] = save_var_name_list[0].split('@')
save_var_name_list[0][-1] = "%05d" % save_rank_id
save_var_name_list[0] = '@'.join(save_var_name_list[0])
save_var_name = '.'.join(save_var_name_list)
last_train_nshards = num_classes - (train_nranks - 1) * train_nshards
with fluid.program_guard(main_program, startup_program):
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var = fluid.layers.create_parameter(shape,
dtype=dtype,
name=load_var_name)
if as_bias:
var = fluid.layers.slice(var,
axes=[0],
starts=[var.shape[0] - remainder],
ends=[var.shape[0]])
else:
var = fluid.layers.split(var,
[var.shape[1] - remainder,
remainder],
dim=1)[1]
to_concat_var_list = [var]
while remainder < nshards and name_index < train_nranks - 1:
name_index += 1
advance += 1
load_var_name = param_names[name_index]
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var = fluid.layers.create_parameter(shape,
dtype=dtype,
name=load_var_name)
to_concat_var_list.append(var)
remainder += var_dim
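# Accumulate whole pre-training shards until this rank's quota of
# nshards classes is covered; any surplus is sliced off below.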
if len(to_concat_var_list) > 1:
var = fluid.layers.concat(to_concat_var_list,
axis=0 if as_bias else 1)
save_var_dim = nshards
if remainder > nshards:
if as_bias:
var = fluid.layers.slice(var,
axes=[0],
starts=[0],
ends=[nshards])
else:
var = fluid.layers.split(
var,
[nshards, var.shape[1] - nshards],
dim=1)[0]
remainder = remainder - nshards
elif remainder == nshards:
if name_index == train_nranks - 2:
# advance += 1 if len(to_concat_var_list) > 1 else 0
# to avoid duplicate add
# name_index += 1 if len(to_concat_var_list) > 1 else 0
# to avoid duplicate add
advance += 1
name_index += 1
remainder = last_train_nshards
elif name_index < train_nranks - 2:
# advance += 1 if len(to_concat_var_list) > 1 else 0
# to avoid duplicate add
# name_index += 1 if len(to_concat_var_list) > 1 else 0
# to avoid duplicate add
advance += 1
name_index += 1
remainder = train_nshards
else:
save_var_dim = remainder
shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim]
to_save_var = fluid.layers.create_parameter(
shape,
dtype=dtype,
name=save_var_name + '_temp')
fluid.layers.assign(var, to_save_var)
def expected_var(var):
has_var = os.path.exists(os.path.join(self.model_dir, var.name))
if has_var:
return True
return False
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
fluid.io.load_vars(exe,
dirname=self.model_dir,
predicate=expected_var,
main_program=main_program)
exe.run(main_program)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
fluid.io.save_vars(exe,
self.output_dir,
vars=[to_save_var],
main_program=main_program)
srcfile = os.path.join(self.output_dir, to_save_var.name)
dstfile = os.path.join(self.output_dir, save_var_name)
shutil.move(srcfile, dstfile)
return remainder, advance
def concat_parameters(self, param_names, as_bias):
"""
Concat parameters whose names are in param_names.
Params:
param_names: list of names of parameters to concat
as_bias: whether the parameters to concat are bias parameters
"""
num_classes = self.num_classes
train_nranks = self.pretrain_nranks
nranks = self.nranks
train_nshards = (num_classes + train_nranks - 1) // train_nranks
nshards = (num_classes + nranks - 1) // nranks
save_rank_id = 0
remainder_dim = train_nshards  # remainder dim that is not concatenated
name_index = 0 # index of name of pre-trained parameter to process
for save_rank_id in range(nranks):
assert name_index < train_nranks
remainder_dim, advance = self.concat_load_and_save(name_index,
param_names,
save_rank_id,
remainder_dim,
as_bias,
train_nshards,
train_nranks,
nshards)
name_index += advance
processed_var_count = name_index + 1
assert processed_var_count == train_nranks, \
logger.error("Number of pre-trained parameters processed ({}) is "
"not equal to the number of ranks ({}) for "
"pre-training.".format(processed_var_count,
train_nranks))
assert save_rank_id == nranks - 1, \
logger.error("Number of saved parameters ({}) is not equal to the "
"number of ranks ({}) for inference or "
"fine-tuning.".format(save_rank_id + 1, nranks))
def concat_distfc_parameters(self,
weight_param_names,
weight_velocity_param_names,
bias_param_names,
bias_velocity_param_names):
"""
Concat distributed fc-related parameters according to number of ranks
for inference or finetuning.
Params:
weight_param_names: list of names of weight parameters
weight_velocity_param_names: list of names of weight velocity
parameters
bias_param_names: list of names of bias parameters
bias_velocity_param_names: list of names of bias velocity parameters
"""
self.concat_parameters(weight_param_names, as_bias=False)
self.concat_parameters(weight_velocity_param_names, as_bias=False)
if len(bias_param_names) != 0:
self.concat_parameters(bias_param_names, as_bias=True)
self.concat_parameters(bias_velocity_param_names, as_bias=True)
def process(self):
self.load_config()
var_names = self.find_var_names()
weight_param_names = [name for name in var_names
if '.w' in name and 'velocity' not in name]
weight_velocity_param_names = [name for name in var_names
if '.w' in name and 'velocity' in name]
bias_param_names = [name for name in var_names
if '.b' in name and 'velocity' not in name]
bias_velocity_param_names = [name for name in var_names
if '.b' in name and 'velocity' in name]
def parameter_name_compare(x, y):
"""
Compare two parameter names depend on their rank id.
A parameter name is like dist_softmax_rank_00000.w_0,
where 00000 is the rank id.
"""
rank_id_x = int(x.split('.')[0].split('@')[-1])
rank_id_y = int(y.split('.')[0].split('@')[-1])
if rank_id_x < rank_id_y:
return -1
elif rank_id_x == rank_id_y:
return 0
else:
return 1
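# Example (illustrative): 'dist@softmax@rank@00002.w_0' sorts by rank
# id 2, so shards are processed in rank order 00000, 00001, 00002, ...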
weight_param_names.sort(key=cmp_to_key(parameter_name_compare))
weight_velocity_param_names.sort(
key=cmp_to_key(parameter_name_compare))
bias_param_names.sort(key=cmp_to_key(parameter_name_compare))
bias_velocity_param_names.sort(key=cmp_to_key(parameter_name_compare))
assert len(weight_param_names) == self.pretrain_nranks, \
logger.error(
"Number of distributed fc-related weight parameters ({}) "
"should be equal to the number of ranks ({}) for "
"pre-training.".format(len(weight_param_names),
self.pretrain_nranks))
assert len(weight_velocity_param_names) == self.pretrain_nranks, \
logger.error(
"Number of distributed fc-related weight parameters ({}) "
"should be equal to the number of ranks ({}) for "
"pre-training.".format(len(weight_velocity_param_names),
self.pretrain_nranks))
assert (len(bias_param_names) == 0 or
len(bias_param_names) == self.pretrain_nranks), \
logger.error(
"Number of distributed fc-related bias parameters ({}) "
"should be 0 or equal to the number of ranks ({}) for "
"pre-training.".format(len(bias_param_names),
self.pretrain_nranks))
assert (len(bias_velocity_param_names) == 0 or
len(bias_velocity_param_names) == self.pretrain_nranks), \
logger.error(
"Number of distributed fc-related bias parameters ({}) "
"should be 0 or equal to the number of ranks ({}) for "
"pre-training.".format(len(bias_velocity_param_names),
self.pretrain_nranks))
pretrain_nranks = self.pretrain_nranks
nranks = self.nranks
if pretrain_nranks == nranks:
logger.info(
"Pre-training and inference (or fine-tuning) have the same "
"number of ranks, nothing to do.")
elif pretrain_nranks < nranks:
self.split_distfc_parameters(weight_param_names,
weight_velocity_param_names,
bias_param_names,
bias_velocity_param_names)
else:
self.concat_distfc_parameters(weight_param_names,
weight_velocity_param_names,
bias_param_names,
bias_velocity_param_names)
logger.info("Done.")
if __name__ == "__main__":
converter = ParameterConverter('./trained_model',
"./trained_model_temp",
8)
converter.process()
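# The arguments above are illustrative: './trained_model' must contain
# meta.json plus the dist@...@rank@... shards, and 8 is the target
# number of ranks for inference or fine-tuning.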
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import os
import six
import logging
import argparse
import shutil
import pickle
from functools import cmp_to_key
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.fluid.transpiler.details import program_to_code
logging.basicConfig(level=logging.INFO,
format='[%(levelname)s %(asctime)s line:%(lineno)d] %(message)s',
datefmt='%d %b %Y %H:%M:%S')
logger = logging.getLogger()
parser = argparse.ArgumentParser(description="""
Tool to convert pretrained distributed fc parameters for inference.
Note that the number of ranks or GPUs for inference can be different
from that for pretraining.""")
parser.add_argument("--name_feature",
type=str,
default="@rank@",
help="Feature for names of distributed fc parameters. "
"For example, by default the name for the "
"distributed fc weight parameter is like "
"dist@xxx@rank@id.w_0 where xxx is softmax or arcface "
"depending on the loss types used and rank_id is the "
"rank_id generating this parameter, and hence the "
"feature cloud be @rank@.")
parser.add_argument("--pretrain_nranks",
type=int,
default=-1,
help="Number of ranks (GPUs) for pre-training.")
parser.add_argument("--nranks",
type=int,
required=True,
help="Number of ranks (GPUs) for inference or finetuning.")
parser.add_argument("--num_classes",
type=int,
default=-1,
help="Number of classes for classification.")
parser.add_argument("--emb_dim",
type=int,
default=-1,
help="Embedding dim.")
parser.add_argument("--pretrained_model_dir",
type=str,
required=True,
default=None,
help="Directory for pretrained model.")
parser.add_argument("--output_dir",
type=str,
required=True,
default=None,
help="Directory for output.")
args = parser.parse_args()
def load_config(args):
"""
Load config file which contains the following information for pretraining:
1. pretrain_nranks (int): number of ranks for pretraining;
2. emb_dim (int): embedding dim for pretraining;
3. num_classes (int): number of classes for classification.
"""
meta_file = os.path.join(args.pretrained_model_dir, 'meta.pickle')
if not os.path.exists(meta_file):
if args.pretrain_nranks < 0 or args.emb_dim < 0 or args.num_classes < 0:
logger.error("Meta file does not exist, you have to set "
"'--pretrain_nranks', '--emb_dim' and '--num_classes "
"parameters manually.")
exit()
logger.debug("Meta file does not exist, make sure you have correctly "
"set --pretrain_nranks ({}), --emb_dim ({}) and "
"--num_classes ({}) parameters manually.".format(
args.pretrain_nranks, args.emb_dim, args.num_classes))
else:
with open(meta_file, 'rb') as handle:
config = pickle.load(handle)
if args.pretrain_nranks < 0:
args.pretrain_nranks = config['pretrain_nranks']
elif args.pretrain_nranks != config['pretrain_nranks']:
logger.error("The --pretrain_nranks ({}) parameter you set is not "
"equal to that ({}) for pretraining, please check "
"it.".format(args.pretrain_nranks,
config['pretrain_nranks']))
exit()
if args.emb_dim < 0:
args.emb_dim = config['emb_dim']
elif args.emb_dim != config['emb_dim']:
logger.error("The --emb_dim ({}) parameter you set is not equal to "
"that ({}) for pretraining, please check it.".format(
args.emb_dim, config['emb_dim']))
exit()
if args.num_classes < 0:
args.num_classes = config['num_classes']
elif args.num_classes != config['num_classes']:
logger.error("The --num_classes ({}) parameter you set is not equal"
" to that ({}) for pretraining, please check "
"it.".format(args.emb_dim, config['emb_dim']))
exit()
logger.debug("Parameters for pretraining: pretrain_nranks ({}), emb_dim "
"({}), and num_classes ({}).".format(args.pretrain_nranks,
args.emb_dim, args.num_classes))
logger.debug("Parameters for inference or finetuning: nranks ({}).".format(
args.nranks))
def find_distfc_var_names(args):
"""
Find all names of pretrained distfc-related parameters,
e.g., dist_softmax_rank_00000.w_0, dist_softmax_rank_00000.b_0 etc.
We assume that names of distfc-related parameters start with the
prefix 'dist'.
"""
var_names = []
model_dir = os.path.abspath(args.pretrained_model_dir)
if not os.path.exists(model_dir):
logger.error("The directory for pretrained model ({}) does not exist, "
"please check it.".format(model_dir))
exit()
logger.info("The directory for pretrained model: {}".format(model_dir))
args.pretrained_model_dir = model_dir
for file in os.listdir(model_dir):
if args.name_feature in file:
var_names.append(file)
assert len(var_names) > 0, \
logger.error("No distributed fc parameters found.")
logger.info("Number of distributed fc parameters: {}.".format(
len(var_names)))
logger.debug("Distributed fc parameters: {}.".format(var_names))
return var_names
def split_load_and_save(args,
name_index,
param_names,
save_rank_id,
remainder,
as_bias,
train_nshards,
train_nranks,
nshards,
dtype="float32"):
var2 = None
advance = False
emb_dim = args.emb_dim
main_program = fluid.Program()
startup_program = fluid.Program()
load_var_name = param_names[name_index]
save_var_name_list = load_var_name.split('.')
save_var_name_list[0] = save_var_name_list[0].split('@')
save_var_name_list[0][-1] = "%05d" % save_rank_id
save_var_name_list[0] = '@'.join(save_var_name_list[0])
save_var_name = '.'.join(save_var_name_list)
last_train_nshards = args.num_classes - (train_nranks - 1) * train_nshards
with fluid.program_guard(main_program, startup_program):
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var = fluid.layers.create_parameter(shape, dtype=dtype,
name=load_var_name)
if as_bias:
var = fluid.layers.slice(var, axes=[0],
starts=[var.shape[0] - remainder], ends=[var.shape[0]])
else:
var = fluid.layers.split(var, [var.shape[1] - remainder, remainder],
dim=1)[1]
save_var_dim = nshards
if remainder < nshards:
if name_index == train_nranks - 1:
save_var_dim = remainder
else:
name_index += 1
advance = True
load_var_name = param_names[name_index]
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var2 = fluid.layers.create_parameter(shape, dtype=dtype,
name=load_var_name)
if remainder + var_dim < nshards:
# The last train rank
save_var_dim = remainder + var_dim
else:
remainder = remainder + var_dim - nshards
elif remainder == nshards:
if name_index == train_nranks - 2:
remainder = last_train_nshards
advance = True
elif name_index < train_nranks - 2:
remainder = train_nshards
advance = True
else:
remainder = remainder - nshards
if var2 is not None:
var = fluid.layers.concat([var, var2], axis=0 if as_bias else 1)
shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim]
to_save_var = fluid.layers.create_parameter(shape, dtype=dtype,
name=save_var_name + '_temp')
if save_var_dim != nshards: # get last dim
if as_bias:
temp_var = fluid.layers.slice(var, axes=[0],
starts=[var.shape[0] - save_var_dim], ends=[var.shape[0]])
else:
temp_var = fluid.layers.split(var,
[var.shape[1] - save_var_dim, save_var_dim], dim=1)[1]
fluid.layers.assign(temp_var, to_save_var)
else:
if as_bias:
temp_var = fluid.layers.slice(var, axes=[0], starts=[0],
ends=[nshards])
else:
temp_var = fluid.layers.split(var,
[nshards, var.shape[1] - nshards], dim=1)[0]
fluid.layers.assign(temp_var, to_save_var)
def expected_var(var):
has_var = os.path.exists(os.path.join(args.pretrained_model_dir,
var.name))
if has_var:
return True
return False
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
fluid.io.load_vars(exe, dirname=args.pretrained_model_dir,
predicate=expected_var, main_program=main_program)
exe.run(main_program)
if not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
fluid.io.save_vars(exe, args.output_dir, vars=[to_save_var],
main_program=main_program)
srcfile = os.path.join(args.output_dir, to_save_var.name)
dstfile = os.path.join(args.output_dir, save_var_name)
shutil.move(srcfile, dstfile)
return remainder, advance
def split_parameters(args, param_names, as_bias):
"""
Split parameters whose names are in param_names.
Params:
args: command line parameters
param_names: list of names of parameters to split
as_bias: whether the parameters to split are bias parameters
"""
num_classes = args.num_classes
train_nranks = args.pretrain_nranks
nranks = args.nranks
train_nshards = (num_classes + train_nranks - 1) // train_nranks
nshards = (num_classes + nranks - 1) // nranks  # for inference or finetuning
save_rank_id = 0
remainder_var_dim = train_nshards # remainder dim that is not split in a var
name_index = 0 # index of name of pretrained parameter to process
for save_rank_id in range(nranks):
assert name_index < train_nranks
remainder_var_dim, advance = split_load_and_save(args, name_index,
param_names, save_rank_id, remainder_var_dim, as_bias,
train_nshards, train_nranks, nshards)
name_index += 1 if advance else 0
processed_var_count = name_index + 1
assert processed_var_count == train_nranks, logger.error("Number of "
"pretrained parameters processed ({}) is not equal to the number of "
"ranks ({}) for pretraining.".format(processed_var_count, train_nranks))
assert save_rank_id == nranks - 1, logger.error("Number of saved parameters"
" ({}) is not equal to the number of ranks ({}) for inference or "
"finetuning.".format(save_rank_id + 1, nranks))
def split_distfc_parameters(args,
weight_param_names,
weight_velocity_param_names,
bias_param_names,
bias_velocity_param_names):
"""
Split each distributed fc-related parameter according to number of ranks
for inference or finetuning.
Params:
args: command line parameters
weight_param_names: list of names of weight parameters
weight_velocity_param_names: list of names of weight velocity parameters
bias_param_names: list of names of bias parameters
bias_velocity_param_names: list of names of bias velocity parameters
"""
split_parameters(args, weight_param_names, as_bias=False)
split_parameters(args, weight_velocity_param_names, as_bias=False)
if len(bias_param_names) != 0:
split_parameters(args, bias_param_names, as_bias=True)
split_parameters(args, bias_velocity_param_names, as_bias=True)
def concat_load_and_save(args,
name_index,
param_names,
save_rank_id,
remainder,
as_bias,
train_nshards,
train_nranks,
nshards,
dtype="float32"):
advance = 0
orig_nshards = nshards
emb_dim = args.emb_dim
main_program = fluid.Program()
startup_program = fluid.Program()
load_var_name = param_names[name_index]
save_var_name_list = load_var_name.split('.')
save_var_name_list[0] = save_var_name_list[0].split('@')
save_var_name_list[0][-1] = "%05d" % save_rank_id
save_var_name_list[0] = '@'.join(save_var_name_list[0])
save_var_name = '.'.join(save_var_name_list)
last_train_nshards = args.num_classes - (train_nranks - 1) * train_nshards
with fluid.program_guard(main_program, startup_program):
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var = fluid.layers.create_parameter(shape, dtype=dtype,
name=load_var_name)
if as_bias:
var = fluid.layers.slice(var, axes=[0],
starts=[var.shape[0] - remainder], ends=[var.shape[0]])
else:
var = fluid.layers.split(var, [var.shape[1] - remainder, remainder],
dim=1)[1]
to_concat_var_list = [var]
while remainder < nshards and name_index < train_nranks - 1:
name_index += 1
advance += 1
load_var_name = param_names[name_index]
if name_index == train_nranks - 1:
var_dim = last_train_nshards
else:
var_dim = train_nshards
shape = [var_dim] if as_bias else [emb_dim, var_dim]
var = fluid.layers.create_parameter(shape, dtype=dtype,
name=load_var_name)
to_concat_var_list.append(var)
remainder += var_dim
if len(to_concat_var_list) > 1:
var = fluid.layers.concat(
to_concat_var_list, axis=0 if as_bias else 1)
save_var_dim = nshards
if remainder > nshards:
if as_bias:
var = fluid.layers.slice(var, axes=[0], starts=[0],
ends=[nshards])
else:
var = fluid.layers.split(var,
[nshards, var.shape[1] - nshards], dim=1)[0]
remainder = remainder - nshards
elif remainder == nshards:
if name_index == train_nranks - 2:
#advance += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add
#name_index += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add
advance += 1
name_index += 1
remainder = last_train_nshards
elif name_index < train_nranks - 2:
#advance += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add
#name_index += 1 if len(to_concat_var_list) > 1 else 0 # to avoid duplicate add
advance += 1
name_index += 1
remainder = train_nshards
else:
save_var_dim = remainder
shape = [save_var_dim] if as_bias else [emb_dim, save_var_dim]
to_save_var = fluid.layers.create_parameter(shape, dtype=dtype,
name=save_var_name + '_temp')
fluid.layers.assign(var, to_save_var)
def expected_var(var):
has_var = os.path.exists(os.path.join(args.pretrained_model_dir,
var.name))
if has_var:
return True
return False
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
fluid.io.load_vars(exe, dirname=args.pretrained_model_dir,
predicate=expected_var, main_program=main_program)
exe.run(main_program)
if not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
fluid.io.save_vars(exe, args.output_dir, vars=[to_save_var],
main_program=main_program)
srcfile = os.path.join(args.output_dir, to_save_var.name)
dstfile = os.path.join(args.output_dir, save_var_name)
shutil.move(srcfile, dstfile)
return remainder, advance
def concat_parameters(args, param_names, as_bias):
"""
Concat parameters whose names are in param_names.
Params:
args: command line parameters
param_names: list of names of parameters to concat
as_bias: whether the parameters to concat are bias parameters
"""
num_classes = args.num_classes
train_nranks = args.pretrain_nranks
nranks = args.nranks
train_nshards = (num_classes + train_nranks - 1) // train_nranks
nshards = (num_classes + nranks - 1) // nranks  # for inference or finetuning
save_rank_id = 0
remainder_dim = train_nshards  # remainder dim that is not concatenated
name_index = 0 # index of name of pretrained parameter to process
for save_rank_id in range(nranks):
assert name_index < train_nranks
remainder_dim, advance = concat_load_and_save(args,
name_index, param_names, save_rank_id, remainder_dim,
as_bias, train_nshards, train_nranks, nshards)
name_index += advance
processed_var_count = name_index + 1
assert processed_var_count == train_nranks, logger.error("Number of "
"pretrained parameters processed ({}) is not equal to the number of "
"ranks ({}) for pretraining.".format(processed_var_count, train_nranks))
assert save_rank_id == nranks - 1, logger.error("Number of saved parameters"
" ({}) is not equal to the number of ranks ({}) for inference or "
"finetuning.".format(save_rank_id + 1, nranks))
def concat_distfc_parameters(args,
weight_param_names,
weight_velocity_param_names,
bias_param_names,
bias_velocity_param_names):
"""
Concat distributed fc-related parameters according to number of ranks
for inference or finetuning.
Params:
args: command line parameters
weight_param_names: list of names of weight parameters
weight_velocity_param_names: list of names of weight velocity parameters
bias_param_names: list of names of bias parameters
bias_velocity_param_names: list of names of bias velocity parameters
"""
concat_parameters(args, weight_param_names, as_bias=False)
concat_parameters(args, weight_velocity_param_names, as_bias=False)
if len(bias_param_names) != 0:
concat_parameters(args, bias_param_names, as_bias=True)
concat_parameters(args, bias_velocity_param_names, as_bias=True)
def parameter_name_compare(x, y):
"""
Compare two parameter names depend on their rank id.
A parameter name is like dist_softmax_rank_00000.w_0,
where 00000 is the rank id.
"""
rank_id_x = int(x.split('.')[0].split('@')[-1])
rank_id_y = int(y.split('.')[0].split('@')[-1])
if rank_id_x < rank_id_y:
return -1
elif rank_id_x == rank_id_y:
return 0
else:
return 1
def main():
global args
load_config(args)
var_names = find_distfc_var_names(args)
weight_param_names = [name for name in var_names
if '.w' in name and 'velocity' not in name]
weight_velocity_param_names = [name for name in var_names
if '.w' in name and 'velocity' in name]
bias_param_names = [name for name in var_names
if '.b' in name and 'velocity' not in name]
bias_velocity_param_names = [name for name in var_names
if '.b' in name and 'velocity' in name]
weight_param_names.sort(key=cmp_to_key(parameter_name_compare))
weight_velocity_param_names.sort(key=cmp_to_key(parameter_name_compare))
bias_param_names.sort(key=cmp_to_key(parameter_name_compare))
bias_velocity_param_names.sort(key=cmp_to_key(parameter_name_compare))
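# list.sort() lost its cmp argument in PY3; cmp_to_key wraps the
# old-style comparator so the same ordering works on PY2 and PY3.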
assert len(weight_param_names) == args.pretrain_nranks, \
logger.error("Number of distributed fc-related weight parameters ({}) "
"should be equal to the number of ranks ({}) for "
"pretraining.".format(len(weight_param_names),
args.pretrain_nranks))
assert len(weight_velocity_param_names) == args.pretrain_nranks, \
logger.error("Number of distributed fc-related weight parameters ({}) "
"should be equal to the number of ranks ({}) for "
"pretraining.".format(len(weight_velocity_param_names),
args.pretrain_nranks))
assert len(bias_param_names) == 0 or \
len(bias_param_names) == args.pretrain_nranks, logger.error("Number of "
"distributed fc-related bias parameters ({}) should be 0 or equal "
"to the number of ranks ({}) for pretraining.".format(
len(bias_param_names), args.pretrain_nranks))
assert len(bias_velocity_param_names) == 0 or \
len(bias_velocity_param_names) == args.pretrain_nranks, logger.error("Number of "
"distributed fc-related bias parameters ({}) should be 0 or equal "
"to the number of ranks ({}) for pretraining.".format(
len(bias_velocity_param_names), args.pretrain_nranks))
pretrain_nranks = args.pretrain_nranks
nranks = args.nranks
if pretrain_nranks == nranks:
logger.info("Pre-training and inference (or finetuning) have the same "
"number of ranks, nothing to do.")
elif pretrain_nranks < nranks:
split_distfc_parameters(args, weight_param_names,
weight_velocity_param_names, bias_param_names,
bias_velocity_param_names)
else:
concat_distfc_parameters(args, weight_param_names,
weight_velocity_param_names, bias_param_names,
bias_velocity_param_names)
logger.info("Done.")
if __name__ == "__main__":
main()
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" PLSC version string """
plsc_version = "0.1.0"
plsc_version = "0.0.0"
@@ -12,29 +12,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import argparse
import logging
import math
import os
import random
import sqlite3
import tempfile
import time
import six
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%d %b %Y %H:%M:%S')
logger = logging.getLogger()
parser = argparse.ArgumentParser(description="""
Tool to preprocess dataset in base64 format.""")
"""
We assume that the dataset directory contains a file-list file and one
or more data files. Each line of the file-list file names a data file.
@@ -111,9 +110,9 @@ class Base64Preprocessor(object):
line = line.strip()
file_path = os.path.join(self.data_dir, line)
with open(file_path, 'r') as df:
for line_local in df.xreadlines():
    line_local = line_local.strip().split('\t')
    self.insert_to_db(cnt, line_local)
    cnt += 1
os.remove(file_path)
else:
@@ -121,9 +120,9 @@ class Base64Preprocessor(object):
line = line.strip()
file_path = os.path.join(self.data_dir, line)
with open(file_path, 'r') as df:
for line_local in df:
    line_local = line_local.strip().split('\t')
    self.insert_to_db(cnt, line_local)
    cnt += 1
os.remove(file_path)
@@ -143,19 +142,20 @@ class Base64Preprocessor(object):
start_time = time.time()
lines_per_rank = int(math.ceil(num / nranks))
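# math.ceil needs the true division imported from __future__ here: e.g.
# (illustrative) num = 10, nranks = 4 -> lines_per_rank = 3; the index
# list is then padded below so every rank gets exactly 3 lines.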
total_num = lines_per_rank * nranks
index = index + index[0:total_num - num]
assert len(index) == total_num
for rank in range(nranks):
start = rank * lines_per_rank
end = (rank + 1) * lines_per_rank  # exclusive
f_handler = open(os.path.join(self.data_dir,
                              ".tmp_" + str(rank)), 'w')
for i in range(start, end):
idx = index[i]
sql_cmd = "SELECT DATA, LABEL FROM DATASET WHERE ID={};".format(idx)
sql_cmd = "SELECT DATA, LABEL FROM DATASET WHERE ID={};".format(
idx)
cursor = self.cursor.execute(sql_cmd)
for result in cursor:
data = result[0]
@@ -174,7 +174,7 @@ class Base64Preprocessor(object):
line += '\n'
f_t.writelines(line)
os.rename(os.path.join(data_dir, ".tmp_" + str(rank)),
          os.path.join(data_dir, "base64_rank_{}".format(rank)))
os.remove(file_list)
os.rename(temp_file_list, file_list)
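# At this point each rank's shuffled samples live in base64_rank_<rank>,
# and the file list has been rewritten to reference the per-rank files.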
@@ -183,21 +183,16 @@ class Base64Preprocessor(object):
def close_db(self):
self.conn.close()
self.tempfile.close()
os.remove(self.sqlite3_file)
def main():
global args
obj = Base64Preprocessor(args.data_dir, args.file_list, args.nranks)
obj.shuffle_files()
obj.close_db()
if __name__ == "__main__":
main()