Unverified commit d5a66fd7, authored by: D danleifeng, committed by: GitHub

【paddle.fleet】support multi-node cpu training for fleetrun (#26011)

* support multi-ps training mode for fleetrun; test=develop
Parent 0067a2e4
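As a quick illustration (not part of the diff) of how the new CPU parameter-server mode can be launched, based on the --server_num/--worker_num and --servers/--workers arguments added below; the endpoints and script name are placeholders:
    fleetrun --server_num=2 --worker_num=2 your_training_py (arg1 arg2 and all others)
    fleetrun --servers="192.168.0.16:6170,192.168.0.17:6170" \
        --workers="192.168.0.16:6171,192.168.0.17:6171" \
        your_training_py (arg1 arg2 and all others)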
......@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
paddle.distributed.launch is a module that spawns multiple distributed
processes on each training node for gpu training and cpu training.
Usage:
In both single node and multiple node training, this module
launches a process on each of the given gpu cards or cpu machines.
GPU training:
1. for single node training with all visible gpu cards:
......@@ -24,11 +24,10 @@ launch a process on each of the given gpu card or cpu machine.
fleetrun --gpus="0,1,2,3" your_training_py (arg1 arg2 and all others)
3. for multiple node training such as two nodes: 192.168.0.16, 192.168.0.17
on 192.168.0.16:
fleetrun --ips="192.168.0.16,192.168.0.17" --node_ip=192.168.0.16 \
fleetrun --ips="192.168.0.16,192.168.0.17" \
your_training_py (arg1 arg2 and all others)
on 192.168.0.17:
fleetrun --ips="192.168.0.16,192.168.0.17" \
--node_ip=192.168.0.17 \
your_training_py (arg1 arg2 and all others)
CPU training:
1. for single node training with multi servers and workers:
......@@ -96,15 +95,14 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
"--servers", type=str, default="", help="User defined servers ip:port")
parser.add_argument(
"--workers", type=str, default="", help="User defined workers ip:port")
parser.add_argument(
"--worker_num", type=int, default=2, help="number of workers")
parser.add_argument("--worker_num", type=int, help="number of workers")
parser.add_argument(
"--server_num", type=int, default=2, help="number of servers")
parser.add_argument("--server_num", type=int, help="number of servers")
parser.add_argument(
"--log_dir",
type=str,
default="log",
help="The path for each process's log.If it's not set, the log will printed to default pipe."
)
#positional
......@@ -129,11 +127,11 @@ def get_cluster_from_args(args, gpus):
_, node_ip = get_host_name_ip()
# node_ip = args.node_ip
assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips:{%s}" \
assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
% (node_ip, node_ips)
node_rank = node_ips.index(node_ip)
logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format(
logger.debug("parsed from args: node_ips:{} node_ip:{} node_rank:{}".format(
node_ips, node_ip, node_rank))
free_ports = None
......@@ -187,8 +185,11 @@ def launch_collective(args):
cluster = None
pod = None
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = int(os.environ.get('FLAGS_START_PORT'))
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus)
cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus, start_port)
logger.info("get cluster from cloud:{}".format(cluster))
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
......@@ -213,11 +214,78 @@ def launch_collective(args):
def launch_ps(args):
worker_num = args.worker_num
server_num = args.server_num
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = int(os.environ.get('FLAGS_START_PORT'))
ports = None
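# Resolve server endpoints: with --server_num, ports are picked via get_ports()
# and bound to 127.0.0.1; otherwise the user-supplied --servers list is parsed
# into IPs and ports.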
if args.server_num:
server_num = args.server_num
ports = get_ports(server_num, 0)
server_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
else:
assert args.servers != "", "The setting of CPU mode must be either server_num or servers."
server_endpoints = args.servers
server_endpoints_ips = [
x.strip().split(":")[0] for x in server_endpoints.split(",")
]
server_endpoints_port = [
x.strip().split(":")[1] for x in server_endpoints.split(",")
]
server_num = len(server_endpoints_ips)
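# Worker endpoints mirror the server logic; the offset of server_num passed to
# get_ports() keeps a FLAGS_START_PORT-based range from overlapping the server ports.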
if args.worker_num:
worker_num = args.worker_num
ports = get_ports(worker_num, server_num)
worker_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
else:
assert args.workers != "", "The setting of CPU mode must be either worker_num or workers."
worker_endpoints = args.workers
worker_endpoints_ips = [
x.strip().split(":")[0] for x in worker_endpoints.split(",")
]
worker_endpoints_port = [
x.strip().split(":")[1] for x in worker_endpoints.split(",")
]
worker_num = len(worker_endpoints_ips)
node_ips = list(set(server_endpoints_ips + worker_endpoints_ips))
# local train
if len(set(node_ips)) == 1:
current_node_ip = node_ips[0]
else:
_, current_node_ip = get_host_name_ip()
assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
% (current_node_ip, node_ips)
node_rank = node_ips.index(current_node_ip)
logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}".
format(node_ips, current_node_ip, node_rank, server_endpoints_port))
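# Describe the whole job as a Cluster: one Pod per node IP, holding every
# server and worker that lives on that IP, with globally increasing ranks.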
cluster = Cluster(hdfs=None)
server_rank = 0
worker_rank = 0
for node_rank, ip in enumerate(node_ips):
pod = Pod()
pod.rank = node_rank
pod.addr = ip
for i in range(len(server_endpoints_ips)):
if ip == server_endpoints_ips[i]:
server = Trainer()
server.endpoint = "%s:%s" % (ip, server_endpoints_port[i])
server.rank = server_rank
server_rank += 1
pod.servers.append(server)
for j in range(len(worker_endpoints_ips)):
if ip == worker_endpoints_ips[j]:
worker = Trainer()
worker.endpoint = "%s:%s" % (ip, worker_endpoints_port[j])
worker.rank = worker_rank
worker_rank += 1
pod.workers.append(worker)
cluster.pods.append(pod)
pod_rank = node_ips.index(current_node_ip)
pod = cluster.pods[pod_rank]
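# Only the processes in the current node's pod are launched below; other nodes
# run fleetrun themselves with the same endpoint lists.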
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
......@@ -225,68 +293,78 @@ def launch_ps(args):
procs = []
cmds = []
log_fns = []
ports = range(start_port, start_port + server_num, 1)
default_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
user_endpoints = ""
if args.servers == "":
user_endpoints = default_endpoints
else:
user_endpoints = args.servers
user_endpoints_ips = [x.split(":")[0] for x in user_endpoints.split(",")]
user_endpoints_port = [x.split(":")[1] for x in user_endpoints.split(",")]
for i in range(server_num):
for idx, cur_server in enumerate(pod.servers):
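# Spawn one process per server assigned to this node: export the global server
# list, this server's own port and IP, and TRAINING_ROLE=PSERVER.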
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_PORT": user_endpoints_port[i],
"PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
"PADDLE_PORT": cur_server.endpoint.split(":")[1],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": user_endpoints_ips[i]
"POD_IP": cur_server.endpoint.split(":")[0]
})
cmd = [sys.executable, "-u", args.training_script
] + args.training_script_args
cmds.append(cmd)
if args.log_dir is not None:
os.system("mkdir -p {}".format(args.log_dir))
fn = open("%s/serverlog.%d" % (args.log_dir, i), "w")
fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
log_fns.append(fn)
proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
else:
proc = subprocess.Popen(cmd, env=current_env)
procs.append(proc)
for i in range(worker_num):
tp = TrainerProc()
tp.proc = proc
tp.rank = cur_server.rank
tp.local_rank = idx
tp.log_fn = fn
tp.log_offset = 0 if fn else None
tp.cmd = cmd
procs.append(tp)
for idx, cur_worker in enumerate(pod.workers):
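# Spawn one process per worker assigned to this node: workers get
# TRAINING_ROLE=TRAINER and a globally unique PADDLE_TRAINER_ID.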
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i)
"PADDLE_TRAINER_ID": str(cur_worker.rank)
})
cmd = [sys.executable, "-u", args.training_script
] + args.training_script_args
cmds.append(cmd)
if args.log_dir is not None:
os.system("mkdir -p {}".format(args.log_dir))
fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
log_fns.append(fn)
proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
else:
proc = subprocess.Popen(cmd, env=current_env)
procs.append(proc)
tp = TrainerProc()
tp.proc = proc
tp.rank = cur_worker.rank
tp.local_rank = idx
tp.log_fn = fn
tp.log_offset = 0 if fn else None
tp.cmd = cmd
procs.append(tp)
# only wait worker to finish here
for i, proc in enumerate(procs):
if i < server_num:
if i < len(pod.servers):
continue
procs[i].wait()
procs[i].proc.wait()
if len(log_fns) > 0:
log_fns[i].close()
print("all workers exit, going to finish parameter server", file=sys.stderr)
for i in range(server_num):
for i in range(len(pod.servers)):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
procs[i].proc.terminate()
print("all parameter server are killed", file=sys.stderr)
......@@ -303,11 +381,15 @@ def launch():
co_arg for co_arg in collective_args
if co_arg in " ".join(sys.argv[1:-1])
]
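# Dispatch: any parameter-server argument, or a machine with no CUDA devices,
# selects CPU parameter-server mode; collective arguments select GPU collective mode.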
if len(has_ps_args) > 0 or fluid.core.get_cuda_device_count() == 0:
logger.info("Run cpu parameter-sever mode.")
cuda_device_num = fluid.core.get_cuda_device_count()
if len(has_ps_args) > 0 or cuda_device_num == 0:
logger.info(
"Run parameter-sever cpu mode. pserver args:{}, cuda count:{}".
format(has_ps_args, cuda_device_num))
launch_ps(args)
elif len(has_collective_args) > 0:
logger.info("Run gpu collective mode.")
logger.info("Run collective gpu mode. gpu args:{}, cuda count:{}".
format(has_collective_args, cuda_device_num))
launch_collective(args)
else:
logger.warning(
......
......@@ -142,12 +142,16 @@ class Pod(object):
self.addr = None
self.port = None
self.trainers = []
self.servers = []
self.workers = []
self.gpus = []
def __str__(self):
return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format(
self.rank, self.id, self.addr, self.port, self.gpus,
[str(t) for t in self.trainers])
return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{} servers:{} \
workers:{}".format(self.rank, self.id, self.addr, self.port,
self.gpus, [str(t) for t in self.trainers],
[str(s) for s in self.servers],
[str(w) for w in self.workers])
def __eq__(self, pod):
if self.rank != pod.rank or \
......@@ -168,6 +172,26 @@ class Pod(object):
pod.trainers[i]))
return False
if len(self.servers) != len(pod.servers):
logger.debug("servers {} != {}".format(self.servers, pod.servers))
return False
for i in range(len(self.servers)):
if self.servers[i] != pod.servers[i]:
logger.debug("servers {} != {}".format(self.servers[i],
pod.servers[i]))
return False
if len(self.workers) != len(pod.workers):
logger.debug("workers {} != {}".format(self.workers, pod.workers))
return False
for i in range(len(self.workers)):
if self.workers[i] != pod.workers[i]:
logger.debug("workers {} != {}".format(self.workers[i],
pod.workers[i]))
return False
return True
def __ne__(self, pod):
......@@ -303,6 +327,17 @@ def find_free_ports(num):
return None
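# get_ports(): if FLAGS_START_PORT is unset, probe the system for `num` free
# ports; otherwise return the contiguous range starting at
# FLAGS_START_PORT + offset (the offset keeps server and worker ranges apart).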
def get_ports(num, offset):
if os.environ.get('FLAGS_START_PORT') is None:
ports = find_free_ports(num)
if ports is not None:
ports = list(ports)
else:
start_port = int(os.environ.get('FLAGS_START_PORT'))
ports = range(start_port + offset, start_port + offset + num, 1)
return ports
class TrainerProc(object):
def __init__(self):
self.proc = None
......
......@@ -10,6 +10,14 @@ function test_launch_ps(){
echo "test pserver launch failed"
exit -1
fi
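# Added in this commit: also exercise the explicit --servers/--workers endpoint path.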
fleetrun --servers="120.0.0.1:6780,120.0.0.1:6781" --workers="120.0.0.1:6782,120.0.0.1:6783" fleet_ps_training.py 2> ut.elog
if grep -q "server are killed" ut.elog; then
echo "test pserver launch succeed"
else
echo "test pserver launch failed"
exit -1
fi
}
if [[ ${WITH_GPU} == "OFF" ]]; then
......