/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <stdio.h>  // for removing the port file
#include <csignal>
#include <cstdlib>
#include <fstream>
#include <thread>  // NOLINT
#include <vector>

#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/operators/detail/request_handler_impl.h"
#include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace operators {

void RunServer(std::shared_ptr<detail::RPCServer> service) {
  service->StartServer();
  VLOG(4) << "RunServer thread end";
}
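
// Split `str` on every occurrence of `sep` into `pieces`; a trailing empty
// piece is dropped. For example, splitting the attribute string
// "param1@GRAD.block0:1" on ':' yields {"param1@GRAD.block0", "1"}, which is
// how RunAsyncLoop parses the grad_to_block_id entries below.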
static void split(const std::string &str, char sep,
                  std::vector<std::string> *pieces) {
  pieces->clear();
  if (str.empty()) {
    return;
  }
  size_t pos = 0;
  size_t next = str.find(sep, pos);
  while (next != std::string::npos) {
    pieces->push_back(str.substr(pos, next - pos));
    pos = next + 1;
    next = str.find(sep, pos);
  }
  if (!str.substr(pos).empty()) {
    pieces->push_back(str.substr(pos));
  }
}

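// Run the prepared contexts of the given block ids asynchronously on the same
// scope and wait for all of them to finish. Exceptions thrown by a sub-program
// are caught and logged so that one failing block does not take down the
// server loop.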
static void ParallelExecuteBlocks(
    const std::vector<size_t> &parallel_blkids, framework::Executor *executor,
    const std::vector<std::shared_ptr<framework::ExecutorPrepareContext>>
        &prepared,
    framework::ProgramDesc *program, framework::Scope *scope) {
  std::vector<std::future<void>> fs;
  for (size_t idx : parallel_blkids) {
    fs.push_back(
        framework::Async([&executor, &prepared, &program, &scope, idx]() {
          int run_block = idx;  // thread local
          try {
            executor->RunPreparedContext(prepared[run_block].get(), scope);
          } catch (const std::exception &e) {
            LOG(ERROR) << "run sub program error " << e.what();
          }
        }));
  }
  for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
}

ListenAndServOp::ListenAndServOp(const std::string &type,
                                 const framework::VariableNameMap &inputs,
                                 const framework::VariableNameMap &outputs,
                                 const framework::AttributeMap &attrs)
    : OperatorBase(type, inputs, outputs, attrs) {}

ListenAndServOp::~ListenAndServOp() { Stop(); }

void ListenAndServOp::Stop() {
  rpc_service_->ShutDown();
  server_thread_->join();
  auto file_path = string::Sprintf("/tmp/paddle.%d.port", ::getpid());
  remove(file_path.c_str());
}

void ListenAndServOp::SavePort() const {
  // NOTE: by default the selected port is written to /tmp/paddle.selected_port
  rpc_service_->SavePort();
}

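// Synchronous update loop: each iteration waits until every trainer has sent
// its gradients (kRequestSend barrier), runs the optimize blocks (blocks that
// share a parent block run in parallel), clears the received sparse
// variables, and then releases the trainers waiting on kRequestGet so they
// can fetch the updated parameters.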
void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
                                  framework::ProgramDesc *program,
                                  framework::Scope *recv_scope,
                                  framework::BlockDesc *prefetch_block) const {
  size_t num_blocks = program->Size();
  PADDLE_ENFORCE_GE(num_blocks, 2,
                    "server program should have at least 2 blocks");

  std::vector<int> block_list;
  for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
    block_list.push_back(blkid);
  }
  auto optimize_prepared = executor->Prepare(*program, block_list);
  // Insert a placeholder for block 0, which holds the current op itself.
  optimize_prepared.insert(
      optimize_prepared.begin(),
      std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));

  rpc_service_->ResetBarrierCounter();
  // Record received sparse variables so that we can reset them
  // after the optimize program has run.
  std::vector<framework::Variable *> sparse_vars;
  while (true) {
    // Receive from multiple trainers; we don't care about the order in which
    // the gradients arrive, just add suffix 0~n and merge them.
    rpc_service_->SetCond(detail::kRequestSend);
    rpc_service_->WaitBarrier(detail::kRequestSend);

    if (rpc_service_->IsExit()) {
      LOG(WARNING) << "get exit!rpc_processor break!";
      rpc_service_->SetCond(detail::kRequestGet);
      break;
    }

    // NOTE: if is_gpu_place, CUDA kernels are launched by multiple threads
    // and this will still work.
    // Optimize blocks that share the same parent ID are run in parallel.
    // TODO(Yancey1989): need to use ParallelExecutor in the future
    int32_t last_parent_blkid = program->Block(1).Parent();
    std::vector<size_t> parallel_blkids;
    parallel_blkids.push_back(1);
    double ts = detail::GetTimestamp();
    for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
      if (blkid != static_cast<size_t>(prefetch_block->ID())) {
        if (program->Block(blkid).Parent() != last_parent_blkid) {
          ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
                                program, recv_scope);
          parallel_blkids.clear();
          last_parent_blkid = program->Block(blkid).Parent();
        }
        parallel_blkids.push_back(blkid);
      }
    }
    ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program,
                          recv_scope);
    VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";

    // Reset the received sparse variables so that the sum operator will not
    // sum input sparse variables whose rows are empty in the next
    // mini-batch.
    // TODO(Yancey1989): move the reset action into an operator; we should not
    // have any hidden logic in the operator.
    for (framework::Variable *var : sparse_vars) {
      var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
    }

    rpc_service_->SetCond(detail::kRequestGet);
    rpc_service_->WaitBarrier(detail::kRequestGet);
    rpc_service_->ResetBarrierCounter();
  }  // while(true)
}

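// Asynchronous update loop: the grad_to_block_id attribute maps each gradient
// name to its own optimize block. The prepared contexts of those blocks are
// handed to the request handlers so updates can run as gradients arrive; the
// loop below only polls for the exit flag.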
void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
                                   framework::ProgramDesc *program) const {
  VLOG(3) << "RunAsyncLoop in";
  // grad name to block id
  std::unordered_map<std::string, int32_t> grad_to_block_id;
  std::unordered_map<int32_t, std::string> id_to_grad;

  auto grad_to_block_id_str =
      Attr<std::vector<std::string>>("grad_to_block_id");
  for (const auto &grad_and_id : grad_to_block_id_str) {
    std::vector<std::string> pieces;
    split(grad_and_id, ':', &pieces);
    VLOG(3) << "after split, grad = " << pieces[0] << ", id=" << pieces[1];
Q
qiaolongfei 已提交
177
    PADDLE_ENFORCE_EQ(pieces.size(), 2);
Q
qiaolongfei 已提交
178
    PADDLE_ENFORCE_EQ(grad_to_block_id.count(pieces[0]), 0);

    int block_id = std::stoi(pieces[1]);
    grad_to_block_id[pieces[0]] = block_id;
    id_to_grad[block_id] = pieces[0];
  }
  size_t num_blocks = program->Size();
  PADDLE_ENFORCE_GE(num_blocks, 2,
                    "server program should have at least 2 blocks");

  std::vector<int> block_list;
  for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
    block_list.push_back(blkid);
  }
  auto optimize_prepared = executor->Prepare(*program, block_list);
  std::unordered_map<std::string,
                     std::shared_ptr<framework::ExecutorPrepareContext>>
      grad_to_prepared_ctx;
  for (size_t i = 0; i < block_list.size(); ++i) {
    grad_to_prepared_ctx[id_to_grad[block_list[i]]] = optimize_prepared[i];
  }

  request_send_handler_->SetGradToPreparedCtx(&grad_to_prepared_ctx);
  request_get_handler_->SetGradToPreparedCtx(&grad_to_prepared_ctx);
  request_prefetch_handler_->SetGradToPreparedCtx(&grad_to_prepared_ctx);

  VLOG(3) << "RunAsyncLoop into while";
  while (true) {
    if (rpc_service_->IsExit()) {
      LOG(INFO) << "get exit!rpc_processor break!";
      break;
    }

    sleep(1);
  }  // while(true)
}

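// Fill a request handler with everything it needs to serve RPCs: the scope,
// device context, executor, program, the prepared prefetch context, and a
// pointer back to the RPC server.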
static void FillRequestCtx(detail::RequestHandler *h, framework::Scope *scope,
                           platform::DeviceContext *dev_ctx,
                           framework::Executor *executor,
                           framework::ProgramDesc *program,
                           framework::ExecutorPrepareContext *prefetch_ctx,
                           detail::RPCServer *rpc_server) {
  h->SetScope(scope);
  h->SetDevCtx(dev_ctx);
  h->SetExecutor(executor);
  h->SetProgram(program);
  h->SetPrefetchPreparedCtx(std::move(
      std::unique_ptr<framework::ExecutorPrepareContext>(prefetch_ctx)));
  h->SetRPCServer(rpc_server);
}

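// Build the gRPC server, register the send/get/prefetch handlers, start the
// listening thread, save the selected port for the Python side, and then
// enter either the sync or the async serving loop.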
void ListenAndServOp::RunImpl(const framework::Scope &scope,
                              const platform::Place &dev_place) const {
  // Mark this as a PS (parameter server); it decides whether to profile by
  // listening to the trainer.
  platform::SetProfileListener();
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
  auto &dev_ctx = *pool.Get(dev_place);
  framework::Scope &recv_scope = scope.NewScope();

  bool sync_mode = Attr<bool>("sync_mode");
  auto fan_in = Attr<int>("Fanin");

  PADDLE_ENFORCE(!rpc_service_);
  std::string endpoint = Attr<std::string>("endpoint");

  LOG(INFO) << "sync_mode:" << sync_mode << ", fan_in:" << fan_in
            << ", end_point:" << endpoint;

  // request_handler_.reset(new detail::GRPCRequestSendHandler(sync_mode));
  rpc_service_.reset(new detail::AsyncGRPCServer(endpoint, fan_in));
  request_send_handler_.reset(new detail::RequestSendHandler(sync_mode));
  request_get_handler_.reset(new detail::RequestGetHandler(sync_mode));
  request_prefetch_handler_.reset(
      new detail::RequestPrefetchHandler(sync_mode));

  rpc_service_->RegisterRPC(detail::kRequestSend, request_send_handler_.get());
  rpc_service_->RegisterRPC(detail::kRequestGet, request_get_handler_.get());
  rpc_service_->RegisterRPC(detail::kRequestPrefetch,
                            request_prefetch_handler_.get());

  auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
  auto *program = optimize_block->Program();
  framework::Executor executor(dev_place);

  // prepare for prefetch
  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
  auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());

  auto f = std::bind(FillRequestCtx, std::placeholders::_1, &recv_scope,
                     &dev_ctx, &executor, program, prefetch_prepared.release(),
                     rpc_service_.get());

  f(request_send_handler_.get());
  f(request_get_handler_.get());
  f(request_prefetch_handler_.get());

  // start the server listening after all member initialized.
  server_thread_.reset(new std::thread(RunServer, rpc_service_));
  VLOG(3) << "wait server thread to become ready...";
  rpc_service_->WaitServerReady();

  // register SIGINT(from ctrl+C) and SIGTERM(from kill) signal handlers
  signal(SIGINT, SignalHandler::StopAndExit);
  signal(SIGTERM, SignalHandler::StopAndExit);

  // Write the server's selected port to a file for the Python side to use.
  SavePort();
  if (sync_mode) {
    RunSyncLoop(&executor, program, &recv_scope, prefetch_block);
  } else {
    RunAsyncLoop(&executor, program);
  }
}

class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
 public:
  void Make() {
    AddInput("X", "(Tensor) Variables that server recv.").AsDuplicable();
    AddComment(R"DOC(
ListenAndServ operator

This operator will start a RPC server which can receive variables
from send_op and send back variables to recv_op.
)DOC");
    AddAttr<std::string>("endpoint",
                         "(string, default 127.0.0.1:6164)"
                         "IP address to listen on.")
        .SetDefault("127.0.0.1:6164")
        .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
    AddAttr<std::vector<std::string>>(
        "grad_to_block_id",
        "['param1@GRAD.block0:1', 'param2@GRAD.blockn:2'] "
        "a map from grad name to it's optimize block id")
        .SetDefault({});
    AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
    AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                    "Optimize block to run on the server side.");
    AddAttr<framework::BlockDesc *>(kPrefetchBlock,
                                    "Prefetch block to run on the server side.");
    AddAttr<int>("Fanin", "How many clients send to this server.")
        .SetDefault(1);
  }
};

void SignalHandler::StopAndExit(int signal_num) {
  VLOG(3) << "Catch interrupt signal: " << signal_num << ", program will exit";
  exit(0);
}

}  // namespace operators
}  // namespace paddle

namespace ops = paddle::operators;

REGISTER_OPERATOR(listen_and_serv, ops::ListenAndServOp,
                  ops::ListenAndServOpMaker);