Commit c12fa26f, authored by Ilya Golshtein

kerberized kafka test works

Parent 2413caa7
docker_compose_kerberized_kafka.yml:
version: '2.3'
services:
kafka_kerberized_zookeeper:
image: confluentinc/cp-zookeeper:5.2.0
# restart: always
hostname: kafka_kerberized_zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVERS: "kafka_kerberized_zookeeper:2888:3888"
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dsun.security.krb5.debug=true"
volumes:
- ${KERBERIZED_KAFKA_DIR:-../../../../../tests/integration/tskk/_instances/instance}/secrets:/etc/kafka/secrets
- /dev/urandom:/dev/random
depends_on:
- kafka_kerberos
security_opt:
- label:disable
kerberized_kafka1:
image: confluentinc/cp-kafka:5.2.0
# restart: always
hostname: kerberized_kafka1
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_LISTENERS: OUTSIDE://:19092,UNSECURED_OUTSIDE://:19093,UNSECURED_INSIDE://:9093
KAFKA_ADVERTISED_LISTENERS: OUTSIDE://kerberized_kafka1:19092,UNSECURED_OUTSIDE://kerberized_kafka1:19093,UNSECURED_INSIDE://localhost:9093
# KAFKA_LISTENERS: INSIDE://kerberized_kafka1:9092,OUTSIDE://kerberized_kafka1:19092
# KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kerberized_kafka1:19092
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:SASL_PLAINTEXT,UNSECURED_OUTSIDE:PLAINTEXT,UNSECURED_INSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "kafka_kerberized_zookeeper:2181"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dsun.security.krb5.debug=true"
volumes:
- ${KERBERIZED_KAFKA_DIR:-../../../../../tests/integration/tskk/_instances/instance}/secrets:/etc/kafka/secrets
depends_on:
- kafka_kerberized_zookeeper
- kafka_kerberos
security_opt:
- label:disable
kafka_kerberos:
build: /home/ilejn/projects/ClickHouse/tests/integration/tskk
hostname: kafka_kerberos
volumes:
- ${KERBERIZED_KAFKA_DIR:-../../../../../tests/integration/tskk/_instances/instance}/secrets:/tmp/keytab
ports: [88, 749]
tests/integration/helpers/cluster.py:
@@ -125,6 +125,7 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.base_kafka_cmd = []
self.base_kerberized_kafka_cmd = []
self.base_rabbitmq_cmd = []
self.base_cassandra_cmd = []
self.pre_zookeeper_commands = []
@@ -133,6 +134,7 @@
self.with_mysql = False
self.with_postgres = False
self.with_kafka = False
self.with_kerberized_kafka = False
self.with_rabbitmq = False
self.with_odbc_drivers = False
self.with_hdfs = False
@@ -169,7 +171,7 @@
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None,
-with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False,
+with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False,
@@ -207,6 +209,7 @@
zookeeper_config_path=self.zookeeper_config_path,
with_mysql=with_mysql,
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_mongo=with_mongo,
with_redis=with_redis,
@@ -290,6 +293,13 @@
p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')]
cmds.append(self.base_kafka_cmd)
if with_kerberized_kafka and not self.with_kerberized_kafka:
self.with_kerberized_kafka = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')])
self.base_kerberized_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')]
cmds.append(self.base_kerberized_kafka_cmd)
if with_rabbitmq and not self.with_rabbitmq:
self.with_rabbitmq = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')])
@@ -608,6 +618,13 @@
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
env = os.environ.copy()
self.kerberized_kafka_instance_path = instance.path
env['KERBERIZED_KAFKA_DIR'] = self.kerberized_kafka_instance_path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')
if self.with_rabbitmq and self.base_rabbitmq_cmd:
subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
@@ -778,9 +795,12 @@ services:
- {instance_config_dir}:/etc/clickhouse-server/
- {db_dir}:/var/lib/clickhouse/
- {logs_dir}:/var/log/clickhouse-server/
- /etc/passwd:/etc/passwd:ro
{binary_volume}
{odbc_bridge_volume}
{odbc_ini_path}
{keytab_path}
{krb5_conf}
entrypoint: {entrypoint_cmd}
tmpfs: {tmpfs}
cap_add:
@@ -810,7 +830,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
-macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo,
+macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
@@ -838,6 +858,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
self.with_mongo = with_mongo
self.with_redis = with_redis
@@ -853,6 +874,13 @@ class ClickHouseInstance:
else:
self.odbc_ini_path = ""
if with_kerberized_kafka:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
else:
self.keytab_path = ""
self.krb5_conf = ""
self.docker_client = None
self.ip_address = None
self.client = None
@@ -1182,6 +1210,15 @@ class ClickHouseInstance:
if self.with_zookeeper:
shutil.copy(self.zookeeper_config_path, conf_d_dir)
if self.with_kerberized_kafka:
# shutil.copytree(p.abspath(p.join(self.base_dir, 'secrets')), p.abspath(p.join(self.path, 'secrets')))
secrets_dir = p.abspath(p.join(self.custom_config_dir, os.pardir, 'secrets'))
distutils.dir_util.copy_tree(secrets_dir, p.abspath(p.join(self.path, 'secrets')))
# Copy config dir
if self.custom_config_dir:
distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)
# Copy config.d configs
print "Copy custom test config files {} to {}".format(self.custom_main_config_paths, self.config_d_dir)
for path in self.custom_main_config_paths:
@@ -1217,6 +1254,9 @@ class ClickHouseInstance:
depends_on.append("kafka1")
depends_on.append("schema-registry")
if self.with_kerberized_kafka:
depends_on.append("kerberized_kafka1")
if self.with_rabbitmq:
depends_on.append("rabbitmq1")
@@ -1280,6 +1320,8 @@ class ClickHouseInstance:
user=os.getuid(),
env_file=env_file,
odbc_ini_path=odbc_ini_path,
keytab_path=self.keytab_path,
krb5_conf=self.krb5_conf,
entrypoint_cmd=entrypoint_cmd,
networks=networks,
app_net=app_net,
tests/integration/tskk/Dockerfile:
FROM centos:6.6
# based on confluent kerberos, which is not provided as image any more
RUN yum install -y krb5-server krb5-libs krb5-auth-dialog krb5-workstation
EXPOSE 88 749
ADD ./kerberos_image_config.sh /config.sh
ENTRYPOINT ["/bin/bash", "/config.sh"]
syntax = "proto3";
message KeyValuePair {
uint64 key = 1;
string value = 2;
}
\ No newline at end of file
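The test file below imports _VarintBytes and the generated kafka_pb2 module, which implies that messages for this schema are written length-delimited, as ClickHouse's Protobuf input format expects. A minimal producer sketch under those assumptions — the topic name 'pb' and the use of the unsecured localhost:9093 listener are illustrative, not part of this commit:

import kafka_pb2
from google.protobuf.internal.encoder import _VarintBytes
from kafka import KafkaProducer

def serialize_pair(key, value):
    # ClickHouse's Protobuf format reads varint-length-prefixed messages,
    # so prepend the serialized size to each KeyValuePair.
    msg = kafka_pb2.KeyValuePair()
    msg.key = key
    msg.value = value
    data = msg.SerializeToString()
    return _VarintBytes(len(data)) + data

producer = KafkaProducer(bootstrap_servers="localhost:9093")
producer.send(topic="pb", value=serialize_pair(0, "zero"))
producer.flush()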
(id = ${id:Escaped}, blockNo = ${blockNo:Escaped}, val1 = ${val1:CSV}, val2 = ${val2:Escaped}, val3 = ${val3:Escaped})
\ No newline at end of file
{
"type": "record",
"name": "row",
"fields": [
{"name": "id", "type": "long"},
{"name": "blockNo", "type": "int"},
{"name": "val1", "type": "string"},
{"name": "val2", "type": "float"},
{"name": "val3", "type": "int"}
]
}
\ No newline at end of file
@0x99f75f775fe63dae;
struct TestRecordStruct
{
id @0 : Int64;
blockNo @1 : UInt16;
val1 @2 : Text;
val2 @3 : Float32;
val3 @4 : UInt8;
}
\ No newline at end of file
syntax = "proto3";
message TestMessage {
int64 id = 1;
uint32 blockNo = 2;
string val1 = 3;
float val2 = 4;
uint32 val3 = 5;
};
tests/integration/tskk/configs/kafka.xml:
<yandex>
<kafka>
<auto_offset_reset>earliest</auto_offset_reset>
<!-- Debugging of possible issues, like:
- https://github.com/edenhill/librdkafka/issues/2077
- https://github.com/edenhill/librdkafka/issues/1778
- #5615
XXX: for now these messages will appear in stderr.
-->
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_mechanism>GSSAPI</sasl_mechanism>
<sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
<sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/instance@TEST.CONFLUENT.IO</sasl_kerberos_principal>
<debug>security</debug>
<api_version_request>false</api_version_request>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</yandex>
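ClickHouse forwards the children of the <kafka> section to librdkafka, replacing underscores in the element names with dots. As a sketch, the security settings above correspond to these librdkafka properties (an illustrative mapping, not generated output):

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.keytab=/tmp/keytab/clickhouse.keytab
sasl.kerberos.principal=kafkauser/instance@TEST.CONFLUENT.IO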
tests/integration/tskk/configs/log_conf.xml:
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>
\ No newline at end of file
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<!--stream_poll_timeout_ms>1</stream_poll_timeout_ms>
<stream_flush_interval_ms>100</stream_flush_interval_ms-->
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>
tests/integration/tskk/clickhouse_path/format_schemas/kafka_pb2.py:
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/kafka.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='clickhouse_path/format_schemas/kafka.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n*clickhouse_path/format_schemas/kafka.proto\"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_KEYVALUEPAIR = _descriptor.Descriptor(
name='KeyValuePair',
full_name='KeyValuePair',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='KeyValuePair.key', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='value', full_name='KeyValuePair.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=46,
serialized_end=88,
)
DESCRIPTOR.message_types_by_name['KeyValuePair'] = _KEYVALUEPAIR
KeyValuePair = _reflection.GeneratedProtocolMessageType('KeyValuePair', (_message.Message,), dict(
DESCRIPTOR = _KEYVALUEPAIR,
__module__ = 'clickhouse_path.format_schemas.kafka_pb2'
# @@protoc_insertion_point(class_scope:KeyValuePair)
))
_sym_db.RegisterMessage(KeyValuePair)
# @@protoc_insertion_point(module_scope)
tests/integration/tskk/kerberos_image_config.sh:
#!/bin/bash
# based on confluent kerberos, which is not provided as image any more
[[ "TRACE" ]] && set -x
: ${REALM:=TEST.CONFLUENT.IO}
: ${DOMAIN_REALM:=test.confluent.io}
: ${KERB_MASTER_KEY:=masterkey}
: ${KERB_ADMIN_USER:=admin}
: ${KERB_ADMIN_PASS:=admin}
create_config() {
: ${KDC_ADDRESS:=$(hostname -f)}
cat>/etc/krb5.conf<<EOF
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = $REALM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
[realms]
$REALM = {
kdc = $KDC_ADDRESS
admin_server = $KDC_ADDRESS
}
[domain_realm]
.$DOMAIN_REALM = $REALM
$DOMAIN_REALM = $REALM
EOF
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
$REALM = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
master_key_type = des3-hmac-sha1
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
default_principal_flags = +preauth
}
EOF
}
create_db() {
/usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
}
start_kdc() {
mkdir -p /var/log/kerberos
/etc/rc.d/init.d/krb5kdc start
/etc/rc.d/init.d/kadmin start
chkconfig krb5kdc on
chkconfig kadmin on
}
restart_kdc() {
/etc/rc.d/init.d/krb5kdc restart
/etc/rc.d/init.d/kadmin restart
}
create_admin_user() {
kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
}
create_keytabs() {
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@TEST.CONFLUENT.IO"
kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@TEST.CONFLUENT.IO"
# kadmin.local -q "addprinc -randkey kafka/localhost@TEST.CONFLUENT.IO"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/localhost@TEST.CONFLUENT.IO"
kadmin.local -q "addprinc -randkey zkclient@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@TEST.CONFLUENT.IO"
# kadmin.local -q "addprinc -randkey kafkauser@TEST.CONFLUENT.IO"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser@TEST.CONFLUENT.IO"
kadmin.local -q "addprinc -randkey kafkauser/instance@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@TEST.CONFLUENT.IO"
chmod g+r /tmp/keytab/clickhouse.keytab
}
main() {
if [ ! -f /kerberos_initialized ]; then
create_config
create_db
create_admin_user
start_kdc
touch /kerberos_initialized
fi
if [ ! -f /var/kerberos/krb5kdc/principal ]; then
while true; do sleep 1000; done
else
start_kdc
create_keytabs
tail -F /var/log/kerberos/krb5kdc.log
fi
}
[[ "$0" == "$BASH_SOURCE" ]] && main "$@"
tests/integration/tskk/secrets/broker_jaas.conf:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kerberized_kafka.keytab"
principal="kafka/kerberized_kafka1@TEST.CONFLUENT.IO";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CONFLUENT.IO";
};
tests/integration/tskk/secrets/krb.conf:
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = TEST.CONFLUENT.IO
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
[realms]
TEST.CONFLUENT.IO = {
kdc = kafka_kerberos
admin_server = kafka_kerberos
}
[domain_realm]
.TEST.CONFLUENT.IO = TEST.CONFLUENT.IO
TEST.CONFLUENT.IO = TEST.CONFLUENT.IO
tests/integration/tskk/secrets/zookeeper_jaas.conf:
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kafka_kerberized_zookeeper.keytab"
principal="zookeeper/kafka_kerberized_zookeeper@TEST.CONFLUENT.IO";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CONFLUENT.IO";
};
tests/integration/tskk/test.py:
import os.path as p
import random
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
from google.protobuf.internal.encoder import _VarintBytes
"""
protoc --version
libprotoc 3.0.0
# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
config_dir='configs',
main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
with_kerberized_kafka=True,
# with_zookeeper=True,
clickhouse_path_dir='clickhouse_path')
kafka_id = '' # instance.cluster.kafka_docker_id
# Helpers
def check_kafka_is_available():
# the secured listener needs Kerberos credentials, so probe the broker
# via the unsecured localhost:9093 listener instead
p = subprocess.Popen(('docker',
'exec',
'-i',
kafka_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'localhost:9093'),
stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
def wait_kafka_is_available(max_retries=50):
retries = 0
while True:
if check_kafka_is_available():
break
else:
retries += 1
if retries > max_retries:
raise "Kafka is not available"
print("Waiting for Kafka to start up")
time.sleep(1)
def kafka_produce(topic, messages, timestamp=None):
producer = KafkaProducer(bootstrap_servers="localhost:9093")
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
print ("Produced {} messages for topic {}".format(len(messages), topic))
# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
try:
global kafka_id
cluster.start()
kafka_id = instance.cluster.kerberized_kafka_docker_id
print("kafka_id is {}".format(kafka_id))
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
wait_kafka_is_available()
print("kafka is available - running test")
time.sleep(60)  # extra settling time after the broker reports available
yield # run test
# Tests
@pytest.mark.timeout(1000) # generous timeout to allow for container builds
def test_kafka_json_as_string(kafka_cluster):
kafka_produce('kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
instance.query('''
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka;')
expected = '''\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
'''
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
cluster.shutdown()
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31
32 32
33 33
34 34
35 35
36 36
37 37
38 38
39 39
40 40
41 41
42 42
43 43
44 44
45 45
46 46
47 47
48 48
49 49
0 virt1 0 0 0 0000-00-00 00:00:00
1 virt1 1 0 0 0000-00-00 00:00:00
2 virt1 2 0 0 0000-00-00 00:00:00
3 virt1 3 0 0 0000-00-00 00:00:00
4 virt1 4 0 0 0000-00-00 00:00:00
5 virt1 5 0 0 0000-00-00 00:00:00
6 virt1 6 0 0 0000-00-00 00:00:00
7 virt1 7 0 0 0000-00-00 00:00:00
8 virt1 8 0 0 0000-00-00 00:00:00
9 virt1 9 0 0 0000-00-00 00:00:00
10 virt1 10 0 0 0000-00-00 00:00:00
11 virt1 11 0 0 0000-00-00 00:00:00
12 virt1 12 0 0 0000-00-00 00:00:00
13 virt1 13 0 0 0000-00-00 00:00:00
14 virt1 14 0 0 0000-00-00 00:00:00
15 virt1 15 0 0 0000-00-00 00:00:00
16 virt1 16 0 0 0000-00-00 00:00:00
17 virt1 17 0 0 0000-00-00 00:00:00
18 virt1 18 0 0 0000-00-00 00:00:00
19 virt1 19 0 0 0000-00-00 00:00:00
20 virt1 20 0 0 0000-00-00 00:00:00
21 virt1 21 0 0 0000-00-00 00:00:00
22 virt1 22 0 0 0000-00-00 00:00:00
23 virt1 23 0 0 0000-00-00 00:00:00
24 virt1 24 0 0 0000-00-00 00:00:00
25 virt1 25 1 0 0000-00-00 00:00:00
26 virt1 26 1 0 0000-00-00 00:00:00
27 virt1 27 1 0 0000-00-00 00:00:00
28 virt1 28 1 0 0000-00-00 00:00:00
29 virt1 29 1 0 0000-00-00 00:00:00
30 virt1 30 1 0 0000-00-00 00:00:00
31 virt1 31 1 0 0000-00-00 00:00:00
32 virt1 32 1 0 0000-00-00 00:00:00
33 virt1 33 1 0 0000-00-00 00:00:00
34 virt1 34 1 0 0000-00-00 00:00:00
35 virt1 35 1 0 0000-00-00 00:00:00
36 virt1 36 1 0 0000-00-00 00:00:00
37 virt1 37 1 0 0000-00-00 00:00:00
38 virt1 38 1 0 0000-00-00 00:00:00
39 virt1 39 1 0 0000-00-00 00:00:00
40 virt1 40 1 0 0000-00-00 00:00:00
41 virt1 41 1 0 0000-00-00 00:00:00
42 virt1 42 1 0 0000-00-00 00:00:00
43 virt1 43 1 0 0000-00-00 00:00:00
44 virt1 44 1 0 0000-00-00 00:00:00
45 virt1 45 1 0 0000-00-00 00:00:00
46 virt1 46 1 0 0000-00-00 00:00:00
47 virt1 47 1 0 0000-00-00 00:00:00
48 virt1 48 1 0 0000-00-00 00:00:00
49 virt1 49 1 0 0000-00-00 00:00:00
0 virt2 0 0 0 0000-00-00 00:00:00
1 virt2 1 1 0 0000-00-00 00:00:00
2 virt2 2 2 0 0000-00-00 00:00:00
3 virt2 3 3 0 0000-00-00 00:00:00
4 virt2 4 4 0 0000-00-00 00:00:00
5 virt2 5 5 0 0000-00-00 00:00:00
6 virt2 6 6 0 0000-00-00 00:00:00
7 virt2 7 7 0 0000-00-00 00:00:00
8 virt2 8 8 0 0000-00-00 00:00:00
9 virt2 9 9 0 0000-00-00 00:00:00
10 virt2 10 10 0 0000-00-00 00:00:00
11 virt2 11 11 0 0000-00-00 00:00:00
12 virt2 12 12 0 0000-00-00 00:00:00
13 virt2 13 13 0 0000-00-00 00:00:00
14 virt2 14 14 0 0000-00-00 00:00:00
15 virt2 15 15 0 0000-00-00 00:00:00
16 virt2 16 16 0 0000-00-00 00:00:00
17 virt2 17 17 0 0000-00-00 00:00:00
18 virt2 18 18 0 0000-00-00 00:00:00
19 virt2 19 19 0 0000-00-00 00:00:00
20 virt2 20 20 0 0000-00-00 00:00:00
21 virt2 21 21 0 0000-00-00 00:00:00
22 virt2 22 22 0 0000-00-00 00:00:00
23 virt2 23 23 0 0000-00-00 00:00:00
24 virt2 24 24 0 0000-00-00 00:00:00
25 virt2 25 25 0 0000-00-00 00:00:00
26 virt2 26 26 0 0000-00-00 00:00:00
27 virt2 27 27 0 0000-00-00 00:00:00
28 virt2 28 28 0 0000-00-00 00:00:00
29 virt2 29 29 0 0000-00-00 00:00:00
30 virt2 30 30 0 0000-00-00 00:00:00
31 virt2 31 31 0 0000-00-00 00:00:00
32 virt2 32 32 0 0000-00-00 00:00:00
33 virt2 33 33 0 0000-00-00 00:00:00
34 virt2 34 34 0 0000-00-00 00:00:00
35 virt2 35 35 0 0000-00-00 00:00:00
36 virt2 36 36 0 0000-00-00 00:00:00
37 virt2 37 37 0 0000-00-00 00:00:00
38 virt2 38 38 0 0000-00-00 00:00:00
39 virt2 39 39 0 0000-00-00 00:00:00
40 virt2 40 40 0 0000-00-00 00:00:00
41 virt2 41 41 0 0000-00-00 00:00:00
42 virt2 42 42 0 0000-00-00 00:00:00
43 virt2 43 43 0 0000-00-00 00:00:00
44 virt2 44 44 0 0000-00-00 00:00:00
45 virt2 45 45 0 0000-00-00 00:00:00
46 virt2 46 46 0 0000-00-00 00:00:00
47 virt2 47 47 0 0000-00-00 00:00:00
48 virt2 48 48 0 0000-00-00 00:00:00
49 virt2 49 49 0 0000-00-00 00:00:00