From 0e74eea26245292f1302633c1c19fa3d13707508 Mon Sep 17 00:00:00 2001
From: Baibaifan <39549453+Baibaifan@users.noreply.github.com>
Date: Fri, 23 Apr 2021 14:32:33 +0800
Subject: [PATCH] solve hccl communicate conflict (#32447)

solve hccl communicate conflict (#32447)
---
 paddle/fluid/framework/device_worker.h                 |  3 ++-
 paddle/fluid/framework/device_worker_factory.cc        |  3 ++-
 paddle/fluid/framework/pipeline_trainer.cc             |  7 ++++++-
 paddle/fluid/framework/section_worker.cc               |  3 ++-
 paddle/fluid/framework/trainer.h                       |  3 ++-
 paddle/fluid/operators/cast_op_npu.cc                  |  1 +
 paddle/fluid/operators/expand_op_npu.cc                |  1 +
 paddle/fluid/operators/lookup_table_v2_op_npu.cc       |  2 ++
 paddle/fluid/operators/slice_op_npu.cc                 |  2 ++
 .../paddle/distributed/fleet/meta_optimizers/common.py |  2 ++
 .../fleet/meta_optimizers/pipeline_optimizer.py        |  1 +
 .../fleet/meta_optimizers/sharding_optimizer.py        |  4 ++--
 python/paddle/fluid/device_worker.py                   |  5 ++++-
 python/paddle/fluid/executor.py                        |  8 ++++++--
 python/paddle/fluid/optimizer.py                       | 10 ++++++++--
 python/paddle/fluid/transpiler/collective.py           |  3 +++
 python/paddle/hapi/model.py                            |  2 +-
 17 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index 628b9f0d70f..a49e492e480 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -638,7 +638,8 @@ class PSGPUWorker : public HogwildWorker {
 };
 #endif
 
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 class SectionWorker : public DeviceWorker {
  public:
   SectionWorker() {}
diff --git a/paddle/fluid/framework/device_worker_factory.cc b/paddle/fluid/framework/device_worker_factory.cc
index a539a5d5f96..5780a953433 100644
--- a/paddle/fluid/framework/device_worker_factory.cc
+++ b/paddle/fluid/framework/device_worker_factory.cc
@@ -79,7 +79,8 @@ REGISTER_DEVICE_WORKER_CLASS(HeterBoxWorker);
 REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker);
 #endif
 
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 REGISTER_DEVICE_WORKER_CLASS(SectionWorker);
 #endif
 }  // namespace framework
diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc
index 5968df548df..3649e00e7c9 100644
--- a/paddle/fluid/framework/pipeline_trainer.cc
+++ b/paddle/fluid/framework/pipeline_trainer.cc
@@ -12,7 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 #include "paddle/fluid/framework/data_feed_factory.h"
 #include "paddle/fluid/framework/device_worker_factory.h"
 #include "paddle/fluid/framework/trainer.h"
@@ -34,7 +35,11 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
   ParseDumpConfig(trainer_desc);
   const auto& section_config = section_params.section_config();
   int place_id = section_config.place_id();
+#if (defined PADDLE_WITH_NCCL)
   place_ = platform::CUDAPlace(place_id);
+#elif (defined WITH_ASCEND_CL)
+  place_ = platform::NPUPlace(place_id);
+#endif
   worker_ = DeviceWorkerFactory::CreateDeviceWorker(
       trainer_desc.device_worker_name());
   auto this_worker =
diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc
index e740771e5ca..7860b69313e 100644
--- a/paddle/fluid/framework/section_worker.cc
+++ b/paddle/fluid/framework/section_worker.cc
@@ -9,7 +9,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 #include <float.h>
 #include "paddle/fluid/framework/device_worker.h"
 #include "paddle/fluid/framework/executor_gc_helper.h"
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index 7efb89ad7d9..01aa07e6184 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -332,7 +332,8 @@ class PSGPUTrainer : public TrainerBase {
 };
 #endif
 
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 class PipelineTrainer : public TrainerBase {
  public:
   PipelineTrainer() {}
diff --git a/paddle/fluid/operators/cast_op_npu.cc b/paddle/fluid/operators/cast_op_npu.cc
index 8d34e0ba99c..0de0f5e4505 100644
--- a/paddle/fluid/operators/cast_op_npu.cc
+++ b/paddle/fluid/operators/cast_op_npu.cc
@@ -92,6 +92,7 @@ REGISTER_OP_NPU_KERNEL(
     cast, ops::CastNPUKernel,
     ops::CastNPUKernel,
     ops::CastNPUKernel,
+    ops::CastNPUKernel,
     ops::CastNPUKernel,
     ops::CastNPUKernel,
     ops::CastNPUKernel,
diff --git a/paddle/fluid/operators/expand_op_npu.cc b/paddle/fluid/operators/expand_op_npu.cc
index 453a990efbd..bb3a6512d2c 100644
--- a/paddle/fluid/operators/expand_op_npu.cc
+++ b/paddle/fluid/operators/expand_op_npu.cc
@@ -79,6 +79,7 @@ class ExpandNPUKernel : public framework::OpKernel {
 namespace ops = paddle::operators;
 REGISTER_OP_NPU_KERNEL(
     expand, ops::ExpandNPUKernel,
+    ops::ExpandNPUKernel,
     ops::ExpandNPUKernel);
diff --git a/paddle/fluid/operators/lookup_table_v2_op_npu.cc b/paddle/fluid/operators/lookup_table_v2_op_npu.cc
index f614d906baa..320b498156b 100644
--- a/paddle/fluid/operators/lookup_table_v2_op_npu.cc
+++ b/paddle/fluid/operators/lookup_table_v2_op_npu.cc
@@ -86,9 +86,11 @@ namespace ops = paddle::operators;
 REGISTER_OP_NPU_KERNEL(
     lookup_table_v2, ops::LookupTableV2NPUKernel,
+    ops::LookupTableV2NPUKernel,
     ops::LookupTableV2NPUKernel);
 
 REGISTER_OP_NPU_KERNEL(
     lookup_table_v2_grad, ops::LookupTableV2GradNPUKernel,
+    ops::LookupTableV2GradNPUKernel,
     ops::LookupTableV2GradNPUKernel);
diff --git a/paddle/fluid/operators/slice_op_npu.cc b/paddle/fluid/operators/slice_op_npu.cc
index e5e0dafdae0..9974536da9a 100644
--- a/paddle/fluid/operators/slice_op_npu.cc
+++ b/paddle/fluid/operators/slice_op_npu.cc
@@ -124,11 +124,13 @@ namespace ops = paddle::operators;
 REGISTER_OP_NPU_KERNEL(
     slice, ops::SliceNPUKernel,
+    ops::SliceNPUKernel,
     ops::SliceNPUKernel);
 
 REGISTER_OP_NPU_KERNEL(
     slice_grad, ops::SliceGradNPUKernel,
+    ops::SliceGradNPUKernel,
     ops::SliceGradNPUKernel);
diff --git a/python/paddle/distributed/fleet/meta_optimizers/common.py b/python/paddle/distributed/fleet/meta_optimizers/common.py
index 1b51d4f66f3..9e2723dad72 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/common.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/common.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from __future__ import print_function
+import os
 
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
@@ -77,6 +78,7 @@ class CollectiveHelper(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
+
         if rank == 0 and wait_port:
             wait_server_ready(other_endpoints)
 
diff --git a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py
index ae2daa9b9d8..1aa51a6671c 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py
@@ -13,6 +13,7 @@
 from __future__ import print_function
 from __future__ import division
+import os
 
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
 
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
index 2c4ad33c361..852421523b1 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
@@ -365,8 +365,8 @@ class ShardingOptimizer(MetaOptimizerBase):
                       'w') as f:
                 f.writelines(str(main_block.program))
 
-        self._wait()
-
+        if core.is_compiled_with_cuda():
+            self._wait()
         return optimize_ops, params_grads
 
     def _init_comm(self):
diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py
index 7bcd10a7269..7fed27ee459 100644
--- a/python/paddle/fluid/device_worker.py
+++ b/python/paddle/fluid/device_worker.py
@@ -433,7 +433,10 @@ class Section(DeviceWorker):
         # cfg.program_desc.CopyFrom(program.program._get_desc())
         place = pipeline_opt["place"]
         place_id = pipeline_opt["place_id"]
-        assert isinstance(place, core.CUDAPlace)
+        if core.is_compiled_with_cuda():
+            assert isinstance(place, core.CUDAPlace)
+        elif core.is_compiled_with_npu():
+            assert isinstance(place, core.NPUPlace)
         cfg.place = cfg.CUDAPlace
         cfg.place_id = place_id
 
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 76bc68f24d2..62a9c42ee0a 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1451,8 +1451,12 @@ class Executor(object):
             for var in program.global_block().vars.values():
                 if var.is_data:
                     data_vars.append(var)
-            dataset = paddle.fluid.DatasetFactory().create_dataset(
-                'FileInstantDataset')
+            if core.is_compiled_with_npu():
+                dataset = paddle.fluid.DatasetFactory().create_dataset(
+                    'InMemoryDataset')
+            else:
+                dataset = paddle.fluid.DatasetFactory().create_dataset(
+                    'FileInstantDataset')
             dataset.set_batch_size(1)
             dataset.set_thread(1)
             dataset.set_filelist(['None'])
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index e1122471982..21b4c429a66 100755
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -4818,7 +4818,10 @@ class PipelineOptimizer(object):
         place_list = []
         for dev in device_list:
             dev_index = int(dev.split(":")[1])
-            place_list.append(core.CUDAPlace(0))
+            if core.is_compiled_with_cuda():
+                place_list.append(core.CUDAPlace(dev_index % 1))
+            elif core.is_compiled_with_npu():
+                place_list.append(core.NPUPlace(dev_index % 1))
 
         # Step6: Split startup program
         new_startup_program = self._split_startup_program(startup_program,
@@ -4837,7 +4840,10 @@ class PipelineOptimizer(object):
             self._accumulate_gradients(real_block)
         real_block._sync_with_cpp()
 
-        place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
+        if core.is_compiled_with_cuda():
+            place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
+        elif core.is_compiled_with_npu():
+            place_id = int(os.getenv("FLAGS_selected_npus", "0"))
         main_program._pipeline_opt = {
             "trainer": "PipelineTrainer",
             "device_worker": "Section",
diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/fluid/transpiler/collective.py
index c8cb474343a..ef6975c3d24 100644
--- a/python/paddle/fluid/transpiler/collective.py
+++ b/python/paddle/fluid/transpiler/collective.py
@@ -17,6 +17,7 @@ from __future__ import print_function
 import sys
 import math
 from functools import reduce
+import os
 
 import collections
 import six
@@ -101,6 +102,8 @@ class Collective(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
+        block = program.global_block()
+
         if rank == 0 and wait_port:
             wait_server_ready(other_endpoints)
 
diff --git a/python/paddle/hapi/model.py b/python/paddle/hapi/model.py
index cc4e7a8b319..fa8bd600bb2 100644
--- a/python/paddle/hapi/model.py
+++ b/python/paddle/hapi/model.py
@@ -133,9 +133,9 @@ def init_communicator(program, rank, nranks, wait_port, current_endpoint,
         return
     other_endpoints = endpoints[:]
     other_endpoints.remove(current_endpoint)
+    block = program.global_block()
     if rank == 0 and wait_port:
         wait_server_ready(other_endpoints)
-    block = program.global_block()
     if core.is_compiled_with_cuda():
         nccl_id_var = block.create_var(
             name=fluid.unique_name.generate('nccl_id'),
--
GitLab