From efd5a849867125105cbde1682a7e2f20034ff336 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Thu, 19 Jul 2018 14:39:42 +0800
Subject: [PATCH] update executor interface

---
 paddle/fluid/framework/executor.cc            |  4 ++--
 paddle/fluid/framework/executor.h             |  2 +-
 .../operators/distributed/grpc_client.cc      | 12 ++++++----
 .../fluid/operators/distributed/grpc_client.h |  6 ++++-
 paddle/fluid/pybind/pybind.cc                 |  4 +---
 python/paddle/fluid/executor.py               | 24 +++++++++++++++++--
 6 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc
index 750a08d3a30..c2800c972a5 100644
--- a/paddle/fluid/framework/executor.cc
+++ b/paddle/fluid/framework/executor.cc
@@ -45,13 +45,13 @@ ExecutorPrepareContext::~ExecutorPrepareContext() {
 
 Executor::Executor(const platform::Place& place) : place_(place) {}
 
+void Executor::Close() {
 #ifdef PADDLE_WITH_DISTRIBUTE
-void Executor::Complete() {
   ::paddle::operators::distributed::RPCClient::GetInstance<
       ::paddle::operators::distributed::GRPCClient>()
       ->SendComplete();
-}
 #endif
+}
 
 void InitializeVariable(Variable* var, proto::VarType::Type var_type) {
   if (var_type == proto::VarType::LOD_TENSOR) {
diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h
index 53ebf18bd22..3e78572334d 100644
--- a/paddle/fluid/framework/executor.h
+++ b/paddle/fluid/framework/executor.h
@@ -48,7 +48,7 @@ class Executor {
   /*
    * Sending signal to pserver to mark current trainer completed.
    */
-  void Complete();
+  void Close();
 #endif

diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc
index 7ef7482cab0..092e9dfd744 100644
--- a/paddle/fluid/operators/distributed/grpc_client.cc
+++ b/paddle/fluid/operators/distributed/grpc_client.cc
@@ -36,11 +36,15 @@ void GRPCClient::InitEventLoop() {
 }
 
 void GRPCClient::SendComplete() {
-  for (auto& it : channels_) {
-    VLOG(3) << "send complete message to " << it.first;
-    this->AsyncSendComplete(it.first);
+  std::unique_lock<std::mutex> lk(completed_mutex_);
+  if (!completed_) {
+    for (auto& it : channels_) {
+      VLOG(3) << "send complete message to " << it.first;
+      this->AsyncSendComplete(it.first);
+    }
+    PADDLE_ENFORCE(this->Wait(), "internal grpc error");
+    completed_ = true;
   }
-  this->Wait();
 }
 
 GRPCClient::~GRPCClient() {
diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h
index 26cad7548e8..373183be55c 100644
--- a/paddle/fluid/operators/distributed/grpc_client.h
+++ b/paddle/fluid/operators/distributed/grpc_client.h
@@ -188,7 +188,7 @@ class CheckpointNotifyProcessor : public BaseProcessor {
 
 class GRPCClient : public RPCClient {
  public:
-  GRPCClient() : ok_(true) {}
+  GRPCClient() : ok_(true), completed_(false) {}
   virtual ~GRPCClient();
 
   bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx,
@@ -247,6 +247,10 @@ class GRPCClient : public RPCClient {
   // mutex for GetChannel thread safety
   std::mutex chan_mutex_;
   DISABLE_COPY_AND_ASSIGN(GRPCClient);
+
+  // mutex for sending complete message only once
+  std::mutex completed_mutex_;
+  bool completed_;
 };
 
 }  // namespace distributed
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 9669e4c0837..c681eb9d916 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -502,9 +502,7 @@ All parameter, weight, gradient are variables in Paddle.
   py::class_<Executor>(m, "Executor")
       .def(py::init<const platform::Place &>())
-#ifdef PADDLE_WITH_DISTRIBUTE
-      .def("complete", &Executor::Complete)
-#endif
+      .def("close", &Executor::Close)
       .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope,
                      int block_id, bool create_local_scope, bool create_vars) {
         pybind11::gil_scoped_release release;
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index d5f11619a35..4178971398c 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -247,6 +247,7 @@ class Executor(object):
         p.set_place(place)
         self.executor = core.Executor(p)
         self.program_caches = dict()
+        self._closed = False
 
     def as_lodtensor(self, data):
         """
@@ -348,8 +349,23 @@ class Executor(object):
         ]
         return outs
 
-    def complete(self):
-        self.executor.complete()
+    def close(self):
+        """
+        Close this executor.
+
+        You can no longer use this executor after calling this method.
+        For distributed training, this method frees the resources on PServers
+        related to the current Trainer.
+
+        Example:
+            >>> cpu = core.CPUPlace()
+            >>> exe = Executor(cpu)
+            >>> ...
+            >>> exe.close()
+        """
+        if not self._closed:
+            self.executor.close()
+            self._closed = True
 
     def run(self,
             program=None,
@@ -402,6 +418,10 @@ class Executor(object):
                 >>> feed={'X': x},
                 >>> fetch_list=[loss.name])
         """
+
+        if self._closed:
+            raise RuntimeError("Attempted to use a closed Executor")
+
         if feed is None:
             feed = {}
         if not isinstance(feed, dict):
-- 
GitLab
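
Note for reviewers (not part of the patch): a minimal usage sketch of the renamed interface. The toy network and feed data below are hypothetical; only Executor(), run(), and the new close() come from this change, and close() only releases pserver-side resources when Paddle is built with PADDLE_WITH_DISTRIBUTE (otherwise it is a no-op apart from marking the executor closed).

    import numpy
    import paddle.fluid as fluid

    # Hypothetical toy program: a single fc layer so run() has something to execute.
    x = fluid.layers.data(name='x', shape=[1], dtype='float32')
    y = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.mean(y)

    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    data = numpy.random.random(size=(8, 1)).astype('float32')
    exe.run(fluid.default_main_program(), feed={'x': data}, fetch_list=[loss.name])

    # Mark this executor finished; in a distributed build this notifies the
    # pservers. Any run() after this raises
    # RuntimeError("Attempted to use a closed Executor").
    exe.close()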