diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 750a08d3a3052d7e16bb4facd3f09c8dd063b363..c2800c972a5501859672fbfd6921499e84d09cb0 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -45,13 +45,13 @@ ExecutorPrepareContext::~ExecutorPrepareContext() { Executor::Executor(const platform::Place& place) : place_(place) {} +void Executor::Close() { #ifdef PADDLE_WITH_DISTRIBUTE -void Executor::Complete() { ::paddle::operators::distributed::RPCClient::GetInstance< ::paddle::operators::distributed::GRPCClient>() ->SendComplete(); -} #endif +} void InitializeVariable(Variable* var, proto::VarType::Type var_type) { if (var_type == proto::VarType::LOD_TENSOR) { diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 53ebf18bd22ebb889ec65061027ac43b9906a9b1..3e78572334dc730109676744841fe50f306496d8 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -48,7 +48,7 @@ class Executor { /* * Sending signal to pserver to mark current trainer completed. 
*/ - void Complete(); + void Close(); #endif diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc index 7ef7482cab062eb8615740b47c7599243f23c8c6..092e9dfd744cf9e4f4a15a659a8c1d112fc3bc05 100644 --- a/paddle/fluid/operators/distributed/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -36,11 +36,15 @@ void GRPCClient::InitEventLoop() { } void GRPCClient::SendComplete() { - for (auto& it : channels_) { - VLOG(3) << "send complete message to " << it.first; - this->AsyncSendComplete(it.first); + std::unique_lock lk(completed_mutex_); + if (!completed_) { + for (auto& it : channels_) { + VLOG(3) << "send complete message to " << it.first; + this->AsyncSendComplete(it.first); + } + PADDLE_ENFORCE(this->Wait(), "internal grpc error"); + completed_ = true; } - this->Wait(); } GRPCClient::~GRPCClient() { diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h index 26cad7548e891b1310df8c35ac8bf5224e0be555..373183be55c45e06e54254b1a8f7f34fd3975c1f 100644 --- a/paddle/fluid/operators/distributed/grpc_client.h +++ b/paddle/fluid/operators/distributed/grpc_client.h @@ -188,7 +188,7 @@ class CheckpointNotifyProcessor : public BaseProcessor { class GRPCClient : public RPCClient { public: - GRPCClient() : ok_(true) {} + GRPCClient() : ok_(true), completed_(false) {} virtual ~GRPCClient(); bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx, @@ -247,6 +247,10 @@ class GRPCClient : public RPCClient { // mutex for GetChannel thread safety std::mutex chan_mutex_; DISABLE_COPY_AND_ASSIGN(GRPCClient); + + // mutex for sending complete message only once + std::mutex completed_mutex_; + bool completed_; }; } // namespace distributed diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 9669e4c083746ec646c5d99be09b7e52ef3c6f47..c681eb9d91663ccd25667e87287b6fdaa211feed 100644 --- 
a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -502,9 +502,7 @@ All parameter, weight, gradient are variables in Paddle. py::class_(m, "Executor") .def(py::init()) -#ifdef PADDLE_WITH_DISTRIBUTE - .def("complete", &Executor::Complete) -#endif + .def("close", &Executor::Close) .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope, int block_id, bool create_local_scope, bool create_vars) { pybind11::gil_scoped_release release; diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index d5f11619a356ac5492a66f698b67377bdac3812a..4178971398c953236bf8de4d5cb6e93d0e33380c 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -247,6 +247,7 @@ class Executor(object): p.set_place(place) self.executor = core.Executor(p) self.program_caches = dict() + self._closed = False def as_lodtensor(self, data): """ @@ -348,8 +349,23 @@ class Executor(object): ] return outs - def complete(self): - self.executor.complete() + def close(self): + """ + Close this executor. + + You can no longer use this executor after calling this method. + For the distributed training, this method would free the resource on PServers related to + the current Trainer. + + Example: + >>> cpu = core.CPUPlace() + >>> exe = Executor(cpu) + >>> ... + >>> exe.close() + """ + if not self._closed: + self.executor.close() + self._closed = True def run(self, program=None, @@ -402,6 +418,10 @@ class Executor(object): >>> feed={'X': x}, >>> fetch_list=[loss.name]) """ + + if self._closed: + raise RuntimeError("Attempted to use a closed Executor") + if feed is None: feed = {} if not isinstance(feed, dict):