diff --git a/doc/api/v2/fluid/layers.rst b/doc/api/v2/fluid/layers.rst index f738bf15641d9fca0bfb0c10821de778ceee0d79..231ec2d4ba102a5d31c47cbc7a5d484ef17a7f3a 100644 --- a/doc/api/v2/fluid/layers.rst +++ b/doc/api/v2/fluid/layers.rst @@ -18,6 +18,11 @@ dynamic_lstm .. autofunction:: paddle.v2.fluid.layers.dynamic_lstm :noindex: +dynamic_lstmp +------------- +.. autofunction:: paddle.v2.fluid.layers.dynamic_lstmp + :noindex: + dynamic_gru ----------- .. autofunction:: paddle.v2.fluid.layers.dynamic_gru diff --git a/doc/design/support_new_device.md b/doc/design/support_new_device.md index b154f01d32c5952367f075a732e2f7729d3c8626..8983df900460127fc130043c52373dab505363ba 100644 --- a/doc/design/support_new_device.md +++ b/doc/design/support_new_device.md @@ -174,7 +174,7 @@ class MaxOutFunctor { }; ``` -CPU implemention is in .cc file +CPU implementation is in .cc file ``` template @@ -188,7 +188,7 @@ class MaxOutFunctor { }; ``` -CUDA implemention is in .cu file +CUDA implementation is in .cu file ``` template @@ -203,9 +203,9 @@ class MaxOutFunctor { ``` -We first obtain the computing handle from a concrete DeviceContext, and then compute on tensors. +We first obtain the computing handle from a concrete DeviceContext and then compute on tensors. -The implemention of `OpKernel` is similar to math functors, the extra thing we need to do is to register the OpKernel in a global map. +The implementation of `OpKernel` is similar to math functors, the extra thing we need to do is to register the OpKernel in a global map. Fluid provides different register interfaces in op_registry.h diff --git a/paddle/framework/CMakeLists.txt b/paddle/framework/CMakeLists.txt index 280496984251919a8b4b6c52684f950a80b78356..318661af8bd04880577222fdc82cc1b6e79a457f 100644 --- a/paddle/framework/CMakeLists.txt +++ b/paddle/framework/CMakeLists.txt @@ -98,3 +98,5 @@ if(NOT WITH_C_API AND WITH_FLUID) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/framework.pb.h DESTINATION include/paddle/framework) install(FILES details/cow_ptr.h details/op_registry.h DESTINATION include/paddle/framework/details) endif() + +cc_test(channel_test SRCS channel_test.cc) diff --git a/paddle/framework/channel.h b/paddle/framework/channel.h index 9ba0fc5c558a85b41deb01ad57842d9c4c054e0e..70ecccc1a1078374f3190b3956103ed8000c4fc5 100644 --- a/paddle/framework/channel.h +++ b/paddle/framework/channel.h @@ -13,75 +13,52 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once -#include -#include -#include + +#include // for size_t namespace paddle { namespace framework { +// Channel is the abstract class of buffered and un-buffered channels. template class Channel { public: - explicit Channel(std::size_t capacity) : capacity_(capacity) {} - - void Send(T* channel_element) { - std::unique_lock lock(mu_); - - if (IsBounded()) { - full_cond_var_.wait(lock, [this]() { - bool capacity_valid = capacity_ > 0 ? !IsCapacityFull() : true; - return capacity_valid; - }); - } - channel_.push_back(std::move(*channel_element)); - - lock.unlock(); - empty_cond_var_.notify_one(); - } + virtual void Send(T*) = 0; + virtual void Receive(T*) = 0; + virtual size_t Cap() = 0; - T* Receive() { - std::unique_lock lock(mu_); - empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); }); - - T* channel_element = std::move(channel_.front()); - channel_.pop_front(); - - NotifyAllSenders(&lock); - return channel_element; - } - - size_t Size() { - std::unique_lock lock(mu_); - return channel_.size(); - } + // Don't delete channels; instead, call Channel::Close. + protected: + virtual ~Channel() {} +}; - void Clear() { - std::unique_lock lock(mu_); - channel_.clear(); +// Forward declaration of channel implementations. +namespace details { +template +class Buffered; +template +class UnBuffered; +} // namespace details - NotifyAllSenders(&lock); +template +Channel* MakeChannel(size_t buffer_size) { + if (buffer_size > 0) { + return new details::Buffered(buffer_size); } + return new details::UnBuffered(); +} - private: - std::size_t capacity_; - std::mutex mu_; - std::condition_variable empty_cond_var_; - std::condition_variable full_cond_var_; - std::deque channel_; - - private: - void NotifyAllSenders(std::unique_lock* lock) { - if (IsBounded()) { - lock->unlock(); - full_cond_var_.notify_one(); - } +template +void CloseChannel(Channel* ch) { + if (ch->Cap() > 0) { + delete dynamic_cast*>(ch); + } else { + delete dynamic_cast*>(ch); } +} - bool IsBounded() const { return capacity_ > 0; } - - bool IsCapacityFull() const { return channel_.size() >= capacity_; } -}; - -} // namespace operator +} // namespace framework } // namespace paddle + +#include "paddle/framework/details/buffered_channel.h" +#include "paddle/framework/details/unbuffered_channel.h" diff --git a/paddle/framework/channel_test.cc b/paddle/framework/channel_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..9efc0172658c800d14102531332dbef68fa392f4 --- /dev/null +++ b/paddle/framework/channel_test.cc @@ -0,0 +1,26 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/framework/channel.h" + +#include "gtest/gtest.h" + +TEST(Channel, MakeAndClose) { + using paddle::framework::Channel; + using paddle::framework::MakeChannel; + using paddle::framework::CloseChannel; + + Channel* ch = MakeChannel(10); + CloseChannel(ch); +} diff --git a/paddle/framework/data_type.h b/paddle/framework/data_type.h index 6a372ac32e48131eed28e2d42125feb5b92a11c7..98eb3e857d1943e71f1d41f24ecbedbe09e85b7b 100644 --- a/paddle/framework/data_type.h +++ b/paddle/framework/data_type.h @@ -79,5 +79,33 @@ inline void VisitDataType(proto::DataType type, Visitor visitor) { } } +inline std::string DataTypeToString(const proto::DataType type) { + using namespace paddle::framework::proto; + switch (type) { + case DataType::FP16: + return "float16"; + case DataType::FP32: + return "float32"; + case DataType::FP64: + return "float64"; + case DataType::INT16: + return "int16"; + case DataType::INT32: + return "int32"; + case DataType::INT64: + return "int64"; + case DataType::BOOL: + return "bool"; + default: + PADDLE_THROW("Not support type %d", type); + } +} + +inline std::ostream& operator<<(std::ostream& out, + const proto::DataType& type) { + out << DataTypeToString(type); + return out; +} + } // namespace framework } // namespace paddle diff --git a/paddle/framework/details/buffered_channel.h b/paddle/framework/details/buffered_channel.h new file mode 100644 index 0000000000000000000000000000000000000000..572e29d44a3baec84a029d87f9b0874784aa761b --- /dev/null +++ b/paddle/framework/details/buffered_channel.h @@ -0,0 +1,82 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once +#include +#include +#include + +#include "paddle/framework/channel.h" + +namespace paddle { +namespace framework { +namespace details { + +template +class Buffered : public paddle::framework::Channel { + friend Channel* paddle::framework::MakeChannel(size_t); + friend void paddle::framework::CloseChannel(Channel*); + + public: + virtual void Send(T*); + virtual void Receive(T*); + virtual size_t Cap() { return cap_; } + + private: + size_t cap_; + std::mutex mu_; + std::condition_variable empty_cond_var_; + std::condition_variable full_cond_var_; + std::deque channel_; + + Buffered(size_t cap) : cap_(cap) {} + virtual ~Buffered(); + + void NotifyAllSenders(std::unique_lock*); +}; + +template +void Buffered::Send(T* item) { + std::unique_lock lock(mu_); + full_cond_var_.wait(lock, [this]() { return channel_.size() < cap_; }); + channel_.push_back(std::move(*item)); + lock.unlock(); + empty_cond_var_.notify_one(); +} + +template +void Buffered::Receive(T* item) { + std::unique_lock lock(mu_); + empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); }); + *item = std::move(channel_.front()); + channel_.pop_front(); + NotifyAllSenders(&lock); +} + +template +Buffered::~Buffered() { + std::unique_lock lock(mu_); + channel_.clear(); + NotifyAllSenders(&lock); +} + +template +void Buffered::NotifyAllSenders(std::unique_lock* lock) { + lock->unlock(); + full_cond_var_.notify_one(); +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/framework/details/unbuffered_channel.h b/paddle/framework/details/unbuffered_channel.h new file mode 100644 index 0000000000000000000000000000000000000000..7ecced1fba88fea781fc342091bc71e5aa496d3a --- /dev/null +++ b/paddle/framework/details/unbuffered_channel.h @@ -0,0 +1,52 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once +#include +#include +#include + +#include "paddle/framework/channel.h" + +namespace paddle { +namespace framework { +namespace details { + +template +class UnBuffered : public paddle::framework::Channel { + friend Channel* paddle::framework::MakeChannel(size_t); + friend void paddle::framework::CloseChannel(Channel*); + + public: + virtual void Send(T*); + virtual void Receive(T*); + virtual size_t Cap() { return 0; } + + private: + UnBuffered() {} + virtual ~UnBuffered(); +}; + +template +void UnBuffered::Send(T* channel_element) {} + +template +void UnBuffered::Receive(T*) {} + +template +UnBuffered::~UnBuffered() {} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/framework/op_kernel_type_test.cc b/paddle/framework/op_kernel_type_test.cc index 649afeee8a846b0579545f2edff77e9dbe3b4dd8..cb23bbde01493d1a3b5845e77d6160a75f409c7a 100644 --- a/paddle/framework/op_kernel_type_test.cc +++ b/paddle/framework/op_kernel_type_test.cc @@ -26,9 +26,9 @@ TEST(OpKernelType, ToString) { OpKernelType op_kernel_type(DataType::FP32, CPUPlace(), DataLayout::kNCHW, LibraryType::kCUDNN); - ASSERT_EQ( - paddle::framework::KernelTypeToString(op_kernel_type), - "data_type[5]:data_layout[NCHW]:place[CPUPlace]:library_type[CUDNN]"); + ASSERT_EQ(paddle::framework::KernelTypeToString(op_kernel_type), + "data_type[float32]:data_layout[NCHW]:place[CPUPlace]:library_type[" + "CUDNN]"); } TEST(OpKernelType, Hash) { diff --git a/paddle/operators/CMakeLists.txt b/paddle/operators/CMakeLists.txt index 15f7cb6b560590f55e276fde4900d2e3c0045fb8..50b441e75d05d985d2ab2e0c46218eacab45e2ea 100644 --- a/paddle/operators/CMakeLists.txt +++ b/paddle/operators/CMakeLists.txt @@ -122,9 +122,11 @@ if(WITH_DISTRIBUTE) set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) op_library(recv_op DEPS ${DISTRIBUTE_DEPS}) set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op recv_op sum_op executor) + op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS}) + set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op listen_and_serv_op sum_op executor) else() - set(DEPS_OPS ${DEPS_OPS} send_op recv_op) + set(DEPS_OPS ${DEPS_OPS} send_op recv_op listen_and_serv_op) endif() op_library(cond_op DEPS framework_proto tensor net_op) @@ -147,6 +149,7 @@ op_library(max_sequence_len_op DEPS lod_rank_table) op_library(sequence_conv_op DEPS context_project) op_library(sequence_pool_op DEPS sequence_pooling) op_library(lstm_op DEPS sequence2batch lstm_compute) +op_library(lstmp_op DEPS sequence2batch lstm_compute) op_library(gru_op DEPS sequence2batch gru_compute) op_library(recurrent_op DEPS executor) op_library(warpctc_op DEPS dynload_warpctc sequence_padding sequence_scale math_function) diff --git a/paddle/operators/detail/grpc_client.cc b/paddle/operators/detail/grpc_client.cc index c43394554263c761d010494f692a4050cae7e8cb..9b5f7afc6a48f13ff999f635efeb9e7bf0a76fb5 100644 --- a/paddle/operators/detail/grpc_client.cc +++ b/paddle/operators/detail/grpc_client.cc @@ -97,6 +97,21 @@ bool RPCClient::AsyncGetVariable(const std::string& ep, return true; } +bool RPCClient::AsyncSendBatchBarrier(const std::string& ep, int64_t time_out) { + const auto ch = GetChannel(ep); + + BatchBarrierProcessor* s = new BatchBarrierProcessor(ch); + s->Prepare(time_out); + + sendrecv::VariableMessage req; + req.set_varname(BATCH_BARRIER_MESSAGE); + auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_); + rpc->Finish(&s->reply_, &s->status_, (void*)s); + req_count_++; + + return true; +} + bool RPCClient::Wait() { if (req_count_ <= 0) { return true; diff --git a/paddle/operators/detail/grpc_client.h b/paddle/operators/detail/grpc_client.h index a62e70a2533ae52d84d010504b19fed5aeb15dc0..f9499f6dc70c541c214e0b659f10b2ed1e8e8581 100644 --- a/paddle/operators/detail/grpc_client.h +++ b/paddle/operators/detail/grpc_client.h @@ -71,6 +71,15 @@ class ClientBase { context_->set_deadline(deadline); } + virtual void Prepare(int64_t time_out) { + context_.reset(new grpc::ClientContext()); + + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(time_out); + + context_->set_deadline(deadline); + } + virtual void Process() = 0; std::unique_ptr stub_; @@ -117,6 +126,17 @@ class GetProcessor : public ClientBase { RequestGetCallBack response_call_back_ = ProcGetResponse; }; +class BatchBarrierProcessor : public ClientBase { + public: + explicit BatchBarrierProcessor(std::shared_ptr ch) + : ClientBase(ch) {} + + virtual ~BatchBarrierProcessor() {} + + virtual void Process() {} + sendrecv::VoidMessage reply_; +}; + class RPCClient { public: bool AsyncSendVariable(const std::string& ep, @@ -130,6 +150,10 @@ class RPCClient { const framework::Scope& scope, const std::string& var_name, int64_t time_out = 600 * 1000); + + bool AsyncSendBatchBarrier(const std::string& ep, + int64_t time_out = 600 * 1000); + bool Wait(); private: diff --git a/paddle/operators/detail/grpc_server.cc b/paddle/operators/detail/grpc_server.cc index 3ddcd839bdd23547216465dfaf44a3cd8285fe6d..4f94e1315fbd2810a05354f7c3fc54ea30967e8a 100644 --- a/paddle/operators/detail/grpc_server.cc +++ b/paddle/operators/detail/grpc_server.cc @@ -132,6 +132,7 @@ void AsyncGRPCServer::RunSyncUpdate() { cq_send_ = builder.AddCompletionQueue(); cq_get_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); LOG(INFO) << "Server listening on " << address_ << std::endl; @@ -141,11 +142,11 @@ void AsyncGRPCServer::RunSyncUpdate() { std::bind(&AsyncGRPCServer::TryToRegisterNewGetOne, this); t_send_.reset( - new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, false, + new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_send_.get(), "cq_send", send_register))); t_get_.reset( - new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, true, + new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_get_.get(), "cq_get", get_register))); // wait server @@ -174,7 +175,7 @@ void AsyncGRPCServer::TryToRegisterNewSendOne() { } RequestSend* send = new RequestSend(&service_, cq_send_.get(), &var_recv_queue_); - VLOG(4) << "create RequestSend status:" << send->Status(); + VLOG(4) << "Create RequestSend status:" << send->Status(); } void AsyncGRPCServer::TryToRegisterNewGetOne() { @@ -184,11 +185,11 @@ void AsyncGRPCServer::TryToRegisterNewGetOne() { } RequestGet* get = new RequestGet(&service_, cq_get_.get(), scope_, dev_ctx_, &var_get_queue_); - VLOG(4) << "create Requestget status:" << get->Status(); + VLOG(4) << "Create RequestGet status:" << get->Status(); } -// FIXME(typhoonzero): remove wait argument and change cq_name to enum. -void AsyncGRPCServer::HandleRequest(bool wait, grpc::ServerCompletionQueue* cq, +// FIXME(typhoonzero): change cq_name to enum. +void AsyncGRPCServer::HandleRequest(grpc::ServerCompletionQueue* cq, std::string cq_name, std::function TryToRegisterNewOne) { TryToRegisterNewOne(); diff --git a/paddle/operators/detail/grpc_server.h b/paddle/operators/detail/grpc_server.h index 1ca9086c744c558fd05fb4fc1a7280729afbec28..3f8b9d93176148619d6820f6a365d9da2e73b10d 100644 --- a/paddle/operators/detail/grpc_server.h +++ b/paddle/operators/detail/grpc_server.h @@ -57,8 +57,7 @@ class AsyncGRPCServer final : public sendrecv::SendRecvService::Service { void ShutDown(); protected: - void HandleRequest(bool wait, grpc::ServerCompletionQueue *cq, - std::string cq_name, + void HandleRequest(grpc::ServerCompletionQueue *cq, std::string cq_name, std::function TryToRegisterNewOne); void TryToRegisterNewSendOne(); void TryToRegisterNewGetOne(); diff --git a/paddle/operators/detail/sendrecvop_utils.h b/paddle/operators/detail/sendrecvop_utils.h index bc6581afab93c626c7c2439d699c6c2d858df9fa..8e66f7299c7b4d30bc5a6fe6a18b7cb3ae3827a5 100644 --- a/paddle/operators/detail/sendrecvop_utils.h +++ b/paddle/operators/detail/sendrecvop_utils.h @@ -30,6 +30,9 @@ namespace paddle { namespace operators { namespace detail { +#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV" +#define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV" + void SerializeToMessage(const std::string& name, const framework::Variable* var, const platform::DeviceContext& ctx, sendrecv::VariableMessage* msg); diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index f8af4a290b470f31980245dfd54e600d26da576d..1e64d5c65c0c9dcb87360290f2b9e41d496f770f 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -25,9 +25,9 @@ limitations under the License. */ namespace paddle { namespace operators { -class SendOp : public framework::OperatorBase { +class RecvOp : public framework::OperatorBase { public: - SendOp(const std::string& type, const framework::VariableNameMap& inputs, + RecvOp(const std::string& type, const framework::VariableNameMap& inputs, const framework::VariableNameMap& outputs, const framework::AttributeMap& attrs) : OperatorBase(type, inputs, outputs, attrs) {} @@ -44,7 +44,6 @@ class SendOp : public framework::OperatorBase { VLOG(3) << "getting " << outs[i]; client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); } - PADDLE_ENFORCE(client_.Wait()); } @@ -52,21 +51,22 @@ class SendOp : public framework::OperatorBase { mutable detail::RPCClient client_; }; -class SendOpMaker : public framework::OpProtoAndCheckerMaker { +class RecvOpMaker : public framework::OpProtoAndCheckerMaker { public: - SendOpMaker(OpProto* proto, OpAttrChecker* op_checker) + RecvOpMaker(OpProto* proto, OpAttrChecker* op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { AddInput("X", "(Tensor) Input tensor to be sent").AsDuplicable(); AddOutput("Out", "(Tensor) Output tensor to be received from server") .AsDuplicable(); AddComment(R"DOC( -Send operator +Recv operator -This operator will send tensor to recv_op at the parameter server. +This operator can get variables from server side. )DOC"); AddAttr>("endpoints", "(string vector, default 127.0.0.1:6164)" - "Server endpoints to send variables to.") + "Server endpoints to recv variables" + "from.") .SetDefault({}); AddAttr>("epmap", "(string vector, default 127.0.0.1:6164)" @@ -81,4 +81,4 @@ This operator will send tensor to recv_op at the parameter server. namespace ops = paddle::operators; -REGISTER_OPERATOR(send, ops::SendOp, ops::SendOpMaker); +REGISTER_OPERATOR(recv, ops::RecvOp, ops::RecvOpMaker); diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index c90e4d8ef06581e2fb7f99dcbe70df32c1ad80c0..9c180a72448342d61f941939952db4f6aa1b93ae 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -38,26 +38,34 @@ class SendOp : public framework::OperatorBase { auto outs = Outputs("Out"); std::vector epmap = Attr>("epmap"); bool do_get = Attr("DoGet"); + std::vector endpoints = + Attr>("endpoints"); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); for (size_t i = 0; i < ins.size(); i++) { - VLOG(3) << "sending " << ins[i]; + VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]); } PADDLE_ENFORCE(client_.Wait()); + for (auto& ep : endpoints) { + VLOG(3) << "batch barrier, ep: " << ep; + client_.AsyncSendBatchBarrier(ep); + } + PADDLE_ENFORCE(client_.Wait()); + if (do_get) { for (size_t i = 0; i < outs.size(); i++) { - VLOG(3) << "getting " << outs[i]; + VLOG(3) << "getting " << outs[i] << " from " << epmap[i]; client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); } + PADDLE_ENFORCE(client_.Wait()); } - - PADDLE_ENFORCE(client_.Wait()); } private: + // TODO(typhoonzero): put RPCClient in a Variable. mutable detail::RPCClient client_; }; diff --git a/paddle/operators/send_recv_op_test.cc b/paddle/operators/send_recv_op_test.cc index 874eac9711377d5d8d3eb022511fcc6535a5098f..31527a906d56da54d2571910de627757d708a996 100644 --- a/paddle/operators/send_recv_op_test.cc +++ b/paddle/operators/send_recv_op_test.cc @@ -132,7 +132,7 @@ void StartServerNet(bool is_sparse) { attrs.insert({"GradList", std::vector({"x1"})}); attrs.insert({"OptimizeBlock", block}); listen_and_serv_op = - f::OpRegistry::CreateOp("listen_and_serv", {{"RX", {"x1"}}}, {}, attrs); + f::OpRegistry::CreateOp("listen_and_serv", {}, {}, attrs); listen_and_serv_op->Run(scope, place); } diff --git a/paddle/scripts/docker/build.sh b/paddle/scripts/docker/build.sh index e70d04d9017e9e36bbd55d6a28889d9ba7fb2a13..fbae37b2ca063e32cb12ded0da901d93438bc9a2 100644 --- a/paddle/scripts/docker/build.sh +++ b/paddle/scripts/docker/build.sh @@ -32,7 +32,7 @@ function cmake_gen() { cat <= 0 + parent_block = prog.block(parent_idx) + return parent_block + + def complete_op(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + + params, grads = self.get_params_and_grads() + param_names = [p.name for p in params] + grad_names = [g.name for g in grads] + parent_block.append_op( + type='listen_and_serv', + inputs={}, + outputs={}, + attrs={ + 'endpoint': self.endpoint, + 'Fanin': self.fan_in, + 'ParamList': param_names, + 'GradList': grad_names, + 'OptimizeBlock': current_block + }) + + +def Send(endpoints, send_vars, get_vars): + """ + Send layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = list(set(epmap)) + + helper = LayerHelper("Send", **locals()) + helper.append_op( + type="send", + inputs={"X": send_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) + + +def Recv(endpoints, get_vars): + """ + Recv layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = list(set(epmap)) + + helper = LayerHelper("Recv", **locals()) + helper.append_op( + type="recv", + inputs={"X": get_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) diff --git a/python/paddle/v2/fluid/layers/nn.py b/python/paddle/v2/fluid/layers/nn.py index bae33f6a156b094d6e9cccc9cabb42880f157ac8..d11dccfd22124d58d8634c01a00527c373b92f00 100644 --- a/python/paddle/v2/fluid/layers/nn.py +++ b/python/paddle/v2/fluid/layers/nn.py @@ -26,6 +26,7 @@ __all__ = [ 'fc', 'embedding', 'dynamic_lstm', + 'dynamic_lstmp', 'dynamic_gru', 'gru_unit', 'linear_chain_crf', @@ -256,7 +257,8 @@ def dynamic_lstm(input, gate_activation='sigmoid', cell_activation='tanh', candidate_activation='tanh', - dtype='float32'): + dtype='float32', + name=None): """ **Dynamic LSTM Layer** @@ -282,7 +284,7 @@ def dynamic_lstm(input, W_{fc}, W_{oc}` are diagonal weight matrices for peephole connections. In our implementation, we use vectors to reprenset these diagonal weight matrices. The :math:`b` terms denote bias vectors (:math:`b_i` is the input - gate bias vector), :math:`\sigma` is the non-line activations, such as + gate bias vector), :math:`\sigma` is the non-linear activations, such as logistic sigmoid function, and :math:`i, f, o` and :math:`c` are the input gate, forget gate, output gate, and cell activation vectors, respectively, all of which have the same size as the cell output activation vector :math:`h`. @@ -308,25 +310,25 @@ def dynamic_lstm(input, (T X 4D), where T is the total time steps in this mini-batch, D is the hidden size. size(int): 4 * hidden size. - param_attr(ParamAttr): The parameter attribute for the learnable + param_attr(ParamAttr|None): The parameter attribute for the learnable hidden-hidden weights. - - The shape is (D x 4D), where D is the hidden - size. - Weights = {:math:`W_{ch}, W_{ih}, \ W_{fh}, W_{oh}`} - bias_attr(ParamAttr): The bias attribute for the learnable bias + - The shape is (D x 4D), where D is the hidden + size. + bias_attr(ParamAttr|None): The bias attribute for the learnable bias weights, which contains two parts, input-hidden bias weights and peephole connections weights if setting `use_peepholes` to `True`. 1. `use_peepholes = False` - - The shape is (1 x 4D). - Biases = {:math:`b_c, b_i, b_f, b_o`}. + - The shape is (1 x 4D). 2. `use_peepholes = True` - - The shape is (1 x 7D). - Biases = { :math:`b_c, b_i, b_f, b_o, W_{ic}, \ W_{fc}, W_{oc}`}. + - The shape is (1 x 7D). use_peepholes(bool): Whether to enable diagonal/peephole connections, default `True`. is_reverse(bool): Whether to compute reversed LSTM, default `False`. @@ -339,6 +341,8 @@ def dynamic_lstm(input, Choices = ["sigmoid", "tanh", "relu", "identity"], default "tanh". dtype(str): Data type. Choices = ["float32", "float64"], default "float32". + name(str|None): A name for this layer(optional). If set None, the layer + will be named automatically. Returns: tuple: The hidden state, and cell state of LSTM. The shape of both \ @@ -353,6 +357,7 @@ def dynamic_lstm(input, forward, _ = fluid.layers.dynamic_lstm( input=forward_proj, size=hidden_dim * 4, use_peepholes=False) """ + helper = LayerHelper('lstm', **locals()) size = size / 4 weight = helper.create_parameter( @@ -389,6 +394,192 @@ def dynamic_lstm(input, return hidden, cell +def dynamic_lstmp(input, + size, + proj_size, + param_attr=None, + bias_attr=None, + use_peepholes=True, + is_reverse=False, + gate_activation='sigmoid', + cell_activation='tanh', + candidate_activation='tanh', + proj_activation='tanh', + dtype='float32', + name=None): + """ + **Dynamic LSTMP Layer** + + LSTMP (LSTM with recurrent projection) layer has a separate projection + layer after the LSTM layer, projecting the original hidden state to a + lower-dimensional one, which is proposed to reduce the number of total + parameters and furthermore computational complexity for the LSTM, + espeacially for the case that the size of output units is relative + large (https://research.google.com/pubs/archive/43905.pdf). + + The formula is as follows: + + .. math:: + + i_t & = \sigma(W_{ix}x_{t} + W_{ir}r_{t-1} + W_{ic}c_{t-1} + b_i) + + f_t & = \sigma(W_{fx}x_{t} + W_{fr}r_{t-1} + W_{fc}c_{t-1} + b_f) + + \\tilde{c_t} & = act_g(W_{cx}x_t + W_{cr}r_{t-1} + b_c) + + o_t & = \sigma(W_{ox}x_{t} + W_{or}r_{t-1} + W_{oc}c_t + b_o) + + c_t & = f_t \odot c_{t-1} + i_t \odot \\tilde{c_t} + + h_t & = o_t \odot act_h(c_t) + + r_t & = \overline{act_h}(W_{rh}h_t) + + In the above formula: + + * :math:`W`: Denotes weight matrices (e.g. :math:`W_{xi}` is \ + the matrix of weights from the input gate to the input). + * :math:`W_{ic}`, :math:`W_{fc}`, :math:`W_{oc}`: Diagonal weight \ + matrices for peephole connections. In our implementation, \ + we use vectors to reprenset these diagonal weight matrices. + * :math:`b`: Denotes bias vectors (e.g. :math:`b_i` is the input gate \ + bias vector). + * :math:`\sigma`: The activation, such as logistic sigmoid function. + * :math:`i, f, o` and :math:`c`: The input gate, forget gate, output \ + gate, and cell activation vectors, respectively, all of which have \ + the same size as the cell output activation vector :math:`h`. + * :math:`h`: The hidden state. + * :math:`r`: The recurrent projection of the hidden state. + * :math:`\\tilde{c_t}`: The candidate hidden state, whose \ + computation is based on the current input and previous hidden state. + * :math:`\odot`: The element-wise product of the vectors. + * :math:`act_g` and :math:`act_h`: The cell input and cell output \ + activation functions and `tanh` is usually used for them. + * :math:`\overline{act_h}`: The activation function for the projection \ + output, usually using `identity` or same as :math:`act_h`. + + Set `use_peepholes` to `False` to disable peephole connection. The formula + is omitted here, please refer to the paper + http://www.bioinf.jku.at/publications/older/2604.pdf for details. + + Note that these :math:`W_{xi}x_{t}, W_{xf}x_{t}, W_{xc}x_{t}, W_{xo}x_{t}` + operations on the input :math:`x_{t}` are NOT included in this operator. + Users can choose to use fully-connected layer before LSTMP layer. + + Args: + input(Variable): The input of dynamic_lstmp layer, which supports + variable-time length input sequence. The underlying + tensor in this Variable is a matrix with shape + (T X 4D), where T is the total time steps in this + mini-batch, D is the hidden size. + size(int): 4 * hidden size. + proj_size(int): The size of projection output. + param_attr(ParamAttr|None): The parameter attribute for the learnable + hidden-hidden weight and projection weight. + + - Hidden-hidden weight = {:math:`W_{ch}, W_{ih}, \ + W_{fh}, W_{oh}`}. + - The shape of hidden-hidden weight is (P x 4D), + where P is the projection size and D the hidden + size. + - Projection weight = {:math:`W_{rh}`}. + - The shape of projection weight is (D x P). + bias_attr(ParamAttr|None): The bias attribute for the learnable bias + weights, which contains two parts, input-hidden + bias weights and peephole connections weights if + setting `use_peepholes` to `True`. + + 1. `use_peepholes = False` + - Biases = {:math:`b_c, b_i, b_f, b_o`}. + - The shape is (1 x 4D). + 2. `use_peepholes = True` + - Biases = { :math:`b_c, b_i, b_f, b_o, W_{ic}, \ + W_{fc}, W_{oc}`}. + - The shape is (1 x 7D). + use_peepholes(bool): Whether to enable diagonal/peephole connections, + default `True`. + is_reverse(bool): Whether to compute reversed LSTM, default `False`. + gate_activation(str): The activation for input gate, forget gate and + output gate. Choices = ["sigmoid", "tanh", "relu", + "identity"], default "sigmoid". + cell_activation(str): The activation for cell output. Choices = ["sigmoid", + "tanh", "relu", "identity"], default "tanh". + candidate_activation(str): The activation for candidate hidden state. + Choices = ["sigmoid", "tanh", "relu", "identity"], + default "tanh". + proj_activation(str): The activation for projection output. + Choices = ["sigmoid", "tanh", "relu", "identity"], + default "tanh". + dtype(str): Data type. Choices = ["float32", "float64"], default "float32". + name(str|None): A name for this layer(optional). If set None, the layer + will be named automatically. + + Returns: + tuple: The projection of hidden state, and cell state of LSTMP. The \ + shape of projection is (T x P), for the cell state which is \ + (T x D), and both LoD is the same with the `input`. + + Examples: + .. code-block:: python + + hidden_dim, proj_dim = 512, 256 + fc_out = fluid.layers.fc(input=input_seq, size=hidden_dim * 4, + act=None, bias_attr=None) + proj_out, _ = fluid.layers.dynamic_lstmp(input=fc_out, + size=hidden_dim * 4, + proj_size=proj_dim, + use_peepholes=False, + is_reverse=True, + cell_activation="tanh", + proj_activation="tanh") + """ + + helper = LayerHelper('lstmp', **locals()) + size = size / 4 + weight = helper.create_parameter( + attr=helper.param_attr, shape=[proj_size, 4 * size], dtype=dtype) + proj_weight = helper.create_parameter( + attr=helper.param_attr, shape=[size, proj_size], dtype=dtype) + bias_size = [1, 7 * size] + if not use_peepholes: + bias_size[1] = 4 * size + bias = helper.create_parameter( + attr=helper.bias_attr, shape=bias_size, dtype=dtype, is_bias=True) + + projection = helper.create_tmp_variable(dtype) + cell = helper.create_tmp_variable(dtype) + ordered_proj0 = helper.create_tmp_variable(dtype) + batch_hidden = helper.create_tmp_variable(dtype) + batch_gate = helper.create_tmp_variable(dtype) + batch_cell_pre_act = helper.create_tmp_variable(dtype) + + helper.append_op( + type='lstmp', + inputs={ + 'Input': input, + 'Weight': weight, + 'ProjWeight': proj_weight, + 'Bias': bias + }, + outputs={ + 'Projection': projection, + 'Cell': cell, + 'OrderedP0': ordered_proj0, + 'BatchHidden': batch_hidden, + 'BatchGate': batch_gate, + 'BatchCellPreAct': batch_cell_pre_act + }, + attrs={ + 'use_peepholes': use_peepholes, + 'is_reverse': is_reverse, + 'gate_activation': gate_activation, + 'cell_activation': cell_activation, + 'candidate_activation': candidate_activation, + 'proj_activation': proj_activation + }) + return projection, cell + + def dynamic_gru(input, size, param_attr=None, diff --git a/python/paddle/v2/fluid/tests/CMakeLists.txt b/python/paddle/v2/fluid/tests/CMakeLists.txt index 83053160820a70bb5e54f721c0d7b881c5765004..628ce60b406d880d961d705a6abd2b5236fb1c8c 100644 --- a/python/paddle/v2/fluid/tests/CMakeLists.txt +++ b/python/paddle/v2/fluid/tests/CMakeLists.txt @@ -1,5 +1,10 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + +if(NOT WITH_DISTRIBUTE) + list(REMOVE_ITEM TEST_OPS test_recv_op) +endif(NOT WITH_DISTRIBUTE) + foreach(src ${TEST_OPS}) py_test(${src} SRCS ${src}.py) endforeach() diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py index 0c51ccf306367d9a458968a7a4a8aa53bea663c3..298ecfc386b3ae093cf714a41f5072759cb2cf2e 100644 --- a/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py +++ b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py @@ -1,16 +1,16 @@ -#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. # -#Licensed under the Apache License, Version 2.0 (the "License"); -#you may not use this file except in compliance with the License. -#You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -#Unless required by applicable law or agreed to in writing, software -#distributed under the License is distributed on an "AS IS" BASIS, -#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -#See the License for the specific language governing permissions and -#limitations under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from __future__ import print_function diff --git a/python/paddle/v2/fluid/tests/test_layers.py b/python/paddle/v2/fluid/tests/test_layers.py index 4e863625422c93c77ad4fb65be35580943d1cf54..3f54e28defb76d3430a82e791578e20b84833f16 100644 --- a/python/paddle/v2/fluid/tests/test_layers.py +++ b/python/paddle/v2/fluid/tests/test_layers.py @@ -202,6 +202,18 @@ class TestBook(unittest.TestCase): x_t=x_t, hidden_t_prev=prev_hidden, cell_t_prev=prev_cell)) print(str(program)) + def test_dynamic_lstmp(self): + program = Program() + with program_guard(program): + hidden_dim, proj_dim = 16, 8 + seq_data = layers.data( + name='seq_data', shape=[10, 10], dtype='float32', lod_level=1) + fc_out = layers.fc(input=seq_data, size=4 * hidden_dim) + self.assertIsNotNone( + layers.dynamic_lstmp( + input=fc_out, size=4 * hidden_dim, proj_size=proj_dim)) + print(str(program)) + def test_sequence_softmax(self): program = Program() with program_guard(program): diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py new file mode 100644 index 0000000000000000000000000000000000000000..def5ca94422d2391de2608e1d49d1763d8da9aef --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -0,0 +1,70 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import paddle.v2.fluid as fluid +import paddle.v2.fluid.layers as layers +import numpy +from multiprocessing import Process +import os, sys +import time + + +class TestRecvOp(unittest.TestCase): + def test_send(self): + # Run init_serv in a thread + place = fluid.CPUPlace() + p = Process(target=self.init_serv, args=(place, )) + p.daemon = True + p.start() + time.sleep(5) + self.init_client(place) + # FIXME(typhoonzero): find a way to gracefully shutdown the server. + os.system("kill -9 %d" % p.pid) + p.join() + + def init_serv(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name="X", + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False) + with serv.do(): + o = layers.scale(x=x, scale=10.0) + main.global_block().create_var( + name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + exe = fluid.Executor(place) + exe.run(main) + + def init_client(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name='X', + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + layers.Send("127.0.0.1:6174", [x], [x]) + exe = fluid.Executor(place) + exe.run(main) + + +if __name__ == "__main__": + unittest.main()