Unverified commit 20295ab5, authored by Wu Yi, committed by GitHub

Merge pull request #1387 from typhoonzero/add_resnet_50_distributed_fine_tune

update dist resnet model config
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
__all__ = ['parse_args', ]
BENCHMARK_MODELS = [
"ResNet50", "ResNet101", "ResNet152"
]
def parse_args():
    parser = argparse.ArgumentParser('Distributed Image Classification Training.')
    parser.add_argument(
        '--model',
        type=str,
        choices=BENCHMARK_MODELS,
        default='ResNet50',
        help='The model to run the benchmark with.')
    parser.add_argument(
        '--batch_size', type=int, default=32, help='The minibatch size.')
    # args related to learning rate
    parser.add_argument(
        '--learning_rate', type=float, default=0.001, help='The learning rate.')
    # TODO(wuyi): add "--use_fake_data" option back.
    parser.add_argument(
        '--skip_batch_num',
        type=int,
        default=5,
        help='The number of initial minibatches to skip, for a more accurate '
        'performance test.')
    parser.add_argument(
        '--iterations', type=int, default=80, help='The number of minibatches.')
    parser.add_argument(
        '--pass_num', type=int, default=100, help='The number of passes.')
    parser.add_argument(
        '--data_format',
        type=str,
        default='NCHW',
        choices=['NCHW', 'NHWC'],
        help='The data format; currently only NCHW is supported.')
    parser.add_argument(
        '--device',
        type=str,
        default='GPU',
        choices=['CPU', 'GPU'],
        help='The device type.')
    parser.add_argument(
        '--gpus',
        type=int,
        default=1,
        help='If gpus > 1, use ParallelExecutor to run, otherwise use Executor.')
    # this option is available only for vgg and resnet.
    parser.add_argument(
        '--cpus',
        type=int,
        default=1,
        help='If cpus > 1, set ParallelExecutor to use multiple threads.')
    parser.add_argument(
        '--data_set',
        type=str,
        default='flowers',
        choices=['cifar10', 'flowers', 'imagenet'],
        help='Optional dataset for the benchmark.')
    parser.add_argument(
        '--no_test',
        action='store_true',
        help='If set, do not test on the test set during training.')
    parser.add_argument(
        '--memory_optimize',
        action='store_true',
        help='If set, optimize runtime memory before starting.')
    parser.add_argument(
        '--update_method',
        type=str,
        default='local',
        choices=['local', 'pserver', 'nccl2'],
        help='Parameter update method: local, pserver, or nccl2.')
    parser.add_argument(
        '--no_split_var',
        action='store_true',
        default=False,
        help='Whether to split variables into blocks when update_method is pserver.')
    parser.add_argument(
        '--async_mode',
        action='store_true',
        default=False,
        help='Whether to start the pserver in async mode to support ASGD.')
    parser.add_argument(
        '--no_random',
        action='store_true',
        help='If set, keep the random seed fixed and do not shuffle the data.')
    parser.add_argument(
        '--reduce_strategy',
        type=str,
        choices=['reduce', 'all_reduce'],
        default='all_reduce',
        help='The reduce strategy: reduce or all_reduce.')
    parser.add_argument(
        '--data_dir',
        type=str,
        default="../data/ILSVRC2012",
        help="The ImageNet dataset root dir.")
    args = parser.parse_args()
    return args
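For reference, a minimal sketch (not part of this commit) of how the parser above might be exercised; the script name and flag values are illustrative assumptions. The unified-diff hunks that follow update the distributed training script itself.

import sys

# Simulate a command line and inspect the parsed namespace.
sys.argv = ['train.py', '--model', 'ResNet50', '--batch_size', '64',
            '--update_method', 'pserver', '--data_dir', '../data/ILSVRC2012']
args = parse_args()
print(args.model, args.batch_size, args.update_method)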
@@ -26,9 +26,84 @@ import six
 import sys
 sys.path.append("..")
 import models
-from args import *
 from reader import train, val
+
+
+def parse_args():
+    parser = argparse.ArgumentParser('Distributed Image Classification Training.')
+    parser.add_argument(
+        '--model',
+        type=str,
+        default='DistResNet',
+        help='The model to run.')
+    parser.add_argument(
+        '--batch_size', type=int, default=32, help='The minibatch size per device.')
+    parser.add_argument(
+        '--multi_batch_repeat', type=int, default=1, help='Batch merge repeats.')
+    parser.add_argument(
+        '--learning_rate', type=float, default=0.1, help='The learning rate.')
+    parser.add_argument(
+        '--pass_num', type=int, default=90, help='The number of passes.')
+    parser.add_argument(
+        '--data_format',
+        type=str,
+        default='NCHW',
+        choices=['NCHW', 'NHWC'],
+        help='The data format; currently only NCHW is supported.')
+    parser.add_argument(
+        '--device',
+        type=str,
+        default='GPU',
+        choices=['CPU', 'GPU'],
+        help='The device type.')
+    parser.add_argument(
+        '--gpus',
+        type=int,
+        default=1,
+        help='If gpus > 1, use ParallelExecutor to run, otherwise use Executor.')
+    parser.add_argument(
+        '--cpus',
+        type=int,
+        default=1,
+        help='If cpus > 1, set ParallelExecutor to use multiple threads.')
+    parser.add_argument(
+        '--no_test',
+        action='store_true',
+        help='If set, do not test on the test set during training.')
+    parser.add_argument(
+        '--memory_optimize',
+        action='store_true',
+        help='If set, optimize runtime memory before starting.')
+    parser.add_argument(
+        '--update_method',
+        type=str,
+        default='local',
+        choices=['local', 'pserver', 'nccl2'],
+        help='Parameter update method: local, pserver, or nccl2.')
+    parser.add_argument(
+        '--no_split_var',
+        action='store_true',
+        default=False,
+        help='Whether to split variables into blocks when update_method is pserver.')
+    parser.add_argument(
+        '--async_mode',
+        action='store_true',
+        default=False,
+        help='Whether to start the pserver in async mode to support ASGD.')
+    parser.add_argument(
+        '--reduce_strategy',
+        type=str,
+        choices=['reduce', 'all_reduce'],
+        default='all_reduce',
+        help='The reduce strategy: reduce or all_reduce.')
+    parser.add_argument(
+        '--data_dir',
+        type=str,
+        default="../data/ILSVRC2012",
+        help="The ImageNet dataset root dir.")
+    args = parser.parse_args()
+    return args
+
+
 def get_model(args, is_train, main_prog, startup_prog):
     pyreader = None
     class_dim = 1000
@@ -51,7 +126,7 @@ def get_model(args, is_train, main_prog, startup_prog):
                 name="train_reader" if is_train else "test_reader",
                 use_double_buffer=True)
             input, label = fluid.layers.read_file(pyreader)
-            model_def = models.__dict__[args.model]()
+            model_def = models.__dict__[args.model](layers=50, is_train=is_train)
             predict = model_def.net(input, class_dim=class_dim)
             cost = fluid.layers.cross_entropy(input=predict, label=label)
@@ -60,89 +135,64 @@ def get_model(args, is_train, main_prog, startup_prog):
             batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
             batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
-            # configure optimize
             optimizer = None
             if is_train:
+                start_lr = args.learning_rate
+                # n * worker * repeat
+                end_lr = args.learning_rate * trainer_count * args.multi_batch_repeat
                 total_images = 1281167 / trainer_count
-                step = int(total_images / (args.batch_size * args.gpus) + 1)
-                epochs = [30, 60, 90]
+                step = int(total_images / (args.batch_size * args.gpus * args.multi_batch_repeat) + 1)
+                warmup_steps = step * 5  # warmup 5 passes
+                epochs = [30, 60, 80]
                 bd = [step * e for e in epochs]
-                base_lr = args.learning_rate
+                base_lr = end_lr
                 lr = []
                 lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
                 optimizer = fluid.optimizer.Momentum(
-                    learning_rate=fluid.layers.piecewise_decay(
-                        boundaries=bd, values=lr),
+                    learning_rate=models.learning_rate.lr_warmup(
+                        fluid.layers.piecewise_decay(
+                            boundaries=bd, values=lr),
+                        warmup_steps, start_lr, end_lr),
                     momentum=0.9,
                     regularization=fluid.regularizer.L2Decay(1e-4))
                 optimizer.minimize(avg_cost)
-
-                if args.memory_optimize:
-                    fluid.memory_optimize(main_prog)

     batched_reader = None
     pyreader.decorate_paddle_reader(
         paddle.batch(
-            reader if args.no_random else paddle.reader.shuffle(
-                reader, buf_size=5120),
+            reader,
             batch_size=args.batch_size))

     return avg_cost, optimizer, [batch_acc1,
                                  batch_acc5], batched_reader, pyreader

 def append_nccl2_prepare(trainer_id, startup_prog):
-    if trainer_id >= 0:
-        # append gen_nccl_id at the end of startup program
-        trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
-        port = os.getenv("PADDLE_PSERVER_PORT")
-        worker_ips = os.getenv("PADDLE_TRAINER_IPS")
-        worker_endpoints = []
-        for ip in worker_ips.split(","):
-            worker_endpoints.append(':'.join([ip, port]))
-        num_trainers = len(worker_endpoints)
-        current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
-        worker_endpoints.remove(current_endpoint)
-
-        nccl_id_var = startup_prog.global_block().create_var(
-            name="NCCLID",
-            persistable=True,
-            type=fluid.core.VarDesc.VarType.RAW)
-        startup_prog.global_block().append_op(
-            type="gen_nccl_id",
-            inputs={},
-            outputs={"NCCLID": nccl_id_var},
-            attrs={
-                "endpoint": current_endpoint,
-                "endpoint_list": worker_endpoints,
-                "trainer_id": trainer_id
-            })
-        return nccl_id_var, num_trainers, trainer_id
-    else:
-        raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
-                        "nccl-based dist train.")
+    trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
+    port = os.getenv("PADDLE_PSERVER_PORT")
+    worker_ips = os.getenv("PADDLE_TRAINER_IPS")
+    worker_endpoints = []
+    for ip in worker_ips.split(","):
+        worker_endpoints.append(':'.join([ip, port]))
+    current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
+
+    config = fluid.DistributeTranspilerConfig()
+    config.mode = "nccl2"
+    t = fluid.DistributeTranspiler(config=config)
+    t.transpile(trainer_id, trainers=','.join(worker_endpoints),
+                current_endpoint=current_endpoint,
+                startup_program=startup_prog)

 def dist_transpile(trainer_id, args, train_prog, startup_prog):
-    if trainer_id < 0:
-        return None, None
-
-    # the port of all pservers, needed by both trainer and pserver
     port = os.getenv("PADDLE_PSERVER_PORT", "6174")
-    # comma separated ips of all pservers, needed by trainer and
-    # pserver
     pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
     eplist = []
     for ip in pserver_ips.split(","):
         eplist.append(':'.join([ip, port]))
     pserver_endpoints = ",".join(eplist)
-    # total number of workers/trainers in the job, needed by
-    # trainer and pserver
     trainers = int(os.getenv("PADDLE_TRAINERS"))
-    # the IP of the local machine, needed by pserver only
     current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
-    # the role, should be either PSERVER or TRAINER
     training_role = os.getenv("PADDLE_TRAINING_ROLE")

     config = fluid.DistributeTranspilerConfig()
@@ -150,8 +200,6 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog):
     t = fluid.DistributeTranspiler(config=config)
     t.transpile(
         trainer_id,
-        # NOTE: *MUST* use train_prog, for we are using with guard to
-        # generate different program for train and test.
         program=train_prog,
         pservers=pserver_endpoints,
         trainers=trainers,
@@ -171,7 +219,7 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog):
         )


-def test_parallel(exe, test_args, args, test_prog, feeder):
+def test_parallel(exe, test_args, args, test_prog):
     acc_evaluators = []
     for i in six.moves.xrange(len(test_args[2])):
         acc_evaluators.append(fluid.metrics.Accuracy())
@@ -190,13 +238,10 @@ def test_parallel(exe, test_args, args, test_prog):
     return [e.eval() for e in acc_evaluators]


-# NOTE: only need to benchmark using parallelexe
 def train_parallel(train_args, test_args, args, train_prog, test_prog,
                    startup_prog, nccl_id_var, num_trainers, trainer_id):
     over_all_start = time.time()
     place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
-    feeder = None
     if nccl_id_var and trainer_id == 0:
         #FIXME(wuyi): wait other trainer to start listening
@@ -237,31 +282,27 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog,
     if args.update_method == "pserver":
         test_scope = None
     else:
-        # NOTE: use an empty scope to avoid test exe using NCCLID
         test_scope = fluid.Scope()
     test_exe = fluid.ParallelExecutor(
-        True, main_program=test_prog, share_vars_from=exe)
+        True, main_program=test_prog, share_vars_from=exe,
+        scope=test_scope)

     pyreader = train_args[4]
     for pass_id in range(args.pass_num):
         num_samples = 0
-        iters = 0
         start_time = time.time()
         batch_id = 0
         pyreader.start()
         while True:
-            if iters == args.iterations:
-                break
-            if iters == args.skip_batch_num:
-                start_time = time.time()
-                num_samples = 0
             fetch_list = [avg_loss.name]
             acc_name_list = [v.name for v in train_args[2]]
             fetch_list.extend(acc_name_list)
             try:
-                fetch_ret = exe.run(fetch_list)
+                if batch_id % 30 == 0:
+                    fetch_ret = exe.run(fetch_list)
+                else:
+                    fetch_ret = exe.run([])
             except fluid.core.EOFException as eof:
                 break
             except fluid.core.EnforceNotMet as ex:
@@ -269,20 +310,17 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog,
                 break
             num_samples += args.batch_size * args.gpus
-            iters += 1
-            if batch_id % 1 == 0:
+            if batch_id % 30 == 0:
                 fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                 print("Pass %d, batch %d, loss %s, accuracies: %s" %
                       (pass_id, batch_id, fetched_data[0], fetched_data[1:]))
             batch_id += 1

         print_train_time(start_time, time.time(), num_samples)
-        pyreader.reset()  # reset reader handle
+        pyreader.reset()

         if not args.no_test and test_args[2]:
-            test_feeder = None
-            test_ret = test_parallel(test_exe, test_args, args, test_prog,
-                                     test_feeder)
+            test_ret = test_parallel(test_exe, test_args, args, test_prog)
             print("Pass: %d, Test Accuracy: %s\n" %
                   (pass_id, [np.mean(np.array(v)) for v in test_ret]))
@@ -316,8 +354,6 @@ def main():
     args = parse_args()
     print_arguments(args)
     print_paddle_envs()
-    if args.no_random:
-        fluid.default_startup_program().random_seed = 1

     # the unique trainer id, starting from 0, needed by trainer
     # only
...
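To make the schedule introduced in the diff above easier to follow: the base learning rate is linearly scaled by the number of trainers and the batch-merge repeat count, warmed up linearly over the first 5 passes, and then stepped down by 10x at epochs 30, 60 and 80. Below is a plain-Python sketch of that schedule, not part of the commit; the trainer count and flag values are illustrative assumptions.

def sketch_lr(global_step, base_lr=0.1, trainers=4, repeat=1,
              batch_size=32, gpus=8, total_images=1281167):
    # effective peak LR after linear scaling (end_lr in the diff above)
    end_lr = base_lr * trainers * repeat
    images_per_trainer = total_images / trainers
    step = int(images_per_trainer / (batch_size * gpus * repeat) + 1)
    warmup_steps = step * 5                      # warm up for 5 passes
    boundaries = [step * e for e in (30, 60, 80)]
    values = [end_lr * (0.1 ** i) for i in range(len(boundaries) + 1)]
    if global_step < warmup_steps:
        # linear warmup from base_lr up to the scaled end_lr
        return base_lr + (end_lr - base_lr) * global_step / warmup_steps
    for boundary, value in zip(boundaries, values):
        if global_step < boundary:
            return value
    return values[-1]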
@@ -3,6 +3,8 @@ from .mobilenet import MobileNet
 from .googlenet import GoogleNet
 from .vgg import VGG11, VGG13, VGG16, VGG19
 from .resnet import ResNet50, ResNet101, ResNet152
+from .resnet_dist import DistResNet
 from .inception_v4 import InceptionV4
 from .se_resnext import SE_ResNeXt50_32x4d, SE_ResNeXt101_32x4d, SE_ResNeXt152_32x4d
 from .dpn import DPN68, DPN92, DPN98, DPN107, DPN131
+import learning_rate
@@ -20,3 +20,31 @@ def cosine_decay(learning_rate, step_each_epoch, epochs=120):
         decayed_lr = learning_rate * \
                 (ops.cos(epoch * (math.pi / epochs)) + 1)/2
     return decayed_lr
+
+
+def lr_warmup(learning_rate, warmup_steps, start_lr, end_lr):
+    """ Applies linear learning rate warmup for distributed training.
+        Argument learning_rate can be a float or a Variable.
+        lr = lr + (warmup_rate * step / warmup_steps)
+    """
+    assert isinstance(end_lr, float)
+    assert isinstance(start_lr, float)
+    linear_step = end_lr - start_lr
+    with fluid.default_main_program()._lr_schedule_guard():
+        lr = fluid.layers.tensor.create_global_var(
+            shape=[1],
+            value=0.0,
+            dtype='float32',
+            persistable=True,
+            name="learning_rate_warmup")
+
+        global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()
+
+        with fluid.layers.control_flow.Switch() as switch:
+            with switch.case(global_step < warmup_steps):
+                decayed_lr = start_lr + linear_step * (global_step / warmup_steps)
+                fluid.layers.tensor.assign(decayed_lr, lr)
+            with switch.default():
+                fluid.layers.tensor.assign(learning_rate, lr)
+
+        return lr
\ No newline at end of file
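A hedged usage sketch for the lr_warmup helper added above: the warmup wraps any decayed learning-rate Variable and the result is passed straight to an optimizer, as the training script in this commit does. The boundary and value numbers below are made up for illustration, and the models package is assumed to be importable as it is elsewhere in this commit (Fluid 1.x API).

import paddle.fluid as fluid
import models

# piecewise decay from the scaled peak LR, then linear warmup in front of it
boundaries = [1000, 2000, 3000]
values = [0.8, 0.08, 0.008, 0.0008]
warmed_lr = models.learning_rate.lr_warmup(
    fluid.layers.piecewise_decay(boundaries=boundaries, values=values),
    warmup_steps=500, start_lr=0.1, end_lr=0.8)
optimizer = fluid.optimizer.Momentum(learning_rate=warmed_lr, momentum=0.9)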
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import paddle
import paddle.fluid as fluid
import math

__all__ = ["DistResNet"]

train_parameters = {
    "input_size": [3, 224, 224],
    "input_mean": [0.485, 0.456, 0.406],
    "input_std": [0.229, 0.224, 0.225],
    "learning_strategy": {
        "name": "piecewise_decay",
        "batch_size": 256,
        "epochs": [30, 60, 90],
        "steps": [0.1, 0.01, 0.001, 0.0001]
    }
}

class DistResNet():
    def __init__(self, layers=50, is_train=True):
        self.params = train_parameters
        self.layers = layers
        self.is_train = is_train
        self.weight_decay = 1e-4

    def net(self, input, class_dim=1000):
        layers = self.layers
        supported_layers = [50, 101, 152]
        assert layers in supported_layers, \
            "supported layers are {} but input layer is {}".format(supported_layers, layers)

        if layers == 50:
            depth = [3, 4, 6, 3]
        elif layers == 101:
            depth = [3, 4, 23, 3]
        elif layers == 152:
            depth = [3, 8, 36, 3]
        num_filters = [64, 128, 256, 512]

        conv = self.conv_bn_layer(
            input=input, num_filters=64, filter_size=7, stride=2, act='relu')
        conv = fluid.layers.pool2d(
            input=conv,
            pool_size=3,
            pool_stride=2,
            pool_padding=1,
            pool_type='max')

        for block in range(len(depth)):
            for i in range(depth[block]):
                conv = self.bottleneck_block(
                    input=conv,
                    num_filters=num_filters[block],
                    stride=2 if i == 0 and block != 0 else 1)

        pool = fluid.layers.pool2d(
            input=conv, pool_size=7, pool_type='avg', global_pooling=True)
        stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
        out = fluid.layers.fc(
            input=pool,
            size=class_dim,
            act='softmax',
            param_attr=fluid.param_attr.ParamAttr(
                initializer=fluid.initializer.Uniform(-stdv, stdv),
                regularizer=fluid.regularizer.L2Decay(self.weight_decay)),
            bias_attr=fluid.ParamAttr(
                regularizer=fluid.regularizer.L2Decay(self.weight_decay)))
        return out
    def conv_bn_layer(self,
                      input,
                      num_filters,
                      filter_size,
                      stride=1,
                      groups=1,
                      act=None,
                      bn_init_value=1.0):
        conv = fluid.layers.conv2d(
            input=input,
            num_filters=num_filters,
            filter_size=filter_size,
            stride=stride,
            padding=(filter_size - 1) // 2,
            groups=groups,
            act=None,
            bias_attr=False,
            param_attr=fluid.ParamAttr(
                regularizer=fluid.regularizer.L2Decay(self.weight_decay)))
        return fluid.layers.batch_norm(
            input=conv, act=act, is_test=not self.is_train,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Constant(bn_init_value),
                regularizer=None))

    def shortcut(self, input, ch_out, stride):
        ch_in = input.shape[1]
        if ch_in != ch_out or stride != 1:
            return self.conv_bn_layer(input, ch_out, 1, stride)
        else:
            return input

    def bottleneck_block(self, input, num_filters, stride):
        conv0 = self.conv_bn_layer(
            input=input, num_filters=num_filters, filter_size=1, act='relu')
        conv1 = self.conv_bn_layer(
            input=conv0,
            num_filters=num_filters,
            filter_size=3,
            stride=stride,
            act='relu')
        # NOTE: default bias is 0.0 already
        conv2 = self.conv_bn_layer(
            input=conv1, num_filters=num_filters * 4, filter_size=1, act=None,
            bn_init_value=0.0)

        short = self.shortcut(input, num_filters * 4, stride)

        return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
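A minimal sketch of building the network defined above, under the Fluid 1.x API used elsewhere in this commit; the input names and dimensions are the conventional ImageNet settings, assumed here for illustration, and the import path matches the models/__init__.py change above.

import paddle.fluid as fluid
from models.resnet_dist import DistResNet  # assumed import path

# Declare inputs, build the 50-layer variant, and attach a classification loss.
image = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
model = DistResNet(layers=50, is_train=True)
predict = model.net(image, class_dim=1000)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)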