未验证 提交 f0e04e1f 编写于 作者: L lilong12 提交者: GitHub

fix the bug in pipeline data parallelism (#29731) (#29918)

* update, test=develop
上级 640f8cf0
......@@ -41,35 +41,36 @@ class PipelineHelper(object):
inner_parallelism=None):
self.startup_program = startup_program
endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[self.role_maker._worker_index()]
node_num = _get_node_num(endpoints)
assert len(endpoints) % node_num == 0
nranks = self.role_maker._worker_num()
rank = self.role_maker._worker_index()
endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[rank]
node_num = _get_node_num(endpoints)
assert nranks % node_num == 0
# Create ring 0 for all gpus in a pipeline
pipeline_endpoints = []
# Create ring 0 for all gpus in the same pipeline
if inner_parallelism > 1:
pipeline_rank = rank % inner_parallelism
pipeline_id = rank // inner_parallelism
for idx, ep in enumerate(endpoints):
if idx // inner_parallelism == pipeline_id:
pipeline_endpoints.append(ep)
start_index = pipeline_id * inner_parallelism
pipeline_endpoints = endpoints[start_index:start_index +
inner_parallelism]
self._init_communicator(self.startup_program, current_endpoint,
pipeline_endpoints, pipeline_rank, 0,
self.wait_port)
pipeline_num = len(endpoints) // inner_parallelism
if pipeline_num == 1: return
# Create rings for gpus with the same gpu id
# Create rings for gpus with the same pipeline id for data parallel
eps = []
local_rank = self.role_maker._worker_index() % inner_parallelism
ring_id = local_rank + 1
pipeline_rank = rank % inner_parallelism
ring_id = pipeline_rank + 1
for i in range(pipeline_num):
eps.append(endpoints[i * inner_parallelism + local_rank])
temp_rank = self.role_maker._worker_index() // inner_parallelism
eps.append(endpoints[i * inner_parallelism + pipeline_rank])
# rank in a ring of gpus with the same pipeline id for data parallel
dp_rank = rank // inner_parallelism
self._init_communicator(self.startup_program, current_endpoint, eps,
temp_rank, ring_id, self.wait_port)
dp_rank, ring_id, self.wait_port)
self._broadcast_params(ring_id)
def _init_communicator(self, program, current_endpoint, endpoints, rank,
......@@ -108,8 +109,10 @@ class PipelineHelper(object):
def _broadcast_params(self, ring_id):
block = self.startup_program.global_block()
for param in block.iter_parameters():
if param.is_distributed:
for var_name in block.vars:
if "nccl_id" in var_name: continue
param = block.var(var_name)
if not param.persistable:
continue
block.append_op(
......@@ -136,7 +139,7 @@ class PipelineOptimizer(MetaOptimizerBase):
self.inner_opt = optimizer
# we do not allow meta optimizer to be inner optimizer currently
self.meta_optimizers_white_list = []
self.meta_optimizers_black_list = []
self.meta_optimizers_black_list = ["GraphExecutionOptimizer", ]
def _set_basic_info(self, loss, role_maker, user_defined_optimizer,
user_defined_strategy):
......@@ -161,14 +164,6 @@ class PipelineOptimizer(MetaOptimizerBase):
dist_strategy.pipeline = True
dist_strategy.pipeline_configs = {"micro_batch": 1, }
def _get_local_rank(self, current_endpoint, endpoints):
cur_node_endpoints = []
cur_ip = current_endpoint.split(':')[0].strip()
for ep in endpoints:
if cur_ip == ep.split(':')[0].strip():
cur_node_endpoints.append(ep)
return cur_node_endpoints.index(current_endpoint)
def minimize_impl(self,
loss,
startup_program=None,
......@@ -176,56 +171,51 @@ class PipelineOptimizer(MetaOptimizerBase):
no_grad_set=None):
endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[self.role_maker._worker_index()]
self.local_rank = self._get_local_rank(current_endpoint, endpoints)
self.wrapped_opt = PO(self.inner_opt,
num_microbatches=self.num_microbatches,
start_cpu_core_id=self.local_rank)
num_microbatches=self.num_microbatches)
node_num = _get_node_num(endpoints)
gpus_per_node = len(endpoints) // node_num
self.startup_program = startup_program
self.local_rank = self._get_local_rank(current_endpoint, endpoints)
if startup_program is None:
self.startup_program = fluid.default_startup_program()
loss.block.program._pipeline_opt = dict()
loss.block.program._pipeline_opt['local_rank'] = self.local_rank
optimize_ops, params_grads, prog_list = \
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
self.rank = self.role_maker._worker_index()
self.nranks = self.role_maker._worker_num()
assert self.nranks % node_num == 0
loss.block.program._pipeline_opt = dict()
loss.block.program._pipeline_opt['local_rank'] = self.rank
optimize_ops, params_grads, prog_list = self.wrapped_opt.minimize(
loss, startup_program, parameter_list, no_grad_set)
assert prog_list
self.main_program_list = prog_list
self.main_program = loss.block.program
self.inner_parallelism = loss.block.program._pipeline_opt[
'inner_parallelism']
nranks = len(endpoints)
self.nranks = nranks
self.nrings = len(self.main_program_list)
self.rank = self.role_maker._worker_index()
self.endpoints = endpoints
self.current_endpoint = current_endpoint
assert self.nranks % self.inner_parallelism == 0
pipeline_helper = PipelineHelper(self.role_maker)
pipeline_helper.update_startup_program(
self.startup_program._pipeline_opt["startup_program"],
self.inner_parallelism)
self._transpile_main_program(loss, node_num, gpus_per_node)
pipeline_num = self.nranks // self.inner_parallelism
self._transpile_main_program(loss, pipeline_num, self.inner_parallelism)
return optimize_ops, params_grads
def _transpile_main_program(self, loss, node_num, gpus_per_node):
self._insert_loss_grad_ops(loss, gpus_per_node, node_num)
for ring_id in range(1, gpus_per_node + 1):
def _transpile_main_program(self, loss, pipeline_num, inner_parallelism):
if pipeline_num <= 1: return
self._insert_loss_grad_ops(loss, pipeline_num)
for ring_id in range(1, inner_parallelism + 1):
self._insert_allreduce_ops(ring_id)
def _insert_loss_grad_ops(self, loss, gpus_per_node, node_num):
def _insert_loss_grad_ops(self, loss, pipeline_num):
"""
In order to keep the learning rate consistent in different numbers of
training workers, we scale the loss grad by the number of workers
"""
block = self.main_program_list[gpus_per_node - 1][
'program'].global_block()
block = self.main_program_list[-1]['program'].global_block()
for idx, op in reversed(list(enumerate(block.ops))):
if is_loss_grad_op(op):
loss_grad_var = block.vars[op.output_arg_names[0]]
......@@ -235,7 +225,7 @@ class PipelineOptimizer(MetaOptimizerBase):
inputs={'X': loss_grad_var},
outputs={'Out': loss_grad_var},
attrs={
'scale': 1.0 / node_num,
'scale': 1.0 / pipeline_num,
OP_ROLE_KEY: OpRole.Backward
})
......@@ -269,7 +259,7 @@ class PipelineOptimizer(MetaOptimizerBase):
block._insert_op(
offset,
type='c_sync_calc_stream',
type='c_allreduce_sum',
inputs={'X': grad},
outputs={'Out': grad},
attrs={
......@@ -283,7 +273,7 @@ class PipelineOptimizer(MetaOptimizerBase):
for idx, op in enumerate(block.ops):
if is_optimizer_op(op):
block._insert_op(
idx + ring_id,
idx,
type='c_sync_comm_stream',
inputs={'X': grad},
outputs={'Out': grad},
......
......@@ -16,6 +16,7 @@ from __future__ import print_function
import numpy as np
import six
import os
import logging
from collections import defaultdict
......@@ -39,6 +40,7 @@ from .dygraph.learning_rate_scheduler import LearningRateDecay, _LearningRateEpo
from paddle.fluid import core
from paddle.fluid.layers import tensor
from functools import reduce
from functools import cmp_to_key
from .wrapped_decorator import signature_safe_contextmanager
from .. import compat as cpt
......@@ -3773,7 +3775,7 @@ class PipelineOptimizer(object):
self._op_device_key = op_maker.kOpDeviceAttrName()
self._param_device_map = None
def _create_vars(self, block, main_program):
def _create_vars(self, block, ori_block):
# Create vars for block, copied from main_program's global block
used_var_set = set()
for op_idx in range(block.desc.op_size()):
......@@ -3785,7 +3787,8 @@ class PipelineOptimizer(object):
if var in used_var_set or "_blocking_queue" in var:
continue
used_var_set.add(var)
source_var = main_program.block(0).var(str(var))
if block._find_var_recursive(str(var)): continue
source_var = ori_block._var_recursive(str(var))
if source_var.type == core.VarDesc.VarType.READER:
block.create_var(
name=var,
......@@ -3840,45 +3843,65 @@ class PipelineOptimizer(object):
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
elif op.type == "create_py_reader" or op.type == "read":
# ap_op._set_attr(self._op_device_key, "")
elif op.type == "create_py_reader" or op.type == "read" or op.type == "create_double_buffer_reader":
# Copy read related ops to all section to make them exit after each epoch.
for device in device_program_map.keys():
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
else:
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
for key in sorted(device_program_map.keys()):
for key in devices:
program = device_program_map[key]
program['program']._sync_with_cpp()
programs.append(program)
return programs
def _get_op_device_for_startup_program(self, var_name):
"""
For adam optimizer, it will add accumulators and initialize them
with fill_constant, and force the op device to cpu. Hence, we should
get the real op_device attribute of the fill_constant as the device
where the corresponding parameters on.
"""
assert "beta1_pow_acc" in var_name or "beta2_pow_acc" in var_name
param_name = var_name[0:var_name.index('_beta')]
device = self._param_device_map[param_name]
return device
def _split_startup_program(self, startup_program, local_rank):
block = startup_program.block(0)
new_startup_program = Program()
for op in block.ops:
device = op.attr(self._op_device_key)
if device == "cpu":
assert op.type == "fill_constant", (
"For ops in startup "
"program that with the op_device attribute of cpu, "
"they must be fill_constant.")
output_var = op.output_arg_names[0]
device = self._get_op_device_for_startup_program(output_var)
if device:
device_index = int(device.split(":")[1])
device_index = int(device.split(':')[1])
else:
device_index = None
if device_index is not None and device_index != local_rank: continue
# LR related ops
device = None
if device and device_index != local_rank: continue
op_desc = op.desc
ap_op = new_startup_program.block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
new_startup_program._sync_with_cpp()
self._create_vars(new_startup_program.block(0), startup_program)
self._create_vars(
new_startup_program.block(0), startup_program.global_block())
return new_startup_program
def _find_post_op(self, ops, cur_op, var_name):
......@@ -4093,6 +4116,8 @@ class PipelineOptimizer(object):
first_device = op.attr(self._op_device_key)
break
assert first_device
first_device_type = first_device.split(":")[0]
assert first_device_type == "gpu"
# set op_device attr for lr-related ops
lrsched_role = int(self._op_role.LRSched)
......@@ -4136,10 +4161,11 @@ class PipelineOptimizer(object):
dev_spec = op.attr(self._op_device_key)
assert dev_spec, ("op_device attribute for op "
"{} has not been set.".format(op.type))
dev_type = dev_spec.split(':')[0]
assert dev_type == "gpu", ("Now only gpu devices are supported "
"for pipeline parallelism.")
if not dev_spec in device_specs:
device_specs.append(dev_spec)
sorted_device_specs = sorted(device_specs)
assert sorted_device_specs == device_specs
return device_specs
def _insert_sendrecv_ops_for_boundaries(self, block):
......@@ -4216,6 +4242,7 @@ class PipelineOptimizer(object):
device = self._param_device_map[param_name]
if device != dev_spec: continue
grad_name = self._append_grad_suffix(param_name)
if not main_block.has_var(grad_name): continue
grad_var = main_block.vars[grad_name]
main_block._insert_op(
index=0,
......@@ -4297,6 +4324,7 @@ class PipelineOptimizer(object):
ap_op = new_sub_block.desc.append_op()
ap_op.copy_from(op_desc)
new_sub_block._sync_with_cpp()
self._create_vars(new_sub_block, origin_sub_block)
op._set_attr('sub_block:', new_sub_block)
def _get_device_info(self, block):
......@@ -4318,6 +4346,7 @@ class PipelineOptimizer(object):
prog = prog_info['program']
block = prog.block(0)
for var_name in block.vars:
if var_name == "double_buffer_0": continue
var = block.var(var_name)
if not var.persistable: continue
if not var_name in var_info:
......@@ -4413,30 +4442,33 @@ class PipelineOptimizer(object):
self._add_default_opdevice_attr(main_block)
device_specs = self._check_validation(main_block)
assert len(device_specs) > 1
def device_cmp(device1, device2):
dev1_id = int(device1.split(':')[1])
dev2_id = int(device2.split(':')[1])
if dev1_id < dev2_id:
return -1
elif dev1_id > dev2_id:
return 1
else:
return 0
sorted_device_spec = sorted(device_specs, key=cmp_to_key(device_cmp))
assert sorted_device_spec == device_specs, (
"With pipeline "
"parallelism, you must use gpu devices one after another "
"in the order of their ids.")
# Step3: add send and recv ops between section boundaries
self._insert_sendrecv_ops_for_boundaries(main_block)
place_list = []
place_id_list = []
for dev_spec in device_specs:
if dev_spec == "cpu":
place_list.append(core.CPUPlace())
place_id_list.append(-1)
elif "gpu" in dev_spec and ":" in dev_spec:
dev_index = dev_spec.split(":")[1]
place_list.append(core.CUDAPlace(int(dev_index)))
place_id_list.append(int(dev_index))
else:
raise ValueError("Unknown device type: %s", dev_spec)
# Step4: split program into sections and add pairs of
# send and recv ops for data var.
main_program = main_block.program
program_list = self._split_program(main_program, device_specs)
for p in program_list:
self._create_vars(p["program"].block(0), main_program)
self._create_vars(p["program"].block(0),
main_program.global_block())
self._insert_sendrecv_for_data_var(main_block, program_list,
startup_program, device_specs)
......@@ -4452,7 +4484,13 @@ class PipelineOptimizer(object):
isinstance(main_program._pipeline_opt, dict) and
'local_rank' in main_program._pipeline_opt), \
"You must use pipeline with fleet"
local_rank = main_program._pipeline_opt['local_rank']
local_rank = main_program._pipeline_opt['local_rank'] % len(
device_specs)
place_list = []
for dev_spec in device_specs:
dev_index = dev_spec.split(":")[1]
place_list.append(core.CUDAPlace(local_rank))
# Step7: Split startup program
new_startup_program = self._split_startup_program(startup_program,
......@@ -4466,21 +4504,18 @@ class PipelineOptimizer(object):
self._accumulate_gradients(program_list[local_rank]['program']
.global_block())
with open("startup_prog_%d" % local_rank, 'w') as f:
f.writelines(str(new_startup_program))
with open("main_prog_%d" % local_rank, 'w') as f:
f.writelines(str(program_list[local_rank]['program']))
startup_program._pipeline_opt = {
"startup_program": new_startup_program,
}
place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
main_program._pipeline_opt = {
"trainer": "PipelineTrainer",
"device_worker": "Section",
"inner_parallelism": len(device_specs),
"section_program": program_list[local_rank],
"place": place_list[local_rank],
"place_id": place_id_list[local_rank],
"place_id": place_id,
"sync_steps": -1,
"num_microbatches": self._num_microbatches,
"start_cpu_core_id": self._start_cpu_core_id,
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import time
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
from paddle.fluid import core
import unittest
from multiprocessing import Process
import os
import signal
from functools import reduce
from test_dist_base import TestDistRunnerBase, runtime_main
import paddle.distributed.fleet as fleet
paddle.enable_static()
DTYPE = "float32"
paddle.dataset.mnist.fetch()
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
def cnn_model(data):
conv_pool_1 = fluid.nets.simple_img_conv_pool(
input=data,
filter_size=5,
num_filters=20,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
SIZE = 10
input_shape = conv_pool_2.shape
param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
predict = fluid.layers.fc(
input=conv_pool_2,
size=SIZE,
act="softmax",
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01)))
return predict
class TestDistMnist2x2(TestDistRunnerBase):
def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None):
# Input data
device_id = 0
if dist_strategy:
fleet.init(is_collective=True)
with fluid.device_guard("gpu:0"):
images = fluid.layers.data(
name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if dist_strategy:
data_loader = fluid.io.DataLoader.from_generator(
feed_list=[images, label],
capacity=64,
use_double_buffer=False,
iterable=False)
# Train program
predict = cnn_model(images)
with fluid.device_guard("gpu:0"):
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
with fluid.device_guard("gpu:0"):
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
inference_program = fluid.default_main_program().clone()
base_lr = self.lr
passes = [30, 60, 80, 90]
steps_per_pass = 10
bd = [steps_per_pass * p for p in passes]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
opt = fluid.optimizer.Momentum(learning_rate=lr_val, momentum=0.9)
# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
if dist_strategy:
strategy = fleet.DistributedStrategy()
strategy.pipeline = True
dist_opt = fleet.distributed_optimizer(
optimizer=opt, strategy=strategy)
dist_opt.minimize(avg_cost)
else:
opt.minimize(avg_cost)
if dist_strategy:
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader
else:
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
if __name__ == "__main__":
runtime_main(TestDistMnist2x2)
......@@ -50,7 +50,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):
strategy.pipeline = True
strategy.pipeline_configs = {'micro_batch': 2}
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = paddle.fluid.optimizer.Adam(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
......
......@@ -40,6 +40,14 @@ class TestPipeline(TestDistBase):
check_error_log=True,
log_name=flag_name)
def test_dist_train_one_device(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"pipeline_mnist_one_device.py",
check_error_log=True,
log_name=flag_name)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册