From 01950ceb4287b2fe0442b4620cb913f124a4ead9 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Fri, 25 Dec 2020 19:40:02 +0800 Subject: [PATCH] fix the bug in pipeline data parallelism (#29731) * update, test=develop --- .../meta_optimizers/pipeline_optimizer.py | 106 ++++++-------- python/paddle/fluid/optimizer.py | 105 ++++++++----- .../unittests/pipeline_mnist_one_device.py | 138 ++++++++++++++++++ .../test_fleet_pipeline_meta_optimizer.py | 2 +- .../fluid/tests/unittests/test_pipeline.py | 8 + 5 files changed, 265 insertions(+), 94 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/pipeline_mnist_one_device.py diff --git a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py index f3bdb305f4c..67a3312552c 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py @@ -41,35 +41,36 @@ class PipelineHelper(object): inner_parallelism=None): self.startup_program = startup_program - endpoints = self.role_maker._get_trainer_endpoints() - current_endpoint = endpoints[self.role_maker._worker_index()] - node_num = _get_node_num(endpoints) - assert len(endpoints) % node_num == 0 nranks = self.role_maker._worker_num() rank = self.role_maker._worker_index() - - # Create ring 0 for all gpus in a pipeline - pipeline_endpoints = [] - pipeline_rank = rank % inner_parallelism - pipeline_id = rank // inner_parallelism - for idx, ep in enumerate(endpoints): - if idx // inner_parallelism == pipeline_id: - pipeline_endpoints.append(ep) - self._init_communicator(self.startup_program, current_endpoint, - pipeline_endpoints, pipeline_rank, 0, - self.wait_port) + endpoints = self.role_maker._get_trainer_endpoints() + current_endpoint = endpoints[rank] + node_num = _get_node_num(endpoints) + assert nranks % node_num == 0 + + # Create ring 0 for all gpus in the same pipeline + if inner_parallelism > 1: + pipeline_rank = rank % inner_parallelism + pipeline_id = rank // inner_parallelism + start_index = pipeline_id * inner_parallelism + pipeline_endpoints = endpoints[start_index:start_index + + inner_parallelism] + self._init_communicator(self.startup_program, current_endpoint, + pipeline_endpoints, pipeline_rank, 0, + self.wait_port) pipeline_num = len(endpoints) // inner_parallelism if pipeline_num == 1: return - # Create rings for gpus with the same gpu id + # Create rings for gpus with the same pipeline id for data parallel eps = [] - local_rank = self.role_maker._worker_index() % inner_parallelism - ring_id = local_rank + 1 + pipeline_rank = rank % inner_parallelism + ring_id = pipeline_rank + 1 for i in range(pipeline_num): - eps.append(endpoints[i * inner_parallelism + local_rank]) - temp_rank = self.role_maker._worker_index() // inner_parallelism + eps.append(endpoints[i * inner_parallelism + pipeline_rank]) + # rank in a ring of gpus with the same pipeline id for data parallel + dp_rank = rank // inner_parallelism self._init_communicator(self.startup_program, current_endpoint, eps, - temp_rank, ring_id, self.wait_port) + dp_rank, ring_id, self.wait_port) self._broadcast_params(ring_id) def _init_communicator(self, program, current_endpoint, endpoints, rank, @@ -108,8 +109,10 @@ class PipelineHelper(object): def _broadcast_params(self, ring_id): block = self.startup_program.global_block() - for param in block.iter_parameters(): - if param.is_distributed: + for var_name in block.vars: + if "nccl_id" in 
var_name: continue + param = block.var(var_name) + if not param.persistable: continue block.append_op( @@ -136,7 +139,7 @@ class PipelineOptimizer(MetaOptimizerBase): self.inner_opt = optimizer # we do not allow meta optimizer to be inner optimizer currently self.meta_optimizers_white_list = [] - self.meta_optimizers_black_list = [] + self.meta_optimizers_black_list = ["GraphExecutionOptimizer", ] def _set_basic_info(self, loss, role_maker, user_defined_optimizer, user_defined_strategy): @@ -161,14 +164,6 @@ class PipelineOptimizer(MetaOptimizerBase): dist_strategy.pipeline = True dist_strategy.pipeline_configs = {"micro_batch": 1, } - def _get_local_rank(self, current_endpoint, endpoints): - cur_node_endpoints = [] - cur_ip = current_endpoint.split(':')[0].strip() - for ep in endpoints: - if cur_ip == ep.split(':')[0].strip(): - cur_node_endpoints.append(ep) - return cur_node_endpoints.index(current_endpoint) - def minimize_impl(self, loss, startup_program=None, @@ -176,56 +171,51 @@ class PipelineOptimizer(MetaOptimizerBase): no_grad_set=None): endpoints = self.role_maker._get_trainer_endpoints() current_endpoint = endpoints[self.role_maker._worker_index()] - self.local_rank = self._get_local_rank(current_endpoint, endpoints) self.wrapped_opt = PO(self.inner_opt, - num_microbatches=self.num_microbatches, - start_cpu_core_id=self.local_rank) + num_microbatches=self.num_microbatches) node_num = _get_node_num(endpoints) gpus_per_node = len(endpoints) // node_num self.startup_program = startup_program - self.local_rank = self._get_local_rank(current_endpoint, endpoints) if startup_program is None: self.startup_program = fluid.default_startup_program() - loss.block.program._pipeline_opt = dict() - loss.block.program._pipeline_opt['local_rank'] = self.local_rank - optimize_ops, params_grads, prog_list = \ - self.wrapped_opt.minimize(loss, startup_program, - parameter_list, no_grad_set) + self.rank = self.role_maker._worker_index() + self.nranks = self.role_maker._worker_num() + assert self.nranks % node_num == 0 + loss.block.program._pipeline_opt = dict() + loss.block.program._pipeline_opt['local_rank'] = self.rank + optimize_ops, params_grads, prog_list = self.wrapped_opt.minimize( + loss, startup_program, parameter_list, no_grad_set) assert prog_list + self.main_program_list = prog_list self.main_program = loss.block.program self.inner_parallelism = loss.block.program._pipeline_opt[ 'inner_parallelism'] - nranks = len(endpoints) - self.nranks = nranks - self.nrings = len(self.main_program_list) - - self.rank = self.role_maker._worker_index() - self.endpoints = endpoints - self.current_endpoint = current_endpoint + assert self.nranks % self.inner_parallelism == 0 pipeline_helper = PipelineHelper(self.role_maker) pipeline_helper.update_startup_program( self.startup_program._pipeline_opt["startup_program"], self.inner_parallelism) - self._transpile_main_program(loss, node_num, gpus_per_node) + pipeline_num = self.nranks // self.inner_parallelism + self._transpile_main_program(loss, pipeline_num, self.inner_parallelism) return optimize_ops, params_grads - def _transpile_main_program(self, loss, node_num, gpus_per_node): - self._insert_loss_grad_ops(loss, gpus_per_node, node_num) - for ring_id in range(1, gpus_per_node + 1): + def _transpile_main_program(self, loss, pipeline_num, inner_parallelism): + if pipeline_num <= 1: return + self._insert_loss_grad_ops(loss, pipeline_num) + for ring_id in range(1, inner_parallelism + 1): self._insert_allreduce_ops(ring_id) - def _insert_loss_grad_ops(self, 
loss, gpus_per_node, node_num): + def _insert_loss_grad_ops(self, loss, pipeline_num): """ In order to keep the learning rate consistent in different numbers of training workers, we scale the loss grad by the number of workers """ - block = self.main_program_list[gpus_per_node - 1][ - 'program'].global_block() + block = self.main_program_list[-1]['program'].global_block() for idx, op in reversed(list(enumerate(block.ops))): if is_loss_grad_op(op): loss_grad_var = block.vars[op.output_arg_names[0]] @@ -235,7 +225,7 @@ class PipelineOptimizer(MetaOptimizerBase): inputs={'X': loss_grad_var}, outputs={'Out': loss_grad_var}, attrs={ - 'scale': 1.0 / node_num, + 'scale': 1.0 / pipeline_num, OP_ROLE_KEY: OpRole.Backward }) @@ -269,7 +259,7 @@ class PipelineOptimizer(MetaOptimizerBase): block._insert_op( offset, - type='c_sync_calc_stream', + type='c_allreduce_sum', inputs={'X': grad}, outputs={'Out': grad}, attrs={ @@ -283,7 +273,7 @@ class PipelineOptimizer(MetaOptimizerBase): for idx, op in enumerate(block.ops): if is_optimizer_op(op): block._insert_op( - idx + ring_id, + idx, type='c_sync_comm_stream', inputs={'X': grad}, outputs={'Out': grad}, diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 684413435c9..97c50adf4a7 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -16,6 +16,7 @@ from __future__ import print_function import numpy as np import six +import os import logging from collections import defaultdict @@ -39,6 +40,7 @@ from .dygraph.learning_rate_scheduler import LearningRateDecay, _LearningRateEpo from paddle.fluid import core from paddle.fluid.layers import tensor from functools import reduce +from functools import cmp_to_key from .wrapped_decorator import signature_safe_contextmanager from .. import compat as cpt @@ -3773,7 +3775,7 @@ class PipelineOptimizer(object): self._op_device_key = op_maker.kOpDeviceAttrName() self._param_device_map = None - def _create_vars(self, block, main_program): + def _create_vars(self, block, ori_block): # Create vars for block, copied from main_program's global block used_var_set = set() for op_idx in range(block.desc.op_size()): @@ -3785,7 +3787,8 @@ class PipelineOptimizer(object): if var in used_var_set or "_blocking_queue" in var: continue used_var_set.add(var) - source_var = main_program.block(0).var(str(var)) + if block._find_var_recursive(str(var)): continue + source_var = ori_block._var_recursive(str(var)) if source_var.type == core.VarDesc.VarType.READER: block.create_var( name=var, @@ -3840,45 +3843,65 @@ class PipelineOptimizer(object): op_desc = op.desc ap_op = program["program"].block(0).desc.append_op() ap_op.copy_from(op_desc) - ap_op._set_attr(self._op_device_key, "") - elif op.type == "create_py_reader" or op.type == "read": + # ap_op._set_attr(self._op_device_key, "") + elif op.type == "create_py_reader" or op.type == "read" or op.type == "create_double_buffer_reader": # Copy read related ops to all section to make them exit after each epoch. 
for device in device_program_map.keys(): program = device_program_map[device] op_desc = op.desc ap_op = program["program"].block(0).desc.append_op() ap_op.copy_from(op_desc) - ap_op._set_attr(self._op_device_key, "") else: program = device_program_map[device] op_desc = op.desc ap_op = program["program"].block(0).desc.append_op() ap_op.copy_from(op_desc) - ap_op._set_attr(self._op_device_key, "") - for key in sorted(device_program_map.keys()): + for key in devices: program = device_program_map[key] program['program']._sync_with_cpp() programs.append(program) return programs + def _get_op_device_for_startup_program(self, var_name): + """ + For adam optimizer, it will add accumulators and initialize them + with fill_constant, and force the op device to cpu. Hence, we should + get the real op_device attribute of the fill_constant as the device + where the corresponding parameters on. + """ + assert "beta1_pow_acc" in var_name or "beta2_pow_acc" in var_name + param_name = var_name[0:var_name.index('_beta')] + device = self._param_device_map[param_name] + return device + def _split_startup_program(self, startup_program, local_rank): block = startup_program.block(0) new_startup_program = Program() for op in block.ops: device = op.attr(self._op_device_key) + if device == "cpu": + assert op.type == "fill_constant", ( + "For ops in startup " + "program that with the op_device attribute of cpu, " + "they must be fill_constant.") + output_var = op.output_arg_names[0] + device = self._get_op_device_for_startup_program(output_var) + if device: - device_index = int(device.split(":")[1]) + device_index = int(device.split(':')[1]) else: - device_index = None - if device_index is not None and device_index != local_rank: continue + # LR related ops + device = None + if device and device_index != local_rank: continue op_desc = op.desc ap_op = new_startup_program.block(0).desc.append_op() ap_op.copy_from(op_desc) ap_op._set_attr(self._op_device_key, "") new_startup_program._sync_with_cpp() - self._create_vars(new_startup_program.block(0), startup_program) + self._create_vars( + new_startup_program.block(0), startup_program.global_block()) return new_startup_program def _find_post_op(self, ops, cur_op, var_name): @@ -4093,6 +4116,8 @@ class PipelineOptimizer(object): first_device = op.attr(self._op_device_key) break assert first_device + first_device_type = first_device.split(":")[0] + assert first_device_type == "gpu" # set op_device attr for lr-related ops lrsched_role = int(self._op_role.LRSched) @@ -4136,10 +4161,11 @@ class PipelineOptimizer(object): dev_spec = op.attr(self._op_device_key) assert dev_spec, ("op_device attribute for op " "{} has not been set.".format(op.type)) + dev_type = dev_spec.split(':')[0] + assert dev_type == "gpu", ("Now only gpu devices are supported " + "for pipeline parallelism.") if not dev_spec in device_specs: device_specs.append(dev_spec) - sorted_device_specs = sorted(device_specs) - assert sorted_device_specs == device_specs return device_specs def _insert_sendrecv_ops_for_boundaries(self, block): @@ -4216,6 +4242,7 @@ class PipelineOptimizer(object): device = self._param_device_map[param_name] if device != dev_spec: continue grad_name = self._append_grad_suffix(param_name) + if not main_block.has_var(grad_name): continue grad_var = main_block.vars[grad_name] main_block._insert_op( index=0, @@ -4297,6 +4324,7 @@ class PipelineOptimizer(object): ap_op = new_sub_block.desc.append_op() ap_op.copy_from(op_desc) new_sub_block._sync_with_cpp() + self._create_vars(new_sub_block, 
origin_sub_block) op._set_attr('sub_block:', new_sub_block) def _get_device_info(self, block): @@ -4318,6 +4346,7 @@ class PipelineOptimizer(object): prog = prog_info['program'] block = prog.block(0) for var_name in block.vars: + if var_name == "double_buffer_0": continue var = block.var(var_name) if not var.persistable: continue if not var_name in var_info: @@ -4413,30 +4442,33 @@ class PipelineOptimizer(object): self._add_default_opdevice_attr(main_block) device_specs = self._check_validation(main_block) - assert len(device_specs) > 1 + + def device_cmp(device1, device2): + dev1_id = int(device1.split(':')[1]) + dev2_id = int(device2.split(':')[1]) + if dev1_id < dev2_id: + return -1 + elif dev1_id > dev2_id: + return 1 + else: + return 0 + + sorted_device_spec = sorted(device_specs, key=cmp_to_key(device_cmp)) + assert sorted_device_spec == device_specs, ( + "With pipeline " + "parallelism, you must use gpu devices one after another " + "in the order of their ids.") # Step3: add send and recv ops between section boundaries self._insert_sendrecv_ops_for_boundaries(main_block) - place_list = [] - place_id_list = [] - for dev_spec in device_specs: - if dev_spec == "cpu": - place_list.append(core.CPUPlace()) - place_id_list.append(-1) - elif "gpu" in dev_spec and ":" in dev_spec: - dev_index = dev_spec.split(":")[1] - place_list.append(core.CUDAPlace(int(dev_index))) - place_id_list.append(int(dev_index)) - else: - raise ValueError("Unknown device type: %s", dev_spec) - # Step4: split program into sections and add pairs of # send and recv ops for data var. main_program = main_block.program program_list = self._split_program(main_program, device_specs) for p in program_list: - self._create_vars(p["program"].block(0), main_program) + self._create_vars(p["program"].block(0), + main_program.global_block()) self._insert_sendrecv_for_data_var(main_block, program_list, startup_program, device_specs) @@ -4452,7 +4484,13 @@ class PipelineOptimizer(object): isinstance(main_program._pipeline_opt, dict) and 'local_rank' in main_program._pipeline_opt), \ "You must use pipeline with fleet" - local_rank = main_program._pipeline_opt['local_rank'] + local_rank = main_program._pipeline_opt['local_rank'] % len( + device_specs) + + place_list = [] + for dev_spec in device_specs: + dev_index = dev_spec.split(":")[1] + place_list.append(core.CUDAPlace(local_rank)) # Step7: Split startup program new_startup_program = self._split_startup_program(startup_program, @@ -4466,21 +4504,18 @@ class PipelineOptimizer(object): self._accumulate_gradients(program_list[local_rank]['program'] .global_block()) - with open("startup_prog_%d" % local_rank, 'w') as f: - f.writelines(str(new_startup_program)) - with open("main_prog_%d" % local_rank, 'w') as f: - f.writelines(str(program_list[local_rank]['program'])) - startup_program._pipeline_opt = { "startup_program": new_startup_program, } + + place_id = int(os.getenv("FLAGS_selected_gpus", "0")) main_program._pipeline_opt = { "trainer": "PipelineTrainer", "device_worker": "Section", "inner_parallelism": len(device_specs), "section_program": program_list[local_rank], "place": place_list[local_rank], - "place_id": place_id_list[local_rank], + "place_id": place_id, "sync_steps": -1, "num_microbatches": self._num_microbatches, "start_cpu_core_id": self._start_cpu_core_id, diff --git a/python/paddle/fluid/tests/unittests/pipeline_mnist_one_device.py b/python/paddle/fluid/tests/unittests/pipeline_mnist_one_device.py new file mode 100644 index 00000000000..d8d28ac1093 --- /dev/null +++ 
b/python/paddle/fluid/tests/unittests/pipeline_mnist_one_device.py @@ -0,0 +1,138 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import argparse +import time +import math + +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +from paddle.fluid import core +import unittest +from multiprocessing import Process +import os +import signal +from functools import reduce +from test_dist_base import TestDistRunnerBase, runtime_main +import paddle.distributed.fleet as fleet + +paddle.enable_static() + +DTYPE = "float32" +paddle.dataset.mnist.fetch() + +# Fix seed for test +fluid.default_startup_program().random_seed = 1 +fluid.default_main_program().random_seed = 1 + + +def cnn_model(data): + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=data, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu", + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.01))) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu", + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.01))) + + SIZE = 10 + input_shape = conv_pool_2.shape + param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE] + scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5 + + predict = fluid.layers.fc( + input=conv_pool_2, + size=SIZE, + act="softmax", + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01))) + return predict + + +class TestDistMnist2x2(TestDistRunnerBase): + def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None): + # Input data + device_id = 0 + if dist_strategy: + fleet.init(is_collective=True) + with fluid.device_guard("gpu:0"): + images = fluid.layers.data( + name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + if dist_strategy: + data_loader = fluid.io.DataLoader.from_generator( + feed_list=[images, label], + capacity=64, + use_double_buffer=False, + iterable=False) + # Train program + predict = cnn_model(images) + with fluid.device_guard("gpu:0"): + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Evaluator + with fluid.device_guard("gpu:0"): + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + inference_program = fluid.default_main_program().clone() + base_lr = self.lr + passes = [30, 60, 80, 90] + steps_per_pass = 10 + bd = [steps_per_pass * p for p in passes] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr) + opt = fluid.optimizer.Momentum(learning_rate=lr_val, momentum=0.9) + + # Reader + train_reader = paddle.batch( + 
paddle.dataset.mnist.test(), batch_size=batch_size) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=batch_size) + + if dist_strategy: + strategy = fleet.DistributedStrategy() + strategy.pipeline = True + dist_opt = fleet.distributed_optimizer( + optimizer=opt, strategy=strategy) + dist_opt.minimize(avg_cost) + else: + opt.minimize(avg_cost) + + if dist_strategy: + return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader + else: + return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict + + +if __name__ == "__main__": + runtime_main(TestDistMnist2x2) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py index d1abc83568b..68702562dde 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py @@ -50,7 +50,7 @@ class TestFleetMetaOptimizer(unittest.TestCase): strategy.pipeline = True strategy.pipeline_configs = {'micro_batch': 2} - optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01) + optimizer = paddle.fluid.optimizer.Adam(0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/unittests/test_pipeline.py b/python/paddle/fluid/tests/unittests/test_pipeline.py index 2cedf8659b2..e6d585e5bc1 100644 --- a/python/paddle/fluid/tests/unittests/test_pipeline.py +++ b/python/paddle/fluid/tests/unittests/test_pipeline.py @@ -40,6 +40,14 @@ class TestPipeline(TestDistBase): check_error_log=True, log_name=flag_name) + def test_dist_train_one_device(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "pipeline_mnist_one_device.py", + check_error_log=True, + log_name=flag_name) + if __name__ == '__main__': unittest.main() -- GitLab
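
As a sketch of the rank-to-ring arithmetic this patch introduces in PipelineHelper.update_startup_program (the endpoint strings below are hypothetical, not taken from the patch): ring 0 connects the GPUs belonging to one pipeline, while rings 1..inner_parallelism connect the GPUs that hold the same pipeline stage across different pipelines; gradients are allreduced over those data-parallel rings, which is also why the loss gradient is scaled by 1 / pipeline_num.

# Illustrative sketch only, assuming the ring layout described above.
def build_rings(endpoints, inner_parallelism):
    """Return (pipeline rings, data-parallel rings) for the given endpoints."""
    nranks = len(endpoints)
    assert nranks % inner_parallelism == 0
    pipeline_num = nranks // inner_parallelism

    # Ring 0: all GPUs that belong to the same pipeline.
    pipeline_rings = []
    for pipeline_id in range(pipeline_num):
        start = pipeline_id * inner_parallelism
        pipeline_rings.append(endpoints[start:start + inner_parallelism])

    # Rings 1..inner_parallelism: GPUs holding the same pipeline stage
    # across pipelines. Gradients are allreduced over these rings and the
    # loss gradient is scaled by 1 / pipeline_num.
    dp_rings = {}
    for pipeline_rank in range(inner_parallelism):
        ring_id = pipeline_rank + 1
        dp_rings[ring_id] = [
            endpoints[i * inner_parallelism + pipeline_rank]
            for i in range(pipeline_num)
        ]
    return pipeline_rings, dp_rings


if __name__ == "__main__":
    # Hypothetical endpoints: 4 workers, 2 GPUs per pipeline -> 2 pipelines.
    eps = ["127.0.0.1:%d" % (9000 + i) for i in range(4)]
    pipes, dp = build_rings(eps, inner_parallelism=2)
    print(pipes)  # [['127.0.0.1:9000', '127.0.0.1:9001'], ['127.0.0.1:9002', '127.0.0.1:9003']]
    print(dp)     # {1: ['127.0.0.1:9000', '127.0.0.1:9002'], 2: ['127.0.0.1:9001', '127.0.0.1:9003']}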