communicator.cc 11.6 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/distributed/communicator.h"

Q
Qiao Longfei 已提交
17
#include <gflags/gflags.h>
18
#include <paddle/fluid/framework/program_desc.h>
Q
Qiao Longfei 已提交
19 20 21
#include <chrono>  // NOLINT
#include <thread>  // NOLINT

Q
Qiao Longfei 已提交
22
#include "paddle/fluid/framework/eigen.h"
Q
Qiao Longfei 已提交
23 24 25 26 27 28
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"

Q
Qiao Longfei 已提交
29 30 31 32
DEFINE_bool(communicator_independent_recv_thread, true,
            "use an independent to recv vars from parameter server");
DEFINE_int32(communicator_send_queue_size, 20,
             "queue size to recv gradient before send");
33
DEFINE_int32(communicator_min_send_grad_num_before_recv, 20,
34
             "max grad num to send before recv parameters");
35
DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv");
Q
Qiao Longfei 已提交
36 37 38
DEFINE_int32(communicator_send_wait_times, 5,
             "times that send thread will wait if merge num does not reach "
             "max_merge_var_num");
39 40 41 42
DEFINE_int32(communicator_max_merge_var_num, 20,
             "max var num to merge and send");
DEFINE_bool(communicator_fake_rpc, false,
            "fake mode does not really send any thing");
Q
Qiao Longfei 已提交
43

