diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 1a023f61675ed62c141bb6e71fabbdf0086b0c64..d539940133881d5aa632eae6e3975ae57a385ebf 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -219,6 +219,7 @@ class InMemoryDataset(DatasetBase):
             >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
             >>> filelist = ["a.txt", "b.txt"]
             >>> dataset.set_filelist(filelist)
+            >>> dataset.load_into_memory()
             >>> dataset.local_shuffle()
         """
         self.dataset.local_shuffle()
@@ -236,6 +237,7 @@ class InMemoryDataset(DatasetBase):
             >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
             >>> filelist = ["a.txt", "b.txt"]
             >>> dataset.set_filelist(filelist)
+            >>> dataset.load_into_memory()
             >>> dataset.global_shuffle(fleet)
 
         Args:
@@ -255,6 +257,25 @@ class InMemoryDataset(DatasetBase):
         if fleet is not None:
             fleet.fleet_instance.role_maker_._barrier_worker()
 
+    def release_memory(self):
+        """
+        Release InMemoryDataset memory data, when data will not be used again.
+
+        Example:
+            >>> import paddle.fluid as fluid
+            >>> import paddle.fluid.incubate.fleet.parameter_server as fleet
+            >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
+            >>> filelist = ["a.txt", "b.txt"]
+            >>> dataset.set_filelist(filelist)
+            >>> dataset.load_into_memory()
+            >>> dataset.global_shuffle(fleet)
+            >>> exe = fluid.Executor(fluid.CPUPlace())
+            >>> exe.run(fluid.default_startup_program())
+            >>> exe.train_from_dataset(fluid.default_main_program(), dataset)
+            >>> dataset.release_memory()
+        """
+        self.dataset.release_memory()
+
 
 class QueueDataset(DatasetBase):
     """
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index bf50a5815dbe445a7a44fd9199ed51f632ff4997..ffc7ae0172e26191264625d0a8bdd28dab69c833 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -128,7 +128,7 @@ class MPIRoleMaker(RoleMakerBase):
         """
         finalize the current MPI instance.
         """
-        self._comm.finalize()
+        pass
 
 
 class MPISymetricRoleMaker(MPIRoleMaker):
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
index 58f17f95323236cfc559a5cb05a3b09bef3b0a5e..4a7665b9bced9df5f3fb8a82bbcfd7a8feb6a24a 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
@@ -241,6 +241,41 @@ class Fleet(object):
         """
         self._fleet_ptr.save_model(save_path)
 
+    def split_filelist(self, filelist):
+        """
+        split filelist before distributed training,
+        for example, filelist is [a, b, c, d, e] and trainer_num = 2,
+        then trainer 0 gets [a, b, c] and trainer 1 gets [d, e]
+
+        Example:
+            >>> all_filelist = ["a.txt", "b.txt", "c.txt"]
+            >>> my_filelist = fleet.split_filelist(all_filelist)
+            >>> dataset = fluid.DatasetFactory().create_dataset()
+            >>> dataset.set_filelist(my_filelist)
+
+        Args:
+            filelist(list): list of filename, can be local or hdfs/afs.
+
+        Returns:
+            list of filename which belongs to this trainer.
+        """
+        file_num = len(filelist)
+        trainer_id = self.get_worker_index()
+        trainer_num = self.get_worker_num()
+        if trainer_num > file_num:
+            raise ValueError("trainer_num should be <= file_num : "
+                             "%s > %s" % (trainer_num, file_num))
+        # get interval of filelist, it's [ )
+        start = 0
+        end = 0
+        for i in range(0, trainer_id + 1):
+            # floor division keeps start/end ints under Python 3 ('/' would
+            # yield a float and break the slice below)
+            length = file_num // trainer_num + (i < (file_num % trainer_num))
+            start = end
+            end += length
+        my_filelist = filelist[start:end]
+        return my_filelist
+
     def _set_opt_info(self, opt_info):
         """
         this function saves the result from DistributedOptimizer.minimize()
@@ -337,3 +371,4 @@ save_pserver_model = fleet_instance.save_pserver_model
 worker_num = fleet_instance.get_worker_num
 server_num = fleet_instance.get_server_num
 worker_index = fleet_instance.get_worker_index
+split_filelist = fleet_instance.split_filelist