diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 528f7b3269eb90435d88cffadfa185cc664e430a..506a38059c33b94f8dc421c1e2e3b800de35fac7 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -128,7 +128,7 @@ class MPIRoleMaker(RoleMakerBase): """ finalize the current MPI instance. """ - self.comm_.finalize() + pass class MPISymetricRoleMaker(MPIRoleMaker): diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index 9b1ec412c731a4b59d0da8847e91e30d8e1d864a..1c49ea1f55e7ea877a56479254efd91753d4ee8a 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -241,6 +241,35 @@ class Fleet(object): """ self._fleet_ptr.save_model(save_path) + def split_filelist(self, filelist): + """ + split filelist before distributed training, + for example, filelist is [a, b, c ,d, e] and trainer_num = 2, + then trainer 0 gets [a, b, c] and trainer 1 gets [d, e] + + Args: + filelist(list): list of filename, can be local or hdfs/afs. + + Returns: list of filename which belongs to this trainer. + """ + file_num = len(filelist) + trainer_id = self.get_worker_index() + trainer_num = self.get_worker_num() + if trainer_num > file_num: + raise ValueError( + "trainer_num should be <= file_num : " + "%s > %s" % (trainer_num, file_num) + ) + # get interval of filelist, it's [ ) + start = 0 + end = 0 + for i in range(0, trainer_id + 1): + length = file_num / trainer_num + (i < (file_num % trainer_num)) + start = end + end += length + myfilelist = filelist[start : end] + return myfilelist + def _set_opt_info(self, opt_info): """ this function saves the result from DistributedOptimizer.minimize() @@ -337,3 +366,4 @@ save_pserver_model = fleet_instance.save_pserver_model worker_num = fleet_instance.get_worker_num server_num = fleet_instance.get_server_num worker_index = fleet_instance.get_worker_index +split_filelist = fleet_instance.split_filelist