Unverified commit 2f037c31 authored by 123malin, committed by GitHub

fix the diff between async mode and async_half mode (#19535)

* test=develop, communicator merge add => merge average
Parent e9233d1c
@@ -134,6 +134,8 @@ inline void MergeVars(const std::string& var_name,
       auto in = EigenVector<float>::Flatten(in_t);
       result.device(*cpu_ctx.eigen_device()) = result + in;
     }
+    result.device(*cpu_ctx.eigen_device()) =
+        result / static_cast<float>(vars.size());
   } else if (var0->IsType<framework::SelectedRows>()) {
     auto& slr0 = var0->Get<framework::SelectedRows>();
     auto* out_slr = out_var->GetMutable<framework::SelectedRows>();
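Note on the dense branch: after accumulating with Eigen, the result is now divided by `vars.size()`, so the communicator produces a mean rather than a sum. A minimal pure-Python sketch of the before/after merge semantics (made-up values, not Paddle code):

```python
# Illustrative sketch of the dense-merge change; values are made up.
def merge_add(tensors):
    # Old behaviour: element-wise sum over the merged gradients.
    return [sum(col) for col in zip(*tensors)]

def merge_average(tensors):
    # New behaviour: element-wise sum divided by the number of tensors.
    return [v / len(tensors) for v in merge_add(tensors)]

grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
assert merge_add(grads) == [9.0, 12.0]
assert merge_average(grads) == [3.0, 4.0]
```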
@@ -144,10 +146,10 @@ inline void MergeVars(const std::string& var_name,
     for (auto& var : vars) {
       inputs.push_back(&var->Get<framework::SelectedRows>());
     }
-    math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, float>
-        merge_add;
     auto dev_ctx = paddle::platform::CPUDeviceContext();
-    merge_add(dev_ctx, inputs, out_slr, false);
+    math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, float>
+        merge_average;
+    merge_average(dev_ctx, inputs, out_slr);
     VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
             << " dims: " << slr0.value().dims();
   } else {
......
@@ -42,6 +42,7 @@ TEST(communicator, merge_lod_tensors) {
     }
     out_value += static_cast<float>(i);
   }
+  out_value = out_value / 10.0;
  const std::string out_name = "Out";
   std::unique_ptr<framework::Scope> scope;
   scope.reset(new framework::Scope());
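The expected value is now divided by 10.0, matching the ten variables merged in the (elided) setup. Assuming, as the visible accumulation loop suggests, that the old expectation was the sum over i = 0..9, the expected element drops from 45 to 4.5:

```python
# Hypothetical reconstruction of the expectation; the test setup is elided.
expected_sum = float(sum(range(10)))  # old expectation: 45.0
expected_avg = expected_sum / 10.0    # new expectation: 4.5
assert (expected_sum, expected_avg) == (45.0, 4.5)
```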
@@ -95,7 +96,7 @@ TEST(communicator, merge_selected_rows) {
   std::vector<float> out_values;
   out_values.reserve(10);
   for (auto i = 0; i < 10; ++i) {
-    out_values.push_back(static_cast<float>(i * (10 - i)));
+    out_values.push_back(static_cast<float>((i * (10 - i)) / 10.0));
   }
   for (auto i = 0; i < out_slr.rows().size(); ++i) {
     ASSERT_EQ(out_slr.rows()[i], i);
......
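The SelectedRows test likewise now expects the mean over the ten merged inputs instead of the raw sum i * (10 - i). The updated table can be reproduced directly:

```python
# Reproduces the updated out_values expectation above.
out_values = [(i * (10 - i)) / 10.0 for i in range(10)]
assert out_values[1] == 0.9 and out_values[5] == 2.5
```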
@@ -376,11 +376,115 @@ struct MergeAdd<platform::CPUDeviceContext, T> {
   }
 };
 
+template <typename T>
+struct MergeAverage<platform::CPUDeviceContext, T> {
+  framework::SelectedRows operator()(const platform::CPUDeviceContext& context,
+                                     const framework::SelectedRows& input) {
+    framework::SelectedRows out;
+    (*this)(context, input, &out);
+    return out;
+  }
+
+  void operator()(const platform::CPUDeviceContext& context,
+                  const framework::SelectedRows& input,
+                  framework::SelectedRows* output) {
+    std::vector<const framework::SelectedRows*> inputs;
+    inputs.push_back(&input);
+    (*this)(context, inputs, output);
+  }
+
+  void operator()(const platform::CPUDeviceContext& context,
+                  const std::vector<const framework::SelectedRows*>& inputs,
+                  framework::SelectedRows* output) {
+    if (inputs.size() == 0) {
+      VLOG(3) << "no input! return";
+      return;
+    }
+    const framework::SelectedRows* has_value_input = nullptr;
+    for (auto* in : inputs) {
+      if (in->rows().size() > 0) {
+        has_value_input = in;
+        break;
+      }
+    }
+    if (has_value_input == nullptr) {
+      VLOG(3) << "no input has value! just return" << std::endl;
+      return;
+    }
+    auto input_width = has_value_input->value().dims()[1];
+    auto input_height = has_value_input->height();
+    framework::SelectedRows& out = *output;
+    std::set<int64_t> merged_row_set;
+    size_t row_num = 0;
+    for (auto* input : inputs) {
+      if (input->rows().size() == 0) {
+        continue;
+      }
+      PADDLE_ENFORCE_EQ(input_width, input->value().dims()[1],
+                        "all input should have same "
+                        "dimension except for the first one");
+      PADDLE_ENFORCE_EQ(input_height, input->height(),
+                        "all input should have same height");
+      row_num += input->rows().size();
+      merged_row_set.insert(input->rows().begin(), input->rows().end());
+    }
+
+    out.set_height(input_height);
+    out.mutable_value()->mutable_data<T>(
+        framework::make_ddim(
+            {static_cast<int64_t>(merged_row_set.size()), input_width}),
+        context.GetPlace());
+    auto* out_data = out.mutable_value()->data<T>();
+
+    std::vector<int64_t> merge_rows(merged_row_set.begin(),
+                                    merged_row_set.end());
+    std::sort(merge_rows.begin(), merge_rows.end());
+    out.set_rows(merge_rows);
+
+    math::SetConstant<platform::CPUDeviceContext, T> constant_functor;
+    constant_functor(context, out.mutable_value(), 0.0);
+
+    std::unordered_map<int64_t, size_t> rows_to_id;
+    for (size_t i = 0; i < merge_rows.size(); ++i) {
+      rows_to_id[merge_rows[i]] = i;
+    }
+
+    auto blas = math::GetBlas<platform::CPUDeviceContext, T>(context);
+    for (auto* input : inputs) {
+      if (input->rows().size() == 0) {
+        continue;
+      }
+      auto* input_data = input->value().data<T>();
+      auto& input_rows = input->rows();
+
+      for (size_t i = 0; i < input_rows.size(); i++) {
+        size_t out_i = rows_to_id[input_rows[i]];
+        elementwise_add_to<platform::CPUDeviceContext, T>(
+            context, &blas, static_cast<size_t>(input_width),
+            &input_data[i * input_width], &out_data[out_i * input_width]);
+      }
+    }
+    size_t input_width_cast = static_cast<size_t>(input_width);
+    T count = static_cast<T>(inputs.size());
+    for (size_t i = 0; i < merge_rows.size(); i++) {
+      for (size_t j = 0; j < input_width_cast; j++) {
+        out_data[i * input_width + j] = out_data[i * input_width + j] / count;
+      }
+    }
+  }
+};
+
 template struct MergeAdd<platform::CPUDeviceContext, int>;
 template struct MergeAdd<platform::CPUDeviceContext, int64_t>;
 template struct MergeAdd<platform::CPUDeviceContext, float>;
 template struct MergeAdd<platform::CPUDeviceContext, double>;
 
+template struct MergeAverage<platform::CPUDeviceContext, int>;
+template struct MergeAverage<platform::CPUDeviceContext, int64_t>;
+template struct MergeAverage<platform::CPUDeviceContext, float>;
+template struct MergeAverage<platform::CPUDeviceContext, double>;
+
 template <typename T>
 struct UpdateToTensor<platform::CPUDeviceContext, T> {
   void operator()(const platform::CPUDeviceContext& context,
......
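MergeAverage mirrors MergeAdd's structure: gather the union of row ids across inputs, zero-initialize the merged value, scatter-add every input row, then divide each merged row by the number of inputs; note the divisor is inputs.size(), not a row's occurrence count. A pure-Python sketch of those steps (illustration only, not the Paddle implementation):

```python
# Pure-Python sketch of the MergeAverage steps; not the Paddle implementation.
def merge_average(inputs, width):
    """inputs: list of (rows, values); values[i] is the vector for rows[i]."""
    non_empty = [(rows, values) for rows, values in inputs if rows]
    if not non_empty:
        return [], []
    merge_rows = sorted({r for rows, _ in non_empty for r in rows})
    rows_to_id = {r: i for i, r in enumerate(merge_rows)}
    out = [[0.0] * width for _ in merge_rows]        # zero-initialised output
    for rows, values in non_empty:                   # scatter-add every input
        for i, r in enumerate(rows):
            for j in range(width):
                out[rows_to_id[r]][j] += values[i][j]
    count = float(len(inputs))                       # divide by #inputs merged
    return merge_rows, [[v / count for v in row] for row in out]

# Two inputs sharing row 1: sums are row0 = 2.0, row1 = 4.0 + 6.0; mean over 2.
rows, out = merge_average([([0, 1], [[2.0], [4.0]]), ([1], [[6.0]])], width=1)
assert rows == [0, 1] and out == [[1.0], [5.0]]
```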
@@ -93,6 +93,18 @@ struct MergeAdd {
                   const bool sorted_result = false);
 };
 
+template <typename DeviceContext, typename T>
+struct MergeAverage {
+  framework::SelectedRows operator()(const DeviceContext& context,
+                                     const framework::SelectedRows& input);
+  void operator()(const DeviceContext& context,
+                  const framework::SelectedRows& input,
+                  framework::SelectedRows* output);
+  void operator()(const DeviceContext& context,
+                  const std::vector<const framework::SelectedRows*>& inputs,
+                  framework::SelectedRows* output);
+};
+
 enum class ScatterOps { ASSIGN, ADD, SUB, SUBBY, MUL, DIV, DIVBY };
 
 // out = selected_rows_in / tensor
......
@@ -223,6 +223,46 @@ TEST(selected_rows_functor, cpu_add_to) {
   EXPECT_EQ(tensor1_data[9 * row_numel + 6], 5.0);
 }
 
+TEST(selected_rows_functor, cpu_merge_average_float) {
+  paddle::platform::CPUPlace cpu_place;
+  paddle::platform::CPUDeviceContext ctx(cpu_place);
+  paddle::operators::math::SetConstant<paddle::platform::CPUDeviceContext,
+                                       float>
+      functor;
+  int64_t height = 10;
+  int64_t row_numel = 10;
+
+  std::vector<int64_t> rows{0, 4, 4, 7};
+  std::unique_ptr<paddle::framework::SelectedRows> selected_rows{
+      new paddle::framework::SelectedRows(rows, height)};
+  auto* in_value = selected_rows->mutable_value();
+  in_value->mutable_data<float>(
+      paddle::framework::make_ddim(
+          {static_cast<int64_t>(rows.size()), row_numel}),
+      cpu_place);
+  functor(ctx, in_value, 1.0);
+
+  paddle::operators::math::scatter::MergeAverage<
+      paddle::platform::CPUDeviceContext, float>
+      merge_average_functor;
+  paddle::framework::SelectedRows output =
+      merge_average_functor(ctx, *selected_rows);
+
+  auto out_height = output.height();
+  EXPECT_EQ(out_height, height);
+
+  auto& out_rows = output.rows();
+  EXPECT_EQ(out_rows[0], 0);
+  EXPECT_EQ(out_rows[1], 4);
+  EXPECT_EQ(out_rows[2], 7);
+
+  auto* out_data = output.value().data<float>();
+  EXPECT_EQ(out_data[0 * row_numel], 1.0);
+  EXPECT_EQ(out_data[1 * row_numel], 2.0);
+  EXPECT_EQ(out_data[2 * row_numel], 1.0);
+}
+
 TEST(selected_rows_functor, cpu_merge_add_float) {
   paddle::platform::CPUPlace cpu_place;
   paddle::platform::CPUDeviceContext ctx(cpu_place);
......
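The 2.0 expectation for row 4 is the interesting case: row 4 appears twice inside the single input, duplicates within an input are summed, and the divisor is the number of merged inputs (1 here), not the per-row occurrence count. A quick check of that arithmetic:

```python
# Row 4 occurs twice in the single input; the divisor is the input count (1).
rows, value = [0, 4, 4, 7], 1.0
acc = {}
for r in rows:
    acc[r] = acc.get(r, 0.0) + value
n_inputs = 1
assert {r: v / n_inputs for r, v in acc.items()} == {0: 1.0, 4: 2.0, 7: 1.0}
```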
@@ -62,7 +62,7 @@ class HDFSClient(object):
         dfs = 'fs'
         self.pre_commands.append(dfs)
 
-        for k, v in configs.iteritems():
+        for k, v in configs.items():
             config_command = '-D%s=%s' % (k, v)
             self.pre_commands.append(config_command)
......
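`dict.iteritems()` no longer exists in Python 3; `items()` works on both Python 2 (returning a list) and Python 3 (returning a view), so the generated `-D` flags are unchanged. A quick check with made-up config values:

```python
# Made-up config values; shows the -D flag construction is unchanged.
configs = {"fs.default.name": "hdfs://nameservice", "hadoop.job.ugi": "user,passwd"}
pre_commands = ['-D%s=%s' % (k, v) for k, v in configs.items()]
assert sorted(pre_commands) == [
    '-Dfs.default.name=hdfs://nameservice',
    '-Dhadoop.job.ugi=user,passwd',
]
```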
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.contrib.utils import HDFSClient
import os
import time


def check_all_trainers_ready(ready_path, epoch):
    trainer_num = fleet.worker_num()
    trainer_id = fleet.worker_index()

    hadoop_home = os.getenv("HADOOP_HOME")
    configs = {
        "fs.default.name": os.getenv("FS_NAME"),
        "hadoop.job.ugi": os.getenv("FS_UGI")
    }

    node_ready = "ready.{}.{}.done".format(epoch, trainer_id)
    with open(node_ready, "w") as node:
        node.write("")

    client = HDFSClient(hadoop_home, configs)
    if not client.is_dir(ready_path):
        client.makedirs(ready_path)
    client.upload(
        hdfs_path=ready_path,
        local_path=node_ready,
        overwrite=True,
        retry_times=0)
    print("PUT {} ON HDFS {} OK".format(node_ready, ready_path))

    while True:
        ready_num = len(client.ls(ready_path))
        if ready_num % trainer_num == 0:
            break
        print("waiting for {} more trainers to be ready".format(
            trainer_num - ready_num % trainer_num))
        time.sleep(10)
    print("All trainers are ready, continue training")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import unittest

import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.utils.fleet_barrier_util import check_all_trainers_ready


class TestFleetUtils(unittest.TestCase):
    def test_fleet_barrier(self):
        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.WORKER,
            worker_num=1,
            server_endpoints=['127.0.0.1'])
        fleet.init(role)
        check_all_trainers_ready("/ready_path/", 0)


if __name__ == '__main__':
    unittest.main()