Unverified commit 6af531b7, authored by xiayanming, committed by GitHub

fleet support elastic scale up/down (#36684)

* fleet support elastic train

* fleet support elastic train

* support elastic

* add unittest

* fix unittest bug

* fix unittest bug

* fix unittest bug

* fix unittest coverage

* fix unittest coverage

* fix unittest coverage

* fix unittest coverage

* fix unittest coverage

* fix elastic bug

* fix ci fail

* fix ci fail

* fix elastic bug

* fix elastic bug

* fix joint debugging bug

* fix joint debugging bug

* fix windows ci failed

* fix windows ci failed
Parent 9a9345fa
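The core of the change is that --np (and PADDLE_ELASTIC_NP) now take a node-count range instead of a single integer, and the elastic level is derived from it. As a rough orientation before the diffs, here is a minimal sketch of the intended semantics, condensed from the _parse_np helper and the ElasticLevel selection added below; the standalone parse_np function is illustrative, not part of the patch.

# Editor's sketch, not part of the patch: condensed from _parse_np / ElasticLevel below.
def parse_np(np_str):
    parts = np_str.split(":")
    if len(parts) == 1:        # "N": fixed node count, fault-tolerant restarts only
        min_np = max(int(parts[0]), 1)
        max_np = 1
    elif len(parts) == 2:      # "MIN:MAX": job may scale between MIN and MAX nodes
        min_np = max(int(parts[0]), 1)
        max_np = max(int(parts[1]), min_np)
    else:
        raise ValueError('np must be "MIN" or "MIN:MAX"')
    return min_np, max_np

print(parse_np("2"))     # (2, 1) -> job runs at ElasticLevel.FAULT_TOLERANCE
print(parse_np("2:4"))   # (2, 4) -> job runs at ElasticLevel.ELASTIC, 2 to 4 nodes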
@@ -50,7 +50,10 @@ if __name__ == '__main__':
     parser.add_argument(
         "--elastic_server", type=str, help="etcd server host:port")
     parser.add_argument("--job_id", type=str, help="job unique id")
-    parser.add_argument("--np", type=int, help="job pod/node number")
+    parser.add_argument(
+        "--np",
+        type=str,
+        help="job pod/node number, need to be 'MIN' or 'MIN:MAX' format")
     parser.add_argument("action", type=str, help="action to take")
 
     args = parser.parse_args()
@@ -58,7 +61,7 @@ if __name__ == '__main__':
     server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
     name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
-    np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
+    np = int(args.np.split(":")[0]) or int(os.getenv('PADDLE_ELASTIC_NP', 0))
 
     cmd = Command(server, name)
......
@@ -33,7 +33,7 @@ def enable_elastic(args, distribute_mode):
     if not args.job_id and not os.getenv('PADDLE_ELASTIC_JOB_ID'):
         return False
 
-    if not args.np and not int(os.getenv('PADDLE_ELASTIC_NP', 0)):
+    if not args.np and not os.getenv('PADDLE_ELASTIC_NP'):
         return False
 
     return True
@@ -41,7 +41,11 @@ def enable_elastic(args, distribute_mode):
 
 def launch_elastic(args, distribute_mode):
 
-    elastic = ElasticManager(args)
+    server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
+    srv, port = server.split(':')
+    import etcd3
+    etcd_client = etcd3.client(host=srv, port=port)
+    elastic = ElasticManager(args, etcd_client)
 
     signal.signal(signal.SIGTERM, elastic.signal_handler)
     signal.signal(signal.SIGABRT, elastic.signal_handler)
......
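The rewritten manager (the largest diff below) keeps each node's registration alive with an etcd lease that a background thread refreshes, instead of the delete-and-re-register approach that this commit removes. A condensed standalone sketch of that keepalive pattern, using only etcd3 calls that also appear in this patch (the key, value, address and function name are illustrative):

# Editor's sketch, not part of the patch: lease-based registration with a
# heartbeat thread, condensed from ElasticManager.__init__ below.
import threading
import time

import etcd3


def register_with_keepalive(client, key, value, ttl):
    lease = client.lease(ttl)
    client.put(key, value, lease=lease)   # the key expires if this node dies

    def heartbeat():
        while True:
            try:
                lease.refresh()           # keep the registration alive
            except Exception:
                break                     # on persistent failure, let the key expire
            time.sleep(ttl / 3)

    threading.Thread(target=heartbeat, daemon=True).start()
    return lease


# Example wiring (requires a reachable etcd server):
# client = etcd3.client(host="127.0.0.1", port=2379)
# register_with_keepalive(client, "/paddle/job-1/nodes/n0", b"10.10.10.1:6001", ttl=60)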
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tempfile
 from paddle.distributed.fleet import launch_utils
 from paddle.distributed.fleet import cloud_utils
 from paddle.distributed.fleet import ascend_utils
......
@@ -16,9 +16,14 @@ import time
 import socket
 import os
 import six
+import copy
 import logging
 import signal
 import random
+import threading
+import traceback
+
+from paddle.distributed.fleet import cloud_utils
+from paddle.distributed.fleet import launch_utils
 
 logger = logging.getLogger("ELASTIC")
 logger.setLevel(logging.INFO)
@@ -30,6 +35,18 @@ logger.addHandler(ch)
 
 ELASTIC_EXIT_CODE = 101
 
+# wait for timeout, unit: seconds
+ELASTIC_TIMEOUT = 2 * 60
+
+# keepalived ttl, unit: seconds
+ELASTIC_TTL = 60
+
+
+# 1: Fault tolerance, 2: Elastic
+class ElasticLevel:
+    FAULT_TOLERANCE = 1
+    ELASTIC = 2
+
 
 class ElasticStatus:
     COMPLETED = "completed"
@@ -106,21 +123,52 @@ class LauncherInterface(object):
 
 
 class ElasticManager(object):
-    def __init__(self, args):
+    def __init__(self, args, etcd_client):
 
         self.args = args
         server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
         name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
-        np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
+        self.min_np, self.max_np = self._parse_np(args.np)
         host = args.host or os.getenv('POD_IP')
         scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0))
         force = args.force or os.getenv('PADDLE_ELASTIC_FORCE')
 
-        self.endpoints = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '')
+        start_port = 6170
+        if os.environ.get('FLAGS_START_PORT') is not None:
+            start_port = int(os.environ.get('FLAGS_START_PORT'))
+        if cloud_utils.use_paddlecloud():
+            start_port = int(os.getenv("PADDLE_PORT", ""))
+
+        (self.device_mode,
+         self.devices_per_proc) = launch_utils.get_device_proc_info(args)
+
+        self.elastic_timeout = int(
+            os.getenv('PADDLE_ELASTIC_TIMEOUT', ELASTIC_TIMEOUT))
+        elastic_ttl = int(os.getenv('PADDLE_ELASTIC_TTL', ELASTIC_TTL))
+        self.dist_endpoints = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '')
         self.trainers = os.getenv('PADDLE_TRAINERS', '')
+        self.all_host_endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS',
+                                            '').split(",")
+        self.np = len(self.all_host_endpoints)
+        logger.info(f'start job with np={self.np}')
+
+        #[ "%s:%d" % (ip, start_port) for ip in self.trainers.split(",")]
+        logger.info(
+            f"trainers={self.trainers}, all_host_endpoints={self.all_host_endpoints}"
+        )
+
+        # auto correct the value of elastic_level
+        # 1: Fault tolerant, 2: Elastic
         self.elastic_level = int(
-            os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1))
+            os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL',
+                      ElasticLevel.FAULT_TOLERANCE))
+        if self.min_np == self.max_np or \
+                (self.min_np > 0 and self.max_np == 0):
+            self.elastic_level = ElasticLevel.FAULT_TOLERANCE
+            logger.info(f'start job with ElasticLevel.FAULT_TOLERANCE')
+        if self.min_np > 0 and self.max_np > self.min_np:
+            self.elastic_level = ElasticLevel.ELASTIC
+            logger.info(f'start job with ElasticLevel.ELASTIC')
 
         # compatible with kuberntes service discovery
         if not server and os.getenv(
@@ -130,8 +178,6 @@ class ElasticManager(object):
             os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
             os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'))
 
-        #elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1)
-
         logger.debug('init with server {} host {}'.format(server, host))
 
         self.hosts = []
@@ -140,20 +186,20 @@ class ElasticManager(object):
         self.sigint = 0
         self.need_sync = False
 
-        if not server or ':' not in server or not name or not np:
+        self.elastic_startup_time = None
+
+        if not server or ':' not in server or not name or not self.np:
             logger.info(
                 'Elastic is not enabled with server {} name {} and np {}'.
-                format(server, name, np))
+                format(server, name, self.np))
             self.enable = False
             return
         else:
             self.enable = True
 
-        import etcd3
-        srv, port = server.split(':')
-        self.etcd = etcd3.client(host=srv, port=port)
+        self.etcd = etcd_client
         self.host = host if host else self._get_host()
+        self.host_port = "%s:%d" % (self.host, start_port)
 
         # etcd data
         self.prefix = "/paddle/" + name
@@ -165,67 +211,75 @@ class ElasticManager(object):
             random.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(6))
         self.host_path = '{}/{}{}'.format(self.node_prefix, node_tag,
                                           time.time())
-        self.np = np + scale
 
         '''
        0 group mode, be aware of healthy status of other workers
        1 decouple mode, check own status only
        '''
         self.etcd.put(self.prefix, b'0')
 
-        # host
-        # register self host to etcd
-        # register watch to reset host after host been deleted
-        self.etcd.delete_prefix(self.node_prefix)
-
+        # register callback
         def host_call_back(event):
-            if self.etcd.get(self.host_path)[0] == None:
-                logger.info('register host again {}'.format(self.host))
-
-                self.etcd.put(self.host_path, six.b(self.host))
-                self.need_sync = True
+            self.hosts = [
+                six.ensure_str(i[0])
+                for i in self.etcd.get_prefix(self.node_prefix)
+            ]
+            logger.info(
+                f"host_call_back curr_host={self.host_port}, hosts:{self.hosts}")
+            self.need_sync = True
+            self.elastic_startup_time = None
 
-        host_watch = self.etcd.add_watch_callback(self.host_path,
-                                                  host_call_back)
-        self.etcd.put(self.host_path, six.b(self.host))
+        host_watch = self.etcd.add_watch_prefix_callback(self.node_prefix,
+                                                         host_call_back)
+        host_lease = self.etcd.lease(elastic_ttl)
 
-        # np describes the exact number of nodes to run the job
-        inp = int(self.etcd.get(self.np_path)[0] or 0)
-        if scale == 0 and not force:
-            assert inp == np or inp == 0, "np {} is not consistent with np in etcd {}".format(
-                np, inp)
-        else:
-            assert inp == np or inp == self.np, "np {} scale to {} by {} is not allowed".format(
-                inp, self.np, scale)
-
-        self.etcd.put(self.np_path, six.b("%d" % (self.np)))
-
-        def np_call_back(event):
-            gnp = int(self.etcd.get(self.np_path)[0])
-            if gnp != self.np:
-                logger.info("scale np {} to {} ".format(self.np, gnp))
-                self.np = gnp
-
-        np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back)
+        # register etcd lease heartbeat
+        def lease_heartbeat():
+            while True:
+                try:
+                    host_lease.refresh()
+
+                    hosts = [
+                        six.ensure_str(i[0])
+                        for i in self.etcd.get_prefix(self.node_prefix)
+                    ]
+                    logger.info(
+                        f"[lease_heartbeat] curr_host={self.host_port}, hosts={hosts}"
+                    )
+                    if self.host_port not in hosts:
+                        logger.info(
+                            f"[lease_heartbeat] register host={self.host_port}")
+                        self.etcd.put(self.host_path,
+                                      six.b(self.host_port),
+                                      lease=host_lease)
+                except Exception as e:
+                    logger.error("[lease_heartbeat] internal error:{} {}".
+                                 format(e, traceback.format_exc()))
+                    break
+                time.sleep(elastic_ttl / 3)
+
+        keepalived_thread = threading.Thread(
+            name='lease_heartbeat', target=lease_heartbeat, daemon=True)
+        keepalived_thread.start()
+
+        self.etcd.put(self.host_path, six.b(self.host_port), lease=host_lease)
 
         # endpoints handle DISTRIBUTED_TRAINER_ENDPOINTS and PADDLE_TRAINERS
         self.etcd.put(self.endpoints_path,
-                      six.b('{}|{}'.format(self.endpoints, self.trainers)))
+                      six.b('{}|{}'.format(self.dist_endpoints, self.trainers)))
 
         def endpoints_call_back(event):
-            if not self.endpoints:
+            if not self.dist_endpoints:
                 return
             edps = six.ensure_str(self.etcd.get(self.endpoints_path)[0] or '')
-            self.endpoints, self.trainers = edps.split('|')
-            logger.info("set DISTRIBUTED_TRAINER_ENDPOINTS {} ".format(
-                self.endpoints))
+            self.dist_endpoints, self.trainers = edps.split('|')
+            logger.info("set DISTRIBUTED_TRAINER_ENDPOINTS {} ".
+                        format(self.dist_endpoints))
             logger.info("set PADDLE_TRAINERS {} ".format(self.trainers))
 
         endpoints_watch = self.etcd.add_watch_callback(self.endpoints_path,
                                                        endpoints_call_back)
 
-        self.watches = [host_watch, np_watch, endpoints_watch]
+        self.watches = [host_watch, endpoints_watch]
 
         self.launcher = None
 
     def exit(self, completed=False):
@@ -248,6 +302,30 @@ class ElasticManager(object):
         if len(hosts) == 0:
             self.etcd.delete_prefix(self.prefix)
 
+    def _parse_np(self, np: str):
+        """
+        np format is "MIN" or "MIN:MAX"
+        """
+        np_str = np or os.getenv('PADDLE_ELASTIC_NP', "0")
+        np_dict = np_str.split(":")
+        min_np = max_np = 0
+
+        if len(np_dict) == 1:
+            # Fault tolerant
+            min_np = int(np_dict[0])
+            min_np = 1 if min_np <= 0 else min_np
+            max_np = 1
+        elif len(np_dict) == 2:
+            # Elastic
+            min_np = int(np_dict[0])
+            max_np = int(np_dict[1])
+            min_np = 1 if min_np <= 0 else min_np
+            max_np = min_np if min_np > max_np else max_np
+        else:
+            raise ValueError(
+                f'the np={np} needs to be in "MIN" or "MIN:MAX" format')
+
+        return min_np, max_np
+
     def _get_host(self):
         try:
             return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
@@ -260,40 +338,166 @@ class ElasticManager(object):
 
         return int(self.etcd.get(self.prefix)[0]) == 1
 
-    def _match(self):
-
-        self.hosts = [
-            six.ensure_str(i[0]) for i in self.etcd.get_prefix(self.node_prefix)
-        ]
-        if len(self.hosts) == self.np:
-            return True
-        else:
-            return False
+    def _match(self, host_list: list=None):
+        if host_list:
+            self.hosts = host_list
+        else:
+            self.hosts = [
+                six.ensure_str(i[0])
+                for i in self.etcd.get_prefix(self.node_prefix)
+            ]
+
+        if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
+            if len(self.hosts) == self.np:
+                return True
+            else:
+                return False
+
+        if self.elastic_level == ElasticLevel.ELASTIC:
+            # FIXME(xym) add freeze status
+            hosts_num = len(self.hosts)
+            if hosts_num == self.np:
+                return True
+
+            if not self.elastic_startup_time:
+                self.elastic_startup_time = time.time()
+            if hosts_num == self.max_np:
+                self.elastic_startup_time = None
+                return True
+            elif hosts_num >= self.min_np and hosts_num < self.max_np:
+                interval_time = time.time() - self.elastic_startup_time
+                if interval_time <= self.elastic_timeout:
+                    logger.info(
+                        f"wait for timeout, you can set value by PADDLE_ELASTIC_TIMEOUT, \
+                        hosts_num={hosts_num}, min_np={self.min_np}, \
+                        interval_time={interval_time}, elastic_timeout={self.elastic_timeout}"
+                    )
+                    return False
+                return True
+            else:
+                self.elastic_startup_time = None
+                return False
+
+        return False
+
+    def _update_endpoint(self, endpoints, hosts):
+        self.etcd.put(self.endpoints_path,
+                      six.b('{}|{}'.format(endpoints, hosts)))
 
     def _update_hosts(self):
         assert len(self.hosts) != 0, 'hosts empty'
-
-        if self.host in self.endpoints:
-            os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.endpoints
-            os.environ['PADDLE_TRAINERS'] = self.trainers
-            logger.info("update env DISTRIBUTED_TRAINER_ENDPOINTS {} ".format(
-                self.endpoints))
-            logger.info("update env PADDLE_TRAINERS {} ".format(self.trainers))
-            return
-
-        rank = int(os.getenv('PADDLE_TRAINER_ID', -1))
-        idx = self.hosts.index(self.host)
-
-        # swap if self.host not in the right position
-        if rank >= 0:
-            self.hosts[idx] = self.hosts[rank]
-            self.hosts[rank] = self.host
-        else:
-            os.environ['PADDLE_TRAINER_ID'] = '{}'.format(idx)
-
-        hosts = ','.join(self.hosts)
-        self.args.ips = hosts
-        os.environ['PADDLE_TRAINERS'] = hosts
+        rank = int(os.getenv('PADDLE_TRAINER_ID', -1))
+        if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
+            if self.host_port in self.dist_endpoints:
+                os.environ[
+                    'DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
+                os.environ['PADDLE_TRAINERS'] = self.trainers
+                logger.info("update env DISTRIBUTED_TRAINER_ENDPOINTS {} ".
+                            format(self.dist_endpoints))
+                logger.info("update env PADDLE_TRAINERS {} ".format(
+                    self.trainers))
+                return
+
+            # fault tolerance
+            idx = self.hosts.index(self.host_port)
+
+            # swap if self.host not in the right position
+            if rank >= 0:
+                self.hosts[idx] = self.hosts[rank]
+                self.hosts[rank] = self.host_port
+            else:
+                os.environ['PADDLE_TRAINER_ID'] = '{}'.format(idx)
+            hosts = ','.join(
+                [host_port.split(":")[0] for host_port in self.hosts])
+            self.args.ips = hosts
+            os.environ['PADDLE_TRAINERS'] = hosts
+        else:
+            # elastic, scale up/down
+            endpoints = copy.deepcopy(self.all_host_endpoints)
+            if len(self.hosts) > self.np:
+                # scale up
+                logger.info(
+                    f"elastic scale up, from {self.np} to {len(self.hosts)}, hosts={self.hosts}, endpoints={endpoints}"
+                )
+
+                for curr_host_port in self.hosts:
+                    if curr_host_port not in endpoints:
+                        endpoints.append(curr_host_port)
+
+                os.environ['PADDLE_TRAINER_ID'] = '{}'.format(
+                    endpoints.index(self.host_port))
+                hosts = ','.join(
+                    [host_port.split(":")[0] for host_port in endpoints])
+                self.args.ips = hosts
+                os.environ['PADDLE_TRAINERS'] = hosts
+                self.np = len(endpoints)
+                os.environ['PADDLE_TRAINER_ENDPOINTS'] = ','.join(endpoints)
+                os.environ[
+                    'DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
+                self.all_host_endpoints = endpoints
+            else:
+                # scale down
+                logger.info(
+                    f"elastic scale down, from {len(self.hosts)} to {self.np}, hosts={self.hosts}, endpoints={endpoints}"
+                )
+
+                # If the shrink node is from the first of the rank list, you need to minimize the movement of the rank
+                # eg:
+                #   the source trainers is:10.10.10.0,10.10.10.1,10.10.10.2,10.10.10.3
+                #   10.10.10.0 is removed
+                #   the new trainers is:10.10.10.3,10.10.10.1,10.10.10.2
+                #   In this case, the rank of 10.10.10.1 and 10.10.10.2 remains unchanged, while the rank of 10.10.10.3 is set to rank0
+                endpoints_dict = dict()
+                unsorted_endpoints = []
+                for id, host_port in enumerate(self.hosts):
+                    idx = endpoints.index(host_port)
+                    if idx <= len(self.hosts) - 1 and not endpoints_dict.get(
+                            idx):
+                        endpoints_dict[idx] = host_port
+                    else:
+                        unsorted_endpoints.append(host_port)
+
+                idle_index = 0
+                sorted_endpoints = []
+                for idx in range(len(self.hosts)):
+                    if not endpoints_dict.get(idx) and len(
+                            unsorted_endpoints) > 0:
+                        endpoints_dict[idx] = unsorted_endpoints[idle_index]
+                        idle_index += 1
+
+                    sorted_endpoints.append(endpoints_dict.get(idx))
+
+                logger.info(
+                    f"elastic scale down, sorted_endpoints={sorted_endpoints}")
+                self.all_host_endpoints = sorted_endpoints
+
+                endpoint_list = []
+                ip_list = []
+                for host_port in sorted_endpoints:
+                    host_port_list = host_port.split(":")
+                    ip = host_port_list[0]
+                    port = int(host_port_list[1])
+
+                    ip_list.append(ip)
+                    ports = [
+                        x
+                        for x in range(port, port + len(self.devices_per_proc))
+                    ]
+                    endpoint_list.extend(
+                        ["%s:%d" % (ip, port) for port in ports])
+
+                hosts = ','.join(ip_list)
+                new_endpoints = ','.join(endpoint_list)
+
+                self.args.ips = hosts
+                os.environ['PADDLE_TRAINER_ID'] = '{}'.format(
+                    sorted_endpoints.index(self.host_port))
+                os.environ['PADDLE_TRAINERS'] = hosts
+                self.np = len(sorted_endpoints)
+                os.environ['PADDLE_TRAINER_ENDPOINTS'] = ','.join(
+                    sorted_endpoints)
+                os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = new_endpoints
+                self._update_endpoint(new_endpoints, hosts)
 
     def wait(self):
         if not self.enable:
@@ -307,13 +511,6 @@ class ElasticManager(object):
                 return
             logger.info('not ready for np {} with hosts {}'.format(self.np,
                                                                    self.hosts))
-
-            # reset hosts every 30s to prevent fake deadlock
-            if idx % 10 == 0:
-                self.etcd.delete_prefix(self.node_prefix)
-                logger.info('reset np {} with hosts {}'.format(self.np,
-                                                               self.hosts))
-
             idx += 1
             time.sleep(2)
@@ -333,6 +530,7 @@ class ElasticManager(object):
         while not self.stopped:
             ret = self.launcher.watch()
+            logger.debug(f"launcher.watch():{ret}")
 
             if ret is not None:  # self terminated
                 logger.info('job exit with code {}'.format(ret))
@@ -341,7 +539,7 @@ class ElasticManager(object):
                 self.exit(completed=completed)
                 if completed:
                     return ElasticStatus.COMPLETED
-                if self.elastic_level == 1:
+                if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
                     return ElasticStatus.RESTART
                 else:
                     return ElasticStatus.ERROR
@@ -354,6 +552,7 @@ class ElasticManager(object):
         if self.launcher:
             self.launcher.stop()
+
         return ElasticStatus.EXIT
 
     def signal_handler(self, sigint, frame):
......
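The scale-down branch of _update_hosts above reorders the surviving endpoints so that nodes whose rank still exists keep it, and only displaced nodes move into the freed low ranks (see the comment with the 10.10.10.x example). A standalone sketch of that reordering with the same example; the function name is the editor's, not the patch's.

# Editor's sketch, not part of the patch: the scale-down rank reordering.
def reorder_on_scale_down(old_endpoints, surviving_hosts):
    placed = {}       # rank -> endpoint, for survivors whose old rank is still valid
    displaced = []    # survivors whose old rank no longer exists
    for host in surviving_hosts:
        idx = old_endpoints.index(host)
        if idx <= len(surviving_hosts) - 1 and idx not in placed:
            placed[idx] = host
        else:
            displaced.append(host)
    result = []
    for rank in range(len(surviving_hosts)):
        if rank not in placed and displaced:
            placed[rank] = displaced.pop(0)   # fill the freed rank
        result.append(placed[rank])
    return result

old = ["10.10.10.0:6000", "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]
alive = ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]
print(reorder_on_scale_down(old, alive))
# ['10.10.10.3:6001', '10.10.10.1:6001', '10.10.10.2:6001']
# 10.10.10.1 and 10.10.10.2 keep ranks 1 and 2; 10.10.10.3 takes the freed rank 0.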
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import unittest
import argparse
from warnings import catch_warnings
from paddle.distributed.fleet.elastic import enable_elastic, launch_elastic
from paddle.distributed.fleet.launch_utils import DistributeMode
class TestElasticInit(unittest.TestCase):
    def setUp(self):
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "2:4"

        self.args = Argument()

    def test_enable_elastic(self):
        result = enable_elastic(self.args, DistributeMode.COLLECTIVE)
        self.assertEqual(result, True)

    def test_launch_elastic(self):
        try:
            launch_elastic(self.args, DistributeMode.COLLECTIVE)
        except Exception as e:
            pass


if __name__ == "__main__":
    unittest.main()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import unittest
import argparse
from paddle.distributed.fleet.elastic.manager import ElasticManager
from paddle.distributed.fleet.elastic.manager import ELASTIC_TIMEOUT
class MockLease():
    def refresh(self):
        pass


class MockEtcdClient:
    def __init__(self, lease=None):
        self._lease = lease

    def put(self, key, value, lease=None):
        pass

    def get(self, key):
        value = "0"
        return value, value

    def delete_prefix(self, key):
        pass

    def get_prefix(self, key_prefix):
        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        return hosts

    def add_watch_callback(self, *args, **kwargs):
        return "host_watch"

    def add_watch_prefix_callback(self, key_prefix, callback, **kwargs):
        callback(None)
        return "host_watch"

    def cancel_watch(self, watch_id):
        pass

    def delete(self, key):
        pass

    def lease(self, ttl):
        if self._lease:
            return self._lease
        else:
            return MockLease()


class TestElasticManager(unittest.TestCase):
    def setUp(self):
        self.etcd_client = MockEtcdClient()

    def test_elastic_manager_init(self):
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "2"
            gpus = "0"
            nproc_per_node = 1
            host = None
            host_port = None
            scale = None
            force = None
            backend = 'gloo'

        args = Argument()

        class _MockLease():
            def refresh(self):
                raise ValueError("valid error, this only for unittest")

        etcd_client = MockEtcdClient(lease=_MockLease())
        elastic = ElasticManager(args, etcd_client=etcd_client)

    def test_match_faulttolerance(self):
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "2"
            gpus = "0"
            nproc_per_node = 1
            host = None
            host_port = None
            scale = None
            force = None
            backend = 'gloo'

        args = Argument()
        elastic = ElasticManager(args, self.etcd_client)
        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        self.assertEqual(elastic._match(hosts), True)

        hosts = ["10.10.10.1:6001"]
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001"
        self.assertEqual(elastic._match(hosts), False)

    def test_match_elastic(self):
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "2:4"
            gpus = "0"
            nproc_per_node = 1
            host = None
            host_port = None
            scale = None
            force = None
            backend = 'gloo'

        os.environ['PADDLE_ELASTIC_TIMEOUT'] = "60"
        args = Argument()
        os.environ['FLAGS_START_PORT'] = "6001"
        os.environ[
            'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001,10.10.10.4:6001"
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001,10.10.10.4:6001"
        elastic = ElasticManager(args, self.etcd_client)
        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        self.assertEqual(elastic._match(hosts), False)

        hosts = [
            "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001",
            "10.10.10.4:6001"
        ]
        self.assertEqual(elastic._match(hosts), True)

        hosts = ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]
        self.assertEqual(elastic._match(hosts), False)

        hosts = ["10.10.10.1:6001"]
        self.assertEqual(elastic._match(hosts), False)

        os.environ[
            'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        elastic = ElasticManager(args, self.etcd_client)
        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        self.assertEqual(elastic._match(hosts), True)

        # TODO test timeout
        #time.sleep(60)
        #self.assertEqual(elastic._match(hosts), True)

    def test_update_hosts_for_faulttolerance(self):
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "0"
            gpus = "0"
            nproc_per_node = 1
            host = None
            host_port = None
            scale = None
            force = None
            backend = 'gloo'

        args = Argument()
        os.environ['FLAGS_START_PORT'] = "6001"
        os.environ['PADDLE_ELASTIC_NP'] = "2"
        os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.2"
        os.environ[
            'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        elastic = ElasticManager(args, self.etcd_client)
        # add 10.10.10.3:6001
        os.environ['PADDLE_TRAINER_ID'] = "0"
        elastic.host_port = "10.10.10.1:6001"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.2")

        # add 10.10.10.3:6001
        elastic.host_port = "10.10.10.3:6001"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.3:6001"]
        os.environ['PADDLE_TRAINER_ID'] = "1"
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.3")

        elastic.host_port = "10.10.10.3:6001"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.3:6001"]
        os.environ['PADDLE_TRAINER_ID'] = "-1"
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.3")

    def test_update_hosts_for_elastic(self):
        #######################
        #  elastic, scale up  #
        #######################
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "2:4"
            gpus = "0"
            nproc_per_node = 1
            host = None
            host_port = None
            scale = None
            force = None
            backend = 'gloo'

        args = Argument()

        os.environ['FLAGS_START_PORT'] = "6001"
        os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.2"
        os.environ[
            'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        elastic = ElasticManager(args, self.etcd_client)
        # add 10.10.10.3:6001
        elastic.host_port = "10.10.10.1:6001"
        elastic.hosts = [
            "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"
        ]
        elastic._update_hosts()
        #self.assertEqual(elastic.all_host_endpoints,
        #                 ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"])
        self.assertEqual(
            os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.2,10.10.10.3")

        #######################
        # elastic, scale down #
        #######################
        os.environ[
            'PADDLE_TRAINERS'] = "10.10.10.0,10.10.10.1,10.10.10.2,10.10.10.3"
        os.environ[
            'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.0:6000,10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001"
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.0:6000,10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001"
        elastic = ElasticManager(args, self.etcd_client)
        # remove 10.10.10.1:6001
        elastic.host_port = "10.10.10.1:6001"
        elastic.hosts = [
            "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"
        ]
        elastic._update_hosts()
        #self.assertEqual(elastic.all_host_endpoints,
        #                 ["10.10.10.3:6001", "10.10.10.1:6001", "10.10.10.2:6001"])
        self.assertEqual(
            os.getenv('PADDLE_TRAINERS'), "10.10.10.3,10.10.10.1,10.10.10.2")
        self.assertEqual(
            os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS'),
            "10.10.10.3:6001,10.10.10.1:6001,10.10.10.2:6001")

        ############
        os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.1"
        os.environ[
            'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.1:6002,10.10.10.1:6003,10.10.10.1:6004"
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.1:6002,10.10.10.1:6003,10.10.10.1:6004"
        elastic = ElasticManager(args, self.etcd_client)
        # remove 10.10.10.1:6001
        elastic.host_port = "10.10.10.1:6001"
        os.environ['PADDLE_TRAINER_ID'] = "-1"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.1:6001"]
        elastic._update_hosts()
        #self.assertEqual(elastic.all_host_endpoints,
        #                 ["10.10.10.1:6001", "10.10.10.1:6001"])
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.1")
        self.assertEqual(
            os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS'),
            "10.10.10.1:6001,10.10.10.1:6001")

    def test_exit(self):
        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            np = "2"
            gpus = "0"
            nproc_per_node = 1
            host = None
            host_port = None
            scale = None
            force = None
            backend = 'gloo'

        args = Argument()
        elastic = ElasticManager(args, self.etcd_client)
        elastic.exit()


if __name__ == "__main__":
    unittest.main()
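For reference, the ELASTIC-level readiness check exercised by test_match_elastic above boils down to the following decision; this is an editor's condensation of ElasticManager._match, not code from the patch.

# Editor's sketch, not part of the patch: the ELASTIC branch of _match as a pure function.
def elastic_ready(num_hosts, current_np, min_np, max_np, waited, timeout):
    if num_hosts == current_np or num_hosts == max_np:
        return True                  # exact current size or maximum size: start now
    if min_np <= num_hosts < max_np:
        return waited > timeout      # partial size: start only once the timeout expires
    return False                     # below min_np: keep waiting

# With current_np=4, min_np=2, max_np=4 and PADDLE_ELASTIC_TIMEOUT=60 as in the test:
assert elastic_ready(4, 4, 2, 4, waited=0, timeout=60) is True
assert elastic_ready(3, 4, 2, 4, waited=0, timeout=60) is False   # still inside the wait window
assert elastic_ready(3, 4, 2, 4, waited=61, timeout=60) is True
assert elastic_ready(1, 4, 2, 4, waited=0, timeout=60) is False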
@@ -15,7 +15,7 @@
 echo "begin test elastic"
 unset GREP_OPTIONS
-rm -rf log
+rm -rf log*
 
 pids=`ps -ef | grep "python -m paddle.distributed.launch elastic_demo.[py]" | awk '{print $2}'`
 if [ -n "$pids" ]; then
@@ -28,6 +28,11 @@ fi
 
 python -m pip install --no-cache-dir etcd3 -i https://mirror.baidu.com/pypi/simple
 
+#############################
+#### test fault tolrance ####
+#############################
+
 # common env
 export PADDLE_ELASTIC_NP=2
 export PADDLE_ELASTIC_SERVER=127.0.0.1:2379
@@ -137,7 +142,7 @@ export PADDLE_TRAINER_ID=1
 export PADDLE_TRAINERS_NUM=2
 
 python -m paddle.distributed.launch elastic_demo.py &> log_1.log &
-p1=$!
+p1_1=$!
 
 for i in {1..10}
 do
@@ -184,7 +189,7 @@ export PADDLE_TRAINER_ID=0
 export PADDLE_TRAINERS_NUM=2
 
 python -m paddle.distributed.launch elastic_demo.py &> log_0.log &
-p0=$!
+p0_1=$!
 
 for i in {1..10}
 do
@@ -205,4 +210,102 @@ check_env
 echo "All check done"
 
 sleep 3
-kill $p0 $p1
+kill $p0 $p1 $p0_1 $p1_1
#############################
##### test elastic #####
#############################
# common env
export PADDLE_ELASTIC_NP=2:4
export PADDLE_ELASTIC_SERVER=127.0.0.1:2379
export PADDLE_ELASTIC_JOB_ID=elastic-demo-2
# run node 0
export NVIDIA_VISIBLE_DEVICES=0
export CUDA_VISIBLE_DEVICES=0
export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.1:8001,10.10.10.2:8001,10.10.10.3:8001
export PADDLE_TRAINERS=10.10.10.1,10.10.10.2,10.10.10.3
export TRAINER_PORTS_NUM=1
export POD_IP=10.10.10.1
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=3
python -m paddle.distributed.launch elastic_demo.py &> log_pe_0.log &
pe_0=$!
for i in {1..10}
do
    if grep -q "INFO:ELASTIC:not ready" log_pe_0.log; then
        echo "run node 0 ok"
        break
    else
        sleep 10
    fi
    if [ $i -eq 10 ]; then
        echo "run node 0 error"
        exit -1
    fi
done
# run node 1
export NVIDIA_VISIBLE_DEVICES=1
export CUDA_VISIBLE_DEVICES=1
export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.1:8001,10.10.10.2:8001,10.10.10.3:8001
export PADDLE_TRAINERS=10.10.10.1,10.10.10.2,10.10.10.3
export TRAINER_PORTS_NUM=1
export POD_IP=10.10.10.2
export PADDLE_TRAINER_ID=1
export PADDLE_TRAINERS_NUM=3
python -m paddle.distributed.launch elastic_demo.py &> log_pe_1.log &
pe_1=$!
for i in {1..10}
do
    if grep -q "INFO:ELASTIC:not ready" log_pe_1.log; then
        echo "run node 1 ok"
        break
    else
        sleep 10
    fi
    if [ $i -eq 10 ]; then
        echo "run node 1 error"
        exit -1
    fi
done
# run node 2
export NVIDIA_VISIBLE_DEVICES=1
export CUDA_VISIBLE_DEVICES=1
export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.1:8001,10.10.10.2:8001,10.10.10.3:8001
export PADDLE_TRAINERS=10.10.10.1,10.10.10.2,10.10.10.3
export TRAINER_PORTS_NUM=1
export POD_IP=10.10.10.3
export PADDLE_TRAINER_ID=2
export PADDLE_TRAINERS_NUM=3
python -m paddle.distributed.launch elastic_demo.py &> log_pe_2.log &
pe_2=$!
for i in {1..10}
do
    if grep -q "INFO:ELASTIC:ready with hosts" log_pe_2.log; then
        echo "run node 2 ok"
        break
    else
        sleep 10
    fi
    if [ $i -eq 10 ]; then
        echo "run node 2 error"
        exit -1
    fi
done
lw0="log/workerlog.0"
check_env
echo "All check done"
sleep 3
kill $pe_0 $pe_1 $pe_2