diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index caa09acf0572af332b3556f810361a0e5d65d799..aa950fc26f6595392e8edb4480d5a8af7c4af550 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -18,6 +18,7 @@ import os import six import logging import signal +import random logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper()) logger = logging.getLogger("ELASTIC") @@ -129,10 +130,14 @@ class ElasticManager(object): # etcd data self.prefix = "/paddle/" + name - self.node_prefix = self.prefix + '/nodes/' + self.node_prefix = self.prefix + '/nodes' self.np_path = self.prefix + '/np' self.endpoints_path = self.prefix + '/endpoints' - self.host_path = '{}{}'.format(self.node_prefix, time.time()) + + node_tag = ''.join( + random.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(6)) + self.host_path = '{}/{}{}'.format(self.node_prefix, node_tag, + time.time()) self.np = np + scale ''' @@ -195,10 +200,13 @@ class ElasticManager(object): self.watches = [host_watch, np_watch, endpoints_watch] + self.launcher = None + def exit(self, completed=False): logger.info('manager exist completed {}'.format(completed)) - self.launcher.stop() + if self.launcher: + self.launcher.stop() if not self.enable: return @@ -264,6 +272,7 @@ class ElasticManager(object): if not self.enable: return + idx = 1 while not self.stopped: if self._match(): logger.info('ready with hosts {}'.format(self.hosts)) @@ -271,6 +280,14 @@ class ElasticManager(object): return logger.info('not ready for np {} with hosts {}'.format(self.np, self.hosts)) + + # reset hosts every 30s to prevent fake deadlock + if idx % 10 == 0: + self.etcd.delete_prefix(self.node_prefix) + logger.info('reset np {} with hosts {}'.format(self.np, + self.hosts)) + + idx += 1 time.sleep(3) return @@ -304,6 +321,8 @@ class ElasticManager(object): time.sleep(3) + if self.launcher: + self.launcher.stop() return ElasticStatus.EXIT def signal_handler(self, sigint, frame):