Unverified commit 10b039b7 authored by LiYuRio, committed by GitHub

executor framework (#36892)

Parent b0cbca40
if(NOT WITH_PSCORE)
  add_subdirectory(fleet_executor)
  return()
endif()
@@ -16,6 +17,7 @@ add_subdirectory(service)
add_subdirectory(table)
add_subdirectory(test)
add_subdirectory(index_dataset)
add_subdirectory(fleet_executor)
get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)
......
proto_library(fleet_executor_desc_proto SRCS fleet_executor_desc.proto)
cc_library(fleet_executor SRCS fleet_executor.cc DEPS fleet_executor_desc_proto)
if(WITH_PYTHON)
  py_proto_compile(fleet_executor_desc_py_proto SRCS fleet_executor_desc.proto)
endif()
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
#include "paddle/fluid/framework/program_desc.h"
namespace paddle {
namespace distributed {
FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
  // Initialize Executor
}

FleetExecutor::~FleetExecutor() {
  // Destroy Executor
}

void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
  // Compile and Initialize
}

void FleetExecutor::Run() {
  // Run
}

void FleetExecutor::Release() {
  // Release
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <memory>
#include <string>

#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/platform/macros.h"
namespace paddle {
namespace framework {
class ProgramDesc;
}
namespace distributed {
class RuntimeGraph;
class FleetExecutor final {
 public:
  FleetExecutor() = delete;
  explicit FleetExecutor(const std::string& exe_desc_str);
  ~FleetExecutor();
  void Init(const paddle::framework::ProgramDesc& program_desc);
  void Run();
  void Release();

 private:
  DISABLE_COPY_AND_ASSIGN(FleetExecutor);
  FleetExecutorDesc exe_desc_;
  std::unique_ptr<RuntimeGraph> runtime_graph_;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package paddle.distributed;
message FleetExecutorDesc {
optional string grain = 1 [ default = "coarse" ];
repeated string addrs = 2; // "ip:port" of all ranks
}
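For reference, a minimal sketch of filling in and serializing this descriptor from Python. It assumes the generated fleet_executor_desc_pb2 module has been copied into paddle.distributed.fleet.proto (the build rule added further below arranges this); the addresses are placeholders:

# Minimal sketch: build and serialize a FleetExecutorDesc.
from paddle.distributed.fleet.proto import fleet_executor_desc_pb2

desc = fleet_executor_desc_pb2.FleetExecutorDesc()
desc.grain = "coarse"  # optional; "coarse" is already the proto2 default
desc.addrs.extend(["127.0.0.1:6170", "127.0.0.1:6171"])  # placeholder "ip:port" per rank
serialized = desc.SerializeToString()  # the bytes FleetExecutor's constructor receives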
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace framework {
class ProgramDesc;
}

namespace distributed {

class RuntimeGraph final {
 public:
  RuntimeGraph() = default;
  explicit RuntimeGraph(const paddle::framework::ProgramDesc& program) {}
  ~RuntimeGraph() = default;

  DISABLE_COPY_AND_ASSIGN(RuntimeGraph);
};
} // namespace distributed
} // namespace paddle
@@ -247,6 +247,9 @@ if(WITH_PYTHON)
    COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
    COMMENT "Copy generated python proto into directory paddle/fluid/proto."
    WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
  add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_executor_desc_py_proto
    COMMAND cp ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
    COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto.")
else(NOT WIN32)
string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/")
string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/")
@@ -286,7 +289,7 @@ if(WITH_DISTRIBUTE)
fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer
lod_rank_table feed_fetch_method collective_helper ${GLOB_DISTRIBUTE_DEPS}
graph_to_program_pass variable_helper data_feed_proto timer monitor
heter_service_proto ${BRPC_DEP})
heter_service_proto fleet_executor ${BRPC_DEP})
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
@@ -305,7 +308,7 @@ if(WITH_DISTRIBUTE)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor heter_service_proto fleet)
graph_to_program_pass variable_helper timer monitor heter_service_proto fleet fleet_executor)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(multi_trainer.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
@@ -319,7 +322,7 @@ if(WITH_DISTRIBUTE)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor fleet_executor)
endif()
elseif(WITH_PSLIB)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
@@ -339,7 +342,7 @@ elseif(WITH_PSLIB)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor ${BRPC_DEP})
graph_to_program_pass variable_helper timer monitor fleet_executor ${BRPC_DEP})
else()
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
@@ -349,7 +352,7 @@ else()
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor fleet_executor)
endif()
target_link_libraries(executor while_op_helper executor_gc_helper recurrent_op_helper conditional_block_op_helper)
......
@@ -7,7 +7,7 @@ set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapp
feed_fetch_method pass generate_pass pass_builder parallel_executor profiler layer tracer engine scope_pool
analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context
gloo_wrapper infer_io_utils heter_wrapper generator op_version_registry ps_gpu_wrapper custom_operator
cost_model cuda_graph_with_memory_pool)
cost_model cuda_graph_with_memory_pool fleet_executor)
if (WITH_PSCORE)
set(PYBIND_DEPS ${PYBIND_DEPS} ps_service)
@@ -61,6 +61,7 @@ set(PYBIND_SRCS
    imperative.cc
    ir.cc
    bind_cost_model.cc
    bind_fleet_executor.cc
    inference_api.cc
    compatible.cc
    io.cc
......
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/pybind/bind_fleet_executor.h"
#include <pybind11/stl.h>
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/framework/program_desc.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
using paddle::distributed::FleetExecutor;
void BindFleetExecutor(py::module* m) {
  py::class_<FleetExecutor>(*m, "FleetExecutor")
      .def(py::init<const std::string&>())
      .def("init", &FleetExecutor::Init)
      .def("run", &FleetExecutor::Run);
}
} // namespace pybind
} // namespace paddle
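For illustration, a hedged sketch of exercising this binding directly from Python, mirroring the calls _run_using_fleet_executor makes in the executor change below (core is paddle.fluid.core; the default-initialized descriptor matches what the executor currently builds):

# Sketch: drive the pybind FleetExecutor directly, using only names from this patch.
import paddle
from paddle.fluid import core
from paddle.distributed.fleet.proto import fleet_executor_desc_pb2

paddle.enable_static()
program = paddle.static.Program()  # any ProgramDesc-backed program works

exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
fleet_exe = core.FleetExecutor(exe_desc.SerializeToString())
fleet_exe.init(program.desc)  # -> FleetExecutor::Init (a stub at this point)
fleet_exe.run()               # -> FleetExecutor::Run  (a stub at this point)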
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <pybind11/pybind11.h>
namespace paddle {
namespace pybind {
void BindFleetExecutor(pybind11::module* m);
} // namespace pybind
} // namespace paddle
@@ -81,6 +81,7 @@ limitations under the License. */
#include "paddle/fluid/pybind/ascend_wrapper_py.h"
#endif
#include "paddle/fluid/pybind/bind_cost_model.h"
#include "paddle/fluid/pybind/bind_fleet_executor.h"
#include "paddle/fluid/pybind/box_helper_py.h"
#include "paddle/fluid/pybind/compatible.h"
#include "paddle/fluid/pybind/const_value.h"
@@ -2216,6 +2217,7 @@ All parameter, weight, gradient are variables in Paddle.
  BindConstValue(&m);
  BindGlobalValueGetterSetter(&m);
  BindProcessMeshDesc(&m);
  BindFleetExecutor(&m);
  py::class_<framework::LoDRankTable>(m, "LodRankTable")
      .def("items", [](framework::LoDRankTable &table) {
......
@@ -1271,6 +1271,11 @@ class Executor(object):
        fetch_list = self._check_fetch_list(fetch_list)
        if isinstance(program, Program) and program._pipeline_opt:
            if "fleet_opt" in program._pipeline_opt:
                return self._run_using_fleet_executor(
                    program,
                    fetch_list=fetch_list,
                    use_program_cache=use_program_cache)
            if "startup_program" in program._pipeline_opt:
                program = program._pipeline_opt["startup_program"]
            else:
@@ -1820,6 +1825,31 @@ class Executor(object):
        return ctx

    def _run_using_fleet_executor(self,
                                  program=None,
                                  dataset=None,
                                  scope=None,
                                  thread=0,
                                  is_infer=False,
                                  debug=False,
                                  fetch_list=None,
                                  fetch_info=None,
                                  print_period=100,
                                  fetch_handler=None,
                                  use_program_cache=False):
        scope, real_fetch_list, trainer_instance = \
            self._prepare_pipeline_ctx(program, dataset, scope, thread,
                                       is_infer, debug, fetch_list, fetch_info,
                                       print_period, fetch_handler,
                                       use_program_cache)
        from ..distributed.fleet.proto import fleet_executor_desc_pb2
        from google.protobuf import text_format
        fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
        fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
        fleet_exe.init(program._pipeline_opt["section_program"].desc)
        fleet_exe.run()
        return None

    def _run_pipeline(self,
                      program=None,
                      dataset=None,
......
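As a usage note, the branch added to run above fires for any Program whose _pipeline_opt dict contains a "fleet_opt" key; a minimal trigger looks like the following sketch (essentially what the new unit test at the end of this diff does):

# Sketch: reach the fleet-executor path through the ordinary Executor.run.
import paddle
import paddle.fluid as fluid

paddle.enable_static()
exe = paddle.static.Executor(fluid.CPUPlace())
prog = paddle.static.Program()
with fluid.program_guard(prog, prog):
    x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32)

# "fleet_opt" selects the new branch; "section_program"'s desc is what
# FleetExecutor.init receives inside _run_using_fleet_executor.
prog._pipeline_opt = {"fleet_opt": True, "section_program": prog}
exe.run(prog, feed={'x': [1]})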
@@ -140,6 +140,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32)
  LIST(REMOVE_ITEM TEST_OPS test_raw_program_optimizer)
  LIST(REMOVE_ITEM TEST_OPS test_fleet_gradient_scale)
  LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler)
  LIST(REMOVE_ITEM TEST_OPS test_fleet_executor)
endif()
# Temporarily disable test_deprecated_decorator
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

import paddle
import paddle.fluid as fluid

paddle.enable_static()


class TestFleetExecutor(unittest.TestCase):
    def run_fleet_executor(self, place):
        exe = paddle.static.Executor(place)
        empty_program = paddle.static.Program()
        with fluid.program_guard(empty_program, empty_program):
            x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32)
        empty_program._pipeline_opt = {
            "fleet_opt": True,
            "section_program": empty_program
        }
        exe.run(empty_program, feed={'x': [1]})

    def test_executor_on_multi_devices(self):
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))
        for place in places:
            self.run_fleet_executor(place)


if __name__ == "__main__":
    unittest.main()