Q
Qiao Longfei 已提交
44 45 46 47
namespace paddle {
namespace operators {
namespace distributed {

Q
Qiao Longfei 已提交
48 49 50 51 52 53
inline double GetCurrentUS() {
  struct timeval time;
  gettimeofday(&time, NULL);
  return 1e+6 * time.tv_sec + time.tv_usec;
}

54
std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
Q
can run  
Qiao Longfei 已提交
55

Q
Qiao Longfei 已提交
56 57 58 59 60 61 62 63 64 65 66
Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx,
                           const RpcCtxMap &recv_varname_to_ctx,
                           Scope *recv_scope)
    : send_varname_to_ctx_(send_varname_to_ctx),
      recv_varname_to_ctx_(recv_varname_to_ctx),
      recv_scope_(recv_scope) {
  // get all send information from graph, build vars_to_send
  VLOG(0) << "communicator_independent_recv_thread: "
          << FLAGS_communicator_independent_recv_thread;
  VLOG(0) << "communicator_send_queue_size: "
          << FLAGS_communicator_send_queue_size;
67 68
  VLOG(0) << "communicator_min_send_grad_num_before_recv: "
          << FLAGS_communicator_min_send_grad_num_before_recv;
Q
Qiao Longfei 已提交
69 70
  VLOG(0) << "communicator_thread_pool_size: "
          << FLAGS_communicator_thread_pool_size;
71 72
  VLOG(0) << "communicator_send_wait_times: "
          << FLAGS_communicator_send_wait_times;
Q
Qiao Longfei 已提交
73
  VLOG(0) << "communicator_max_merge_var_num: "
74 75
          << FLAGS_communicator_max_merge_var_num;
  VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
Q
Qiao Longfei 已提交
76 77 78 79 80 81 82 83 84 85 86
  send_scope_.reset(new Scope());
  for (auto &iter : send_varname_to_ctx_) {
    send_varname_to_queue_[iter.first] =
        std::make_shared<BlockingQueue<std::shared_ptr<Variable>>>(
            FLAGS_communicator_send_queue_size);
  }
  send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
  recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
}

Communicator::~Communicator() {
87 88 89 90
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
91 92 93
  running_ = false;
  if (send_thread_) send_thread_->join();
  if (recv_thread_) recv_thread_->join();
94 95 96 97
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator done");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
98 99
}

Q
Qiao Longfei 已提交
100
void Communicator::SendThread() {
Q
Qiao Longfei 已提交
101
  VLOG(3) << "SendThread start!";
Q
Qiao Longfei 已提交
102 103 104
  while (running_) {
    std::vector<std::future<void>> task_futures;
    task_futures.reserve(send_varname_to_ctx_.size());
Q
Qiao Longfei 已提交
105
    VLOG(3) << "run send graph";
Q
Qiao Longfei 已提交
106
    auto before_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
107
    for (auto &iter : send_varname_to_queue_) {
Q
Qiao Longfei 已提交
108 109
      auto &var_name = iter.first;
      auto &var_queue = iter.second;
Q
Qiao Longfei 已提交
110
      if (var_queue->Size() > 0) {
Q
Qiao Longfei 已提交
111
        auto send_task = [this, &var_name, &var_queue] {
Q
Qiao Longfei 已提交
112
          VLOG(3) << var_name << " merge and send";
Q
Qiao Longfei 已提交
113 114
          std::vector<std::shared_ptr<Variable>> vars;
          size_t merged_var_num = 0;
Q
Qiao Longfei 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
          size_t wait_times = 0;
          while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
            if (var_queue->Size() == 0) {
              VLOG(3) << "wait_times -> " << wait_times;
              if (wait_times >= FLAGS_communicator_send_wait_times) {
                break;
              }
              std::this_thread::sleep_for(std::chrono::milliseconds(10));
              wait_times++;
              continue;
            } else {
              wait_times = 0;

              vars.push_back(var_queue->Pop());
              // only count the send number of the first var
              if (var_name == send_varname_to_queue_.begin()->first) {
                grad_num_.fetch_add(1, std::memory_order_relaxed);
              }
              merged_var_num++;
134
            }
Q
Qiao Longfei 已提交
135
          }
Q
Qiao Longfei 已提交
136
          auto before_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
137
          MergeVars(var_name, vars, send_scope_.get());
Q
Qiao Longfei 已提交
138
          auto after_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
139 140
          VLOG(3) << "merge " << merged_var_num << " " << var_name
                  << " use time " << after_merge - before_merge;
Q
Qiao Longfei 已提交
141 142
          auto send_functor = distributed::ParameterSend<float>();
          auto &ctx = send_varname_to_ctx_.at(var_name);
143 144 145
          if (!FLAGS_communicator_fake_rpc) {
            send_functor(ctx, *send_scope_, true);
          }
Q
Qiao Longfei 已提交
146 147 148
          auto after_send = GetCurrentUS();
          VLOG(3) << "send " << var_name << " use time "
                  << after_send - after_merge;
Q
Qiao Longfei 已提交
149 150 151
        };
        task_futures.emplace_back(
            send_threadpool_->enqueue(std::move(send_task)));
Q
Qiao Longfei 已提交
152
      } else {
153
        VLOG(4) << var_name << " queue empty";
Q
Qiao Longfei 已提交
154
      }
Q
Qiao Longfei 已提交
155 156 157
    }
    for (auto &task_f : task_futures) {
      task_f.wait();
Q
Qiao Longfei 已提交
158
    }
Q
Qiao Longfei 已提交
159
    auto after_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
160
    auto send_graph_use_time = after_run_send_graph - before_run_send_graph;
161
    if (send_graph_use_time > 100) {
Q
Qiao Longfei 已提交
162 163 164
      VLOG(1) << "run send graph use time "
              << after_run_send_graph - before_run_send_graph;
    }
Q
Qiao Longfei 已提交
165 166 167
    if (!FLAGS_communicator_independent_recv_thread) {
      RecvAll();
    }
Q
Qiao Longfei 已提交
168
  }
169
  VLOG(0) << "communicator stopped, send thread exit";
Q
Qiao Longfei 已提交
170 171
}

Q
Qiao Longfei 已提交
172 173
void Communicator::RecvAll() {
  VLOG(3) << "parallel run recv graph";
174
  if (!running_) return;
Q
Qiao Longfei 已提交
175
  auto before_send = GetCurrentUS();
Q
Qiao Longfei 已提交
176 177 178 179 180
  std::vector<std::future<void>> task_futures;
  task_futures.reserve(recv_varname_to_ctx_.size());
  for (auto &iter : recv_varname_to_ctx_) {
    auto recv_task = [this, &iter] {
      auto &var_name = iter.first;
181
      VLOG(4) << "recv var " << var_name;
Q
Qiao Longfei 已提交
182
      auto recv_functor = distributed::ParameterRecv<float>();
183 184 185
      if (!FLAGS_communicator_fake_rpc) {
        recv_functor(iter.second, *recv_scope_);
      }
Q
Qiao Longfei 已提交
186 187 188 189 190 191
    };
    task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
  }
  for (auto &task : task_futures) {
    task.wait();
  }
Q
Qiao Longfei 已提交
192
  auto after_recv = GetCurrentUS();
Q
Qiao Longfei 已提交
193
  VLOG(1) << "run recv graph use time " << after_recv - before_send;
Q
Qiao Longfei 已提交
194 195
}

Q
Qiao Longfei 已提交
196
void Communicator::RecvThread() {
Q
Qiao Longfei 已提交
197
  VLOG(3) << "RecvThread start!";
Q
Qiao Longfei 已提交
198
  while (running_) {
199
    auto grad_num = grad_num_.load();
200
    if (grad_num > FLAGS_communicator_min_send_grad_num_before_recv) {
201 202 203 204 205 206
      VLOG(1) << "current grad num " << grad_num;
      RecvAll();
      grad_num_.store(0);
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
Q
Qiao Longfei 已提交
207
  }
208
  VLOG(0) << "communicator stopped, recv thread exit";
Q
Qiao Longfei 已提交
209 210 211 212
}

void Communicator::Send(const std::string &var_name,
                        const framework::Scope &scope) {
Q
Qiao Longfei 已提交
213 214 215 216 217 218 219 220 221
  VLOG(3) << "communicator send " << var_name;
  // push var into send queue by var_name
  auto *grad_var = scope.FindVar(var_name);
  PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
  auto tmp_grad_var = std::make_shared<Variable>();
  framework::CopyVariable(*grad_var, tmp_grad_var.get());
  auto &queue = send_varname_to_queue_.at(var_name);
  VLOG(3) << "send " << var_name << " queue size " << queue->Size();
  queue->Push(tmp_grad_var);
Q
Qiao Longfei 已提交
222 223
}

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
void Communicator::Init(const paddle::framework::ProgramDesc &program,
                        Scope *param_scope) {
  using RpcCtxMap = operators::distributed::RpcCtxMap;
  VLOG(3) << "ProcessGraph";
  RpcCtxMap send_varname_to_ctx;
  RpcCtxMap recv_varname_to_ctx;
  for (auto *op : program.Block(0).AllOps()) {
    VLOG(3) << "node name " << op->Type();
    if (op->Type() == "send") {
      auto send_var_name = op->Input("X")[0];
      auto send_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("send_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto height_section =
          boost::get<std::vector<int64_t>>(op->GetNullableAttr("sections"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext(
          send_var_name, send_varnames, epmap, height_section, trainer_id);
      VLOG(3) << "find and init an send op: "
              << send_varname_to_ctx[send_var_name];
    } else if (op->Type() == "recv") {
      auto do_not_run = boost::get<int>(op->GetNullableAttr("do_not_run"));
      PADDLE_ENFORCE_GT(do_not_run, 0, "recv should not run!");
      auto recv_var_name = op->Output("Out")[0];
      auto recv_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("recv_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext(
          recv_var_name, recv_varnames, epmap, {}, trainer_id);
    }
  }

  // init communicator here
  if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) {
    LOG(WARNING) << "no var need to send and recv!!";
  }
  operators::distributed::Communicator::Init(send_varname_to_ctx,
                                             recv_varname_to_ctx, param_scope);
}

Q
can run  
Qiao Longfei 已提交
267 268
Communicator *Communicator::GetInstance() { return communicator_.get(); }

269 270 271 272
std::shared_ptr<Communicator> Communicator::GetInstantcePtr() {
  return communicator_;
}

Q
Qiao Longfei 已提交
273
void Communicator::Start() {
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
  VLOG(0) << "Communicator start";
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    VLOG(1) << "start send thread and recv thread";
    running_ = true;
    // start send and recv thread
    send_thread_.reset(
        new std::thread(std::bind(&Communicator::SendThread, this)));
    if (FLAGS_communicator_independent_recv_thread) {
      recv_thread_.reset(
          new std::thread(std::bind(&Communicator::RecvThread, this)));
    }
  }
}

void Communicator::Stop() {
  VLOG(0) << "Communicator stop";
  running_ = false;
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    if (send_thread_) {
      VLOG(1) << "stop send thread";
      send_thread_->join();
      send_thread_.reset(nullptr);
    }
    if (recv_thread_) {
      VLOG(1) << "stop recv thread";
      recv_thread_->join();
      recv_thread_.reset(nullptr);
    }
Q
Qiao Longfei 已提交
306
  }
307
  VLOG(0) << "Communicator stop done";
Q
Qiao Longfei 已提交
308 309 310 311 312
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle