From 47ea2534fb9cac31f1b5c15c54112e6105810cb1 Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Thu, 13 Dec 2018 17:12:38 +0800 Subject: [PATCH] clean parallel do test=develop --- .../operators/controlflow/parallel_do_op.cc | 426 ------------------ python/paddle/fluid/backward.py | 79 +--- python/paddle/fluid/framework.py | 4 +- python/paddle/fluid/layers/control_flow.py | 152 +------ .../tests/book/notest_understand_sentiment.py | 18 +- .../fluid/tests/book/test_recognize_digits.py | 15 +- .../paddle/fluid/tests/book/test_word2vec.py | 14 +- .../test_memopt_fit_a_line.py | 87 ---- .../fluid/tests/unittests/test_parallel_op.py | 235 ---------- .../memory_optimization_transpiler.py | 5 +- 10 files changed, 10 insertions(+), 1025 deletions(-) delete mode 100644 paddle/fluid/operators/controlflow/parallel_do_op.cc delete mode 100644 python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_op.py diff --git a/paddle/fluid/operators/controlflow/parallel_do_op.cc b/paddle/fluid/operators/controlflow/parallel_do_op.cc deleted file mode 100644 index ab25628d4..000000000 --- a/paddle/fluid/operators/controlflow/parallel_do_op.cc +++ /dev/null @@ -1,426 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include - -#include "paddle/fluid/framework/executor.h" -#include "paddle/fluid/framework/op_registry.h" -#include "paddle/fluid/framework/threadpool.h" -#include "paddle/fluid/operators/detail/safe_ref.h" - -namespace paddle { -namespace operators { - -static constexpr char kInputs[] = "inputs"; -static constexpr char kParameters[] = "parameters"; -static constexpr char kPlaces[] = "places"; - -static constexpr char kOutputs[] = "outputs"; -static constexpr char kParallelScopes[] = "parallel_scopes"; - -static constexpr char kParallelBlock[] = "sub_block"; -static constexpr char kUseNCCL[] = "use_nccl"; - -using LoDTensor = framework::LoDTensor; -using SelectedRows = framework::SelectedRows; - -static void SplitTensorAndMoveTensorToScopes( - const framework::Scope &scope, std::vector *sub_scopes, - const std::vector &places, - const std::vector &names) { - size_t num_sub_scopes = 0; - for (auto &argu : names) { - const auto &tensor = - detail::Ref(scope.FindVar(argu), - "Cannot find variable %s in the parent scope", argu) - .Get(); - auto lod_tensors = tensor.SplitLoDTensor(places); - - for (auto &lod : lod_tensors) { - VLOG(3) << lod.dims(); - } - if (num_sub_scopes == 0) { - num_sub_scopes = lod_tensors.size(); - } else { - PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size()); - } - PADDLE_ENFORCE_NE(num_sub_scopes, 0); - if (sub_scopes->size() == 0) { - sub_scopes->reserve(num_sub_scopes); - for (size_t i = 0; i < num_sub_scopes; ++i) { - sub_scopes->emplace_back(&scope.NewScope()); - } - } - - for (size_t i = 0; i < lod_tensors.size(); ++i) { - *detail::Ref(sub_scopes->at(i)->Var(argu), - "Cannot find variable in the sub-scope", argu) - .GetMutable() = lod_tensors[i]; - } - } -} - -inline void CopyOrShare(const framework::Variable &src, - const platform::Place &dst_place, - framework::Variable *dst) { - if (src.IsType()) { - if (src.Get().place() == dst_place) { - dst->GetMutable()->ShareDataWith(src.Get()); - dst->GetMutable()->set_lod(src.Get().lod()); - } else { - TensorCopy(src.Get(), dst_place, dst->GetMutable()); - } - } else if (src.IsType()) { - auto &src_sr = src.Get(); - auto *dst_sr = dst->GetMutable(); - dst_sr->set_height(src_sr.height()); - if (src_sr.value().place() == dst_place) { - dst_sr->mutable_value()->ShareDataWith(src_sr.value()); - dst_sr->set_rows(src_sr.rows()); - } else { - TensorCopy(src_sr.value(), dst_place, dst_sr->mutable_value()); - } - } else { - PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name()); - } -} - -void WaitOnPlace(const platform::Place place) { - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto &dev_ctx = *pool.Get(place); - dev_ctx.Wait(); -} - -void WaitOnPlaces(const std::vector places) { - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - - for (auto &place : places) { - auto &dev_ctx = *pool.Get(place); - dev_ctx.Wait(); - } -} - -class ParallelDoOp : public framework::OperatorBase { - public: - ParallelDoOp(const std::string &type, - const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) - : framework::OperatorBase(type, inputs, outputs, attrs) {} - - private: - void RunImpl(const framework::Scope &scope, - const platform::Place &place) const override { - // get device context from pool - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto &dev_ctx = *pool.Get(place); - - auto *block = Attr(kParallelBlock); - auto *program = block->Program(); - - auto &places = scope.FindVar(Input(kPlaces))->Get(); - - auto &sub_scopes = *scope.FindVar(Output(kParallelScopes)) - ->GetMutable>(); - - // split input - SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places, - Inputs(kInputs)); - - // copy parameter - for (auto ¶m : Inputs(kParameters)) { - PADDLE_ENFORCE(scope.FindVar(param)->IsType(), - "Only support parameter type as LoDTensor"); - auto &src = scope.FindVar(param)->Get(); - - auto *sub_scope0 = sub_scopes[0]; - auto *dst0 = sub_scope0->Var(param)->GetMutable(); - dst0->ShareDataWith(src); - - for (size_t i = 1; i < sub_scopes.size(); ++i) { - auto &place = places[i]; - auto *sub_scope = sub_scopes[i]; - auto *dst = sub_scope->Var(param)->GetMutable(); - framework::TensorCopy(src, place, dst); - } - } - WaitOnPlaces(places); - - std::vector> workers; - workers.reserve(places.size()); - for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) { - auto &place = places[place_idx]; - auto *cur_scope = sub_scopes[place_idx]; - - workers.emplace_back(framework::Async([program, cur_scope, place, block] { - framework::Executor executor(place); - executor.Run(*program, cur_scope, block->ID(), - false /*create_local_scope*/); - })); - } - for (auto &worker : workers) { - worker.wait(); - } - WaitOnPlaces(places); - - // merge output - for (auto &o_name : Outputs(kOutputs)) { - std::vector lod_tensors; - lod_tensors.reserve(sub_scopes.size()); - for (auto *sub_scope : sub_scopes) { - lod_tensors.emplace_back(&sub_scope->FindVar(o_name)->Get()); - } - - auto *lod_tensor_to_be_merged = - scope.FindVar(o_name)->GetMutable(); - lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace()); - } - WaitOnPlaces(places); - } -}; - -class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker { - public: - void Make() override { - AddInput(kInputs, "").AsDuplicable(); - AddInput(kParameters, "").AsDuplicable(); - AddInput(kPlaces, ""); - AddOutput(kOutputs, "").AsDuplicable(); - AddOutput(kParallelScopes, ""); - AddAttr(kParallelBlock, ""); - AddAttr(kUseNCCL, "true if we use nccl on backward") - .SetDefault(false); - AddComment(R"DOC( -ParallelDo Operator. -)DOC"); - } -}; - -class ParallelDoGradOp : public framework::OperatorBase { - public: - ParallelDoGradOp(const std::string &type, - const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) - : framework::OperatorBase(type, inputs, outputs, attrs) {} - - private: - void RunImpl(const framework::Scope &scope, - const platform::Place &place) const override { - auto *block = Attr(kParallelBlock); - auto *program = block->Program(); - - auto &sub_scopes = scope.FindVar(Input(kParallelScopes)) - ->Get>(); - auto &places = scope.FindVar(Input(kPlaces))->Get(); - - // feed output@grad - SplitTensorAndMoveTensorToScopes( - scope, const_cast *>(&sub_scopes), - places, Inputs(framework::GradVarName(kOutputs))); - WaitOnPlaces(places); - - // exe run - std::vector> workers; - for (size_t i = 0; i < sub_scopes.size(); ++i) { - auto &place = places[i]; - auto *cur_scope = sub_scopes[i]; - - // execute - workers.emplace_back(framework::Async([program, cur_scope, place, block] { - framework::Executor executor(place); - executor.Run(*program, cur_scope, block->ID(), - false /*create_local_scope*/); - })); - } - for (auto &worker : workers) { - worker.wait(); - } - WaitOnPlaces(places); - - // NCCL allreduce op will be added by backward, - // so no need to explicitly accumulate grad - if (!(Attr(kUseNCCL))) { - AccumulateGrad(scope, place, sub_scopes, places); - } else { - for (auto &place : places) { - PADDLE_ENFORCE(platform::is_gpu_place(place), - "NCCL only supports cuda place"); - } - } - for (auto &s : Outputs(framework::GradVarName(kParameters))) { - if (s == framework::kEmptyVarName) { - continue; - } - VLOG(3) << "Moving " << s; - CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s)); - } - WaitOnPlaces(places); - } - - void AccumulateGrad(const framework::Scope &scope, - const platform::Place &place, - const std::vector &sub_scopes, - const platform::PlaceList &places) const { - for (auto &s : Outputs(framework::GradVarName(kParameters))) { - if (s == framework::kEmptyVarName) { - continue; - } - VLOG(3) << "Accumulating " << s; - if (s == framework::kEmptyVarName) continue; - std::string tmp_name; - auto *tmp = sub_scopes[0]->Var(&tmp_name); - - for (size_t i = 1; i < sub_scopes.size(); ++i) { - CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp); - WaitOnPlaces(places); - - auto sum_op = framework::OpRegistry::CreateOp( - "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}}, - framework::AttributeMap{{"use_mkldnn", {false}}}); - VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]); - sum_op->Run(*sub_scopes[0], places[0]); - WaitOnPlace(places[0]); - } - - CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s)); - } - WaitOnPlaces(places); - } -}; - -std::ostream &operator<<(std::ostream &sout, - const std::vector &strs) { - std::copy(strs.begin(), strs.end(), - std::ostream_iterator(sout, ",")); - return sout; -} - -class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker { - public: - using framework::SingleGradOpDescMaker::SingleGradOpDescMaker; - - protected: - virtual std::unique_ptr Apply() const { - auto *grad = new framework::OpDesc(); - grad->SetType("parallel_do_grad"); - for (auto &input_param : this->InputNames()) { - VLOG(3) << input_param; - grad->SetInput(input_param, this->Input(input_param)); - if (input_param != kPlaces) { - grad->SetOutput(framework::GradVarName(input_param), - this->InputGrad(input_param, false)); - } - } - auto *g_block = this->grad_block_[0]; - - // All variable name that needed by gradient operators - std::unordered_set all_inputs_in_grad_blocks; - - for (size_t i = 0; i < g_block->OpSize(); ++i) { - auto *op = g_block->Op(i); - for (auto &var_name : op->InputArgumentNames()) { - all_inputs_in_grad_blocks.insert(var_name); - } - } - - for (auto &output_param : this->OutputNames()) { - if (output_param == kParallelScopes) { - grad->SetInput(output_param, this->Output(output_param)); - grad->SetInput(framework::GradVarName(output_param), - this->Output(output_param)); - } else { - grad->SetInput(output_param, this->Output(output_param)); - std::vector og_names; - for (auto &og_name : this->OutputGrad(output_param)) { - if (all_inputs_in_grad_blocks.count(og_name) != 0) { - // there are some gradient operators who need the OG. So make this - // OG as an input of parallel.do - og_names.push_back(og_name); - } - // else, there is no operator who need the OG. Do not use this OG as - // an input - } - grad->SetInput(framework::GradVarName(output_param), og_names); - } - } - grad->SetInput("Communicator", {"nccl_com__do_not_change_"}); - grad->SetAttrMap(this->Attrs()); - grad->SetBlockAttr(kParallelBlock, grad_block_[0]); - - return std::unique_ptr(grad); - } -}; - -class ParallelDoGradOpShapeInference : public framework::InferShapeBase { - public: - void operator()(framework::InferShapeContext *ctx) const override { - PADDLE_ENFORCE(ctx->HasInputs(kParameters)); - PADDLE_ENFORCE(ctx->HasInputs(kInputs)); - PADDLE_ENFORCE(ctx->HasInputs(kOutputs)); - - ctx->SetOutputsDim(framework::GradVarName(kParameters), - ctx->GetInputsDim(kParameters)); - - auto i_dims = ctx->GetInputsDim(kInputs); - auto ig_names = ctx->Outputs(framework::GradVarName(kInputs)); - - for (size_t i = 0; i < ig_names.size(); ++i) { - auto &ig_name = ig_names[i]; - if (ig_name == framework::kEmptyVarName) { - continue; - } - - ctx->SetDims({ig_name}, {i_dims[i]}); - } - - auto p_dims = ctx->GetInputsDim(kParameters); - auto pg_names = ctx->Outputs(framework::GradVarName(kParameters)); - for (size_t i = 0; i < pg_names.size(); ++i) { - auto &pg_name = pg_names[i]; - if (pg_name == framework::kEmptyVarName) { - continue; - } - ctx->SetDims({pg_name}, {p_dims[i]}); - } - } -}; - -class ParallelDoGradOpVarTypeInference : public framework::VarTypeInference { - public: - void operator()(const framework::OpDesc &op_desc, - framework::BlockDesc *block) const override { - framework::BlockDesc *sub_block = - boost::get(op_desc.GetAttr(kParallelBlock)); - for (auto &out_vars : op_desc.Outputs()) { - for (auto &out_var : out_vars.second) { - auto &var = block->FindRecursiveOrCreateVar(out_var); - auto sub_var = sub_block->FindRecursiveOrCreateVar(out_var); - if (sub_var.GetType() != var.GetType()) { - var.SetType(sub_var.GetType()); - } - } - } - } -}; - -} // namespace operators -} // namespace paddle - -REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp, - paddle::operators::ParallelDoOpProtoMaker, - paddle::operators::ParallelDoGradOpDescMaker); -REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp, - paddle::operators::ParallelDoGradOpShapeInference, - paddle::operators::ParallelDoGradOpVarTypeInference); diff --git a/python/paddle/fluid/backward.py b/python/paddle/fluid/backward.py index 17fe8dc3c..b2c3e7c98 100644 --- a/python/paddle/fluid/backward.py +++ b/python/paddle/fluid/backward.py @@ -249,69 +249,6 @@ def serialize_op_decs(op_desc): return proto.__str__() -def _callback_lookup_(op): - """ - Only used in _append_backward_ops_ - Build and returns a callback function for certain op. For example - - parallel_do: AllReduce - - :param op: - :return: callback function - """ - if op.type == 'parallel_do' and op.attr('use_nccl'): - all_vars = op.block.vars - param_names = set(op.input('parameters')) - param_names = [ - name for name in param_names - if all_vars[name].stop_gradient is False - ] - param_grad_names = [n + "@GRAD" for n in param_names] - - class ParallelDoCallBack(object): - def __init__(self, param_grad_names, parallel_scopes_name): - self.has_inserted_nccl_init = False - self.param_grad_names = param_grad_names - self.parallel_scopes_name = parallel_scopes_name - - def __call__(self, block, context): - if not self.has_inserted_nccl_init: - op_desc = _create_op_desc_( - "ncclInit", - {"parallel_scopes": self.parallel_scopes_name}, - {"Communicator": ['nccl_com__do_not_change_']}, {}) - block.program.global_block().desc.append_op().copy_from( - op_desc) - self.has_inserted_nccl_init = True - - current_op_desc = context["__current_op_desc__"] - for o_param in current_op_desc.output_names(): - for o_argu in current_op_desc.output(o_param): - if o_argu in self.param_grad_names: - allreduce_out_name = o_argu + "__nccl_all_reduce__" - op_desc = _create_op_desc_( - "ncclReduce", - { - "X": [o_argu], - "Communicator": - ['nccl_com__do_not_change_'] - }, - {"Out": [allreduce_out_name]}, - {"reduction": "ncclSum", - "root": 0}, ) - block.desc.append_op().copy_from(op_desc) - - op_desc = _create_op_desc_( - "assign", {"X": [allreduce_out_name]}, - {"Out": [o_argu]}, {}) - block.desc.append_op().copy_from(op_desc) - - return ParallelDoCallBack(param_grad_names, - op.output("parallel_scopes")) - else: - return None - - def _append_backward_ops_(block, ops, target_block, @@ -349,17 +286,8 @@ def _append_backward_ops_(block, sub_block = program.block(op._block_attr_id("sub_block")) grad_sub_block = program._create_block() grad_sub_block._set_forward_block_idx(sub_block.idx) - cb = _callback_lookup_(op) - if cb is not None: - if callbacks is None: - new_callbacks = [cb] - else: - new_callbacks = callbacks + [_callback_lookup_(op)] - _append_backward_ops_(sub_block, sub_block.ops, grad_sub_block, - no_grad_dict, grad_to_var, new_callbacks) - else: - _append_backward_ops_(sub_block, sub_block.ops, grad_sub_block, - no_grad_dict, grad_to_var, callbacks) + _append_backward_ops_(sub_block, sub_block.ops, grad_sub_block, + no_grad_dict, grad_to_var, callbacks) program._rollback() grad_sub_block_list.append(grad_sub_block.desc) @@ -424,9 +352,6 @@ def _append_backward_vars_(block, start_op_idx, grad_to_var, grad_info_map): # infer_shape and infer_type op_desc.infer_var_type(block.desc) op_desc.infer_shape(block.desc) - # ncclInit dones't need to set data_type - if op_desc.type() == 'ncclInit': - continue for arg in op_desc.output_arg_names(): if arg in new_vars: _infer_var_data_type_(arg, block) diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 089792059..d0bd78454 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -563,8 +563,8 @@ class Operator(object): OP_WITHOUT_KERNEL_SET = { 'feed', 'fetch', 'save', 'load', 'recurrent', 'go', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv', - 'listen_and_serv', 'parallel_do', 'save_combine', 'load_combine', - 'ncclInit', 'select', 'checkpoint_notify', 'gen_nccl_id' + 'listen_and_serv', 'save_combine', 'load_combine', 'ncclInit', 'select', + 'checkpoint_notify', 'gen_nccl_id' } def __init__(self, diff --git a/python/paddle/fluid/layers/control_flow.py b/python/paddle/fluid/layers/control_flow.py index b7e396856..21454370d 100644 --- a/python/paddle/fluid/layers/control_flow.py +++ b/python/paddle/fluid/layers/control_flow.py @@ -226,156 +226,6 @@ class BlockGuard(object): return True -class ParallelDo(object): - """ - ParallelDo is used to represent multi-thread data parallel processing. - - Its vanilla implementation can be shown as the following (:math:`|` means - single thread and :math:`||||` means multiple threads) - - .. code-block:: text - - In the forward pass - | Split input onto different devices - | Copy parameter onto different devices - |||| Compute forward pass in parallel - | Merge output from different devices - - In the backward pass - | Split output@grad onto different devices - |||| Compute backward pass in parallel - | accumulate param@grad from different devices to the first device - | Merge input@grad from different devices - | Copy param@grad to the place of parallel_do_op - - Examples: - - .. code-block:: python - - images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - # ParallelDo version & Single-thread version - if thread_num > 1: - places = fluid.layers.get_places(thread_num) - pd = fluid.layers.control_flow.ParallelDo(places) - with pd.do(): - images = pd.read_input(images) - label = pd.read_input(label) - predict = cnn_model(images) - cost = fluid.layers.cross_entropy(input=predict, label=label) - - avg_cost = fluid.layers.mean(x=cost) - pd.write_output(avg_cost) - - avg_cost = pd() - avg_cost = fluid.layers.mean(avg_cost) - else: - predict = cnn_model(images) - cost = fluid.layers.cross_entropy(input=predict, label=label) - avg_cost = fluid.layers.mean(x=cost) - - .. warning:: - - It will be soon deprecated, please use ParallelExecutor instead. - """ - - def __init__(self, places, use_nccl=False, name=None): - warnings.warn( - "API ParallelDo is deprecated since 0.15.0. Please use ParallelExecutor instead.", - Warning) - self.helper = LayerHelper("parallel_do", name=name) - self.inputs = [] - self.places = places - self.outputs = [] - self.status = StaticRNN.BEFORE_RNN_BLOCK - self.use_nccl = use_nccl - - def do(self): - return BlockGuardWithCompletion(self) - - def parent_block(self): - prog = self.helper.main_program - parent_idx = prog.current_block().parent_idx - assert parent_idx >= 0 - parent_block = prog.block(parent_idx) - return parent_block - - def __call__(self, *args, **kwargs): - if self.status != StaticRNN.AFTER_RNN_BLOCK: - raise ValueError("RNN output can only be retrieved after rnn block") - if len(self.outputs) == 0: - raise ValueError("RNN has no output") - elif len(self.outputs) == 1: - return self.outputs[0] - else: - return self.outputs - - def read_input(self, var): - self.inputs.append(var) - return var - - def write_output(self, var): - self.outputs.append(var) - - def get_parameters(self): - main_program = self.helper.main_program - current_block = main_program.current_block() - parent_block = self.parent_block() - - local_inputs = set() - params = list() - for var in self.inputs: - local_inputs.add(var.name) - - for op in current_block.ops: - for iname in op.input_names: - for in_var_name in op.input(iname): - if in_var_name not in local_inputs: - params.append(in_var_name) - - for oname in op.output_names: - for out_var_name in op.output(oname): - local_inputs.add(out_var_name) - - params = list(set(params)) - - return [parent_block.var(name) for name in params] - - def _complete_op(self): - main_program = self.helper.main_program - current_block = main_program.current_block() - parent_block = self.parent_block() - - step_scope = parent_block.create_var( - type=core.VarDesc.VarType.STEP_SCOPES) - - self.outputs = [ - parent_block.create_var( - name=o.name, - shape=o.shape, - dtype=o.dtype, - lod_level=o.lod_level, - persistable=o.persistable, - stop_gradient=o.stop_gradient) for o in self.outputs - ] - - inputs = [parent_block.var(i.name) for i in self.inputs] - outputs = [parent_block.var(o.name) for o in self.outputs] - - parent_block.append_op( - type='parallel_do', - inputs={ - 'inputs': inputs, - 'parameters': self.get_parameters(), - 'places': self.places - }, - outputs={'outputs': outputs, - 'parallel_scopes': [step_scope]}, - attrs={'sub_block': current_block, - 'use_nccl': self.use_nccl}) - - class BlockGuardWithCompletion(BlockGuard): """ BlockGuardWithCompletion class. @@ -384,7 +234,7 @@ class BlockGuardWithCompletion(BlockGuard): """ def __init__(self, rnn): - if not (isinstance(rnn, StaticRNN) or isinstance(rnn, ParallelDo)): + if not isinstance(rnn, StaticRNN): raise TypeError( "BlockGuardWithCompletion takes a StaticRNN or ParallelDo") super(BlockGuardWithCompletion, self).__init__(rnn.helper.main_program) diff --git a/python/paddle/fluid/tests/book/notest_understand_sentiment.py b/python/paddle/fluid/tests/book/notest_understand_sentiment.py index a666507bd..5658bb4ec 100644 --- a/python/paddle/fluid/tests/book/notest_understand_sentiment.py +++ b/python/paddle/fluid/tests/book/notest_understand_sentiment.py @@ -15,7 +15,6 @@ from __future__ import print_function from paddle.fluid.layers.device import get_places -from paddle.fluid.layers.control_flow import ParallelDo import unittest import paddle.fluid as fluid import paddle @@ -147,22 +146,7 @@ def train(word_dict, cost, acc_out, prediction = net_method( data, label, input_dim=dict_dim, class_dim=class_dim) else: - places = get_places() - pd = ParallelDo(places) - with pd.do(): - cost, acc, _ = net_method( - pd.read_input(data), - pd.read_input(label), - input_dim=dict_dim, - class_dim=class_dim) - pd.write_output(cost) - pd.write_output(acc) - - cost, acc = pd() - cost = fluid.layers.mean(cost) - acc_out = fluid.layers.mean(acc) - prediction = None - assert save_dirname is None + raise NotImplementedError() adagrad = fluid.optimizer.Adagrad(learning_rate=0.002) adagrad.minimize(cost) diff --git a/python/paddle/fluid/tests/book/test_recognize_digits.py b/python/paddle/fluid/tests/book/test_recognize_digits.py index 4a70976a4..54936519c 100644 --- a/python/paddle/fluid/tests/book/test_recognize_digits.py +++ b/python/paddle/fluid/tests/book/test_recognize_digits.py @@ -25,7 +25,6 @@ import numpy import paddle import paddle.fluid as fluid from paddle.fluid.layers.device import get_places -from paddle.fluid.layers.control_flow import ParallelDo BATCH_SIZE = 64 @@ -82,19 +81,7 @@ def train(nn_type, net_conf = conv_net if parallel: - places = get_places() - pd = ParallelDo(places) - with pd.do(): - img_ = pd.read_input(img) - label_ = pd.read_input(label) - prediction, avg_loss, acc = net_conf(img_, label_) - for o in [avg_loss, acc]: - pd.write_output(o) - - avg_loss, acc = pd() - # get mean loss and acc through every devices. - avg_loss = fluid.layers.mean(avg_loss) - acc = fluid.layers.mean(acc) + raise NotImplementedError() else: prediction, avg_loss, acc = net_conf(img, label) diff --git a/python/paddle/fluid/tests/book/test_word2vec.py b/python/paddle/fluid/tests/book/test_word2vec.py index 9191f0fc2..08f70c9ca 100644 --- a/python/paddle/fluid/tests/book/test_word2vec.py +++ b/python/paddle/fluid/tests/book/test_word2vec.py @@ -17,7 +17,6 @@ from __future__ import print_function import paddle import paddle.fluid as fluid from paddle.fluid.layers.device import get_places -from paddle.fluid.layers.control_flow import ParallelDo import unittest import os import numpy as np @@ -84,18 +83,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True): avg_cost, predict_word = __network__( [first_word, second_word, third_word, forth_word, next_word]) else: - places = get_places() - pd = ParallelDo(places) - with pd.do(): - avg_cost, predict_word = __network__( - list( - map(pd.read_input, [ - first_word, second_word, third_word, forth_word, - next_word - ]))) - pd.write_output(avg_cost) - - avg_cost = fluid.layers.mean(pd()) + raise NotImplementedError() sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py deleted file mode 100644 index dab2a52bc..000000000 --- a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import math -import sys - -import paddle -import paddle.fluid as fluid -from paddle.fluid.layers.device import get_places -from paddle.fluid.layers.control_flow import ParallelDo - -# need to fix random seed and training data to compare the loss -# value accurately calculated by the default and the memory optimization -# version. -fluid.default_startup_program().random_seed = 111 - -x = fluid.layers.data(name='x', shape=[13], dtype='float32') -y = fluid.layers.data(name='y', shape=[1], dtype='float32') - -device_type = 'CPU' -use_nccl = False -place = fluid.CPUPlace() -if fluid.core.is_compiled_with_cuda(): - device_type = 'CUDA' - use_nccl = False - place = fluid.CUDAPlace(0) - -places = get_places(device_count=0, device_type=device_type) -pd = ParallelDo(places, use_nccl=use_nccl) -with pd.do(): - x_ = pd.read_input(x) - y_ = pd.read_input(y) - y_predict = fluid.layers.fc(input=x_, size=1, act=None) - cost = fluid.layers.square_error_cost(input=y_predict, label=y_) - avg_cost = fluid.layers.mean(x=cost) - pd.write_output(avg_cost) - -cost = pd() -avg_cost = fluid.layers.mean(x=cost) -sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01) -sgd_optimizer.minimize(avg_cost) - -fluid.memory_optimize(fluid.default_main_program(), print_log=True) -# fluid.release_memory(fluid.default_main_program()) - -BATCH_SIZE = 200 - -# fix the order of training data -train_reader = paddle.batch( - paddle.dataset.uci_housing.train(), batch_size=BATCH_SIZE, drop_last=False) - -# train_reader = paddle.batch( -# paddle.reader.shuffle( -# paddle.dataset.uci_housing.train(), buf_size=500), -# batch_size=BATCH_SIZE) - -feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) -exe = fluid.Executor(place) - -exe.run(fluid.default_startup_program()) - -PASS_NUM = 100 -for pass_id in range(PASS_NUM): - for data in train_reader(): - avg_loss_value, = exe.run(fluid.default_main_program(), - feed=feeder.feed(data), - fetch_list=[avg_cost]) - - if avg_loss_value[0] < 10.0: - exit(0) # if avg cost less than 10.0, we think our code is good. - print(avg_loss_value[0]) - if math.isnan(float(avg_loss_value)): - sys.exit("got NaN loss, training failed.") -exit(1) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_op.py b/python/paddle/fluid/tests/unittests/test_parallel_op.py deleted file mode 100644 index 380e17284..000000000 --- a/python/paddle/fluid/tests/unittests/test_parallel_op.py +++ /dev/null @@ -1,235 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -from paddle.fluid.layers.device import get_places -from paddle.fluid.layers.control_flow import ParallelDo -import paddle.fluid.profiler as profiler -import numpy -import six - - -class BaseParallelForTest(unittest.TestCase): - def run_test(self, callback, feed, fetch): - """ - Run the unittest for parallel.for - Args: - callback(callable): A callable function returns a generator. There - are two yields in the generator function. The first yield - returns the data layers, and the second yield returns the loss. - The modified data variables will be sent back during the first - yield. - - feed(dict): The executor feeding dictionary. - fetch(list|basestr): The fetch name lists. - - Returns: - None - - Raises: - AssertionError when the computation of cpu, parallel.for in cpu, - gpu, parallel.for in gpu are different. - - """ - cpu = fluid.CPUPlace() - result_cpu = self._run_test_impl_( - callback=callback, - feed=feed, - fetch=fetch, - place=cpu, - use_parallel=False) - result_cpu_parallel = self._run_test_impl_( - callback=callback, - feed=feed, - fetch=fetch, - place=cpu, - use_parallel=True) - if fluid.core.is_compiled_with_cuda(): - gpu = fluid.CUDAPlace(0) - result_gpu = self._run_test_impl_( - callback=callback, - feed=feed, - fetch=fetch, - place=gpu, - use_parallel=False, - use_gpu=True) - result_gpu_parallel = self._run_test_impl_( - callback=callback, - feed=feed, - fetch=fetch, - place=gpu, - use_parallel=True, - use_gpu=True) - result_gpu_nccl = self._run_test_impl_( - callback=callback, - feed=feed, - fetch=fetch, - place=gpu, - use_parallel=True, - use_nccl=True, - use_gpu=True) - self._assert_same_(fetch, result_cpu, result_cpu_parallel, - result_gpu, result_gpu_parallel, result_gpu_nccl) - else: - self._assert_same_(fetch, result_cpu, result_cpu_parallel) - - def _run_test_impl_(self, - callback, - feed, - fetch, - place, - use_parallel=False, - use_nccl=False, - use_gpu=False): - """ - Run a single test, returns the fetch values - Args: - place(Place): the computation place. - use_parallel(bool): Whether use parallel.for or not. - - Returns: - Fetched numpy arrays. - - """ - if isinstance(fetch, six.string_types): - fetch = [fetch] - main = fluid.Program() - startup = fluid.Program() - # Fix seed - main.random_seed = 10 - startup.random_seed = 10 - - with fluid.program_guard(main, startup): - generator = callback() - # Automatically insert parallel do if use_parallel = True - if use_parallel: - thread_num = fluid.core.get_cuda_device_count( - ) if use_gpu else 8 - places = get_places(thread_num) - pd = ParallelDo(places, use_nccl=use_nccl) - data = next(generator) - - if isinstance(data, fluid.framework.Variable): - data = [data] - - with pd.do(): - ins = list(map(pd.read_input, data)) - if len(ins) == 1: - ins = ins[0] - loss = generator.send(ins) # patch input - pd.write_output(loss) - - loss = pd() - else: - data = next(generator) - loss = generator.send(data) - self.assertIsNotNone(loss) - avg_loss = fluid.layers.mean(loss) - fluid.backward.append_backward(loss=avg_loss) - - exe = fluid.Executor(place) - exe.run(startup) - if use_gpu: - profile_type = 'GPU' - else: - profile_type = 'CPU' - with profiler.profiler(profile_type, 'total', '/tmp/profiler'): - return exe.run(main, feed=feed, fetch_list=fetch) - - def _assert_same_(self, fetch, *args): - """ - Assert the return values of `run_test` are same. - Args: - fetch: Fetch list. Used for print error message - *args: The fetch result lists of each situations. - - Returns: - None - - Raises: - AssertionError - - """ - - def _impl_(a, b, fetch_id, item_id): - item_str = [ - 'CPU', 'ParallelCPU', 'GPU', 'ParallelGPU', 'ParallelGPUNCCL' - ] - flag = numpy.allclose(a, b, rtol=0.1, atol=1e-3) - self.assertTrue(flag, - "The {0} are different in {1}, {2} vs {3}".format( - fetch[fetch_id], item_str[item_id], a, b)) - - for i, items in enumerate(zip(*args)): - self.assertGreater(len(items), 0) - for j in range(1, len(items)): - _impl_(items[0], items[j], fetch_id=i, item_id=j) - - -class ParallelOpTest(BaseParallelForTest): - @staticmethod - def __network__(): - x = fluid.layers.data(shape=[784], dtype='float32', name='img') - x = yield x - hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w') - hidden = fluid.layers.batch_norm(input=hidden) - loss = fluid.layers.mean(hidden) - yield loss - - def test_simple_fc(self): - self.run_test( - callback=self.__network__, - feed={ - 'img': numpy.random.random(size=(51, 784)).astype('float32') - }, - fetch=['fc1.w@GRAD']) - - def test_fc_with_tiny_data(self): - self.run_test( - callback=self.__network__, - feed={'img': numpy.random.random(size=(1, 784)).astype('float32')}, - fetch=['fc1.w@GRAD']) - - -class ParallelOpTestMultipleInput(BaseParallelForTest): - @staticmethod - def __network__(): - x = fluid.layers.data( - shape=[784], dtype='float32', name='img1', stop_gradient=False) - y = fluid.layers.data( - shape=[784], dtype='float32', name='img2', stop_gradient=False) - yield [x, y] - x = x + y - hidden1 = fluid.layers.fc(input=x, size=200, param_attr='fc1.w') - hidden2 = fluid.layers.fc(input=hidden1, size=200, param_attr='fc2.w') - hidden3 = fluid.layers.fc(input=hidden2, size=200, param_attr='fc3.w') - loss = fluid.layers.mean(hidden3) - yield loss - - def test_simple_fc(self): - self.run_test( - callback=self.__network__, - feed={ - 'img1': numpy.random.random(size=(51, 784)).astype('float32'), - 'img2': numpy.random.random(size=(51, 784)).astype('float32') - }, - fetch=['fc1.w@GRAD', 'fc2.w@GRAD', 'fc3.w@GRAD']) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/transpiler/memory_optimization_transpiler.py b/python/paddle/fluid/transpiler/memory_optimization_transpiler.py index 95aafec05..d10ea4e47 100755 --- a/python/paddle/fluid/transpiler/memory_optimization_transpiler.py +++ b/python/paddle/fluid/transpiler/memory_optimization_transpiler.py @@ -35,11 +35,10 @@ dtype_to_size = { } SUB_BLOCK_OPS = [ - "while", "while_grad", "parallel_do", "parallel_do_grad", - "conditional_block", "conditional_block_grad" + "while", "while_grad", "conditional_block", "conditional_block_grad" ] -SUB_BLOCK_PAIR = [("while", "while_grad"), ("parallel_do", "parallel_do_grad"), +SUB_BLOCK_PAIR = [("while", "while_grad"), ("conditional_block", "conditional_block_grad")] PRINT_LOG = False -- GitLab