listen_and_serv_op.h 2.5 KB
Newer Older
T
typhoonzero 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <stdint.h>
T
done  
typhoonzero 已提交
18
#include <atomic>
19
#include <set>
20
#include <string>
T
typhoonzero 已提交
21 22 23 24 25

#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
26 27
#include "paddle/fluid/operators/detail/request_handler.h"
#include "paddle/fluid/operators/detail/rpc_server.h"
T
typhoonzero 已提交
28 29 30 31 32

namespace paddle {
namespace operators {

// String key "OptimizeBlock".
// NOTE(review): presumably the name of the operator attribute that holds the
// optimize block -- confirm against the operator's attribute registration.
constexpr char kOptimizeBlock[] = "OptimizeBlock";
33
// String key "PrefetchBlock".
// NOTE(review): presumably the name of the operator attribute that holds the
// prefetch block -- confirm against the operator's attribute registration.
constexpr char kPrefetchBlock[] = "PrefetchBlock";
T
typhoonzero 已提交
34

35
// Free function taking shared ownership of an RPC server (the shared_ptr is
// passed by value). Declared here only; the definition lives elsewhere.
// NOTE(review): presumably the entry point executed on the server thread --
// confirm against the definition.
void RunServer(std::shared_ptr<detail::RPCServer> service);
T
typhoonzero 已提交
36 37 38

class ListenAndServOp : public framework::OperatorBase {
 public:
Q
qiaolongfei 已提交
39 40 41 42
  ListenAndServOp(const std::string& type,
                  const framework::VariableNameMap& inputs,
                  const framework::VariableNameMap& outputs,
                  const framework::AttributeMap& attrs);
T
typhoonzero 已提交
43

44 45
  virtual ~ListenAndServOp();

46 47 48 49
  void RunSyncLoop(framework::Executor* executor,
                   framework::ProgramDesc* program,
                   framework::Scope* recv_scope,
                   framework::BlockDesc* prefetch_block) const;
Q
qiaolongfei 已提交
50

51
  void RunAsyncLoop(framework::Executor* executor,
Q
qiaolongfei 已提交
52
                    framework::ProgramDesc* program) const;
Q
qiaolongfei 已提交
53

Y
yi.wu 已提交
54
  void SavePort() const;
T
done  
typhoonzero 已提交
55

56
  int GetSelectedPort() { return rpc_service_->GetSelectedPort(); }
T
wip  
typhoonzero 已提交
57

T
typhoonzero 已提交
58 59
  void Stop() override;

Q
qiaolongfei 已提交
60 61
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override;
T
typhoonzero 已提交
62 63

 protected:
64 65 66 67 68
  mutable std::shared_ptr<detail::RPCServer> rpc_service_;
  mutable std::shared_ptr<detail::RequestHandler> request_send_handler_;
  mutable std::shared_ptr<detail::RequestHandler> request_get_handler_;
  mutable std::shared_ptr<detail::RequestHandler> request_prefetch_handler_;

T
typhoonzero 已提交
69 70 71
  mutable std::shared_ptr<std::thread> server_thread_;
};

72 73 74 75 76 77 78 79
// Holder for a static signal-handling entry point.
// NOTE(review): DISABLE_COPY_AND_ASSIGN is a project macro defined outside
// this file; presumably it deletes the copy constructor and copy assignment
// -- confirm against its definition.
class SignalHandler {
 public:
  // Static function receiving a signal number; defined out of line.
  // NOTE(review): the name suggests it stops the running server and then
  // exits the process -- confirm against the definition.
  static void StopAndExit(int signal_num);

 private:
  DISABLE_COPY_AND_ASSIGN(SignalHandler);
};

T
typhoonzero 已提交
80 81
}  // namespace operators
}  // namespace paddle