communicator.cc 12.4 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/distributed/communicator.h"

Q
Qiao Longfei 已提交
17
#include <gflags/gflags.h>
18
#include <paddle/fluid/framework/program_desc.h>
Q
Qiao Longfei 已提交
19 20 21
#include <chrono>  // NOLINT
#include <thread>  // NOLINT

Q
Qiao Longfei 已提交
22
#include "paddle/fluid/framework/eigen.h"
Q
Qiao Longfei 已提交
23 24 25 26 27 28
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"

29 30 31
DECLARE_int32(communicator_max_merge_var_num);
DECLARE_int32(communicator_send_queue_size);

Q
Qiao Longfei 已提交
32 33
DEFINE_bool(communicator_independent_recv_thread, true,
            "use an independent to recv vars from parameter server");
34
DEFINE_int32(communicator_min_send_grad_num_before_recv, 20,
35
             "max grad num to send before recv parameters");
36
DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv");
Q
Qiao Longfei 已提交
37 38 39
DEFINE_int32(communicator_send_wait_times, 5,
             "times that send thread will wait if merge num does not reach "
             "max_merge_var_num");
40 41
DEFINE_bool(communicator_fake_rpc, false,
            "fake mode does not really send any thing");
42 43
DEFINE_bool(communicator_merge_sparse_grad, true,
            "merge sparse gradient before sending");
Q
Qiao Longfei 已提交
44

