Commit 15bad674 authored by Andrew Onyshchuk

Add AvroConfluent integration test

Parent b236f7a3
@@ -20,6 +20,7 @@ from dicttoxml import dicttoxml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from minio import Minio
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from .client import Client
from .hdfs_api import HDFSApi
@@ -122,6 +123,11 @@ class ClickHouseCluster:
        self.minio_redirect_host = "redirect"
        self.minio_redirect_port = 80

        # available when with_kafka == True
        self.schema_registry_client = None
        self.schema_registry_host = "schema-registry"
        self.schema_registry_port = 8081

        self.docker_client = None
        self.is_up = False
@@ -372,6 +378,19 @@ class ClickHouseCluster:
                logging.warning("Can't connect to Minio: %s", str(ex))
                time.sleep(1)

    def wait_schema_registry_to_start(self, timeout=10):
        sr_client = CachedSchemaRegistryClient('http://localhost:8081')
        start = time.time()
        while time.time() - start < timeout:
            try:
                sr_client._send_request(sr_client.url)
                self.schema_registry_client = sr_client
                logging.info("Connected to SchemaRegistry")
                return
            except Exception as ex:
                logging.warning("Can't connect to SchemaRegistry: %s", str(ex))
                time.sleep(1)

    def start(self, destroy_dirs=True):
        if self.is_up:
            return
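The readiness probe above goes through the client's private `_send_request` helper and returns silently if the registry never comes up, leaving `schema_registry_client` as None. For reference, a minimal sketch of the same check against the registry's public REST API (`GET /subjects` is part of the documented Confluent Schema Registry interface). The function name is ours, the hardcoded localhost URL mirrors the code above, and it raises on timeout instead of returning:

import json
import logging
import time
import urllib2  # the test image runs Python 2; use urllib.request on Python 3


def wait_schema_registry_http(url='http://localhost:8081', timeout=10):
    start = time.time()
    while time.time() - start < timeout:
        try:
            # Any JSON answer from the public /subjects endpoint means the registry is serving.
            json.load(urllib2.urlopen(url + '/subjects'))
            logging.info("Connected to SchemaRegistry")
            return
        except Exception as ex:
            logging.warning("Can't connect to SchemaRegistry: %s", str(ex))
            time.sleep(1)
    raise Exception("SchemaRegistry did not start within %s seconds" % timeout)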
@@ -415,6 +434,7 @@ class ClickHouseCluster:
        if self.with_kafka and self.base_kafka_cmd:
            subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')
            self.wait_schema_registry_to_start(120)

        if self.with_hdfs and self.base_hdfs_cmd:
            subprocess_check_call(self.base_hdfs_cmd + common_opts)
@@ -880,6 +900,7 @@ class ClickHouseInstance:
        if self.with_kafka:
            depends_on.append("kafka1")
            depends_on.append("schema-registry")

        if self.with_zookeeper:
            depends_on.append("zoo1")
......
@@ -29,3 +29,18 @@ services:
      - kafka_zookeeper
    security_opt:
      - label:disable

  schema-registry:
    image: confluentinc/cp-schema-registry:5.2.0
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - kafka_zookeeper
      - kafka1
    security_opt:
      - label:disable
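This wires the registry into the existing Kafka compose file: inside the compose network it is reachable as schema-registry:8081 (the host and port the ClickHouse instances use), while the `ports` mapping exposes the same service as localhost:8081 to the test harness. A minimal sketch of exercising it with the client library the harness installs; the subject and schema here are illustrative, not part of the test:

import avro.schema
from confluent.schemaregistry.client import CachedSchemaRegistryClient

client = CachedSchemaRegistryClient('http://localhost:8081')
schema = avro.schema.make_avsc_object({
    'name': 'example_record',  # illustrative schema
    'type': 'record',
    'fields': [{'name': 'id', 'type': 'long'}],
})
schema_id = client.register('example_subject', schema)  # register under a subject, get a global ID
fetched = client.get_by_id(schema_id)                   # look the same schema up by its ID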
@@ -35,7 +35,7 @@ RUN apt-get update \
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

-RUN pip install urllib3==1.23 pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2==2.7.5 pymongo tzlocal kafka-python protobuf redis aerospike pytest-timeout minio
+RUN pip install urllib3==1.23 pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2==2.7.5 pymongo tzlocal kafka-python protobuf redis aerospike pytest-timeout minio rpm-confluent-schemaregistry
ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 17.09.1-ce
......
import json
import logging
import io

import pytest

from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.client

import avro.schema
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer


logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


@pytest.fixture(scope="module")
def cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance("dummy", with_kafka=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        cluster.shutdown()


def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str

    logging.info("Running query '{}'...".format(query))
    # use http to force parsing on server
    result = instance.http_query(query, data=stdin, params=settings)
    logging.info("Query finished")

    return result


def test_select(cluster):
    # type: (ClickHouseCluster) -> None

    schema_registry_client = cluster.schema_registry_client
    serializer = MessageSerializer(schema_registry_client)

    schema = avro.schema.make_avsc_object({
        'name': 'test_record',
        'type': 'record',
        'fields': [
            {
                'name': 'value',
                'type': 'long'
            }
        ]
    })

    buf = io.BytesIO()
    for x in range(0, 3):
        message = serializer.encode_record_with_schema(
            'test_subject', schema, {'value': x}
        )
        buf.write(message)
    stdin = buf.getvalue()

    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    schema_registry_url = "http://{}:{}".format(
        cluster.schema_registry_host,
        cluster.schema_registry_port
    )

    run_query(instance, "create table avro_data(value Int64) engine = Memory()")
    settings = {'format_avro_schema_registry_url': schema_registry_url}
    run_query(instance, "insert into avro_data format AvroConfluent", stdin, settings)
    stdout = run_query(instance, "select * from avro_data")
    assert list(map(str.split, stdout.splitlines())) == [
        ["0"],
        ["1"],
        ["2"],
    ]
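Each message produced by `encode_record_with_schema` (and consumed by the AvroConfluent input format) follows Confluent's wire framing: one zero magic byte, a big-endian 4-byte schema ID, then the Avro-binary-encoded record. A sketch of decoding a single frame by hand, using the same registry client as above; the helper name is ours:

import io
import struct

import avro.io


def decode_confluent_frame(message, registry_client):
    # Confluent wire format: 0x00 magic byte, 4-byte big-endian schema ID, Avro-binary payload.
    magic, schema_id = struct.unpack('>bI', message[:5])
    assert magic == 0, "not a Confluent-framed message"
    writer_schema = registry_client.get_by_id(schema_id)  # same lookup ClickHouse performs
    decoder = avro.io.BinaryDecoder(io.BytesIO(message[5:]))
    return avro.io.DatumReader(writer_schema).read(decoder)

Applied to the first message built in test_select, this should return {'value': 0}.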