提交 e8728fbf 编写于 作者: H hj

add multimachine support (mpi) for autofinetune.

上级 5797e613
......@@ -26,6 +26,7 @@ from tb_paddle import SummaryWriter
from paddlehub.common.logger import logger
from paddlehub.common.utils import mkdir
from paddlehub.autofinetune.evaluator import REWARD_SUM, TMP_HOME
from paddlehub.autofinetune.mpi_helper import MPIHelper
if six.PY3:
INF = math.inf
......@@ -75,6 +76,11 @@ class BaseTuningStrategy(object):
logdir=self._output_dir + '/visualization/pop_{}'.format(i))
self.writer_pop_trails.append(writer_pop_trail)
# for parallel on mpi
self.mpi = MPIHelper()
if self.mpi.multi_machine:
print("Autofinetune multimachine mode: running on {}".format(self.mpi.gather(self.mpi.name)))
@property
def thread(self):
return self._num_thread
......@@ -177,16 +183,22 @@ class BaseTuningStrategy(object):
solutions_modeldirs = {}
mkdir(output_dir)
for idx, solution in enumerate(solutions):
solutions = self.mpi.bcast(solutions)
# split solutions to "solutions for me"
range_start, range_end = self.mpi.split_range(len(solutions))
my_solutions = solutions[range_start:range_end]
for idx, solution in enumerate(my_solutions):
cuda = self.is_cuda_free["free"][0]
modeldir = output_dir + "/model-" + str(idx) + "/"
log_file = output_dir + "/log-" + str(idx) + ".info"
params_cudas_dirs.append([solution, cuda, modeldir, log_file])
solutions_modeldirs[tuple(solution)] = modeldir
solutions_modeldirs[tuple(solution)] = (modeldir, self.mpi.rank)
self.is_cuda_free["free"].remove(cuda)
self.is_cuda_free["busy"].append(cuda)
if len(params_cudas_dirs
) == self.thread or idx == len(solutions) - 1:
) == self.thread or idx == len(my_solutions) - 1:
tp = ThreadPool(len(params_cudas_dirs))
solution_results += tp.map(self.evaluator.run,
params_cudas_dirs)
......@@ -198,13 +210,25 @@ class BaseTuningStrategy(object):
self.is_cuda_free["busy"].remove(param_cuda[1])
params_cudas_dirs = []
self.feedback(solutions, solution_results)
all_solution_results = self.mpi.gather(solution_results)
if self.mpi.rank == 0:
# only rank 0 need to feedback
all_solution_results = [y for x in all_solution_results for y in x]
self.feedback(solutions, all_solution_results)
# remove the tmp.txt which records the eval results for trials
tmp_file = os.path.join(TMP_HOME, "tmp.txt")
if os.path.exists(tmp_file):
os.remove(tmp_file)
return solutions_modeldirs
# collect all solutions_modeldirs
collected_solutions_modeldirs = self.mpi.allgather(solutions_modeldirs)
return_dict = {}
for i in collected_solutions_modeldirs:
return_dict.update(i)
return return_dict
class HAZero(BaseTuningStrategy):
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class MPIHelper(object):
def __init__(self):
try:
from mpi4py import MPI
except:
# local run
self._size = 1
self._rank = 0
self._multi_machine = False
import socket
self._name = socket.gethostname()
else:
# in mpi environment
self._comm = MPI.COMM_WORLD
self._size = self._comm.Get_size()
self._rank = self._comm.Get_rank()
self._name = MPI.Get_processor_name()
if self._size > 1:
self._multi_machine = True
else:
self._multi_machine = False
@property
def multi_machine(self):
return self._multi_machine
@property
def rank(self):
return self._rank
@property
def size(self):
return self._size
@property
def name(self):
return self._name
def bcast(self, data):
if self._multi_machine:
# call real bcast
return self._comm.bcast(data, root = 0)
else:
# do nothing
return data
def gather(self, data):
if self._multi_machine:
# call real gather
return self._comm.gather(data, root = 0)
else:
# do nothing
return [data]
def allgather(self, data):
if self._multi_machine:
# call real allgather
return self._comm.allgather(data)
else:
# do nothing
return [data]
# calculate split range on mpi environment
def split_range(self, array_length):
if self._size == 1:
return 0, array_length
average_count = array_length / self._size
if array_length % self._size == 0:
return average_count * self._rank, average_count * (self._rank + 1)
else:
if self._rank < array_length % self._size:
return (average_count + 1) * self._rank, (average_count + 1) * (self._rank + 1)
else:
start = (average_count + 1) * (array_length % self._size) \
+ average_count * (self._rank - array_length % self._size)
return start, start + average_count
if __name__ == "__main__":
mpi = MPIHelper()
print("Hello world from process {} of {} at {}.".format(mpi.rank, mpi.size, mpi.name))
all_node_names = mpi.gather(mpi.name)
print("all node names using gather: {}".format(all_node_names))
all_node_names = mpi.allgather(mpi.name)
print("all node names using allgather: {}".format(all_node_names))
if mpi.rank == 0:
data = range(10)
else:
data = None
data = mpi.bcast(data)
print("after bcast, process {} have data {}".format(mpi.rank, data))
data = [i + mpi.rank for i in data]
print("after modify, process {} have data {}".format(mpi.rank, data))
new_data = mpi.gather(data)
print("after gather, process {} have data {}".format(mpi.rank, new_data))
# test for split
for i in range(12):
length = i + mpi.size # length should >= mpi.size
[start, end] = mpi.split_range(length)
split_result = mpi.gather([start, end])
print("length {}, split_result {}".format(length, split_result))
......@@ -188,37 +188,58 @@ class AutoFineTuneCommand(BaseCommand):
run_round_cnt = run_round_cnt + 1
print("PaddleHub Autofinetune ends.")
best_hparams_origin = autoft.get_best_hparams()
best_hparams_origin = autoft.mpi.bcast(best_hparams_origin)
with open(autoft._output_dir + "/log_file.txt", "w") as f:
best_hparams = evaluator.convert_params(autoft.get_best_hparams())
best_hparams = evaluator.convert_params(best_hparams_origin)
print("The final best hyperparameters:")
f.write("The final best hyperparameters:\n")
for index, hparam_name in enumerate(autoft.hparams_name_list):
print("%s=%s" % (hparam_name, best_hparams[index]))
f.write(hparam_name + "\t:\t" + str(best_hparams[index]) + "\n")
best_hparams_dir, best_hparams_rank = solutions_modeldirs[tuple(best_hparams_origin)]
print("The final best eval score is %s." %
autoft.get_best_eval_value())
print("The final best model parameters are saved as " +
autoft._output_dir + "/best_model .")
if autoft.mpi.multi_machine:
print("The final best model parameters are saved as " +
autoft._output_dir + "/best_model on rank " + str(best_hparams_rank) + " .")
else:
print("The final best model parameters are saved as " +
autoft._output_dir + "/best_model .")
f.write("The final best eval score is %s.\n" %
autoft.get_best_eval_value())
f.write(
"The final best model parameters are saved as ./best_model .")
best_model_dir = autoft._output_dir + "/best_model"
shutil.copytree(
solutions_modeldirs[tuple(autoft.get_best_hparams())],
best_model_dir)
f.write("\t".join(autoft.hparams_name_list) +
"\tsaved_params_dir\n")
if autoft.mpi.rank == best_hparams_rank:
shutil.copytree(best_hparams_dir, best_model_dir)
if autoft.mpi.multi_machine:
f.write(
"The final best model parameters are saved as ./best_model on rank " \
+ str(best_hparams_rank) + " .")
f.write("\t".join(autoft.hparams_name_list) +
"\tsaved_params_dir\trank\n")
else:
f.write(
"The final best model parameters are saved as ./best_model .")
f.write("\t".join(autoft.hparams_name_list) +
"\tsaved_params_dir\n")
print(
"The related infomation about hyperparamemters searched are saved as %s/log_file.txt ."
"The related infomation about hyperparamemters searched are saved as %s/log_file.txt ."
% autoft._output_dir)
for solution, modeldir in solutions_modeldirs.items():
param = evaluator.convert_params(solution)
param = [str(p) for p in param]
f.write("\t".join(param) + "\t" + modeldir + "\n")
if autoft.mpi.multi_machine:
f.write("\t".join(param) + "\t" + modeldir[0] + "\t" + str(modeldir[1]) + "\n")
else:
f.write("\t".join(param) + "\t" + modeldir[0] + "\n")
return True
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册