diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h index b79d6f7020c91e4c47e2fa4389416d2c6279f232..b3079f51c4d6c900b443d023f3cec5b9125427e4 100644 --- a/paddle/fluid/operators/distributed/communicator.h +++ b/paddle/fluid/operators/distributed/communicator.h @@ -134,6 +134,8 @@ inline void MergeVars(const std::string& var_name, auto in = EigenVector::Flatten(in_t); result.device(*cpu_ctx.eigen_device()) = result + in; } + result.device(*cpu_ctx.eigen_device()) = + result / static_cast(vars.size()); } else if (var0->IsType()) { auto& slr0 = var0->Get(); auto* out_slr = out_var->GetMutable(); @@ -144,10 +146,10 @@ inline void MergeVars(const std::string& var_name, for (auto& var : vars) { inputs.push_back(&var->Get()); } - math::scatter::MergeAdd - merge_add; auto dev_ctx = paddle::platform::CPUDeviceContext(); - merge_add(dev_ctx, inputs, out_slr, false); + math::scatter::MergeAverage + merge_average; + merge_average(dev_ctx, inputs, out_slr); VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height() << " dims: " << slr0.value().dims(); } else { diff --git a/paddle/fluid/operators/distributed/communicator_test.cc b/paddle/fluid/operators/distributed/communicator_test.cc index 5294ac33d15611a003eeb7971891e8ca85ec6a73..66e36d012b10a0e1d627ee44dcde9e68f66cc719 100644 --- a/paddle/fluid/operators/distributed/communicator_test.cc +++ b/paddle/fluid/operators/distributed/communicator_test.cc @@ -42,6 +42,7 @@ TEST(communicator, merge_lod_tensors) { } out_value += static_cast(i); } + out_value = out_value / 10.0; const std::string out_name = "Out"; std::unique_ptr scope; scope.reset(new framework::Scope()); @@ -95,7 +96,7 @@ TEST(communicator, merge_selected_rows) { std::vector out_values; out_values.reserve(10); for (auto i = 0; i < 10; ++i) { - out_values.push_back(static_cast(i * (10 - i))); + out_values.push_back(static_cast((i * (10 - i)) / 10.0)); } for (auto i = 0; i < out_slr.rows().size(); ++i) { ASSERT_EQ(out_slr.rows()[i], i); diff --git a/paddle/fluid/operators/math/selected_rows_functor.cc b/paddle/fluid/operators/math/selected_rows_functor.cc index 647d4f14842ee38bbd8a5d07563ea29ff0432e1a..f73c9bb9dc57345c74678813cc5a7656ca29e134 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.cc +++ b/paddle/fluid/operators/math/selected_rows_functor.cc @@ -376,11 +376,115 @@ struct MergeAdd { } }; +template +struct MergeAverage { + framework::SelectedRows operator()(const platform::CPUDeviceContext& context, + const framework::SelectedRows& input) { + framework::SelectedRows out; + (*this)(context, input, &out); + return out; + } + + void operator()(const platform::CPUDeviceContext& context, + const framework::SelectedRows& input, + framework::SelectedRows* output) { + std::vector inputs; + inputs.push_back(&input); + (*this)(context, inputs, output); + } + + void operator()(const platform::CPUDeviceContext& context, + const std::vector& inputs, + framework::SelectedRows* output) { + if (inputs.size() == 0) { + VLOG(3) << "no input! return"; + return; + } + const framework::SelectedRows* has_value_input = nullptr; + for (auto* in : inputs) { + if (in->rows().size() > 0) { + has_value_input = in; + break; + } + } + if (has_value_input == nullptr) { + VLOG(3) << "no input has value! just return" << std::endl; + return; + } + auto input_width = has_value_input->value().dims()[1]; + auto input_height = has_value_input->height(); + framework::SelectedRows& out = *output; + std::set merged_row_set; + size_t row_num = 0; + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + PADDLE_ENFORCE_EQ(input_width, input->value().dims()[1], + "all input should have same " + "dimension except for the first one"); + PADDLE_ENFORCE_EQ(input_height, input->height(), + "all input should have same height"); + row_num += input->rows().size(); + merged_row_set.insert(input->rows().begin(), input->rows().end()); + } + + out.set_height(input_height); + out.mutable_value()->mutable_data( + framework::make_ddim( + {static_cast(merged_row_set.size()), input_width}), + context.GetPlace()); + auto* out_data = out.mutable_value()->data(); + + std::vector merge_rows(merged_row_set.begin(), + merged_row_set.end()); + std::sort(merge_rows.begin(), merge_rows.end()); + + out.set_rows(merge_rows); + + math::SetConstant constant_functor; + constant_functor(context, out.mutable_value(), 0.0); + + std::unordered_map rows_to_id; + for (size_t i = 0; i < merge_rows.size(); ++i) { + rows_to_id[merge_rows[i]] = i; + } + + auto blas = math::GetBlas(context); + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + auto* input_data = input->value().data(); + auto& input_rows = input->rows(); + + for (size_t i = 0; i < input_rows.size(); i++) { + size_t out_i = rows_to_id[input_rows[i]]; + elementwise_add_to( + context, &blas, static_cast(input_width), + &input_data[i * input_width], &out_data[out_i * input_width]); + } + } + size_t input_width_cast = static_cast(input_width); + T count = static_cast(inputs.size()); + for (size_t i = 0; i < merge_rows.size(); i++) { + for (size_t j = 0; j < input_width_cast; j++) { + out_data[i * input_width + j] = out_data[i * input_width + j] / count; + } + } + } +}; + template struct MergeAdd; template struct MergeAdd; template struct MergeAdd; template struct MergeAdd; +template struct MergeAverage; +template struct MergeAverage; +template struct MergeAverage; +template struct MergeAverage; + template struct UpdateToTensor { void operator()(const platform::CPUDeviceContext& context, diff --git a/paddle/fluid/operators/math/selected_rows_functor.h b/paddle/fluid/operators/math/selected_rows_functor.h index db0ee9bc1695f7b1a55b4d111dc470b462210963..a1eb69db7cfce0ec11aa09180fbc73c4bd0a23f6 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.h +++ b/paddle/fluid/operators/math/selected_rows_functor.h @@ -93,6 +93,18 @@ struct MergeAdd { const bool sorted_result = false); }; +template +struct MergeAverage { + framework::SelectedRows operator()(const DeviceContext& context, + const framework::SelectedRows& input); + void operator()(const DeviceContext& context, + const framework::SelectedRows& input, + framework::SelectedRows* output); + void operator()(const DeviceContext& context, + const std::vector& inputs, + framework::SelectedRows* output); +}; + enum class ScatterOps { ASSIGN, ADD, SUB, SUBBY, MUL, DIV, DIVBY }; // out = selected_rows_in / tensor diff --git a/paddle/fluid/operators/math/selected_rows_functor_test.cc b/paddle/fluid/operators/math/selected_rows_functor_test.cc index 5581b9e040272e224669d612409f88d61f794443..b7a499aa968035a46c4632d7a575594e1cb4ebcd 100644 --- a/paddle/fluid/operators/math/selected_rows_functor_test.cc +++ b/paddle/fluid/operators/math/selected_rows_functor_test.cc @@ -223,6 +223,46 @@ TEST(selected_rows_functor, cpu_add_to) { EXPECT_EQ(tensor1_data[9 * row_numel + 6], 5.0); } +TEST(selected_rows_functor, cpu_merge_average_float) { + paddle::platform::CPUPlace cpu_place; + paddle::platform::CPUDeviceContext ctx(cpu_place); + paddle::operators::math::SetConstant + functor; + int64_t height = 10; + int64_t row_numel = 10; + + std::vector rows{0, 4, 4, 7}; + std::unique_ptr selected_rows{ + new paddle::framework::SelectedRows(rows, height)}; + auto* in_value = selected_rows->mutable_value(); + in_value->mutable_data( + paddle::framework::make_ddim( + {static_cast(rows.size()), row_numel}), + cpu_place); + functor(ctx, in_value, 1.0); + + paddle::operators::math::scatter::MergeAverage< + paddle::platform::CPUDeviceContext, float> + merge_average_functor; + paddle::framework::SelectedRows output = + merge_average_functor(ctx, *selected_rows); + + auto out_height = output.height(); + EXPECT_EQ(out_height, height); + + auto& out_rows = output.rows(); + EXPECT_EQ(out_rows[0], 0); + EXPECT_EQ(out_rows[1], 4); + EXPECT_EQ(out_rows[2], 7); + + auto* out_data = output.value().data(); + + EXPECT_EQ(out_data[0 * row_numel], 1.0); + EXPECT_EQ(out_data[1 * row_numel], 2.0); + EXPECT_EQ(out_data[2 * row_numel], 1.0); +} + TEST(selected_rows_functor, cpu_merge_add_float) { paddle::platform::CPUPlace cpu_place; paddle::platform::CPUDeviceContext ctx(cpu_place); diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py index 2ed37a9be39f1de9b57abcc9e2d2c34cc956950f..962a5653f6135209de4e82d73b39cd3e8f8c9499 100644 --- a/python/paddle/fluid/contrib/utils/hdfs_utils.py +++ b/python/paddle/fluid/contrib/utils/hdfs_utils.py @@ -62,7 +62,7 @@ class HDFSClient(object): dfs = 'fs' self.pre_commands.append(dfs) - for k, v in configs.iteritems(): + for k, v in configs.items(): config_command = '-D%s=%s' % (k, v) self.pre_commands.append(config_command) diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py new file mode 100644 index 0000000000000000000000000000000000000000..bce8da641c20c343826f725df9ac0f564accbab0 --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py @@ -0,0 +1,55 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet +from paddle.fluid.contrib.utils import HDFSClient +import os + + +def check_all_trainers_ready(ready_path, epoch): + trainer_num = fleet.worker_num() + trainer_id = fleet.worker_index() + + hadoop_home = os.getenv("HADOOP_HOME") + configs = { + "fs.default.name": os.getenv("FS_NAME"), + "hadoop.job.ugi": os.getenv("FS_UGI") + } + + node_ready = "ready.{}.{}.done".format(epoch, trainer_id) + + with open(node_ready, "w") as node: + node.write("") + + client = HDFSClient(hadoop_home, configs) + if not client.is_dir(ready_path): + client.makedirs(ready_path) + client.upload( + hdfs_path=ready_path, + local_path=node_ready, + overwrite=True, + retry_times=0) + + print("PUT {} ON HDFS {} OK".format(node_ready, ready_path)) + + while True: + ready_num = len(client.ls(ready_path)) + print("have {} trainers need to be ready".format(trainer_num - ready_num + % trainer_num)) + if ready_num % trainer_num == 0: + break + time.sleep(10) + ready_num = len(client.ls(ready_path)) + + print("All trainers are ready, continue training") diff --git a/python/paddle/fluid/tests/unittests/test_fleet_utils.py b/python/paddle/fluid/tests/unittests/test_fleet_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..a26b27ff4ffee976f99503d38d71c8056337d8b7 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_utils.py @@ -0,0 +1,35 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import paddle.fluid as fluid +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet +from paddle.fluid.incubate.fleet.utils.fleet_barrier_util import check_all_trainers_ready + + +class TestFleetUtils(unittest.TestCase): + def test_fleet_barrier(self): + role = role_maker.UserDefinedRoleMaker( + current_id=0, + role=role_maker.Role.WORKER, + worker_num=1, + server_endpoints=['127.0.0.1']) + fleet.init(role) + check_all_trainers_ready("/ready_path/", 0) + + +if __name__ == '__main__': + unittest.main()