Unverified commit a4bd3a69, authored by liuyuhui, committed by GitHub

fix selected_gpus bug for local_cluster (#205)

* fix selected_gpus bug for local_cluster

* fix bugs about single machine multicards training for local_cluster

* fix python -m config.yaml -b backend.yaml

* re-add selected_gpus_num config for single train mode

* fix bug for submit for paddlecloud k8s
Co-authored-by: wuzhihua <35824027+fuyinno4@users.noreply.github.com>
Parent 6bfbcaa9
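The core of the fix is remapping the absolute GPU IDs in `selected_gpus` to indices relative to `CUDA_VISIBLE_DEVICES` before launching trainers. A minimal standalone sketch of that remapping (a hypothetical helper, not part of this diff):

```python
import os

def relative_gpu_ids(selected_gpus):
    """Map absolute GPU IDs to their positions inside CUDA_VISIBLE_DEVICES.

    e.g. CUDA_VISIBLE_DEVICES=4,5,6,7 and selected_gpus=["4","5","6","7"]
    yields [0, 1, 2, 3]; with CUDA_VISIBLE_DEVICES unset, the IDs pass through.
    """
    visible = os.getenv("CUDA_VISIBLE_DEVICES")
    if not visible:
        return [int(x) for x in selected_gpus]
    visible_list = [v.strip() for v in visible.split(",")]
    for x in selected_gpus:
        assert x.strip() in visible_list, \
            "Can't find selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]." % (x, visible)
    return [visible_list.index(x.strip()) for x in selected_gpus]
```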
@@ -19,10 +19,16 @@ import copy
import os
import subprocess
import warnings
import sys
import logging
from paddlerec.core.engine.engine import Engine
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
import paddlerec.core.engine.cluster_utils as cluster_utils
logger = logging.getLogger("root")
logger.propagate = False
class ClusterEngine(Engine):
@@ -47,6 +53,36 @@ class ClusterEngine(Engine):
self.backend))
def start_worker_procs(self):
if (envs.get_runtime_environ("fleet_mode") == "COLLECTIVE"):
#trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
if cuda_visible_devices is None or cuda_visible_devices == "":
selected_gpus = range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))
else:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list = cuda_visible_devices.split(',')
for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT"))):
    assert str(x) in cuda_visible_devices_list, "Can't find "\
        "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
        % (x, cuda_visible_devices)
selected_gpus = [
    cuda_visible_devices_list.index(str(x))
    for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))
]
print("selected_gpus:{}".format(selected_gpus))
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
logs_dir = envs.get_runtime_environ("log_dir")
print("use_paddlecloud_flag:{}".format(
cluster_utils.use_paddlecloud()))
if cluster_utils.use_paddlecloud():
cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
logger.info("get cluster from cloud:{}".format(cluster))
procs = cluster_utils.start_local_trainers(
cluster, pod, cmd, log_dir=logs_dir)
print("cluster:{}".format(cluster))
print("pod:{}".format(pod))
else:
trainer = TrainerFactory.create(self.trainer)
trainer.run()
...
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import socket
import time
import os
import signal
import copy
import sys
import subprocess
from contextlib import closing
import socket
logger = logging.getLogger("root")
logger.propagate = False
class Cluster(object):
def __init__(self, hdfs):
self.job_server = None
self.pods = []
self.hdfs = None
self.job_stage_flag = None
def __str__(self):
return "job_server:{} pods:{} job_stage_flag:{} hdfs:{}".format(
self.job_server, [str(pod) for pod in self.pods],
self.job_stage_flag, self.hdfs)
def __eq__(self, cluster):
if len(self.pods) != len(cluster.pods):
return False
for a, b in zip(self.pods, cluster.pods):
if a != b:
return False
if self.job_stage_flag != cluster.job_stage_flag:
return False
return True
def __ne__(self, cluster):
return not self.__eq__(cluster)
def update_pods(self, cluster):
self.pods = copy.copy(cluster.pods)
def trainers_nranks(self):
return len(self.trainers_endpoints())
def pods_nranks(self):
return len(self.pods)
def trainers_endpoints(self):
r = []
for pod in self.pods:
for t in pod.trainers:
r.append(t.endpoint)
return r
def pods_endpoints(self):
r = []
for pod in self.pods:
ep = "{}:{}".format(pod.addr, pod.port)
assert pod.port != None and pod.addr != None, "{} not a valid endpoint".format(
ep)
r.append(ep)
return r
def get_pod_by_id(self, pod_id):
for pod in self.pods:
if str(pod_id) == str(pod.id):
return pod
return None
class JobServer(object):
def __init__(self):
self.endpoint = None
def __str__(self):
return "{}".format(self.endpoint)
def __eq__(self, j):
return self.endpoint == j.endpoint
def __ne__(self, j):
return not self == j
class Trainer(object):
def __init__(self):
self.gpus = []
self.endpoint = None
self.rank = None
def __str__(self):
return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint,
self.rank)
def __eq__(self, t):
if len(self.gpus) != len(t.gpus):
return False
if self.endpoint != t.endpoint or \
self.rank != t.rank:
return False
for a, b in zip(self.gpus, t.gpus):
if a != b:
return False
return True
def __ne__(self, t):
return not self == t
def rank(self):
return self.rank
class Pod(object):
def __init__(self):
self.rank = None
self.id = None
self.addr = None
self.port = None
self.trainers = []
self.gpus = []
def __str__(self):
return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format(
self.rank, self.id, self.addr, self.port, self.gpus,
[str(t) for t in self.trainers])
def __eq__(self, pod):
if self.rank != pod.rank or \
self.id != pod.id or \
self.addr != pod.addr or \
self.port != pod.port:
logger.debug("pod {} != pod".format(self, pod))
return False
if len(self.trainers) != len(pod.trainers):
logger.debug("trainers {} != {}".format(self.trainers,
pod.trainers))
return False
for i in range(len(self.trainers)):
if self.trainers[i] != pod.trainers[i]:
logger.debug("trainer {} != {}".format(self.trainers[i],
pod.trainers[i]))
return False
return True
def __ne__(self, pod):
return not self == pod
def parse_response(self, res_pods):
pass
def rank(self):
return self.rank
def get_visible_gpus(self):
r = ""
for g in self.gpus:
r += "{},".format(g)
assert r != "", "this pod {} can't see any gpus".format(self)
r = r[:-1]
return r
def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
assert type(paddle_ports) is list, "paddle_ports must be list"
cluster = Cluster(hdfs=None)
trainer_rank = 0
for node_rank, ip in enumerate(node_ips):
pod = Pod()
pod.rank = node_rank
pod.addr = ip
for i in range(len(selected_gpus)):
trainer = Trainer()
trainer.gpus.append(selected_gpus[i])
trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
trainer.rank = trainer_rank
trainer_rank += 1
pod.trainers.append(trainer)
cluster.pods.append(pod)
pod_rank = node_ips.index(node_ip)
return cluster, cluster.pods[pod_rank]
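A brief usage sketch of get_cluster (the IPs and ports below are made up) to show the topology it builds:

```python
import paddlerec.core.engine.cluster_utils as cluster_utils

# Two nodes with two selected GPUs each -> four trainer ranks in total.
cluster, pod = cluster_utils.get_cluster(
    node_ips=["10.0.0.1", "10.0.0.2"],   # illustrative addresses
    node_ip="10.0.0.1",                   # the current node
    paddle_ports=[6170, 6171],
    selected_gpus=[0, 1])

print(cluster.trainers_nranks())     # 4
print(cluster.trainers_endpoints())  # ['10.0.0.1:6170', '10.0.0.1:6171',
                                     #  '10.0.0.2:6170', '10.0.0.2:6171']
print(pod.rank)                      # 0, the index of node_ip in node_ips
```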
def get_cloud_cluster(selected_gpus, args_port=None):
#you can automatically get ip info while using paddlecloud multi nodes mode.
node_ips = os.getenv("PADDLE_TRAINERS")
assert node_ips is not None, "PADDLE_TRAINERS should not be None"
print("node_ips:{}".format(node_ips))
node_ip = os.getenv("POD_IP")
assert node_ip is not None, "POD_IP should not be None"
print("node_ip:{}".format(node_ip))
node_rank = os.getenv("PADDLE_TRAINER_ID")
assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
print("node_rank:{}".format(node_rank))
node_ips = node_ips.split(",")
num_nodes = len(node_ips)
node_rank = int(node_rank)
started_port = args_port
print("num_nodes:", num_nodes)
if num_nodes > 1:
try:
paddle_port = int(os.getenv("PADDLE_PORT", ""))
paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))
if paddle_port_num >= len(
selected_gpus) and paddle_port != args_port:
logger.warning("Use Cloud specified port:{}.".format(
paddle_port))
started_port = paddle_port
except Exception as e:
print(e)
pass
if started_port is None:
started_port = 6170
logger.debug("parsed from args:node_ips:{} \
node_ip:{} node_rank:{} started_port:{}"
.format(node_ips, node_ip, node_rank, started_port))
ports = [x for x in range(started_port, started_port + len(selected_gpus))]
cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
return cluster, cluster.pods[node_rank]
def use_paddlecloud():
node_ips = os.getenv("PADDLE_TRAINERS", None)
node_ip = os.getenv("POD_IP", None)
node_rank = os.getenv("PADDLE_TRAINER_ID", None)
if node_ips is None or node_ip is None or node_rank is None:
return False
else:
return True
class TrainerProc(object):
def __init__(self):
self.proc = None
self.log_fn = None
self.log_offset = None
self.rank = None
self.local_rank = None
self.cmd = None
def start_local_trainers(cluster, pod, cmd, log_dir=None):
current_env = copy.copy(os.environ.copy())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
procs = []
for idx, t in enumerate(pod.trainers):
proc_env = {
"FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
"PADDLE_TRAINER_ID": "%d" % t.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
}
current_env.update(proc_env)
logger.debug("trainer proc env:{}".format(current_env))
# cmd = [sys.executable, "-u", training_script]
logger.info("start trainer proc:{} env:{}".format(cmd, proc_env))
fn = None
if log_dir is not None:
os.system("mkdir -p {}".format(log_dir))
fn = open("%s/workerlog.%d" % (log_dir, idx), "a")
proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
else:
proc = subprocess.Popen(cmd, env=current_env)
tp = TrainerProc()
tp.proc = proc
tp.rank = t.rank
tp.local_rank = idx
tp.log_fn = fn
tp.log_offset = fn.tell() if fn else None
tp.cmd = cmd
procs.append(proc)
return procs
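For orientation, a minimal sketch of how the engines above drive these helpers when running on PaddleCloud (the trainer argument passed to the factory module is illustrative):

```python
import sys
import paddlerec.core.engine.cluster_utils as cluster_utils

selected_gpus = [0, 1]
# One process per selected GPU runs the factory module with the trainer config.
cmd = [sys.executable, "-u", "-m", "paddlerec.core.factory", "trainer_config.yaml"]

if cluster_utils.use_paddlecloud():
    cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
    procs = cluster_utils.start_local_trainers(cluster, pod, cmd, log_dir="logs")
    for p in procs:
        p.wait()  # start_local_trainers returns the spawned subprocess.Popen objects
```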
@@ -19,9 +19,14 @@ import copy
import os
import sys
import subprocess
import logging
from paddlerec.core.engine.engine import Engine
from paddlerec.core.utils import envs
import paddlerec.core.engine.cluster_utils as cluster_utils
logger = logging.getLogger("root")
logger.propagate = False
class LocalClusterEngine(Engine):
@@ -97,21 +102,49 @@ class LocalClusterEngine(Engine):
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
elif fleet_mode.upper() == "COLLECTIVE":
selected_gpus = self.envs["selected_gpus"].split(",") cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
if cuda_visible_devices is None or cuda_visible_devices == "":
selected_gpus = [
x.strip() for x in self.envs["selected_gpus"].split(",")
]
else:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list = cuda_visible_devices.split(',')
for x in self.envs["selected_gpus"].split(","):
assert x in cuda_visible_devices_list, "Can't find "\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
% (x, cuda_visible_devices)
selected_gpus = [
cuda_visible_devices_list.index(x.strip())
for x in self.envs["selected_gpus"].split(",")
]
selected_gpus_num = len(selected_gpus)
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
print("use_paddlecloud_flag:{}".format(
cluster_utils.use_paddlecloud()))
if cluster_utils.use_paddlecloud():
cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
logger.info("get cluster from cloud:{}".format(cluster))
procs = cluster_utils.start_local_trainers(
cluster, pod, cmd, log_dir=logs_dir)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
for i in range(selected_gpus_num - 1):
    while True:
        new_port = envs.find_free_port()
        if new_port not in ports:
            ports.append(new_port)
            break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) user_endpoints = ",".join(
["127.0.0.1:" + str(x) for x in ports])
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
for i in range(selected_gpus_num):
    current_env.update({
        "PADDLE_TRAINER_ENDPOINTS": user_endpoints,
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
cluster_envs["fleet_mode"] = fleet_mode
cluster_envs["engine_role"] = "WORKER"
cluster_envs["log_dir"] = "logs"
cluster_envs["train.trainer.trainer"] = trainer cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster" cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.executor_mode"] = executor_mode cluster_envs["train.trainer.executor_mode"] = executor_mode
......