diff --git a/paddle/fluid/operators/detail/CMakeLists.txt b/paddle/fluid/operators/detail/CMakeLists.txt
index f8cd2852f3eed7a960f22ebd45292b3cb56116bb..3adeeda90645ca983d9d9229b4cc1c4c90302206 100644
--- a/paddle/fluid/operators/detail/CMakeLists.txt
+++ b/paddle/fluid/operators/detail/CMakeLists.txt
@@ -2,7 +2,7 @@ if(WITH_DISTRIBUTE)
   grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc
     grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor selected_rows)
   set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
-  set_source_files_properties(serde_test.cc grpc_server_test PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+  set_source_files_properties(serde_test.cc grpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
   cc_test(serde_test SRCS serde_test.cc variable_response.cc DEPS grpc++_unsecure grpc_unsecure gpr
     cares zlib protobuf sendrecvop_grpc)
   cc_test(grpc_server_test SRCS grpc_server_test.cc DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc
index ba9882ce244f69d5fbe3214d3c3470cd4ec87510..d79ba6d291950e1f089eb11713bd1c3e4d154b27 100644
--- a/paddle/fluid/operators/detail/grpc_client.cc
+++ b/paddle/fluid/operators/detail/grpc_client.cc
@@ -12,8 +12,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#include "grpc_client.h"
-#include <sys/time.h>
+#include "paddle/fluid/operators/detail/grpc_client.h"
+
+#include <sys/time.h>
+
 #include "paddle/fluid/framework/threadpool.h"
 
 namespace paddle {
@@ -52,7 +54,7 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
     auto call = s->stub_g_.PrepareUnaryCall(
         s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_);
     call->StartCall();
-    call->Finish(&s->reply_, &s->status_, (void*)s);
+    call->Finish(&s->reply_, &s->status_, static_cast<void*>(s));
   });
 
   req_count_++;
@@ -70,8 +72,7 @@ void ProcGetResponse(const VarHandle& var_h,
 template <typename T>
 void RequestToByteBuffer(const T& proto, ::grpc::ByteBuffer* result) {
   ::grpc::Slice slice(proto.ByteSizeLong());
-  proto.SerializeWithCachedSizesToArray(
-      const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(slice.begin())));
+  proto.SerializeWithCachedSizesToArray(const_cast<uint8_t*>(slice.begin()));
   ::grpc::ByteBuffer tmp(&slice, 1);
   result->Swap(&tmp);
 }
@@ -109,7 +110,7 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
     auto call = s->stub_g_.PrepareUnaryCall(
         s->context_.get(), "/sendrecv.SendRecvService/GetVariable", buf, &cq_);
     call->StartCall();
-    call->Finish(&s->reply_, &s->status_, (void*)s);
+    call->Finish(&s->reply_, &s->status_, static_cast<void*>(s));
   });
 
   req_count_++;
@@ -153,7 +154,7 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep,
         s->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable", req,
         &cq_);
     call->StartCall();
-    call->Finish(&s->reply_, &s->status_, (void*)s);
+    call->Finish(&s->reply_, &s->status_, static_cast<void*>(s));
   });
 
   req_count_++;
@@ -169,7 +170,7 @@ void RPCClient::AsyncSendBatchBarrier(const std::string& ep, int64_t time_out) {
   sendrecv::VariableMessage req;
   req.set_varname(BATCH_BARRIER_MESSAGE);
   auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
-  rpc->Finish(&s->reply_, &s->status_, (void*)s);
+  rpc->Finish(&s->reply_, &s->status_, static_cast<void*>(s));
   req_count_++;
 }
 
@@ -181,7 +182,7 @@ void RPCClient::AsyncSendFetchBarrier(const std::string& ep, int64_t time_out) {
   sendrecv::VariableMessage req;
   req.set_varname(FETCH_BARRIER_MESSAGE);
   auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
-  rpc->Finish(&s->reply_, &s->status_, (void*)s);
+  rpc->Finish(&s->reply_, &s->status_, static_cast<void*>(s));
   req_count_++;
 }
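These `static_cast<void*>(s)` call sites are one half of gRPC's tag protocol: the pointer handed to `Finish` comes back verbatim from the completion queue and is cast back to recover the in-flight call. Roughly, with a hypothetical `Call` type standing in for the client's `SendProcessor`/`GetProcessor` objects (a minimal sketch, not the actual client):

    #include <grpc++/grpc++.h>

    // "Call" stands in for the per-RPC state object; fields mostly elided.
    struct Call {
      ::grpc::Status status_;
      // reply_, context_, ...
    };

    void Drain(::grpc::CompletionQueue* cq) {
      void* tag = nullptr;
      bool ok = false;
      while (cq->Next(&tag, &ok)) {         // blocks until some Finish() completes
        Call* c = static_cast<Call*>(tag);  // the exact pointer passed to Finish()
        if (ok && c->status_.ok()) {
          // consume the reply here
        }
        delete c;  // tags are one-shot; the drain loop owns them now
      }
    }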
diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc
index 591b3e334acba19421f55474aba8de2fa3d3a4d4..7c978b28b6873d05afb435de4caf7f4ce5d33193 100644
--- a/paddle/fluid/operators/detail/grpc_server.cc
+++ b/paddle/fluid/operators/detail/grpc_server.cc
@@ -14,6 +14,9 @@ limitations under the License. */
 
 #include "paddle/fluid/operators/detail/grpc_server.h"
 
+#include <limits>
+#include <string>
+
 using ::grpc::ServerAsyncResponseWriter;
 
 namespace paddle {
@@ -156,6 +159,8 @@ class RequestPrefetch final : public RequestBase {
     ::grpc::ByteBuffer reply;
     // TODO(Yancey1989): execute the Block which containers prefetch ops
 
+    VLOG(3) << "RequestPrefetch Process in";
+
     responder_.Finish(reply, ::grpc::Status::OK, this);
     status_ = FINISH;
   }
@@ -221,6 +226,7 @@ void AsyncGRPCServer::ShutdownQueue() {
   std::unique_lock<std::mutex> lock(cq_mutex_);
   cq_send_->Shutdown();
   cq_get_->Shutdown();
+  cq_prefetch_->Shutdown();
 }
 
 // This URL explains why shutdown is complicate:
@@ -233,6 +239,7 @@ void AsyncGRPCServer::ShutDown() {
 void AsyncGRPCServer::TryToRegisterNewSendOne() {
   std::unique_lock<std::mutex> lock(cq_mutex_);
   if (is_shut_down_) {
+    VLOG(3) << "shutdown, do not TryToRegisterNewSendOne";
     return;
   }
   RequestSend* send = new RequestSend(&service_, cq_send_.get(), scope_,
@@ -243,6 +250,7 @@ void AsyncGRPCServer::TryToRegisterNewGetOne() {
   std::unique_lock<std::mutex> lock(cq_mutex_);
   if (is_shut_down_) {
+    VLOG(3) << "shutdown, do not TryToRegisterNewGetOne";
     return;
   }
   RequestGet* get = new RequestGet(&service_, cq_get_.get(), scope_, dev_ctx_,
@@ -253,6 +261,7 @@ void AsyncGRPCServer::TryToRegisterNewPrefetchOne() {
   std::unique_lock<std::mutex> lock(cq_mutex_);
   if (is_shut_down_) {
+    VLOG(3) << "shutdown, do not TryToRegisterNewPrefetchOne";
     return;
   }
   RequestPrefetch* prefetch =
@@ -270,25 +279,28 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq,
   void* tag = NULL;
   bool ok = false;
+
   while (true) {
+    VLOG(3) << "HandleRequest for " << cq_name << " while in";
     if (!cq->Next(&tag, &ok)) {
       LOG(INFO) << cq_name << " CompletionQueue shutdown!";
       break;
     }
+    VLOG(3) << "HandleRequest for " << cq_name << " while after Next";
     PADDLE_ENFORCE(tag);
 
     // FIXME(typhoonzero): de-couple the barriers with recv_op
     if (!is_shut_down_ && cq_name == "cq_get") WaitCond(1);
     if (!is_shut_down_ && cq_name == "cq_send") WaitCond(0);
 
-    RequestBase* base = (RequestBase*)tag;
+    RequestBase* base = reinterpret_cast<RequestBase*>(tag);
     // reference:
    // https://github.com/tensorflow/tensorflow/issues/5596
    // https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM
    // https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I
     if (!ok) {
-      LOG(WARNING) << cq_name << " recv no regular event:argument name"
-                   << base->GetReqName();
+      LOG(WARNING) << cq_name << " recv no regular event:argument name["
+                   << base->GetReqName() << "]";
       TryToRegisterNewOne();
       delete base;
       continue;
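The new `cq_prefetch_->Shutdown()` line is load-bearing: `cq->Next` only returns false once its queue has been shut down and drained, so before this change the prefetch `HandleRequest` thread could never leave its loop and server shutdown would hang on the join. The contract, schematically (a sketch assuming one drain thread per queue, not the server class itself):

    // One drain loop per completion queue.
    void HandleLoop(::grpc::ServerCompletionQueue* cq) {
      void* tag = nullptr;
      bool ok = false;
      while (cq->Next(&tag, &ok)) {  // false only after Shutdown() + full drain
        // ... dispatch tag ...
      }
      // Reached only once the queue was shut down; the thread can now be joined.
    }

    void ShutdownAllQueues(::grpc::ServerCompletionQueue* cq_send,
                           ::grpc::ServerCompletionQueue* cq_get,
                           ::grpc::ServerCompletionQueue* cq_prefetch) {
      cq_send->Shutdown();
      cq_get->Shutdown();
      cq_prefetch->Shutdown();  // omitting this leaves one HandleLoop blocked forever
    }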
diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h
index dd5cf4b377cb8e4a53c9a161cb32985613de32eb..b0596d3cd1e108f28e8f1485d6b5c989c55be7e9 100644
--- a/paddle/fluid/operators/detail/grpc_server.h
+++ b/paddle/fluid/operators/detail/grpc_server.h
@@ -15,7 +15,8 @@ limitations under the License. */
 #pragma once
 
 #include <grpc++/grpc++.h>
-#include <thread>
+#include <string>
+#include <thread>
 
 #include "paddle/fluid/framework/executor.h"
 #include "paddle/fluid/framework/lod_tensor.h"
@@ -93,6 +94,7 @@ class AsyncGRPCServer final {
 
   // received variable from RPC, operators fetch variable from this queue.
   SimpleBlockQueue<MessageWithName> var_get_queue_;
+  // clients send variables to this queue.
   ReceivedQueue var_recv_queue_;
 
   // condition of the sub program
diff --git a/paddle/fluid/operators/detail/grpc_server_test.cc b/paddle/fluid/operators/detail/grpc_server_test.cc
index 577374810696c039b8794fc151083ca7ddf43a10..1ad62863a1a98c28cb08f47dfa8a5bfae463ba91 100644
--- a/paddle/fluid/operators/detail/grpc_server_test.cc
+++ b/paddle/fluid/operators/detail/grpc_server_test.cc
@@ -28,6 +28,7 @@ std::unique_ptr<detail::AsyncGRPCServer> rpc_service_;
 
 void StartServer(const std::string& endpoint) {
   rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
+  rpc_service_->RunSyncUpdate();
 }
 
 TEST(PREFETCH, CPU) {
@@ -39,13 +40,23 @@ TEST(PREFETCH, CPU) {
   platform::CPUPlace place;
   platform::CPUDeviceContext ctx(place);
   // create var on local scope
-  std::string var_name("tmp_0");
-  auto var = scope.Var(var_name);
-  auto tensor = var->GetMutable<framework::LoDTensor>();
-  tensor->Resize({10, 10});
+  std::string in_var_name("in");
+  std::string out_var_name("out");
+  auto* in_var = scope.Var(in_var_name);
+  auto* in_tensor = in_var->GetMutable<framework::LoDTensor>();
+  in_tensor->Resize({10, 10});
+  VLOG(3) << "before mutable_data";
+  in_tensor->mutable_data<float>(place);
+  scope.Var(out_var_name);
+
+  VLOG(3) << "before fetch";
   detail::RPCClient client;
-  client.AsyncPrefetchVariable("127.0.0.1:8889", ctx, scope, var_name, "");
+  client.AsyncPrefetchVariable("127.0.0.1:8889", ctx, scope, in_var_name,
+                               out_var_name);
+  client.Wait();
+
+  rpc_service_->ShutDown();
   server_thread.join();
   rpc_service_.reset(nullptr);
 }
diff --git a/paddle/fluid/operators/detail/grpc_service.h b/paddle/fluid/operators/detail/grpc_service.h
index 879e21933b452363c3fccacffb4d16ac1bfd6020..e6dab2f5a3a4280f3979417c3ca2d884a0b8ff2f 100644
--- a/paddle/fluid/operators/detail/grpc_service.h
+++ b/paddle/fluid/operators/detail/grpc_service.h
@@ -80,7 +80,7 @@ enum class GrpcMethod {
 };
 
 static const int kGrpcNumMethods =
-    static_cast<int>(GrpcMethod::kGetVariable) + 1;
+    static_cast<int>(GrpcMethod::kPrefetchVariable) + 1;
 
 inline const char* GrpcMethodName(GrpcMethod id) {
   switch (id) {
@@ -89,7 +89,7 @@ inline const char* GrpcMethodName(GrpcMethod id) {
     case GrpcMethod::kGetVariable:
       return "/sendrecv.SendRecvService/GetVariable";
     case GrpcMethod::kPrefetchVariable:
-      return "/sendrecv.SendREcvService/PrefetchVariable";
+      return "/sendrecv.SendRecvService/PrefetchVariable";
   }
 
   // Shouldn't be reached.
@@ -117,5 +117,5 @@ class GrpcService final {
 };
 
 }  // namespace detail
-}  // namespace operator
+}  // namespace operators
 }  // namespace paddle
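The `kGrpcNumMethods` fix above matters because the constant sizes per-method bookkeeping: it must be derived from the last enumerator, or every method-indexed table is silently one slot short once a method is appended. The pattern in isolation (a self-contained sketch, not the Paddle header):

    #include <cassert>

    enum class GrpcMethod {
      kSendVariable,
      kGetVariable,
      kPrefetchVariable,  // appended last
    };

    // Must track the last enumerator; leaving it pinned to kGetVariable
    // would yield 2 and under-allocate anything sized by this constant.
    static const int kGrpcNumMethods =
        static_cast<int>(GrpcMethod::kPrefetchVariable) + 1;

    int main() {
      assert(kGrpcNumMethods == 3);
      return 0;
    }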
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index c27ea1268321744f6566b8b65e98f0df6d408186..b19add24e2bd325896a96be53d3d9762abfe217c 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -13,22 +13,13 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 
 #include <ostream>
-#include <stdint.h>
 #include <thread>
-#include <sys/stat.h>
-
-#include <unistd.h>
 
 #include "paddle/fluid/framework/executor.h"
-#include "paddle/fluid/framework/framework.pb.h"
 #include "paddle/fluid/framework/lod_tensor.h"
 #include "paddle/fluid/framework/op_registry.h"
-#include "paddle/fluid/framework/proto_desc.h"
 #include "paddle/fluid/framework/threadpool.h"
 #include "paddle/fluid/operators/detail/grpc_server.h"
-#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
-#include "paddle/fluid/operators/detail/simple_block_queue.h"
-#include "paddle/fluid/string/printf.h"
 
 namespace paddle {
 namespace operators {
@@ -111,6 +102,11 @@ class ListenAndServOp : public framework::OperatorBase {
 
     framework::Executor executor(dev_place);
 
+    // TODO(qiao) set proper fields for table lookup and update
+    rpc_service_->SetExecutor(&executor);
+    rpc_service_->SetPrefetchBlkdId(0);
+    rpc_service_->SetProgram(program);
+
     // TODO(typhoonzero): change this to a while_op for every cluster-batch.
     bool exit_flag = false;
     // Record received sparse variables, so that
@@ -173,7 +169,8 @@ class ListenAndServOp : public framework::OperatorBase {
       }
       ParallelExecuteBlocks(parallel_blkids, &executor, program, &recv_scope);
 
-      VLOG(2) << "run all blocks spent (ms) " << detail::GetTimestamp() - ts;
+      VLOG(3) << "run all blocks spent " << detail::GetTimestamp() - ts
+              << "(ms)";
 
       // Reset the received sparse variables, the sum operator would not
       // sum the input sparse variables which rows is empty at the next
diff --git a/paddle/fluid/operators/reshape_op.cc b/paddle/fluid/operators/reshape_op.cc
index 832509641cc3d5178ff090e05437484d395bfe51..b87b8e6b26cdeb017e700870998a53c1b295988c 100644
--- a/paddle/fluid/operators/reshape_op.cc
+++ b/paddle/fluid/operators/reshape_op.cc
@@ -17,90 +17,66 @@ limitations under the License. */
 namespace paddle {
 namespace operators {
 
-class ReshapeOp : public framework::OperatorWithKernel {
- public:
-  ReshapeOp(const std::string &type, const framework::VariableNameMap &inputs,
-            const framework::VariableNameMap &outputs,
-            const framework::AttributeMap &attrs)
-      : OperatorWithKernel(type, inputs, outputs, attrs) {}
-
-  void InferShape(framework::InferShapeContext *ctx) const override {
-    // input check
-    PADDLE_ENFORCE(ctx->HasInput("X"),
-                   "Input(X) of ReshapeOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasOutput("Out"),
-                   "Output(Out) of ReshapeOp should not be null.");
-
-    auto shape = ctx->Attrs().Get<std::vector<int>>("shape");
-    PADDLE_ENFORCE(shape.size() > 0, "Attr(shape) shouldn't be empty.");
-    auto x_dims = ctx->GetInputDim("X");
-
-    std::vector<size_t> neg_dims_idx;
-    // set some dimension to -1 if it is unknown
-    const int unknown_size = -1;
-    for (size_t i = 0; i < shape.size(); ++i) {
-      PADDLE_ENFORCE(shape[i] > 0 || shape[i] == unknown_size,
-                     "Each dimension of Attr(shape) must be positive or %d.",
-                     unknown_size);
-      if (shape[i] == unknown_size) {
-        neg_dims_idx.push_back(i);
-        PADDLE_ENFORCE(neg_dims_idx.size() <= 1,
-                       "Only one dimension of Attr(shape) can be unknown.");
-      }
-    }
-
-    int64_t capacity =
-        std::accumulate(shape.begin(), shape.end(), 1, std::multiplies<int>());
-    int64_t in_size = framework::product(x_dims);
-    if (neg_dims_idx.size() == 1) {
-      // dim infer
-      shape[neg_dims_idx[0]] = in_size / (-capacity);
-      // recalculate capacity
-      capacity = shape[neg_dims_idx[0]] * (-capacity);
-    }
-    // capacity check
-    PADDLE_ENFORCE(capacity == in_size,
-                   "The size of Input(X) mismatches with Attr(shape).");
-    // resize output
-    std::vector<int64_t> shape_int64(shape.size(), 0);
-    std::transform(shape.begin(), shape.end(), shape_int64.begin(),
-                   [](int a) { return static_cast<int64_t>(a); });
-    auto out_dims = framework::make_ddim(shape_int64);
-    ctx->SetOutputDim("Out", out_dims);
-    if (shape[0] == x_dims[0]) {
-      // Only pass LoD when the first dimension is equal between
-      // output and input.
-      ctx->ShareLoD("X", /*->*/ "Out");
-    }
-  }
-};
-
 class ReshapeOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   ReshapeOpMaker(OpProto *proto, OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
-    AddInput("X", "The input tensor of reshape operator.");
-    AddOutput("Out", "The output tensor of reshape operator.");
-    AddAttr<std::vector<int>>("shape",
-                              "(vector<int>) "
-                              "Target shape of reshape operator.");
+    AddInput("X", "(Tensor). The input tensor of reshape operator.");
+    AddInput("Shape",
+             "(Tensor, optional). If provided, reshape according to "
+             "this given shape. That is to say it has a higher priority than "
+             "the shape attribute, while the shape attribute still should be "
+             "set correctly to guarantee shape inference in compile time.")
+        .AsDispensable();
+    AddOutput("Out", "(Tensor). The output tensor of reshape operator.");
+    AddAttr<std::vector<int>>(
+        "shape", "(std::vector<int>) Target shape of reshape operator.");
     AddAttr<bool>("inplace",
-                  "Change the source tensor's shape without copy memory.")
-        .SetDefault(true);
+                  "(default: false) Change the source tensor's shape without "
+                  "memory copy. When Attr(inplace) is set true, the output "
+                  "tensor shares memory with Input(X), otherwise, a new output "
+                  "tensor is created, and its data are copied from Input(X).")
+        .SetDefault(false);
     AddComment(R"DOC(
 Reshape Operator.
 
-Reshape Input(X) into the shape specified by Attr(shape).
+Reshape Input(X) into the shape specified by Attr(shape) or Input(Shape). The
+data in Input(X) are unchanged.
+
+Examples:
 
-An example:
-Given a 2-D tensor X with 2 rows and 2 columns : [[1, 2], [3, 4]]
+1. Given a 3-D tensor Input(X) with a shape [2, 4, 6], and the target shape
+specified by Attr(shape) is [6, 8], the reshape operator will transform Input(X)
+into a 2-D tensor with shape [6, 8] and leave Input(X)'s data unchanged.
 
-and target shape = [1, 4], the reshape operator will transform
-the tensor X into a 2-D tensor: [[1, 2, 3, 4]]
+2. Given a 3-D tensor Input(X) with a shape [2, 4, 6], and the target shape
+specified by Attr(shape) is [2, 3, -1, 2], the reshape operator will transform
+Input(X) into a 4-D tensor with shape [2, 3, 4, 2] and leave Input(X)'s data
+unchanged. In this case, one and only one dimension of Attr(shape) can be set
+to -1, and the value of this dimension is inferred from the total element
+number of Input(X) and the remaining dimensions.
+
+3. Given a 3-D tensor Input(X) with a shape [2, 4, 6], and the target shape
+specified by Attr(shape) is [-1, 0, 3, 2], the reshape operator will transform
+Input(X) into a 4-D tensor with shape [2, 4, 3, 2] and leave Input(X)'s data
+unchanged. In this case, besides -1, 0 means the actual dimension value is going
+to be copied from the corresponding dimension of Input(X).
+
+Note:
+
+1. One and only one dimension in Attr(shape) can be set to -1. In this case,
+the actual dimension value will be inferred from the total element number of
+Input(X) and the remaining dimensions.
+
+2. More than one dimension in Attr(shape) can be set to 0, which means the real
+dimension value will be copied from Input(X) at runtime. Note that the index of
+a 0 can not exceed Rank(X). For example, if Input(X) is a 3-D tensor with shape
+[2, 3, 4], then Attr(shape) = [2, 3, 2, 0] is an invalid input.
+
+3. Input(Shape) has a higher priority than Attr(shape) if it is provided, while
+Attr(shape) still should be set correctly to guarantee shape inference in
+compile-time.
 
-One dimension in the target shape can be set -1, representing that its
-size is unknown. In this case, the real dimension will be infered from
-the original shape of Input(X) and other dimensions in the target shape.
 )DOC");
   }
 };
@@ -119,6 +95,14 @@ class ReshapeGradOp : public framework::OperatorWithKernel {
                    "Input(Out@GRAD) shouldn't be null.");
     ctx->SetOutputDim(framework::GradVarName("X"), ctx->GetInputDim("X"));
   }
+
+ protected:
+  framework::OpKernelType GetExpectedKernelType(
+      const framework::ExecutionContext &ctx) const override {
+    return framework::OpKernelType(
+        framework::ToDataType(ctx.Input<framework::LoDTensor>("X")->type()),
+        ctx.device_context());
+  }
 };
 
 }  // namespace operators
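The -1/0 rules documented above reduce to bookkeeping over the element count. A self-contained sketch of the same inference, deliberately independent of the Paddle types (`ValidateShape` in reshape_op.h below is the authoritative version):

    #include <cassert>
    #include <vector>

    // Resolve a reshape target: 0 copies the input dim, a single -1 is inferred.
    std::vector<long> Resolve(const std::vector<int>& shape,
                              const std::vector<long>& in) {
      long in_size = 1, capacity = 1;
      for (long d : in) in_size *= d;
      int unk = -1;
      std::vector<long> out(shape.size());
      for (size_t i = 0; i < shape.size(); ++i) {
        if (shape[i] == -1) unk = static_cast<int>(i);
        out[i] = (shape[i] == 0) ? in[i] : shape[i];
        if (shape[i] != -1) capacity *= out[i];
      }
      if (unk >= 0) out[unk] = in_size / capacity;  // infer the -1 slot
      return out;
    }

    int main() {
      // Example 3 from the doc comment: [2, 4, 6] reshaped by [-1, 0, 3, 2].
      assert((Resolve({-1, 0, 3, 2}, {2, 4, 6}) == std::vector<long>{2, 4, 3, 2}));
      return 0;
    }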
diff --git a/paddle/fluid/operators/reshape_op.h b/paddle/fluid/operators/reshape_op.h
index eacb0a0cf21a60ffbdef5787434859ac549388bc..871b4d38d56f10f3c0c178caa566508ab75f316c 100644
--- a/paddle/fluid/operators/reshape_op.h
+++ b/paddle/fluid/operators/reshape_op.h
@@ -20,17 +20,129 @@ limitations under the License. */
 namespace paddle {
 namespace operators {
 
+class ReshapeOp : public framework::OperatorWithKernel {
+ public:
+  ReshapeOp(const std::string &type, const framework::VariableNameMap &inputs,
+            const framework::VariableNameMap &outputs,
+            const framework::AttributeMap &attrs)
+      : OperatorWithKernel(type, inputs, outputs, attrs) {}
+
+  void InferShape(framework::InferShapeContext *ctx) const override {
+    PADDLE_ENFORCE(ctx->HasInput("X"),
+                   "Input(X) of ReshapeOp should not be null.");
+    PADDLE_ENFORCE(ctx->HasOutput("Out"),
+                   "Output(Out) of ReshapeOp should not be null.");
+
+    const std::vector<int> &shape = ctx->Attrs().Get<std::vector<int>>("shape");
+    PADDLE_ENFORCE(!shape.empty(),
+                   "The shape information must be set by Attr(shape).");
+
+    if (ctx->HasInput("Shape") && ctx->IsRuntime()) {
+      // If true, set the shape of Output(Out) according to Input(Shape) in
+      // ReshapeKernel with ExecutionContext. Also check LoD in ReshapeKernel.
+      ctx->ShareLoD("X", /*->*/ "Out");
+      return;
+    }
+
+    auto x_dims = ctx->GetInputDim("X");
+    auto out_dims = ValidateShape(shape, x_dims);
+    ctx->SetOutputDim("Out", out_dims);
+    if (x_dims[0] == out_dims[0]) {
+      // Only pass LoD when the first dimension of output and Input(X)
+      // are the same.
+      ctx->ShareLoD("X", /*->*/ "Out");
+    }
+  }
+
+  static framework::DDim ValidateShape(const std::vector<int> shape,
+                                       const framework::DDim &in_dims) {
+    const int64_t in_size = framework::product(in_dims);
+    // Only one dimension can be set to -1, whose size will be automatically
+    // inferred.
+    const int64_t unk_dim_val = -1;
+    const int64_t copy_dim_val = 0;
+
+    std::vector<int64_t> output_shape(shape.size(), 0);
+    int64_t capacity = 1;
+    int unk_dim_idx = -1;
+    for (size_t i = 0; i < shape.size(); ++i) {
+      if (shape[i] == unk_dim_val) {
+        PADDLE_ENFORCE(
+            unk_dim_idx == -1,
+            "Only one input dimension of Attr(shape) can be unknown.");
+        unk_dim_idx = i;
+      } else if (shape[i] == copy_dim_val) {
+        PADDLE_ENFORCE(
+            static_cast<int>(i) < in_dims.size(),
+            "The index of dimension to copy from input shape must be less "
+            "than the size of input shape.");
+      } else {
+        PADDLE_ENFORCE(
+            shape[i] > 0,
+            "Each input dimension of Attr(shape) must not be negative except "
+            "one unknown dimension.");
+      }
+
+      capacity *= (shape[i] ? shape[i] : in_dims[i]);
+      output_shape[i] =
+          (shape[i] ? static_cast<int64_t>(shape[i]) : in_dims[i]);
+    }
+
+    if (unk_dim_idx != -1) {
+      output_shape[unk_dim_idx] = -in_size / capacity;
+      PADDLE_ENFORCE_EQ(output_shape[unk_dim_idx] * capacity, -in_size,
+                        "Invalid shape is given.");
+    } else {
+      PADDLE_ENFORCE_EQ(capacity, in_size, "Invalid shape is given.");
+    }
+    return framework::make_ddim(output_shape);
+  }
+
+ protected:
+  framework::OpKernelType GetExpectedKernelType(
+      const framework::ExecutionContext &ctx) const override {
+    return framework::OpKernelType(
+        framework::ToDataType(ctx.Input<framework::LoDTensor>("X")->type()),
+        ctx.device_context());
+  }
+};
+
 template <typename DeviceContext, typename T>
 class ReshapeKernel : public framework::OpKernel<T> {
  public:
-  void Compute(const framework::ExecutionContext& ctx) const {
-    auto* out = ctx.Output<framework::Tensor>("Out");
-    auto* in = ctx.Input<framework::Tensor>("X");
+  void Compute(const framework::ExecutionContext &ctx) const {
+    auto *out = ctx.Output<framework::LoDTensor>("Out");
+    auto *in = ctx.Input<framework::LoDTensor>("X");
+    auto *shape_tensor = ctx.Input<framework::Tensor>("Shape");
+
+    framework::DDim out_dims = out->dims();
+
+    if (shape_tensor) {
+      auto *shape_data = shape_tensor->data<int>();
+      if (platform::is_gpu_place(ctx.GetPlace())) {
+        framework::Tensor cpu_shape_tensor;
+        TensorCopy(*shape_tensor, platform::CPUPlace(), ctx.device_context(),
+                   &cpu_shape_tensor);
+        shape_data = cpu_shape_tensor.data<int>();
+      }
+      auto shape =
+          std::vector<int>(shape_data, shape_data + shape_tensor->numel());
+      out_dims = ReshapeOp::ValidateShape(shape, in->dims());
+    }
+
+    if (!in->lod().empty()) {
+      PADDLE_ENFORCE_EQ(
+          out_dims[0], in->dims()[0],
+          "Reshape operator cannot reshape an input sequence batch "
+          "into an output sequence batch that has a different "
+          "number of time steps. Please consider using "
+          "sequence_reshape op.");
+    }
+
     bool inplace = ctx.Attr<bool>("inplace");
-    auto out_dims = out->dims();
+    out->Resize(out_dims);
     if (!inplace) {
       out->mutable_data<T>(ctx.GetPlace());
       framework::TensorCopy(*in, ctx.GetPlace(), ctx.device_context(), out);
+      // TensorCopy will resize to in_dims.
       out->Resize(out_dims);
     } else {
       out->ShareDataWith(*in);
@@ -42,9 +154,10 @@ class ReshapeKernel : public framework::OpKernel<T> {
 template <typename DeviceContext, typename T>
 class ReshapeGradKernel : public framework::OpKernel<T> {
  public:
-  void Compute(const framework::ExecutionContext& ctx) const {
-    auto* d_out = ctx.Input<framework::Tensor>(framework::GradVarName("Out"));
-    auto* d_x = ctx.Output<framework::Tensor>(framework::GradVarName("X"));
+  void Compute(const framework::ExecutionContext &ctx) const {
+    auto *d_out = ctx.Input<framework::Tensor>(framework::GradVarName("Out"));
+    auto *d_x = ctx.Output<framework::Tensor>(framework::GradVarName("X"));
+
     d_x->mutable_data<T>(ctx.GetPlace());
     bool inplace = ctx.Attr<bool>("inplace");
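A small subtlety in `ValidateShape` above: the -1 placeholder is multiplied into `capacity`, so `capacity` comes out negative exactly when a dimension is being inferred, and a single division both infers the value and sets up the validity check. Worked through by hand with the code's own variables:

    // in_dims = {2, 4, 6}   =>  in_size  = 48
    // shape   = {2, -1, 2}  =>  capacity = 2 * (-1) * 2 = -4
    // output_shape[unk_dim_idx] = -in_size / capacity = -48 / -4 = 12
    // check: output_shape[unk_dim_idx] * capacity == -in_size  (12 * -4 == -48)
    // A target whose fixed dimensions do not divide 48 fails this equality.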
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index 24297ffe33bc720ff7b4f2b0dbd82452dc7e0ae2..9311fc9904eb730aa56e94a4e45a1479a67df641 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -276,20 +276,25 @@ class DistributeTranspiler:
             suff_idx = v.name.find(".trainer_")
             if suff_idx >= 0:
                 orig_var_name = v.name[:suff_idx]
-            pserver_program.global_block().create_var(
+            else:
+                orig_var_name = v.name
+            single_trainer_var = pserver_program.global_block().create_var(
                 name=orig_var_name,
                 persistable=True,
                 type=v.type,
                 dtype=v.dtype,
                 shape=v.shape)
-            for trainer_id in xrange(self.trainers):
-                var = pserver_program.global_block().create_var(
-                    name="%s.trainer_%d" % (orig_var_name, trainer_id),
-                    persistable=False,
-                    type=v.type,
-                    dtype=v.dtype,
-                    shape=v.shape)
-                recv_inputs.append(var)
+            if self.trainers > 1:
+                for trainer_id in xrange(self.trainers):
+                    var = pserver_program.global_block().create_var(
+                        name="%s.trainer_%d" % (orig_var_name, trainer_id),
+                        persistable=False,
+                        type=v.type,
+                        dtype=v.dtype,
+                        shape=v.shape)
+                    recv_inputs.append(var)
+            else:
+                recv_inputs.append(single_trainer_var)
 
         # step3
         optimize_block = pserver_program.create_block(0)
@@ -511,8 +516,11 @@ class DistributeTranspiler:
 
     def _append_split_op(self, program, gradblocks):
         # Split variables that need to be split and append respective ops
+        add_suffix = False
+        if self.trainers > 1:
+            add_suffix = True
         var_mapping = self._create_vars_from_blocklist(
-            program, gradblocks, add_trainer_suffix=True)
+            program, gradblocks, add_trainer_suffix=add_suffix)
         for varname, splited_vars in var_mapping.iteritems():
             # variable that don't need to split have empty splited_vars
             if len(splited_vars) <= 1:
diff --git a/python/paddle/fluid/layers/detection.py b/python/paddle/fluid/layers/detection.py
index 3e649dc5fd32c4ed8fa6ad273b7be04d552b51ae..a5938fe494265778ef7032c56a8d6d35acd729c5 100644
--- a/python/paddle/fluid/layers/detection.py
+++ b/python/paddle/fluid/layers/detection.py
@@ -19,7 +19,6 @@ from layer_function_generator import generate_layer_fn
 from layer_function_generator import autodoc
 from ..layer_helper import LayerHelper
 import tensor
-import ops
 import nn
 import math
 
@@ -58,7 +57,7 @@ def detection_output(loc,
     This operation is to get the detection results by performing following
     two steps:
-    
+
     1. Decode input bounding box predictions according to the prior boxes.
     2. Get the final detection results by applying multi-class non maximum
        suppression (NMS).
@@ -130,9 +129,9 @@ def detection_output(loc,
         target_box=loc,
         code_type='decode_center_size')
     old_shape = scores.shape
-    scores = ops.reshape(x=scores, shape=(-1, old_shape[-1]))
+    scores = nn.reshape(x=scores, shape=(-1, old_shape[-1]))
     scores = nn.softmax(input=scores)
-    scores = ops.reshape(x=scores, shape=old_shape)
+    scores = nn.reshape(x=scores, shape=old_shape)
     scores = nn.transpose(scores, perm=[0, 2, 1])
     scores.stop_gradient = True
     nmsed_outs = helper.create_tmp_variable(dtype=decoded_box.dtype)
@@ -463,7 +462,7 @@ def ssd_loss(location,
     num, num_prior, num_class = confidence.shape
 
     def __reshape_to_2d(var):
-        return ops.reshape(x=var, shape=[-1, var.shape[-1]])
+        return nn.reshape(x=var, shape=[-1, var.shape[-1]])
 
     # 1. Find matched boundding box by prior box.
     #   1.1 Compute IOU similarity between ground-truth boxes and prior boxes.
@@ -474,7 +473,7 @@ def ssd_loss(location,
 
     # 2. Compute confidence for mining hard examples
     # 2.1. Get the target label based on matched indices
-    gt_label = ops.reshape(x=gt_label, shape=gt_label.shape + (1, ))
+    gt_label = nn.reshape(x=gt_label, shape=gt_label.shape + (1, ))
     gt_label.stop_gradient = True
     target_label, _ = target_assign(
         gt_label, matched_indices, mismatch_value=background_label)
@@ -487,7 +486,7 @@ def ssd_loss(location,
     conf_loss = nn.softmax_with_cross_entropy(confidence, target_label)
 
     # 3. Mining hard examples
-    conf_loss = ops.reshape(x=conf_loss, shape=(num, num_prior))
+    conf_loss = nn.reshape(x=conf_loss, shape=(num, num_prior))
     conf_loss.stop_gradient = True
     neg_indices = helper.create_tmp_variable(dtype='int32')
     dtype = matched_indices.dtype
@@ -556,7 +555,7 @@ def ssd_loss(location,
     # 5.3 Compute overall weighted loss.
     loss = conf_loss_weight * conf_loss + loc_loss_weight * loc_loss
     # reshape to [N, Np], N is the batch size and Np is the prior box number.
-    loss = ops.reshape(x=loss, shape=[-1, num_prior])
+    loss = nn.reshape(x=loss, shape=[-1, num_prior])
     loss = nn.reduce_sum(loss, dim=1, keep_dim=True)
     if normalize:
         normalizer = nn.reduce_sum(target_loc_weight)
@@ -709,7 +708,7 @@ def multi_box_head(inputs,
         new_shape = [
             -1, reduce(lambda x, y: x * y, input.shape[axis:len(input.shape)])
         ]
-        out = ops.reshape(x=input, shape=new_shape)
+        out = nn.reshape(x=input, shape=new_shape)
         return out
 
     def _is_list_or_tuple_(data):
@@ -803,7 +802,7 @@ def multi_box_head(inputs,
             mbox_loc.shape[0], mbox_loc.shape[1] * mbox_loc.shape[2] *
             mbox_loc.shape[3] / 4, 4
         ]
-        mbox_loc_flatten = ops.reshape(mbox_loc, shape=new_shape)
+        mbox_loc_flatten = nn.reshape(mbox_loc, shape=new_shape)
         mbox_locs.append(mbox_loc_flatten)
 
         # get conf
@@ -819,7 +818,7 @@ def multi_box_head(inputs,
             conf_loc.shape[0], conf_loc.shape[1] * conf_loc.shape[2] *
             conf_loc.shape[3] / num_classes, num_classes
         ]
-        conf_loc_flatten = ops.reshape(conf_loc, shape=new_shape)
+        conf_loc_flatten = nn.reshape(conf_loc, shape=new_shape)
         mbox_confs.append(conf_loc_flatten)
 
     if len(box_results) == 1:
diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py
index c6f831c29def883e9f8aabbad881198928321759..f96bc6911ffe0efc07be83b262909be213ced40d 100644
--- a/python/paddle/fluid/layers/nn.py
+++ b/python/paddle/fluid/layers/nn.py
@@ -73,6 +73,7 @@ __all__ = [
     'smooth_l1',
     'one_hot',
     'autoincreased_step_counter',
+    'reshape',
     'lod_reset',
     'lrn',
     'pad',
@@ -3266,6 +3267,8 @@ def one_hot(input, depth):
         The one-hot tensor or LodTensor, same as input.
 
     Examples:
+        .. code-block:: python
+
         X is a LoDTensor:
           X.lod = [[0, 1, 4]]
           X.shape = [4, 1]
@@ -3320,6 +3323,101 @@ def autoincreased_step_counter(counter_name=None, begin=1, step=1):
     return counter
 
 
+def reshape(x, shape, actual_shape=None, act=None, inplace=True, name=None):
+    """
+    Gives a new shape to the input Tensor without changing its data.
+
+    The target shape can be given by :attr:`shape` or :attr:`actual_shape`.
+    :attr:`shape` is a list of integers while :attr:`actual_shape` is a tensor
+    variable. :attr:`actual_shape` has a higher priority than :attr:`shape`
+    if it is provided, while :attr:`shape` still should be set correctly to
+    guarantee shape inference in compile-time.
+
+    Some tricks exist when specifying the target shape.
+
+    1. -1 means the value of this dimension is inferred from the total element
+    number of x and remaining dimensions. Thus one and only one dimension can
+    be set to -1.
+
+    2. 0 means the actual dimension value is going to be copied from the
+    corresponding dimension of x. The indices of 0s in shape can not exceed
+    Rank(X).
+
+    Here are some examples to explain it.
+
+    1. Given a 3-D tensor x with a shape [2, 4, 6], and the target shape
+    is [6, 8], the reshape operator will transform x into a 2-D tensor with
+    shape [6, 8] and leave x's data unchanged.
+
+    2. Given a 3-D tensor x with a shape [2, 4, 6], and the target shape
+    specified is [2, 3, -1, 2], the reshape operator will transform x into a
+    4-D tensor with shape [2, 3, 4, 2] and leave x's data unchanged. In this
+    case, one dimension of the target shape is set to -1, and the value of
+    this dimension is inferred from the total element number of x and the
+    remaining dimensions.
+
+    3. Given a 3-D tensor x with a shape [2, 4, 6], and the target shape
+    is [-1, 0, 3, 2], the reshape operator will transform x into a 4-D tensor
+    with shape [2, 4, 3, 2] and leave x's data unchanged. In this case,
+    besides -1, 0 means the actual dimension value is going to be copied from
+    the corresponding dimension of x.
+
+    Args:
+        x(variable): The input tensor.
+        shape(list): The new shape. At most one dimension of the new shape can
+                     be -1.
+        actual_shape(variable): An optional input. If provided, reshape
+                                according to this given shape rather than
+                                :attr:`shape` specifying shape. That is to
+                                say :attr:`actual_shape` has a higher priority
+                                than :attr:`shape`.
+        act (str): The non-linear activation to be applied to output variable.
+        inplace(bool): If this flag is set true, the output shares data with
+                       input without copying, otherwise a new output tensor is
+                       created whose data is copied from input x.
+
+    Returns(variable): The output tensor.
+
+    Examples:
+        .. code-block:: python
+
+            data = fluid.layers.data(
+                name='data', shape=[2, 4, 6], dtype='float32')
+            reshaped = fluid.layers.reshape(
+                x=data, shape=[-1, 0, 3, 2], act='tanh', inplace=True)
+    """
+
+    if not (isinstance(shape, list) or isinstance(shape, tuple)):
+        raise ValueError("Input shape must be a python list or tuple.")
+
+    # Validate the shape
+    unk_dim_idx = -1
+    for dim_idx, dim_size in enumerate(shape):
+        if dim_size == -1:
+            assert unk_dim_idx == -1, (
+                "Only one dimension in shape can be unknown.")
+            unk_dim_idx = dim_idx
+        elif dim_size == 0:
+            assert dim_idx < len(x.shape), (
+                "The indices of 0s in shape can not exceed Rank(X).")
+        else:
+            assert dim_size > 0, (
+                "Each dimension size given in shape must not be negative "
+                "except one unknown dimension.")
+
+    helper = LayerHelper("reshape", **locals())
+    reshaped = helper.create_tmp_variable(dtype=x.dtype)
+    helper.append_op(
+        type="reshape",
+        inputs={"X": x,
+                "Shape": actual_shape}
+        if isinstance(actual_shape, Variable) else {"X": x},
+        attrs={"shape": shape,
+               "inplace": inplace},
+        outputs={"Out": reshaped})
+
+    return helper.append_activation(reshaped)
+
+
 def lod_reset(x, y=None, target_lod=None):
     """
     LoD Reset Operator. Set LoD of **x** to a new one specified by **y** or
diff --git a/python/paddle/fluid/layers/ops.py b/python/paddle/fluid/layers/ops.py
index 0e5987ee598158d189db8bc956b7e7fea2517554..a9fe25744cc0b385479c9366af1b731ec221dd5a 100644
--- a/python/paddle/fluid/layers/ops.py
+++ b/python/paddle/fluid/layers/ops.py
@@ -49,7 +49,6 @@ __activations__ = [
 __all__ = [
     'mean',
     'mul',
-    'reshape',
     'scale',
     'sigmoid_cross_entropy_with_logits',
     'elementwise_add',
diff --git a/python/paddle/fluid/tests/unittests/op_test.py b/python/paddle/fluid/tests/unittests/op_test.py
index 8393f7827b1c7d361ebea72f2cfc6033268772f0..299ab8e51f017e1980a8b40e3830fc42b1ff7ccc 100644
--- a/python/paddle/fluid/tests/unittests/op_test.py
+++ b/python/paddle/fluid/tests/unittests/op_test.py
@@ -334,7 +334,7 @@ class OpTest(unittest.TestCase):
                     np.allclose(
                         actual_t, expect_t, atol=atol),
                     "Output (" + out_name + ") has diff at " + str(place) +
-                    str(actual_t) + str(expect_t))
+                    str(actual_t) + "\n" + str(expect_t))
                 if isinstance(expect, tuple):
                     self.assertListEqual(actual.lod(), expect[1],
                                          "Output (" + out_name +
@@ -568,6 +568,6 @@ class OpTest(unittest.TestCase):
         fetch_list = [g for p, g in param_grad_list]
         executor = Executor(place)
-        return map(
-            np.array,
-            executor.run(prog, feed_dict, fetch_list, return_numpy=False))
+        return map(np.array,
+                   executor.run(prog, feed_dict, fetch_list,
+                                return_numpy=False))
diff --git a/python/paddle/fluid/tests/unittests/test_mine_hard_examples_op.py b/python/paddle/fluid/tests/unittests/test_mine_hard_examples_op.py
old mode 100755
new mode 100644
diff --git a/python/paddle/fluid/tests/unittests/test_reshape_op.py b/python/paddle/fluid/tests/unittests/test_reshape_op.py
index 11f35c74d41146118525a5efa6c211d528e255fe..f51b5a7e9907294a5b91c920a363830d8b9a7137 100644
--- a/python/paddle/fluid/tests/unittests/test_reshape_op.py
+++ b/python/paddle/fluid/tests/unittests/test_reshape_op.py
@@ -14,15 +14,19 @@
 
 import unittest
 import numpy as np
+
 from op_test import OpTest
 
 
 class TestReshapeOp(OpTest):
     def setUp(self):
+        ori_shape = (2, 25)
+        new_shape = (5, 10)
+
         self.op_type = "reshape"
-        self.inputs = {'X': np.random.random((10, 20)).astype("float32")}
-        self.attrs = {'shape': [10 * 20]}
-        self.outputs = {'Out': self.inputs['X'].reshape(self.attrs['shape'])}
+        self.inputs = {"X": np.random.random(ori_shape).astype("float32")}
+        self.attrs = {"shape": new_shape, "inplace": False}
+        self.outputs = {"Out": self.inputs["X"].reshape(new_shape)}
 
     def test_check_output(self):
         self.check_output()
@@ -31,12 +35,33 @@ class TestReshapeOp(OpTest):
     def test_check_grad(self):
         self.check_grad(["X"], "Out")
 
 
-class TestReshapeOpDimInfer(OpTest):
+class TestReshapeOpDimInfer1(OpTest):
     def setUp(self):
+        ori_shape = (5, 10)
+        new_shape = (5, -1, 5)
+
         self.op_type = "reshape"
-        self.inputs = {'X': np.random.random((10, 20)).astype("float32")}
-        self.attrs = {'shape': [4, -1, 5]}
-        self.outputs = {'Out': self.inputs['X'].reshape(self.attrs['shape'])}
+        self.inputs = {"X": np.random.random(ori_shape).astype("float32")}
+        self.attrs = {"shape": new_shape, "inplace": False}
+        self.outputs = {"Out": self.inputs["X"].reshape(self.attrs["shape"])}
+
+    def test_check_output(self):
+        self.check_output()
+
+    def test_check_grad(self):
+        self.check_grad(["X"], "Out")
+
+
+class TestReshapeOpDimInfer2(OpTest):
+    def setUp(self):
+        ori_shape = (2, 2, 6)
+        new_shape = (2, 0, 3, -1)
+        infered_shape = (2, 2, 3, -1)
+
+        self.op_type = "reshape"
+        self.inputs = {"X": np.random.random(ori_shape).astype("float32")}
+        self.attrs = {"shape": new_shape, "inplace": False}
+        self.outputs = {"Out": self.inputs["X"].reshape(infered_shape)}
 
     def test_check_output(self):
         self.check_output()
@@ -47,10 +72,30 @@ class TestReshapeOpDimInfer(OpTest):
 
 class TestReshapeOpInplace(OpTest):
     def setUp(self):
+        ori_shape = (2, 25)
+        new_shape = (5, 10)
+
+        self.op_type = "reshape"
+        self.inputs = {"X": np.random.random(ori_shape).astype("float32")}
+        self.attrs = {"shape": new_shape}
+        self.outputs = {"Out": self.inputs["X"].reshape(new_shape)}
+
+    def test_check_output(self):
+        self.check_output()
+
+    def test_check_grad(self):
+        self.check_grad(["X"], "Out")
+
+
+class TestReshapeOpDimInferInplace1(OpTest):
+    def setUp(self):
+        ori_shape = (5, 10)
+        new_shape = (5, -1, 5)
+
         self.op_type = "reshape"
-        self.inputs = {'X': np.random.random((10, 20)).astype("float32")}
-        self.attrs = {'shape': [10 * 20], 'inplace': True}
-        self.outputs = {'Out': self.inputs['X'].reshape(self.attrs['shape'])}
+        self.inputs = {"X": np.random.random(ori_shape).astype("float32")}
+        self.attrs = {"shape": new_shape}
+        self.outputs = {"Out": self.inputs["X"].reshape(new_shape)}
 
     def test_check_output(self):
         self.check_output()
@@ -59,12 +104,38 @@ class TestReshapeOpInplace(OpTest):
         self.check_grad(["X"], "Out")
 
 
-class TestReshapeOpDimInferInplace(OpTest):
+class TestReshapeOpDimInferInplace2(OpTest):
     def setUp(self):
+        ori_shape = (2, 2, 6)
+        new_shape = (2, 0, 3, -1)
+        infered_shape = (2, 2, 3, -1)
+
+        self.op_type = "reshape"
+        self.inputs = {"X": np.random.random(ori_shape).astype("float32")}
+        self.attrs = {"shape": new_shape}
+        self.outputs = {"Out": self.inputs["X"].reshape(infered_shape)}
+
+    def test_check_output(self):
+        self.check_output()
+
+    def test_check_grad(self):
+        self.check_grad(["X"], "Out")
+
+
+class TestReshapeOpWithInputShape(OpTest):
+    def setUp(self):
+        ori_shape = (6, 5)
+        new_shape = (0, -1, 5)
+        actual_shape = (2, 3, 5)
+
         self.op_type = "reshape"
-        self.inputs = {'X': np.random.random((10, 20)).astype("float32")}
-        self.attrs = {'shape': [4, -1, 5], 'inplace': True}
-        self.outputs = {'Out': self.inputs['X'].reshape(self.attrs['shape'])}
+        self.inputs = {
+            "X": np.random.random(ori_shape).astype("float32"),
+            "Shape": np.array(
+                actual_shape, dtype="int32")
+        }
+        self.attrs = {"shape": new_shape}
+        self.outputs = {"Out": self.inputs["X"].reshape(actual_shape)}
self.inputs["X"].reshape(actual_shape)} def test_check_output(self): self.check_output() @@ -73,5 +144,5 @@ class TestReshapeOpDimInferInplace(OpTest): self.check_grad(["X"], "Out") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_target_assign_op.py b/python/paddle/fluid/tests/unittests/test_target_assign_op.py old mode 100755 new mode 100644