communicator.cc 12.0 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/distributed/communicator.h"

Q
Qiao Longfei 已提交
17
#include <gflags/gflags.h>
18
#include <paddle/fluid/framework/program_desc.h>
Q
Qiao Longfei 已提交
19 20 21
#include <chrono>  // NOLINT
#include <thread>  // NOLINT

Q
Qiao Longfei 已提交
22
#include "paddle/fluid/framework/eigen.h"
Q
Qiao Longfei 已提交
23 24 25 26 27 28
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"

Q
Qiao Longfei 已提交
29 30 31 32
DEFINE_bool(communicator_independent_recv_thread, true,
            "use an independent to recv vars from parameter server");
DEFINE_int32(communicator_send_queue_size, 20,
             "queue size to recv gradient before send");
33
DEFINE_int32(communicator_min_send_grad_num_before_recv, 20,
34
             "max grad num to send before recv parameters");
35
DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv");
Q
Qiao Longfei 已提交
36 37 38
DEFINE_int32(communicator_send_wait_times, 5,
             "times that send thread will wait if merge num does not reach "
             "max_merge_var_num");
39 40 41 42
DEFINE_int32(communicator_max_merge_var_num, 20,
             "max var num to merge and send");
DEFINE_bool(communicator_fake_rpc, false,
            "fake mode does not really send any thing");
Q
Qiao Longfei 已提交
43

Q
Qiao Longfei 已提交
44 45 46 47
namespace paddle {
namespace operators {
namespace distributed {

Q
Qiao Longfei 已提交
48 49 50 51 52 53
inline double GetCurrentUS() {
  struct timeval time;
  gettimeofday(&time, NULL);
  return 1e+6 * time.tv_sec + time.tv_usec;
}

54
std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
Q
can run  
Qiao Longfei 已提交
55

Q
Qiao Longfei 已提交
56 57 58 59 60 61 62 63 64 65 66
Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx,
                           const RpcCtxMap &recv_varname_to_ctx,
                           Scope *recv_scope)
    : send_varname_to_ctx_(send_varname_to_ctx),
      recv_varname_to_ctx_(recv_varname_to_ctx),
      recv_scope_(recv_scope) {
  // get all send information from graph, build vars_to_send
  VLOG(0) << "communicator_independent_recv_thread: "
          << FLAGS_communicator_independent_recv_thread;
  VLOG(0) << "communicator_send_queue_size: "
          << FLAGS_communicator_send_queue_size;
67 68
  VLOG(0) << "communicator_min_send_grad_num_before_recv: "
          << FLAGS_communicator_min_send_grad_num_before_recv;
Q
Qiao Longfei 已提交
69 70
  VLOG(0) << "communicator_thread_pool_size: "
          << FLAGS_communicator_thread_pool_size;
71 72
  VLOG(0) << "communicator_send_wait_times: "
          << FLAGS_communicator_send_wait_times;
Q
Qiao Longfei 已提交
73
  VLOG(0) << "communicator_max_merge_var_num: "
74 75
          << FLAGS_communicator_max_merge_var_num;
  VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94

  if (send_varname_to_ctx.size() == 0) {
    VLOG(0) << "nothing need to be send, will not start send_thread";
  } else {
    send_scope_.reset(new Scope());
    for (auto &iter : send_varname_to_ctx_) {
      send_varname_to_queue_[iter.first] =
          std::make_shared<BlockingQueue<std::shared_ptr<Variable>>>(
              FLAGS_communicator_send_queue_size);
    }
    send_threadpool_.reset(
        new ::ThreadPool(FLAGS_communicator_thread_pool_size));
  }

  if (recv_varname_to_ctx.size() == 0) {
    VLOG(0) << "nothing need to be received, will not start recv_thread";
  } else {
    recv_threadpool_.reset(
        new ::ThreadPool(FLAGS_communicator_thread_pool_size));
Q
Qiao Longfei 已提交
95 96 97 98
  }
}

Communicator::~Communicator() {
99 100 101 102
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
103 104 105
  running_ = false;
  if (send_thread_) send_thread_->join();
  if (recv_thread_) recv_thread_->join();
106 107 108 109
  if (FLAGS_v >= 3) {
    std::string msg("~Communicator done");
    fwrite(msg.c_str(), msg.length(), 1, stdout);
  }
Q
Qiao Longfei 已提交
110 111
}

Q
Qiao Longfei 已提交
112
void Communicator::SendThread() {
Q
Qiao Longfei 已提交
113
  VLOG(3) << "SendThread start!";
Q
Qiao Longfei 已提交
114 115 116
  while (running_) {
    std::vector<std::future<void>> task_futures;
    task_futures.reserve(send_varname_to_ctx_.size());
Q
Qiao Longfei 已提交
117
    VLOG(3) << "run send graph";
Q
Qiao Longfei 已提交
118
    auto before_run_send_graph = GetCurrentUS();
Q
Qiao Longfei 已提交
119
    for (auto &iter : send_varname_to_queue_) {
Q
Qiao Longfei 已提交
120 121
      auto &var_name = iter.first;
      auto &var_queue = iter.second;
Q
Qiao Longfei 已提交
122
      if (var_queue->Size() > 0) {
Q
Qiao Longfei 已提交
123
        auto send_task = [this, &var_name, &var_queue] {
Q
Qiao Longfei 已提交
124
          VLOG(3) << var_name << " merge and send";
Q
Qiao Longfei 已提交
125 126
          std::vector<std::shared_ptr<Variable>> vars;
          size_t merged_var_num = 0;
Q
Qiao Longfei 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
          size_t wait_times = 0;
          while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
            if (var_queue->Size() == 0) {
              VLOG(3) << "wait_times -> " << wait_times;
              if (wait_times >= FLAGS_communicator_send_wait_times) {
                break;
              }
              std::this_thread::sleep_for(std::chrono::milliseconds(10));
              wait_times++;
              continue;
            } else {
              wait_times = 0;

              vars.push_back(var_queue->Pop());
              // only count the send number of the first var
              if (var_name == send_varname_to_queue_.begin()->first) {
                grad_num_.fetch_add(1, std::memory_order_relaxed);
              }
              merged_var_num++;
146
            }
Q
Qiao Longfei 已提交
147
          }
Q
Qiao Longfei 已提交
148
          auto before_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
149
          MergeVars(var_name, vars, send_scope_.get());
Q
Qiao Longfei 已提交
150
          auto after_merge = GetCurrentUS();
Q
Qiao Longfei 已提交
151 152
          VLOG(3) << "merge " << merged_var_num << " " << var_name
                  << " use time " << after_merge - before_merge;
Q
Qiao Longfei 已提交
153 154
          auto send_functor = distributed::ParameterSend<float>();
          auto &ctx = send_varname_to_ctx_.at(var_name);
155 156 157
          if (!FLAGS_communicator_fake_rpc) {
            send_functor(ctx, *send_scope_, true);
          }
Q
Qiao Longfei 已提交
158 159 160
          auto after_send = GetCurrentUS();
          VLOG(3) << "send " << var_name << " use time "
                  << after_send - after_merge;
Q
Qiao Longfei 已提交
161 162 163
        };
        task_futures.emplace_back(
            send_threadpool_->enqueue(std::move(send_task)));
Q
Qiao Longfei 已提交
164
      } else {
165
        VLOG(4) << var_name << " queue empty";
Q
Qiao Longfei 已提交
166
      }
Q
Qiao Longfei 已提交
167 168 169
    }
    for (auto &task_f : task_futures) {
      task_f.wait();
Q
Qiao Longfei 已提交
170
    }
Q
Qiao Longfei 已提交
171
    auto after_run_send_graph = GetCurrentUS();
172 173 174 175

    VLOG(3) << "run send graph use time "
            << after_run_send_graph - before_run_send_graph;
    RecvNonIndependent();
Q
Qiao Longfei 已提交
176
  }
177
  VLOG(0) << "communicator stopped, send thread exit";
Q
Qiao Longfei 已提交
178 179
}

180 181 182 183 184 185 186 187 188 189 190 191 192 193
void Communicator::RecvNonIndependent() {
  if (!FLAGS_communicator_independent_recv_thread) {
    return;
  }

  auto grad_num = grad_num_.load();
  if (grad_num > 0) {
    RecvAll();
    grad_num_.store(0);
  } else {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}

Q
Qiao Longfei 已提交
194 195
void Communicator::RecvAll() {
  VLOG(3) << "parallel run recv graph";
196
  if (!running_) return;
Q
Qiao Longfei 已提交
197
  auto before_send = GetCurrentUS();
Q
Qiao Longfei 已提交
198 199 200 201 202
  std::vector<std::future<void>> task_futures;
  task_futures.reserve(recv_varname_to_ctx_.size());
  for (auto &iter : recv_varname_to_ctx_) {
    auto recv_task = [this, &iter] {
      auto &var_name = iter.first;
203
      VLOG(4) << "recv var " << var_name;
Q
Qiao Longfei 已提交
204
      auto recv_functor = distributed::ParameterRecv<float>();
205 206 207
      if (!FLAGS_communicator_fake_rpc) {
        recv_functor(iter.second, *recv_scope_);
      }
Q
Qiao Longfei 已提交
208 209 210 211 212 213
    };
    task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
  }
  for (auto &task : task_futures) {
    task.wait();
  }
Q
Qiao Longfei 已提交
214
  auto after_recv = GetCurrentUS();
Q
Qiao Longfei 已提交
215
  VLOG(1) << "run recv graph use time " << after_recv - before_send;
Q
Qiao Longfei 已提交
216 217
}

Q
Qiao Longfei 已提交
218
void Communicator::RecvThread() {
Q
Qiao Longfei 已提交
219
  VLOG(3) << "RecvThread start!";
Q
Qiao Longfei 已提交
220
  while (running_) {
221
    auto grad_num = grad_num_.load();
222
    if (grad_num > FLAGS_communicator_min_send_grad_num_before_recv) {
223 224 225 226 227 228
      VLOG(1) << "current grad num " << grad_num;
      RecvAll();
      grad_num_.store(0);
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
Q
Qiao Longfei 已提交
229
  }
230
  VLOG(0) << "communicator stopped, recv thread exit";
Q
Qiao Longfei 已提交
231 232 233 234
}

void Communicator::Send(const std::string &var_name,
                        const framework::Scope &scope) {
Q
Qiao Longfei 已提交
235 236 237 238 239 240 241 242 243
  VLOG(3) << "communicator send " << var_name;
  // push var into send queue by var_name
  auto *grad_var = scope.FindVar(var_name);
  PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
  auto tmp_grad_var = std::make_shared<Variable>();
  framework::CopyVariable(*grad_var, tmp_grad_var.get());
  auto &queue = send_varname_to_queue_.at(var_name);
  VLOG(3) << "send " << var_name << " queue size " << queue->Size();
  queue->Push(tmp_grad_var);
Q
Qiao Longfei 已提交
244 245
}

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
void Communicator::Init(const paddle::framework::ProgramDesc &program,
                        Scope *param_scope) {
  using RpcCtxMap = operators::distributed::RpcCtxMap;
  VLOG(3) << "ProcessGraph";
  RpcCtxMap send_varname_to_ctx;
  RpcCtxMap recv_varname_to_ctx;
  for (auto *op : program.Block(0).AllOps()) {
    VLOG(3) << "node name " << op->Type();
    if (op->Type() == "send") {
      auto send_var_name = op->Input("X")[0];
      auto send_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("send_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto height_section =
          boost::get<std::vector<int64_t>>(op->GetNullableAttr("sections"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext(
          send_var_name, send_varnames, epmap, height_section, trainer_id);
      VLOG(3) << "find and init an send op: "
              << send_varname_to_ctx[send_var_name];
    } else if (op->Type() == "recv") {
      auto do_not_run = boost::get<int>(op->GetNullableAttr("do_not_run"));
      PADDLE_ENFORCE_GT(do_not_run, 0, "recv should not run!");
      auto recv_var_name = op->Output("Out")[0];
      auto recv_varnames = boost::get<std::vector<std::string>>(
          op->GetNullableAttr("recv_varnames"));
      auto epmap =
          boost::get<std::vector<std::string>>(op->GetNullableAttr("epmap"));
      auto trainer_id = boost::get<int>(op->GetNullableAttr("trainer_id"));
      recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext(
          recv_var_name, recv_varnames, epmap, {}, trainer_id);
    }
  }

  // init communicator here
  if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) {
    LOG(WARNING) << "no var need to send and recv!!";
  }
  operators::distributed::Communicator::Init(send_varname_to_ctx,
                                             recv_varname_to_ctx, param_scope);
}

Q
can run  
Qiao Longfei 已提交
289 290
Communicator *Communicator::GetInstance() { return communicator_.get(); }

291 292 293 294
std::shared_ptr<Communicator> Communicator::GetInstantcePtr() {
  return communicator_;
}

Q
Qiao Longfei 已提交
295
void Communicator::Start() {
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
  VLOG(0) << "Communicator start";
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    VLOG(1) << "start send thread and recv thread";
    running_ = true;
    // start send and recv thread
    send_thread_.reset(
        new std::thread(std::bind(&Communicator::SendThread, this)));
    if (FLAGS_communicator_independent_recv_thread) {
      recv_thread_.reset(
          new std::thread(std::bind(&Communicator::RecvThread, this)));
    }
  }
}

void Communicator::Stop() {
  VLOG(0) << "Communicator stop";
  running_ = false;
  if (!communicator_) {
    VLOG(0) << "Communicator is not inited, do nothing";
  } else {
    if (send_thread_) {
      VLOG(1) << "stop send thread";
      send_thread_->join();
      send_thread_.reset(nullptr);
    }
    if (recv_thread_) {
      VLOG(1) << "stop recv thread";
      recv_thread_->join();
      recv_thread_.reset(nullptr);
    }
Q
Qiao Longfei 已提交
328
  }
329
  VLOG(0) << "Communicator stop done";
Q
Qiao Longfei 已提交
330 331 332 333 334
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle