communicator.cc 12.1 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/distributed/communicator.h"

Q
Qiao Longfei 已提交
17
#include <gflags/gflags.h>
18
#include <paddle/fluid/framework/program_desc.h>
Q
Qiao Longfei 已提交
19 20 21
#include <chrono>  // NOLINT
#include <thread>  // NOLINT

Q
Qiao Longfei 已提交
22
#include "paddle/fluid/framework/eigen.h"
Q
Qiao Longfei 已提交
23 24 25 26 27 28
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"

Q
Qiao Longfei 已提交
29 30 31 32
DEFINE_bool(communicator_independent_recv_thread, true,
            "use an independent to recv vars from parameter server");
DEFINE_int32(communicator_send_queue_size, 20,
             "queue size to recv gradient before send");
33
DEFINE_int32(communicator_min_send_grad_num_before_recv, 20,
34
             "max grad num to send before recv parameters");
35
DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv");
Q
Qiao Longfei 已提交
36 37 38
DEFINE_int32(communicator_send_wait_times, 5,
             "times that send thread will wait if merge num does not reach "
             "max_merge_var_num");
39 40 41 42
DEFINE_int32(communicator_max_merge_var_num, 20,
             "max var num to merge and send");
DEFINE_bool(communicator_fake_rpc, false,
            "fake mode does not really send any thing");
43 44
DEFINE_bool(communicator_merge_sparse_grad, true,
            "merge sparse gradient before sending");
Q
Qiao Longfei 已提交
45

Q
Qiao Longfei 已提交
46 47 48 49
namespace paddle {
namespace operators {
namespace distributed {

Q
Qiao Longfei 已提交
50 51 52 53 54 55
inline double GetCurrentUS() {
  struct timeval time;
  gettimeofday(&time, NULL);
  return 1e+6 * time.tv_sec + time.tv_usec;
}

56
std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
Q
can run  
Qiao Longfei 已提交
57

Q
Qiao Longfei 已提交
58 59 60 61 62 63 64 65 66 67 68
Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx,
                           const RpcCtxMap &recv_varname_to_ctx,
                           Scope *recv_scope)
    : send_varname_to_ctx_(send_varname_to_ctx),
      recv_varname_to_ctx_(recv_varname_to_ctx),
      recv_scope_(recv_scope) {
  // get all send information from graph, build vars_to_send
  VLOG(0) << "communicator_independent_recv_thread: "
          << FLAGS_communicator_independent_recv_thread;
  VLOG(0) << "communicator_send_queue_size: "
          << FLAGS_communicator_send_queue_size;
69 70
  VLOG(0) << "communicator_min_send_grad_num_before_recv: "
          << FLAGS_communicator_min_send_grad_num_before_recv;
Q
Qiao Longfei 已提交
71 72
  VLOG(0) << "communicator_thread_pool_size: "
          << FLAGS_communicator_thread_pool_size;
73 74
  VLOG(0) << "communicator_send_wait_times: "
          << FLAGS_communicator_send_wait_times;
Q
Qiao Longfei 已提交
75
  VLOG(0) << "communicator_max_merge_var_num: "
76 77
          << FLAGS_communicator_max_merge_var_num;
  VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
78 79
  VLOG(0) << "communicator_merge_sparse_grad: "
          << FLAGS_communicator_merge_sparse_grad;
Q
Qiao Longfei 已提交
80 81 82 83 84 85 86 87 88 89 90
  send_scope_.reset(new Scope());
  for (auto &iter : send_varname_to_ctx_) {
    send_varname_to_queue_[iter.first] =
        std::make_shared<BlockingQueue<std::shared_ptr<Variable>>>(
            FLAGS_communicator_send_queue_size);
  }
  send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
  recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
}

Communicator::~Communicator() {
91 92 93 94
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
95 96 97
  running_ = false;
  if (send_thread_) send_thread_->join();
  if (recv_thread_) recv_thread_->join();
98 99 100 101
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator done");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
102 103
}

Q
Qiao Longfei 已提交
104
void Communicator::SendThread() {
Q
Qiao Longfei 已提交
105
  VLOG(3) << "SendThread start!";
Q
Qiao Longfei 已提交
106 107 108
  while (running_) {
    std::vector<std::future<void>> task_futures;
    task_futures.reserve(send_varname_to_ctx_.size());
Q
Qiao Longfei 已提交
109
    VLOG(3) << "run send graph";
Q
Qiao Longfei 已提交
110
    auto before_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
111
    for (auto &iter : send_varname_to_queue_) {
Q
Qiao Longfei 已提交
112 113
      auto &var_name = iter.first;
      auto &var_queue = iter.second;
Q
Qiao Longfei 已提交
114
      if (var_queue->Size() > 0) {
Q
Qiao Longfei 已提交
115
        auto send_task = [this, &var_name, &var_queue] {
Q
Qiao Longfei 已提交
116
          VLOG(3) << var_name << " merge and send";
Q
Qiao Longfei 已提交
117 118
          std::vector<std::shared_ptr<Variable>> vars;
          size_t merged_var_num = 0;
Q
Qiao Longfei 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
          size_t wait_times = 0;
          while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
            if (var_queue->Size() == 0) {
              VLOG(3) << "wait_times -> " << wait_times;
              if (wait_times >= FLAGS_communicator_send_wait_times) {
                break;
              }
              std::this_thread::sleep_for(std::chrono::milliseconds(10));
              wait_times++;
              continue;
            } else {
              wait_times = 0;

              vars.push_back(var_queue->Pop());
              // only count the send number of the first var
              if (var_name == send_varname_to_queue_.begin()->first) {
                grad_num_.fetch_add(1, std::memory_order_relaxed);
              }
              merged_var_num++;
138
            }
Q
Qiao Longfei 已提交
139
          }
Q
Qiao Longfei 已提交
140
          auto before_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
141
          MergeVars(var_name, vars, send_scope_.get());
Q
Qiao Longfei 已提交
142
          auto after_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
143 144
          VLOG(3) << "merge " << merged_var_num << " " << var_name
                  << " use time " << after_merge - before_merge;
Q
Qiao Longfei 已提交
145 146
          auto send_functor = distributed::ParameterSend<float>();
          auto &ctx = send_varname_to_ctx_.at(var_name);
147 148 149
          if (!FLAGS_communicator_fake_rpc) {
            send_functor(ctx, *send_scope_, true);
          }
Q
Qiao Longfei 已提交
150 151 152
          auto after_send = GetCurrentUS();
          VLOG(3) << "send " << var_name << " use time "
                  << after_send - after_merge;
Q
Qiao Longfei 已提交
153 154 155
        };
        task_futures.emplace_back(
            send_threadpool_->enqueue(std::move(send_task)));
Q
Qiao Longfei 已提交
156
      } else {
157
        VLOG(4) << var_name << " queue empty";
Q
Qiao Longfei 已提交
158
      }
Q
Qiao Longfei 已提交
159 160 161
    }
    for (auto &task_f : task_futures) {
      task_f.wait();
Q
Qiao Longfei 已提交
162
    }
Q
Qiao Longfei 已提交
163
    auto after_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
164
    auto send_graph_use_time = after_run_send_graph - before_run_send_graph;
165
    if (send_graph_use_time > 100) {
Q
Qiao Longfei 已提交
166 167 168
      VLOG(1) << "run send graph use time "
              << after_run_send_graph - before_run_send_graph;
    }
Q
Qiao Longfei 已提交
169 170 171
    if (!FLAGS_communicator_independent_recv_thread) {
      RecvAll();
    }
Q
Qiao Longfei 已提交
172
  }
173
  VLOG(0) << "communicator stopped, send thread exit";
Q
Qiao Longfei 已提交
174 175
}

Q
Qiao Longfei 已提交
176 177
void Communicator::RecvAll() {
  VLOG(3) << "parallel run recv graph";
178
  if (!running_) return;
Q
Qiao Longfei 已提交
179
  auto before_send = GetCurrentUS();
Q
Qiao Longfei 已提交
180 181 182 183 184
  std::vector<std::future<void>> task_futures;
  task_futures.reserve(recv_varname_to_ctx_.size());
  for (auto &iter : recv_varname_to_ctx_) {
    auto recv_task = [this, &iter] {
      auto &var_name = iter.first;
185
      VLOG(4) << "recv var " << var_name;
Q
Qiao Longfei 已提交
186
      auto recv_functor = distributed::ParameterRecv<float>();
187 188 189
      if (!FLAGS_communicator_fake_rpc) {
        recv_functor(iter.second, *recv_scope_);
      }
Q
Qiao Longfei 已提交
190 191 192 193 194 195
    };
    task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
  }
  for (auto &task : task_futures) {
    task.wait();
  }
Q
Qiao Longfei 已提交
196
  auto after_recv = GetCurrentUS();
Q
Qiao Longfei 已提交
197
  VLOG(1) << "run recv graph use time " << after_recv - before_send;
Q
Qiao Longfei 已提交
198 199
}

Q
Qiao Longfei 已提交
200
void Communicator::RecvThread() {
Q
Qiao Longfei 已提交
201
  VLOG(3) << "RecvThread start!";
Q
Qiao Longfei 已提交
202
  while (running_) {
203
    auto grad_num = grad_num_.load();
204
    if (grad_num > FLAGS_communicator_min_send_grad_num_before_recv) {
205 206 207 208 209 210
      VLOG(1) << "current grad num " << grad_num;
      RecvAll();
      grad_num_.store(0);
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
Q
Qiao Longfei 已提交
211
  }
212
  VLOG(0) << "communicator stopped, recv thread exit";
Q
Qiao Longfei 已提交
213 214 215 216
}

void Communicator::Send(const std::string &var_name,
                        const framework::Scope &scope) {
Q
Qiao Longfei 已提交
217 218 219 220
  VLOG(3) << "communicator send " << var_name;
  // push var into send queue by var_name
  auto *grad_var = scope.FindVar(var_name);
  PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
221 222 223 224 225 226 227 228 229 230 231 232 233 234
  if (grad_var->IsType<framework::SelectedRows>() &&
      !FLAGS_communicator_merge_sparse_grad) {
    auto send_functor = distributed::ParameterSend<float>();
    auto &ctx = send_varname_to_ctx_.at(var_name);
    if (!FLAGS_communicator_fake_rpc) {
      send_functor(ctx, scope, true);
    }
  } else {
    auto tmp_grad_var = std::make_shared<Variable>();
    framework::CopyVariable(*grad_var, tmp_grad_var.get());
    auto &queue = send_varname_to_queue_.at(var_name);
    VLOG(3) << "send " << var_name << " queue size " << queue->Size();
    queue->Push(tmp_grad_var);
  }
Q
Qiao Longfei 已提交
235 236
}

237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
void Communicator::Init(const paddle::framework::ProgramDesc &program,
                        Scope *param_scope) {
  using RpcCtxMap = operators::distributed::RpcCtxMap;
  VLOG(3) << "ProcessGraph";
  RpcCtxMap send_varname_to_ctx;
  RpcCtxMap recv_varname_to_ctx;
  for (auto *op : program.Block(0).AllOps()) {
    VLOG(3) << "node name " << op->Type();
    if (op->Type() == "send") {
      auto send_var_name = op->Input("X")[0];
      auto send_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("send_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto height_section =
          boost::get<std::vector<int64_t>>(op->GetNullableAttr("sections"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext(
          send_var_name, send_varnames, epmap, height_section, trainer_id);
      VLOG(3) << "find and init an send op: "
              << send_varname_to_ctx[send_var_name];
    } else if (op->Type() == "recv") {
      auto do_not_run = boost::get<int>(op->GetNullableAttr("do_not_run"));
      PADDLE_ENFORCE_GT(do_not_run, 0, "recv should not run!");
      auto recv_var_name = op->Output("Out")[0];
      auto recv_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("recv_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext(
          recv_var_name, recv_varnames, epmap, {}, trainer_id);
    }
  }

  // init communicator here
  if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) {
    LOG(WARNING) << "no var need to send and recv!!";
  }
  operators::distributed::Communicator::Init(send_varname_to_ctx,
                                             recv_varname_to_ctx, param_scope);
}

Q
can run  
Qiao Longfei 已提交
280 281
Communicator *Communicator::GetInstance() { return communicator_.get(); }

282 283 284 285
std::shared_ptr<Communicator> Communicator::GetInstantcePtr() {
  return communicator_;
}

Q
Qiao Longfei 已提交
286
void Communicator::Start() {
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
  VLOG(0) << "Communicator start";
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    VLOG(1) << "start send thread and recv thread";
    running_ = true;
    // start send and recv thread
    send_thread_.reset(
        new std::thread(std::bind(&Communicator::SendThread, this)));
    if (FLAGS_communicator_independent_recv_thread) {
      recv_thread_.reset(
          new std::thread(std::bind(&Communicator::RecvThread, this)));
    }
  }
}

void Communicator::Stop() {
  VLOG(0) << "Communicator stop";
  running_ = false;
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    if (send_thread_) {
      VLOG(1) << "stop send thread";
      send_thread_->join();
      send_thread_.reset(nullptr);
    }
    if (recv_thread_) {
      VLOG(1) << "stop recv thread";
      recv_thread_->join();
      recv_thread_.reset(nullptr);
    }
Q
Qiao Longfei 已提交
319
  }
320
  VLOG(0) << "Communicator stop done";
Q
Qiao Longfei 已提交
321 322 323 324 325
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle