From 0d878e4c09bc9b52c654e36d29c474af4b0afd85 Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Wed, 28 Feb 2018 13:10:03 -0800 Subject: [PATCH] Add Go_op, Channel_create, channel_close, channel_send and channel_receive ops (#8593) * Adding Python boilerplate code for Go op * Add very basic test case * Adding the python logic for go routine * Fix syntax * Changing test to notest * Rename Routine to Go * Combining GoGuard and Go in one class * Modify test * Adding fluid close channel * Fixing __init__.py for calling fluid.go() * Adding stubs for channel methods and updating test case * Removing import * * Adding imports from concurrency * Initial commit of GO_OP (for varun) * Creating local scopes and go through them * Updated go op inputs persistability enforcement * Add thread execution; compile failing though * Fix go op * Cleaned up Go op * Fix yapf format issue * Readd warp ctc dir for unit tests * Updated make_channel, channel_send, channel_recv and channel_close * Moved thread function to another method, update unit tests * remove output var * Add stubs for channel operators * Updating concurrency with signatures * Updated the signature with return status * Fixed dtype in variables * Updating stub of ChannelSend + add infershape * Updating stub of ChannelRecv + add infershape * Updated signature * Adding the channel_create operator * Merge channel send+receive ops * Update concurrency tests using all operators * Updating the create op with ChannelHolder * Fix issues with channel_create_op * Add the implementation for channel_close op * Add channel close operator, fix channel close op * Adding the channel_send op * Comment channels C++ and Python code * Concurrency python api comment fix * Update unit test to add Status variable * Adding channel receive operator * Update concurrency test to demonstrate a complete CSP flow * Fix clang-format issues * Fixed "Out" parameter name * Fixing merge conflict in framework.py * Add channel ops to framework.py no_kernel_op_set * Seperating channel_send and channel_recv operators * Documenting capacity type * Update concurrency test to create go block as child block of main program * Changing set status implementation --- paddle/fluid/framework/CMakeLists.txt | 2 + paddle/fluid/framework/channel.h | 5 + paddle/fluid/framework/concurrency_test.cc | 122 +++++++++++++ paddle/fluid/operators/channel_close_op.cc | 71 ++++++++ paddle/fluid/operators/channel_create_op.cc | 114 ++++++++++++ paddle/fluid/operators/channel_recv_op.cc | 117 +++++++++++++ paddle/fluid/operators/channel_send_op.cc | 117 +++++++++++++ paddle/fluid/operators/go_op.cc | 111 ++++++++++++ python/paddle/fluid/concurrency.py | 164 +++++++++++++++++- python/paddle/fluid/framework.py | 16 +- .../paddle/fluid/tests/notest_concurrency.py | 3 +- 11 files changed, 830 insertions(+), 12 deletions(-) create mode 100644 paddle/fluid/framework/concurrency_test.cc create mode 100644 paddle/fluid/operators/channel_close_op.cc create mode 100644 paddle/fluid/operators/channel_create_op.cc create mode 100644 paddle/fluid/operators/channel_recv_op.cc create mode 100644 paddle/fluid/operators/channel_send_op.cc create mode 100644 paddle/fluid/operators/go_op.cc diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 0b4c6db6f98..e076b5003ba 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -96,3 +96,5 @@ cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_contex cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc) cc_test(channel_test SRCS channel_test.cc) +cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op channel_close_op channel_create_op + channel_send_op channel_recv_op sum_op elementwise_add_op executor proto_desc) diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h index be578059388..bda1bfb23b1 100644 --- a/paddle/fluid/framework/channel.h +++ b/paddle/fluid/framework/channel.h @@ -91,6 +91,11 @@ class ChannelHolder { inline bool IsInitialized() const { return holder_ != nullptr; } + inline const std::type_index Type() { + PADDLE_ENFORCE_EQ(IsInitialized(), true); + return holder_->Type(); + } + private: /** * @note Placeholder hides type T, so it doesn't appear as a template diff --git a/paddle/fluid/framework/concurrency_test.cc b/paddle/fluid/framework/concurrency_test.cc new file mode 100644 index 00000000000..5770b0a5a18 --- /dev/null +++ b/paddle/fluid/framework/concurrency_test.cc @@ -0,0 +1,122 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include + +#include "gtest/gtest.h" +#include "paddle/fluid/framework/block_desc.h" +#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/program_desc.h" + +USE_NO_KERNEL_OP(go); +USE_NO_KERNEL_OP(channel_close); +USE_NO_KERNEL_OP(channel_create); +USE_NO_KERNEL_OP(channel_recv); +USE_NO_KERNEL_OP(channel_send); +USE_NO_KERNEL_OP(elementwise_add); + +namespace f = paddle::framework; +namespace p = paddle::platform; + +namespace paddle { +namespace framework { + +template +void CreateIntVariable(Scope &scope, p::CPUPlace &place, std::string name, + T value) { + // Create LoDTensor of dim [1,1] + auto var = scope.Var(name); + auto tensor = var->GetMutable(); + tensor->Resize({1, 1}); + T *expect = tensor->mutable_data(place); + expect[0] = value; +} + +void InitTensorsInScope(Scope &scope, p::CPUPlace &place) { + p::CPUDeviceContext ctx(place); + + // Create channel variable + scope.Var("Channel"); + + // Create Variables, x0 will be put into channel, + // result will be pulled from channel + CreateIntVariable(scope, place, "Status", false); + CreateIntVariable(scope, place, "x0", 99); + CreateIntVariable(scope, place, "result", 0); +} + +void AddOp(const std::string &type, const VariableNameMap &inputs, + const VariableNameMap &outputs, AttributeMap attrs, + BlockDesc *block) { + // insert op + auto op = block->AppendOp(); + op->SetType(type); + for (auto &kv : inputs) { + op->SetInput(kv.first, kv.second); + } + for (auto &kv : outputs) { + op->SetOutput(kv.first, kv.second); + } + op->SetAttrMap(attrs); +} + +TEST(Concurrency, Go_Op) { + Scope scope; + p::CPUPlace place; + + // Initialize scope variables + InitTensorsInScope(scope, place); + + framework::Executor executor(place); + ProgramDesc program; + BlockDesc *block = program.MutableBlock(0); + + // Create channel OP + AddOp("channel_create", {}, {{"Out", {"Channel"}}}, + {{"capacity", 10}, {"data_type", f::proto::VarType::LOD_TENSOR}}, + block); + + // Create Go Op routine + BlockDesc *goOpBlock = program.AppendBlock(program.Block(0)); + AddOp("channel_send", {{"Channel", {"Channel"}}, {"X", {"x0"}}}, + {{"Status", {"Status"}}}, {}, goOpBlock); + + // Create Go Op + AddOp("go", {{"X", {"Channel", "x0"}}}, {}, {{"sub_block", goOpBlock}}, + block); + + // Create Channel Receive Op + AddOp("channel_recv", {{"Channel", {"Channel"}}}, + {{"Status", {"Status"}}, {"Out", {"result"}}}, {}, block); + + // Create Channel Close Op + AddOp("channel_close", {{"Channel", {"Channel"}}}, {}, {}, block); + + // Check the result tensor to make sure it is set to 0 + const LoDTensor &tensor = (scope.FindVar("result"))->Get(); + auto *initialData = tensor.data(); + EXPECT_EQ(initialData[0], 0); + + executor.Run(program, &scope, 0, true, true); + + // After we call executor.run, the Go operator should do a channel_send to set + // the + // "result" variable to 99 + auto *finalData = tensor.data(); + EXPECT_EQ(finalData[0], 99); +} +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/operators/channel_close_op.cc b/paddle/fluid/operators/channel_close_op.cc new file mode 100644 index 00000000000..5892650c49e --- /dev/null +++ b/paddle/fluid/operators/channel_close_op.cc @@ -0,0 +1,71 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/framework/op_registry.h" + +namespace pf = paddle::framework; +static constexpr char kChannel[] = "Channel"; + +namespace paddle { +namespace operators { + +class ChannelCloseOp : public framework::OperatorBase { + public: + ChannelCloseOp(const std::string &type, + const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + auto &inp = *scope.FindVar(Input(kChannel)); + + // Get the mutable version of the channel variable and closes it. + pf::ChannelHolder *ch = inp.GetMutable(); + ch->close(); + } +}; + +class ChannelCloseOpOpInferShape : public framework::InferShapeBase { + public: + void operator()(framework::InferShapeContext *context) const override { + PADDLE_ENFORCE(context->HasInput("Channel"), + "The input of ChannelClose op must be set"); + } +}; + +class ChannelCloseOpMaker : public framework::OpProtoAndCheckerMaker { + public: + ChannelCloseOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput(kChannel, + "The Channel Variable that should be closed by" + " the ChannelClose Op."); + AddComment(R"DOC( +Channel Close Operator. + +This operator closes an open channel. +)DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(channel_close, paddle::operators::ChannelCloseOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::ChannelCloseOpMaker); diff --git a/paddle/fluid/operators/channel_create_op.cc b/paddle/fluid/operators/channel_create_op.cc new file mode 100644 index 00000000000..b2fdfd0e1f2 --- /dev/null +++ b/paddle/fluid/operators/channel_create_op.cc @@ -0,0 +1,114 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/framework/lod_rank_table.h" +#include "paddle/fluid/framework/lod_tensor_array.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/reader.h" + +namespace pf = paddle::framework; + +static constexpr char kOutput[] = "Out"; + +namespace paddle { +namespace operators { + +class ChannelCreateOp : public framework::OperatorBase { + public: + ChannelCreateOp(const std::string &type, + const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + auto &out = *scope.FindVar(Output(kOutput)); + + // Determine the datatype and capacity of the channel to be created + // from the attributes provided. + auto dtype = + static_cast(Attr("data_type")); + auto capacity = Attr("capacity"); + + // Based on the datatype, create a new channel holder initialized with + // the given capacity. When capacity is 0, an unbuffered channel is + // created. + pf::ChannelHolder *ch = out.GetMutable(); + if (dtype == framework::proto::VarType::LOD_TENSOR) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::SELECTED_ROWS) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::LOD_RANK_TABLE) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::LOD_TENSOR_ARRAY) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::READER) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::CHANNEL) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::BOOL) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::INT32) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::INT64) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::FP32) { + ch->Reset(capacity); + } else if (dtype == framework::proto::VarType::FP64) { + ch->Reset(capacity); + } else { + PADDLE_THROW( + "Data type %d is not in " + "[LOD_TENSOR, SELECTED_ROWS, LOD_RANK_TABLE, LOD_TENSOR_ARRAY, " + "READER, CHANNEL, BOOL, INT32, INT64, FP32, FP64]", + dtype); + } + } +}; + +class ChannelCreateOpOpInferShape : public framework::InferShapeBase { + public: + void operator()(framework::InferShapeContext *context) const override { + PADDLE_ENFORCE(context->HasOutput(kOutput), + "The output of ChannelCreate op must be set"); + context->SetOutputDim(kOutput, {1}); + } +}; + +class ChannelCreateOpMaker : public framework::OpProtoAndCheckerMaker { + public: + ChannelCreateOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddOutput(kOutput, + "The object of a Channel type created by ChannelCreate Op."); + AddAttr("capacity", "The size of the buffer of Channel.") + .SetDefault(0); + AddAttr("data_type", "The data type of elements inside the Channel."); + AddComment(R"DOC( +Channel Create Operator. + +This operator creates an object of the VarType Channel and returns it. +)DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(channel_create, paddle::operators::ChannelCreateOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::ChannelCreateOpMaker); diff --git a/paddle/fluid/operators/channel_recv_op.cc b/paddle/fluid/operators/channel_recv_op.cc new file mode 100644 index 00000000000..c12b88e7a91 --- /dev/null +++ b/paddle/fluid/operators/channel_recv_op.cc @@ -0,0 +1,117 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/channel.h" +#include +#include +#include +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/var_type.h" +#include "paddle/fluid/operators/math/math_function.h" + +static constexpr char Channel[] = "Channel"; +static constexpr char Status[] = "Status"; +static constexpr char Out[] = "Out"; + +namespace paddle { +namespace operators { + +void SetReceiveStatus(const platform::Place &dev_place, + framework::Variable &status_var, bool status) { + auto cpu = platform::CPUPlace(); + auto status_tensor = + status_var.GetMutable()->mutable_data({1}, + cpu); + status_tensor[0] = status; +} + +bool ChannelReceive(framework::ChannelHolder *ch, framework::Variable *var) { + // Get type of channel and use that to call mutable data for Variable + auto type = framework::ToVarType(ch->Type()); + if (type == framework::proto::VarType_Type_LOD_TENSOR) + return ch->Receive(var->GetMutable()); + else if (type == framework::proto::VarType_Type_LOD_RANK_TABLE) + return ch->Receive(var->GetMutable()); + else if (type == framework::proto::VarType_Type_LOD_TENSOR_ARRAY) + return ch->Receive(var->GetMutable()); + else if (type == framework::proto::VarType_Type_SELECTED_ROWS) + return ch->Receive(var->GetMutable()); + else if (type == framework::proto::VarType_Type_READER) + return ch->Receive(var->GetMutable()); + else if (type == framework::proto::VarType_Type_CHANNEL) + return ch->Receive(var->GetMutable()); + else + PADDLE_THROW("ChannelReceive:Unsupported type"); +} + +class ChannelRecvOp : public framework::OperatorBase { + public: + ChannelRecvOp(const std::string &type, + const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + void InferShape(framework::InferShapeContext *ctx) const { + PADDLE_ENFORCE(ctx->HasInput(Channel), + "Input(Channel) of ChannelRecvOp should not be null."); + PADDLE_ENFORCE(ctx->HasOutput(Out), + "Input(Channel) of ChannelRecvOp should not be null."); + PADDLE_ENFORCE(ctx->HasOutput(Status), + "Output(Status) of ChannelRecvOp should not be null."); + ctx->SetOutputDim("Status", {1}); + } + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + // Get the channel holder created by channel_create op, passed as input. + framework::ChannelHolder *ch = + scope.FindVar(Input(Channel))->GetMutable(); + auto output_var = scope.FindVar(Output(Out)); + // Receive the data from the channel. + bool ok = ChannelReceive(ch, output_var); + + // Set the status output of the `ChannelReceive` call. + SetReceiveStatus(dev_place, *scope.FindVar(Output(Status)), ok); + } +}; + +class ChannelRecvOpMaker : public framework::OpProtoAndCheckerMaker { + public: + ChannelRecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput(Channel, + "(Channel) A variable which \"receives\" the a value sent" + "to it by a channel_send op.") + .AsDuplicable(); + AddOutput(Out, + "(Variable) Output Variable that will hold the data received" + " from the Channel") + .AsDuplicable(); + AddOutput(Status, + "(Tensor) An LoD Tensor that returns a boolean status of the" + "result of the receive operation.") + .AsDuplicable(); + AddComment(R"DOC( +)DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(channel_recv, paddle::operators::ChannelRecvOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::ChannelRecvOpMaker); diff --git a/paddle/fluid/operators/channel_send_op.cc b/paddle/fluid/operators/channel_send_op.cc new file mode 100644 index 00000000000..6d7715ad229 --- /dev/null +++ b/paddle/fluid/operators/channel_send_op.cc @@ -0,0 +1,117 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/channel.h" +#include +#include +#include +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/var_type.h" +#include "paddle/fluid/operators/math/math_function.h" + +static constexpr char Channel[] = "Channel"; +static constexpr char X[] = "X"; +static constexpr char Status[] = "Status"; +static constexpr char copy[] = "copy"; + +namespace paddle { +namespace operators { + +void SetSendStatus(const platform::Place &dev_place, + framework::Variable &status_var, bool status) { + auto cpu = platform::CPUPlace(); + auto status_tensor = + status_var.GetMutable()->mutable_data({1}, + cpu); + status_tensor[0] = status; +} + +bool ChannelSend(framework::ChannelHolder *ch, framework::Variable *var) { + auto type = framework::ToVarType(var->Type()); + if (type == framework::proto::VarType_Type_LOD_TENSOR) + return ch->Send(var->GetMutable()); + else if (type == framework::proto::VarType_Type_LOD_RANK_TABLE) + return ch->Send(var->GetMutable()); + else if (type == framework::proto::VarType_Type_LOD_TENSOR_ARRAY) + return ch->Send(var->GetMutable()); + else if (type == framework::proto::VarType_Type_SELECTED_ROWS) + return ch->Send(var->GetMutable()); + else if (type == framework::proto::VarType_Type_READER) + return ch->Send(var->GetMutable()); + else if (type == framework::proto::VarType_Type_CHANNEL) + return ch->Send(var->GetMutable()); + else + PADDLE_THROW("ChannelSend:Unsupported type"); +} + +class ChannelSendOp : public framework::OperatorBase { + public: + ChannelSendOp(const std::string &type, + const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + void InferShape(framework::InferShapeContext *ctx) const { + PADDLE_ENFORCE(ctx->HasInput(Channel), + "Input(Channel) of ChannelSendOp should not be null."); + PADDLE_ENFORCE(ctx->HasInput(X), + "Input(X) of ChannelSendOp should not be null."); + PADDLE_ENFORCE(ctx->HasOutput(Status), + "Output(Status) of ChannelSendOp should not be null."); + ctx->SetOutputDim("Status", {1}); + } + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + // Get the channel holder created by channel_create op, passed as input. + framework::ChannelHolder *ch = + scope.FindVar(Input(Channel))->GetMutable(); + auto input_var = scope.FindVar(Input(X)); + + // Send the input data through the channel. + bool ok = ChannelSend(ch, input_var); + + // Set the status output of the `ChannelSend` call. + SetSendStatus(dev_place, *scope.FindVar(Output(Status)), ok); + } +}; + +class ChannelSendOpMaker : public framework::OpProtoAndCheckerMaker { + public: + ChannelSendOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput(Channel, + "(Channel) A variable which \"sends\" the passed in value to " + "a listening receiver.") + .AsDuplicable(); + AddInput(X, "(Variable) The value which gets sent by the channel.") + .AsDuplicable(); + AddOutput(Status, + "(Tensor) An LoD Tensor that returns a boolean status of the" + "result of the send operation.") + .AsDuplicable(); + AddAttr(copy, "(bool, default false) Should copy before send") + .SetDefault(false); + AddComment(R"DOC( +)DOC"); + } +}; +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(channel_send, paddle::operators::ChannelSendOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::ChannelSendOpMaker); diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc new file mode 100644 index 00000000000..cfa797717d7 --- /dev/null +++ b/paddle/fluid/operators/go_op.cc @@ -0,0 +1,111 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/op_registry.h" + +namespace paddle { +namespace operators { + +using StepScopeVar = std::vector; + +static constexpr char kBlock[] = "sub_block"; +static constexpr char kX[] = "X"; + +class GoOp : public framework::OperatorBase { + public: + GoOp(const std::string &type, const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + private: + void ExecuteOnThread(framework::Executor *executor, + framework::BlockDesc *block, + framework::Scope *scope) const { + framework::ProgramDesc *program = block->Program(); + executor->Run(*program, scope, block->ID(), false /*create_local_scope*/); + } + + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + /* + * Determine the global scope. Create a new child scope. + * Within the child scope, add all the local variables relevant + * to that scope. + * + * Now go through all the inputs to the op to ensure that + * all of them are in the newly created scope. This is important + * to ensure that they don't get destroyed when the parent scope + * is deleted. + * */ + + // TODO(varunarora): Consider moving this root scope lookup to scope.h. + const framework::Scope *root_scope = &scope; + const framework::Scope *parent_scope = &(root_scope->parent()); + + while (parent_scope != nullptr) { + root_scope = parent_scope; + parent_scope = &(parent_scope->parent()); + } + + framework::BlockDesc *block = Attr(kBlock); + framework::Executor executor(dev_place); + framework::Scope &new_scope = root_scope->NewScope(); + + for (auto &var : block->AllVars()) { + new_scope.Var(var->Name()); + } + + auto &inputs = Inputs(kX); + for (size_t i = 0; i < inputs.size(); i++) { + PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)), + "All variables used in the go block " + "should be created in the global scope"); + } + + // Now execute the go op with the newly created scope. + std::thread go_thread([dev_place, block, &new_scope, this]() { + framework::Executor executor(dev_place); + ExecuteOnThread(&executor, block, &new_scope); + }); + go_thread.detach(); + } +}; + +class GoOpMaker : public framework::OpProtoAndCheckerMaker { + public: + GoOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput(kX, + "A set of variables, which are required by operators inside the " + "block of Go Op.") + .AsDuplicable(); + AddAttr(kBlock, "The block inside GoOp"); + AddComment(R"DOC( +)DOC"); + } +}; + +// TODO(thuan): Look into Gradient Operator for GO_OP + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(go, paddle::operators::GoOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::GoOpMaker); diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index 5f868b6e86d..978f1891e34 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -16,7 +16,7 @@ # TODO: Operators: send, close_channel, recv, go, select from layers.control_flow import BlockGuard from layer_helper import LayerHelper - +import core __all__ = [ 'Go', 'make_channel', @@ -47,7 +47,7 @@ class Go(BlockGuard): .parent_idx) x_name_list = set() - out_vars = set() + out_vars = [] for op in go_block.ops: # Iterate over all operators, get all the inputs # and add as input to the Go operator. @@ -70,17 +70,163 @@ class Go(BlockGuard): attrs={'sub_block': go_block}) -def make_channel(dtype, size=0): - return True +def make_channel(dtype, capacity=0): + """ + Helps implementation of a concurrent program by creating a "channel" of + a defined data type. Channels allow for the passing of data in + concurrent scenarios - such as when using threads to divide computation. + Channels can be used to "send" and "receive" such data concurrently. + There are two kinds of channels: unbuffered and buffered. Unbuffered + channels have no capacity - and thus, block on send and only unblock only + once what they have sent has been received. -def channel_send(channel, value): - return True + On the other hand, buffered channels are initialized with a capacity - + and do not block on sends. + + Use this method in combination with `channel_send`, `channel_recv`, + `channel_close`, and `Go` to design a concurrent Paddle program. + + Args: + dtype (ParamAttr|int): Data type of the data sent in the channel. + This data type should be one of the Paddle supported data types. + capacity (ParamAttr|int): Size of the channel. Defaults to 0 for + to create an unbuffered channel. + + Returns: + Variable: The channel variable that can be used to send an receive data + of the defined dtype. + + Examples: + .. code-block:: python + ch = fluid.make_channel(dtype='int32', capacity=10) + ... + # Code to execute in a Go block, which receives the channel data. + fluid.channel_send(ch, 100) + fluid.channel_close(ch) + """ + helper = LayerHelper('make_channel', **locals()) + main_program = helper.main_program + make_channel_block = main_program.current_block() -def channel_recv(channel): - return True + # Make a channel variable (using the channel data type) and make sure it + # persists into the global scope. + channel = helper.create_variable( + dtype=core.VarDesc.VarType.CHANNEL, persistable=True) + + create_channel_op = make_channel_block.append_op( + type="channel_create", + outputs={"Out": channel}, + attrs={"data_type": dtype, + "capacity": capacity}) + + return create_channel_op + + +def channel_send(channel, value): + """ + Sends a value through a channel variable. Used by an unbuffered or buffered + channel to pass data from within or to a concurrent Go block, where + `channel_recv` to used to get the passed value. + + Args: + channel (Variable|Channel): Channel variable created using + `make_channel`. + + Returns: + Variable: The boolean status on whether or not the channel + successfully sent the passed value. + + Examples: + .. code-block:: python + + ch = fluid.make_channel(dtype='int32', capacity=10) + ... + # Code to execute in a Go block, which receives the channel data. + fluid.channel_send(ch, 100) + """ + helper = LayerHelper('channel_send', **locals()) + main_program = helper.main_program + channel_send_block = main_program.current_block() + status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR) + + channel_send_op = channel_send_block.append_op( + type="channel_send", + inputs={ + "Channel": channel, + "X": value, + }, + outputs={"Status": status}) + + return channel_send_op + + +def channel_recv(channel, dtype): + """ + Receives a value through a channel variable. Used by an unbuffered or + buffered channel within a concurrent Go block to get data from originally + sent using `channel_send`, or from outside such a block where + `channel_send` is used to send the value. + + Args: + channel (Variable|Channel): Channel variable created using + `make_channel`. + dtype (Variable|int): Data type of the data expected to be read in the + channel. This data type should be one of the Paddle supported data + types. + + Returns: + Variable: The boolean status on whether or not the channel + successfully received the passed value. + + Examples: + .. code-block:: python + + ch = fluid.make_channel(dtype='int32', capacity=10) + with fluid.Go(): + fluid.channel_recv(ch, 'int32') + + # Code to send data through the channel. + """ + helper = LayerHelper('channel_recv', **locals()) + main_program = helper.main_program + channel_recv_block = main_program.current_block() + + return_value = helper.create_variable(dtype=dtype) + status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR) + + channel_recv_op = channel_recv_block.append_op( + type="channel_recv", + inputs={"Channel": channel}, + outputs={"Out": return_value, + "Status": status}) + + return channel_recv_op def channel_close(channel): - return True + """ + Closes a channel created using `make_channel`. + + Args: + channel (Variable|Channel): Channel variable created using + `make_channel`. + + Examples: + .. code-block:: python + + ch = fluid.make_channel(dtype='int32', capacity=10) + ... + # Code to receive and send data through a channel + ... + fluid.channel_close(ch) + """ + helper = LayerHelper('channel_close', **locals()) + main_program = helper.main_program + channel_close_block = main_program.current_block() + + channel_close_op = channel_close_block.append_op( + type="channel_close", inputs={"Channel": channel}) + + return channel_close_op diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 2ca8c320842..4ec0bca2284 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -141,6 +141,8 @@ class Variable(object): dtype(np.dtype|core.VarDesc.VarType|str): The data type of variable. lod_level(int): The level of lod tensor. 0 means it is not a time series data. + capacity(int): The capacity of Channel variable. Ignored + for other types. persistable(bool): True if the variable should be saved as check point. Defaults to False. stop_gradient(bool): True if the variable will stop to calculate @@ -154,6 +156,7 @@ class Variable(object): shape=None, dtype=None, lod_level=None, + capacity=None, persistable=None, error_clip=None, stop_gradient=False, @@ -224,6 +227,14 @@ class Variable(object): "persistable is {2}. They are not matched".format( self.name, self.persistable, persistable)) + if capacity is not None: + if is_new_var: + self.desc.set_capacity(capacity) + else: + # TODO(abhinavarora) : Compare with set capacity once, + # get_capacity is implemented + pass + self.block.vars[name] = self self.op = None self.stop_gradient = stop_gradient @@ -472,10 +483,11 @@ class Operator(object): self.desc.check_attrs() no_kernel_op_set = { - 'feed', 'fetch', 'save', 'load', 'recurrent', + 'feed', 'fetch', 'save', 'load', 'recurrent', 'go', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv', 'listen_and_serv', 'parallel_do', 'save_combine', - 'load_combine', 'ncclInit' + 'load_combine', 'ncclInit', 'channel_create', 'channel_close', + 'channel_send', 'channel_recv' } if type not in no_kernel_op_set: self.desc.infer_var_type(self.block.desc) diff --git a/python/paddle/fluid/tests/notest_concurrency.py b/python/paddle/fluid/tests/notest_concurrency.py index 602d5f31eb3..77107f8b36f 100644 --- a/python/paddle/fluid/tests/notest_concurrency.py +++ b/python/paddle/fluid/tests/notest_concurrency.py @@ -20,7 +20,8 @@ from paddle.fluid.executor import Executor class TestRoutineOp(unittest.TestCase): def test_simple_routine(self): - ch = fluid.make_channel(dtype=bool) + ch = fluid.make_channel( + dtype=core.VarDesc.VarType.BOOL, name="CreateChannel") with fluid.Go(): fluid.channel_send(ch, True) -- GitLab