From 6e100227812b440d3046d780495548675110ef54 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Thu, 4 Jun 2020 15:38:46 +0800 Subject: [PATCH] add queue_generator_op, dequeue_op, enqueue_op and ut (#24481) * add queue_generator_op, dequeue_op, enqueue_op and ut, test=develop --- paddle/fluid/operators/dequeue_op.cc | 97 +++++++++++++++++++ paddle/fluid/operators/enqueue_op.cc | 79 +++++++++++++++ paddle/fluid/operators/queue_generator_op.cc | 93 ++++++++++++++++++ python/paddle/fluid/framework.py | 2 +- .../fluid/tests/unittests/test_queue.py | 74 ++++++++++++++ 5 files changed, 344 insertions(+), 1 deletion(-) create mode 100644 paddle/fluid/operators/dequeue_op.cc create mode 100644 paddle/fluid/operators/enqueue_op.cc create mode 100644 paddle/fluid/operators/queue_generator_op.cc create mode 100644 python/paddle/fluid/tests/unittests/test_queue.py diff --git a/paddle/fluid/operators/dequeue_op.cc b/paddle/fluid/operators/dequeue_op.cc new file mode 100644 index 0000000000..f8ab97040e --- /dev/null +++ b/paddle/fluid/operators/dequeue_op.cc @@ -0,0 +1,97 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/var_type.h" +#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" +using LoDTensor = paddle::framework::LoDTensor; +using LoDTensorBlockingQueueHolder = + paddle::operators::reader::LoDTensorBlockingQueueHolder; + +namespace paddle { +namespace operators { + +class DequeueOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + DequeueOp(const std::string& type, const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + private: + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + const std::string& queue_name = Attr("queue_name"); + auto* queue_holder_var = scope.FindVar(queue_name); + PADDLE_ENFORCE_NOT_NULL( + queue_holder_var, + platform::errors::NotFound( + "No LoDTensorBlockingQueueHolder variable with name %s found.", + queue_name)); + auto* queue_holder = + queue_holder_var->template GetMutable(); + auto& out_names = Outputs("Out"); + PADDLE_ENFORCE_GT(out_names.size(), 0, + platform::errors::InvalidArgument( + "The output for Op(dequeue) must be set.")); + for (size_t i = 0; i < out_names.size(); ++i) { + auto out_var = scope.FindVar(out_names[i]); + PADDLE_ENFORCE_NOT_NULL( + out_var, platform::errors::NotFound("No variable with name %s found", + out_names[i])); + auto* out_tensor = out_var->GetMutable(); + PADDLE_ENFORCE_NOT_NULL( + out_tensor, + platform::errors::InvalidArgument( + "Variable with name %s has not been initialized.", out_names[i])); + + std::vector lod_tensor_vec; + bool success = false; + lod_tensor_vec = queue_holder->GetQueue()->Pop(&success); + PADDLE_ENFORCE_EQ(lod_tensor_vec.size(), 1, + platform::errors::InvalidArgument( + "Expected to pop only one element per Pop call for " + "Op(dequeue), but poped %d element.", + lod_tensor_vec.size())); + for (size_t j = 0; j < lod_tensor_vec.size(); ++j) { + TensorCopySync(lod_tensor_vec[j], dev_place, out_tensor); + } + } + } +}; + +class DequeueOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddAttr("queue_name", + "Name of the `LoDTensorBlockingQueueHolder` variable"); + AddOutput("Out", "A list of `lod_tensor` to dequeue and assigned.") + .AsDuplicable(); + AddComment(R"DOC( + Dequeue operator. + )DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = ::paddle::operators; + +REGISTER_OP_WITHOUT_GRADIENT(dequeue, ops::DequeueOp, ops::DequeueOpMaker); diff --git a/paddle/fluid/operators/enqueue_op.cc b/paddle/fluid/operators/enqueue_op.cc new file mode 100644 index 0000000000..9b367a72fb --- /dev/null +++ b/paddle/fluid/operators/enqueue_op.cc @@ -0,0 +1,79 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/var_type.h" +#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" + +using LoDTensor = paddle::framework::LoDTensor; +using LoDTensorBlockingQueueHolder = + paddle::operators::reader::LoDTensorBlockingQueueHolder; + +namespace paddle { +namespace operators { + +class EnqueueOp : public framework::OperatorBase { + public: + EnqueueOp(const std::string& type, const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + private: + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + const std::string& queue_name = Attr("queue_name"); + auto* queue_holder_var = scope.FindVar(queue_name); + PADDLE_ENFORCE_NOT_NULL( + queue_holder_var, + platform::errors::NotFound( + "No LoDTensorBlockingQueueHolder variable with name %s found.", + queue_name)); + const std::string& var_name = Input("X"); + auto* in_var = scope.FindVar(var_name); + PADDLE_ENFORCE_NOT_NULL( + in_var, platform::errors::NotFound("No variable with name %s found.", + var_name)); + auto* in_tensor = in_var->GetMutable(); + auto* queue_holder = + queue_holder_var->template GetMutable(); + + std::vector lod_tensor_vec; + lod_tensor_vec.emplace_back(*in_tensor); + queue_holder->GetQueue()->Push(lod_tensor_vec); + } +}; + +class EnqueueOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddInput("X", "`lod_tensor` to enqueue"); + AddAttr("queue_name", + "Name of the `LoDTensorBlockingQueueHolder` variable"); + AddComment(R"DOC( + Enqueue operator. + )DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = ::paddle::operators; + +REGISTER_OP_WITHOUT_GRADIENT(enqueue, ops::EnqueueOp, ops::EnqueueOpMaker); diff --git a/paddle/fluid/operators/queue_generator_op.cc b/paddle/fluid/operators/queue_generator_op.cc new file mode 100644 index 0000000000..8e58f7a6a7 --- /dev/null +++ b/paddle/fluid/operators/queue_generator_op.cc @@ -0,0 +1,93 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include + +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/var_type.h" +#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" + +namespace paddle { +namespace operators { + +class QueueGeneratorOp : public framework::OperatorBase { + public: + QueueGeneratorOp(const std::string& type, + const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + std::vector names = Attr>("names"); + PADDLE_ENFORCE_GT(names.size(), 0, platform::errors::InvalidArgument( + "The attribute 'names' for " + "Op(queue_generator) must be set.")); + + int capacity = Attr("capacity"); + PADDLE_ENFORCE_GT(capacity, 0, + platform::errors::InvalidArgument( + "The attribute 'capacity' for Op(queue_generator) " + "must be set a positive value, " + "but the one received is %d.", + capacity)); + + // generate queue vars and initialize them + for (const auto& name : names) { + GenerateQueue(&scope, name, capacity); + } + } + + private: + void GenerateQueue(const framework::Scope* scope, const std::string& name, + size_t capacity) const { + auto var = scope->FindVar(name); + PADDLE_ENFORCE_NOT_NULL( + var, platform::errors::NotFound( + "Can't find var named '%s' in the global scope.", name)); + auto ptr = var->GetMutable(); + ptr->InitOnce(capacity); + + VLOG(3) << "generated a LodTensorBlockingQueue var named: " << name; + } +}; + +class QueueGeneratorOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddComment(R"DOC( +QueueGenerator operator +Generate and initialize one or more LodTensorBlockingQueueHolders. +)DOC"); + AddAttr>( + "names", + "['name1', 'name2', ...] " + "list of names for LodTensorBlockingQueueHolders") + .SetDefault({}); + AddAttr("capacity", "queue capacity").SetDefault(1); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OP_WITHOUT_GRADIENT(queue_generator, ops::QueueGeneratorOp, + ops::QueueGeneratorOpMaker); diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index db48176b65..270250a5d5 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -1867,7 +1867,7 @@ class Operator(object): 'conditional_block', 'while', 'send', 'recv', 'listen_and_serv', 'fl_listen_and_serv', 'ncclInit', 'select', 'checkpoint_notify', 'gen_nccl_id', 'c_gen_nccl_id', 'c_comm_init', 'c_sync_calc_stream', - 'c_sync_comm_stream' + 'c_sync_comm_stream', 'queue_generator', 'dequeue', 'enqueue' } def __init__(self, diff --git a/python/paddle/fluid/tests/unittests/test_queue.py b/python/paddle/fluid/tests/unittests/test_queue.py new file mode 100644 index 0000000000..cfb843d75e --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_queue.py @@ -0,0 +1,74 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import os +import unittest + +import paddle.fluid as fluid +import paddle.fluid.layers as layers +import paddle.fluid.core as core + + +class TestQueue(unittest.TestCase): + def test_eq(self): + """ + test queue_generator op, enqueue op and dequeue op. + """ + + main_program = fluid.Program() + startup_program = fluid.Program() + value = np.random.rand(1) + with fluid.program_guard(main_program, startup_program): + data_in = layers.create_global_var( + shape=[2, 3], + value=value, + dtype="float32", + persistable=True, + name='var_in') + data_out = layers.create_global_var( + shape=[2, 3], + value=value - 1.0, + dtype="float32", + persistable=True, + name='var_out') + startup_block = startup_program.block(0) + queue_name = 'blocking_queue' + startup_block.create_var( + name=queue_name, persistable=True, type=core.VarDesc.VarType.RAW) + startup_block.append_op( + type="queue_generator", attrs={'names': [queue_name]}) + block = main_program.block(0) + block.append_op( + type='enqueue', + inputs={'X': data_in}, + attrs={'queue_name': queue_name}) + block.append_op( + type='dequeue', + outputs={'Out': [data_out]}, + attrs={'queue_name': queue_name}) + + place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( + ) else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_program) + ret = exe.run(main_program, fetch_list=[data_out.name]) + self.assertTrue( + np.allclose(np.asarray(ret), np.full((2, 3), value, np.float32))) + + +if __name__ == '__main__': + unittest.main() -- GitLab