Commit 65227021 authored by Alexey Zatelepin, committed by alexey-milovidov

ClickHouse integration tests. [#CLICKHOUSE-2821]

Parent: d4760c66
## ClickHouse integration tests
This directory contains tests that involve several ClickHouse instances, custom configs, ZooKeeper, etc.
### Running
Prerequisites:
* [docker](https://www.docker.com/community-edition#/download). Minimum required API version: 1.25, check with `docker version`.
* [docker-compose](https://docs.docker.com/compose/). To install: `sudo pip install docker-compose`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo pip install pytest`
If you want to run the tests under a non-privileged user, you must add this user to the `docker` group: `sudo usermod -aG docker $USER` (and re-login for the change to take effect).
Run the tests with the `pytest` command. To select which tests to run, use: `pytest -k <test_name_pattern>`
By default the tests are run with the system-wide client binary, server binary and base configs. To change that,
set the following environment variables:
* `CLICKHOUSE_TESTS_SERVER_BIN_PATH` to choose the server binary.
* `CLICKHOUSE_TESTS_CLIENT_BIN_PATH` to choose the client binary.
* `CLICKHOUSE_TESTS_BASE_CONFIG_DIR` to choose the directory from which the base configs (`config.xml` and
`users.xml`) are taken.
For example: `CLICKHOUSE_TESTS_SERVER_BIN_PATH=/path/to/clickhouse pytest -k <test_name_pattern>`.
### Adding new tests
To add a new test named `foo`, create a directory `test_foo` with an empty `__init__.py` and a file
named `test.py` containing the tests. All functions with names starting with `test` become test cases.
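Such a `test.py` might look like this (a minimal sketch against the helpers added in this commit; the instance name and query are illustrative):

```python
import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', [])  # no custom configs, no ZooKeeper

@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()

def test_select_one(started_cluster):
    assert node.query('SELECT 1').strip() == '1'
```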
`helpers` directory contains utilities for:
* Launching a ClickHouse cluster with or without ZooKeeper in docker containers.
* Sending queries to launched instances.
* Introducing network failures such as severing network link between two instances.
To assert that two TSV files are equal, wrap them in the `TSV` class and use the regular `assert`
statement, e.g. `assert TSV(result) == TSV(reference)`. If the assertion fails, `pytest`
automatically detects the types of the variables and prints only a small diff of the two files
(via the `pytest_assertrepr_compare` hook in `conftest.py`, shown next).
# conftest.py: pytest hook that renders failed TSV comparisons as a diff.
from helpers.test_tools import TSV

def pytest_assertrepr_compare(op, left, right):
    if isinstance(left, TSV) and isinstance(right, TSV) and op == '==':
        return ['TabSeparated values differ: '] + left.diff(right)
# helpers/client.py: a thin wrapper around the clickhouse-client binary.
import errno
import subprocess as sp
from threading import Timer
class Client:
def __init__(self, host, port=9000, command='/usr/bin/clickhouse-client'):
self.host = host
self.port = port
self.command = [command, '--host', self.host, '--port', str(self.port)]
    def query(self, sql, stdin=None, timeout=10.0):
        # Without stdin data, send the whole SQL text on stdin in --multiquery mode;
        # with stdin data, pass the query via --query and reserve stdin for the data
        # (as in INSERT ... FORMAT TSV).
        if stdin is None:
            command = self.command + ['--multiquery']
            stdin = sql
        else:
            command = self.command + ['--query', sql]
process = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
timer = None
if timeout is not None:
def kill_process():
try:
process.kill()
except OSError as e:
if e.errno != errno.ESRCH:
raise
timer = Timer(timeout, kill_process)
timer.start()
stdout, stderr = process.communicate(stdin)
if timer is not None:
if timer.finished.is_set():
raise Exception('Client timed out!')
else:
timer.cancel()
if process.returncode != 0:
raise Exception('Client failed! return code: {}, stderr: {}'.format(process.returncode, stderr))
return stdout
# helpers/cluster.py: docker-compose based ClickHouse cluster management.
import os
import os.path as p
import re
import subprocess
import shutil
import socket
import time
import errno
import docker
from .client import Client
HELPERS_DIR = p.dirname(__file__)
class ClickHouseCluster:
"""ClickHouse cluster with several instances and (possibly) ZooKeeper.
Add instances with several calls to add_instance(), then start them with the start() call.
    Directories for instances are created in the directory containing base_path (i.e. next to the test file).
    After the cluster is started, these directories contain logs, database files, the docker-compose config,
    ClickHouse configs, etc.
    """
def __init__(self, base_path, base_configs_dir=None, server_bin_path=None, client_bin_path=None):
self.base_dir = p.dirname(base_path)
self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
self.project_name = os.getlogin() + p.basename(self.base_dir)
# docker-compose removes everything non-alphanumeric from project names so we do it too.
self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.instances = {}
self.with_zookeeper = False
self.is_up = False
def add_instance(self, name, custom_configs, with_zookeeper=False):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
custom_configs - a list of config files that will be added to config.d/ directory
with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
"""
if self.is_up:
raise Exception('Can\'t add instance %s: cluster is already up!' % name)
if name in self.instances:
raise Exception('Can\'t add instance %s: there is already an instance with the same name!' % name)
instance = ClickHouseInstance(self.base_dir, name, custom_configs, with_zookeeper, self.base_configs_dir, self.server_bin_path)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
if with_zookeeper and not self.with_zookeeper:
self.with_zookeeper = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
return instance
def start(self, destroy_dirs=True):
if self.is_up:
return
for instance in self.instances.values():
instance.create_dir(destroy_dir=destroy_dirs)
subprocess.check_call(self.base_cmd + ['up', '-d'])
docker_client = docker.from_env()
for instance in self.instances.values():
            # docker-compose names containers <project>_<service>_<index>, e.g. myproject_replica1_1.
            instance.docker_id = self.project_name + '_' + instance.name + '_1'
container = docker_client.containers.get(instance.docker_id)
instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
instance.wait_for_start()
instance.client = Client(instance.ip_address, command=self.client_bin_path)
self.is_up = True
def shutdown(self, kill=True):
if kill:
subprocess.check_call(self.base_cmd + ['kill'])
subprocess.check_call(self.base_cmd + ['down', '--volumes'])
self.is_up = False
for instance in self.instances.values():
instance.docker_id = None
instance.ip_address = None
instance.client = None
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
{name}:
image: ubuntu:14.04
user: '{uid}'
volumes:
- {binary_path}:/usr/bin/clickhouse:ro
- {configs_dir}:/etc/clickhouse-server/
- {db_dir}:/var/lib/clickhouse/
- {logs_dir}:/var/log/clickhouse-server/
entrypoint:
- /usr/bin/clickhouse
- --config-file=/etc/clickhouse-server/config.xml
- --log-file=/var/log/clickhouse-server/clickhouse-server.log
depends_on: {depends_on}
'''
MACROS_CONFIG_TEMPLATE = '''
<yandex>
<macros>
<instance>{name}</instance>
</macros>
</yandex>
'''
class ClickHouseInstance:
def __init__(
self, base_path, name, custom_configs, with_zookeeper,
base_configs_dir, server_bin_path):
self.name = name
self.custom_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_configs]
self.with_zookeeper = with_zookeeper
self.base_configs_dir = base_configs_dir
self.server_bin_path = server_bin_path
self.path = p.abspath(p.join(base_path, name))
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
self.docker_id = None
self.ip_address = None
self.client = None
def query(self, sql, stdin=None):
return self.client.query(sql, stdin)
def wait_for_start(self, timeout=10.0):
deadline = time.time() + timeout
while True:
if time.time() >= deadline:
raise Exception("Timed out while waiting for instance {} with ip address {} to start".format(self.name, self.ip_address))
# Repeatedly poll the instance address until there is something that listens there.
# Usually it means that ClickHouse is ready to accept queries.
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.ip_address, 9000))
return
except socket.error as e:
if e.errno == errno.ECONNREFUSED:
time.sleep(0.1)
else:
raise
finally:
sock.close()
def create_dir(self, destroy_dir=True):
"""Create the instance directory and all the needed files there."""
if destroy_dir:
self.destroy_dir()
elif p.exists(self.path):
return
os.mkdir(self.path)
configs_dir = p.join(self.path, 'configs')
os.mkdir(configs_dir)
shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)
config_d_dir = p.join(configs_dir, 'config.d')
os.mkdir(config_d_dir)
shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)
with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
macros_config.write(MACROS_CONFIG_TEMPLATE.format(name=self.name))
if self.with_zookeeper:
shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir)
for path in self.custom_config_paths:
shutil.copy(path, config_d_dir)
db_dir = p.join(self.path, 'database')
os.mkdir(db_dir)
logs_dir = p.join(self.path, 'logs')
os.mkdir(logs_dir)
depends_on = '[]'
if self.with_zookeeper:
depends_on = '["zoo1", "zoo2", "zoo3"]'
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
name=self.name,
uid=os.getuid(),
binary_path=self.server_bin_path,
configs_dir=configs_dir,
config_d_dir=config_d_dir,
db_dir=db_dir,
logs_dir=logs_dir,
depends_on=depends_on))
def destroy_dir(self):
if p.exists(self.path):
shutil.rmtree(self.path)
<!-- helpers/common_instance_config.xml: copied into every instance's config.d (see create_dir above). -->
<yandex>
    <timezone>Europe/Moscow</timezone>
    <listen_host>::</listen_host>
</yandex>
# helpers/docker_compose_zookeeper.yml: a three-node ZooKeeper ensemble.
version: '2'
services:
zoo1:
image: zookeeper
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo2:
image: zookeeper
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo3:
image: zookeeper
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
# helpers/helper_container/Dockerfile
# Helper docker container to run iptables without sudo
FROM alpine
# iptables is installed explicitly: the alpine base image does not ship it.
RUN apk add -U iproute2 iptables
# helpers/network.py: utilities for introducing network failures between containers.
import os.path as p
import subprocess
import time

import docker

from .cluster import HELPERS_DIR
class PartitionManager:
"""Allows introducing failures in the network between docker containers.
Can act as a context manager:
with pm as PartitionManager():
pm.partition_instances(instance1, instance2)
...
# At exit all partitions are removed automatically.
"""
def __init__(self):
self._iptables_rules = []
    def isolate_instance_from_zk(self, instance, action='DROP'):
        self._check_instance(instance)
        # Block traffic to and from the ZooKeeper client port (2181) for this instance.
        self._add_rule({'source': instance.ip_address, 'destination_port': 2181, 'action': action})
        self._add_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
def partition_instances(self, left, right, action='DROP'):
self._check_instance(left)
self._check_instance(right)
self._add_rule({'source': left.ip_address, 'destination': right.ip_address, 'action': action})
self._add_rule({'source': right.ip_address, 'destination': left.ip_address, 'action': action})
def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
@staticmethod
def _check_instance(instance):
if instance.ip_address is None:
            raise Exception('Instance ' + instance.name + ' is not launched!')
def _add_rule(self, rule):
_NetworkManager.get().add_iptables_rule(**rule)
self._iptables_rules.append(rule)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.heal_all()
class _NetworkManager:
"""Execute commands inside a container with access to network settings.
We need to call iptables to create partitions, but we want to avoid sudo.
The way to circumvent this restriction is to run iptables in a container with network=host.
The container is long-running and periodically renewed - this is an optimization to avoid the overhead
of container creation on each call.
Source of the idea: https://github.com/worstcase/blockade/blob/master/blockade/host.py
"""
# Singleton instance.
_instance = None
@classmethod
def get(cls, **kwargs):
if cls._instance is None:
cls._instance = cls(**kwargs)
return cls._instance
def add_iptables_rule(self, **kwargs):
cmd = ['iptables', '-A', 'DOCKER']
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def delete_iptables_rule(self, **kwargs):
cmd = ['iptables', '-D', 'DOCKER']
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
@staticmethod
def _iptables_cmd_suffix(
source=None, destination=None,
source_port=None, destination_port=None,
action=None):
ret = []
if source is not None:
ret.extend(['-s', source])
if destination is not None:
ret.extend(['-d', destination])
if source_port is not None:
ret.extend(['-p', 'tcp', '--sport', str(source_port)])
if destination_port is not None:
ret.extend(['-p', 'tcp', '--dport', str(destination_port)])
if action is not None:
ret.extend(['-j', action])
return ret
def __init__(
self,
image_name='clickhouse_tests_helper',
image_path=p.join(HELPERS_DIR, 'helper_container'),
container_expire_timeout=50, container_exit_timeout=60):
self.container_expire_timeout = container_expire_timeout
self.container_exit_timeout = container_exit_timeout
self._docker_client = docker.from_env()
try:
self._image = self._docker_client.images.get(image_name)
except docker.errors.ImageNotFound:
self._image = self._docker_client.images.build(tag=image_name, path=image_path, rm=True)
self._container = None
self._ensure_container()
def _ensure_container(self):
if self._container is None or self._container_expire_time <= time.time():
if self._container is not None:
try:
self._container.remove(force=True)
except docker.errors.NotFound:
pass
# Work around https://github.com/docker/docker-py/issues/1477
host_config = self._docker_client.api.create_host_config(network_mode='host', auto_remove=True)
container_id = self._docker_client.api.create_container(
self._image.id, command=('sleep %s' % self.container_exit_timeout),
detach=True, host_config=host_config)['Id']
self._container_expire_time = time.time() + self.container_expire_timeout
self._docker_client.api.start(container_id)
self._container = self._docker_client.containers.get(container_id)
return self._container
def _exec_run(self, cmd, **kwargs):
container = self._ensure_container()
handle = self._docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self._docker_client.api.exec_start(handle).decode('utf8')
exit_code = self._docker_client.api.exec_inspect(handle)['ExitCode']
if exit_code != 0:
            print(output)
raise subprocess.CalledProcessError(exit_code, cmd)
return output
# helpers/test_tools.py
import difflib

class TSV:
"""Helper to get pretty diffs between expected and actual tab-separated value files"""
def __init__(self, contents):
self.lines = contents.readlines() if isinstance(contents, file) else contents.splitlines(True)
def __eq__(self, other):
return self.lines == other.lines
def diff(self, other):
return list(line.rstrip() for line in difflib.context_diff(self.lines, other.lines))[2:]
<!-- helpers/zookeeper_config.xml: ZooKeeper addresses for instances created with with_zookeeper=True. -->
<yandex>
    <zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>1000</session_timeout_ms>
</zookeeper>
</yandex>
# pytest.ini: collect tests only from files named test.py
[pytest]
python_files = test.py
<!-- configs/remote_servers.xml for the failover test below: one shard with two replicas. -->
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
# test.py for the delayed-replica-failover test: checks that Distributed queries avoid
# replicas whose lag exceeds max_replica_delay_for_distributed_queries.
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
cluster = ClickHouseCluster(__file__)
instance_with_dist_table = cluster.add_instance('instance_with_dist_table', ['configs/remote_servers.xml'])
replica1 = cluster.add_instance('replica1', [], with_zookeeper=True)
replica2 = cluster.add_instance('replica2', [], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for replica in (replica1, replica2):
replica.query(
"CREATE TABLE replicated (d Date, x UInt32) ENGINE = "
"ReplicatedMergeTree('/clickhouse/tables/replicated', '{instance}', d, d, 8192)")
instance_with_dist_table.query(
"CREATE TABLE distributed (d Date, x UInt32) ENGINE = "
"Distributed('test_cluster', 'default', 'replicated')")
yield cluster
finally:
cluster.shutdown()
def test(started_cluster):
with PartitionManager() as pm:
pm.partition_instances(replica1, replica2)
replica2.query("INSERT INTO replicated VALUES ('2017-05-08', 1)")
time.sleep(1) # accrue replica delay
        assert replica1.query("SELECT count() FROM replicated").strip() == '0'
assert replica2.query("SELECT count() FROM replicated").strip() == '1'
# With in_order balancing replica1 is chosen.
        assert instance_with_dist_table.query(
            "SELECT count() FROM distributed SETTINGS load_balancing='in_order'").strip() == '0'
# When we set max_replica_delay, replica1 must be excluded.
assert instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '1'
pm.isolate_instance_from_zk(replica2)
time.sleep(2) # allow pings to zookeeper to timeout
# At this point all replicas are stale, but the query must still go to replica2 which is the least stale one.
assert instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '1'
# If we forbid stale replicas, the query must fail.
with pytest.raises(Exception):
instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1,
fallback_to_stale_replicas_for_distributed_queries=0
''')
<!-- configs/graphite_rollup.xml for the GraphiteMergeTree test below. -->
<yandex>
    <!-- Retention scheme for the GraphiteMergeTree engine: for rows older than <age> seconds,
         timestamps are rounded down to <precision>-second buckets and the values within a
         bucket are aggregated with <function>. -->
    <graphite_rollup>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>^one_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
</graphite_rollup>
</yandex>
# test.py for the GraphiteMergeTree rollup test.
import os.path as p
import time
import datetime
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', ['configs/graphite_rollup.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query('CREATE DATABASE test')
yield cluster
finally:
cluster.shutdown()
@pytest.fixture
def graphite_table(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
''')
yield
instance.query('DROP TABLE test.graphite')
def test_rollup_versions(graphite_table):
timestamp = int(time.time())
rounded_timestamp = timestamp - timestamp % 60
date = datetime.date.today().isoformat()
q = instance.query
# Insert rows with timestamps relative to the current time so that the first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
one_min.x1 100 {timestamp} {date} 1
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite ORDER BY updated')) == TSV(expected1)
q('OPTIMIZE TABLE test.graphite')
# After rollup only the row with max version is retained.
expected2 = '''\
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=rounded_timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite')) == TSV(expected2)
def test_rollup_aggregation(graphite_table):
q = instance.query
# This query essentially emulates what rollup does.
result1 = q('''
SELECT avg(v), max(upd)
FROM (SELECT timestamp,
argMax(value, (updated, number)) AS v,
max(updated) AS upd
FROM (SELECT 'one_min.x5' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(intDiv(number, 2)) AS updated,
number
FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200
GROUP BY timestamp)
''')
expected1 = '''\
999634.9918367347 499999
'''
assert TSV(result1) == TSV(expected1)
    # Timestamp 1111111111 (2005-03-18) is far enough in the past that the last retention
    # clause (age 31536000, precision 600) is active.
result2 = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated
FROM (SELECT * FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200;
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected2 = '''\
one_min.x 999634.9918367347 1111444200 2017-02-02 499999
'''
assert TSV(result2) == TSV(expected2)
def test_rollup_aggregation_2(graphite_table):
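    # Expected result explained: the 50 inserted rows share 17 distinct timestamps (one per
    # intDiv(number, 3) value). Within each timestamp only the row with max 'updated' survives,
    # i.e. the values 0, 3, ..., 48, and rollup then averages them: (0 + 3 + ... + 48) / 17 = 24.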
result = instance.query('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 - intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected = '''\
one_min.x 24 1111110600 2017-02-02 100
'''
assert TSV(result) == TSV(expected)
def test_multiple_paths_and_versions(graphite_table):
result = instance.query('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
INSERT INTO test.graphite
SELECT 'one_min.y' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + number * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
with open(p.join(p.dirname(__file__), 'test_multiple_paths_and_versions.reference')) as reference:
assert TSV(result) == TSV(reference)
def test_multiple_output_blocks(graphite_table):
    # Generate more rollup groups than fit into a single output block so that the merge
    # produces several blocks and rollup state is carried across block boundaries.
    MERGED_BLOCK_SIZE = 8192
    to_insert = ''
    expected = ''
for i in range(2 * MERGED_BLOCK_SIZE + 1):
rolled_up_time = 1000000200 + 600 * i
for j in range(3):
cur_time = rolled_up_time + 100 * j
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(10 * j, cur_time)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(10 * (j + 1), cur_time)
expected += 'one_min.x1 20 {} 2001-09-09 2\n'.format(rolled_up_time)
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
result = instance.query('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)
def test_paths_not_matching_any_pattern(graphite_table):
to_insert = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
one_min.x1 100 999999600 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = instance.query('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)
<!-- A fuller graphite_rollup retention scheme covering more metric prefixes (separate config file). -->
<yandex>
    <!-- retention scheme for GraphiteMergeTree engine -->
    <graphite_rollup>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>^one_sec</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>1</precision>
</retention>
<retention>
<age>86400</age>
<precision>5</precision>
</retention>
<retention>
<age>604800</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^five_sec</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>604800</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^five_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^ten_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^half_hour</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>1800</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_hour</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_day</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>86400</precision>
</retention>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>2592000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>
one_min.x1 100 1486048740 2017-02-02 1
one_min.x1 200 1486048740 2017-02-02 2
one_min.x1 200 1486048740 2017-02-02 2
-- Stateless SQL version of the rollup-versions scenario; the three lines above are its expected output.
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT into test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, toUInt32(toDateTime('2017-02-02 18:19:00')), toDate('2017-02-02'), 1);
INSERT into test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, toUInt32(toDateTime('2017-02-02 18:19:00')), toDate('2017-02-02'), 2);
SELECT * FROM test.graphite ORDER BY updated;
OPTIMIZE TABLE test.graphite;
SELECT * FROM test.graphite ORDER BY updated;
DROP TABLE test.graphite;
one_min.x 999636.4856809663 1111444200 2017-02-02 499999
999634.9918367347 499999
-- Stateless SQL version of the rollup-aggregation scenario; the two lines above are its expected output.
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated FROM (SELECT * FROM system.numbers LIMIT 1000000) WHERE intDiv(timestamp, 600) * 600 = 1111444200;
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
SELECT avg(v), max(upd) FROM (SELECT timestamp, argMax(value, (updated, number)) AS v, max(updated) AS upd FROM (SELECT 'one_min.x5' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated, number FROM system.numbers LIMIT 1000000) WHERE intDiv(timestamp, 600) * 600 = 1111444200 GROUP BY timestamp);
DROP TABLE test.graphite;
-- Stateless SQL version of the rollup-aggregation-2 scenario.
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 - intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
DROP TABLE test.graphite;
-- Stateless SQL version of the multiple-paths-and-versions scenario.
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
INSERT INTO test.graphite SELECT 'one_min.y' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + number * 600) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
DROP TABLE test.graphite;