diff --git a/python/paddle/distributed/elastic.py b/python/paddle/distributed/elastic.py index e6f21f6603d8dad6c802663e9ea6940d7eb0826e..52f36a227f1c86a79933d9088036b0cc0b2f0df8 100644 --- a/python/paddle/distributed/elastic.py +++ b/python/paddle/distributed/elastic.py @@ -50,7 +50,10 @@ if __name__ == '__main__': parser.add_argument( "--elastic_server", type=str, help="etcd server host:port") parser.add_argument("--job_id", type=str, help="job unique id") - parser.add_argument("--np", type=int, help="job pod/node number") + parser.add_argument( + "--np", + type=str, + help="job pod/node number, need to be 'MIN' or 'MIN:MAX' format") parser.add_argument("action", type=str, help="action to take") args = parser.parse_args() @@ -58,7 +61,7 @@ if __name__ == '__main__': server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER') name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID') - np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0)) + np = int(args.np.split(":")[0]) or int(os.getenv('PADDLE_ELASTIC_NP', 0)) cmd = Command(server, name) diff --git a/python/paddle/distributed/fleet/elastic/__init__.py b/python/paddle/distributed/fleet/elastic/__init__.py index 1ac81729d5430a5b5174e6b07bda1ca0d0f5a971..b928d82fb399f3cfcef56c78a8054a1612b3c8c0 100644 --- a/python/paddle/distributed/fleet/elastic/__init__.py +++ b/python/paddle/distributed/fleet/elastic/__init__.py @@ -33,7 +33,7 @@ def enable_elastic(args, distribute_mode): if not args.job_id and not os.getenv('PADDLE_ELASTIC_JOB_ID'): return False - if not args.np and not int(os.getenv('PADDLE_ELASTIC_NP', 0)): + if not args.np and not os.getenv('PADDLE_ELASTIC_NP'): return False return True @@ -41,7 +41,11 @@ def enable_elastic(args, distribute_mode): def launch_elastic(args, distribute_mode): - elastic = ElasticManager(args) + server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER') + srv, port = server.split(':') + import etcd3 + etcd_client = etcd3.client(host=srv, port=port) + elastic = ElasticManager(args, etcd_client) signal.signal(signal.SIGTERM, elastic.signal_handler) signal.signal(signal.SIGABRT, elastic.signal_handler) diff --git a/python/paddle/distributed/fleet/elastic/collective.py b/python/paddle/distributed/fleet/elastic/collective.py index 83f0e85db2badfeff4865f974dd2c9ef2053b0a1..d9c2735c4bd0191ac4c71e3a4301addd0192dc93 100644 --- a/python/paddle/distributed/fleet/elastic/collective.py +++ b/python/paddle/distributed/fleet/elastic/collective.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import tempfile from paddle.distributed.fleet import launch_utils from paddle.distributed.fleet import cloud_utils from paddle.distributed.fleet import ascend_utils diff --git a/python/paddle/distributed/fleet/elastic/manager.py b/python/paddle/distributed/fleet/elastic/manager.py index 2a344e92765d94e43079f7a4ed14aa51447ae179..cfb0d1ead5aa151f9239588be8d4585bec00186a 100644 --- a/python/paddle/distributed/fleet/elastic/manager.py +++ b/python/paddle/distributed/fleet/elastic/manager.py @@ -16,9 +16,14 @@ import time import socket import os import six +import copy import logging import signal import random +import threading +import traceback +from paddle.distributed.fleet import cloud_utils +from paddle.distributed.fleet import launch_utils logger = logging.getLogger("ELASTIC") logger.setLevel(logging.INFO) @@ -30,6 +35,18 @@ logger.addHandler(ch) ELASTIC_EXIT_CODE = 101 +# wait for timeout, unit: seconds +ELASTIC_TIMEOUT = 2 * 60 + +# keepalived ttl, unit: seconds +ELASTIC_TTL = 60 + + +# 1: Fault tolerance, 2: Elastic +class ElasticLevel: + FAULT_TOLERANCE = 1 + ELASTIC = 2 + class ElasticStatus: COMPLETED = "completed" @@ -106,21 +123,52 @@ class LauncherInterface(object): class ElasticManager(object): - def __init__(self, args): + def __init__(self, args, etcd_client): self.args = args server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER') name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID') - np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0)) + self.min_np, self.max_np = self._parse_np(args.np) host = args.host or os.getenv('POD_IP') scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0)) force = args.force or os.getenv('PADDLE_ELASTIC_FORCE') - self.endpoints = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '') - self.trainers = os.getenv('PADDLE_TRAINERS', '') + start_port = 6170 + if os.environ.get('FLAGS_START_PORT') is not None: + start_port = int(os.environ.get('FLAGS_START_PORT')) + if cloud_utils.use_paddlecloud(): + start_port = int(os.getenv("PADDLE_PORT", "")) + (self.device_mode, + self.devices_per_proc) = launch_utils.get_device_proc_info(args) + + self.elastic_timeout = int( + os.getenv('PADDLE_ELASTIC_TIMEOUT', ELASTIC_TIMEOUT)) + elastic_ttl = int(os.getenv('PADDLE_ELASTIC_TTL', ELASTIC_TTL)) + self.dist_endpoints = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '') + self.trainers = os.getenv('PADDLE_TRAINERS', '') + self.all_host_endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS', + '').split(",") + self.np = len(self.all_host_endpoints) + logger.info(f'start job with np={self.np}') + + #[ "%s:%d" % (ip, start_port) for ip in self.trainers.split(",")] + logger.info( + f"trainers={self.trainers}, all_host_endpoints={self.all_host_endpoints}" + ) + + # auto correct the value of elastic_level + # 1: Fault tolerant, 2: Elastic self.elastic_level = int( - os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) + os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', + ElasticLevel.FAULT_TOLERANCE)) + if self.min_np == self.max_np or \ + (self.min_np > 0 and self.max_np == 0): + self.elastic_level = ElasticLevel.FAULT_TOLERANCE + logger.info(f'start job with ElasticLevel.FAULT_TOLERANCE') + if self.min_np > 0 and self.max_np > self.min_np: + self.elastic_level = ElasticLevel.ELASTIC + logger.info(f'start job with ElasticLevel.ELASTIC') # compatible with kuberntes service discovery if not server and os.getenv( @@ -130,8 +178,6 @@ class ElasticManager(object): os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'), os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT')) - 
#elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1) - logger.debug('init with server {} host {}'.format(server, host)) self.hosts = [] @@ -140,20 +186,20 @@ class ElasticManager(object): self.sigint = 0 self.need_sync = False - if not server or ':' not in server or not name or not np: + self.elastic_startup_time = None + + if not server or ':' not in server or not name or not self.np: logger.info( 'Elastic is not enabled with server {} name {} and np {}'. - format(server, name, np)) + format(server, name, self.np)) self.enable = False return else: self.enable = True - import etcd3 - - srv, port = server.split(':') - self.etcd = etcd3.client(host=srv, port=port) + self.etcd = etcd_client self.host = host if host else self._get_host() + self.host_port = "%s:%d" % (self.host, start_port) # etcd data self.prefix = "/paddle/" + name @@ -165,67 +211,75 @@ class ElasticManager(object): random.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(6)) self.host_path = '{}/{}{}'.format(self.node_prefix, node_tag, time.time()) - - self.np = np + scale ''' 0 group mode, be aware of healthy status of other workers 1 decouple mode, check own status only ''' self.etcd.put(self.prefix, b'0') - # host - # register self host to etcd - # register watch to reset host after host been deleted - self.etcd.delete_prefix(self.node_prefix) - + # register callback def host_call_back(event): - if self.etcd.get(self.host_path)[0] == None: - logger.info('register host again {}'.format(self.host)) - - self.etcd.put(self.host_path, six.b(self.host)) - self.need_sync = True - - host_watch = self.etcd.add_watch_callback(self.host_path, - host_call_back) - self.etcd.put(self.host_path, six.b(self.host)) - - # np describes the exact number of nodes to run the job - inp = int(self.etcd.get(self.np_path)[0] or 0) - if scale == 0 and not force: - assert inp == np or inp == 0, "np {} is not consistent with np in etcd {}".format( - np, inp) - else: - assert inp == np or inp == self.np, "np {} scale to {} by {} is not allowed".format( - inp, self.np, scale) - - self.etcd.put(self.np_path, six.b("%d" % (self.np))) - - def np_call_back(event): - gnp = int(self.etcd.get(self.np_path)[0]) - if gnp != self.np: - logger.info("scale np {} to {} ".format(self.np, gnp)) - self.np = gnp - - np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) + self.hosts = [ + six.ensure_str(i[0]) + for i in self.etcd.get_prefix(self.node_prefix) + ] + logger.info( + f"host_call_back curr_host={self.host_port}, hosts:{self.hosts}") + self.need_sync = True + self.elastic_startup_time = None + + host_watch = self.etcd.add_watch_prefix_callback(self.node_prefix, + host_call_back) + host_lease = self.etcd.lease(elastic_ttl) + + # register etcd lease heartbeat + def lease_heartbeat(): + while True: + try: + host_lease.refresh() + + hosts = [ + six.ensure_str(i[0]) + for i in self.etcd.get_prefix(self.node_prefix) + ] + logger.info( + f"[lease_heartbeat] curr_host={self.host_port}, hosts={hosts}" + ) + if self.host_port not in hosts: + logger.info( + f"[lease_heartbeat] register host={self.host_port}") + self.etcd.put(self.host_path, + six.b(self.host_port), + lease=host_lease) + except Exception as e: + logger.error("[lease_heartbeat] internal error:{} {}". 
+ format(e, traceback.format_exc())) + break + time.sleep(elastic_ttl / 3) + + keepalived_thread = threading.Thread( + name='lease_heartbeat', target=lease_heartbeat, daemon=True) + keepalived_thread.start() + + self.etcd.put(self.host_path, six.b(self.host_port), lease=host_lease) # endpoints handle DISTRIBUTED_TRAINER_ENDPOINTS and PADDLE_TRAINERS self.etcd.put(self.endpoints_path, - six.b('{}|{}'.format(self.endpoints, self.trainers))) + six.b('{}|{}'.format(self.dist_endpoints, self.trainers))) def endpoints_call_back(event): - if not self.endpoints: + if not self.dist_endpoints: return edps = six.ensure_str(self.etcd.get(self.endpoints_path)[0] or '') - self.endpoints, self.trainers = edps.split('|') + self.dist_endpoints, self.trainers = edps.split('|') logger.info("set DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( - self.endpoints)) + self.dist_endpoints)) logger.info("set PADDLE_TRAINERS {} ".format(self.trainers)) endpoints_watch = self.etcd.add_watch_callback(self.endpoints_path, endpoints_call_back) - self.watches = [host_watch, np_watch, endpoints_watch] - + self.watches = [host_watch, endpoints_watch] self.launcher = None def exit(self, completed=False): @@ -248,6 +302,30 @@ class ElasticManager(object): if len(hosts) == 0: self.etcd.delete_prefix(self.prefix) + def _parse_np(self, np: str): + """ + np format is "MIN" or "MIN:MAX" + """ + np_str = np or os.getenv('PADDLE_ELASTIC_NP', "0") + np_dict = np_str.split(":") + min_np = max_np = 0 + if len(np_dict) == 1: + # Fault tolerant + min_np = int(np_dict[0]) + min_np = 1 if min_np <= 0 else min_np + max_np = 1 + elif len(np_dict) == 2: + # Elastic + min_np = int(np_dict[0]) + max_np = int(np_dict[1]) + min_np = 1 if min_np <= 0 else min_np + max_np = min_np if min_np > max_np else max_np + else: + raise ValueError( + f'the np={np} needs to be in "MIN" or "MIN:MAX" format') + + return min_np, max_np + def _get_host(self): try: return socket.gethostbyname(socket.getfqdn(socket.gethostname())) @@ -260,40 +338,166 @@ class ElasticManager(object): return int(self.etcd.get(self.prefix)[0]) == 1 - def _match(self): - - self.hosts = [ - six.ensure_str(i[0]) for i in self.etcd.get_prefix(self.node_prefix) - ] - if len(self.hosts) == self.np: - return True + def _match(self, host_list: list=None): + if host_list: + self.hosts = host_list else: - return False + self.hosts = [ + six.ensure_str(i[0]) + for i in self.etcd.get_prefix(self.node_prefix) + ] - def _update_hosts(self): - assert len(self.hosts) != 0, 'hosts empty' + if self.elastic_level == ElasticLevel.FAULT_TOLERANCE: + if len(self.hosts) == self.np: + return True + else: + return False - if self.host in self.endpoints: - os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.endpoints - os.environ['PADDLE_TRAINERS'] = self.trainers - logger.info("update env DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( - self.endpoints)) - logger.info("update env PADDLE_TRAINERS {} ".format(self.trainers)) - return + if self.elastic_level == ElasticLevel.ELASTIC: + # FIXME(xym) add freeze status + hosts_num = len(self.hosts) + if hosts_num == self.np: + return True + + if not self.elastic_startup_time: + self.elastic_startup_time = time.time() + if hosts_num == self.max_np: + self.elastic_startup_time = None + return True + elif hosts_num >= self.min_np and hosts_num < self.max_np: + interval_time = time.time() - self.elastic_startup_time + if interval_time <= self.elastic_timeout: + logger.info( + f"wait for timeout, you can set value by PADDLE_ELASTIC_TIMEOUT, \ + hosts_num={hosts_num}, 
min_np={self.min_np}, \ + interval_time={interval_time}, elastic_timeout={self.elastic_timeout}" + ) + return False + return True + else: + self.elastic_startup_time = None + return False + return False + + def _update_endpoint(self, endpoints, hosts): + self.etcd.put(self.endpoints_path, + six.b('{}|{}'.format(endpoints, hosts))) + + def _update_hosts(self): + assert len(self.hosts) != 0, 'hosts empty' rank = int(os.getenv('PADDLE_TRAINER_ID', -1)) - idx = self.hosts.index(self.host) + if self.elastic_level == ElasticLevel.FAULT_TOLERANCE: + if self.host_port in self.dist_endpoints: + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints + os.environ['PADDLE_TRAINERS'] = self.trainers + logger.info("update env DISTRIBUTED_TRAINER_ENDPOINTS {} ". + format(self.dist_endpoints)) + logger.info("update env PADDLE_TRAINERS {} ".format( + self.trainers)) + return - # swap if self.host not in the right position - if rank >= 0: - self.hosts[idx] = self.hosts[rank] - self.hosts[rank] = self.host + # fault tolerance + idx = self.hosts.index(self.host_port) + + # swap if self.host not in the right position + if rank >= 0: + self.hosts[idx] = self.hosts[rank] + self.hosts[rank] = self.host_port + else: + os.environ['PADDLE_TRAINER_ID'] = '{}'.format(idx) + hosts = ','.join( + [host_port.split(":")[0] for host_port in self.hosts]) + self.args.ips = hosts + os.environ['PADDLE_TRAINERS'] = hosts else: - os.environ['PADDLE_TRAINER_ID'] = '{}'.format(idx) - - hosts = ','.join(self.hosts) - self.args.ips = hosts - os.environ['PADDLE_TRAINERS'] = hosts + # elastic, scale up/down + endpoints = copy.deepcopy(self.all_host_endpoints) + if len(self.hosts) > self.np: + # scale up + logger.info( + f"elastic scale up, from {self.np} to {len(self.hosts)}, hosts={self.hosts}, endpoints={endpoints}" + ) + + for curr_host_port in self.hosts: + if curr_host_port not in endpoints: + endpoints.append(curr_host_port) + + os.environ['PADDLE_TRAINER_ID'] = '{}'.format( + endpoints.index(self.host_port)) + hosts = ','.join( + [host_port.split(":")[0] for host_port in endpoints]) + self.args.ips = hosts + os.environ['PADDLE_TRAINERS'] = hosts + self.np = len(endpoints) + os.environ['PADDLE_TRAINER_ENDPOINTS'] = ','.join(endpoints) + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints + self.all_host_endpoints = endpoints + else: + # scale down + logger.info( + f"elastic scale down, from {len(self.hosts)} to {self.np}, hosts={self.hosts}, endpoints={endpoints}" + ) + + # If the shrink node is from the first of the rank list, you need to minimize the movement of the rank + # eg: + # the source trainers is:10.10.10.0,10.10.10.1,10.10.10.2,10.10.10.3 + # 10.10.10.0 is removed + # the new trainers is:10.10.10.3,10.10.10.1,10.10.10.2 + # In this case, the rank of 10.10.10.1 and 10.10.10.2 remains unchanged, while the rank of 10.10.10.3 is set to rank0 + endpoints_dict = dict() + unsorted_endpoints = [] + for id, host_port in enumerate(self.hosts): + idx = endpoints.index(host_port) + if idx <= len(self.hosts) - 1 and not endpoints_dict.get( + idx): + endpoints_dict[idx] = host_port + else: + unsorted_endpoints.append(host_port) + + idle_index = 0 + sorted_endpoints = [] + for idx in range(len(self.hosts)): + if not endpoints_dict.get(idx) and len( + unsorted_endpoints) > 0: + endpoints_dict[idx] = unsorted_endpoints[idle_index] + idle_index += 1 + + sorted_endpoints.append(endpoints_dict.get(idx)) + + logger.info( + f"elastic scale down, sorted_endpoints={sorted_endpoints}") + self.all_host_endpoints 
= sorted_endpoints + + endpoint_list = [] + ip_list = [] + for host_port in sorted_endpoints: + host_port_list = host_port.split(":") + ip = host_port_list[0] + port = int(host_port_list[1]) + + ip_list.append(ip) + ports = [ + x + for x in range(port, port + len(self.devices_per_proc)) + ] + endpoint_list.extend( + ["%s:%d" % (ip, port) for port in ports]) + + hosts = ','.join(ip_list) + new_endpoints = ','.join(endpoint_list) + + self.args.ips = hosts + os.environ['PADDLE_TRAINER_ID'] = '{}'.format( + sorted_endpoints.index(self.host_port)) + os.environ['PADDLE_TRAINERS'] = hosts + self.np = len(sorted_endpoints) + os.environ['PADDLE_TRAINER_ENDPOINTS'] = ','.join( + sorted_endpoints) + os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = new_endpoints + self._update_endpoint(new_endpoints, hosts) def wait(self): if not self.enable: @@ -307,13 +511,6 @@ class ElasticManager(object): return logger.info('not ready for np {} with hosts {}'.format(self.np, self.hosts)) - - # reset hosts every 30s to prevent fake deadlock - if idx % 10 == 0: - self.etcd.delete_prefix(self.node_prefix) - logger.info('reset np {} with hosts {}'.format(self.np, - self.hosts)) - idx += 1 time.sleep(2) @@ -333,6 +530,7 @@ class ElasticManager(object): while not self.stopped: ret = self.launcher.watch() + logger.debug(f"launcher.watch():{ret}") if ret is not None: # self terminated logger.info('job exit with code {}'.format(ret)) @@ -341,7 +539,7 @@ class ElasticManager(object): self.exit(completed=completed) if completed: return ElasticStatus.COMPLETED - if self.elastic_level == 1: + if self.elastic_level == ElasticLevel.FAULT_TOLERANCE: return ElasticStatus.RESTART else: return ElasticStatus.ERROR @@ -354,6 +552,7 @@ class ElasticManager(object): if self.launcher: self.launcher.stop() + return ElasticStatus.EXIT def signal_handler(self, sigint, frame): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_elastic_init.py b/python/paddle/fluid/tests/unittests/test_fleet_elastic_init.py new file mode 100644 index 0000000000000000000000000000000000000000..10028d2d98f67192166908d8d25aa2caf9832d51 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_elastic_init.py @@ -0,0 +1,48 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import time +import unittest +import argparse +from warnings import catch_warnings + +from paddle.distributed.fleet.elastic import enable_elastic, launch_elastic +from paddle.distributed.fleet.launch_utils import DistributeMode + + +class TestElasticInit(unittest.TestCase): + def setUp(self): + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "2:4" + + self.args = Argument() + + def test_enable_elastic(self): + result = enable_elastic(self.args, DistributeMode.COLLECTIVE) + self.assertEqual(result, True) + + def test_launch_elastic(self): + try: + launch_elastic(self.args, DistributeMode.COLLECTIVE) + except Exception as e: + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py b/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..ddf87728a819ba7e2553ee5646ecb4718f5a6e25 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py @@ -0,0 +1,306 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import time +import unittest +import argparse + +from paddle.distributed.fleet.elastic.manager import ElasticManager +from paddle.distributed.fleet.elastic.manager import ELASTIC_TIMEOUT + + +class MockLease(): + def refresh(self): + pass + + +class MockEtcdClient: + def __init__(self, lease=None): + self._lease = lease + + def put(self, key, value, lease=None): + pass + + def get(self, key): + value = "0" + return value, value + + def delete_prefix(self, key): + pass + + def get_prefix(self, key_prefix): + hosts = ["10.10.10.1:6001", "10.10.10.2:6001"] + return hosts + + def add_watch_callback(self, *args, **kwargs): + return "host_watch" + + def add_watch_prefix_callback(self, key_prefix, callback, **kwargs): + callback(None) + return "host_watch" + + def cancel_watch(self, watch_id): + pass + + def delete(self, key): + pass + + def lease(self, ttl): + if self._lease: + return self._lease + else: + return MockLease() + + +class TestElasticManager(unittest.TestCase): + def setUp(self): + self.etcd_client = MockEtcdClient() + + def test_elastic_manager_init(self): + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "2" + gpus = "0" + nproc_per_node = 1 + host = None + host_port = None + scale = None + force = None + backend = 'gloo' + + args = Argument() + + class _MockLease(): + def refresh(self): + raise ValueError("valid error, this only for unittest") + + etcd_client = MockEtcdClient(lease=_MockLease()) + elastic = ElasticManager(args, etcd_client=etcd_client) + + def test_match_faulttolerance(self): + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "2" + gpus = "0" + nproc_per_node = 1 + host = None + host_port = None + scale 
= None + force = None + backend = 'gloo' + + args = Argument() + elastic = ElasticManager(args, self.etcd_client) + hosts = ["10.10.10.1:6001", "10.10.10.2:6001"] + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + self.assertEqual(elastic._match(hosts), True) + hosts = ["10.10.10.1:6001"] + os.environ['PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001" + self.assertEqual(elastic._match(hosts), False) + + def test_match_elastic(self): + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "2:4" + gpus = "0" + nproc_per_node = 1 + host = None + host_port = None + scale = None + force = None + backend = 'gloo' + + os.environ['PADDLE_ELASTIC_TIMEOUT'] = "60" + args = Argument() + os.environ['FLAGS_START_PORT'] = "6001" + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001,10.10.10.4:6001" + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001,10.10.10.4:6001" + elastic = ElasticManager(args, self.etcd_client) + hosts = ["10.10.10.1:6001", "10.10.10.2:6001"] + self.assertEqual(elastic._match(hosts), False) + + hosts = [ + "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001", + "10.10.10.4:6001" + ] + self.assertEqual(elastic._match(hosts), True) + + hosts = ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"] + self.assertEqual(elastic._match(hosts), False) + + hosts = ["10.10.10.1:6001"] + self.assertEqual(elastic._match(hosts), False) + + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + elastic = ElasticManager(args, self.etcd_client) + hosts = ["10.10.10.1:6001", "10.10.10.2:6001"] + self.assertEqual(elastic._match(hosts), True) + + # TODO test timeout + #time.sleep(60) + #self.assertEqual(elastic._match(hosts), True) + + def test_update_hosts_for_faulttolerance(self): + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "0" + gpus = "0" + nproc_per_node = 1 + host = None + host_port = None + scale = None + force = None + backend = 'gloo' + + args = Argument() + os.environ['FLAGS_START_PORT'] = "6001" + os.environ['PADDLE_ELASTIC_NP'] = "2" + os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.2" + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + elastic = ElasticManager(args, self.etcd_client) + # add 10.10.10.3:6001 + os.environ['PADDLE_TRAINER_ID'] = "0" + elastic.host_port = "10.10.10.1:6001" + elastic.hosts = ["10.10.10.1:6001", "10.10.10.2:6001"] + elastic._update_hosts() + self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.2") + + # add 10.10.10.3:6001 + elastic.host_port = "10.10.10.3:6001" + elastic.hosts = ["10.10.10.1:6001", "10.10.10.3:6001"] + os.environ['PADDLE_TRAINER_ID'] = "1" + elastic._update_hosts() + self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.3") + + elastic.host_port = "10.10.10.3:6001" + elastic.hosts = ["10.10.10.1:6001", "10.10.10.3:6001"] + os.environ['PADDLE_TRAINER_ID'] = "-1" + elastic._update_hosts() + self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.3") + + def test_update_hosts_for_elastic(self): + ####################### + # elastic, scale up # + ####################### + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "2:4" + gpus = "0" + 
nproc_per_node = 1 + host = None + host_port = None + scale = None + force = None + backend = 'gloo' + + args = Argument() + + os.environ['FLAGS_START_PORT'] = "6001" + os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.2" + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001" + elastic = ElasticManager(args, self.etcd_client) + # add 10.10.10.3:6001 + elastic.host_port = "10.10.10.1:6001" + elastic.hosts = [ + "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001" + ] + elastic._update_hosts() + #self.assertEqual(elastic.all_host_endpoints, + # ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]) + self.assertEqual( + os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.2,10.10.10.3") + + ####################### + # elastic, scale down # + ####################### + os.environ[ + 'PADDLE_TRAINERS'] = "10.10.10.0,10.10.10.1,10.10.10.2,10.10.10.3" + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.0:6000,10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001" + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.0:6000,10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001" + elastic = ElasticManager(args, self.etcd_client) + # remove 10.10.10.1:6001 + elastic.host_port = "10.10.10.1:6001" + elastic.hosts = [ + "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001" + ] + elastic._update_hosts() + #self.assertEqual(elastic.all_host_endpoints, + # ["10.10.10.3:6001", "10.10.10.1:6001", "10.10.10.2:6001"]) + self.assertEqual( + os.getenv('PADDLE_TRAINERS'), "10.10.10.3,10.10.10.1,10.10.10.2") + self.assertEqual( + os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS'), + "10.10.10.3:6001,10.10.10.1:6001,10.10.10.2:6001") + + ############ + os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.1" + os.environ[ + 'DISTRIBUTED_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.1:6002,10.10.10.1:6003,10.10.10.1:6004" + os.environ[ + 'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.1:6002,10.10.10.1:6003,10.10.10.1:6004" + elastic = ElasticManager(args, self.etcd_client) + # remove 10.10.10.1:6001 + elastic.host_port = "10.10.10.1:6001" + os.environ['PADDLE_TRAINER_ID'] = "-1" + elastic.hosts = ["10.10.10.1:6001", "10.10.10.1:6001"] + elastic._update_hosts() + #self.assertEqual(elastic.all_host_endpoints, + # ["10.10.10.1:6001", "10.10.10.1:6001"]) + self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.1") + self.assertEqual( + os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS'), + "10.10.10.1:6001,10.10.10.1:6001") + + def test_exit(self): + class Argument: + elastic_server = "127.0.0.1:2379" + job_id = "test_job_id_123" + np = "2" + gpus = "0" + nproc_per_node = 1 + host = None + host_port = None + scale = None + force = None + backend = 'gloo' + + args = Argument() + elastic = ElasticManager(args, self.etcd_client) + elastic.exit() + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh index 8b618195f55ea089c9801bb9bdce5c033e884b30..a3e76a564f5b706f499a049e7c801504c56a8573 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh @@ -15,7 +15,7 @@ echo "begin test elastic" unset GREP_OPTIONS -rm -rf log +rm -rf log* pids=`ps -ef | grep "python -m paddle.distributed.launch elastic_demo.[py]" | awk '{print $2}'` if [ -n "$pids" ]; then @@ -28,6 +28,11 @@ fi 
python -m pip install --no-cache-dir etcd3 -i https://mirror.baidu.com/pypi/simple + +############################# +#### test fault tolrance #### +############################# + # common env export PADDLE_ELASTIC_NP=2 export PADDLE_ELASTIC_SERVER=127.0.0.1:2379 @@ -137,7 +142,7 @@ export PADDLE_TRAINER_ID=1 export PADDLE_TRAINERS_NUM=2 python -m paddle.distributed.launch elastic_demo.py &> log_1.log & -p1=$! +p1_1=$! for i in {1..10} do @@ -184,7 +189,7 @@ export PADDLE_TRAINER_ID=0 export PADDLE_TRAINERS_NUM=2 python -m paddle.distributed.launch elastic_demo.py &> log_0.log & -p0=$! +p0_1=$! for i in {1..10} do @@ -205,4 +210,102 @@ check_env echo "All check done" sleep 3 -kill $p0 $p1 +kill $p0 $p1 $p0_1 $p1_1 + +############################# +##### test elastic ##### +############################# +# common env +export PADDLE_ELASTIC_NP=2:4 +export PADDLE_ELASTIC_SERVER=127.0.0.1:2379 +export PADDLE_ELASTIC_JOB_ID=elastic-demo-2 + +# run node 0 +export NVIDIA_VISIBLE_DEVICES=0 +export CUDA_VISIBLE_DEVICES=0 +export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.1:8001,10.10.10.2:8001,10.10.10.3:8001 +export PADDLE_TRAINERS=10.10.10.1,10.10.10.2,10.10.10.3 +export TRAINER_PORTS_NUM=1 +export POD_IP=10.10.10.1 +export PADDLE_TRAINER_ID=0 +export PADDLE_TRAINERS_NUM=3 + +python -m paddle.distributed.launch elastic_demo.py &> log_pe_0.log & +pe_0=$! + +for i in {1..10} +do + if grep -q "INFO:ELASTIC:not ready" log_pe_0.log; then + echo "run node 0 ok" + break + else + sleep 10 + fi + if [ $i -eq 10 ]; then + echo "run node 0 error" + exit -1 + fi +done + +# run node 1 +export NVIDIA_VISIBLE_DEVICES=1 +export CUDA_VISIBLE_DEVICES=1 +export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.1:8001,10.10.10.2:8001,10.10.10.3:8001 +export PADDLE_TRAINERS=10.10.10.1,10.10.10.2,10.10.10.3 +export TRAINER_PORTS_NUM=1 +export POD_IP=10.10.10.2 +export PADDLE_TRAINER_ID=1 +export PADDLE_TRAINERS_NUM=3 + +python -m paddle.distributed.launch elastic_demo.py &> log_pe_1.log & +pe_1=$! + +for i in {1..10} +do + if grep -q "INFO:ELASTIC:not ready" log_pe_1.log; then + echo "run node 1 ok" + break + else + sleep 10 + fi + if [ $i -eq 10 ]; then + echo "run node 1 error" + exit -1 + fi +done + +# run node 2 +export NVIDIA_VISIBLE_DEVICES=1 +export CUDA_VISIBLE_DEVICES=1 +export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.1:8001,10.10.10.2:8001,10.10.10.3:8001 +export PADDLE_TRAINERS=10.10.10.1,10.10.10.2,10.10.10.3 +export TRAINER_PORTS_NUM=1 +export POD_IP=10.10.10.3 +export PADDLE_TRAINER_ID=2 +export PADDLE_TRAINERS_NUM=3 + +python -m paddle.distributed.launch elastic_demo.py &> log_pe_2.log & +pe_2=$! + +for i in {1..10} +do + if grep -q "INFO:ELASTIC:ready with hosts" log_pe_2.log; then + echo "run node 2 ok" + break + else + sleep 10 + fi + if [ $i -eq 10 ]; then + echo "run node 2 error" + exit -1 + fi +done + +lw0="log/workerlog.0" + +check_env + +echo "All check done" + +sleep 3 +kill $pe_0 $pe_1 $pe_2
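
The --np flag now takes a string so that either "MIN" or "MIN:MAX" can be passed; _parse_np in manager.py turns it into (min_np, max_np), and the manager selects ElasticLevel.FAULT_TOLERANCE or ElasticLevel.ELASTIC from those bounds. Below is a minimal standalone sketch of that parsing rule for readers who want to check the bounds by hand; parse_np is an illustrative name, not a symbol from the patch.

def parse_np(np_str):
    # Sketch of the "MIN" / "MIN:MAX" rule used by _parse_np (assumption: same clamping).
    parts = (np_str or "0").split(":")
    if len(parts) == 1:              # e.g. "2": fault tolerance only
        min_np = max(int(parts[0]), 1)
        max_np = 1
    elif len(parts) == 2:            # e.g. "2:4": elastic between 2 and 4 nodes
        min_np = max(int(parts[0]), 1)
        max_np = max(int(parts[1]), min_np)
    else:
        raise ValueError('np={} must be "MIN" or "MIN:MAX"'.format(np_str))
    return min_np, max_np

print(parse_np("2"))      # (2, 1)
print(parse_np("2:4"))    # (2, 4)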
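
In elastic mode, _match() decides when the job may (re)launch: it is ready as soon as the number of registered hosts equals the current np or max_np, it waits up to PADDLE_ELASTIC_TIMEOUT while the count sits in [min_np, max_np), and it refuses to start below min_np. The sketch below is a hedged simplification of that decision (it folds the checks into one function and assumes the caller keeps the returned timestamp between polls); ready and started_waiting_at are illustrative names, not identifiers from the patch.

import time

def ready(hosts_num, np, min_np, max_np, started_waiting_at, timeout):
    """Return (is_ready, started_waiting_at) to carry into the next poll."""
    if hosts_num == np or hosts_num == max_np:
        return True, None
    if min_np <= hosts_num < max_np:
        started_waiting_at = started_waiting_at or time.time()
        if time.time() - started_waiting_at <= timeout:
            return False, started_waiting_at   # keep waiting for more nodes
        return True, None                      # timed out, run with what we have
    return False, None                         # below min_np, cannot launch

is_ready, t0 = ready(3, 4, 2, 4, None, 120)    # 3 of 2:4 nodes up, start the clock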
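
When the host set shrinks, _update_hosts() reassigns ranks so that a surviving node keeps its old rank whenever that rank still fits in the smaller world size, and only the displaced nodes move down into the freed slots, as described in the "10.10.10.0 is removed" comment in the patch. The following self-contained sketch reproduces that reordering on the same example; reorder_on_scale_down and its arguments are illustrative names, not identifiers from the patch.

def reorder_on_scale_down(old_endpoints, alive):
    keep = {}        # rank -> endpoint, for nodes whose old rank still fits
    displaced = []   # nodes whose old rank no longer exists after shrinking
    for ep in alive:
        old_rank = old_endpoints.index(ep)
        if old_rank < len(alive) and old_rank not in keep:
            keep[old_rank] = ep
        else:
            displaced.append(ep)

    # Displaced nodes fill the freed low ranks, in the order they were seen.
    fill = iter(displaced)
    return [keep.get(rank) or next(fill) for rank in range(len(alive))]

old = ["10.10.10.0:6001", "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]
alive = ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]
print(reorder_on_scale_down(old, alive))
# ['10.10.10.3:6001', '10.10.10.1:6001', '10.10.10.2:6001']
# 10.10.10.1 and 10.10.10.2 keep ranks 1 and 2; 10.10.10.3 takes the freed rank 0.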