communicator.cc 12.4 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/distributed/communicator.h"

Q
Qiao Longfei 已提交
17
#include <gflags/gflags.h>
18
#include <paddle/fluid/framework/program_desc.h>
Q
Qiao Longfei 已提交
19 20 21
#include <chrono>  // NOLINT
#include <thread>  // NOLINT

Q
Qiao Longfei 已提交
22
#include "paddle/fluid/framework/eigen.h"
Q
Qiao Longfei 已提交
23 24 25 26 27 28
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"

29 30 31
DECLARE_int32(communicator_max_merge_var_num);
DECLARE_int32(communicator_send_queue_size);

Q
Qiao Longfei 已提交
32 33
DEFINE_bool(communicator_independent_recv_thread, true,
            "use an independent to recv vars from parameter server");
34
DEFINE_int32(communicator_min_send_grad_num_before_recv, 20,
35
             "max grad num to send before recv parameters");
36
DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv");
Q
Qiao Longfei 已提交
37 38 39
DEFINE_int32(communicator_send_wait_times, 5,
             "times that send thread will wait if merge num does not reach "
             "max_merge_var_num");
40 41
DEFINE_bool(communicator_fake_rpc, false,
            "fake mode does not really send any thing");
42 43
DEFINE_bool(communicator_merge_sparse_grad, true,
            "merge sparse gradient before sending");
Q
Qiao Longfei 已提交
44

