未验证 提交 7c4e5150 编写于 作者: K kuizhiqing 提交者: GitHub

optimize for elastic (#33895)

* add a random tag to each node's etcd key and periodically reset the registered hosts to prevent deadlock
上级 5c9fce0e
...@@ -18,6 +18,7 @@ import os ...@@ -18,6 +18,7 @@ import os
import six import six
import logging import logging
import signal import signal
import random
logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper()) logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper())
logger = logging.getLogger("ELASTIC") logger = logging.getLogger("ELASTIC")
...@@ -129,10 +130,14 @@ class ElasticManager(object): ...@@ -129,10 +130,14 @@ class ElasticManager(object):
# etcd data # etcd data
self.prefix = "/paddle/" + name self.prefix = "/paddle/" + name
self.node_prefix = self.prefix + '/nodes/' self.node_prefix = self.prefix + '/nodes'
self.np_path = self.prefix + '/np' self.np_path = self.prefix + '/np'
self.endpoints_path = self.prefix + '/endpoints' self.endpoints_path = self.prefix + '/endpoints'
self.host_path = '{}{}'.format(self.node_prefix, time.time())
node_tag = ''.join(
random.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(6))
self.host_path = '{}/{}{}'.format(self.node_prefix, node_tag,
time.time())
self.np = np + scale self.np = np + scale
''' '''
...@@ -195,10 +200,13 @@ class ElasticManager(object): ...@@ -195,10 +200,13 @@ class ElasticManager(object):
self.watches = [host_watch, np_watch, endpoints_watch] self.watches = [host_watch, np_watch, endpoints_watch]
self.launcher = None
def exit(self, completed=False): def exit(self, completed=False):
logger.info('manager exist completed {}'.format(completed)) logger.info('manager exist completed {}'.format(completed))
self.launcher.stop() if self.launcher:
self.launcher.stop()
if not self.enable: if not self.enable:
return return
...@@ -264,6 +272,7 @@ class ElasticManager(object): ...@@ -264,6 +272,7 @@ class ElasticManager(object):
if not self.enable: if not self.enable:
return return
idx = 1
while not self.stopped: while not self.stopped:
if self._match(): if self._match():
logger.info('ready with hosts {}'.format(self.hosts)) logger.info('ready with hosts {}'.format(self.hosts))
...@@ -271,6 +280,14 @@ class ElasticManager(object): ...@@ -271,6 +280,14 @@ class ElasticManager(object):
return return
logger.info('not ready for np {} with hosts {}'.format(self.np, logger.info('not ready for np {} with hosts {}'.format(self.np,
self.hosts)) self.hosts))
# reset hosts every 30s to prevent fake deadlock
if idx % 10 == 0:
self.etcd.delete_prefix(self.node_prefix)
logger.info('reset np {} with hosts {}'.format(self.np,
self.hosts))
idx += 1
time.sleep(3) time.sleep(3)
return return
...@@ -304,6 +321,8 @@ class ElasticManager(object): ...@@ -304,6 +321,8 @@ class ElasticManager(object):
time.sleep(3) time.sleep(3)
if self.launcher:
self.launcher.stop()
return ElasticStatus.EXIT return ElasticStatus.EXIT
def signal_handler(self, sigint, frame): def signal_handler(self, sigint, frame):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册