diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index f0c8ccc243c596cb04ea60320fc510478bbbf354..114496085429b18607ff84178e181c36bd2d1adb 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -352,8 +352,6 @@ void DatasetImpl::CreateReaders() {
   VLOG(3) << "Filelist size in Dataset: " << filelist_.size();
   VLOG(3) << "channel num in Dataset: " << channel_num_;
   CHECK(thread_num_ > 0) << "thread num should > 0";
-  CHECK(thread_num_ <= filelist_.size())
-      << "thread num should <= filelist size";
   CHECK(channel_num_ > 0) << "channel num should > 0";
   CHECK(channel_num_ <= thread_num_) << "channel num should <= thread num";
   VLOG(3) << "readers size: " << readers_.size();
diff --git a/paddle/fluid/framework/pull_dense_worker.cc b/paddle/fluid/framework/pull_dense_worker.cc
index 20d7f98e93695107637107c60f5ef42b8ce9293d..3fe0d516e2d5417b958dbfc1c6b13d15ed2be127 100644
--- a/paddle/fluid/framework/pull_dense_worker.cc
+++ b/paddle/fluid/framework/pull_dense_worker.cc
@@ -80,6 +80,9 @@ void PullDenseWorker::Stop() {
   if (running_) {
     running_ = false;
     t_.join();
+    // pull dense params once more on stop, so that local dense params match
+    // the pserver and saving the paddle model saves the same dense params
+    PullDense(true);
  }
}
 
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 902a33b614675eeac0d6bf643b3b519325fd150d..20ffd13d605779a5298efd10d947cf55868905f3 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -246,6 +246,8 @@ class InMemoryDataset(DatasetBase):
         """
         if self.thread_num > len(self.filelist):
             self.thread_num = len(self.filelist)
+        if self.thread_num == 0:
+            self.thread_num = 1
         self.dataset.set_thread_num(self.thread_num)
         if self.queue_num is None:
             self.queue_num = self.thread_num
@@ -545,6 +547,20 @@ class QueueDataset(DatasetBase):
         super(QueueDataset, self).__init__()
         self.proto_desc.name = "MultiSlotDataFeed"
 
+    def _prepare_to_run(self):
+        """
+        Set data_feed_desc, thread num and filelist before running;
+        users do not need to call this function directly.
+        """
+        if self.thread_num > len(self.filelist):
+            self.thread_num = len(self.filelist)
+        if self.thread_num == 0:
+            self.thread_num = 1
+        self.dataset.set_thread_num(self.thread_num)
+        self.dataset.set_filelist(self.filelist)
+        self.dataset.set_data_feed_desc(self.desc())
+        self.dataset.create_readers()
+
     def local_shuffle(self):
         """
         Local shuffle data.
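Taken together, the data_set.cc and dataset.py hunks above move the thread-count validation from a hard C++ CHECK into a Python-side clamp: thread_num is capped at the filelist length but floored at 1, so an oversized thread count or an empty filelist no longer aborts reader creation. Below is a minimal sketch of that clamp as a standalone helper; the name _clamp_thread_num is hypothetical (the patch inlines this logic in the two _prepare_to_run methods), but the two guards are taken verbatim from the hunks.

def _clamp_thread_num(thread_num, filelist):
    # cap the thread count at the number of input files ...
    if thread_num > len(filelist):
        thread_num = len(filelist)
    # ... but never let it reach 0: the C++ side still enforces
    # CHECK(thread_num_ > 0); only the "thread num <= filelist size"
    # CHECK was removed
    if thread_num == 0:
        thread_num = 1
    return thread_num

# 10 requested threads with 2 files -> 2; any request with 0 files -> 1
assert _clamp_thread_num(10, ["a.txt", "b.txt"]) == 2
assert _clamp_thread_num(10, []) == 1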
diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
index a52970fad1220b150fd56b365358cdab9a8ae199..ac9b0f232761b66b3a74a938ceadb9adc2ee8e31 100644
--- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py
+++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
@@ -148,8 +148,10 @@ class Fleet(object):
     def split_files(self, files):
         """
         split files before distributed training,
-        for example, files is [a, b, c ,d, e] and trainer_num = 2,
-        then trainer 0 gets [a, b, c] and trainer 1 gets [d, e]
+        example 1: files is [a, b, c, d, e] and trainer_num = 2, then trainer
+        0 gets [a, b, c] and trainer 1 gets [d, e].
+        example 2: files is [a, b] and trainer_num = 3, then trainer 0 gets
+        [a], trainer 1 gets [b], trainer 2 gets [].
 
         Args:
             files(list): file list need to be read.
@@ -160,9 +162,6 @@
         trainer_id = self.worker_index()
         trainers = self.worker_num()
 
-        if len(files) < trainers:
-            raise ValueError("file number must gather or equal trainer number")
-
         remainder = len(files) % trainers
         blocksize = len(files) / trainers
 
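The split_files change above drops the requirement that there be at least as many files as trainers; trainers past the end of the list now simply receive an empty shard. The hunk only shows the remainder/blocksize arithmetic, so the function below is a sketch reconstructed from the two docstring examples rather than the verbatim method body (the real method is bound to Fleet and reads trainer_id and trainers from worker_index() and worker_num()).

def split_files(files, trainer_id, trainers):
    # the first `remainder` trainers get blocksize + 1 files, the rest get
    # blocksize; when there are fewer files than trainers, blocksize is 0
    # and the trailing trainers get an empty list instead of a ValueError
    remainder = len(files) % trainers
    blocksize = len(files) // trainers  # the Python 2 source uses plain /
    begin = 0
    shards = []
    for i in range(trainers):
        end = begin + blocksize + (1 if i < remainder else 0)
        shards.append(files[begin:end])
        begin = end
    return shards[trainer_id]

# example 1 from the new docstring: 5 files across 2 trainers
assert split_files(["a", "b", "c", "d", "e"], 0, 2) == ["a", "b", "c"]
assert split_files(["a", "b", "c", "d", "e"], 1, 2) == ["d", "e"]
# example 2: more trainers than files now yields [] instead of raising
assert split_files(["a", "b"], 2, 3) == []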