Commit 4efcb9df, authored by MrChengmo

update fleet run for multi device support

Parent be70c94e
@@ -106,6 +106,14 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         help="The path for each process's log.If it's not set, the log will printed to default pipe."
     )
+    base_group.add_argument(
+        "--gpus",
+        type=str,
+        default=None,
+        help="It's for gpu training and the training process will run on the gpus,"
+        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
+    )
     base_group.add_argument(
         "training_script",
         type=str,
@@ -124,13 +132,6 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         type=str,
         default="127.0.0.1",
         help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
-    collective_group.add_argument(
-        "--gpus",
-        type=str,
-        default=None,
-        help="It's for gpu training and the training process will run on the gpus,"
-        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
-    )

     ps_group = parser.add_argument_group("Parameter-Server Parameters")
     # for parameter server
@@ -193,35 +194,6 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
     return get_cluster(node_ips, node_ip, trainer_endpoints, gpus)

-def get_gpus(gpus):
-    if gpus is None:
-        gpus_num = fluid.core.get_cuda_device_count()
-        res_gpus = [str(x) for x in range(0, gpus_num)]
-    else:
-        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
-        if cuda_visible_devices is None or cuda_visible_devices == "":
-            res_gpus = [x.strip() for x in gpus.split(',')]
-        else:
-            # change gpus into relative values
-            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
-            # therefore gpus=0,1,2,3
-            cuda_visible_devices_list = cuda_visible_devices.split(',')
-            for x in gpus.split(','):
-                assert x in cuda_visible_devices_list, "Can't find "\
-                    "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
-                    % (x, cuda_visible_devices)
-            res_gpus = [
-                cuda_visible_devices_list.index(x.strip())
-                for x in gpus.split(',')
-            ]
-            logger.info("Change selected_gpus into reletive values. --ips:{} "
-                        "will change into relative_ips:{} according to your "
-                        "CUDA_VISIBLE_DEVICES:{}".format(
-                            gpus, res_gpus, cuda_visible_devices_list))
-    return res_gpus

 def launch_collective(args):
     # parse arguments, used for cloud-single-machine and local
     gpus = get_gpus(args.gpus)
...
@@ -526,6 +526,35 @@ def watch_local_trainers(procs, nranks):
     return alive

+def get_gpus(gpus):
+    if gpus is None:
+        gpus_num = fluid.core.get_cuda_device_count()
+        res_gpus = [str(x) for x in range(0, gpus_num)]
+    else:
+        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+        if cuda_visible_devices is None or cuda_visible_devices == "":
+            res_gpus = [x.strip() for x in gpus.split(',')]
+        else:
+            # change gpus into relative values
+            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
+            # therefore gpus=0,1,2,3
+            cuda_visible_devices_list = cuda_visible_devices.split(',')
+            for x in gpus.split(','):
+                assert x in cuda_visible_devices_list, "Can't find "\
+                    "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
+                    % (x, cuda_visible_devices)
+            res_gpus = [
+                cuda_visible_devices_list.index(x.strip())
+                for x in gpus.split(',')
+            ]
+            logger.info("Change selected_gpus into reletive values. --ips:{} "
+                        "will change into relative_ips:{} according to your "
+                        "CUDA_VISIBLE_DEVICES:{}".format(
+                            gpus, res_gpus, cuda_visible_devices_list))
+    return res_gpus

 def direct_start(args):
     # run ps-cpu mode on paddlecloud, using given envs
     cmd = [sys.executable, "-u", args.training_script] + \
@@ -636,6 +665,7 @@ class ParameterServerLauncher(object):
                 args.workers.split(",")
             ) == self.worker_num, "The worker_num and workers doesn't match. Expect workers endpoints num epual to worker_num, but received workers enpoint num: {} and worker_num {}".format(
                 len(args.workers.split(",")), self.worker_num)
             self.worker_endpoints = args.workers
         else:
             ports = get_ports(self.worker_num, self.server_num)
@@ -750,9 +780,9 @@ class ParameterServerLauncher(object):
         self.gloo_rendezvous_dir = tempfile.mkdtemp()
         # 3. subproces start
-        self.procs = []
-        self.cmds = []
-        self.log_fns = []
+        self.procs = {"worker": [], "server": [], "heter_worker": []}
+        self.cmds = {"worker": [], "server": [], "heter_worker": []}
+        self.log_fns = {"worker": [], "server": [], "heter_worker": []}
         self.start_pod_server(args, pod)
         self.start_pod_worker(args, pod)
@@ -762,30 +792,37 @@ class ParameterServerLauncher(object):
             "Please check servers, workers and heter_worker logs in {}/workerlog.*, {}/serverlog.* and {}/heterlog.*".
             format(args.log_dir, args.log_dir, args.log_dir))
-        # only wait worker to finish here
-        for i, proc in enumerate(self.procs):
-            if i < len(pod.servers) and i > len(pod.servers) + len(pod.workers):
-                continue
-            self.procs[i].proc.wait()
-            if len(self.log_fns) > 0:
-                self.log_fns[i].close()
-        logger.info(
-            "all workers exit, going to finish parameter server and heter_worker"
-        )
-        for i in range(
-                len(pod.servers + pod.workers),
-                len(pod.servers + pod.workers + pod.heter_workers)):
-            if len(self.log_fns) > 0:
-                self.log_fns[i].close()
-            self.procs[i].proc.terminate()
-        logger.info("all heter worker are killed")
-        for i in range(len(pod.servers)):
-            if len(self.log_fns) > 0:
-                self.log_fns[i].close()
-            self.procs[i].proc.terminate()
-        logger.info("all parameter server are killed", file=sys.stderr)
+        # 4. wait for finish training
+        if len(self.procs["worker"]) > 0:
+            # if node has worker procs
+            # only wait worker to finish here
+            for i, proc in enumerate(self.procs["worker"]):
+                self.procs["worker"][i].proc.wait()
+                if len(self.log_fns["worker"]) > 0:
+                    self.log_fns["worker"][i].close()
+            logger.info(
+                "all workers exit, going to finish parameter server and heter_worker"
+            )
+            if len(self.procs["server"]) > 0:
+                for i, proc in enumerate(self.procs["server"]):
+                    self.log_fns["server"][i].close()
+                    self.procs["server"][i].proc.terminate()
+                logger.info("all parameter server are killed")
+            if len(self.procs["heter_worker"]) > 0:
+                for i, proc in enumerate(self.procs["heter_worker"]):
+                    self.log_fns["heter_worker"][i].close()
+                    self.procs["heter_worker"][i].proc.terminate()
+                logger.info("all heter_worker are killed")
+        else:
+            # if node has not worker procs
+            # blocking training process
+            if len(self.procs["server"]) > 0:
+                for i, proc in enumerate(self.procs["server"]):
+                    self.procs["server"][i].proc.wait()
+            if len(self.procs["heter_worker"]) > 0:
+                for i, proc in enumerate(self.procs["heter_worker"]):
+                    self.procs["heter_worker"][i].proc.wait()
         if os.path.exists(self.gloo_rendezvous_dir):
             shutil.rmtree(self.gloo_rendezvous_dir)
@@ -814,7 +851,7 @@ class ParameterServerLauncher(object):
             cmd = [sys.executable, "-u", args.training_script
                    ] + args.training_script_args
-            self.cmds.append(cmd)
+            self.cmds["server"].append(cmd)
             if idx == 0:
                 logger.info(
@@ -827,7 +864,7 @@ class ParameterServerLauncher(object):
             if args.log_dir is not None:
                 os.system("mkdir -p {}".format(args.log_dir))
                 fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
-                self.log_fns.append(fn)
+                self.log_fns["server"].append(fn)
                 proc = subprocess.Popen(
                     cmd, env=current_env, stdout=fn, stderr=fn)
             else:
@@ -841,7 +878,7 @@ class ParameterServerLauncher(object):
             tp.log_offset = fn.tell() if fn else None
             tp.cmd = cmd
-            self.procs.append(tp)
+            self.procs["server"].append(tp)

     def start_pod_worker(self, args, pod):
         default_env = os.environ.copy()
@@ -850,13 +887,16 @@ class ParameterServerLauncher(object):
         current_env.pop("https_proxy", None)
         heter_device_num = 0
+        device_list = []
         if args.heter_worker_device == "gpu":
-            heter_device_num = fluid.core.get_cuda_device_count()
+            device_list = get_gpus(args.gpus)
+            heter_device_num = len(device_list)
         elif args.heter_worker_device == "xpu":
             heter_device_num = fluid.core.get_xpu_device_count()
+            device_list = [str(x) for x in range(0, heter_device_num)]
         for idx, cur_worker in enumerate(pod.workers):
-            device_id = str(idx % heter_device_num)
+            device_id = str(device_list[idx % heter_device_num])
             proc_env = {
                 "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints,
                 "PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints,
@@ -878,7 +918,7 @@ class ParameterServerLauncher(object):
             cmd = [sys.executable, "-u", args.training_script
                    ] + args.training_script_args
-            self.cmds.append(cmd)
+            self.cmds["worker"].append(cmd)
             if idx == 0:
                 logger.info(
@@ -891,7 +931,7 @@ class ParameterServerLauncher(object):
             if args.log_dir is not None:
                 os.system("mkdir -p {}".format(args.log_dir))
                 fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
-                self.log_fns.append(fn)
+                self.log_fns["worker"].append(fn)
                 proc = subprocess.Popen(
                     cmd, env=current_env, stdout=fn, stderr=fn)
             else:
@@ -905,7 +945,7 @@ class ParameterServerLauncher(object):
             tp.log_offset = fn.tell() if fn else None
             tp.cmd = cmd
-            self.procs.append(tp)
+            self.procs["worker"].append(tp)

     def start_pod_heter_worker(self, args, pod):
         default_env = os.environ.copy()
@@ -914,14 +954,17 @@ class ParameterServerLauncher(object):
         current_env.pop("https_proxy", None)
         heter_device_num = 0
+        device_list = []
         if args.heter_worker_device == "gpu":
-            heter_device_num = fluid.core.get_cuda_device_count()
+            device_list = get_gpus(args.gpus)
+            heter_device_num = len(device_list)
         elif args.heter_worker_device == "xpu":
             heter_device_num = fluid.core.get_xpu_device_count()
+            device_list = [str(x) for x in range(0, heter_device_num)]
         assert heter_device_num != 0
         for idx, cur_heter_worker in enumerate(pod.heter_workers):
-            device_id = str(idx % heter_device_num)
+            device_id = str(device_list[idx % heter_device_num])
             proc_env = {
                 "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints,
                 "PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints,
@@ -944,7 +987,7 @@ class ParameterServerLauncher(object):
             cmd = [sys.executable, "-u", args.training_script
                    ] + args.training_script_args
-            self.cmds.append(cmd)
+            self.cmds["heter_worker"].append(cmd)
             if idx == 0:
                 logger.info(
@@ -957,7 +1000,7 @@ class ParameterServerLauncher(object):
             if args.log_dir is not None:
                 os.system("mkdir -p {}".format(args.log_dir))
                 fn = open("%s/heterlog.%d" % (args.log_dir, idx), "w")
-                self.log_fns.append(fn)
+                self.log_fns["heter_worker"].append(fn)
                 proc = subprocess.Popen(
                     cmd, env=current_env, stdout=fn, stderr=fn)
             else:
@@ -971,4 +1014,4 @@ class ParameterServerLauncher(object):
             tp.log_offset = fn.tell() if fn else None
             tp.cmd = cmd
-            self.procs.append(tp)
+            self.procs["heter_worker"].append(tp)
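Note (not part of the commit): a minimal, self-contained sketch of the remapping behavior that the relocated get_gpus() implements, i.e. turning the device ids passed via --gpus into indices relative to CUDA_VISIBLE_DEVICES. The helper name remap_gpus is hypothetical and only for illustration; it is not the committed function.

# Illustrative sketch only -- mirrors the remapping logic shown in the diff,
# not an exact copy of the committed code.
import os

def remap_gpus(gpus, cuda_visible_devices):
    # Without CUDA_VISIBLE_DEVICES, the requested ids are used as-is.
    if cuda_visible_devices is None or cuda_visible_devices == "":
        return [x.strip() for x in gpus.split(',')]
    # Otherwise, absolute ids (e.g. "4,5,6,7") become indices relative to
    # CUDA_VISIBLE_DEVICES (e.g. [0, 1, 2, 3]).
    visible = cuda_visible_devices.split(',')
    for x in gpus.split(','):
        assert x.strip() in visible, \
            "Can't find gpu %s in CUDA_VISIBLE_DEVICES[%s]" % (x, cuda_visible_devices)
    return [visible.index(x.strip()) for x in gpus.split(',')]

# Example: CUDA_VISIBLE_DEVICES=4,5,6,7 and --gpus 4,5,6,7 -> [0, 1, 2, 3]
print(remap_gpus("4,5,6,7", os.getenv("CUDA_VISIBLE_DEVICES", "4,5,6,7")))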