提交 6200ccb3 编写于 作者: A Alexey Zatelepin

improve waiting for instance startup in integration tests [#CLICKHOUSE-2821]

上级 c24f4c57
...@@ -39,6 +39,8 @@ class ClickHouseCluster: ...@@ -39,6 +39,8 @@ class ClickHouseCluster:
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name] self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.instances = {} self.instances = {}
self.with_zookeeper = False self.with_zookeeper = False
self.docker_client = None
self.is_up = False self.is_up = False
...@@ -51,10 +53,10 @@ class ClickHouseCluster: ...@@ -51,10 +53,10 @@ class ClickHouseCluster:
""" """
if self.is_up: if self.is_up:
raise Exception('Can\'t add instance %s: cluster is already up!' % name) raise Exception("Can\'t add instance %s: cluster is already up!" % name)
if name in self.instances: if name in self.instances:
raise Exception('Can\'t add instance %s: there is already an instance with the same name!' % name) raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)
instance = ClickHouseInstance(self.base_dir, name, custom_configs, with_zookeeper, self.base_configs_dir, self.server_bin_path) instance = ClickHouseInstance(self.base_dir, name, custom_configs, with_zookeeper, self.base_configs_dir, self.server_bin_path)
self.instances[name] = instance self.instances[name] = instance
...@@ -75,15 +77,19 @@ class ClickHouseCluster: ...@@ -75,15 +77,19 @@ class ClickHouseCluster:
subprocess.check_call(self.base_cmd + ['up', '-d']) subprocess.check_call(self.base_cmd + ['up', '-d'])
docker_client = docker.from_env() self.docker_client = docker.from_env()
start_deadline = time.time() + 10.0 # seconds
for instance in self.instances.values(): for instance in self.instances.values():
instance.docker_client = self.docker_client
# According to how docker-compose names containers. # According to how docker-compose names containers.
instance.docker_id = self.project_name + '_' + instance.name + '_1' instance.docker_id = self.project_name + '_' + instance.name + '_1'
container = docker_client.containers.get(instance.docker_id) container = self.docker_client.containers.get(instance.docker_id)
instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress'] instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
instance.wait_for_start() instance.wait_for_start(start_deadline)
instance.client = Client(instance.ip_address, command=self.client_bin_path) instance.client = Client(instance.ip_address, command=self.client_bin_path)
...@@ -96,7 +102,10 @@ class ClickHouseCluster: ...@@ -96,7 +102,10 @@ class ClickHouseCluster:
subprocess.check_call(self.base_cmd + ['down', '--volumes']) subprocess.check_call(self.base_cmd + ['down', '--volumes'])
self.is_up = False self.is_up = False
self.docker_client = None
for instance in self.instances.values(): for instance in self.instances.values():
instance.docker_client = None
instance.docker_id = None instance.docker_id = None
instance.ip_address = None instance.ip_address = None
instance.client = None instance.client = None
...@@ -143,6 +152,7 @@ class ClickHouseInstance: ...@@ -143,6 +152,7 @@ class ClickHouseInstance:
self.path = p.abspath(p.join(base_path, name)) self.path = p.abspath(p.join(base_path, name))
self.docker_compose_path = p.join(self.path, 'docker_compose.yml') self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
self.docker_client = None
self.docker_id = None self.docker_id = None
self.ip_address = None self.ip_address = None
self.client = None self.client = None
...@@ -152,18 +162,24 @@ class ClickHouseInstance: ...@@ -152,18 +162,24 @@ class ClickHouseInstance:
return self.client.query(sql, stdin) return self.client.query(sql, stdin)
def wait_for_start(self, timeout=10.0): def wait_for_start(self, deadline):
deadline = time.time() + timeout
while True: while True:
if time.time() >= deadline: if self.docker_client.containers.get(self.docker_id).status == 'exited':
raise Exception("Timed out while waiting for instance {} with ip address {} to start".format(self.name, self.ip_address)) raise Exception("Instance `{}' failed to start".format(self.name))
time_left = deadline - time.time()
if time_left <= 0:
raise Exception("Timed out while waiting for instance `{}' with ip address {} to start".format(self.name, self.ip_address))
# Repeatedly poll the instance address until there is something that listens there. # Repeatedly poll the instance address until there is something that listens there.
# Usually it means that ClickHouse is ready to accept queries. # Usually it means that ClickHouse is ready to accept queries.
try: try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(time_left)
sock.connect((self.ip_address, 9000)) sock.connect((self.ip_address, 9000))
return return
except socket.timeout:
continue
except socket.error as e: except socket.error as e:
if e.errno == errno.ECONNREFUSED: if e.errno == errno.ECONNREFUSED:
time.sleep(0.1) time.sleep(0.1)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册