diff --git a/paddle/framework/CMakeLists.txt b/paddle/framework/CMakeLists.txt
index 280496984251919a8b4b6c52684f950a80b78356..8c28709a68bec4fca5acaf2ec74b6d02402a6139 100644
--- a/paddle/framework/CMakeLists.txt
+++ b/paddle/framework/CMakeLists.txt
@@ -26,7 +26,7 @@ nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor)
 
 cc_test(variable_test SRCS variable_test.cc)
 
-cc_library(threadpool SRCS threadpool.cc)
+cc_library(threadpool SRCS threadpool.cc DEPS enforce)
 cc_test(threadpool_test SRCS threadpool_test.cc DEPS threadpool)
 
 cc_library(scope SRCS scope.cc DEPS glog threadpool)
@@ -98,3 +98,5 @@ if(NOT WITH_C_API AND WITH_FLUID)
   install(FILES ${CMAKE_CURRENT_BINARY_DIR}/framework.pb.h DESTINATION include/paddle/framework)
   install(FILES details/cow_ptr.h details/op_registry.h DESTINATION include/paddle/framework/details)
 endif()
+
+cc_test(channel_test SRCS channel_test.cc)
diff --git a/paddle/framework/channel.h b/paddle/framework/channel.h
index 9ba0fc5c558a85b41deb01ad57842d9c4c054e0e..70ecccc1a1078374f3190b3956103ed8000c4fc5 100644
--- a/paddle/framework/channel.h
+++ b/paddle/framework/channel.h
@@ -13,75 +13,52 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 
 #pragma once
-#include <condition_variable>
-#include <deque>
-#include <mutex>
+
+#include <stddef.h>  // for size_t
 
 namespace paddle {
 namespace framework {
 
+// Channel is the abstract class of buffered and un-buffered channels.
 template <typename T>
 class Channel {
  public:
-  explicit Channel(std::size_t capacity) : capacity_(capacity) {}
-
-  void Send(T* channel_element) {
-    std::unique_lock<std::mutex> lock(mu_);
-
-    if (IsBounded()) {
-      full_cond_var_.wait(lock, [this]() {
-        bool capacity_valid = capacity_ > 0 ? !IsCapacityFull() : true;
-        return capacity_valid;
-      });
-    }
-    channel_.push_back(std::move(*channel_element));
-
-    lock.unlock();
-    empty_cond_var_.notify_one();
-  }
+  virtual void Send(T*) = 0;
+  virtual void Receive(T*) = 0;
+  virtual size_t Cap() = 0;
 
-  T* Receive() {
-    std::unique_lock<std::mutex> lock(mu_);
-    empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); });
-
-    T* channel_element = std::move(channel_.front());
-    channel_.pop_front();
-
-    NotifyAllSenders(&lock);
-    return channel_element;
-  }
-
-  size_t Size() {
-    std::unique_lock<std::mutex> lock(mu_);
-    return channel_.size();
-  }
+  // Don't delete channels; instead, call Channel::Close.
+ protected:
+  virtual ~Channel() {}
+};
 
-  void Clear() {
-    std::unique_lock<std::mutex> lock(mu_);
-    channel_.clear();
+// Forward declaration of channel implementations.
+namespace details {
+template <typename T>
+class Buffered;
+template <typename T>
+class UnBuffered;
+}  // namespace details
 
-    NotifyAllSenders(&lock);
+template <typename T>
+Channel<T>* MakeChannel(size_t buffer_size) {
+  if (buffer_size > 0) {
+    return new details::Buffered<T>(buffer_size);
   }
+  return new details::UnBuffered<T>();
+}
 
- private:
-  std::size_t capacity_;
-  std::mutex mu_;
-  std::condition_variable empty_cond_var_;
-  std::condition_variable full_cond_var_;
-  std::deque<T> channel_;
-
- private:
-  void NotifyAllSenders(std::unique_lock<std::mutex>* lock) {
-    if (IsBounded()) {
-      lock->unlock();
-      full_cond_var_.notify_one();
-    }
+template <typename T>
+void CloseChannel(Channel<T>* ch) {
+  if (ch->Cap() > 0) {
+    delete dynamic_cast<details::Buffered<T>*>(ch);
+  } else {
+    delete dynamic_cast<details::UnBuffered<T>*>(ch);
   }
+}
 
-  bool IsBounded() const { return capacity_ > 0; }
-
-  bool IsCapacityFull() const { return channel_.size() >= capacity_; }
-};
-
-}  // namespace operator
+}  // namespace framework
 }  // namespace paddle
+
+#include "paddle/framework/details/buffered_channel.h"
+#include "paddle/framework/details/unbuffered_channel.h"
diff --git a/paddle/framework/channel_test.cc b/paddle/framework/channel_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..9efc0172658c800d14102531332dbef68fa392f4
--- /dev/null
+++ b/paddle/framework/channel_test.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/framework/channel.h"
+
+#include "gtest/gtest.h"
+
+TEST(Channel, MakeAndClose) {
+  using paddle::framework::Channel;
+  using paddle::framework::MakeChannel;
+  using paddle::framework::CloseChannel;
+
+  Channel<int>* ch = MakeChannel<int>(10);
+  CloseChannel(ch);
+}
diff --git a/paddle/framework/details/buffered_channel.h b/paddle/framework/details/buffered_channel.h
new file mode 100644
index 0000000000000000000000000000000000000000..572e29d44a3baec84a029d87f9b0874784aa761b
--- /dev/null
+++ b/paddle/framework/details/buffered_channel.h
@@ -0,0 +1,82 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+#include "paddle/framework/channel.h"
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+template <typename T>
+class Buffered : public paddle::framework::Channel<T> {
+  friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
+  friend void paddle::framework::CloseChannel<T>(Channel<T>*);
+
+ public:
+  virtual void Send(T*);
+  virtual void Receive(T*);
+  virtual size_t Cap() { return cap_; }
+
+ private:
+  size_t cap_;
+  std::mutex mu_;
+  std::condition_variable empty_cond_var_;
+  std::condition_variable full_cond_var_;
+  std::deque<T> channel_;
+
+  Buffered(size_t cap) : cap_(cap) {}
+  virtual ~Buffered();
+
+  void NotifyAllSenders(std::unique_lock<std::mutex>*);
+};
+
+template <typename T>
+void Buffered<T>::Send(T* item) {
+  std::unique_lock<std::mutex> lock(mu_);
+  full_cond_var_.wait(lock, [this]() { return channel_.size() < cap_; });
+  channel_.push_back(std::move(*item));
+  lock.unlock();
+  empty_cond_var_.notify_one();
+}
+
+template <typename T>
+void Buffered<T>::Receive(T* item) {
+  std::unique_lock<std::mutex> lock(mu_);
+  empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); });
+  *item = std::move(channel_.front());
+  channel_.pop_front();
+  NotifyAllSenders(&lock);
+}
+
+template <typename T>
+Buffered<T>::~Buffered() {
+  std::unique_lock<std::mutex> lock(mu_);
+  channel_.clear();
+  NotifyAllSenders(&lock);
+}
+
+template <typename T>
+void Buffered<T>::NotifyAllSenders(std::unique_lock<std::mutex>* lock) {
+  lock->unlock();
+  full_cond_var_.notify_one();
+}
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/framework/details/unbuffered_channel.h b/paddle/framework/details/unbuffered_channel.h
new file mode 100644
index 0000000000000000000000000000000000000000..7ecced1fba88fea781fc342091bc71e5aa496d3a
--- /dev/null
+++ b/paddle/framework/details/unbuffered_channel.h
@@ -0,0 +1,52 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+#include "paddle/framework/channel.h"
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+template <typename T>
+class UnBuffered : public paddle::framework::Channel<T> {
+  friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
+  friend void paddle::framework::CloseChannel<T>(Channel<T>*);
+
+ public:
+  virtual void Send(T*);
+  virtual void Receive(T*);
+  virtual size_t Cap() { return 0; }
+
+ private:
+  UnBuffered() {}
+  virtual ~UnBuffered();
+};
+
+template <typename T>
+void UnBuffered<T>::Send(T* channel_element) {}
+
+template <typename T>
+void UnBuffered<T>::Receive(T*) {}
+
+template <typename T>
+UnBuffered<T>::~UnBuffered() {}
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/framework/prune.cc b/paddle/framework/prune.cc
index 25eb813ffb96e9b1e13299421ead9f85c02da59f..bff8e0bceaca9749101b2c45edddba526d565624 100644
--- a/paddle/framework/prune.cc
+++ b/paddle/framework/prune.cc
@@ -17,6 +17,7 @@ limitations under the License.
*/ #include #include #include +#include #include #include @@ -102,6 +103,32 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, *op_field->Add() = input.blocks(block_id).ops(i); } } + + // remove the VarDescs in BlockDesc that are not referenced in + // the pruned OpDescs + std::unordered_map var_map; + auto* var_field = output->mutable_blocks(block_id)->mutable_vars(); + for (const auto& var : *var_field) { + var_map[var.name()] = var; + } + + var_field->Clear(); + for (const auto& op : *op_field) { + // add VarDescs of all input arguments for each OpDesc + auto& input_field = op.inputs(); + for (auto& input_var : input_field) { + for (auto& arg : input_var.arguments()) { + *var_field->Add() = var_map[arg]; + } + } + // add VarDescs of all output arguments for each OpDesc + auto& output_field = op.outputs(); + for (auto& output_var : output_field) { + for (auto& arg : output_var.arguments()) { + *var_field->Add() = var_map[arg]; + } + } + } } // TODO(fengjiayi): Prune() could be inplaced to avoid unnecessary copies diff --git a/paddle/framework/threadpool.cc b/paddle/framework/threadpool.cc index b2f5ae4a96593fde1623dd10d3b63c984ae228db..b7d7c00bcf9d9770f58284023ca2defcda299d64 100644 --- a/paddle/framework/threadpool.cc +++ b/paddle/framework/threadpool.cc @@ -14,6 +14,8 @@ #include "paddle/framework/threadpool.h" +#include "paddle/platform/enforce.h" + namespace paddle { namespace framework { diff --git a/paddle/framework/threadpool.h b/paddle/framework/threadpool.h index 8912b1a43a26f9df662d3b5ddf68bfb2b87f4a20..4e9b58679d9e7c84adf76b6245b397c7a8872483 100644 --- a/paddle/framework/threadpool.h +++ b/paddle/framework/threadpool.h @@ -22,7 +22,7 @@ limitations under the License. */ #include #include -#include "paddle/platform/enforce.h" +#include "paddle/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN namespace paddle { namespace framework { diff --git a/paddle/operators/detail/grpc_client.cc b/paddle/operators/detail/grpc_client.cc index c43394554263c761d010494f692a4050cae7e8cb..9b5f7afc6a48f13ff999f635efeb9e7bf0a76fb5 100644 --- a/paddle/operators/detail/grpc_client.cc +++ b/paddle/operators/detail/grpc_client.cc @@ -97,6 +97,21 @@ bool RPCClient::AsyncGetVariable(const std::string& ep, return true; } +bool RPCClient::AsyncSendBatchBarrier(const std::string& ep, int64_t time_out) { + const auto ch = GetChannel(ep); + + BatchBarrierProcessor* s = new BatchBarrierProcessor(ch); + s->Prepare(time_out); + + sendrecv::VariableMessage req; + req.set_varname(BATCH_BARRIER_MESSAGE); + auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_); + rpc->Finish(&s->reply_, &s->status_, (void*)s); + req_count_++; + + return true; +} + bool RPCClient::Wait() { if (req_count_ <= 0) { return true; diff --git a/paddle/operators/detail/grpc_client.h b/paddle/operators/detail/grpc_client.h index a62e70a2533ae52d84d010504b19fed5aeb15dc0..f9499f6dc70c541c214e0b659f10b2ed1e8e8581 100644 --- a/paddle/operators/detail/grpc_client.h +++ b/paddle/operators/detail/grpc_client.h @@ -71,6 +71,15 @@ class ClientBase { context_->set_deadline(deadline); } + virtual void Prepare(int64_t time_out) { + context_.reset(new grpc::ClientContext()); + + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(time_out); + + context_->set_deadline(deadline); + } + virtual void Process() = 0; std::unique_ptr stub_; @@ -117,6 +126,17 @@ class GetProcessor : public ClientBase { RequestGetCallBack response_call_back_ = ProcGetResponse; 
}; +class BatchBarrierProcessor : public ClientBase { + public: + explicit BatchBarrierProcessor(std::shared_ptr ch) + : ClientBase(ch) {} + + virtual ~BatchBarrierProcessor() {} + + virtual void Process() {} + sendrecv::VoidMessage reply_; +}; + class RPCClient { public: bool AsyncSendVariable(const std::string& ep, @@ -130,6 +150,10 @@ class RPCClient { const framework::Scope& scope, const std::string& var_name, int64_t time_out = 600 * 1000); + + bool AsyncSendBatchBarrier(const std::string& ep, + int64_t time_out = 600 * 1000); + bool Wait(); private: diff --git a/paddle/operators/detail/grpc_server.cc b/paddle/operators/detail/grpc_server.cc index 3ddcd839bdd23547216465dfaf44a3cd8285fe6d..4f94e1315fbd2810a05354f7c3fc54ea30967e8a 100644 --- a/paddle/operators/detail/grpc_server.cc +++ b/paddle/operators/detail/grpc_server.cc @@ -132,6 +132,7 @@ void AsyncGRPCServer::RunSyncUpdate() { cq_send_ = builder.AddCompletionQueue(); cq_get_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); LOG(INFO) << "Server listening on " << address_ << std::endl; @@ -141,11 +142,11 @@ void AsyncGRPCServer::RunSyncUpdate() { std::bind(&AsyncGRPCServer::TryToRegisterNewGetOne, this); t_send_.reset( - new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, false, + new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_send_.get(), "cq_send", send_register))); t_get_.reset( - new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, true, + new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_get_.get(), "cq_get", get_register))); // wait server @@ -174,7 +175,7 @@ void AsyncGRPCServer::TryToRegisterNewSendOne() { } RequestSend* send = new RequestSend(&service_, cq_send_.get(), &var_recv_queue_); - VLOG(4) << "create RequestSend status:" << send->Status(); + VLOG(4) << "Create RequestSend status:" << send->Status(); } void AsyncGRPCServer::TryToRegisterNewGetOne() { @@ -184,11 +185,11 @@ void AsyncGRPCServer::TryToRegisterNewGetOne() { } RequestGet* get = new RequestGet(&service_, cq_get_.get(), scope_, dev_ctx_, &var_get_queue_); - VLOG(4) << "create Requestget status:" << get->Status(); + VLOG(4) << "Create RequestGet status:" << get->Status(); } -// FIXME(typhoonzero): remove wait argument and change cq_name to enum. -void AsyncGRPCServer::HandleRequest(bool wait, grpc::ServerCompletionQueue* cq, +// FIXME(typhoonzero): change cq_name to enum. 
+void AsyncGRPCServer::HandleRequest(grpc::ServerCompletionQueue* cq, std::string cq_name, std::function TryToRegisterNewOne) { TryToRegisterNewOne(); diff --git a/paddle/operators/detail/grpc_server.h b/paddle/operators/detail/grpc_server.h index 1ca9086c744c558fd05fb4fc1a7280729afbec28..3f8b9d93176148619d6820f6a365d9da2e73b10d 100644 --- a/paddle/operators/detail/grpc_server.h +++ b/paddle/operators/detail/grpc_server.h @@ -57,8 +57,7 @@ class AsyncGRPCServer final : public sendrecv::SendRecvService::Service { void ShutDown(); protected: - void HandleRequest(bool wait, grpc::ServerCompletionQueue *cq, - std::string cq_name, + void HandleRequest(grpc::ServerCompletionQueue *cq, std::string cq_name, std::function TryToRegisterNewOne); void TryToRegisterNewSendOne(); void TryToRegisterNewGetOne(); diff --git a/paddle/operators/detail/sendrecvop_utils.h b/paddle/operators/detail/sendrecvop_utils.h index bc6581afab93c626c7c2439d699c6c2d858df9fa..8e66f7299c7b4d30bc5a6fe6a18b7cb3ae3827a5 100644 --- a/paddle/operators/detail/sendrecvop_utils.h +++ b/paddle/operators/detail/sendrecvop_utils.h @@ -30,6 +30,9 @@ namespace paddle { namespace operators { namespace detail { +#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV" +#define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV" + void SerializeToMessage(const std::string& name, const framework::Variable* var, const platform::DeviceContext& ctx, sendrecv::VariableMessage* msg); diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 593c35879ae2b3680b93ac5d8443110e61cb99fe..49e1eb3402482e7ff12d9b2b640f7271a80cf6d9 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -29,8 +29,6 @@ limitations under the License. */ #include "paddle/operators/detail/simple_block_queue.h" #include "paddle/string/printf.h" -#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV" - namespace paddle { namespace operators { @@ -95,7 +93,6 @@ class RecvOp : public framework::OperatorBase { auto param_list = Attr>("ParamList"); auto grad_list = Attr>("GradList"); auto fan_in = Attr("Fanin"); - size_t param_count = param_list.size(); auto *block = Attr(kOptimizeBlock); auto *program = block->Program(); @@ -103,38 +100,50 @@ class RecvOp : public framework::OperatorBase { // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; - size_t barrier_size = param_count * fan_in; while (!exit_flag) { // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. 
rpc_service_->SetCond(0); - for (size_t i = 0; i < barrier_size; ++i) { + size_t recv_var_cnt = 0; + int batch_barrier = 0; + while (batch_barrier != fan_in) { const detail::MessageWithName &v = rpc_service_->Get(); auto grad_var_name = v.first; if (grad_var_name == LISTEN_TERMINATE_MESSAGE) { LOG(INFO) << "received terminate message and exit"; exit_flag = true; break; - } - auto it = std::find(grad_list.begin(), grad_list.end(), grad_var_name); - std::string param_var_name; - if (it != grad_list.end()) { - param_var_name = param_list[it - grad_list.begin()]; + } else if (grad_var_name == BATCH_BARRIER_MESSAGE) { + VLOG(3) << "recv batch barrier message"; + batch_barrier++; + continue; } else { - LOG(ERROR) << "grad has no paired param:" << grad_var_name; - } - VLOG(3) << "received grad: " << grad_var_name - << " updating param: " << param_var_name; - if (fan_in > 1) { - grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); - } - auto *var = recv_scope.FindVar(grad_var_name); - if (var == nullptr) { - LOG(ERROR) << "Can not find server side var: " << grad_var_name; - PADDLE_THROW("Can not find server side var"); + // receive a variable + recv_var_cnt++; + auto it = + std::find(grad_list.begin(), grad_list.end(), grad_var_name); + std::string param_var_name; + if (it != grad_list.end()) { + param_var_name = param_list[it - grad_list.begin()]; + } else { + LOG(ERROR) << "grad has no paired param:" << grad_var_name; + } + VLOG(3) << "received grad: " << grad_var_name + << " updating param: " << param_var_name; + + if (fan_in > 1) { + grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); + } + auto *var = recv_scope.FindVar(grad_var_name); + if (var == nullptr) { + LOG(ERROR) << "Can not find server side var: " << grad_var_name; + PADDLE_THROW("Can not find server side var"); + } + detail::DeserializeFromMessage(v.second, dev_ctx, var); } - detail::DeserializeFromMessage(v.second, dev_ctx, var); } + VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; + // TODO(Yancey1989): merge SelectedRows variables here if (exit_flag) { break; } @@ -146,7 +155,7 @@ class RecvOp : public framework::OperatorBase { LOG(ERROR) << "run sub program error " << e.what(); } rpc_service_->SetCond(1); - rpc_service_->WaitClientGet(barrier_size); + rpc_service_->WaitClientGet(recv_var_cnt); grads_counter_.clear(); } // while(true) } @@ -161,7 +170,6 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker { public: RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { - AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable(); AddComment(R"DOC( Recv operator diff --git a/paddle/operators/reduce_op.cc b/paddle/operators/reduce_op.cc index 4a06babeda00f2420df80f81f876a0047a3285ef..84f24a909597915f0eebb6c9cad37510cbe93e7b 100644 --- a/paddle/operators/reduce_op.cc +++ b/paddle/operators/reduce_op.cc @@ -13,7 +13,6 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ #include "paddle/operators/reduce_op.h" -#include "paddle/operators/net_op.h" namespace paddle { namespace operators { @@ -38,10 +37,14 @@ class ReduceOp : public framework::OperatorWithKernel { dim, x_rank, "The dim should be in the range [-rank(input), rank(input))."); bool reduce_all = ctx->Attrs().Get("reduce_all"); + bool keep_dim = ctx->Attrs().Get("keep_dim"); if (reduce_all) { - ctx->SetOutputDim("Out", {1}); + if (keep_dim) + ctx->SetOutputDim( + "Out", framework::make_ddim(std::vector(x_rank, 1))); + else + ctx->SetOutputDim("Out", {1}); } else { - bool keep_dim = ctx->Attrs().Get("keep_dim"); auto dims_vector = vectorize(x_dims); if (keep_dim || x_rank == 1) { dims_vector[dim] = 1; diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index 5aa66c20eaf77959089100f8dcee55f2bc83a71a..bb719dc2a8a577bc042a2a70f7169b7d70f83684 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -37,17 +37,25 @@ class SendOp : public framework::OperatorBase { auto ins = Inputs("X"); auto outs = Outputs("Out"); std::vector epmap = Attr>("epmap"); + std::vector endpoints = + Attr>("endpoints"); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); for (size_t i = 0; i < ins.size(); i++) { - VLOG(3) << "sending " << ins[i]; + VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]); } PADDLE_ENFORCE(client_.Wait()); + for (auto& ep : endpoints) { + VLOG(3) << "batch barrier, ep: " << ep; + client_.AsyncSendBatchBarrier(ep); + } + PADDLE_ENFORCE(client_.Wait()); + for (size_t i = 0; i < outs.size(); i++) { - VLOG(3) << "getting " << outs[i]; + VLOG(3) << "getting " << outs[i] << " from " << epmap[i]; client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); } diff --git a/paddle/operators/sequence_reshape_op.cc b/paddle/operators/sequence_reshape_op.cc index 57cca13105537d88fe942b850cae10650d3096e2..d89a46a712c9c84a142e1e347219ed171556d761 100644 --- a/paddle/operators/sequence_reshape_op.cc +++ b/paddle/operators/sequence_reshape_op.cc @@ -30,8 +30,13 @@ class SequenceReshapeOp : public framework::OperatorWithKernel { auto x_numel = product(x_dims); PADDLE_ENFORCE_EQ(x_dims.size(), 2U, "Rank of Input(X) should be 2."); int new_dim = ctx->Attrs().Get("new_dim"); - ctx->SetOutputDim("Out", - {x_numel / new_dim, static_cast(new_dim)}); + if (ctx->IsRuntime()) { + ctx->SetOutputDim("Out", + {x_numel / new_dim, static_cast(new_dim)}); + } else { + // when compiling, the batch size is undetermined, just set to -1 + ctx->SetOutputDim("Out", {-1, static_cast(new_dim)}); + } } }; diff --git a/paddle/scripts/docker/build.sh b/paddle/scripts/docker/build.sh index e70d04d9017e9e36bbd55d6a28889d9ba7fb2a13..fbae37b2ca063e32cb12ded0da901d93438bc9a2 100644 --- a/paddle/scripts/docker/build.sh +++ b/paddle/scripts/docker/build.sh @@ -32,7 +32,7 @@ function cmake_gen() { cat <= 0 + parent_block = prog.block(parent_idx) + return parent_block + + def complete_op(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + + params, grads = self.get_params_and_grads() + param_names = [p.name for p in params] + grad_names = [g.name for g in grads] + parent_block.append_op( + type='recv', + inputs={}, + outputs={}, + attrs={ + 'endpoint': self.endpoint, + 'Fanin': self.fan_in, + 'ParamList': param_names, + 'GradList': grad_names, + 'OptimizeBlock': current_block + }) + + +def Send(endpoints, send_vars, 
get_vars): + """ + Send layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = list(set(epmap)) + + helper = LayerHelper("Send", **locals()) + helper.append_op( + type="send", + inputs={"X": send_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) diff --git a/python/paddle/v2/fluid/param_attr.py b/python/paddle/v2/fluid/param_attr.py index dcca8b6c547d10864ff4cd0af1c217d89e3b522f..fc566b8a2480ce9256d610b4731405cd6d89b7e4 100644 --- a/python/paddle/v2/fluid/param_attr.py +++ b/python/paddle/v2/fluid/param_attr.py @@ -15,7 +15,10 @@ from initializer import Initializer, Xavier, Constant from regularizer import WeightDecayRegularizer -__all__ = ['ParamAttr'] +__all__ = [ + 'ParamAttr', + 'WeightNormParamAttr', +] class ParamAttr(object): @@ -82,3 +85,20 @@ class ParamAttr(object): if with_initializer: kwargs['initializer'] = self.initializer return kwargs + + +class WeightNormParamAttr(ParamAttr): + """ + Used for weight normalization. Any field in ParamAttr can also be set here. + Besides, an extra field dim can be set to indicate the dimension except + which to normalize. + """ + # List to record the parameters reparameterized by weight normalization. + # If these parameters are treated as Variable rather than Parameter, + # it can be used to discriminate these parameters and help to serialize + # these paramters for inference. + params_with_weight_norm = [] + + def __init__(self, dim=None, **kwargs): + super(WeightNormParamAttr, self).__init__(**kwargs) + self.dim = dim diff --git a/python/paddle/v2/fluid/regularizer.py b/python/paddle/v2/fluid/regularizer.py index c2f28eecfda71e305d96c5a6b62c4f5f0fbf3fa6..0273da647afb6e95a136b5ecd0975347d9a378ff 100644 --- a/python/paddle/v2/fluid/regularizer.py +++ b/python/paddle/v2/fluid/regularizer.py @@ -87,6 +87,11 @@ class WeightDecayRegularizer(object): """ raise NotImplementedError() + def __str__(self): + """Debug string + """ + raise NotImplementedError() + class L2DecayRegularizer(WeightDecayRegularizer): """Implements the L2 Weight Decay Regularization @@ -123,6 +128,9 @@ class L2DecayRegularizer(WeightDecayRegularizer): return decay + def __str__(self): + return "L2Decay, regularization_coeff=%f" % self._regularization_coeff + class L1DecayRegularizer(WeightDecayRegularizer): """Implements the L1 Weight Decay Regularization @@ -163,6 +171,9 @@ class L1DecayRegularizer(WeightDecayRegularizer): return decay + def __str__(self): + return "L1Decay, regularization_coeff=%f" % self._regularization_coeff + # We short the class name, since users will use the regulaizer with the package # name. 
The sample code: diff --git a/python/paddle/v2/fluid/tests/CMakeLists.txt b/python/paddle/v2/fluid/tests/CMakeLists.txt index 83053160820a70bb5e54f721c0d7b881c5765004..628ce60b406d880d961d705a6abd2b5236fb1c8c 100644 --- a/python/paddle/v2/fluid/tests/CMakeLists.txt +++ b/python/paddle/v2/fluid/tests/CMakeLists.txt @@ -1,5 +1,10 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + +if(NOT WITH_DISTRIBUTE) + list(REMOVE_ITEM TEST_OPS test_recv_op) +endif(NOT WITH_DISTRIBUTE) + foreach(src ${TEST_OPS}) py_test(${src} SRCS ${src}.py) endforeach() diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py new file mode 100644 index 0000000000000000000000000000000000000000..5c4cec028d354b99d6203281ec4c727d7e3eceac --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -0,0 +1,68 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import paddle.v2.fluid as fluid +import paddle.v2.fluid.layers as layers +import numpy +from multiprocessing import Process +import os, sys + + +class TestRecvOp(unittest.TestCase): + def test_send(self): + # Run init_serv in a thread + place = fluid.CPUPlace() + p = Process(target=self.init_serv, args=(place, )) + p.daemon = True + p.start() + self.init_client(place) + # FIXME(typhoonzero): find a way to gracefully shutdown the server. + os.system("kill -9 %d" % p.pid) + p.join() + + def init_serv(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name="X", + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False) + with serv.do(): + o = layers.scale(x=x, scale=10.0) + main.global_block().create_var( + name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + exe = fluid.Executor(place) + exe.run(main) + + def init_client(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name='X', + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + layers.Send("127.0.0.1:6174", [x], [x]) + exe = fluid.Executor(place) + exe.run(main) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/v2/fluid/tests/test_weight_normalization.py b/python/paddle/v2/fluid/tests/test_weight_normalization.py new file mode 100644 index 0000000000000000000000000000000000000000..80ad8285d8a3c2ced814cc3588a814c14ec60855 --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_weight_normalization.py @@ -0,0 +1,121 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy +import collections +import paddle.v2.fluid as fluid +import paddle.v2.fluid.core as core +from paddle.v2.fluid.initializer import ConstantInitializer +from paddle.v2.fluid.param_attr import WeightNormParamAttr + + +class TestWeightNormalization(unittest.TestCase): + batch_size = 3 + hidden_size = 5 + data_desc = (['x', [10], 0], ) + + @classmethod + def setUpClass(cls): + cls.set_program() + + @classmethod + def set_program(cls): + data = fluid.layers.data( + name=cls.data_desc[0][0], shape=cls.data_desc[0][1]) + out = fluid.layers.fc(input=data, + size=cls.hidden_size, + param_attr=WeightNormParamAttr( + dim=None, + name='weight_norm_param', + initializer=ConstantInitializer(1.0)), + bias_attr=False, + act=None) + loss = fluid.layers.reduce_sum(out) + fluid.backward.append_backward(loss=loss) + cls.fetch_list = [ + 'weight_norm_param_g', 'weight_norm_param_v', + 'weight_norm_param_g@GRAD' + ] + + def run_program(self): + outputs = [] + places = [core.CPUPlace()] + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) + for place in places: + self.set_inputs(place) + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + output = exe.run(fluid.default_main_program(), + feed=self.inputs, + fetch_list=self.fetch_list, + return_numpy=False) + outputs.append(output) + self.actual_outputs = outputs + + def set_data(self): + self.data = collections.OrderedDict() + for desc in self.data_desc: + data_name = desc[0] + data_shape = desc[1] + data_lod_level = desc[2] + data_lod = [] + for i in range(data_lod_level): + lod_level_i = numpy.random.randint( + low=1, + high=5, + size=self.batch_size if i == 0 else lod_level_i[-1]) + lod_level_i = [0] + numpy.cumsum(lod_level_i).tolist() + data_lod.append(lod_level_i) + data_value = numpy.random.random( + size=[data_lod[-1][-1] if data_lod else self.batch_size + ] + data_shape).astype('float32') + self.data[data_name] = (data_value, data_lod) + + def set_inputs(self, place): + self.inputs = {} + for desc in self.data_desc: + tensor = fluid.Tensor() + tensor.set(self.data[desc[0]][0], place) + if self.data[desc[0]][1]: + tensor.set_lod(self.data[desc[0]][1]) + self.inputs[desc[0]] = tensor + + def weight_normalize(self): + v = numpy.ones((self.data[self.data_desc[0][0]][0].shape[-1], + self.hidden_size)) + g = numpy.linalg.norm(v, axis=None, keepdims=True) + w = g * v / numpy.linalg.norm(v, axis=None, keepdims=True) + x = self.data[self.data_desc[0][0]][0] + out = numpy.dot(x, w) + g_grad = (numpy.dot(x.T, numpy.ones_like(out)) * (v / numpy.linalg.norm( + v, axis=None, keepdims=True))).sum(axis=None, keepdims=True) + return g, v, g_grad + + def test_weight_normalization(self): + self.set_data() + self.run_program() + expect_output = self.weight_normalize() + for actual_output in self.actual_outputs: + [ + self.assertTrue( + numpy.allclose( + numpy.array(actual), expect, atol=0.001)) + for expect, actual in zip(expect_output, actual_output) + ] + + +if __name__ == '__main__': + unittest.main()
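A minimal usage sketch for the channel API introduced above in paddle/framework/channel.h. This program is not part of the patch: the element type int, the capacity of 4, and the producer/consumer bodies are illustrative, and it assumes the paddle headers are on the include path.

// Producer/consumer round trip through the new Channel interface.
#include <thread>

#include "paddle/framework/channel.h"

int main() {
  using paddle::framework::Channel;
  using paddle::framework::MakeChannel;
  using paddle::framework::CloseChannel;

  // Capacity > 0 selects details::Buffered<int>; capacity 0 would select
  // details::UnBuffered<int>, which is still a stub in this patch.
  Channel<int>* ch = MakeChannel<int>(4);

  std::thread producer([ch]() {
    for (int i = 0; i < 10; ++i) {
      ch->Send(&i);  // blocks while 4 items are waiting to be received
    }
  });

  std::thread consumer([ch]() {
    for (int i = 0; i < 10; ++i) {
      int item;
      ch->Receive(&item);  // blocks until an item is available
    }
  });

  producer.join();
  consumer.join();
  CloseChannel(ch);  // the only way to destroy the channel
  return 0;
}

CloseChannel is required because Channel's destructor is protected and the Buffered/UnBuffered destructors are private; only the friend functions in channel.h may delete the concrete object.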
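The reduce_op.cc hunk above changes the inferred output shape when reduce_all and keep_dim are both set: the output keeps the input rank with every dimension reduced to 1 instead of collapsing to {1}. A standalone sketch of that rule with concrete values; ReduceAllOutDims is a hypothetical helper, not framework code.

#include <cassert>
#include <cstdint>
#include <vector>

// Mirrors the new InferShape branch for reduce_all in reduce_op.cc.
std::vector<int64_t> ReduceAllOutDims(int x_rank, bool keep_dim) {
  if (keep_dim) return std::vector<int64_t>(x_rank, 1);  // rank preserved
  return {1};                                            // fully collapsed
}

int main() {
  // A rank-3 input reduced over all dimensions:
  assert((ReduceAllOutDims(3, /*keep_dim=*/true) ==
          std::vector<int64_t>({1, 1, 1})));
  assert((ReduceAllOutDims(3, /*keep_dim=*/false) ==
          std::vector<int64_t>({1})));
  return 0;
}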