diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt index 17e96243878bc5c3f64ac2f6ecd175c9138d3611..24923d72681869348ec7db816349bdef010c973d 100644 --- a/paddle/fluid/distributed/CMakeLists.txt +++ b/paddle/fluid/distributed/CMakeLists.txt @@ -1,4 +1,5 @@ if(NOT WITH_PSCORE) + add_subdirectory(fleet_executor) return() endif() @@ -16,6 +17,7 @@ add_subdirectory(service) add_subdirectory(table) add_subdirectory(test) add_subdirectory(index_dataset) +add_subdirectory(fleet_executor) get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..0ff8d09703a3d8c8d643f407eb8a2a9e154955e4 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -0,0 +1,6 @@ +proto_library(fleet_executor_desc_proto SRCS fleet_executor_desc.proto) +cc_library(fleet_executor SRCS fleet_executor.cc DEPS fleet_executor_desc_proto) + +if(WITH_PYTHON) + py_proto_compile(fleet_executor_desc_py_proto SRCS fleet_executor_desc.proto) +endif() diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc new file mode 100644 index 0000000000000000000000000000000000000000..3373b26d8d69dabf97d617c7b61f55d0ff162724 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -0,0 +1,43 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" +#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h" +#include "paddle/fluid/framework/program_desc.h" + +namespace paddle { +namespace distributed { + +FleetExecutor::FleetExecutor(const std::string& exe_desc_str) { + // Initialize Executor +} + +FleetExecutor::~FleetExecutor() { + // Destroy Executor +} + +void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) { + // Compile and Initialize +} + +void FleetExecutor::Run() { + // Run +} + +void FleetExecutor::Release() { + // Release +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h new file mode 100644 index 0000000000000000000000000000000000000000..47b97fc833fce596fcb62aee39cd0921a5a9fac9 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h @@ -0,0 +1,44 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h" +#include "paddle/fluid/platform/macros.h" + +namespace paddle { +namespace framework { +class ProgramDesc; +} + +namespace distributed { +class RuntimeGraph; + +class FleetExecutor final { + public: + FleetExecutor() = delete; + FleetExecutor(const std::string& exe_desc_str); + ~FleetExecutor(); + void Init(const paddle::framework::ProgramDesc& program_desc); + void Run(); + void Release(); + + private: + DISABLE_COPY_AND_ASSIGN(FleetExecutor); + FleetExecutorDesc exe_desc_; + std::unique_ptr runtime_graph_; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto new file mode 100644 index 0000000000000000000000000000000000000000..3db8984b5dcff1a4714fbb5352d615d59340d8b2 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto @@ -0,0 +1,21 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; +package paddle.distributed; + +message FleetExecutorDesc { + optional string grain = 1 [ default = "coarse" ]; + repeated string addrs = 2; // "ip:port" of all ranks +} diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.h b/paddle/fluid/distributed/fleet_executor/runtime_graph.h new file mode 100644 index 0000000000000000000000000000000000000000..7ae573039e671b7aea765ed8f635cee7b669dc9b --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.h @@ -0,0 +1,34 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace paddle { +namespace framework { +class ProgramDesc; +} + +namespace distributed { + +class RuntimeGraph final { + public: + RuntimeGraph() = default; + explicit RuntimeGraph(const paddle::framework::ProgramDesc &program) {} + ~RuntimeGraph() = default; + + DISABLE_COPY_AND_ASSIGN(RuntimeGraph); +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 1acce718ad9891657484b22d4933dc1d7c074e28..f037a83bcab48481178deaabe68b6860bb69ee90 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -247,6 +247,9 @@ if(WITH_PYTHON) COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMENT "Copy generated python proto into directory paddle/fluid/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_executor_desc_py_proto + COMMAND cp ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto + COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto.") else(NOT WIN32) string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/") string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/") @@ -286,7 +289,7 @@ if(WITH_DISTRIBUTE) fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer lod_rank_table feed_fetch_method collective_helper ${GLOB_DISTRIBUTE_DEPS} graph_to_program_pass variable_helper data_feed_proto timer monitor - heter_service_proto ${BRPC_DEP}) + heter_service_proto fleet_executor ${BRPC_DEP}) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) set(DISTRIBUTE_COMPILE_FLAGS @@ -305,7 +308,7 @@ if(WITH_DISTRIBUTE) pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method - graph_to_program_pass variable_helper timer monitor heter_service_proto fleet) + graph_to_program_pass variable_helper timer monitor heter_service_proto fleet fleet_executor) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(multi_trainer.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) @@ -319,7 +322,7 @@ if(WITH_DISTRIBUTE) pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method - graph_to_program_pass variable_helper timer monitor) + graph_to_program_pass variable_helper timer monitor fleet_executor) endif() elseif(WITH_PSLIB) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") @@ -339,7 +342,7 @@ elseif(WITH_PSLIB) pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method - graph_to_program_pass variable_helper timer monitor ${BRPC_DEP}) + graph_to_program_pass variable_helper timer monitor fleet_executor ${BRPC_DEP}) else() cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc @@ -349,7 +352,7 @@ else() pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method - graph_to_program_pass variable_helper timer monitor) + graph_to_program_pass variable_helper timer monitor fleet_executor) endif() target_link_libraries(executor while_op_helper executor_gc_helper recurrent_op_helper conditional_block_op_helper) diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 875e6af9652a259e73f05257860c3cf2ff6818a4..595c833cbfa8a065bc7ee4d5c97fcf407186aec3 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -7,7 +7,7 @@ set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapp feed_fetch_method pass generate_pass pass_builder parallel_executor profiler layer tracer engine scope_pool analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context gloo_wrapper infer_io_utils heter_wrapper generator op_version_registry ps_gpu_wrapper custom_operator - cost_model cuda_graph_with_memory_pool) + cost_model cuda_graph_with_memory_pool fleet_executor) if (WITH_PSCORE) set(PYBIND_DEPS ${PYBIND_DEPS} ps_service) @@ -61,6 +61,7 @@ set(PYBIND_SRCS imperative.cc ir.cc bind_cost_model.cc + bind_fleet_executor.cc inference_api.cc compatible.cc io.cc diff --git a/paddle/fluid/pybind/bind_fleet_executor.cc b/paddle/fluid/pybind/bind_fleet_executor.cc new file mode 100644 index 0000000000000000000000000000000000000000..392cdfe19bd7a4fe0f2eaa441381b4308d2d1bf8 --- /dev/null +++ b/paddle/fluid/pybind/bind_fleet_executor.cc @@ -0,0 +1,34 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/pybind/bind_fleet_executor.h" +#include +#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" +#include "paddle/fluid/framework/program_desc.h" + +namespace py = pybind11; + +namespace paddle { +namespace pybind { + +using paddle::distributed::FleetExecutor; + +void BindFleetExecutor(py::module* m) { + py::class_(*m, "FleetExecutor") + .def(py::init()) + .def("init", &FleetExecutor::Init) + .def("run", &FleetExecutor::Run); +} +} // namespace pybind +} // namespace paddle diff --git a/paddle/fluid/pybind/bind_fleet_executor.h b/paddle/fluid/pybind/bind_fleet_executor.h new file mode 100644 index 0000000000000000000000000000000000000000..733701fa36ba852d4b499640a3698c023aaae3b3 --- /dev/null +++ b/paddle/fluid/pybind/bind_fleet_executor.h @@ -0,0 +1,25 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace paddle { +namespace pybind { + +void BindFleetExecutor(pybind11::module* m); + +} // namespace pybind +} // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index fdff8310e710ba58982ce25550dc862d32662679..372fc74d19d0347ebdc073c75083c629e7cf6229 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -81,6 +81,7 @@ limitations under the License. */ #include "paddle/fluid/pybind/ascend_wrapper_py.h" #endif #include "paddle/fluid/pybind/bind_cost_model.h" +#include "paddle/fluid/pybind/bind_fleet_executor.h" #include "paddle/fluid/pybind/box_helper_py.h" #include "paddle/fluid/pybind/compatible.h" #include "paddle/fluid/pybind/const_value.h" @@ -2216,6 +2217,7 @@ All parameter, weight, gradient are variables in Paddle. BindConstValue(&m); BindGlobalValueGetterSetter(&m); BindProcessMeshDesc(&m); + BindFleetExecutor(&m); py::class_(m, "LodRankTable") .def("items", [](framework::LoDRankTable &table) { diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 377a40af7a3d538788d8a5c9ed1e8882a58cfbd3..c6649a615adbbba809529327ae12bbdbade4cc65 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1271,6 +1271,11 @@ class Executor(object): fetch_list = self._check_fetch_list(fetch_list) if isinstance(program, Program) and program._pipeline_opt: + if "fleet_opt" in program._pipeline_opt: + return self._run_using_fleet_executor( + program, + fetch_list=fetch_list, + use_program_cache=use_program_cache) if "startup_program" in program._pipeline_opt: program = program._pipeline_opt["startup_program"] else: @@ -1820,6 +1825,31 @@ class Executor(object): return ctx + def _run_using_fleet_executor(self, + program=None, + dataset=None, + scope=None, + thread=0, + is_infer=False, + debug=False, + fetch_list=None, + fetch_info=None, + print_period=100, + fetch_handler=None, + use_program_cache=False): + scope, real_fetch_list, trainer_instance = \ + self._prepare_pipeline_ctx(program, dataset, scope, thread, + is_infer, debug, fetch_list, fetch_info, + print_period, fetch_handler, + use_program_cache) + from ..distributed.fleet.proto import fleet_executor_desc_pb2 + from google.protobuf import text_format + fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc() + fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString()) + fleet_exe.init(program._pipeline_opt["section_program"].desc) + fleet_exe.run() + return None + def _run_pipeline(self, program=None, dataset=None, diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 5fa191d75d689d882f46a1da5819e1881efa73e7..9d53757188e739e0100cc9efb539fdfe3daf1292 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -140,6 +140,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_raw_program_optimizer) LIST(REMOVE_ITEM TEST_OPS test_fleet_gradient_scale) LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler) + LIST(REMOVE_ITEM TEST_OPS test_fleet_executor) endif() # Temporally disable test_deprecated_decorator diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor.py b/python/paddle/fluid/tests/unittests/test_fleet_executor.py new file mode 100644 index 0000000000000000000000000000000000000000..1d042547e2067a195b713d9901e4bebd18ace906 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor.py @@ -0,0 +1,43 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import paddle.fluid as fluid + +paddle.enable_static() + + +class TestFleetExecutor(unittest.TestCase): + def run_fleet_executor(self, place): + exe = paddle.static.Executor(place) + empty_program = paddle.static.Program() + with fluid.program_guard(empty_program, empty_program): + x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32) + empty_program._pipeline_opt = { + "fleet_opt": True, + "section_program": empty_program + } + exe.run(empty_program, feed={'x': [1]}) + + def test_executor_on_multi_devices(self): + places = [fluid.CPUPlace()] + if fluid.is_compiled_with_cuda(): + places.append(fluid.CUDAPlace(0)) + for place in places: + self.run_fleet_executor(place) + + +if __name__ == "__main__": + unittest.main()