diff --git a/paddle/fluid/framework/details/data_balance_op_handle.cc b/paddle/fluid/framework/details/data_balance_op_handle.cc index 023e0cdf9175d9f1c33505d8c0b461efdc407061..f8d431ef2a35823ed853b543fcd1d9b6064a4058 100644 --- a/paddle/fluid/framework/details/data_balance_op_handle.cc +++ b/paddle/fluid/framework/details/data_balance_op_handle.cc @@ -107,7 +107,6 @@ void DataBalanceOpHandle::RunImpl() { auto *tensor_var = local_scope->FindVar(in_var_handles[i]->name_); PADDLE_ENFORCE(tensor_var->IsType()); auto *tensor = tensor_var->GetMutable(); - PADDLE_ENFORCE(places_[place_idx] == tensor->place()); lod_tensors[data_idx].push_back(tensor); int ins_size = tensor->lod().empty() ? tensor->dims()[0] : tensor->NumElements(); diff --git a/paddle/fluid/framework/details/fetch_op_handle.cc b/paddle/fluid/framework/details/fetch_op_handle.cc index 224e8e1f6efd7a894591ac51c929517cae7539ce..d646c944601e81477787740189d7ac60ae97fa80 100644 --- a/paddle/fluid/framework/details/fetch_op_handle.cc +++ b/paddle/fluid/framework/details/fetch_op_handle.cc @@ -67,8 +67,8 @@ void FetchOpHandle::RunImpl() { #endif } else { tensors_[i].ShareDataWith(t); - tensors_[i].set_lod(t.lod()); } + tensors_[i].set_lod(t.lod()); } this->WaitAndMergeCPUTensors(); diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 8a9f0b1054575b41c645d81ab70f5f2a37fd8845..edfefb8231f969d3f6aa1b3cb13a341d9a25aaf4 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -216,11 +216,13 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( } else { // This op runs on all devices, and its output may have parameter's // gradients. - CreateComputationalOps(&result, *op, places_.size()); - if (op->Type() == "read") { + op->SetAttr("throw_eof_exp", false); + CreateComputationalOps(&result, *op, places_.size()); const auto &data_var_names = op->Output("Out"); InsertDataBalanceOp(&result, data_var_names); + } else { + CreateComputationalOps(&result, *op, places_.size()); } if (!is_forwarding && places_.size() > 1) { diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index 49672e11818266b6f0328206a9985ca6c9bd4287..dcbd2f22fc98fe352ce54d4aa340fe6b5a9ac24a 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -393,6 +393,7 @@ void LoDTensor::MergeLoDTensor( new_dim[0] += t->dims()[0]; auto &lod = t->lod(); + PADDLE_ENFORCE_EQ(new_lod.size(), lod.size()); for (size_t j = 0; j < lod.size(); ++j) { auto &sub_lod = new_lod[j]; auto &offset = sub_lod.back(); diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 8e9f91c185dda4729230bc9a89eb6441cbc6205b..60e4eb757668e1482090f02aea529aaad3a674d8 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -67,10 +67,14 @@ class ReadOp : public framework::OperatorBase { std::vector ins; reader->ReadNext(&ins); if (ins.empty()) { - ins.resize(out_arg_names.size()); - for (auto& tensor : ins) { - // data type is not important for subsequent DataBalanceOpHandle - tensor.mutable_data(framework::make_ddim({0}), dev_place); + if (Attr("throw_eof_exp")) { + PADDLE_THROW("There is no next data."); + } else { + ins.resize(out_arg_names.size()); + for (auto& tensor : ins) { + // data type is not important for subsequent DataBalanceOpHandle + tensor.mutable_data(framework::make_ddim({0}), dev_place); + } } } PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size()); @@ -88,6 +92,10 @@ class ReadOpMaker : public framework::OpProtoAndCheckerMaker { void Make() override { AddInput("Reader", "(ReaderHolder) The executed reader."); AddOutput("Out", "(LoDTensor) The output data.").AsDuplicable(); + AddAttr("throw_eof_exp", + "If set true, an exception will be thrown when the Reader " + "yields empty (which means there is no next data).") + .SetDefault(true); AddComment(R"DOC( Read Operator diff --git a/python/paddle/fluid/tests/unittests/test_data_balance.py b/python/paddle/fluid/tests/unittests/test_data_balance.py new file mode 100644 index 0000000000000000000000000000000000000000..44c1adad9e3138f3ed327494e16875e0104b0f3f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_data_balance.py @@ -0,0 +1,188 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.v2 as paddle +import paddle.v2.dataset.mnist as mnist +import numpy as np + + +class TestDataBalance(unittest.TestCase): + def prepare_data(self): + def fake_data_generator(): + for n in xrange(self.total_ins_num): + yield np.ones((3, 4)) * n, n + + # Prepare data + with fluid.program_guard(fluid.Program(), fluid.Program()): + reader = paddle.batch( + fake_data_generator, batch_size=self.batch_size) + feeder = fluid.DataFeeder( + feed_list=[ + fluid.layers.data( + name='image', shape=[3, 4], dtype='float32'), + fluid.layers.data( + name='label', shape=[1], dtype='int64'), + ], + place=fluid.CPUPlace()) + self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( + self.data_file_name, reader, feeder) + + def prepare_lod_data(self): + def fake_data_generator(): + for n in xrange(1, self.total_ins_num + 1): + d1 = (np.ones((n, 3)) * n).astype('float32') + d2 = (np.array(n).reshape((1, 1))).astype('int32') + yield d1, d2 + + # Prepare lod data + with fluid.program_guard(fluid.Program(), fluid.Program()): + with fluid.recordio_writer.create_recordio_writer( + filename=self.lod_data_file_name) as writer: + eof = False + generator = fake_data_generator() + while (not eof): + data_batch = [ + np.array([]).reshape((0, 3)), np.array([]).reshape( + (0, 1)) + ] + lod = [0] + for _ in xrange(self.batch_size): + try: + ins = generator.next() + except StopIteration: + eof = True + break + for i, d in enumerate(ins): + data_batch[i] = np.concatenate( + (data_batch[i], d), axis=0) + lod.append(lod[-1] + ins[0].shape[0]) + if data_batch[0].shape[0] > 0: + for i, d in enumerate(data_batch): + t = fluid.LoDTensor() + t.set(data_batch[i], fluid.CPUPlace()) + if i == 0: + t.set_lod([lod]) + writer.append_tensor(t) + writer.complete_append_tensor() + + def setUp(self): + self.use_cuda = fluid.core.is_compiled_with_cuda() + self.data_file_name = './data_balance_test.recordio' + self.lod_data_file_name = './data_balance_with_lod_test.recordio' + self.total_ins_num = 50 + self.batch_size = 10 + self.prepare_data() + self.prepare_lod_data() + + def main(self): + main_prog = fluid.Program() + startup_prog = fluid.Program() + with fluid.program_guard(main_prog, startup_prog): + data_reader = fluid.layers.io.open_files( + filenames=[self.data_file_name], + shapes=[[-1, 3, 4], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + if self.use_cuda: + data_reader = fluid.layers.double_buffer(data_reader) + image, label = fluid.layers.read_file(data_reader) + + place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_prog) + + parallel_exe = fluid.ParallelExecutor( + use_cuda=self.use_cuda, main_program=main_prog) + + if (parallel_exe.device_count > self.batch_size): + print("WARNING: Unittest TestDataBalance skipped. \ + For the result is not correct when device count \ + is larger than batch size.") + exit(0) + fetch_list = [image.name, label.name] + + data_appeared = [False] * self.total_ins_num + while (True): + try: + image_val, label_val = parallel_exe.run(fetch_list, + return_numpy=True) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break + ins_num = image_val.shape[0] + broadcasted_label = np.ones( + (ins_num, 3, 4)) * label_val.reshape((ins_num, 1, 1)) + self.assertEqual(image_val.all(), broadcasted_label.all()) + for l in label_val: + self.assertFalse(data_appeared[l[0]]) + data_appeared[l[0]] = True + for i in data_appeared: + self.assertTrue(i) + + def main_lod(self): + main_prog = fluid.Program() + startup_prog = fluid.Program() + with fluid.program_guard(main_prog, startup_prog): + data_reader = fluid.layers.io.open_files( + filenames=[self.lod_data_file_name], + shapes=[[-1, 3], [-1, 1]], + lod_levels=[1, 0], + dtypes=['float32', 'int32'], + thread_num=1) + ins, label = fluid.layers.read_file(data_reader) + + place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_prog) + + parallel_exe = fluid.ParallelExecutor( + use_cuda=self.use_cuda, main_program=main_prog) + + if (parallel_exe.device_count > self.batch_size): + print("WARNING: Unittest TestDataBalance skipped. \ + For the result is not correct when device count \ + is larger than batch size.") + exit(0) + fetch_list = [ins.name, label.name] + + data_appeared = [False] * self.total_ins_num + while (True): + try: + ins_tensor, label_tensor = parallel_exe.run( + fetch_list, return_numpy=False) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break + + ins_val = np.array(ins_tensor) + label_val = np.array(label_tensor) + ins_lod = ins_tensor.lod()[0] + self.assertEqual(ins_val.shape[1], 3) + self.assertEqual(label_val.shape[1], 1) + self.assertEqual(len(ins_lod) - 1, label_val.shape[0]) + for i in range(0, len(ins_lod) - 1): + ins_elem = ins_val[ins_lod[i]:ins_lod[i + 1]][:] + label_elem = label_val[i][0] + self.assertEqual(ins_elem.all(), label_elem.all()) + self.assertFalse(data_appeared[int(label_elem - 1)]) + data_appeared[int(label_elem - 1)] = True + + for i in data_appeared: + self.assertTrue(i) + + def test_all(self): + self.main() + self.main_lod()