Q
Qiao Longfei 已提交
45 46 47 48
namespace paddle {
namespace operators {
namespace distributed {

Q
Qiao Longfei 已提交
49 50 51 52 53 54
inline double GetCurrentUS() {
  struct timeval time;
  gettimeofday(&time, NULL);
  return 1e+6 * time.tv_sec + time.tv_usec;
}

T
tangwei12 已提交
55
std::once_flag Communicator::init_flag_;
56
std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
Q
can run  
Qiao Longfei 已提交
57

T
tangwei12 已提交
58 59 60 61 62 63 64
void AsyncCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
                                 const RpcCtxMap &recv_varname_to_ctx,
                                 Scope *recv_scope) {
  send_varname_to_ctx_ = std::move(send_varname_to_ctx);
  recv_varname_to_ctx_ = std::move(recv_varname_to_ctx);
  recv_scope_ = std::move(recv_scope);

Q
Qiao Longfei 已提交
65 66 67 68 69
  // get all send information from graph, build vars_to_send
  VLOG(0) << "communicator_independent_recv_thread: "
          << FLAGS_communicator_independent_recv_thread;
  VLOG(0) << "communicator_send_queue_size: "
          << FLAGS_communicator_send_queue_size;
70 71
  VLOG(0) << "communicator_min_send_grad_num_before_recv: "
          << FLAGS_communicator_min_send_grad_num_before_recv;
Q
Qiao Longfei 已提交
72 73
  VLOG(0) << "communicator_thread_pool_size: "
          << FLAGS_communicator_thread_pool_size;
74 75
  VLOG(0) << "communicator_send_wait_times: "
          << FLAGS_communicator_send_wait_times;
Q
Qiao Longfei 已提交
76
  VLOG(0) << "communicator_max_merge_var_num: "
77 78
          << FLAGS_communicator_max_merge_var_num;
  VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
79 80
  VLOG(0) << "communicator_merge_sparse_grad: "
          << FLAGS_communicator_merge_sparse_grad;
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

  if (send_varname_to_ctx.size() == 0) {
    VLOG(0) << "nothing need to be send, will not start send_thread";
  } else {
    send_scope_.reset(new Scope());
    for (auto &iter : send_varname_to_ctx_) {
      send_varname_to_queue_[iter.first] =
          std::make_shared<BlockingQueue<std::shared_ptr<Variable>>>(
              FLAGS_communicator_send_queue_size);
    }
    send_threadpool_.reset(
        new ::ThreadPool(FLAGS_communicator_thread_pool_size));
  }

  if (recv_varname_to_ctx.size() == 0) {
    VLOG(0) << "nothing need to be received, will not start recv_thread";
  } else {
    recv_threadpool_.reset(
        new ::ThreadPool(FLAGS_communicator_thread_pool_size));
Q
Qiao Longfei 已提交
100 101 102
  }
}

T
tangwei12 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
void AsyncCommunicator::InitImpl(const paddle::framework::ProgramDesc &program,
                                 Scope *param_scope) {
  using RpcCtxMap = operators::distributed::RpcCtxMap;
  VLOG(3) << "ProcessGraph";
  RpcCtxMap send_varname_to_ctx;
  RpcCtxMap recv_varname_to_ctx;
  for (auto *op : program.Block(0).AllOps()) {
    VLOG(3) << "node name " << op->Type();
    if (op->Type() == "send") {
      auto send_var_name = op->Input("X")[0];
      auto send_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("send_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto height_section =
          boost::get<std::vector<int64_t>>(op->GetNullableAttr("sections"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext(
          send_var_name, send_varnames, epmap, height_section, trainer_id);
      VLOG(3) << "find and init an send op: "
              << send_varname_to_ctx[send_var_name];
    } else if (op->Type() == "recv") {
      auto do_not_run = boost::get<int>(op->GetNullableAttr("do_not_run"));
      PADDLE_ENFORCE_GT(do_not_run, 0, "recv should not run!");
      auto recv_var_name = op->Output("Out")[0];
      auto recv_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("recv_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext(
          recv_var_name, recv_varnames, epmap, {}, trainer_id);
    }
  }

  // init communicator here
  if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) {
    LOG(WARNING) << "no var need to send and recv!!";
  }

  operators::distributed::AsyncCommunicator::InitImpl(
      send_varname_to_ctx, recv_varname_to_ctx, param_scope);
}

AsyncCommunicator::~AsyncCommunicator() {
148 149 150 151
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
152 153 154
  running_ = false;
  if (send_thread_) send_thread_->join();
  if (recv_thread_) recv_thread_->join();
155 156 157 158
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator done");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
159 160
}

T
tangwei12 已提交
161
void AsyncCommunicator::SendThread() {
Q
Qiao Longfei 已提交
162
  VLOG(3) << "SendThread start!";
Q
Qiao Longfei 已提交
163 164 165
  while (running_) {
    std::vector<std::future<void>> task_futures;
    task_futures.reserve(send_varname_to_ctx_.size());
Q
Qiao Longfei 已提交
166
    VLOG(3) << "run send graph";
Q
Qiao Longfei 已提交
167
    auto before_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
168
    for (auto &iter : send_varname_to_queue_) {
Q
Qiao Longfei 已提交
169 170
      auto &var_name = iter.first;
      auto &var_queue = iter.second;
Q
Qiao Longfei 已提交
171
      if (var_queue->Size() > 0) {
Q
Qiao Longfei 已提交
172
        auto send_task = [this, &var_name, &var_queue] {
Q
Qiao Longfei 已提交
173
          VLOG(3) << var_name << " merge and send";
Q
Qiao Longfei 已提交
174 175
          std::vector<std::shared_ptr<Variable>> vars;
          size_t merged_var_num = 0;
Q
Qiao Longfei 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
          size_t wait_times = 0;
          while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
            if (var_queue->Size() == 0) {
              VLOG(3) << "wait_times -> " << wait_times;
              if (wait_times >= FLAGS_communicator_send_wait_times) {
                break;
              }
              std::this_thread::sleep_for(std::chrono::milliseconds(10));
              wait_times++;
              continue;
            } else {
              wait_times = 0;

              vars.push_back(var_queue->Pop());
              // only count the send number of the first var
              if (var_name == send_varname_to_queue_.begin()->first) {
                grad_num_.fetch_add(1, std::memory_order_relaxed);
              }
              merged_var_num++;
195
            }
Q
Qiao Longfei 已提交
196
          }
Q
Qiao Longfei 已提交
197
          auto before_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
198
          MergeVars(var_name, vars, send_scope_.get());
Q
Qiao Longfei 已提交
199
          auto after_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
200 201
          VLOG(3) << "merge " << merged_var_num << " " << var_name
                  << " use time " << after_merge - before_merge;
Q
Qiao Longfei 已提交
202 203
          auto send_functor = distributed::ParameterSend<float>();
          auto &ctx = send_varname_to_ctx_.at(var_name);
204 205 206
          if (!FLAGS_communicator_fake_rpc) {
            send_functor(ctx, *send_scope_, true);
          }
Q
Qiao Longfei 已提交
207 208 209
          auto after_send = GetCurrentUS();
          VLOG(3) << "send " << var_name << " use time "
                  << after_send - after_merge;
Q
Qiao Longfei 已提交
210 211 212
        };
        task_futures.emplace_back(
            send_threadpool_->enqueue(std::move(send_task)));
Q
Qiao Longfei 已提交
213
      } else {
214
        VLOG(4) << var_name << " queue empty";
Q
Qiao Longfei 已提交
215
      }
Q
Qiao Longfei 已提交
216 217 218
    }
    for (auto &task_f : task_futures) {
      task_f.wait();
Q
Qiao Longfei 已提交
219
    }
Q
Qiao Longfei 已提交
220
    auto after_run_send_graph = GetCurrentUS();
221 222 223

    VLOG(3) << "run send graph use time "
            << after_run_send_graph - before_run_send_graph;
T
tangwei12 已提交
224
    Recv();
Q
Qiao Longfei 已提交
225
  }
226
  VLOG(0) << "communicator stopped, send thread exit";
Q
Qiao Longfei 已提交
227 228
}

T
tangwei12 已提交
229
void AsyncCommunicator::RecvThread() {
Q
Qiao Longfei 已提交
230
  VLOG(3) << "RecvThread start!";
Q
Qiao Longfei 已提交
231
  while (running_) {
232
    auto grad_num = grad_num_.load();
233
    if (grad_num > FLAGS_communicator_min_send_grad_num_before_recv) {
234 235 236 237 238 239
      VLOG(1) << "current grad num " << grad_num;
      RecvAll();
      grad_num_.store(0);
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
Q
Qiao Longfei 已提交
240
  }
241
  VLOG(0) << "communicator stopped, recv thread exit";
Q
Qiao Longfei 已提交
242 243
}

T
tangwei12 已提交
244 245
void AsyncCommunicator::Send(const std::string &var_name,
                             const framework::Scope &scope) {
Q
Qiao Longfei 已提交
246 247 248 249
  VLOG(3) << "communicator send " << var_name;
  // push var into send queue by var_name
  auto *grad_var = scope.FindVar(var_name);
  PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
250 251 252 253 254 255 256 257 258 259 260 261 262 263
  if (grad_var->IsType<framework::SelectedRows>() &&
      !FLAGS_communicator_merge_sparse_grad) {
    auto send_functor = distributed::ParameterSend<float>();
    auto &ctx = send_varname_to_ctx_.at(var_name);
    if (!FLAGS_communicator_fake_rpc) {
      send_functor(ctx, scope, true);
    }
  } else {
    auto tmp_grad_var = std::make_shared<Variable>();
    framework::CopyVariable(*grad_var, tmp_grad_var.get());
    auto &queue = send_varname_to_queue_.at(var_name);
    VLOG(3) << "send " << var_name << " queue size " << queue->Size();
    queue->Push(tmp_grad_var);
  }
Q
Qiao Longfei 已提交
264 265
}

T
tangwei12 已提交
266 267 268
void AsyncCommunicator::Recv() {
  if (FLAGS_communicator_independent_recv_thread) {
    return;
269 270
  }

T
tangwei12 已提交
271 272 273 274 275 276
  auto grad_num = grad_num_.load();
  if (grad_num > 0) {
    RecvAll();
    grad_num_.store(0);
  } else {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
277 278 279
  }
}

T
tangwei12 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
void AsyncCommunicator::RecvAll() {
  VLOG(3) << "parallel run recv graph";
  if (!running_) return;
  auto before_send = GetCurrentUS();
  std::vector<std::future<void>> task_futures;
  task_futures.reserve(recv_varname_to_ctx_.size());
  for (auto &iter : recv_varname_to_ctx_) {
    auto recv_task = [this, &iter] {
      auto &var_name = iter.first;
      VLOG(4) << "recv var " << var_name;
      auto recv_functor = distributed::ParameterRecv<float>();
      if (!FLAGS_communicator_fake_rpc) {
        recv_functor(iter.second, *recv_scope_);
      }
    };
    task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
  }
  for (auto &task : task_futures) {
    task.wait();
  }
  auto after_recv = GetCurrentUS();
  VLOG(1) << "run recv graph use time " << after_recv - before_send;
302 303
}

T
tangwei12 已提交
304
void AsyncCommunicator::Start() {
305 306 307 308 309 310 311 312
  VLOG(0) << "Communicator start";
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    VLOG(1) << "start send thread and recv thread";
    running_ = true;
    // start send and recv thread
    send_thread_.reset(
T
tangwei12 已提交
313
        new std::thread(std::bind(&AsyncCommunicator::SendThread, this)));
314 315
    if (FLAGS_communicator_independent_recv_thread) {
      recv_thread_.reset(
T
tangwei12 已提交
316
          new std::thread(std::bind(&AsyncCommunicator::RecvThread, this)));
317 318 319 320
    }
  }
}

T
tangwei12 已提交
321
void AsyncCommunicator::Stop() {
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
  VLOG(0) << "Communicator stop";
  running_ = false;
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    if (send_thread_) {
      VLOG(1) << "stop send thread";
      send_thread_->join();
      send_thread_.reset(nullptr);
    }
    if (recv_thread_) {
      VLOG(1) << "stop recv thread";
      recv_thread_->join();
      recv_thread_.reset(nullptr);
    }
Q
Qiao Longfei 已提交
337
  }
338
  VLOG(0) << "Communicator stop done";
Q
Qiao Longfei 已提交
339 340 341 342 343
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle