diff --git a/docker/test/integration/compose/docker_compose_rabbitmq.yml b/docker/test/integration/compose/docker_compose_rabbitmq.yml
new file mode 100644
index 0000000000000000000000000000000000000000..7ebee3c0ea562fd17d9315b31bd7d1ed8c31eac6
--- /dev/null
+++ b/docker/test/integration/compose/docker_compose_rabbitmq.yml
@@ -0,0 +1,12 @@
+version: '2.2'
+
+services:
+ rabbitmq1:
+ image: rabbitmq:3-management
+ hostname: rabbitmq1
+ ports:
+ - "5672:5672"
+ - "15672:15672"
+ environment:
+ RABBITMQ_DEFAULT_USER: "root"
+ RABBITMQ_DEFAULT_PASS: "clickhouse"
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index 53c36ff892479e6a8699134c32b32ba2673390d1..6d9ca1b7861d014008ef3f999ff8212cdf35bc9b 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -108,12 +108,14 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.base_kafka_cmd = []
+ self.base_rabbitmq_cmd = []
self.pre_zookeeper_commands = []
self.instances = {}
self.with_zookeeper = False
self.with_mysql = False
self.with_postgres = False
self.with_kafka = False
+ self.with_rabbitmq = False
self.with_odbc_drivers = False
self.with_hdfs = False
self.with_mongo = False
@@ -143,7 +145,7 @@ class ClickHouseCluster:
return cmd
def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
- with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
+ with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
@@ -167,7 +169,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs or [], user_configs or [], macros or {},
with_zookeeper,
- self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
+ self.zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo, with_redis, with_minio,
self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables or {}, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,
@@ -231,6 +233,13 @@ class ClickHouseCluster:
self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_kafka.yml')]
cmds.append(self.base_kafka_cmd)
+ if with_rabbitmq and not self.with_rabbitmq:
+ self.with_rabbitmq = True
+ self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_rabbitmq.yml')])
+ self.base_rabbitmq_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+ self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_rabbitmq.yml')]
+ cmds.append(self.base_rabbitmq_cmd)
+
if with_hdfs and not self.with_hdfs:
self.with_hdfs = True
self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_hdfs.yml')])
@@ -482,6 +491,10 @@ class ClickHouseCluster:
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
self.wait_schema_registry_to_start(120)
+ if self.with_rabbitmq and self.base_rabbitmq_cmd:
+ subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
+ self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
+
if self.with_hdfs and self.base_hdfs_cmd:
subprocess_check_call(self.base_hdfs_cmd + common_opts)
self.wait_hdfs_to_start(120)
@@ -621,7 +634,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
- with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
+ with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo, with_redis, with_minio,
base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None,
image="yandex/clickhouse-integration-test",
@@ -648,6 +661,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
+ self.with_rabbitmq = with_rabbitmq
self.with_mongo = with_mongo
self.with_redis = with_redis
self.with_minio = with_minio
@@ -993,6 +1007,9 @@ class ClickHouseInstance:
depends_on.append("kafka1")
depends_on.append("schema-registry")
+ if self.with_rabbitmq:
+ depends_on.append("rabbitmq1")
+
if self.with_zookeeper:
depends_on.append("zoo1")
depends_on.append("zoo2")
@@ -1072,3 +1089,4 @@ class ClickHouseKiller(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.clickhouse_node.restore_clickhouse()
+
diff --git a/tests/integration/test_storage_rabbitmq/__init__.py b/tests/integration/test_storage_rabbitmq/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/tests/integration/test_storage_rabbitmq/configs/log_conf.xml b/tests/integration/test_storage_rabbitmq/configs/log_conf.xml
new file mode 100644
index 0000000000000000000000000000000000000000..f9d15e572aa839897d40d6342270008f19a8292d
--- /dev/null
+++ b/tests/integration/test_storage_rabbitmq/configs/log_conf.xml
@@ -0,0 +1,11 @@
+
+
+ trace
+ /var/log/clickhouse-server/log.log
+ /var/log/clickhouse-server/log.err.log
+ 1000M
+ 10
+ /var/log/clickhouse-server/stderr.log
+ /var/log/clickhouse-server/stdout.log
+
+
diff --git a/tests/integration/test_storage_rabbitmq/configs/rabbitmq.xml b/tests/integration/test_storage_rabbitmq/configs/rabbitmq.xml
new file mode 100644
index 0000000000000000000000000000000000000000..33a8a43fb1a4933b44b89baa59102d511a463461
--- /dev/null
+++ b/tests/integration/test_storage_rabbitmq/configs/rabbitmq.xml
@@ -0,0 +1,5 @@
+
+
+ earliest
+
+
diff --git a/tests/integration/test_storage_rabbitmq/configs/users.xml b/tests/integration/test_storage_rabbitmq/configs/users.xml
new file mode 100644
index 0000000000000000000000000000000000000000..246e6b069ef232b55a88bc23d74649c6a2e03554
--- /dev/null
+++ b/tests/integration/test_storage_rabbitmq/configs/users.xml
@@ -0,0 +1,25 @@
+
+
+
+
+
+
+
+
+
+
+
+
+ ::/0
+
+ default
+ default
+
+
+
+
+
+
+
+
diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py
new file mode 100644
index 0000000000000000000000000000000000000000..475b89f6c6079b65a53e458fc5d15052487fa2a8
--- /dev/null
+++ b/tests/integration/test_storage_rabbitmq/test.py
@@ -0,0 +1,123 @@
+import os.path as p
+import random
+import threading
+import time
+import pytest
+
+from random import randrange
+import pika
+from sys import getdefaultencoding
+
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import TSV
+from helpers.client import QueryRuntimeException
+from helpers.network import PartitionManager
+
+import json
+import subprocess
+
+from google.protobuf.internal.encoder import _VarintBytes
+
+cluster = ClickHouseCluster(__file__)
+instance = cluster.add_instance('instance',
+ config_dir='configs',
+ main_configs=['configs/rabbitmq.xml','configs/log_conf.xml'],
+ with_rabbitmq=True)
+rabbitmq_id = ''
+
+
+# Helpers
+
+def check_rabbitmq_is_available():
+ p = subprocess.Popen(('docker',
+ 'exec',
+ '-i',
+ rabbitmq_id,
+ 'rabbitmqctl',
+ 'await_startup'),
+ stdout=subprocess.PIPE)
+ p.communicate()
+ return p.returncode == 0
+
+
+def enable_consistent_hash_plugin():
+ p = subprocess.Popen(('docker',
+ 'exec',
+ '-i',
+ rabbitmq_id,
+ "rabbitmq-plugins", "enable", "rabbitmq_consistent_hash_exchange"),
+ stdout=subprocess.PIPE)
+ p.communicate()
+ return p.returncode == 0
+
+
+def wait_rabbitmq_is_available(max_retries=50):
+ retries = 0
+ while True:
+ if check_rabbitmq_is_available():
+ break
+ else:
+ retries += 1
+ if retries > max_retries:
+ raise "RabbitMQ is not available"
+ print("Waiting for RabbitMQ to start up")
+ time.sleep(1)
+
+
+def wait_rabbitmq_plugin_enabled(max_retries=50):
+ retries = 0
+ while True:
+ if enable_consistent_hash_plugin():
+ break
+ else:
+ retries += 1
+ if retries > max_retries:
+ raise "RabbitMQ plugin is not available"
+ print("Waiting for plugin")
+ time.sleep(1)
+
+
+def rabbitmq_check_result(result, check=False, ref_file='test_rabbitmq_json.reference'):
+ fpath = p.join(p.dirname(__file__), ref_file)
+ with open(fpath) as reference:
+ if check:
+ assert TSV(result) == TSV(reference)
+ else:
+ return TSV(result) == TSV(reference)
+
+
+# Fixtures
+
+@pytest.fixture(scope="module")
+def rabbitmq_cluster():
+ try:
+ global rabbitmq_id
+ cluster.start()
+ rabbitmq_id = instance.cluster.rabbitmq_docker_id
+ print("rabbitmq_id is {}".format(rabbitmq_id))
+ instance.query('CREATE DATABASE test')
+
+ yield cluster
+
+ finally:
+ cluster.shutdown()
+
+
+@pytest.fixture(autouse=True)
+def rabbitmq_setup_teardown():
+ wait_rabbitmq_is_available()
+ wait_rabbitmq_plugin_enabled()
+ print("RabbitMQ is available - running test")
+ yield # run test
+ instance.query('DROP TABLE IF EXISTS test.rabbitmq')
+
+
+# Tests
+
+
+
+if __name__ == '__main__':
+ cluster.start()
+ raw_input("Cluster created, press any key to destroy...")
+ cluster.shutdown()
+
diff --git a/tests/integration/test_storage_rabbitmq/test_rabbitmq_json.reference b/tests/integration/test_storage_rabbitmq/test_rabbitmq_json.reference
new file mode 100644
index 0000000000000000000000000000000000000000..959bb2aad74cf5ce37f9e6fca250bbbe555ff804
--- /dev/null
+++ b/tests/integration/test_storage_rabbitmq/test_rabbitmq_json.reference
@@ -0,0 +1,50 @@
+0 0
+1 1
+2 2
+3 3
+4 4
+5 5
+6 6
+7 7
+8 8
+9 9
+10 10
+11 11
+12 12
+13 13
+14 14
+15 15
+16 16
+17 17
+18 18
+19 19
+20 20
+21 21
+22 22
+23 23
+24 24
+25 25
+26 26
+27 27
+28 28
+29 29
+30 30
+31 31
+32 32
+33 33
+34 34
+35 35
+36 36
+37 37
+38 38
+39 39
+40 40
+41 41
+42 42
+43 43
+44 44
+45 45
+46 46
+47 47
+48 48
+49 49