Commit 759b68ea authored by M MRXLT

Merge remote-tracking branch 'upstream/master'

@@ -23,7 +23,7 @@ PLSC has the following features:
### Basic features
* [API introduction](docs/api_intro.md)
* [Custom models](docs/custom_models.md)
* [Custom Reader interface]
### Inference and deployment
@@ -33,6 +33,5 @@ PLSC has the following features:
### Advanced features
* [Distributed parameter conversion](docs/distributed_params.md)
* [Base64 image preprocessing](docs/base64_preprocessor.md)
# Base64 image preprocessing
## Introduction
In real-world applications, a common way to store training data is to encode each image in base64 format. Every line of a training data file then holds the base64 data of one image together with the image's label, usually separated by a tab character ('\t').
Usually, the list of all training data files is recorded in a separate file, and the directory structure of the whole training dataset looks like this:
```shell
dataset
|-- file_list.txt
|-- dataset.part1
|-- dataset.part2
...
`-- dataset.part10
```
Here, file_list.txt records the list of training data files, one file per line.
For the example above, the content of file_list.txt is:
```shell
dataset.part1
dataset.part2
...
dataset.part10
```
Each line of a data file contains the base64 representation of one image followed by the image's label, separated by a tab.
For distributed training, every GPU card must process the same number of images, and a global shuffle of the training data is usually performed before training.
This document describes the base64 image preprocessing tool, which globally shuffles the training data and splits it evenly into multiple data files, one per GPU card used for training. When the total number of training images is not divisible by the number of GPU cards, extra images (chosen at random from the training dataset) are padded so that the total becomes an integer multiple of the number of GPU cards.
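Conceptually, the tool performs a global shuffle followed by an even split (with random padding) across the GPU cards. Below is a minimal sketch of that logic in plain Python, assuming the whole sample list fits in memory; the real tool operates on the files recorded in file_list.txt.
```python
import random

def shuffle_and_split(samples, nranks):
    """Globally shuffle base64 samples and split them evenly into nranks shards.

    Each sample is one line of a data file: '<base64 image>\t<label>'.
    """
    random.shuffle(samples)
    # Pad with randomly chosen samples so that len(samples) % nranks == 0.
    remainder = len(samples) % nranks
    if remainder:
        samples = samples + random.sample(samples, nranks - remainder)
    per_rank = len(samples) // nranks
    return [samples[i * per_rank:(i + 1) * per_rank] for i in range(nranks)]

# Example: 10 samples split across 4 GPU cards -> 4 shards of 3 samples each.
samples = ["aGVsbG8=\t{}".format(i) for i in range(10)]
shards = shuffle_and_split(samples, 4)
print([len(s) for s in shards])  # [3, 3, 3, 3]
```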
## How to use the tool
The tool is located in the tools directory.
You can view its help message with the following command:
```shell
python tools/process_base64_files.py --help
```
The tool supports the following command-line options:
* data_dir: the root directory of the training data
* file_list: the file listing the training data files, e.g. file_list.txt
* nranks: the number of GPU cards used for training
You can run the tool as follows:
```shell
python tools/process_base64_files.py --data_dir=./dataset --file_list=file_list.txt --nranks=8
```
This generates 8 data files, each containing the same number of training samples.
The final directory layout is:
```shell
dataset
|-- file_list.txt
|-- dataset.part1
|-- dataset.part2
...
`-- dataset.part8
```
@@ -3,7 +3,6 @@
By default, the PaddlePaddle large-scale classification library builds training models based on the ResNet50 network.
PLSC provides the base class plsc.models.base_model.BaseModel, from which users can build their own network models. A user-defined model class must inherit from this base class and implement the build_network method, which constructs the user-defined network.
When using the model, users call the class's get_output method, which automatically appends the distributed FC layer to the end of the user-defined network.
The following example shows how to define a custom network model with the BaseModel base class and how to use it.
```python
...
```
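The full example is collapsed in this diff. As an illustration only, a minimal custom model could look like the sketch below; the exact build_network signature and the chosen fluid layers are assumptions based on the description above, and the Entry usage mirrors the other examples in these docs.
```python
import paddle.fluid as fluid
from plsc import Entry
from plsc.models import base_model


class SimpleCNN(base_model.BaseModel):
    """A toy model; BaseModel only requires build_network to be implemented."""

    def build_network(self, input, label, is_train=True):
        # 'input' is the image tensor; the returned tensor is the embedding
        # that get_output() feeds into the distributed FC layer.
        conv = fluid.layers.conv2d(input, num_filters=64, filter_size=3,
                                   act='relu')
        pool = fluid.layers.pool2d(conv, pool_size=2, pool_type='max',
                                   global_pooling=True)
        emb = fluid.layers.fc(pool, size=512)
        return emb


if __name__ == "__main__":
    ins = Entry()
    ins.set_model(SimpleCNN())
    ins.train()
```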
# Distributed parameter conversion
## Introduction
The parameters of the last fully connected layer (W and b, assuming the bias b exists; otherwise only W) are usually sharded across all training GPU cards. For example, if N GPU cards are used for training, then
$$W = [W_{1}, W_{2}, ..., W_{N}]$$
$$b = [b_{1}, b_{2}, ..., b_{N}]$$
and the shards $W_{i}$ and $b_{i}$ are stored on the i-th GPU.
When the model is saved, the distributed parameters of every GPU card are saved as well.
For warm-start or fine-tuning, if the number of training GPU cards differs from the number used before the warm-start or during pre-training, the distributed parameters must be converted so that the number of parameter shards matches the number of GPU cards used for training.
By default, this conversion is performed automatically when the plsc.entry.Entry.train() method is used.
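Conceptually, the conversion concatenates the per-card shards saved during pre-training and re-splits them for the new card count. A minimal NumPy sketch of that idea is shown below; it is not the actual tool, and the shapes and even-split policy are illustrative only.
```python
import numpy as np

def reshard_fc_weight(shards, nranks_new):
    """Concatenate per-GPU shards of the last FC weight along the class
    dimension and split them again for a new number of GPUs."""
    full = np.concatenate(shards, axis=1)            # [emb_dim, num_classes]
    num_classes = full.shape[1]
    # Each card owns a contiguous block of classes; the last card may hold
    # fewer classes when num_classes is not divisible by nranks_new.
    per_card = int(np.ceil(float(num_classes) / nranks_new))
    return [full[:, i * per_card:(i + 1) * per_card] for i in range(nranks_new)]

# Example: parameters pre-trained on 4 cards, converted for 2 cards.
pretrain_shards = [np.random.rand(512, 250) for _ in range(4)]  # 1000 classes
new_shards = reshard_fc_weight(pretrain_shards, 2)
print([s.shape for s in new_shards])  # [(512, 500), (512, 500)]
```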
## How to use the tool
The distributed parameter conversion tool can also be used on its own. You can view its usage with the following command:
```shell
python -m plsc.utils.process_distfc_parameter --help
```
The tool supports the following command-line options:
| Option | Description |
| :---------------------- | :------------------- |
| name_feature | Name feature used to identify distributed parameters. By default, distributed parameters have the name prefix dist@arcface@rank@rankid or dist@softmax@rank@rankid, where rankid is the id of the GPU card. The default value of name_feature is @rank@; users normally do not need to change it. |
| pretrain_nranks | Number of GPU cards used in the pre-training stage |
| nranks | Number of GPU cards to be used for this training run |
| num_classes | Number of classification classes |
| emb_dim | Output dimension of the second-to-last fully connected layer (excluding the batch size dimension) |
| pretrained_model_dir | Directory of the pre-trained model |
| output_dir | Directory to save the converted distributed parameters |
Usually, a pre-trained model contains a meta.pickle file that records the number of GPU cards used during pre-training, the number of classes, and the output dimension of the second-to-last fully connected layer, so the pretrain_nranks, num_classes, and emb_dim options normally do not need to be given.
Distributed parameters can be converted with the following command:
```shell
python -m plsc.utils.process_distfc_parameter --nranks=4 --pretrained_model_dir=./output --output_dir=./output_post
```
Note that the output directory contains only the converted distributed parameters and no other model parameters. You therefore usually need to replace the distributed parameters of the pre-trained model with the converted ones.
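A small sketch of that replacement step is given below, assuming the converted shards can be recognized by the `@rank@` name feature described above; the directory names match the command above, everything else is illustrative.
```python
import glob
import os
import shutil

def replace_dist_params(pretrained_dir, converted_dir):
    """Replace the distributed FC parameters of a pre-trained model with the
    converted shards, leaving all other parameters untouched."""
    # Remove every old shard first: the converted model may use a different
    # number of ranks, so file names do not match one-to-one.
    for path in glob.glob(os.path.join(pretrained_dir, "*@rank@*")):
        os.remove(path)
    for path in glob.glob(os.path.join(converted_dir, "*@rank@*")):
        shutil.copy(path, pretrained_dir)

replace_dist_params("./output", "./output_post")
```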
@@ -2,6 +2,7 @@
Usually, the models saved by the PaddlePaddle large-scale classification library during training contain only the model parameters, not the structure needed for inference. To deploy with the PLSC inference library, the pre-trained model has to be exported as an inference model.
An inference model contains both the parameters and the model structure required for inference and is used for subsequent inference tasks (see [Using the C++ inference library]).
The pre-trained model can be exported to an inference model with the following code:
......
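The export example itself is collapsed here. As an illustration, exporting goes through the same Entry interface used elsewhere in these docs; the convert_for_prediction() method name and the paths below are assumptions, not confirmed by this diff.
```python
import plsc.entry as entry

def main():
    ins = entry.Entry()
    # Checkpoint saved during training (path is illustrative).
    ins.set_checkpoint_dir("./output/0")
    # Where to write the exported inference model (path is illustrative).
    ins.set_model_save_dir("./inference_model")
    ins.convert_for_prediction()  # method name assumed, see note above

if __name__ == "__main__":
    main()
```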
# Mixed precision training
PLSC supports mixed precision training, which speeds up training and reduces the memory it uses.
Mixed precision training can be enabled with the following code:
```python
from __future__ import print_function
import plsc.entry as entry

def main():
    ins = entry.Entry()
    ins.set_mixed_precision(True, 1.0)
    ins.train()

if __name__ == "__main__":
    main()
```
The `set_mixed_precision` function is described below:
| API | Description | Parameters |
| :------------------- | :--------------------| :---------------------- |
| set_mixed_precision(use_fp16, loss_scaling) | Enable mixed precision training | `use_fp16`: whether to enable mixed precision training, default False; `loss_scaling`: initial loss scaling value, default 1.0 |
- `use_fp16`: bool; set it to True to enable mixed precision training.
- `loss_scaling`: float; the initial loss scaling value. This value may affect the accuracy of mixed precision training; the default of 1.0 is recommended.
To improve the stability and accuracy of mixed precision training, dynamic loss scaling is enabled by default. For more background on mixed precision training, see [Mixed Precision Training](https://arxiv.org/abs/1710.03740).
@@ -12,3 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .entry import Entry
__all__ = ['Entry']
This diff is collapsed.
@@ -24,6 +24,7 @@ import pickle
import subprocess
import shutil
import logging
import tempfile
import paddle
import paddle.fluid as fluid
@@ -43,7 +44,8 @@ from paddle.fluid.optimizer import Optimizer
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%d %b %Y %H:%M:%S')
logger = logging.getLogger(__name__)
@@ -57,6 +59,9 @@ class Entry(object):
        """
        Check the validity of parameters.
        """
        assert os.getenv("PADDLE_TRAINERS_NUM") is not None, \
            "Please start script using paddle.distributed.launch module."
        supported_types = ["softmax", "arcface",
                           "dist_softmax", "dist_arcface"]
        assert self.loss_type in supported_types, \
@@ -70,10 +75,8 @@ class Entry(object):
    def __init__(self):
        self.config = config.config
        super(Entry, self).__init__()
        num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", 1))
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
        self.trainer_id = trainer_id
        self.num_trainers = num_trainers
@@ -114,8 +117,15 @@ class Entry(object):
        self.model_save_dir = self.config.model_save_dir
        self.warmup_epochs = self.config.warmup_epochs

        if self.checkpoint_dir:
            self.checkpoint_dir = os.path.abspath(self.checkpoint_dir)
        if self.model_save_dir:
            self.model_save_dir = os.path.abspath(self.model_save_dir)
        if self.dataset_dir:
            self.dataset_dir = os.path.abspath(self.dataset_dir)

        logger.info('=' * 30)
        logger.info("Default configuration:")
        for key in self.config:
            logger.info('\t' + str(key) + ": " + str(self.config[key]))
        logger.info('trainer_id: {}, num_trainers: {}'.format(
@@ -123,18 +133,21 @@ class Entry(object):
        logger.info('=' * 30)

    def set_val_targets(self, targets):
        """
        Set the names of validation datasets, separated by comma.
        """
        self.val_targets = targets
        logger.info("Set val_targets to {}.".format(targets))

    def set_train_batch_size(self, batch_size):
        self.train_batch_size = batch_size
        self.global_train_batch_size = batch_size * self.num_trainers
        logger.info("Set train batch size to {}.".format(batch_size))

    def set_test_batch_size(self, batch_size):
        self.test_batch_size = batch_size
        self.global_test_batch_size = batch_size * self.num_trainers
        logger.info("Set test batch size to {}.".format(batch_size))

    def set_hdfs_info(self, fs_name, fs_ugi, directory):
        """
@@ -153,38 +166,42 @@ class Entry(object):
    def set_model_save_dir(self, directory):
        """
        Set the directory to save models.
        """
        if directory:
            directory = os.path.abspath(directory)
        self.model_save_dir = directory
        logger.info("Set model_save_dir to {}.".format(directory))

    def set_dataset_dir(self, directory):
        """
        Set the root directory for datasets.
        """
        if directory:
            directory = os.path.abspath(directory)
        self.dataset_dir = directory
        logger.info("Set dataset_dir to {}.".format(directory))

    def set_train_image_num(self, num):
        """
        Set the total number of images for training.
        """
        self.train_image_num = num
        logger.info("Set train_image_num to {}.".format(num))

    def set_class_num(self, num):
        """
        Set the number of classes.
        """
        self.num_classes = num
        logger.info("Set num_classes to {}.".format(num))

    def set_emb_size(self, size):
        """
        Set the size of the last hidden layer before the distributed fc-layer.
        """
        self.emb_size = size
        logger.info("Set emb_size to {}.".format(size))

    def set_model(self, model):
        """
@@ -194,25 +211,27 @@ class Entry(object):
        if not isinstance(model, base_model.BaseModel):
            raise ValueError("The parameter for set_model must be an "
                             "instance of BaseModel.")
        logger.info("Set model to {}.".format(model))

    def set_train_epochs(self, num):
        """
        Set the number of epochs to train.
        """
        self.train_epochs = num
        logger.info("Set train_epochs to {}.".format(num))

    def set_checkpoint_dir(self, directory):
        """
        Set the directory of the checkpoint loaded before training/testing.
        """
        if directory:
            directory = os.path.abspath(directory)
        self.checkpoint_dir = directory
        logger.info("Set checkpoint_dir to {}.".format(directory))

    def set_warmup_epochs(self, num):
        self.warmup_epochs = num
        logger.info("Set warmup_epochs to {}.".format(num))

    def set_loss_type(self, type):
        supported_types = ["dist_softmax", "dist_arcface", "softmax", "arcface"]
@@ -220,54 +239,53 @@ class Entry(object):
            raise ValueError("All supported loss types: {}".format(
                supported_types))
        self.loss_type = type
        logger.info("Set loss_type to {}.".format(type))

    def set_image_shape(self, shape):
        if not isinstance(shape, (list, tuple)):
            raise ValueError("Shape must be of type list or tuple")
        self.image_shape = shape
        logger.info("Set image_shape to {}.".format(shape))

    def set_optimizer(self, optimizer):
        if not isinstance(optimizer, Optimizer):
            raise ValueError("Optimizer must be an instance of Optimizer")
        self.optimizer = optimizer
        logger.info("User manually set optimizer")

    def _get_optimizer(self):
        if not self.optimizer:
            bd = [step for step in self.lr_steps]
            start_lr = self.lr

            global_batch_size = self.global_train_batch_size
            train_image_num = self.train_image_num
            images_per_trainer = int(math.ceil(
                train_image_num * 1.0 / self.num_trainers))
            steps_per_pass = int(math.ceil(
                images_per_trainer * 1.0 / self.train_batch_size))
            logger.info("Steps per epoch: %d" % steps_per_pass)
            warmup_steps = steps_per_pass * self.warmup_epochs
            batch_denom = 1024
            base_lr = start_lr * global_batch_size / batch_denom
            lr = [base_lr * (0.1 ** i) for i in range(len(bd) + 1)]
            logger.info("LR boundaries: {}".format(bd))
            logger.info("lr_step: {}".format(lr))
            if self.warmup_epochs:
                lr_val = lr_warmup(fluid.layers.piecewise_decay(boundaries=bd,
                    values=lr), warmup_steps, start_lr, base_lr)
            else:
                lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)

            optimizer = fluid.optimizer.Momentum(
                learning_rate=lr_val, momentum=0.9,
                regularization=fluid.regularizer.L2Decay(5e-4))
            self.optimizer = optimizer

        if self.loss_type in ["dist_softmax", "dist_arcface"]:
            self.optimizer = DistributedClassificationOptimizer(
                self.optimizer, self.global_train_batch_size)

        return self.optimizer
    def build_program(self,
@@ -302,6 +320,7 @@ class Entry(object):
                loss_type=self.loss_type,
                margin=self.margin,
                scale=self.scale)

            if self.loss_type in ["dist_softmax", "dist_arcface"]:
                shard_prob = loss._get_info("shard_prob")
@@ -320,10 +339,12 @@ class Entry(object):
        optimizer = None
        if is_train:
            # initialize optimizer
            optimizer = self._get_optimizer()
            dist_optimizer = self.fleet.distributed_optimizer(
                optimizer, strategy=self.strategy)
            dist_optimizer.minimize(loss)

            if "dist" in self.loss_type:
                optimizer = optimizer._optimizer
        elif use_parallel_test:
            emb = fluid.layers.collective._c_allgather(emb,
                nranks=num_trainers, use_calc_stream=True)
@@ -361,11 +382,7 @@ class Entry(object):
    def preprocess_distributed_params(self,
                                      local_dir):
        local_dir = os.path.abspath(local_dir)
        output_dir = tempfile.mkdtemp()

        cmd = sys.executable + ' -m plsc.utils.process_distfc_parameter '
        cmd += "--nranks {} ".format(self.num_trainers)
        cmd += "--num_classes {} ".format(self.num_classes)
@@ -388,13 +405,11 @@ class Entry(object):
            file = os.path.join(output_dir, file)
            shutil.move(file, local_dir)
        shutil.rmtree(output_dir)

    def _append_broadcast_ops(self, program):
        """
        Before test, we broadcast batchnorm-related parameters to all
        other trainers from trainer-0.
        """
        bn_vars = [var for var in program.list_vars()
                   if 'batch_norm' in var.name and var.persistable]
@@ -420,40 +435,42 @@ class Entry(object):
        checkpoint_dir = self.checkpoint_dir
        if self.fs_name is not None:
            ans = 'y'
            if os.path.exists(checkpoint_dir):
                ans = input("Downloading pretrained models, but the local "
                            "checkpoint directory ({}) exists, overwrite it "
                            "or not? [Y/N]".format(checkpoint_dir))
            if ans.lower() == 'y':
                if os.path.exists(checkpoint_dir):
                    logger.info("Overwriting the local checkpoint directory.")
                    shutil.rmtree(checkpoint_dir)
                os.makedirs(checkpoint_dir)
            else:
                logger.info("Using the local checkpoint directory.")

            # sync all trainers to avoid loading checkpoints before
            # parameters are downloaded
            file_name = os.path.join(checkpoint_dir, '.lock')
            if self.trainer_id == 0:
                self.get_files_from_hdfs(checkpoint_dir)
                with open(file_name, 'w') as f:
                    pass
                time.sleep(10)
                os.remove(file_name)
            else:
                while True:
                    if not os.path.exists(file_name):
                        time.sleep(1)
                    else:
                        break

        # Preprocess distributed parameters.
        file_name = os.path.join(checkpoint_dir, '.lock')
        distributed = self.loss_type in ["dist_softmax", "dist_arcface"]
        if load_for_train and self.trainer_id == 0 and distributed:
            self.preprocess_distributed_params(checkpoint_dir)
            with open(file_name, 'w') as f:
                pass
            time.sleep(10)
            os.remove(file_name)
        elif load_for_train and distributed:
            # wait trainer_id (0) to complete
@@ -503,11 +520,11 @@ class Entry(object):
                             load_for_train=False)

        assert self.model_save_dir, \
            "Does not set model_save_dir for inference model converting."
        if os.path.exists(self.model_save_dir):
            ans = input("model_save_dir for inference model ({}) exists, "
                        "overwrite it or not? [Y/N]".format(model_save_dir))
            if ans.lower() == 'n':
                logger.error("model_save_dir for inference model exists, "
                             "and cannot overwrite it.")
                exit()
@@ -551,17 +568,17 @@ class Entry(object):
                             load_for_train=False)

        if self.train_reader is None:
            predict_reader = paddle.batch(reader.arc_train(
                self.dataset_dir, self.num_classes),
                batch_size=self.train_batch_size)
        else:
            predict_reader = self.train_reader

        feeder = fluid.DataFeeder(place=place,
            feed_list=['image', 'label'], program=main_program)
        fetch_list = [emb.name]
        for data in predict_reader():
            emb = exe.run(main_program, feed=feeder.feed(data),
                          fetch_list=fetch_list, use_program_cache=True)
            print("emb: ", emb)
@@ -684,18 +701,14 @@ class Entry(object):
            self.build_program(True, False)
        if self.with_test:
            test_emb, test_loss, test_acc1, test_acc5, _ = \
                self.build_program(False, self.num_trainers > 1)
            test_list, test_name_list = reader.test(
                self.dataset_dir, self.val_targets)
            test_program = self.test_program
            self._append_broadcast_ops(test_program)

        global_lr = optimizer._global_learning_rate(
            program=self.train_program)

        origin_prog = fleet._origin_program
        train_prog = fleet.main_program
@@ -720,10 +733,10 @@ class Entry(object):
        fetch_list_test = [test_emb.name, test_acc1.name, test_acc5.name]
        real_test_batch_size = self.global_test_batch_size

        if self.checkpoint_dir:
            load_checkpoint = True
        else:
            load_checkpoint = False

        if load_checkpoint:
            self.load_checkpoint(executor=exe, main_program=origin_prog)
@@ -839,7 +852,14 @@ class Entry(object):
            model_save_dir = os.path.join(
                self.model_save_dir, str(pass_id))
            if not os.path.exists(model_save_dir):
                # more than one process may be trying to
                # create the directory at the same time
                try:
                    os.makedirs(model_save_dir)
                except OSError as exc:
                    if exc.errno != errno.EEXIST:
                        raise
            if trainer_id == 0:
                fluid.io.save_persistables(exe,
                                           model_save_dir,
......
#!/usr/bin/env bash
export FLAGS_cudnn_exhaustive_search=true
export FLAGS_fraction_of_gpu_memory_to_use=0.96
export FLAGS_eager_delete_tensor_gb=0.0
selected_gpus="0,1,2,3,4,5,6,7"
#selected_gpus="4,5,6"
python -m paddle.distributed.launch \
--selected_gpus $selected_gpus \
--log_dir mylog \
do_train.py \
--model=ResNet_ARCFACE50 \
--loss_type=dist_softmax \
--model_save_dir=output \
--margin=0.5 \
--train_batch_size 32 \
--class_dim 85742 \
--with_test=True
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PLSC version string """
plsc_version = "0.1.0"
numpy>=1.12, <=1.16.4 ; python_version<"3.5"
numpy>=1.12 ; python_version>="3.5"
scipy>=0.19.0, <=1.2.1 ; python_version<"3.5"
paddlepaddle>=1.6.2
scipy ; python_version>="3.5"
Pillow
sklearn
easydict
@@ -11,17 +11,53 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setup for pip package."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from setuptools import find_packages
from setuptools import setup
from plsc.version import plsc_version

REQUIRED_PACKAGES = [
    'sklearn', 'easydict', 'paddlepaddle>=1.6.2', 'Pillow',
    'numpy', 'scipy'
]

setup(
    name="plsc",
    version=plsc_version,
    description=("PaddlePaddle Large Scale Classification Package."),
    long_description='',
    url='https://github.com/PaddlePaddle/PLSC',
    author='PaddlePaddle Authors',
    author_email='paddle-dev@baidu.com',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    # PyPI package information.
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'Intended Audience :: Education',
        'Intended Audience :: Science/Research',
        'License :: OSI Approved :: Apache Software License',
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.4',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Topic :: Scientific/Engineering',
        'Topic :: Scientific/Engineering :: Mathematics',
        'Topic :: Scientific/Engineering :: Artificial Intelligence',
        'Topic :: Software Development',
        'Topic :: Software Development :: Libraries',
        'Topic :: Software Development :: Libraries :: Python Modules',
    ],
    license="Apache 2.0",
    keywords=('plsc paddlepaddle large-scale classification '
              'model-parallelism distributed-training'))