From c9a3d3b6d6c3707a35562783ead371b443973b8b Mon Sep 17 00:00:00 2001 From: guru4elephant <35550832+guru4elephant@users.noreply.github.com> Date: Sat, 6 Apr 2019 11:07:29 +0800 Subject: [PATCH] Merge pull request #16652 from xjqbest/dataset_merge_develop fix dataset bug --- paddle/fluid/framework/data_feed.cc | 22 ++++++++++++-- paddle/fluid/framework/data_feed.h | 3 ++ paddle/fluid/framework/data_set.cc | 11 +++++++ paddle/fluid/framework/data_set.h | 7 +++++ paddle/fluid/framework/fleet/fleet_wrapper.cc | 1 + paddle/fluid/framework/io/shell.cc | 2 +- paddle/fluid/pybind/data_set_py.cc | 4 +++ python/paddle/fluid/dataset.py | 2 ++ python/paddle/fluid/executor.py | 4 +-- .../fleet/parameter_server/__init__.py | 29 ++++++++++++++----- .../fluid/tests/unittests/test_dataset.py | 18 +++++------- python/paddle/fluid/trainer_factory.py | 5 ++-- 12 files changed, 81 insertions(+), 27 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index e4e9861e3..b5f7e6c22 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -242,6 +242,11 @@ void InMemoryDataFeed::SetTrainerNum(int trainer_num) { trainer_num_ = trainer_num; } +template +void InMemoryDataFeed::SetFleetSendBatchSize(int64_t size) { + fleet_send_batch_size_ = size; +} + template void InMemoryDataFeed::PutInsToChannel(const std::string& ins_str) { #ifdef _LINUX @@ -361,8 +366,13 @@ void InMemoryDataFeed::GlobalShuffle() { VLOG(3) << "GlobalShuffle() begin, thread_id=" << thread_id_; auto fleet_ptr = FleetWrapper::GetInstance(); std::vector> send_vec(trainer_num_); + std::vector send_index(trainer_num_); + uint64_t reserve_len = fleet_send_batch_size_ / trainer_num_; for (auto& vec : send_vec) { - vec.reserve(fleet_send_batch_size_); + vec.reserve(reserve_len); + } + for (int i = 0; i < trainer_num_; ++i) { + send_index[i] = i; } std::vector> total_status; auto interval = GetMemoryDataInterval(); @@ -375,7 +385,10 @@ void InMemoryDataFeed::GlobalShuffle() { int64_t node_id = random_num % trainer_num_; send_vec[node_id].push_back(&((*memory_data_)[i])); if (i % fleet_send_batch_size_ == 0 && i != 0) { - for (int j = 0; j < send_vec.size(); ++j) { + // shuffle the sequence of sending to avoid network timeout error + std::random_shuffle(send_index.begin(), send_index.end()); + for (int index = 0; index < send_index.size(); ++index) { + int j = send_index[index]; std::string send_str; SerializeIns(send_vec[j], &send_str); VLOG(3) << "send str_length=" << send_str.length() @@ -388,7 +401,10 @@ void InMemoryDataFeed::GlobalShuffle() { } } } - for (int j = 0; j < send_vec.size(); ++j) { + // shuffle the sequence of sending to avoid network timeout error + std::random_shuffle(send_index.begin(), send_index.end()); + for (int index = 0; index < send_index.size(); ++index) { + int j = send_index[index]; if (send_vec[j].size() != 0) { std::string send_str; SerializeIns(send_vec[j], &send_str); diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 8ea09b65d..648c874a0 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -94,6 +94,8 @@ class DataFeed { virtual void SetThreadNum(int thread_num) {} // This function will do nothing at default virtual void SetTrainerNum(int trainer_num) {} + // This function will do nothing at default + virtual void SetFleetSendBatchSize(int64_t size) {} virtual void SetFileListMutex(std::mutex* mutex) { mutex_for_pick_file_ = mutex; } @@ -212,6 +214,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed { virtual void SetThreadId(int thread_id); virtual void SetThreadNum(int thread_num); virtual void SetTrainerNum(int trainer_num); + virtual void SetFleetSendBatchSize(int64_t size); virtual void PutInsToChannel(const std::string& ins_str); virtual void FillMemoryDataToChannel(); virtual void FillChannelToMemoryData(); diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 600fc7471..a3b7b1e45 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -64,6 +64,17 @@ void DatasetImpl::SetTrainerNum(int trainer_num) { } } +// if you run distributed, and want to do global shuffle, +// set this before global shuffle. +// be sure you call CreateReaders before SetFleetSendBatchSize +template +void DatasetImpl::SetFleetSendBatchSize(int64_t size) { + fleet_send_batch_size_ = size; + for (auto reader : readers_) { + reader->SetFleetSendBatchSize(size); + } +} + template void DatasetImpl::SetHdfsConfig(const std::string& fs_name, const std::string& fs_ugi) { diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 6fd3fcad2..bbe0f937a 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -47,6 +47,8 @@ class Dataset { virtual void SetThreadNum(int thread_num) = 0; // set workers' num virtual void SetTrainerNum(int trainer_num) = 0; + // set fleet send batch size + virtual void SetFleetSendBatchSize(int64_t size) = 0; // set fs name and ugi virtual void SetHdfsConfig(const std::string& fs_name, const std::string& fs_ugi) = 0; @@ -59,6 +61,8 @@ class Dataset { virtual int GetThreadNum() = 0; // get worker num virtual int GetTrainerNum() = 0; + // get fleet send batch size + virtual int64_t GetFleetSendBatchSize() = 0; // get hdfs config virtual std::pair GetHdfsConfig() = 0; // get data fedd desc @@ -98,6 +102,7 @@ class DatasetImpl : public Dataset { virtual void SetFileList(const std::vector& filelist); virtual void SetThreadNum(int thread_num); virtual void SetTrainerNum(int trainer_num); + virtual void SetFleetSendBatchSize(int64_t size); virtual void SetHdfsConfig(const std::string& fs_name, const std::string& fs_ugi); virtual void SetDataFeedDesc(const std::string& data_feed_desc_str); @@ -105,6 +110,7 @@ class DatasetImpl : public Dataset { virtual const std::vector& GetFileList() { return filelist_; } virtual int GetThreadNum() { return thread_num_; } virtual int GetTrainerNum() { return trainer_num_; } + virtual int64_t GetFleetSendBatchSize() { return fleet_send_batch_size_; } virtual std::pair GetHdfsConfig() { return std::make_pair(fs_name_, fs_ugi_); } @@ -137,6 +143,7 @@ class DatasetImpl : public Dataset { std::string fs_name_; std::string fs_ugi_; unsigned int rand_seed; + int64_t fleet_send_batch_size_; }; // use std::vector as data type diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 8147c7746..394ff24c4 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -237,6 +237,7 @@ void FleetWrapper::PushDenseParamSync( std::vector regions; for (auto& t : var_names) { Variable* var = scope.FindVar(t); + CHECK(var != nullptr) << "var[" << t << "] not found"; LoDTensor* tensor = var->GetMutable(); float* g = tensor->mutable_data(place); paddle::ps::Region reg(g, tensor->numel()); diff --git a/paddle/fluid/framework/io/shell.cc b/paddle/fluid/framework/io/shell.cc index bcfa4f44f..ab671cb56 100644 --- a/paddle/fluid/framework/io/shell.cc +++ b/paddle/fluid/framework/io/shell.cc @@ -126,7 +126,7 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read, } close_open_fds_internal(); - if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) { + if (execl("/bin/bash", "bash", "-c", real_cmd, NULL) < 0) { return -1; } exit(127); diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index b773fd03c..3f171b65a 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -50,11 +50,15 @@ void BindDataset(py::module* m) { .def("set_filelist", &framework::Dataset::SetFileList) .def("set_thread_num", &framework::Dataset::SetThreadNum) .def("set_trainer_num", &framework::Dataset::SetTrainerNum) + .def("set_fleet_send_batch_size", + &framework::Dataset::SetFleetSendBatchSize) .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig) .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc) .def("get_filelist", &framework::Dataset::GetFileList) .def("get_thread_num", &framework::Dataset::GetThreadNum) .def("get_trainer_num", &framework::Dataset::GetTrainerNum) + .def("get_fleet_send_batch_size", + &framework::Dataset::GetFleetSendBatchSize) .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig) .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc) .def("register_client2client_msg_handler", diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index e90c36da9..fc3f53368 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -236,11 +236,13 @@ class InMemoryDataset(DatasetBase): fleet: fleet singleton. Default None. """ trainer_num = 1 + fleet_send_batch_size = 80000 if fleet is not None: fleet.fleet_instance.role_maker_._barrier_worker() trainer_num = fleet.worker_num() self.dataset.register_client2client_msg_handler() self.dataset.set_trainer_num(trainer_num) + self.dataset.set_fleet_send_batch_size(fleet_send_batch_size) if fleet is not None: fleet.fleet_instance.role_maker_._barrier_worker() self.dataset.global_shuffle() diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index e4666deb7..e15197037 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -712,7 +712,7 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is needed and should be initialized") - if self.place == paddle.fluid.CUDAPlace(): + if not isinstance(self.place, core.CPUPlace): raise RuntimeError("infer_from_dataset is verified on CPUPlace" "We will open CUDAPlace in the future") @@ -796,7 +796,7 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is need and should be initialized") - if self.place == paddle.fluid.CUDAPlace(): + if not isinstance(self.place, core.CPUPlace): raise RuntimeError("train_from_dataset is verified on CPUPlace" "We will open CUDAPlace in the future") diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index 044aa33c2..9b1ec412c 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -123,18 +123,25 @@ class Fleet(object): print("You should run DistributedOptimizer.minimize() first") sys.exit(-1) - def init_worker(self, programs): + def init_worker(self, programs, scopes=None): """ init_worker(): will be called by user. When a user knows current process is_server(), he/she should call init_worker() to initialize global information about worker and connect - worker with pserver. + worker with pserver. You should run startup program before init_worker. Args: programs(Program|list): a Program or a list of Programs - + scopes(Scope|list): a Scope or a list of Scopes, default None. """ if not isinstance(programs, list): programs = [programs] + if scopes is None: + scopes = [fluid.global_scope()] * len(programs) + if len(scopes) != len(programs): + print( + "You should make sure len(scopes) == len(programs) or set scopes None" + ) + sys.exit(-1) if self._opt_info: if "fleet_desc" in self._opt_info: self._dist_desc_str = text_format.MessageToString( @@ -160,7 +167,7 @@ class Fleet(object): self.role_maker_._barrier_worker() if self.role_maker_._is_first_worker(): tables = self._dist_desc.trainer_param.dense_table - for prog in programs: + for prog, scope in zip(programs, scopes): prog_id = str(id(prog)) prog_conf = self._opt_info['program_configs'][prog_id] prog_tables = {} @@ -174,10 +181,16 @@ class Fleet(object): continue var_name_list = [] for i in range(0, len(table.dense_variable_name)): - var_name_list.append(table.dense_variable_name[i]) - self._fleet_ptr.init_model(prog.desc, - int(table.table_id), - var_name_list) + var_name = table.dense_variable_name[i] + if scope.find_var(var_name) is None: + print("var " + var_name + + " not found in scope, " + + "you should run startup program first") + sys.exit(-1) + var_name_list.append(var_name) + self._fleet_ptr.init_model(scope, + int(table.table_id), + var_name_list) # barrier for init model done self.role_maker_._barrier_worker() else: diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 8c705a095..4cfd99150 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -29,7 +29,6 @@ class TestDataset(unittest.TestCase): def test_dataset_create(self): """ Testcase for dataset create. """ - return try: dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") except: @@ -48,7 +47,6 @@ class TestDataset(unittest.TestCase): def test_dataset_config(self): """ Testcase for dataset configuration. """ - return dataset = fluid.core.Dataset("MultiSlotDataset") dataset.set_thread_num(12) dataset.set_filelist(["a.txt", "b.txt", "c.txt"]) @@ -75,7 +73,6 @@ class TestDataset(unittest.TestCase): """ Testcase for InMemoryDataset from create to run. """ - return with open("test_in_memory_dataset_run_a.txt", "w") as f: data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n" @@ -112,9 +109,10 @@ class TestDataset(unittest.TestCase): for i in range(2): try: exe.train_from_dataset(fluid.default_main_program(), dataset) - except: - #self.assertTrue(False) + except ImportError as e: pass + except Exception as e: + self.assertTrue(False) os.remove("./test_in_memory_dataset_run_a.txt") os.remove("./test_in_memory_dataset_run_b.txt") @@ -123,7 +121,6 @@ class TestDataset(unittest.TestCase): """ Testcase for QueueDataset from create to run. """ - return with open("test_queue_dataset_run_a.txt", "w") as f: data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n" @@ -156,15 +153,14 @@ class TestDataset(unittest.TestCase): for i in range(2): try: exe.train_from_dataset(fluid.default_main_program(), dataset) - except: - #self.assertTrue(False) + except ImportError as e: pass + except Exception as e: + self.assertTrue(False) os.remove("./test_queue_dataset_run_a.txt") os.remove("./test_queue_dataset_run_b.txt") if __name__ == '__main__': - #unittest.main() - import sys - sys.exit(0) + unittest.main() diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index 4e957880f..871b66366 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from .trainer_desc import MultiTrainer, DistMultiTrainer +from .device_worker import Hogwild, DownpourSGD + __all__ = ["TrainerFactory"] @@ -20,8 +23,6 @@ class TrainerFactory(object): pass def _create_trainer(self, opt_info=None): - from .trainer_desc import MultiTrainer, DistMultiTrainer - from .device_worker import Hogwild, DownpourSGD trainer = None device_worker = None if opt_info == None: -- GitLab