Q
Qiao Longfei 已提交
45 46 47 48
namespace paddle {
namespace operators {
namespace distributed {

Q
Qiao Longfei 已提交
49 50 51 52 53 54
inline double GetCurrentUS() {
  struct timeval time;
  gettimeofday(&time, NULL);
  return 1e+6 * time.tv_sec + time.tv_usec;
}

55
std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
Q
can run  
Qiao Longfei 已提交
56

Q
Qiao Longfei 已提交
57 58 59 60 61 62 63 64 65 66 67
Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx,
                           const RpcCtxMap &recv_varname_to_ctx,
                           Scope *recv_scope)
    : send_varname_to_ctx_(send_varname_to_ctx),
      recv_varname_to_ctx_(recv_varname_to_ctx),
      recv_scope_(recv_scope) {
  // get all send information from graph, build vars_to_send
  VLOG(0) << "communicator_independent_recv_thread: "
          << FLAGS_communicator_independent_recv_thread;
  VLOG(0) << "communicator_send_queue_size: "
          << FLAGS_communicator_send_queue_size;
68 69
  VLOG(0) << "communicator_min_send_grad_num_before_recv: "
          << FLAGS_communicator_min_send_grad_num_before_recv;
Q
Qiao Longfei 已提交
70 71
  VLOG(0) << "communicator_thread_pool_size: "
          << FLAGS_communicator_thread_pool_size;
72 73
  VLOG(0) << "communicator_send_wait_times: "
          << FLAGS_communicator_send_wait_times;
Q
Qiao Longfei 已提交
74
  VLOG(0) << "communicator_max_merge_var_num: "
75 76
          << FLAGS_communicator_max_merge_var_num;
  VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
77 78
  VLOG(0) << "communicator_merge_sparse_grad: "
          << FLAGS_communicator_merge_sparse_grad;
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97

  if (send_varname_to_ctx.size() == 0) {
    VLOG(0) << "nothing need to be send, will not start send_thread";
  } else {
    send_scope_.reset(new Scope());
    for (auto &iter : send_varname_to_ctx_) {
      send_varname_to_queue_[iter.first] =
          std::make_shared<BlockingQueue<std::shared_ptr<Variable>>>(
              FLAGS_communicator_send_queue_size);
    }
    send_threadpool_.reset(
        new ::ThreadPool(FLAGS_communicator_thread_pool_size));
  }

  if (recv_varname_to_ctx.size() == 0) {
    VLOG(0) << "nothing need to be received, will not start recv_thread";
  } else {
    recv_threadpool_.reset(
        new ::ThreadPool(FLAGS_communicator_thread_pool_size));
Q
Qiao Longfei 已提交
98 99 100 101
  }
}

Communicator::~Communicator() {
102 103 104 105
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
106 107 108
  running_ = false;
  if (send_thread_) send_thread_->join();
  if (recv_thread_) recv_thread_->join();
109 110 111 112
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator done");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
113 114
}

Q
Qiao Longfei 已提交
115
void Communicator::SendThread() {
Q
Qiao Longfei 已提交
116
  VLOG(3) << "SendThread start!";
Q
Qiao Longfei 已提交
117 118 119
  while (running_) {
    std::vector<std::future<void>> task_futures;
    task_futures.reserve(send_varname_to_ctx_.size());
Q
Qiao Longfei 已提交
120
    VLOG(3) << "run send graph";
Q
Qiao Longfei 已提交
121
    auto before_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
122
    for (auto &iter : send_varname_to_queue_) {
Q
Qiao Longfei 已提交
123 124
      auto &var_name = iter.first;
      auto &var_queue = iter.second;
Q
Qiao Longfei 已提交
125
      if (var_queue->Size() > 0) {
Q
Qiao Longfei 已提交
126
        auto send_task = [this, &var_name, &var_queue] {
Q
Qiao Longfei 已提交
127
          VLOG(3) << var_name << " merge and send";
Q
Qiao Longfei 已提交
128 129
          std::vector<std::shared_ptr<Variable>> vars;
          size_t merged_var_num = 0;
Q
Qiao Longfei 已提交
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
          size_t wait_times = 0;
          while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
            if (var_queue->Size() == 0) {
              VLOG(3) << "wait_times -> " << wait_times;
              if (wait_times >= FLAGS_communicator_send_wait_times) {
                break;
              }
              std::this_thread::sleep_for(std::chrono::milliseconds(10));
              wait_times++;
              continue;
            } else {
              wait_times = 0;

              vars.push_back(var_queue->Pop());
              // only count the send number of the first var
              if (var_name == send_varname_to_queue_.begin()->first) {
                grad_num_.fetch_add(1, std::memory_order_relaxed);
              }
              merged_var_num++;
149
            }
Q
Qiao Longfei 已提交
150
          }
Q
Qiao Longfei 已提交
151
          auto before_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
152
          MergeVars(var_name, vars, send_scope_.get());
Q
Qiao Longfei 已提交
153
          auto after_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
154 155
          VLOG(3) << "merge " << merged_var_num << " " << var_name
                  << " use time " << after_merge - before_merge;
Q
Qiao Longfei 已提交
156 157
          auto send_functor = distributed::ParameterSend<float>();
          auto &ctx = send_varname_to_ctx_.at(var_name);
158 159 160
          if (!FLAGS_communicator_fake_rpc) {
            send_functor(ctx, *send_scope_, true);
          }
Q
Qiao Longfei 已提交
161 162 163
          auto after_send = GetCurrentUS();
          VLOG(3) << "send " << var_name << " use time "
                  << after_send - after_merge;
Q
Qiao Longfei 已提交
164 165 166
        };
        task_futures.emplace_back(
            send_threadpool_->enqueue(std::move(send_task)));
Q
Qiao Longfei 已提交
167
      } else {
168
        VLOG(4) << var_name << " queue empty";
Q
Qiao Longfei 已提交
169
      }
Q
Qiao Longfei 已提交
170 171 172
    }
    for (auto &task_f : task_futures) {
      task_f.wait();
Q
Qiao Longfei 已提交
173
    }
Q
Qiao Longfei 已提交
174
    auto after_run_send_graph = GetCurrentUS();
175 176 177 178

    VLOG(3) << "run send graph use time "
            << after_run_send_graph - before_run_send_graph;
    RecvNonIndependent();
Q
Qiao Longfei 已提交
179
  }
180
  VLOG(0) << "communicator stopped, send thread exit";
Q
Qiao Longfei 已提交
181 182
}

183
void Communicator::RecvNonIndependent() {
184
  if (FLAGS_communicator_independent_recv_thread) {
185 186 187 188 189 190 191 192 193 194 195 196
    return;
  }

  auto grad_num = grad_num_.load();
  if (grad_num > 0) {
    RecvAll();
    grad_num_.store(0);
  } else {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}

Q
Qiao Longfei 已提交
197 198
void Communicator::RecvAll() {
  VLOG(3) << "parallel run recv graph";
199
  if (!running_) return;
Q
Qiao Longfei 已提交
200
  auto before_send = GetCurrentUS();
Q
Qiao Longfei 已提交
201 202 203 204 205
  std::vector<std::future<void>> task_futures;
  task_futures.reserve(recv_varname_to_ctx_.size());
  for (auto &iter : recv_varname_to_ctx_) {
    auto recv_task = [this, &iter] {
      auto &var_name = iter.first;
206
      VLOG(4) << "recv var " << var_name;
Q
Qiao Longfei 已提交
207
      auto recv_functor = distributed::ParameterRecv<float>();
208 209 210
      if (!FLAGS_communicator_fake_rpc) {
        recv_functor(iter.second, *recv_scope_);
      }
Q
Qiao Longfei 已提交
211 212 213 214 215 216
    };
    task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
  }
  for (auto &task : task_futures) {
    task.wait();
  }
Q
Qiao Longfei 已提交
217
  auto after_recv = GetCurrentUS();
Q
Qiao Longfei 已提交
218
  VLOG(1) << "run recv graph use time " << after_recv - before_send;
Q
Qiao Longfei 已提交
219 220
}

Q
Qiao Longfei 已提交
221
void Communicator::RecvThread() {
Q
Qiao Longfei 已提交
222
  VLOG(3) << "RecvThread start!";
Q
Qiao Longfei 已提交
223
  while (running_) {
224
    auto grad_num = grad_num_.load();
225
    if (grad_num > FLAGS_communicator_min_send_grad_num_before_recv) {
226 227 228 229 230 231
      VLOG(1) << "current grad num " << grad_num;
      RecvAll();
      grad_num_.store(0);
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
Q
Qiao Longfei 已提交
232
  }
233
  VLOG(0) << "communicator stopped, recv thread exit";
Q
Qiao Longfei 已提交
234 235 236 237
}

void Communicator::Send(const std::string &var_name,
                        const framework::Scope &scope) {
Q
Qiao Longfei 已提交
238 239 240 241
  VLOG(3) << "communicator send " << var_name;
  // push var into send queue by var_name
  auto *grad_var = scope.FindVar(var_name);
  PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
242 243 244 245 246 247 248 249 250 251 252 253 254 255
  if (grad_var->IsType<framework::SelectedRows>() &&
      !FLAGS_communicator_merge_sparse_grad) {
    auto send_functor = distributed::ParameterSend<float>();
    auto &ctx = send_varname_to_ctx_.at(var_name);
    if (!FLAGS_communicator_fake_rpc) {
      send_functor(ctx, scope, true);
    }
  } else {
    auto tmp_grad_var = std::make_shared<Variable>();
    framework::CopyVariable(*grad_var, tmp_grad_var.get());
    auto &queue = send_varname_to_queue_.at(var_name);
    VLOG(3) << "send " << var_name << " queue size " << queue->Size();
    queue->Push(tmp_grad_var);
  }
Q
Qiao Longfei 已提交
256 257
}

258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
void Communicator::Init(const paddle::framework::ProgramDesc &program,
                        Scope *param_scope) {
  using RpcCtxMap = operators::distributed::RpcCtxMap;
  VLOG(3) << "ProcessGraph";
  RpcCtxMap send_varname_to_ctx;
  RpcCtxMap recv_varname_to_ctx;
  for (auto *op : program.Block(0).AllOps()) {
    VLOG(3) << "node name " << op->Type();
    if (op->Type() == "send") {
      auto send_var_name = op->Input("X")[0];
      auto send_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("send_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto height_section =
          boost::get<std::vector<int64_t>>(op->GetNullableAttr("sections"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext(
          send_var_name, send_varnames, epmap, height_section, trainer_id);
      VLOG(3) << "find and init an send op: "
              << send_varname_to_ctx[send_var_name];
    } else if (op->Type() == "recv") {
      auto do_not_run = boost::get<int>(op->GetNullableAttr("do_not_run"));
      PADDLE_ENFORCE_GT(do_not_run, 0, "recv should not run!");
      auto recv_var_name = op->Output("Out")[0];
      auto recv_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("recv_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext(
          recv_var_name, recv_varnames, epmap, {}, trainer_id);
    }
  }

  // init communicator here
  if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) {
    LOG(WARNING) << "no var need to send and recv!!";
  }
  operators::distributed::Communicator::Init(send_varname_to_ctx,
                                             recv_varname_to_ctx, param_scope);
}

Q
can run  
Qiao Longfei 已提交
301 302
Communicator *Communicator::GetInstance() { return communicator_.get(); }

303 304 305 306
std::shared_ptr<Communicator> Communicator::GetInstantcePtr() {
  return communicator_;
}

Q
Qiao Longfei 已提交
307
void Communicator::Start() {
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
  VLOG(0) << "Communicator start";
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    VLOG(1) << "start send thread and recv thread";
    running_ = true;
    // start send and recv thread
    send_thread_.reset(
        new std::thread(std::bind(&Communicator::SendThread, this)));
    if (FLAGS_communicator_independent_recv_thread) {
      recv_thread_.reset(
          new std::thread(std::bind(&Communicator::RecvThread, this)));
    }
  }
}

void Communicator::Stop() {
  VLOG(0) << "Communicator stop";
  running_ = false;
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    if (send_thread_) {
      VLOG(1) << "stop send thread";
      send_thread_->join();
      send_thread_.reset(nullptr);
    }
    if (recv_thread_) {
      VLOG(1) << "stop recv thread";
      recv_thread_->join();
      recv_thread_.reset(nullptr);
    }
Q
Qiao Longfei 已提交
340
  }
341
  VLOG(0) << "Communicator stop done";
Q
Qiao Longfei 已提交
342 343 344 345 346
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle