Unverified commit 6bd90a39, authored by chengduo, committed by GitHub

Support multi-process for resnet and transformer. (#2360)

* support multi-process for resnet and transformer
Parent 66e135cc
@@ -17,6 +17,21 @@
 ```
 env CUDA_VISIBLE_DEVICES=0 python mnist_dygraph.py
 ```
+Paddle dygraph supports multi-process, multi-GPU model training. Launch it with:
+```
+python -m paddle.distributed.launch --selected_gpus=0,1,2,3 --log_dir ./mylog mnist_dygraph.py --use_data_parallel 1
+```
+The program then writes each process's log to the `./mylog` directory:
+```
+.
+├── mylog
+│   ├── workerlog.0
+│   ├── workerlog.1
+│   ├── workerlog.2
+│   └── workerlog.3
+├── README.md
+└── train.py
+```
 ## Output
 After training starts, you will see output similar to the following.
@@ -58,5 +73,3 @@ with fluid.dygraph.guard():
 ```text
 Inference result of image/infer_3.png is: 3
 ```
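A note on how the launcher and the training code communicate: `paddle.distributed.launch` starts one process per GPU listed in `--selected_gpus`, and each process learns its rank and device through environment variables that `fluid.dygraph.parallel.Env()` reads, which is what the code changes below rely on. A minimal sketch, assuming the script was started by the launcher:

```python
import paddle.fluid as fluid

# Env() is populated by paddle.distributed.launch; in a 4-GPU run,
# nranks is 4 and dev_id is the GPU assigned to this process (0..3).
env = fluid.dygraph.parallel.Env()
print(env.nranks, env.dev_id)

# Each process binds to its own device, as train_mnist does below.
place = fluid.CUDAPlace(env.dev_id)
```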
@@ -174,6 +174,7 @@ def train_mnist(args):
     epoch_num = 5
     BATCH_SIZE = 64
+    trainer_count = fluid.dygraph.parallel.Env().nranks
     place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
         if args.use_data_parallel else fluid.CUDAPlace(0)
     with fluid.dygraph.guard(place):
......@@ -186,7 +187,8 @@ def train_mnist(args):
if args.use_data_parallel:
train_reader = fluid.contrib.reader.distributed_sampler(
paddle.dataset.mnist.train(), batch_size=BATCH_SIZE)
paddle.dataset.mnist.train(),
batch_size=BATCH_SIZE * trainer_count)
else:
train_reader = paddle.batch(
paddle.dataset.mnist.train(),
......
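Why scale the requested batch size by `trainer_count`? Presumably `distributed_sampler` draws one global batch and splits it across the trainer processes, so multiplying by the number of trainers keeps each GPU's effective batch size at `BATCH_SIZE`. Illustrative arithmetic with hypothetical numbers:

```python
BATCH_SIZE = 64
trainer_count = 4                              # e.g. --selected_gpus=0,1,2,3
global_batch = BATCH_SIZE * trainer_count      # 256 samples drawn per step
per_gpu_batch = global_batch // trainer_count  # 64 samples per process
assert per_gpu_batch == BATCH_SIZE
```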
@@ -26,6 +26,22 @@ env CUDA_VISIBLE_DEVICES=0 python train.py
 Here `CUDA_VISIBLE_DEVICES=0` means the job runs on GPU 0; change it to match your setup.
+Paddle dygraph supports multi-process, multi-GPU model training. Launch it with:
+```
+python -m paddle.distributed.launch --selected_gpus=0,1,2,3 --log_dir ./mylog train.py --use_data_parallel 1
+```
+The program then writes each process's log to the `./mylog` directory:
+```
+.
+├── mylog
+│   ├── workerlog.0
+│   ├── workerlog.1
+│   ├── workerlog.2
+│   └── workerlog.3
+├── README.md
+└── train.py
+```
 ## Output
 After training starts, you will see output similar to the following. Each `batch` iteration prints the current epoch, step, and loss value. The defaults are `epoch=10` and `batch_size=8`; you can tune these for better results, at the cost of more (GPU) memory and a longer run.
 ```text
...
@@ -13,7 +13,8 @@
 # limitations under the License.
 import numpy as np
 import argparse
+import ast
 import paddle
 import paddle.fluid as fluid
 from paddle.fluid.layer_helper import LayerHelper
@@ -33,6 +34,20 @@ momentum_rate = 0.9
 l2_decay = 1e-4

+def parse_args():
+    parser = argparse.ArgumentParser("Training for ResNet.")
+    parser.add_argument(
+        "--use_data_parallel",
+        type=ast.literal_eval,
+        default=False,
+        help="Whether to train the model in data-parallel mode.")
+    args = parser.parse_args()
+    return args
+
+args = parse_args()

 def optimizer_setting():
     total_images = IMAGENET1000
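A side note on `type=ast.literal_eval`: argparse's plain `type=bool` would treat any non-empty string, including "0" and "False", as true, whereas `ast.literal_eval` parses the argument as a Python literal:

```python
import ast

ast.literal_eval("1")     # -> 1 (truthy)
ast.literal_eval("True")  # -> True
ast.literal_eval("0")     # -> 0 (falsy)
bool("0")                 # -> True, which is why type=bool would be wrong here
```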
@@ -255,11 +270,27 @@ def eval(model, data):

 def train_resnet():
-    with fluid.dygraph.guard():
+    trainer_count = fluid.dygraph.parallel.Env().nranks
+    place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
+        if args.use_data_parallel else fluid.CUDAPlace(0)
+    with fluid.dygraph.guard(place):
+        if args.use_data_parallel:
+            strategy = fluid.dygraph.parallel.prepare_context()
+
         resnet = ResNet("resnet")
         optimizer = optimizer_setting()

-        train_reader = paddle.batch(
-            paddle.dataset.flowers.train(use_xmap=False), batch_size=batch_size)
+        if args.use_data_parallel:
+            resnet = fluid.dygraph.parallel.DataParallel(resnet, strategy)
+
+        if args.use_data_parallel:
+            train_reader = fluid.contrib.reader.distributed_sampler(
+                paddle.dataset.flowers.train(use_xmap=False),
+                batch_size=batch_size * trainer_count)
+        else:
+            train_reader = paddle.batch(
+                paddle.dataset.flowers.train(use_xmap=False),
+                batch_size=batch_size)

         test_reader = paddle.batch(
             paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
@@ -288,7 +319,7 @@ def train_resnet():
                                  for x in data]).astype('int64')) != batch_size:
                     continue
                 y_data = np.array([x[1] for x in data]).astype('int64').reshape(
-                    batch_size, 1)
+                    -1, 1)

                 img = to_variable(dy_x_data)
                 label = to_variable(y_data)
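Replacing `reshape(batch_size, 1)` with `reshape(-1, 1)` lets numpy infer the leading dimension from the data itself, so the label array follows the actual size of the local batch rather than a hard-coded constant:

```python
import numpy as np

y = np.arange(6).astype('int64')
print(y.reshape(-1, 1).shape)  # (6, 1): the leading dimension is inferred
```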
@@ -302,7 +333,13 @@ def train_resnet():
                 acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5)

                 dy_out = avg_loss.numpy()
-                avg_loss.backward()
+
+                if args.use_data_parallel:
+                    avg_loss = resnet.scale_loss(avg_loss)
+                    avg_loss.backward()
+                    resnet.apply_collective_grads()
+                else:
+                    avg_loss.backward()
+
                 optimizer.minimize(avg_loss)
                 resnet.clear_gradients()
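This is the core data-parallel pattern in Paddle's dygraph mode: `scale_loss` divides the loss by the number of trainers so that the allreduce performed by `apply_collective_grads` averages, rather than sums, gradients across processes. A minimal sketch of one step, assuming `model` has been wrapped in `fluid.dygraph.parallel.DataParallel` when `use_data_parallel` is set:

```python
def train_step(model, avg_loss, optimizer, use_data_parallel):
    """One optimizer step, mirroring the ResNet loop above."""
    if use_data_parallel:
        avg_loss = model.scale_loss(avg_loss)  # divide loss by nranks
        avg_loss.backward()                    # local backprop
        model.apply_collective_grads()         # allreduce gradients
    else:
        avg_loss.backward()
    optimizer.minimize(avg_loss)
    model.clear_gradients()
```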
@@ -328,4 +365,5 @@ def train_resnet():

 if __name__ == '__main__':
     train_resnet()
@@ -21,7 +21,28 @@
 3. Environment dependencies

 ### Run training:
 Run train.py with the Python interpreter.
+For single-GPU training, launch with:
+```
+env CUDA_VISIBLE_DEVICES=0 python train.py
+```
+Here `CUDA_VISIBLE_DEVICES=0` means the job runs on GPU 0; change it to match your setup.
+Paddle dygraph supports multi-process, multi-GPU model training. Launch it with:
+```
+python -m paddle.distributed.launch --selected_gpus=0,1,2,3 --log_dir ./mylog train.py --use_data_parallel 1
+```
+The program then writes each process's log to the `./mylog` directory:
+```
+.
+├── mylog
+│   ├── workerlog.0
+│   ├── workerlog.1
+│   ├── workerlog.2
+│   └── workerlog.3
+├── README.md
+└── train.py
+```

 ### Results
...
 from __future__ import print_function

 import argparse
+import ast
 import paddle.fluid as fluid
 from paddle.fluid.dygraph import Embedding, LayerNorm, FC, to_variable, Layer, guard
 import numpy as np
@@ -7,6 +8,20 @@ import paddle
 import paddle.dataset.wmt16 as wmt16

+def parse_args():
+    parser = argparse.ArgumentParser("Training for Transformer.")
+    parser.add_argument(
+        "--use_data_parallel",
+        type=ast.literal_eval,
+        default=False,
+        help="Whether to train the model in data-parallel mode.")
+    args = parser.parse_args()
+    return args
+
+args = parse_args()

 # Copy from models
 class TrainTaskConfig(object):
     """
@@ -1080,7 +1095,13 @@ def train():
     :return:
     """
-    with guard():
+    trainer_count = fluid.dygraph.parallel.Env().nranks
+    place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
+        if args.use_data_parallel else fluid.CUDAPlace(0)
+    with fluid.dygraph.guard(place):
+        if args.use_data_parallel:
+            strategy = fluid.dygraph.parallel.prepare_context()
+
         transformer = TransFormer(
             'transformer', ModelHyperParams.src_vocab_size,
             ModelHyperParams.trg_vocab_size, ModelHyperParams.max_length + 1,
@@ -1094,10 +1115,21 @@ def train():
         optimizer = fluid.optimizer.SGD(learning_rate=0.003)
-        reader = paddle.batch(
-            wmt16.train(ModelHyperParams.src_vocab_size,
-                        ModelHyperParams.trg_vocab_size),
-            batch_size=TrainTaskConfig.batch_size)
+
+        if args.use_data_parallel:
+            transformer = fluid.dygraph.parallel.DataParallel(transformer,
+                                                              strategy)
+
+        if args.use_data_parallel:
+            reader = fluid.contrib.reader.distributed_sampler(
+                wmt16.train(ModelHyperParams.src_vocab_size,
+                            ModelHyperParams.trg_vocab_size),
+                batch_size=TrainTaskConfig.batch_size * trainer_count)
+        else:
+            reader = paddle.batch(
+                wmt16.train(ModelHyperParams.src_vocab_size,
+                            ModelHyperParams.trg_vocab_size),
+                batch_size=TrainTaskConfig.batch_size)

         for i in range(200):
             dy_step = 0
             for batch in reader():
@@ -1108,7 +1140,14 @@ def train():
                 enc_inputs, dec_inputs, label, weights = create_data(np_values)
                 dy_sum_cost, dy_avg_cost, dy_predict, dy_token_num = transformer(
                     enc_inputs, dec_inputs, label, weights)
-                dy_avg_cost.backward()
+
+                if args.use_data_parallel:
+                    dy_avg_cost = transformer.scale_loss(dy_avg_cost)
+                    dy_avg_cost.backward()
+                    transformer.apply_collective_grads()
+                else:
+                    dy_avg_cost.backward()
+
                 optimizer.minimize(dy_avg_cost)
                 transformer.clear_gradients()
                 dy_step = dy_step + 1
...