diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc
index 1fa1f119191044f1a220962fc6c53b1902bc1d5e..9d9755569b2fc0933c97c95ad7615a1107331c2b 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.cc
+++ b/paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -240,13 +240,12 @@ void Carrier::CreateInterceptors() {
                                         task_node->run_at_offset(),
                                         task_node->run_per_steps()));
     std::unique_ptr<Interceptor> interceptor;
-    if (task_node->type().empty()) {
-      // TODO(wangxi): delete this in future
-      interceptor.reset(new Interceptor(interceptor_id, task_node));
-    } else {
-      interceptor = InterceptorFactory::Create(task_node->type(),
-                                               interceptor_id, task_node);
-    }
+    PADDLE_ENFORCE_NE(task_node->type().empty(), true,
+                      platform::errors::NotFound(
+                          "Cannot find type for task node with id %lld",
+                          task_node->task_id()));
+    interceptor = InterceptorFactory::Create(task_node->type(), interceptor_id,
+                                             task_node);
     interceptor->SetPlace(place_);
     interceptor->SetMiniBatchScope(minibatch_scope_);
     interceptor->SetMicroBatchScope(microbatch_scopes_);
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index a2a51c45f4390e6932b73337439a0fd8288ee95d..f7173a7b8bdfbc3d819f55c5349b7a3ef5025288 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -48,32 +48,29 @@ void FleetExecutor::Init(
     const framework::ProgramDesc& program_desc, framework::Scope* scope,
     const platform::Place& place, const std::vector<TaskNode*>& task_nodes,
     const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
-  if (task_nodes.size() == 0) {
-    LOG(INFO) << "fleet executor will use c++ side scheduler construction.";
-    runtime_graph_ = std::make_shared<RuntimeGraph>(program_desc, exe_desc_);
-  } else {
-    LOG(INFO) << "fleet executor has been set dependency on python side.";
-    // TODO(fleet_exe devs): the unused_vars should be got from run time graph
-    std::vector<std::unique_ptr<framework::OperatorBase>> ops;
-    for (auto task_node : task_nodes) {
-      for (auto op : task_node->ops()) {
-        ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
-      }
-    }
-    auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
-    runtime_graph_ = std::make_shared<RuntimeGraph>();
-    std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
-    for (auto task_node : task_nodes) {
-      task_node->SetUnusedVars(unused_vars);
-      int64_t interceptor_id = task_node->task_id();
-      interceptor_id_to_task.emplace(interceptor_id, task_node);
-    }
-    runtime_graph_->SetInterceptorIdToRank(task_id_to_rank);
-    runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task);
-    for (auto& unique_op : ops) {
-      unique_op.release();
+  PADDLE_ENFORCE_GT(task_nodes.size(), 0,
+                    platform::errors::InvalidArgument(
+                        "Fleet executor is inited with empty task node"));
+  // TODO(fleet_exe devs): the unused_vars should be got from run time graph
+  std::vector<std::unique_ptr<framework::OperatorBase>> ops;
+  for (auto task_node : task_nodes) {
+    for (auto op : task_node->ops()) {
+      ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
     }
   }
+  auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
+  runtime_graph_ = std::make_shared<RuntimeGraph>();
+  std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
+  for (auto task_node : task_nodes) {
+    task_node->SetUnusedVars(unused_vars);
+    int64_t interceptor_id = task_node->task_id();
+    interceptor_id_to_task.emplace(interceptor_id, task_node);
+  }
+  runtime_graph_->SetInterceptorIdToRank(task_id_to_rank);
+  runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task);
+  for (auto& unique_op : ops) {
+    unique_op.release();
+  }
   root_scope_ = scope;
   place_ = place;
   PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument(
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
index 6890c311ec0039823cf940d2000cecb7d40d0930..aa553557852a767b571b000cbf9b78069db94019 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
@@ -23,9 +23,5 @@ message RankInfo {
 message FleetExecutorDesc {
   optional int64 cur_rank = 1 [ default = 0 ]; // Rank id of current processor
   repeated RankInfo cluster_info = 2;
-  optional int32 dp_degree = 3 [ default = 1 ];
-  optional int32 mp_degree = 4 [ default = 1 ];
-  optional int32 pp_degree = 5 [ default = 1 ];
-  optional int64 num_micro_batches = 6 [ default = 1 ];
-  optional int64 num_slots = 7 [ default = 1 ];
+  optional int64 num_micro_batches = 3 [ default = 1 ];
 }
diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc
index 32f9e36e53037064c52e85147db7270c2d534594..1ad144470af2668fcbc0098e91f272b8f5f96b96 100644
--- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc
+++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc
@@ -14,300 +14,15 @@
 #include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
-#include "paddle/fluid/framework/executor_gc_helper.h"
-#include "paddle/fluid/framework/op_registry.h"
-#include "paddle/fluid/framework/operator.h"
-#include "paddle/fluid/framework/program_desc.h"
 
 namespace paddle {
 namespace distributed {
-namespace {
-
-using OperatorBase = RuntimeGraph::OperatorBase;
-using OpRole = paddle::framework::OpRole;
-using OpRegistry = paddle::framework::OpRegistry;
-using ProgramDesc = paddle::framework::ProgramDesc;
-
-bool IsForward(int32_t op_role) {
-  return (op_role == static_cast<int32_t>(OpRole::kForward)) ||
-         (op_role == (static_cast<int32_t>(OpRole::kForward) |
-                      static_cast<int32_t>(OpRole::kLoss)));
-}
-
-bool IsLRSched(int32_t op_role) {
-  return op_role == static_cast<int32_t>(OpRole::kLRSched);
-}
-
-bool IsBackward(int32_t op_role) {
-  return (op_role == static_cast<int32_t>(OpRole::kBackward)) ||
-         (op_role == (static_cast<int32_t>(OpRole::kBackward) |
-                      static_cast<int32_t>(OpRole::kLoss)));
-}
-
-bool IsOptimize(int32_t op_role) {
-  return op_role == static_cast<int32_t>(OpRole::kOptimize);
-}
-
-struct DistCoord {
-  int32_t dp_idx;
-  int32_t pp_idx;
-  int32_t mp_idx;
-};
-
-class DistCoordSys final {
- public:
-  DistCoordSys(int32_t dp_degree, int32_t pp_degree, int32_t mp_degree)
-      : dp_degree_(dp_degree), pp_degree_(pp_degree), mp_degree_(mp_degree) {}
-  DistCoord RankToCoord(int64_t rank) const;
-  int64_t CoordToRank(const DistCoord& coord) const;
-
- private:
-  DISABLE_COPY_AND_ASSIGN(DistCoordSys);
-  bool InvalidCoord(const DistCoord& coord) const;
-  int32_t dp_degree_;
-  int32_t pp_degree_;
-  int32_t mp_degree_;
-};
-
-DistCoord DistCoordSys::RankToCoord(int64_t rank) const {
-  DistCoord coord;
-  coord.mp_idx = rank % mp_degree_;
-  rank /= mp_degree_;
-  coord.pp_idx = rank % pp_degree_;
-  rank /= pp_degree_;
-  coord.dp_idx = rank % dp_degree_;
-  return coord;
-}
-
-int64_t DistCoordSys::CoordToRank(const DistCoord& coord) const {
-  if (InvalidCoord(coord)) {
-    return -1;
-  }
-  return coord.dp_idx * pp_degree_ * mp_degree_ + coord.pp_idx * mp_degree_ +
-         coord.mp_idx;
-}
-
-bool DistCoordSys::InvalidCoord(const DistCoord& coord) const {
-  return coord.mp_idx < 0 || coord.mp_idx >= mp_degree_ || coord.pp_idx < 0 ||
-         coord.pp_idx >= pp_degree_ || coord.dp_idx < 0 ||
-         coord.dp_idx >= dp_degree_;
-}
-
-}  // namespace
-
-std::vector<OpRole> RuntimeGraph::functionality_order = {
-    OpRole::kLRSched, OpRole::kForward, OpRole::kBackward, OpRole::kOptimize};
-
-RuntimeGraph::RuntimeGraph(const ProgramDesc& program,
-                           const FleetExecutorDesc& exe_desc)
-    : exe_desc_(exe_desc) {
-  if (exe_desc.pp_degree() == 1) {
-    OriginProgramCompile(program);
-  } else {
-    SplitProgramBasedFunctionality(program);
-    AssignTaskToIntercepter();
-    FakeDependence();
-    FakeRuntimeInfo();
-  }
-}
-
-void RuntimeGraph::OriginProgramCompile(const ProgramDesc& program) {
-  int64_t cur_rank = exe_desc_.cur_rank();
-  int64_t max_run_times = exe_desc_.num_micro_batches();
-  int64_t max_slot_nums = exe_desc_.num_slots();
-
-  auto task_node = std::make_unique<TaskNode>(program, cur_rank, max_run_times,
-                                              max_slot_nums);
-  // TODO(wangxi): add skip vars
-  auto unused_vars =
-      framework::GetUnusedVars(program.Block(0), task_node->unique_ops(), {});
-  task_node->SetType("Compute");
-  task_node->SetUnusedVars(unused_vars);
-
-  task_nodes_.emplace_back(std::move(task_node));
-  int64_t task_id = task_nodes_[0]->task_id();
-  intercepter_id_to_rank_.insert({task_id, cur_rank});
-  intercepter_id_to_node_.insert({task_id, task_nodes_[0].get()});
-}
-
-void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
-  for (const auto& op_desc : program.Block(0).AllOps()) {
-    ops_.emplace_back(OpRegistry::CreateOp(*op_desc));
-  }
-  // TODO(wangxi): how to gc pipeline backward send
-  auto unused_vars = framework::GetUnusedVars(program.Block(0), ops_, {});
-
-  std::unordered_map<int32_t, std::vector<OperatorBase*>> role_to_ops;
-  for (const auto& op : ops_) {
-    int32_t op_role = op->Attr<int32_t>("op_role");
-    OpRole new_op_role;
-    if (IsLRSched(op_role)) {
-      new_op_role = OpRole::kLRSched;
-    } else if (IsForward(op_role)) {
-      new_op_role = OpRole::kForward;
-    } else if (IsBackward(op_role)) {
-      new_op_role = OpRole::kBackward;
-    } else if (IsOptimize(op_role)) {
-      new_op_role = OpRole::kOptimize;
-    } else {
-      PADDLE_THROW(platform::errors::PreconditionNotMet(
-          "The op %s is None of LRSched, Forward, Backward or Optimize.",
-          op->Type()));
-    }
-    int32_t new_op_role_id = static_cast<int32_t>(new_op_role);
-    if (role_to_ops.find(new_op_role_id) == role_to_ops.end()) {
-      role_to_ops.insert({new_op_role_id, {}});
-    }
-    role_to_ops.at(new_op_role_id).emplace_back(op.get());
-  }
-
-  int64_t cur_rank = exe_desc_.cur_rank();
-  DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(),
-                         exe_desc_.mp_degree());
-  const auto& coord = coord_sys.RankToCoord(cur_rank);
-  int pipeline_stage = coord.pp_idx;
-  int64_t num_pipeline_stages = exe_desc_.pp_degree();
-
-  // TODO(fleet_executor dev): start up steps should be a config `num_slots`
-  int64_t start_up_steps = num_pipeline_stages - pipeline_stage;
-  int64_t num_micro_batches = exe_desc_.num_micro_batches();
-  int64_t task_id = cur_rank * functionality_order.size();
-  for (std::size_t i = 0; i < functionality_order.size(); ++i) {
-    VLOG(3) << "Runtime graph is creating task node for: " << task_id << ".";
-    OpRole role = functionality_order[i];
-    int32_t role_id = static_cast<int32_t>(role);
-    int64_t max_run_times = num_micro_batches;
-    int64_t max_slot_nums = start_up_steps;
-    // NOTE: use short path, each interceptor should run for max_run_times
-    std::vector<OperatorBase*> task_ops{};
-    if (role_to_ops.find(role_id) != role_to_ops.end()) {
-      task_ops = role_to_ops.at(role_id);
-    }
-    std::unique_ptr<TaskNode> task_node = std::make_unique<TaskNode>(
-        role_id, task_ops, cur_rank, task_id, max_run_times, max_slot_nums);
-    if (IsLRSched(role_id) || IsOptimize(role_id)) {
-      task_node->SetType("Amplifier");
-      if (IsLRSched(role_id)) {
-        task_node->SetRunPerSteps(max_run_times);
-      } else {
-        task_node->SetRunAtOffset(max_run_times - 1);
-        task_node->SetRunPerSteps(max_run_times);
-      }
-    } else {
-      task_node->SetType("Compute");
-    }
-    task_node->SetUnusedVars(unused_vars);
-    task_nodes_.emplace_back(std::move(task_node));
-    ++task_id;
-  }
-}
-
-void RuntimeGraph::FakeDependence() {
-  int64_t cur_rank = exe_desc_.cur_rank();
-  DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(),
-                         exe_desc_.mp_degree());
-  const auto& coord = coord_sys.RankToCoord(cur_rank);
-  DistCoord upstream_coord = coord, downstream_coord = coord;
-  upstream_coord.pp_idx -= 1;
-  downstream_coord.pp_idx += 1;
-  int64_t pp_upstream = coord_sys.CoordToRank(upstream_coord);
-  int64_t pp_downstream = coord_sys.CoordToRank(downstream_coord);
-  bool is_first_stage = (pp_upstream == -1);
-  bool is_last_stage = (pp_downstream == -1);
-
-  int32_t num_of_functionality = functionality_order.size();
-  // lr(1:m) -> forward -> backward -> (m:1)optimize
-  //                ↑          ↓
-  // lr(1:m) -> forward -> backward -> (m:1)optimize
-  //                ↑          ↓
-  // lr(1:m) -> forward -> backward -> (m:1)optimize
-  for (std::size_t i = 0; i < task_nodes_.size(); ++i) {
-    auto& node = task_nodes_[i];
-    bool is_forward = IsForward(node->role());
-    bool is_backward = IsBackward(node->role());
-
-    int64_t cur_id = cur_rank * num_of_functionality + i;
-    int64_t prev_id = cur_id - 1;
-    int64_t next_id = cur_id + 1;
-
-    int64_t upstream_id = pp_upstream * num_of_functionality + i;
-    int64_t downstream_id = pp_downstream * num_of_functionality + i;
-
-    // 1F1B, last stage pp_buff_size should be 1, while first stage
-    // pp_buff_size should be pp_degree
-    int64_t pp_buff_size = exe_desc_.pp_degree() - coord.pp_idx;
-
-    std::vector<std::pair<int64_t, int64_t>> ups;
-    std::vector<std::pair<int64_t, int64_t>> downs;
-
-    if (i != 0) {  // not lr
-      int64_t buff_size = is_backward ? pp_buff_size : 2;
-      ups.emplace_back(prev_id, buff_size);
-    }
-    if (i != task_nodes_.size() - 1) {  // not optimize
-      int64_t buff_size = is_forward ? pp_buff_size : 2;
-      downs.emplace_back(next_id, buff_size);
-    }
-
-    if (is_forward) {
-      if (!is_first_stage) {
-        ups.emplace_back(upstream_id, 2);
-      }
-      if (!is_last_stage) {
-        downs.emplace_back(downstream_id, 2);
-      }
-    } else if (is_backward) {
-      if (!is_last_stage) {
-        ups.emplace_back(downstream_id, 2);
-      }
-      if (!is_first_stage) {
-        downs.emplace_back(upstream_id, 2);
-      }
-    }
-
-    for (auto up : ups) {
-      VLOG(3) << "Task(" << cur_id << ") AddUpstream Task(" << up.first
-              << ") with buff_size=" << up.second;
-      node->AddUpstreamTask(up.first, up.second);
-    }
-    for (auto down : downs) {
-      VLOG(3) << "Task(" << cur_id << ") AddDownstream Task(" << down.first
-              << ") with buff_size=" << down.second;
-      node->AddDownstreamTask(down.first, down.second);
-    }
-  }
-}
-
-void RuntimeGraph::AssignTaskToIntercepter() {
-  for (const auto& task : task_nodes_) {
-    int64_t intercepter_id = task->task_id();
-    VLOG(3) << "Runtime graph is assigning task to interceptor: "
-            << intercepter_id << " with type: " << task->type() << ".";
-    if (intercepter_id_to_node_.find(intercepter_id) !=
-        intercepter_id_to_node_.end()) {
-      PADDLE_THROW(platform::errors::PreconditionNotMet(
-          "Repeated intercepter id: %d", intercepter_id));
-    }
-    intercepter_id_to_node_.insert({intercepter_id, task.get()});
-  }
-}
-
-void RuntimeGraph::FakeRuntimeInfo() {
-  int64_t nrank = exe_desc_.cluster_info().size();
-  int32_t num_of_functionality = functionality_order.size();
-  for (int64_t i = 0; i < nrank; ++i) {
-    for (int32_t j = 0; j < num_of_functionality; ++j) {
-      int64_t intercepter_id = i * num_of_functionality + j;
-      intercepter_id_to_rank_.insert({intercepter_id, i});
-    }
-  }
-}
 
 std::string RuntimeGraph::DebugString() const {
   std::ostringstream os;
   os << "\nRuntime Graph Debug: \n";
-  for (const auto& task : task_nodes_) {
-    os << task->DebugString();
+  for (const auto& pair : intercepter_id_to_node_) {
+    os << pair.second->DebugString();
     os << "\n";
   }
   return os.str();
diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.h b/paddle/fluid/distributed/fleet_executor/runtime_graph.h
index 9ffc9cc2cc137e8846957ba2d9feab5e8e7e4f43..3678e2e860a9d9e746c9d338bd55dea47cf8edc4 100644
--- a/paddle/fluid/distributed/fleet_executor/runtime_graph.h
+++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.h
@@ -22,21 +22,12 @@
 #include "paddle/fluid/platform/macros.h"
 
 namespace paddle {
-namespace framework {
-class ProgramDesc;
-class OperatorBase;
-}
-
 namespace distributed {
 class TaskNode;
 
 class RuntimeGraph final {
  public:
-  using ProgramDesc = paddle::framework::ProgramDesc;
-  using OperatorBase = paddle::framework::OperatorBase;
   RuntimeGraph() = default;
-  explicit RuntimeGraph(const ProgramDesc& program,
-                        const FleetExecutorDesc& exe_desc);
   ~RuntimeGraph() = default;
   const std::unordered_map<int64_t, TaskNode*>& intercepter_id_to_node() const {
     return intercepter_id_to_node_;
@@ -56,18 +47,8 @@ class RuntimeGraph final {
 
  private:
   DISABLE_COPY_AND_ASSIGN(RuntimeGraph);
-  void SplitProgramBasedFunctionality(const ProgramDesc& program);
-  void FakeDependence();
-  void AssignTaskToIntercepter();
-  void FakeRuntimeInfo();
-  void OriginProgramCompile(const ProgramDesc& program);
-  // LRSched, Forward, Backward, Optimize
-  static std::vector<paddle::framework::OpRole> functionality_order;
-  std::vector<std::unique_ptr<TaskNode>> task_nodes_;
-  std::vector<std::unique_ptr<OperatorBase>> ops_;
   std::unordered_map<int64_t, TaskNode*> intercepter_id_to_node_;
   std::unordered_map<int64_t, int64_t> intercepter_id_to_rank_;
-  FleetExecutorDesc exe_desc_;
 };
 
 }  // namespace distributed
diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py
index 9422774bb64253c843a10768a824a059731655cd..dba3388d18ffa66e490e478948aabf1e0616ebf0 100644
--- a/python/paddle/distributed/fleet/fleet_executor_utils.py
+++ b/python/paddle/distributed/fleet/fleet_executor_utils.py
@@ -89,7 +89,7 @@ def is_backward_op(op_role):
         (op_role == (int(OpRole.Backward) ^ int(OpRole.Loss)))
 
 
-def one_f_one_b(program, cur_rank, max_run_times, dist_opt, nrank):
+def run1f1b(program, cur_rank, max_run_times, dist_opt, nrank):
     """
     Split the program to support 1f1b pipeline scheduler.
     This funct will split the program based on the op_role.
@@ -201,3 +201,20 @@ def one_f_one_b(program, cur_rank, max_run_times, dist_opt, nrank):
         for j in range(num_of_functionality):
             task_id_to_rank[int(i * num_of_functionality + j)] = i
     return task_nodes, task_id_to_rank
+
+
+def origin(program, cur_rank):
+    """
+    Origin scheduler for fleet executor, supports non-pp mode
+    :param program: The origin program.
+    :param cur_rank: Current rank (can be got from fleet.worker_index()).
+    :return:
+        task_nodes (list): a list with only one task node for current rank
+        task_id_to_rank (dict): a fake dict, since there is no upstream or downstream, this dict won't be used
+    """
+    print("fleet executor will use python side origin scheduler.")
+    task_node = core.TaskNode(program.desc, cur_rank, 1, 1)
+    task_node.set_type("Compute")
+    task_id = task_node.task_id()
+    task_id_to_rank = {task_id: cur_rank}
+    return [task_node], task_id_to_rank
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index b00449a747542f8ecff264a0673adab910c47bee..a65370d99a8a5c9cb852d014d636ef2e023489d6 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1972,31 +1972,37 @@ class Executor(object):
             rank_info.rank = rank
             rank_info.ip_port = endpoint
             fleet_exe_desc.cluster_info.append(rank_info)
-        if "dist_strategy" in fleet_opt:
-            fleet_exe_desc.dp_degree = fleet_opt["dist_strategy"]["dp_degree"]
-            fleet_exe_desc.mp_degree = fleet_opt["dist_strategy"]["mp_degree"]
-            fleet_exe_desc.pp_degree = fleet_opt["dist_strategy"]["pp_degree"]
         if "num_micro_batches" in fleet_opt:
             fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"]
-        num_of_gpu = fleet_exe_desc.dp_degree * fleet_exe_desc.mp_degree * fleet_exe_desc.pp_degree
-        assert nrank == num_of_gpu, "The number of rank is not equal to the number of gpu."
-        if 'python_side' in fleet_opt:
-            strategy = fleet_opt['python_side']
-            if strategy == '1F1B':
-                from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b
-                tasks, task_id_to_rank = one_f_one_b(
-                    program, cur_rank,
-                    fleet_opt.get('num_micro_batches', 1),
-                    fleet_opt.get('dist_strategy', {}), nrank)
-                # NOTE: have to hold these vars, otherwise will be destructed
-                fleet_opt['tasks'] = tasks
-                fleet_opt['task_id_to_rank'] = task_id_to_rank
-            else:
-                raise "Fleet_executor only supports 1F1B scheduler if you choose python side split, " \
-                    "but received " + str(strategy) + "."
+        assert 'scheduler' in fleet_opt, \
+            "Fleet executor needs configuration for scheduler, you can choose from 1F1B or Origin."
+        scheduler = fleet_opt['scheduler']
+        if scheduler == '1F1B':
+            from paddle.distributed.fleet.fleet_executor_utils import run1f1b
+            if "dist_strategy" not in fleet_opt or \
+                    "pp_degree" not in fleet_opt["dist_strategy"] or \
+                    fleet_opt["dist_strategy"]["pp_degree"] == 1:
+                warnings.warn("Using 1F1B scheduler with pp_degree == 1.")
+            tasks, task_id_to_rank = run1f1b(
+                program, cur_rank,
+                fleet_opt.get('num_micro_batches', 1),
+                fleet_opt.get('dist_strategy', {}), nrank)
+        elif scheduler == 'Origin':
+            from paddle.distributed.fleet.fleet_executor_utils import origin
+            if "dist_strategy" in fleet_opt and \
+                    "pp_degree" in fleet_opt["dist_strategy"]:
+                assert fleet_opt["dist_strategy"]["pp_degree"] == 1, \
+                    "For pipeline mode, the scheduler should be 1F1B instead of Origin."
+            if "num_micro_batches" in fleet_opt:
+                assert fleet_opt["num_micro_batches"] == 1, \
+                    "For origin scheduler mode, the num micro batches should be 1."
+            tasks, task_id_to_rank = origin(program, cur_rank)
         else:
-            task_id_to_rank = fleet_opt.get("task_id_to_rank", {})
-            tasks = fleet_opt.get("tasks", [])
+            raise ValueError("Fleet_executor only supports 1F1B and Origin "
+                             "scheduler, but received " + str(scheduler) + ".")
+        # NOTE: have to hold these vars, otherwise will be destructed
+        fleet_opt['tasks'] = tasks
+        fleet_opt['task_id_to_rank'] = task_id_to_rank
         fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
         place = core.Place()
         place.set_place(self.place)
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 5fdcd6d0a9d385cf4cd3238fda551a4da9beffdb..598ca24f5955030de4a15c6eb3a0a2eccc2dc065 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -146,6 +146,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32)
     LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler)
     LIST(REMOVE_ITEM TEST_OPS test_fleet_executor)
     LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_multi_devices)
+    LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_origin_scheduler)
     LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_mapper)
     LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_task_node)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor.py b/python/paddle/fluid/tests/unittests/test_fleet_executor.py
index 30ba376dd4dc5d608b672551fa0f6bf5c540e834..8b73a714bbbc5f04ff0fcdb665be40af673e34a6 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_executor.py
@@ -34,7 +34,7 @@ class TestFleetExecutor(unittest.TestCase):
         fleet_opt = {
             "dist_strategy": strategy.sharding_configs,
             "num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
-            "python_side": "1F1B"
+            "scheduler": "1F1B"
         }
         return fleet_opt
 
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_scheduler.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_scheduler.py
new file mode 100644
index 0000000000000000000000000000000000000000..4bbb3bff07f974699624959cfb096d0d59c43f34
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_scheduler.py
@@ -0,0 +1,87 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import numpy as np
+import paddle
+import paddle.fluid as fluid
+
+paddle.enable_static()
+
+
+class TestFleetExecutor(unittest.TestCase):
+    def fake_fleet_opt(self):
+        # TODO: Fake for coverage will be removed in the future
+        import paddle.distributed.fleet as fleet
+        strategy = fleet.DistributedStrategy()
+        strategy.sharding_configs = {
+            "dp_degree": 1,
+            "mp_degree": 1,
+            "pp_degree": 1
+        }
+        strategy.pipeline_configs = {"accumulate_steps": 1}
+        fleet_opt = {
+            "dist_strategy": strategy.sharding_configs,
+            "num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
+            "scheduler": "Origin"
+        }
+        return fleet_opt
+
+    def run_fleet_executor(self, place, x_data, y_data):
+        exe = paddle.static.Executor(place)
+        empty_program = paddle.static.Program()
+        with fluid.program_guard(empty_program, empty_program):
+            x = fluid.layers.data(
+                name='x', shape=x_data.shape, dtype=x_data.dtype)
+            y = fluid.layers.data(
+                name='y', shape=y_data.shape, dtype=y_data.dtype)
+            z = x + y
+            a = 2 * x + 3 * y
+            loss = paddle.mean(a)
+            base_lr = 0.1
+            passes = [30, 60, 80, 90]
+            steps_per_pass = 10
+            bd = [steps_per_pass * p for p in passes]
+            lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
+            lr_val = paddle.optimizer.lr.PiecewiseDecay(
+                boundaries=bd, values=lr)
+            opt = paddle.optimizer.AdamW(
+                learning_rate=lr_val,
+                grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0))
+            opt.minimize(loss)
+        # TODO: section_program will be removed in the future
+        empty_program._pipeline_opt = {
+            "fleet_opt": self.fake_fleet_opt(),
+            "section_program": empty_program
+        }
+        res = exe.run(empty_program,
+                      feed={'x': x_data,
+                            'y': y_data},
+                      fetch_list=[z.name, a.name])
+        return res
+
+    def test_executor_on_single_device(self):
+        if fluid.is_compiled_with_cuda():
+            shape = (10000, 3462)
+            x_data = np.random.rand(*shape)
+            y_data = np.random.rand(*shape)
+            z_data = x_data + y_data
+            a_data = 2 * x_data + 3 * y_data
+            res = self.run_fleet_executor(fluid.CUDAPlace(0), x_data, y_data)
+            self.assertTrue(np.allclose(res[0], z_data))
+            self.assertTrue(np.allclose(res[1], a_data))
+
+
+if __name__ == "__main__":
+    unittest.main()
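
Usage: with this patch, `fleet_opt` selects the scheduler through the `scheduler` key ('1F1B' or 'Origin') rather than the removed `python_side` key, and the dp/mp/pp degrees are no longer forwarded to `FleetExecutorDesc`. Below is a minimal configuration sketch, modeled on the tests in this diff and not part of the patch itself; the empty `program` is a stand-in for whatever static-graph program the caller actually builds.

import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()

# Stand-in program; in practice this is the user's static-graph program.
program = paddle.static.Program()

strategy = fleet.DistributedStrategy()
strategy.sharding_configs = {"dp_degree": 1, "mp_degree": 1, "pp_degree": 1}
strategy.pipeline_configs = {"accumulate_steps": 1}

# "scheduler" replaces the old "python_side" key: "1F1B" splits the program
# for pipeline parallelism, "Origin" runs it as a single "Compute" task node.
fleet_opt = {
    "dist_strategy": strategy.sharding_configs,
    "num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
    "scheduler": "Origin",
}

# The executor reads fleet_opt from program._pipeline_opt, as in the new test.
program._pipeline_opt = {"fleet_opt": fleet_opt, "section_program": program}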