diff --git a/paddlehub/autofinetune/autoft.py b/paddlehub/autofinetune/autoft.py
index 11e10c23baa69c9a3182a16e7f693418eb287d37..e7aba48d5be0b768587a6e69a15e72c89976e0c3 100644
--- a/paddlehub/autofinetune/autoft.py
+++ b/paddlehub/autofinetune/autoft.py
@@ -26,6 +26,7 @@ from tb_paddle import SummaryWriter
 from paddlehub.common.logger import logger
 from paddlehub.common.utils import mkdir
 from paddlehub.autofinetune.evaluator import REWARD_SUM, TMP_HOME
+from paddlehub.autofinetune.mpi_helper import MPIHelper

 if six.PY3:
     INF = math.inf
@@ -75,6 +76,12 @@ class BaseTuningStrategy(object):
                 logdir=self._output_dir + '/visualization/pop_{}'.format(i))
             self.writer_pop_trails.append(writer_pop_trail)

+        # for parallel on mpi
+        self.mpi = MPIHelper()
+        if self.mpi.multi_machine:
+            print("Autofinetune multimachine mode: running on {}".format(
+                self.mpi.gather(self.mpi.name)))
+
     @property
     def thread(self):
         return self._num_thread
@@ -177,16 +184,22 @@ class BaseTuningStrategy(object):
         solutions_modeldirs = {}
         mkdir(output_dir)

-        for idx, solution in enumerate(solutions):
+        solutions = self.mpi.bcast(solutions)
+
+        # split solutions to "solutions for me"
+        range_start, range_end = self.mpi.split_range(len(solutions))
+        my_solutions = solutions[range_start:range_end]
+
+        for idx, solution in enumerate(my_solutions):
             cuda = self.is_cuda_free["free"][0]
             modeldir = output_dir + "/model-" + str(idx) + "/"
             log_file = output_dir + "/log-" + str(idx) + ".info"
             params_cudas_dirs.append([solution, cuda, modeldir, log_file])
-            solutions_modeldirs[tuple(solution)] = modeldir
+            solutions_modeldirs[tuple(solution)] = (modeldir, self.mpi.rank)
             self.is_cuda_free["free"].remove(cuda)
             self.is_cuda_free["busy"].append(cuda)
             if len(params_cudas_dirs
-                   ) == self.thread or idx == len(solutions) - 1:
+                   ) == self.thread or idx == len(my_solutions) - 1:
                 tp = ThreadPool(len(params_cudas_dirs))
                 solution_results += tp.map(self.evaluator.run,
                                            params_cudas_dirs)
@@ -198,13 +211,25 @@ class BaseTuningStrategy(object):
                     self.is_cuda_free["busy"].remove(param_cuda[1])
                 params_cudas_dirs = []

-        self.feedback(solutions, solution_results)
+        all_solution_results = self.mpi.gather(solution_results)
+
+        if self.mpi.rank == 0:
+            # only rank 0 need to feedback
+            all_solution_results = [y for x in all_solution_results for y in x]
+            self.feedback(solutions, all_solution_results)
+
         # remove the tmp.txt which records the eval results for trials
         tmp_file = os.path.join(TMP_HOME, "tmp.txt")
         if os.path.exists(tmp_file):
             os.remove(tmp_file)

-        return solutions_modeldirs
+        # collect all solutions_modeldirs
+        collected_solutions_modeldirs = self.mpi.allgather(solutions_modeldirs)
+        return_dict = {}
+        for i in collected_solutions_modeldirs:
+            return_dict.update(i)
+
+        return return_dict


 class HAZero(BaseTuningStrategy):
diff --git a/paddlehub/autofinetune/mpi_helper.py b/paddlehub/autofinetune/mpi_helper.py
new file mode 100755
index 0000000000000000000000000000000000000000..9608363bfa888f208e20faaf7c9ac9d2278b33d3
--- /dev/null
+++ b/paddlehub/autofinetune/mpi_helper.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+
+class MPIHelper(object):
+    def __init__(self):
+        try:
+            from mpi4py import MPI
+        except:
+            # local run
+            self._size = 1
+            self._rank = 0
+            self._multi_machine = False
+
+            import socket
+            self._name = socket.gethostname()
+        else:
+            # in mpi environment
+            self._comm = MPI.COMM_WORLD
+            self._size = self._comm.Get_size()
+            self._rank = self._comm.Get_rank()
+            self._name = MPI.Get_processor_name()
+            if self._size > 1:
+                self._multi_machine = True
+            else:
+                self._multi_machine = False
+
+    @property
+    def multi_machine(self):
+        return self._multi_machine
+
+    @property
+    def rank(self):
+        return self._rank
+
+    @property
+    def size(self):
+        return self._size
+
+    @property
+    def name(self):
+        return self._name
+
+    def bcast(self, data):
+        if self._multi_machine:
+            # call real bcast
+            return self._comm.bcast(data, root=0)
+        else:
+            # do nothing
+            return data
+
+    def gather(self, data):
+        if self._multi_machine:
+            # call real gather
+            return self._comm.gather(data, root=0)
+        else:
+            # do nothing
+            return [data]
+
+    def allgather(self, data):
+        if self._multi_machine:
+            # call real allgather
+            return self._comm.allgather(data)
+        else:
+            # do nothing
+            return [data]
+
+    # calculate split range on mpi environment
+    def split_range(self, array_length):
+        if self._size == 1:
+            return 0, array_length
+        average_count = array_length // self._size
+        if array_length % self._size == 0:
+            return average_count * self._rank, average_count * (self._rank + 1)
+        else:
+            if self._rank < array_length % self._size:
+                return (average_count + 1) * self._rank, (average_count + 1) * (
+                    self._rank + 1)
+            else:
+                start = (average_count + 1) * (array_length % self._size) \
+                    + average_count * (self._rank - array_length % self._size)
+                return start, start + average_count
+
+
+if __name__ == "__main__":
+
+    mpi = MPIHelper()
+    print("Hello world from process {} of {} at {}.".format(
+        mpi.rank, mpi.size, mpi.name))
+
+    all_node_names = mpi.gather(mpi.name)
+    print("all node names using gather: {}".format(all_node_names))
+
+    all_node_names = mpi.allgather(mpi.name)
+    print("all node names using allgather: {}".format(all_node_names))
+
+    if mpi.rank == 0:
+        data = range(10)
+    else:
+        data = None
+    data = mpi.bcast(data)
+    print("after bcast, process {} have data {}".format(mpi.rank, data))
+
+    data = [i + mpi.rank for i in data]
+    print("after modify, process {} have data {}".format(mpi.rank, data))
+
+    new_data = mpi.gather(data)
+    print("after gather, process {} have data {}".format(mpi.rank, new_data))
+
+    # test for split
+    for i in range(12):
+        length = i + mpi.size  # length should >= mpi.size
+        [start, end] = mpi.split_range(length)
+        split_result = mpi.gather([start, end])
+        print("length {}, split_result {}".format(length, split_result))
diff --git a/paddlehub/commands/autofinetune.py b/paddlehub/commands/autofinetune.py
index 7b79eb4487d6323cdaae5500dffdbf0ac3aa4aab..8efb56f53a47be7b6378ecfe174a24d3803a8025 100644
--- a/paddlehub/commands/autofinetune.py
+++ b/paddlehub/commands/autofinetune.py
@@ -188,37 +188,62 @@ class AutoFineTuneCommand(BaseCommand):
                 run_round_cnt = run_round_cnt + 1
             print("PaddleHub Autofinetune ends.")

+        best_hparams_origin = autoft.get_best_hparams()
+        best_hparams_origin = autoft.mpi.bcast(best_hparams_origin)
+
         with open(autoft._output_dir + "/log_file.txt", "w") as f:
-            best_hparams = evaluator.convert_params(autoft.get_best_hparams())
+            best_hparams = evaluator.convert_params(best_hparams_origin)
             print("The final best hyperparameters:")
             f.write("The final best hyperparameters:\n")
             for index, hparam_name in enumerate(autoft.hparams_name_list):
                 print("%s=%s" % (hparam_name, best_hparams[index]))
                 f.write(hparam_name + "\t:\t" + str(best_hparams[index]) + "\n")
+            best_hparams_dir, best_hparams_rank = solutions_modeldirs[tuple(
+                best_hparams_origin)]
+
             print("The final best eval score is %s."
                  % autoft.get_best_eval_value())
-            print("The final best model parameters are saved as " +
-                  autoft._output_dir + "/best_model .")
+
+            if autoft.mpi.multi_machine:
+                print("The final best model parameters are saved as " +
+                      autoft._output_dir + "/best_model on rank " +
+                      str(best_hparams_rank) + " .")
+            else:
+                print("The final best model parameters are saved as " +
+                      autoft._output_dir + "/best_model .")
             f.write("The final best eval score is %s.\n"
                     % autoft.get_best_eval_value())
-            f.write(
-                "The final best model parameters are saved as ./best_model .")
             best_model_dir = autoft._output_dir + "/best_model"
-            shutil.copytree(
-                solutions_modeldirs[tuple(autoft.get_best_hparams())],
-                best_model_dir)
-            f.write("\t".join(autoft.hparams_name_list) +
-                    "\tsaved_params_dir\n")
+            if autoft.mpi.rank == best_hparams_rank:
+                shutil.copytree(best_hparams_dir, best_model_dir)
+
+            if autoft.mpi.multi_machine:
+                f.write(
+                    "The final best model parameters are saved as ./best_model on rank " \
+                    + str(best_hparams_rank) + " .")
+                f.write("\t".join(autoft.hparams_name_list) +
+                        "\tsaved_params_dir\trank\n")
+            else:
+                f.write(
+                    "The final best model parameters are saved as ./best_model ."
+                )
+                f.write("\t".join(autoft.hparams_name_list) +
+                        "\tsaved_params_dir\n")
+
             print(
-                "The related infomation about hyperparamemters searched are saved as %s/log_file.txt ."
+                "The related infomation about hyperparamemters searched are saved as %s/log_file.txt ."
                 % autoft._output_dir)
             for solution, modeldir in solutions_modeldirs.items():
                 param = evaluator.convert_params(solution)
                 param = [str(p) for p in param]
-                f.write("\t".join(param) + "\t" + modeldir + "\n")
+                if autoft.mpi.multi_machine:
+                    f.write("\t".join(param) + "\t" + modeldir[0] + "\t" +
+                            str(modeldir[1]) + "\n")
+                else:
+                    f.write("\t".join(param) + "\t" + modeldir[0] + "\n")

         return True
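Note on the split logic in the patch: MPIHelper.split_range hands each rank a contiguous slice of the candidate solutions, with the first array_length % size ranks receiving one extra trial. A minimal, MPI-free sketch of the same arithmetic follows; the standalone rank, size, and array_length parameters are illustrative stand-ins for self._rank, self._size, and the number of solutions, not part of the patch itself.

# Illustration only: contiguous partitioning of array_length items over
# `size` ranks, matching the behaviour of split_range above.
def split_range(rank, size, array_length):
    if size == 1:
        return 0, array_length
    average_count = array_length // size   # items every rank gets at least
    remainder = array_length % size        # first `remainder` ranks get one more
    if rank < remainder:
        start = (average_count + 1) * rank
        return start, start + average_count + 1
    start = (average_count + 1) * remainder + average_count * (rank - remainder)
    return start, start + average_count

# 10 candidate solutions over 3 ranks -> [(0, 4), (4, 7), (7, 10)]
print([split_range(r, 3, 10) for r in range(3)])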