// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"

#include <algorithm>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/variable.h"

namespace paddle {
namespace distributed {

FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
  bool parse_flag = exe_desc_.ParseFromString(exe_desc_str);
  PADDLE_ENFORCE(parse_flag,
                 platform::errors::PreconditionNotMet(
                     "Error occurs while parsing string to proto"));
  // The message bus will be created and initialized only once.
  GlobalVal<MessageBus>::Create();
  InitMessageBus();
}

FleetExecutor::FleetExecutor(const FleetExecutorDesc& exe_desc)
    : exe_desc_(exe_desc) {
  // The message bus will be created and initialized only once.
  GlobalVal<MessageBus>::Create();
  InitMessageBus();
}

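// Release every carrier created by this executor.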
FleetExecutor::~FleetExecutor() {
  for (const auto& carrier_id : carrier_ids_) {
    GlobalMap<std::string, Carrier>::Get(carrier_id)->Release();
  }
}

namespace {
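// Recursively collect the task nodes reachable from cur_task through
// DependType::NORMAL edges; these are the tasks belonging to the sub block.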
void GetSubBlockTask(const std::vector<TaskNode*>& tasks,
                     TaskNode* cur_task,
                     std::set<TaskNode*>* sub_block_task) {
  auto& downstream = cur_task->downstream();
  auto& id_to_dep_type = cur_task->id_to_dep_type();
  for (auto& down : downstream) {
    int64_t task_id = down.first;
    if (id_to_dep_type.at(task_id) == DependType::NORMAL) {
      for (const auto& task : tasks) {
        if (task->task_id() == task_id) {
          sub_block_task->emplace(task);
          GetSubBlockTask(tasks, task, sub_block_task);
        }
      }
    }
  }
}

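// Remove the vars listed in vars_not_gc from every op's unused-vars list so
// that they will not be garbage collected; ops whose lists become empty are
// erased from the map.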
void PreventVarsDelete(
    std::unordered_map<const framework::OperatorBase*,
                       std::vector<std::string>>* unused_vars,
    const std::vector<std::string>& vars_not_gc) {
  std::vector<const framework::OperatorBase*> changed_ops;

  for (const auto& pair : *unused_vars) {
    const framework::OperatorBase* op = pair.first;
    std::vector<std::string> cur_unused = pair.second;
    for (auto name : vars_not_gc) {
      auto iter = std::find(cur_unused.begin(), cur_unused.end(), name);
      if (iter != cur_unused.end()) {
        VLOG(3) << "Removing var: [" << name
                << "] from the unused vars list of op: [" << op->Type() << "]";
        cur_unused.erase(iter);
        if (std::find(changed_ops.begin(), changed_ops.end(), op) ==
            changed_ops.end()) {
          // record the op whose unused vars have been updated
          changed_ops.emplace_back(op);
        }
      }
    }
    // update the unused vars list in the map
    unused_vars->at(op) = cur_unused;
  }
  for (auto op : changed_ops) {
    const auto& iter = unused_vars->find(op);
    if (iter->second.empty()) {
      // remove those ops in the map that have an empty unused vars list
      VLOG(3) << "Removing op: [" << op->Type() << "] from unused_vars map.";
      unused_vars->erase(iter);
    }
  }
}

std::vector<std::string> GetUnusedVarsAfterWhile(
    const framework::ProgramDesc& program_desc,
    const std::vector<std::string>& vars_not_gc) {
  // NOTE: Since the while op won't appear in any task node, in order to
  // analyze the vars which should be freed after calling the while op, we
  // rebuild the whole program and get the unused vars after the while op.
  // Vars in the parent block should not be freed until the while op finishes.
  // The local vars will be freed while running the ops in the sub block.
  // The unused vars above will be freed in the cond interceptor.
  std::vector<std::string> while_block_vars;
  std::vector<std::unique_ptr<framework::OperatorBase>> ops;
  for (const auto& desc : program_desc.Block(0).AllOps()) {
    ops.emplace_back(framework::OpRegistry::CreateOp(*desc));
  }
  auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
  PreventVarsDelete(&unused_vars, vars_not_gc);
  for (const auto& pair : unused_vars) {
    if (pair.first->Type() == "while") {
      for (const auto& var_name : pair.second) {
        while_block_vars.emplace_back(var_name);
      }
    }
  }
  return while_block_vars;
}

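// Analyse the unused vars of the ops held by the sub-block tasks (block 1),
// keeping the vars in vars_not_gc alive.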
std::unordered_map<const framework::OperatorBase*, std::vector<std::string>>
GetSubUnusedVars(const framework::ProgramDesc& program_desc,
                 const std::set<TaskNode*>& sub_block_tasks,
                 const std::vector<std::string>& vars_not_gc) {
  std::vector<std::unique_ptr<framework::OperatorBase>> ops;
  for (auto* task_node : sub_block_tasks) {
    for (const auto& op : task_node->ops()) {
      ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
    }
  }
  auto unused_vars = framework::GetUnusedVars(program_desc.Block(1), ops, {});
  for (auto& unique_op : ops) {
    unique_op.release();
  }
  PreventVarsDelete(&unused_vars, vars_not_gc);
  return unused_vars;
}

}  // namespace

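// Build the runtime graph from the task nodes, attach the unused-vars
// analysis to each node, create the carrier for carrier_id and synchronize
// all ranks through the message bus.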
void FleetExecutor::Init(
    const std::string& carrier_id,
    const framework::ProgramDesc& program_desc,
    framework::Scope* scope,
    const platform::Place& place,
    int64_t num_micro_batches,
    const std::vector<TaskNode*>& task_nodes,
    const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
    const std::vector<std::string>& inference_root_scope_vars,
    const std::vector<framework::Scope*>& micro_scope_list) {
  PADDLE_ENFORCE_GT(task_nodes.size(),
                    0,
                    platform::errors::InvalidArgument(
                        "Fleet executor is inited with empty task node"));
  // Record the vars that become unused after running the while op
  std::set<TaskNode*> sub_block_tasks;
  std::vector<std::string> while_block_vars;
  for (const auto& task_node : task_nodes) {
    if (task_node->type() == "Cond") {
      GetSubBlockTask(task_nodes, task_node, &sub_block_tasks);
      while_block_vars =
          GetUnusedVarsAfterWhile(program_desc, inference_root_scope_vars);
      for (auto* task_node : sub_block_tasks) {
        for (auto iter : task_node->vars_to_dtype()) {
          while_block_vars.emplace_back(iter.first);
        }
      }
      VLOG(3) << "Vars will be gced after while op";
      for (auto var : while_block_vars) {
        VLOG(3) << var;
      }
      task_node->SetWhileBlockVars(while_block_vars);
    }
  }
  std::vector<framework::OperatorBase*> sub_block_ops;
  for (const auto& task_node : sub_block_tasks) {
    for (const auto& op : task_node->ops()) {
      sub_block_ops.emplace_back(op);
    }
  }
  // Analyse the unused vars in block 0. The operators in block 1 should be
  // passed in as well, to prevent their vars from being released too early,
  // since the unused vars in block 1 need to be analysed separately.
  std::vector<std::unique_ptr<framework::OperatorBase>> ops;
  for (const auto& task_node : task_nodes) {
    for (const auto& op : task_node->ops()) {
      ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
    }
  }
  auto global_unused_vars =
      framework::GetUnusedVars(program_desc.Block(0), ops, {});

  for (auto& unique_op : ops) {
    unique_op.release();
  }

  auto sub_unused_vars =
      GetSubUnusedVars(program_desc, sub_block_tasks, while_block_vars);

  // NOTE: For inference, the vars in inference_root_scope_vars shouldn't be
  // deleted during inference, since they may hold the inference results.
  // If they are GCed, copying the results out with ZeroCopy will fail.
  PreventVarsDelete(&global_unused_vars, inference_root_scope_vars);

  runtime_graph_ = std::make_shared<RuntimeGraph>();
  std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
  for (auto task_node : task_nodes) {
    if (sub_block_tasks.find(task_node) == sub_block_tasks.end()) {
      task_node->SetUnusedVars(global_unused_vars);
    } else {
      task_node->SetUnusedVars(sub_unused_vars);
    }
    int64_t interceptor_id = task_node->task_id();
    interceptor_id_to_task.emplace(interceptor_id, task_node);
  }
  runtime_graph_->SetInterceptorIdToRank(task_id_to_rank);
  runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task);

  VLOG(5) << runtime_graph_->DebugString();
  Carrier* carrier =
      GlobalMap<std::string, Carrier>::Create(carrier_id, carrier_id);
  carrier_ids_.insert(carrier_id);
  // Set current running carrier
  GlobalVal<std::string>::Set(new std::string(carrier_id));
  InitCarrier(carrier,
              scope,
              place,
              num_micro_batches,
              program_desc,
              inference_root_scope_vars,
              micro_scope_list);
  GlobalVal<MessageBus>::Get()->Barrier();
}

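// Pass the executor's rank information and the runtime graph down to the
// carrier.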
void FleetExecutor::InitCarrier(
    Carrier* carrier,
    framework::Scope* scope,
    const platform::Place& place,
    int64_t num_micro_batches,
    const framework::ProgramDesc& program_desc,
    const std::vector<std::string>& inference_root_scope_vars,
    const std::vector<framework::Scope*>& micro_scope_list) {
  carrier->Init(exe_desc_.cur_rank(),
                runtime_graph_->interceptor_id_to_rank(),
                runtime_graph_->interceptor_id_to_node(),
                program_desc,
                scope,
                num_micro_batches,
                place,
                inference_root_scope_vars,
                micro_scope_list);
}

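// Build the rank-to-address table from the executor description and
// initialize the global message bus with the current rank's address.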
void FleetExecutor::InitMessageBus() {
  std::stringstream ss;
  ss << "\nThe DNS table of the message bus is: \n";
  int64_t cur_rank = exe_desc_.cur_rank();
  std::unordered_map<int64_t, std::string> rank_to_addr;
  std::string addr;
  for (const auto& rank_info : exe_desc_.cluster_info()) {
    // Init the DNS map
    int64_t rank = rank_info.rank();
    std::string ip_port = rank_info.ip_port();
    ss << rank << "\t->\t" << ip_port << "\n";
    rank_to_addr.insert(std::make_pair(rank, ip_port));
    if (rank == cur_rank) {
      addr = ip_port;
    }
  }
  if (addr == "") {
    PADDLE_ENFORCE_EQ(
        rank_to_addr.size(),
        1,
        platform::errors::NotFound("Empty address is not valid for "
                                   "paddle.distributed.launch method."));
    PADDLE_ENFORCE_EQ(
        cur_rank,
        0,
        platform::errors::NotFound("Address is empty but cur rank is not 0."));
  }
  VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is "
          << (addr == "" ? "empty" : addr) << ".";
  VLOG(3) << "The number of ranks are "
          << (rank_to_addr.size() == 0 ? 1 : rank_to_addr.size()) << ".";
  VLOG(5) << ss.str();
  GlobalVal<MessageBus>::Get()->Init(cur_rank, rank_to_addr, addr);
}

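// Switch the globally recorded running carrier if necessary, then start it.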
void FleetExecutor::Run(const std::string& carrier_id) {
  Carrier* carrier = GlobalMap<std::string, Carrier>::Get(carrier_id);
  // Set current running carrier
  if (*GlobalVal<std::string>::Get() != carrier_id) {
    GlobalVal<std::string>::Set(new std::string(carrier_id));
    GlobalVal<MessageBus>::Get()->Barrier();
  }
  carrier->Start();
}

}  // namespace distributed
}  // namespace paddle