From be8c82cc13b0a4b47fa7f3808fd7f9af6feb59df Mon Sep 17 00:00:00 2001 From: chengduo Date: Sat, 15 Jun 2019 12:30:00 +0800 Subject: [PATCH] [Cherry pick]Update CPU_NUM config (#18110) * update CPU_NUM config test=develop --- paddle/fluid/API.spec | 4 +- .../scope_buffered_ssa_graph_executor.cc | 1 + paddle/fluid/framework/parallel_executor.cc | 6 ++ paddle/fluid/operators/print_op.cc | 73 ++++++++++++------- python/paddle/dataset/flowers.py | 3 +- .../contrib/slim/tests/test_graph_wrapper.py | 2 + python/paddle/fluid/data_feeder.py | 8 +- python/paddle/fluid/framework.py | 27 +++++-- python/paddle/fluid/layers/control_flow.py | 14 +++- python/paddle/fluid/parallel_executor.py | 1 + .../test_parallel_executor_dry_run.py | 2 + .../fluid/tests/unittests/test_print_op.py | 32 ++++++++ .../test_py_reader_using_executor.py | 1 + 13 files changed, 126 insertions(+), 48 deletions(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 73788f9359..8d14731530 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -31,7 +31,7 @@ paddle.fluid.memory_optimize (ArgSpec(args=['input_program', 'skip_opt_set', 'pr paddle.fluid.release_memory (ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)), ('document', 'd38c5b8b2b2e0bb19bcf1b581a80a7e4')) paddle.fluid.DistributeTranspilerConfig.__init__ paddle.fluid.ParallelExecutor.__init__ (ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id', 'scope'], varargs=None, keywords=None, defaults=(None, None, None, None, None, 1, 0, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.ParallelExecutor.drop_local_exe_scopes (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '80d857dc626612e2b2460d0154551e95')) +paddle.fluid.ParallelExecutor.drop_local_exe_scopes (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '77c739744ea5708b80fb1b37cc89db40')) paddle.fluid.ParallelExecutor.run (ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True)), ('document', '33ce6ec50f8eeb05d340e6b114b026fd')) paddle.fluid.create_lod_tensor (ArgSpec(args=['data', 'recursive_seq_lens', 'place'], varargs=None, keywords=None, defaults=None), ('document', 'b82ea20e2dc5ff2372e0643169ca47ff')) paddle.fluid.create_random_int_lodtensor (ArgSpec(args=['recursive_seq_lens', 'base_shape', 'place', 'low', 'high'], varargs=None, keywords=None, defaults=None), ('document', '74dc6d23185d90a7a50fbac19f5b65fb')) @@ -311,7 +311,7 @@ paddle.fluid.layers.StaticRNN.step_input (ArgSpec(args=['self', 'x'], varargs=No paddle.fluid.layers.StaticRNN.step_output (ArgSpec(args=['self', 'o'], varargs=None, keywords=None, defaults=None), ('document', '252890d4c3199a7623ab8667e13fd837')) paddle.fluid.layers.StaticRNN.update_memory (ArgSpec(args=['self', 'mem', 'var'], varargs=None, keywords=None, defaults=None), ('document', '7a0000520f179f35239956a5ba55119f')) paddle.fluid.layers.reorder_lod_tensor_by_rank (ArgSpec(args=['x', 'rank_table'], varargs=None, keywords=None, defaults=None), ('document', '5b552a1f0f7eb4dacb768a975ba15d08')) -paddle.fluid.layers.Print (ArgSpec(args=['input', 'first_n', 'message', 'summarize', 'print_tensor_name', 'print_tensor_type', 'print_tensor_shape', 'print_tensor_lod', 'print_phase'], varargs=None, keywords=None, defaults=(-1, None, -1, True, True, True, True, 'both')), ('document', 'a222dbad457441941e50b812e5af9c7e')) +paddle.fluid.layers.Print (ArgSpec(args=['input', 'first_n', 'message', 'summarize', 'print_tensor_name', 'print_tensor_type', 'print_tensor_shape', 'print_tensor_lod', 'print_phase'], varargs=None, keywords=None, defaults=(-1, None, -1, True, True, True, True, 'both')), ('document', 'ee6c70867d317b0a87094ed23546215f')) paddle.fluid.layers.is_empty (ArgSpec(args=['x', 'cond'], varargs=None, keywords=None, defaults=(None,)), ('document', '3011dc695f490afdf504dc24f628319a')) paddle.fluid.layers.sigmoid (ArgSpec(args=['x', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'a4e395ab004e7da34e94a0a1f9eee183')) paddle.fluid.layers.logsigmoid (ArgSpec(args=['x', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', '5f2508c52e0a797bb9bd5e29d79ede78')) diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc index 06a454f4ad..5bbbf07e6d 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc @@ -71,6 +71,7 @@ void ScopeBufferedSSAGraphExecutor::DropLocalExeScopes() { if (local_scope_var != nullptr) { auto &local_scope = *local_scope_var->GetMutable(); scope->DeleteScope(local_scope); + scope->EraseVars({std::string(details::kLocalExecScopeName)}); VLOG(3) << "Drop local execution scope: " << local_scope; } } diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 07479f6782..15f83aa1fe 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -325,6 +325,12 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, "the number of places must be greater than 1."); } + LOG(WARNING) << string::Sprintf( + "The number of %s, which is used in ParallelExecutor, is %lu. And " + "the Program will be copied %lu copies", + (member_->use_cuda_ ? "CUDAPlace" : "CPUPlace"), places.size(), + places.size()); + // Step 1. Bcast the bcast_vars to devs. // Create local scopes if (local_scopes.empty()) { diff --git a/paddle/fluid/operators/print_op.cc b/paddle/fluid/operators/print_op.cc index 200b01797e..f686e5293b 100644 --- a/paddle/fluid/operators/print_op.cc +++ b/paddle/fluid/operators/print_op.cc @@ -135,33 +135,34 @@ struct Formater { }; // TODO(ChunweiYan) there should be some other printers for TensorArray -class TensorPrintOp : public framework::OperatorBase { +class PrintOp : public framework::OperatorBase { public: - TensorPrintOp(const std::string &type, - const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) + PrintOp(const std::string &type, const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) : OperatorBase(type, inputs, outputs, attrs) {} - TensorPrintOp(const TensorPrintOp &o) - : framework::OperatorBase( - static_cast(o)) { - PADDLE_THROW("Not implemented."); - } - private: void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { - const framework::Variable *in_var_ptr = nullptr; - std::string printed_var_name = ""; - - in_var_ptr = scope.FindVar(Input("In")); - printed_var_name = Inputs("In").front(); - - PADDLE_ENFORCE_NOT_NULL(in_var_ptr); - - auto &in_tensor = in_var_ptr->Get(); + const auto in_var = scope.FindVar(Input("In")); + auto out_var = scope.FindVar(Output("Out")); + PADDLE_ENFORCE_NOT_NULL(in_var, "The input should not be found in scope", + Input("In")); + PADDLE_ENFORCE_NOT_NULL(out_var, "The output should not be found in scope", + Output("Out")); + auto &in_tensor = in_var->Get(); + framework::LoDTensor *out_tensor = + out_var->GetMutable(); + + PrintValue(place, Inputs("In").front(), in_tensor); + framework::TensorCopy(in_tensor, place, out_tensor); + out_tensor->set_lod(in_tensor.lod()); + } + void PrintValue(const platform::Place &place, + const std::string &printed_var_name, + const framework::LoDTensor &in_tensor) const { std::string print_phase = Attr("print_phase"); bool is_forward = Attr("is_forward"); @@ -177,12 +178,12 @@ class TensorPrintOp : public framework::OperatorBase { printed_tensor.set_lod(in_tensor.lod()); printed_tensor.Resize(in_tensor.dims()); - if (platform::is_cpu_place(in_tensor.place())) { + if (is_cpu_place(in_tensor.place())) { printed_tensor.ShareDataWith(in_tensor); } else { // copy data to cpu to print platform::CPUPlace place; - framework::TensorCopy(in_tensor, place, &printed_tensor); + TensorCopy(in_tensor, place, &printed_tensor); } Formater formater; @@ -215,6 +216,7 @@ class PrintOpProtoAndCheckMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { AddInput("In", "Input tensor to be displayed."); + AddOutput("Out", "The output tensor."); AddAttr("first_n", "Only log `first_n` number of times."); AddAttr("message", "A string message to print as a prefix."); AddAttr("summarize", "Number of elements printed."); @@ -239,10 +241,23 @@ tensor `t`.)DOC"); } }; -class InferShapeForward : public framework::InferShapeBase { +class PrintOpInferShape : public framework::InferShapeBase { + public: + void operator()(framework::InferShapeContext *ctx) const override { + VLOG(10) << "PrintOpInferShape"; + PADDLE_ENFORCE(ctx->HasInput("In"), "Input(In) should not be null."); + PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null."); + ctx->ShareDim("In", /*->*/ "Out"); + ctx->ShareLoD("In", /*->*/ "Out"); + } +}; + +class PrintOpVarTypeInference : public framework::VarTypeInference { public: - void operator()(framework::InferShapeContext *context) const override { - PADDLE_ENFORCE(context->HasInput("In"), "Input(In) should not be null."); + void operator()(framework::InferVarTypeContext *ctx) const override { + auto input_type = ctx->GetType(ctx->Input("In")[0]); + auto out_name = ctx->Output("Out").front(); + ctx->SetType(out_name, input_type); } }; @@ -253,7 +268,8 @@ class PrintOpGradientMaker : public framework::SingleGradOpDescMaker { std::unique_ptr Apply() const override { auto *op_desc_ptr = new framework::OpDesc(); op_desc_ptr->SetType("print"); - op_desc_ptr->SetInput("In", InputGrad("In")); + op_desc_ptr->SetInput("In", OutputGrad("Out")); + op_desc_ptr->SetOutput("Out", InputGrad("In")); op_desc_ptr->SetAttrMap(Attrs()); op_desc_ptr->SetAttr("is_forward", false); return std::unique_ptr(op_desc_ptr); @@ -265,5 +281,6 @@ class PrintOpGradientMaker : public framework::SingleGradOpDescMaker { namespace ops = paddle::operators; -REGISTER_OPERATOR(print, ops::TensorPrintOp, ops::PrintOpProtoAndCheckMaker, - ops::PrintOpGradientMaker, ops::InferShapeForward); +REGISTER_OPERATOR(print, ops::PrintOp, ops::PrintOpProtoAndCheckMaker, + ops::PrintOpGradientMaker, ops::PrintOpInferShape, + ops::PrintOpVarTypeInference); diff --git a/python/paddle/dataset/flowers.py b/python/paddle/dataset/flowers.py index e048639ae1..969ad3c922 100644 --- a/python/paddle/dataset/flowers.py +++ b/python/paddle/dataset/flowers.py @@ -138,8 +138,7 @@ def reader_creator(data_file, break if use_xmap: - cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) - return xmap_readers(mapper, reader, cpu_num, buffered_size) + return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) else: return map_readers(mapper, reader) diff --git a/python/paddle/fluid/contrib/slim/tests/test_graph_wrapper.py b/python/paddle/fluid/contrib/slim/tests/test_graph_wrapper.py index 0ab8052d7a..69080cf50e 100644 --- a/python/paddle/fluid/contrib/slim/tests/test_graph_wrapper.py +++ b/python/paddle/fluid/contrib/slim/tests/test_graph_wrapper.py @@ -19,6 +19,8 @@ import six import numpy as np from paddle.fluid.contrib.slim.graph import GraphWrapper from paddle.fluid import core +import os +os.environ['CPU_NUM'] = str(4) def residual_block(num): diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index 32b2c8014c..1090c78142 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -22,7 +22,7 @@ from six.moves import zip, range, xrange import multiprocessing from .framework import Variable, default_main_program, _current_expected_place - +from .framework import _cpu_num, _cuda_ids __all__ = ['DataFeeder'] @@ -359,11 +359,9 @@ class DataFeeder(object): if num_places is not None: return int(num_places) elif isinstance(self.place, core.CUDAPlace): - return core.get_cuda_device_count() + return len(_cuda_ids()) else: - cpu_num = int( - os.environ.get('CPU_NUM', multiprocessing.cpu_count())) - return cpu_num + return _cpu_num() def decorate_reader(self, reader, diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index ae17d08f26..aaa2d2246c 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -27,7 +27,7 @@ import six import numpy as np import subprocess import multiprocessing - +import sys from .. import compat as cpt from .proto import framework_pb2 @@ -82,7 +82,24 @@ def _current_expected_place(): def _cpu_num(): - return int(os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + if "CPU_NUM" not in os.environ.keys(): + sys.stderr.write( + 'The CPU_NUM is not specified, you should set CPU_NUM in ' + 'the environment variable list, i.e export CPU_NUM=1. CPU_NUM ' + 'indicates that how many CPUPlace are used in the current task.\n' + '!!! The default number of CPUPlaces is 1.\n\n') + os.environ['CPU_NUM'] = str(1) + cpu_num = os.environ.get('CPU_NUM') + return int(cpu_num) + + +def _cuda_ids(): + gpus_env = os.getenv("FLAGS_selected_gpus") + if gpus_env: + device_ids = [int(s) for s in gpus_env.split(",")] + else: + device_ids = six.moves.range(core.get_cuda_device_count()) + return device_ids def cuda_places(device_ids=None): @@ -116,11 +133,7 @@ def cuda_places(device_ids=None): assert core.is_compiled_with_cuda(), \ "Not compiled with CUDA" if device_ids is None: - gpus_env = os.getenv("FLAGS_selected_gpus") - if gpus_env: - device_ids = [int(s) for s in gpus_env.split(",")] - else: - device_ids = six.moves.range(core.get_cuda_device_count()) + device_ids = _cuda_ids() elif not isinstance(device_ids, (list, tuple)): device_ids = [device_ids] return [core.CUDAPlace(dev_id) for dev_id in device_ids] diff --git a/python/paddle/fluid/layers/control_flow.py b/python/paddle/fluid/layers/control_flow.py index f8c84a7029..d073c15b02 100644 --- a/python/paddle/fluid/layers/control_flow.py +++ b/python/paddle/fluid/layers/control_flow.py @@ -165,8 +165,12 @@ def Print(input, print the gradients of input tensor. Returns: - Variable: Output tensor, same data with input tensor. + Variable: Output tensor. + NOTES: + The input and output are two different variables, and in the + following process, you should use the output variable but not the input, + otherwise, the print layer doesn't have backward. Examples: .. code-block:: python @@ -174,16 +178,18 @@ def Print(input, import paddle.fluid as fluid input = fluid.layers.data(name="input", shape=[4, 32, 32], dtype="float32") - fluid.layers.Print(input, message = "The content of input layer:") + input = fluid.layers.Print(input, message = "The content of input layer:") # value = some_layer(...) # Print(value, summarize=10, # message="The content of some_layer: ") ''' - helper = LayerHelper('print', **locals()) + helper = LayerHelper('print' + "_" + input.name, **locals()) + output = helper.create_variable_for_type_inference(input.dtype) helper.append_op( type='print', inputs={'In': input}, + outputs={'Out': output}, attrs={ 'first_n': first_n, 'summarize': summarize, @@ -194,7 +200,7 @@ def Print(input, 'print_tensor_lod': print_tensor_lod, 'print_phase': print_phase.upper() }) - return input + return output class BlockGuard(object): diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index f2cefeb301..d4a1041a4b 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -324,6 +324,7 @@ class ParallelExecutor(object): loss = fluid.layers.mean(hidden) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) exe.run(startup_program) parallel_exe = fluid.ParallelExecutor(use_cuda=use_cuda, diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py index d0eca7d6df..328b3a4813 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py @@ -17,6 +17,8 @@ from paddle.fluid import compiler import unittest import logging import six +import os +os.environ['CPU_NUM'] = str(4) class TestBase(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_print_op.py b/python/paddle/fluid/tests/unittests/test_print_op.py index 8097b5f734..5b7aed46a9 100644 --- a/python/paddle/fluid/tests/unittests/test_print_op.py +++ b/python/paddle/fluid/tests/unittests/test_print_op.py @@ -17,11 +17,13 @@ from __future__ import print_function import unittest import paddle.fluid.core as core from paddle.fluid.executor import Executor +import paddle.fluid as fluid import paddle.fluid.layers as layers from paddle.fluid.backward import append_backward from paddle.fluid.framework import switch_main_program from paddle.fluid.framework import Program import numpy as np +from simple_nets import simple_fc_net, init_data class TestPrintOpCPU(unittest.TestCase): @@ -68,5 +70,35 @@ class TestPrintOpGPU(TestPrintOpCPU): self.x_tensor.set_recursive_sequence_lengths([[1, 1]]) +class TestPrintOpBackward(unittest.TestCase): + def check_backward(self, use_cuda): + main = fluid.Program() + startup = fluid.Program() + + with fluid.program_guard(main, startup): + loss = simple_fc_net() + loss = fluid.layers.Print(loss) + fluid.optimizer.Adam().minimize(loss) + + print_ops = [op for op in main.blocks[0].ops if op.type == u'print'] + assert len(print_ops) == 2, "The number of print op should be 2" + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup) + + binary = fluid.compiler.CompiledProgram(main).with_data_parallel( + loss_name=loss.name) + + img, label = init_data() + feed_dict = {"image": img, "label": label} + exe.run(binary, feed_dict) + + def test_fw_bw(self): + if core.is_compiled_with_cuda(): + self.check_backward(use_cuda=True) + self.check_backward(use_cuda=False) + + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py index a3701f0808..e4fb9b1970 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py @@ -22,6 +22,7 @@ import numpy as np import threading import multiprocessing import os +os.environ['CPU_NUM'] = str(4) def as_tensor(np_array_or_tensor, place=None): -- GitLab