From e95cafd9a767f539190bf3c3c139ec845811cffb Mon Sep 17 00:00:00 2001 From: xjqbest <173596896@qq.com> Date: Mon, 25 Mar 2019 17:33:04 +0800 Subject: [PATCH] fix code style & add dataset testcase test=develop --- paddle/fluid/framework/data_set.cc | 2 + paddle/fluid/framework/data_set.h | 8 + .../fluid/framework/executor_thread_worker.cc | 1 + paddle/fluid/framework/io/fs.cc | 1 + paddle/fluid/framework/io/fs.h | 1 + paddle/fluid/pybind/data_set_py.cc | 5 + paddle/fluid/string/string_helper.h | 1 + python/paddle/fluid/async_executor.py | 28 +++- .../fluid/tests/unittests/test_dataset.py | 144 ++++++++++++++++++ 9 files changed, 186 insertions(+), 5 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_dataset.py diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 62001c24df..c7b9ee717a 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -62,6 +62,8 @@ void DatasetImpl::SetTrainerNum(int trainer_num) { template void DatasetImpl::SetHdfsConfig(const std::string& fs_name, const std::string& fs_ugi) { + fs_name_ = fs_name; + fs_ugi_ = fs_ugi; std::string cmd = std::string("hadoop fs"); cmd += " -D fs.default.name=" + fs_name; cmd += " -D hadoop.job.ugi=" + fs_ugi; diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 4bbcc6d06a..1f08f8eaa8 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -20,6 +20,7 @@ #include #include // NOLINT #include +#include #include "paddle/fluid/framework/data_feed.h" @@ -58,6 +59,8 @@ class Dataset { virtual int GetThreadNum() = 0; // get worker num virtual int GetTrainerNum() = 0; + // get hdfs config + virtual std::pair GetHdfsConfig() = 0; // get data fedd desc virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() = 0; // get readers, the reader num depend both on thread num @@ -102,6 +105,9 @@ class DatasetImpl : public Dataset { virtual const std::vector& 
GetFileList() { return filelist_; } virtual int GetThreadNum() { return thread_num_; } virtual int GetTrainerNum() { return trainer_num_; } + virtual std::pair GetHdfsConfig() { + return std::make_pair(fs_name_, fs_ugi_); + } virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() { return data_feed_desc_; } @@ -128,6 +134,8 @@ class DatasetImpl : public Dataset { std::vector filelist_; size_t file_idx_; std::mutex mutex_for_pick_file_; + std::string fs_name_; + std::string fs_ugi_; }; // use std::vector as data type diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index f09b283000..005d98c6e8 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -14,6 +14,7 @@ limitations under the License. */ #include "paddle/fluid/framework/executor_thread_worker.h" #include +#include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" diff --git a/paddle/fluid/framework/io/fs.cc b/paddle/fluid/framework/io/fs.cc index a4f2d2a89a..d5bc5df256 100644 --- a/paddle/fluid/framework/io/fs.cc +++ b/paddle/fluid/framework/io/fs.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ #include "paddle/fluid/framework/io/fs.h" +#include namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/io/fs.h b/paddle/fluid/framework/io/fs.h index f08953552c..8a0734bf54 100644 --- a/paddle/fluid/framework/io/fs.h +++ b/paddle/fluid/framework/io/fs.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "glog/logging.h" #include "paddle/fluid/framework/io/shell.h" #include "paddle/fluid/string/string_helper.h" diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 30d1d185cf..bc6a39ea9e 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -52,6 +52,11 @@ void BindDataset(py::module* m) { .def("set_trainer_num", &framework::Dataset::SetTrainerNum) .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig) .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc) + .def("get_filelist", &framework::Dataset::GetFileList) + .def("get_thread_num", &framework::Dataset::GetThreadNum) + .def("get_trainer_num", &framework::Dataset::GetTrainerNum) + .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig) + .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc) .def("register_client2client_msg_handler", &framework::Dataset::RegisterClientToClientMsgHandler) .def("load_into_memory", &framework::Dataset::LoadIntoMemory) diff --git a/paddle/fluid/string/string_helper.h b/paddle/fluid/string/string_helper.h index 0cdbf7d0e4..c3b99a9797 100644 --- a/paddle/fluid/string/string_helper.h +++ b/paddle/fluid/string/string_helper.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "boost/lexical_cast.hpp" #include "glog/logging.h" diff --git a/python/paddle/fluid/async_executor.py b/python/paddle/fluid/async_executor.py index 50c21933c3..9e75d2d16e 100644 --- a/python/paddle/fluid/async_executor.py +++ b/python/paddle/fluid/async_executor.py @@ -78,6 +78,12 @@ class AsyncExecutor(object): """ def __init__(self, place=None, run_mode=""): 
+ """ + Init. + Args: + place(Place): CPUPlace or GPUPlace. + run_mode(str): default is empty string. + """ if place is None: place = core.CPUPlace() if not isinstance(place, core.CPUPlace): @@ -91,6 +97,18 @@ class AsyncExecutor(object): self.instance = None def run(self, program, data_feed, filelist, thread_num, fetch, debug=False): + """ + Run program by this AsyncExecutor. + Args: + program(Program): the program that needs to run, if not provided, + then default_main_program will be used. + data_feed(DataFeedDesc): A DataFeedDesc object + filelist(str|list): a file or a list of files + thread_num(int): number of concurrent training threads. + fetch(str|list): the var name or a list of var names to inspect + debug(bool): When set to True, fetch vars will be printed to + standard output after each minibatch + """ if program is None: program = default_main_program() program_desc = program.desc @@ -211,12 +229,12 @@ class AsyncExecutor(object): """ download_data is a default download method for distributed training a user download data without this method - + Example: >>> exe = fluid.AsyncExecutor() >>> exe.download_data("/xxx/xxx/xx/", - >>> "./data", "afs:// - >>> xxx.xxx.xxx.xxx:9901", "xxx,yyy") + >>> "./data", "afs:// + >>> xxx.xxx.xxx.xxx:9901", "xxx,yyy") Args: afs_path(str): afs_path defined by users local_path(str): download data path @@ -256,7 +274,7 @@ class AsyncExecutor(object): def config_distributed_nodes(self): """ if a user needs to run distributed async executor - he or she needs to do a global configuration so that + he or she needs to do a global configuration so that information of current process can be obtained """ self.instance = ps_instance.PaddlePSInstance(1, 2) @@ -282,7 +300,7 @@ class AsyncExecutor(object): """ initialize server of current node if current process is a server Args: - dist_desc(str): a protobuf string that describes + dist_desc(str): a protobuf string that describes how to init a worker and a server """ if self.instance is
None: diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py new file mode 100644 index 0000000000..491a09274b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -0,0 +1,144 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import paddle.fluid as fluid +import numpy as np +import os +import shutil +import unittest + + +class TestDataset(unittest.TestCase): + def test_dataset_create(self): + try: + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + except: + self.assertTrue(False) + + try: + dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + except: + self.assertTrue(False) + + try: + dataset = fluid.DatasetFactory().create_dataset("MyOwnDataset") + self.assertTrue(False) + except: + self.assertTrue(True) + + def test_dataset_config(self): + dataset = fluid.core.Dataset("MultiSlotDataset") + dataset.set_thread_num(12) + dataset.set_filelist(["a.txt", "b.txt", "c.txt"]) + dataset.set_trainer_num(4) + dataset.set_hdfs_config("my_fs_name", "my_fs_ugi") + + thread_num = dataset.get_thread_num() + self.assertEqual(thread_num, 12) + + filelist = dataset.get_filelist() + self.assertEqual(len(filelist), 3) + self.assertEqual(filelist[0], "a.txt") + self.assertEqual(filelist[1], "b.txt") + self.assertEqual(filelist[2], "c.txt") + + trainer_num = 
dataset.get_trainer_num() + self.assertEqual(trainer_num, 4) + + name, ugi = dataset.get_hdfs_config() + self.assertEqual(name, "my_fs_name") + self.assertEqual(ugi, "my_fs_ugi") + + def test_in_memory_dataset_run(self): + with open("test_dataset_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_dataset_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + slots = ["slot1","slot2","slot3","slot4"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data(name=slot, shape=[1], + dtype="int64", lod_level=1) + slots_vars.append(var) + + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist(["test_dataset_a.txt", "test_dataset_b.txt"]) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + dataset.load_into_memory() + dataset.local_shuffle() + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + for i in range(2): + try: + exe.train_from_dataset(fluid.default_main_program(), dataset) + except: + self.assertTrue(False) + + os.remove("./test_dataset_a.txt") + os.remove("./test_dataset_b.txt") + + def test_queue_dataset_run(self): + with open("test_dataset_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_dataset_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + slots = ["slot1","slot2","slot3","slot4"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data(name=slot, shape=[1], + dtype="int64", lod_level=1) + 
slots_vars.append(var) + + dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist(["test_dataset_a.txt", "test_dataset_b.txt"]) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + for i in range(2): + try: + exe.train_from_dataset(fluid.default_main_program(), dataset) + except: + self.assertTrue(False) + + os.remove("./test_dataset_a.txt") + os.remove("./test_dataset_b.txt") + + +if __name__ == '__main__': + unittest.main() -- GitLab