Unverified commit 0e74eea2, authored by Baibaifan, committed by GitHub

solve hccl communicate conflict (#32447)
Parent 2b108a04
@@ -638,7 +638,8 @@ class PSGPUWorker : public HogwildWorker {
 };
 #endif
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 class SectionWorker : public DeviceWorker {
  public:
   SectionWorker() {}
...
@@ -79,7 +79,8 @@ REGISTER_DEVICE_WORKER_CLASS(HeterBoxWorker);
 REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker);
 #endif
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 REGISTER_DEVICE_WORKER_CLASS(SectionWorker);
 #endif
 }  // namespace framework
...
@@ -12,7 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 #include "paddle/fluid/framework/data_feed_factory.h"
 #include "paddle/fluid/framework/device_worker_factory.h"
 #include "paddle/fluid/framework/trainer.h"
@@ -34,7 +35,11 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
   ParseDumpConfig(trainer_desc);
   const auto& section_config = section_params.section_config();
   int place_id = section_config.place_id();
+#if (defined PADDLE_WITH_NCCL)
   place_ = platform::CUDAPlace(place_id);
+#elif (defined WITH_ASCEND_CL)
+  place_ = platform::NPUPlace(place_id);
+#endif
   worker_ = DeviceWorkerFactory::CreateDeviceWorker(
       trainer_desc.device_worker_name());
   auto this_worker =
...
@@ -9,7 +9,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 #include <float.h>
 #include "paddle/fluid/framework/device_worker.h"
 #include "paddle/fluid/framework/executor_gc_helper.h"
...
@@ -332,7 +332,8 @@ class PSGPUTrainer : public TrainerBase {
 };
 #endif
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(WITH_ASCEND_CL)
 class PipelineTrainer : public TrainerBase {
  public:
   PipelineTrainer() {}
...
@@ -92,6 +92,7 @@ REGISTER_OP_NPU_KERNEL(
     cast, ops::CastNPUKernel<paddle::platform::NPUDeviceContext, int16_t>,
     ops::CastNPUKernel<paddle::platform::NPUDeviceContext, int32_t>,
     ops::CastNPUKernel<paddle::platform::NPUDeviceContext, int64_t>,
+    ops::CastNPUKernel<paddle::platform::NPUDeviceContext, int>,
     ops::CastNPUKernel<paddle::platform::NPUDeviceContext, bool>,
     ops::CastNPUKernel<paddle::platform::NPUDeviceContext, double>,
     ops::CastNPUKernel<paddle::platform::NPUDeviceContext, float>,
...
@@ -79,6 +79,7 @@ class ExpandNPUKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 REGISTER_OP_NPU_KERNEL(
     expand, ops::ExpandNPUKernel<paddle::platform::NPUDeviceContext, float>,
+    ops::ExpandNPUKernel<paddle::platform::NPUDeviceContext, int>,
     ops::ExpandNPUKernel<paddle::platform::NPUDeviceContext,
                          paddle::platform::float16>);
...
@@ -86,9 +86,11 @@ namespace ops = paddle::operators;
 REGISTER_OP_NPU_KERNEL(
     lookup_table_v2,
     ops::LookupTableV2NPUKernel<paddle::platform::NPUDeviceContext, float>,
+    ops::LookupTableV2NPUKernel<paddle::platform::NPUDeviceContext, int>,
     ops::LookupTableV2NPUKernel<paddle::platform::NPUDeviceContext,
                                 paddle::platform::float16>);

 REGISTER_OP_NPU_KERNEL(
     lookup_table_v2_grad, ops::LookupTableV2GradNPUKernel<float>,
+    ops::LookupTableV2GradNPUKernel<int>,
     ops::LookupTableV2GradNPUKernel<paddle::platform::float16>);
@@ -124,11 +124,13 @@ namespace ops = paddle::operators;
 REGISTER_OP_NPU_KERNEL(
     slice, ops::SliceNPUKernel<paddle::platform::NPUDeviceContext, float>,
+    ops::SliceNPUKernel<paddle::platform::NPUDeviceContext, int>,
     ops::SliceNPUKernel<paddle::platform::NPUDeviceContext,
                         paddle::platform::float16>);

 REGISTER_OP_NPU_KERNEL(
     slice_grad,
     ops::SliceGradNPUKernel<paddle::platform::NPUDeviceContext, float>,
+    ops::SliceGradNPUKernel<paddle::platform::NPUDeviceContext, int>,
     ops::SliceGradNPUKernel<paddle::platform::NPUDeviceContext,
                             paddle::platform::float16>);
@@ -13,6 +13,7 @@
 # limitations under the License.
 from __future__ import print_function
+import os
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
@@ -77,6 +78,7 @@ class CollectiveHelper(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
         if rank == 0 and wait_port:
             wait_server_ready(other_endpoints)
...
@@ -13,6 +13,7 @@
 from __future__ import print_function
 from __future__ import division
+import os
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
...
@@ -365,8 +365,8 @@ class ShardingOptimizer(MetaOptimizerBase):
                       'w') as f:
                 f.writelines(str(main_block.program))
-        self._wait()
+        if core.is_compiled_with_cuda():
+            self._wait()
         return optimize_ops, params_grads

     def _init_comm(self):
...
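The guard above keeps the endpoint wait on CUDA builds only. A minimal, self-contained sketch of the same pattern (the helper name _maybe_wait and the wait_fn parameter are illustrative, not part of the commit):

from paddle.fluid import core

def _maybe_wait(wait_fn):
    # Only CUDA/NCCL builds perform the endpoint wait; Ascend (HCCL) builds
    # skip it, matching the guarded self._wait() call in the hunk above.
    if core.is_compiled_with_cuda():
        wait_fn()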
@@ -433,7 +433,10 @@ class Section(DeviceWorker):
         # cfg.program_desc.CopyFrom(program.program._get_desc())
         place = pipeline_opt["place"]
         place_id = pipeline_opt["place_id"]
-        assert isinstance(place, core.CUDAPlace)
+        if core.is_compiled_with_cuda():
+            assert isinstance(place, core.CUDAPlace)
+        elif core.is_compiled_with_npu():
+            assert isinstance(place, core.NPUPlace)
         cfg.place = cfg.CUDAPlace
         cfg.place_id = place_id
...
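The assertion now has to match the build target. A self-contained sketch of the check (the helper name _check_section_place is illustrative; the isinstance checks mirror the hunk above):

from paddle.fluid import core

def _check_section_place(place):
    # The pipeline section must be handed a place that matches the build:
    # a CUDAPlace on CUDA builds, an NPUPlace on Ascend builds.
    if core.is_compiled_with_cuda():
        assert isinstance(place, core.CUDAPlace)
    elif core.is_compiled_with_npu():
        assert isinstance(place, core.NPUPlace)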
@@ -1451,8 +1451,12 @@ class Executor(object):
             for var in program.global_block().vars.values():
                 if var.is_data:
                     data_vars.append(var)
-            dataset = paddle.fluid.DatasetFactory().create_dataset(
-                'FileInstantDataset')
+            if core.is_compiled_with_npu():
+                dataset = paddle.fluid.DatasetFactory().create_dataset(
+                    'InMemoryDataset')
+            else:
+                dataset = paddle.fluid.DatasetFactory().create_dataset(
+                    'FileInstantDataset')
             dataset.set_batch_size(1)
             dataset.set_thread(1)
             dataset.set_filelist(['None'])
...
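The dataset type now depends on the build as well: NPU builds feed the pipeline from an InMemoryDataset, all other builds keep FileInstantDataset. A self-contained sketch of that choice (the helper name _create_pipeline_dataset is illustrative; the calls are the ones visible in the hunk above):

import paddle.fluid as fluid
from paddle.fluid import core

def _create_pipeline_dataset():
    # Ascend builds use InMemoryDataset; CUDA and CPU builds keep the
    # original FileInstantDataset.
    if core.is_compiled_with_npu():
        name = 'InMemoryDataset'
    else:
        name = 'FileInstantDataset'
    dataset = fluid.DatasetFactory().create_dataset(name)
    dataset.set_batch_size(1)
    dataset.set_thread(1)
    dataset.set_filelist(['None'])
    return dataset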
@@ -4818,7 +4818,10 @@ class PipelineOptimizer(object):
         place_list = []
         for dev in device_list:
             dev_index = int(dev.split(":")[1])
-            place_list.append(core.CUDAPlace(0))
+            if core.is_compiled_with_cuda():
+                place_list.append(core.CUDAPlace(dev_index % 1))
+            elif core.is_compiled_with_npu():
+                place_list.append(core.NPUPlace(dev_index % 1))

         # Step6: Split startup program
         new_startup_program = self._split_startup_program(startup_program,
@@ -4837,7 +4840,10 @@ class PipelineOptimizer(object):
         self._accumulate_gradients(real_block)
         real_block._sync_with_cpp()

-        place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
+        if core.is_compiled_with_cuda():
+            place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
+        elif core.is_compiled_with_npu():
+            place_id = int(os.getenv("FLAGS_selected_npus", "0"))
         main_program._pipeline_opt = {
             "trainer": "PipelineTrainer",
             "device_worker": "Section",
...
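Both hunks follow the same pattern: pick the device class and the selected-device environment variable according to the compile target. A self-contained sketch (the helper names _pipeline_place and _pipeline_place_id are illustrative; the calls and environment variables come from the hunks above):

import os
from paddle.fluid import core

def _pipeline_place(dev_index):
    # dev_index % 1 is always 0, i.e. every rank addresses its first
    # visible device of the matching type.
    if core.is_compiled_with_cuda():
        return core.CUDAPlace(dev_index % 1)
    elif core.is_compiled_with_npu():
        return core.NPUPlace(dev_index % 1)
    raise RuntimeError("pipeline parallelism needs a CUDA or Ascend build")

def _pipeline_place_id():
    # The launcher exports the selected device id per rank.
    if core.is_compiled_with_cuda():
        return int(os.getenv("FLAGS_selected_gpus", "0"))
    elif core.is_compiled_with_npu():
        return int(os.getenv("FLAGS_selected_npus", "0"))
    return 0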
@@ -17,6 +17,7 @@ from __future__ import print_function
 import sys
 import math
 from functools import reduce
+import os
 import collections
 import six
@@ -101,6 +102,8 @@ class Collective(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
+        block = program.global_block()
         if rank == 0 and wait_port:
             wait_server_ready(other_endpoints)
...
@@ -133,9 +133,9 @@ def init_communicator(program, rank, nranks, wait_port, current_endpoint,
         return
     other_endpoints = endpoints[:]
     other_endpoints.remove(current_endpoint)
+    block = program.global_block()
     if rank == 0 and wait_port:
         wait_server_ready(other_endpoints)
-    block = program.global_block()
     if core.is_compiled_with_cuda():
         nccl_id_var = block.create_var(
             name=fluid.unique_name.generate('nccl_id'),
...
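The last two hunks make both communicator helpers fetch the global block before the optional wait_server_ready() call. A self-contained sketch of the resulting ordering (the function name _prepare_comm_block is illustrative; wait_server_ready is passed in only to keep the sketch importable, in the patched files it is the module-level helper already in use):

def _prepare_comm_block(program, rank, wait_port, current_endpoint, endpoints,
                        wait_server_ready):
    other_endpoints = endpoints[:]
    other_endpoints.remove(current_endpoint)
    # Fetch the block first ...
    block = program.global_block()
    # ... then let rank 0 wait for its peers before any comm-init ops are added.
    if rank == 0 and wait_port:
        wait_server_ready(other_endpoints)
    return block, other_endpoints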