diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc
index 6e71339e5a1733aa35cbf5c2ee79fd7c61f8a9e8..38f811c0e9e8ecc6a4226e17e621a3c2ef3f78c5 100644
--- a/paddle/fluid/framework/operator.cc
+++ b/paddle/fluid/framework/operator.cc
@@ -35,6 +35,7 @@ DECLARE_bool(benchmark);
 DEFINE_bool(check_nan_inf, false,
             "Checking whether operator produce NAN/INF or not. It will be "
             "extremely slow so please use this flag wisely.");
+DEFINE_int32(inner_op_parallelism, 0, "number of threads for inner op");
 
 namespace paddle {
 namespace framework {
diff --git a/paddle/fluid/framework/operator.h b/paddle/fluid/framework/operator.h
index 041187665af6ad0d75a7c55fe6ed451fe6f45b73..40d935a5ff98a28b376a59b733e2929e4a128cb9 100644
--- a/paddle/fluid/framework/operator.h
+++ b/paddle/fluid/framework/operator.h
@@ -34,6 +34,8 @@ limitations under the License. */
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/variant.h"
 
+DECLARE_int32(inner_op_parallelism);
+
 namespace paddle {
 namespace framework {
 
diff --git a/paddle/fluid/operators/optimizers/adam_op.cc b/paddle/fluid/operators/optimizers/adam_op.cc
index e9c395a9314180960da2b9b0f996fce5d62b14ba..54e0f5146dab3e19713d19e15c6c81868179b319 100644
--- a/paddle/fluid/operators/optimizers/adam_op.cc
+++ b/paddle/fluid/operators/optimizers/adam_op.cc
@@ -114,6 +114,13 @@ class AdamOpMaker : public framework::OpProtoAndCheckerMaker {
                   "(bool, default false) "
                   "only update the parameter that has gradient in sparse update")
         .SetDefault(false);
+    AddAttr<int64_t>("min_row_size_to_use_multithread",
+                     "(int64_t, default 1000) "
+                     "when not zero, if param row size is larger than "
+                     "min_row_size_to_use_multithread and "
+                     "inner_op_parallelism is larger than 0, sparse update "
+                     "will run in multithread mode")
+        .SetDefault(1000);
 
     AddComment(R"DOC(
 Adam Optimizer.
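The new attribute only takes effect together with the FLAGS_inner_op_parallelism flag defined above. As an illustration (not part of the patch), the predicate below mirrors the three conditions AdamOpKernel checks before taking the multi-threaded sparse-update path; should_use_multithread is a hypothetical helper name.

def should_use_multithread(inner_op_parallelism,
                           min_row_size_to_use_multithread,
                           param_row_count):
    # All three conditions must hold, matching the else-if branch added in
    # adam_op.h below.
    return (inner_op_parallelism > 1 and
            min_row_size_to_use_multithread > 0 and
            param_row_count > min_row_size_to_use_multithread)


print(should_use_multithread(4, 1000, 10000))  # True: enough rows and threads
print(should_use_multithread(4, 1000, 500))    # False: parameter too small
print(should_use_multithread(0, 1000, 10000))  # False: flag left at its default 0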
diff --git a/paddle/fluid/operators/optimizers/adam_op.h b/paddle/fluid/operators/optimizers/adam_op.h
index 61b9384f8422cb531a94096875434ffe36ecdbce..db44cd6ec989d27f2994625e1641b5ac60880a8a 100644
--- a/paddle/fluid/operators/optimizers/adam_op.h
+++ b/paddle/fluid/operators/optimizers/adam_op.h
@@ -17,6 +17,7 @@ limitations under the License. */
 #include <math.h>
 #include <vector>
 #include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/threadpool.h"
 #include "paddle/fluid/operators/detail/safe_ref.h"
 #include "paddle/fluid/operators/math/algorithm.h"
 #include "paddle/fluid/operators/math/selected_rows_functor.h"
@@ -353,6 +354,8 @@ class AdamOpKernel : public framework::OpKernel<T> {
     using paddle::framework::LoDTensor;
     using paddle::operators::detail::Ref;
 
+    int64_t min_row_size_to_use_multithread =
+        ctx.Attr<int64_t>("min_row_size_to_use_multithread");
     bool lazy_mode = ctx.Attr<bool>("lazy_mode");
     T beta1 = static_cast<T>(ctx.Attr<float>("beta1"));
     T beta2 = static_cast<T>(ctx.Attr<float>("beta2"));
@@ -473,8 +476,8 @@ class AdamOpKernel : public framework::OpKernel<T> {
             lr.template data<T>(), grad_data, param.template data<T>(),
             param_out.template mutable_data<T>(ctx.GetPlace()), rows, row_numel,
             grad_merge.rows().size(), lazy_mode);
-
         if (lazy_mode) {
+          VLOG(3) << "run cpu lazy mode";
           size_t row_count = grad_merge.rows().size();
           std::vector<int64_t> cpu_rows(grad_merge.rows());
           for (size_t row_index = 0; row_index < row_count; ++row_index) {
@@ -483,6 +486,62 @@ class AdamOpKernel : public framework::OpKernel<T> {
               functor.adam_update(i, grad_data[row_index * row_numel + offset]);
             }
           }
+        } else if (FLAGS_inner_op_parallelism > 1 &&
+                   min_row_size_to_use_multithread > 0 &&
+                   param.dims()[0] > min_row_size_to_use_multithread) {
+          VLOG(3) << "use multi thread, inner_op_parallelism="
+                  << FLAGS_inner_op_parallelism
+                  << " min_row_size_to_use_multithread="
+                  << min_row_size_to_use_multithread;
+          if (FLAGS_inner_op_parallelism > 10) {
+            VLOG(1) << "FLAGS_inner_op_parallelism "
+                    << FLAGS_inner_op_parallelism << " is too large!";
+          }
+          auto& grad_rows = grad_merge.rows();
+          std::unordered_map<int64_t, size_t> row_id_to_grad_row_offset;
+          size_t param_row_count = param.numel() / row_numel;
+          if (param_row_count < 1000) {
+            VLOG(1) << "param_row_count should be larger than 1000 to use "
+                       "multi thread, currently "
+                    << param_row_count;
+          }
+          for (size_t i = 0; i < grad_rows.size(); ++i) {
+            row_id_to_grad_row_offset[grad_rows[i]] = i;
+          }
+          std::vector<std::future<void>> fs;
+          int64_t line_in_each_thread =
+              param_row_count / FLAGS_inner_op_parallelism + 1;
+          for (int i = 0; i < FLAGS_inner_op_parallelism; ++i) {
+            int64_t start = i * line_in_each_thread;
+            int64_t end = (i + 1) * line_in_each_thread;
+            if (start >= param_row_count) {
+              break;
+            }
+            if (end > param_row_count) {
+              end = param_row_count;
+            }
+            fs.push_back(
+                framework::Async([&functor, &row_id_to_grad_row_offset,
+                                  &grad_data, row_numel, start, end]() {
+                  for (int64_t row_id = start; row_id < end; ++row_id) {
+                    auto iter = row_id_to_grad_row_offset.find(row_id);
+                    if (iter != row_id_to_grad_row_offset.end()) {
+                      for (size_t row_offset = 0U; row_offset < row_numel;
+                           ++row_offset) {
+                        functor.adam_update(
+                            row_id * row_numel + row_offset,
+                            grad_data[iter->second * row_numel + row_offset]);
+                      }
+                    } else {
+                      for (size_t row_offset = 0U; row_offset < row_numel;
+                           ++row_offset) {
+                        functor.adam_update(row_id * row_numel + row_offset, 0);
+                      }
+                    }
+                  }
+                }));
+          }
+          for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
         } else {
           functor(param.numel());
         }
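A note on the scheme above, with an illustrative sketch (not part of the patch): the kernel partitions the whole parameter, not just the rows present in the merged sparse gradient, into contiguous blocks of param_row_count / FLAGS_inner_op_parallelism + 1 rows, one block per framework::Async task; rows that have no entry in grad_merge.rows() are still passed to adam_update with a zero gradient so their moment estimates keep decaying. The helper below (partition_rows is a hypothetical name) reproduces only the chunking arithmetic.

def partition_rows(param_row_count, inner_op_parallelism):
    # Mirrors line_in_each_thread in AdamOpKernel: contiguous blocks, the last
    # one clamped to param_row_count, surplus threads skipped.
    line_in_each_thread = param_row_count // inner_op_parallelism + 1
    chunks = []
    for i in range(inner_op_parallelism):
        start = i * line_in_each_thread
        if start >= param_row_count:
            break
        end = min((i + 1) * line_in_each_thread, param_row_count)
        chunks.append((start, end))
    return chunks


print(partition_rows(10, 4))  # [(0, 3), (3, 6), (6, 9), (9, 10)]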
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index 686550a3c8d7d55f06b03132124621c5d0db342f..5e3d4725c568261b149b19a49ba992911efdbfb1 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -129,7 +129,7 @@ def __bootstrap__():
         'eager_delete_tensor_gb', 'fast_eager_deletion_mode',
         'allocator_strategy', 'reader_queue_speed_test_mode',
         'print_sub_graph_dir', 'pe_profile_fname', 'warpctc_dir',
-        'enable_parallel_graph'
+        'inner_op_parallelism', 'enable_parallel_graph'
     ]
     if 'Darwin' not in sysstr:
         read_env_flags.append('use_pinned_memory')
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index f01a0eda9a711abb3265fe5bb86ecb702a6ac6aa..b72b900d3b26cb705b76623e9bacc16c76a7c79f 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -822,7 +822,8 @@ class AdamOptimizer(Optimizer):
                 "beta1": self._beta1,
                 "beta2": self._beta2,
                 "epsilon": self._epsilon,
-                "lazy_mode": self._lazy_mode
+                "lazy_mode": self._lazy_mode,
+                "min_row_size_to_use_multithread": 1000
             },
             stop_gradient=True)
 
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index e86af8b7ed94028ac077e4267645b8af0a1a5dfe..808e1e6aa80744db1289094d7c1bad00002a4c3e 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -87,6 +87,7 @@ list(REMOVE_ITEM TEST_OPS test_nearest_interp_op)
 foreach(TEST_OP ${TEST_OPS})
   py_test_modules(${TEST_OP} MODULES ${TEST_OP})
 endforeach(TEST_OP)
+py_test_modules(test_adam_op_multi_thread MODULES test_adam_op ENVS FLAGS_inner_op_parallelism=4)
 py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=${WARPCTC_LIB_DIR} SERIAL)
 py_test_modules(test_bilinear_interp_op MODULES test_bilinear_interp_op SERIAL)
 py_test_modules(test_nearest_interp_op MODULES test_nearest_interp_op SERIAL)
diff --git a/python/paddle/fluid/tests/unittests/test_adam_op.py b/python/paddle/fluid/tests/unittests/test_adam_op.py
index ff7fc5100ebaf12655d5963c600bbd5058720349..15f277cdc0aca30b8c768b6a6ee20e44880b2304 100644
--- a/python/paddle/fluid/tests/unittests/test_adam_op.py
+++ b/python/paddle/fluid/tests/unittests/test_adam_op.py
@@ -261,7 +261,12 @@ class TestSparseAdamOp(unittest.TestCase):
             "LearningRate": np.full((1), 2.0).astype("float32")
         }
         self.init_output = np.full((height, row_numel), 0.0).astype("float32")
-        self.attrs = {'epsilon': epsilon, 'beta1': beta1, 'beta2': beta2}
+        self.attrs = {
+            'epsilon': epsilon,
+            'beta1': beta1,
+            'beta2': beta2,
+            'min_row_size_to_use_multithread': 2
+        }
 
         grad_selected_rows = scope.var('Grad').get_selected_rows()
         grad_selected_rows.set_height(height)
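Finally, a minimal usage sketch (not part of the patch; the optimizer settings are placeholders): because __bootstrap__ only reads flags listed in read_env_flags at import time, FLAGS_inner_op_parallelism has to be set in the environment before paddle.fluid is imported, which is also how the new unit-test entry above passes it via ENVS.

import os

# Assumption: setting the flag before the first fluid import is enough for
# __bootstrap__ to pick it up, as in the CMake test entry above.
os.environ['FLAGS_inner_op_parallelism'] = '4'

import paddle.fluid as fluid

# min_row_size_to_use_multithread is currently hard-coded to 1000 by the
# AdamOptimizer change above, so only parameters with more than 1000 rows
# take the multi-threaded sparse-update path on CPU.
optimizer = fluid.optimizer.AdamOptimizer(learning_rate=0.001)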