提交 2d5c31d5 编写于 作者: I Ilya Golshtein

test_storage_kerberized_kafka cleanup + negative test

上级 c12fa26f
......@@ -165,6 +165,22 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
### Kerberos support {#kafka-kerberos-support}
To deal with Kerberos-aware Kafka, add `security_protocol` child element with `sasl_plaintext` value. It is enough if Kerberos ticket-granting ticket is obtained and cached by OS facilities.
ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
Example:
``` xml
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
```
## Virtual Columns {#virtual-columns}
- `_topic` — Kafka topic.
......
......@@ -619,10 +619,10 @@ class ClickHouseCluster:
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
env = os.environ.copy()
self.kerberized_kafka_instance_path = instance.path
env['KERBERIZED_KAFKA_DIR'] = self.kerberized_kafka_instance_path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
env_var = {}
env_var['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
_create_env_file(self.base_dir, env_var, ".env")
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'])
self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')
if self.with_rabbitmq and self.base_rabbitmq_cmd:
......@@ -1211,7 +1211,6 @@ class ClickHouseInstance:
shutil.copy(self.zookeeper_config_path, conf_d_dir)
if self.with_kerberized_kafka:
# shutil.copytree(p.abspath(p.join(self.base_dir, 'secrets')), p.abspath(p.join(self.path, 'secrets')))
secrets_dir = p.abspath(p.join(self.custom_config_dir, os.pardir, 'secrets'))
distutils.dir_util.copy_tree(secrets_dir, p.abspath(p.join(self.path, 'secrets')))
......
FROM centos:6.6
# based on confluent kerberos, which is not provided as image any more
# old OS to make is faster and smaller
RUN yum install -y krb5-server krb5-libs krb5-auth-dialog krb5-workstation
EXPOSE 88 749
ADD ./kerberos_image_config.sh /config.sh
RUN touch /config.sh
# should be overwritten e.g. via docker_compose volumes
# volumes: /some_path/my_kerberos_config.sh:/config.sh:ro
ENTRYPOINT ["/bin/bash", "/config.sh"]
......@@ -12,7 +12,7 @@
<sasl_mechanism>GSSAPI</sasl_mechanism>
<sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
<sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/instance@TEST.CONFLUENT.IO</sasl_kerberos_principal>
<sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
<debug>security</debug>
<api_version_request>false</api_version_request>
</kafka>
......
#!/bin/bash
# based on confluent kerberos, which is not provided as image any more
[[ "TRACE" ]] && set -x
: ${REALM:=TEST.CONFLUENT.IO}
: ${DOMAIN_REALM:=test.confluent.io}
: ${REALM:=TEST.CLICKHOUSE.TECH}
: ${DOMAIN_REALM:=test.clickhouse.tech}
: ${KERB_MASTER_KEY:=masterkey}
: ${KERB_ADMIN_USER:=admin}
: ${KERB_ADMIN_PASS:=admin}
......@@ -92,23 +91,19 @@ create_admin_user() {
create_keytabs() {
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@TEST.CONFLUENT.IO"
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@TEST.CONFLUENT.IO"
# kadmin.local -q "addprinc -randkey kafka/localhost@TEST.CONFLUENT.IO"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/localhost@TEST.CONFLUENT.IO"
kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@${REALM}"
kadmin.local -q "addprinc -randkey zkclient@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@TEST.CONFLUENT.IO"
# kadmin.local -q "addprinc -randkey kafkauser@TEST.CONFLUENT.IO"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser@TEST.CONFLUENT.IO"
kadmin.local -q "addprinc -randkey zkclient@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@${REALM}"
kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
kadmin.local -q "addprinc -randkey kafkauser/instance@TEST.CONFLUENT.IO"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@TEST.CONFLUENT.IO"
chmod g+r /tmp/keytab/clickhouse.keytab
}
......
......@@ -3,13 +3,12 @@ KafkaServer {
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kerberized_kafka.keytab"
principal="kafka/kerberized_kafka1@TEST.CONFLUENT.IO";
principal="kafka/kerberized_kafka1@TEST.CLICKHOUSE.TECH";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CONFLUENT.IO";
principal="zkclient@TEST.CLICKHOUSE.TECH";
};
......@@ -4,7 +4,7 @@
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = TEST.CONFLUENT.IO
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
......@@ -12,11 +12,11 @@
forwardable = true
[realms]
TEST.CONFLUENT.IO = {
TEST.CLICKHOUSE.TECH = {
kdc = kafka_kerberos
admin_server = kafka_kerberos
}
[domain_realm]
.TEST.CONFLUENT.IO = TEST.CONFLUENT.IO
TEST.CONFLUENT.IO = TEST.CONFLUENT.IO
.TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
......@@ -3,12 +3,12 @@ Server {
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kafka_kerberized_zookeeper.keytab"
principal="zookeeper/kafka_kerberized_zookeeper@TEST.CONFLUENT.IO";
principal="zookeeper/kafka_kerberized_zookeeper@TEST.CLICKHOUSE.TECH";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CONFLUENT.IO";
principal="zkclient@TEST.CLICKHOUSE.TECH";
};
......@@ -17,37 +17,20 @@ from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
from google.protobuf.internal.encoder import _VarintBytes
"""
protoc --version
libprotoc 3.0.0
# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
config_dir='configs',
main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
with_kerberized_kafka=True,
# with_zookeeper=True,
clickhouse_path_dir='clickhouse_path')
)
kafka_id = '' # instance.cluster.kafka_docker_id
# Helpers
def check_kafka_is_available():
# credentials are needed
# plaintext
p = subprocess.Popen(('docker',
'exec',
'-i',
......@@ -102,12 +85,11 @@ def kafka_setup_teardown():
instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
wait_kafka_is_available()
print("kafka is available - running test")
time.sleep(60)
yield # run test
# Tests
@pytest.mark.timeout(1000) # wait to build containers
@pytest.mark.timeout(180) # wait to build containers
def test_kafka_json_as_string(kafka_cluster):
kafka_produce('kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
......@@ -130,6 +112,30 @@ def test_kafka_json_as_string(kafka_cluster):
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
def test_kafka_json_as_string_no_kdc(kafka_cluster):
kafka_produce('kafka_json_as_string_no_kdc', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
kafka_cluster.pause_container('kafka_kerberos')
instance.query('''
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka_no_kdc;')
expected = ''
kafka_cluster.unpause_container('kafka_kerberos')
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
if __name__ == '__main__':
cluster.start()
......
syntax = "proto3";
message KeyValuePair {
uint64 key = 1;
string value = 2;
}
\ No newline at end of file
(id = ${id:Escaped}, blockNo = ${blockNo:Escaped}, val1 = ${val1:CSV}, val2 = ${val2:Escaped}, val3 = ${val3:Escaped})
\ No newline at end of file
{
"type": "record",
"name": "row",
"fields": [
{"name": "id", "type": "long"},
{"name": "blockNo", "type": "int"},
{"name": "val1", "type": "string"},
{"name": "val2", "type": "float"},
{"name": "val3", "type": "int"}
]
}
\ No newline at end of file
@0x99f75f775fe63dae;
struct TestRecordStruct
{
id @0 : Int64;
blockNo @1 : UInt16;
val1 @2 : Text;
val2 @3 : Float32;
val3 @4 : UInt8;
}
\ No newline at end of file
syntax = "proto3";
message TestMessage {
int64 id = 1;
uint32 blockNo = 2;
string val1 = 3;
float val2 = 4;
uint32 val3 = 5;
};
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31
32 32
33 33
34 34
35 35
36 36
37 37
38 38
39 39
40 40
41 41
42 42
43 43
44 44
45 45
46 46
47 47
48 48
49 49
0 virt1 0 0 0 0000-00-00 00:00:00
1 virt1 1 0 0 0000-00-00 00:00:00
2 virt1 2 0 0 0000-00-00 00:00:00
3 virt1 3 0 0 0000-00-00 00:00:00
4 virt1 4 0 0 0000-00-00 00:00:00
5 virt1 5 0 0 0000-00-00 00:00:00
6 virt1 6 0 0 0000-00-00 00:00:00
7 virt1 7 0 0 0000-00-00 00:00:00
8 virt1 8 0 0 0000-00-00 00:00:00
9 virt1 9 0 0 0000-00-00 00:00:00
10 virt1 10 0 0 0000-00-00 00:00:00
11 virt1 11 0 0 0000-00-00 00:00:00
12 virt1 12 0 0 0000-00-00 00:00:00
13 virt1 13 0 0 0000-00-00 00:00:00
14 virt1 14 0 0 0000-00-00 00:00:00
15 virt1 15 0 0 0000-00-00 00:00:00
16 virt1 16 0 0 0000-00-00 00:00:00
17 virt1 17 0 0 0000-00-00 00:00:00
18 virt1 18 0 0 0000-00-00 00:00:00
19 virt1 19 0 0 0000-00-00 00:00:00
20 virt1 20 0 0 0000-00-00 00:00:00
21 virt1 21 0 0 0000-00-00 00:00:00
22 virt1 22 0 0 0000-00-00 00:00:00
23 virt1 23 0 0 0000-00-00 00:00:00
24 virt1 24 0 0 0000-00-00 00:00:00
25 virt1 25 1 0 0000-00-00 00:00:00
26 virt1 26 1 0 0000-00-00 00:00:00
27 virt1 27 1 0 0000-00-00 00:00:00
28 virt1 28 1 0 0000-00-00 00:00:00
29 virt1 29 1 0 0000-00-00 00:00:00
30 virt1 30 1 0 0000-00-00 00:00:00
31 virt1 31 1 0 0000-00-00 00:00:00
32 virt1 32 1 0 0000-00-00 00:00:00
33 virt1 33 1 0 0000-00-00 00:00:00
34 virt1 34 1 0 0000-00-00 00:00:00
35 virt1 35 1 0 0000-00-00 00:00:00
36 virt1 36 1 0 0000-00-00 00:00:00
37 virt1 37 1 0 0000-00-00 00:00:00
38 virt1 38 1 0 0000-00-00 00:00:00
39 virt1 39 1 0 0000-00-00 00:00:00
40 virt1 40 1 0 0000-00-00 00:00:00
41 virt1 41 1 0 0000-00-00 00:00:00
42 virt1 42 1 0 0000-00-00 00:00:00
43 virt1 43 1 0 0000-00-00 00:00:00
44 virt1 44 1 0 0000-00-00 00:00:00
45 virt1 45 1 0 0000-00-00 00:00:00
46 virt1 46 1 0 0000-00-00 00:00:00
47 virt1 47 1 0 0000-00-00 00:00:00
48 virt1 48 1 0 0000-00-00 00:00:00
49 virt1 49 1 0 0000-00-00 00:00:00
0 virt2 0 0 0 0000-00-00 00:00:00
1 virt2 1 1 0 0000-00-00 00:00:00
2 virt2 2 2 0 0000-00-00 00:00:00
3 virt2 3 3 0 0000-00-00 00:00:00
4 virt2 4 4 0 0000-00-00 00:00:00
5 virt2 5 5 0 0000-00-00 00:00:00
6 virt2 6 6 0 0000-00-00 00:00:00
7 virt2 7 7 0 0000-00-00 00:00:00
8 virt2 8 8 0 0000-00-00 00:00:00
9 virt2 9 9 0 0000-00-00 00:00:00
10 virt2 10 10 0 0000-00-00 00:00:00
11 virt2 11 11 0 0000-00-00 00:00:00
12 virt2 12 12 0 0000-00-00 00:00:00
13 virt2 13 13 0 0000-00-00 00:00:00
14 virt2 14 14 0 0000-00-00 00:00:00
15 virt2 15 15 0 0000-00-00 00:00:00
16 virt2 16 16 0 0000-00-00 00:00:00
17 virt2 17 17 0 0000-00-00 00:00:00
18 virt2 18 18 0 0000-00-00 00:00:00
19 virt2 19 19 0 0000-00-00 00:00:00
20 virt2 20 20 0 0000-00-00 00:00:00
21 virt2 21 21 0 0000-00-00 00:00:00
22 virt2 22 22 0 0000-00-00 00:00:00
23 virt2 23 23 0 0000-00-00 00:00:00
24 virt2 24 24 0 0000-00-00 00:00:00
25 virt2 25 25 0 0000-00-00 00:00:00
26 virt2 26 26 0 0000-00-00 00:00:00
27 virt2 27 27 0 0000-00-00 00:00:00
28 virt2 28 28 0 0000-00-00 00:00:00
29 virt2 29 29 0 0000-00-00 00:00:00
30 virt2 30 30 0 0000-00-00 00:00:00
31 virt2 31 31 0 0000-00-00 00:00:00
32 virt2 32 32 0 0000-00-00 00:00:00
33 virt2 33 33 0 0000-00-00 00:00:00
34 virt2 34 34 0 0000-00-00 00:00:00
35 virt2 35 35 0 0000-00-00 00:00:00
36 virt2 36 36 0 0000-00-00 00:00:00
37 virt2 37 37 0 0000-00-00 00:00:00
38 virt2 38 38 0 0000-00-00 00:00:00
39 virt2 39 39 0 0000-00-00 00:00:00
40 virt2 40 40 0 0000-00-00 00:00:00
41 virt2 41 41 0 0000-00-00 00:00:00
42 virt2 42 42 0 0000-00-00 00:00:00
43 virt2 43 43 0 0000-00-00 00:00:00
44 virt2 44 44 0 0000-00-00 00:00:00
45 virt2 45 45 0 0000-00-00 00:00:00
46 virt2 46 46 0 0000-00-00 00:00:00
47 virt2 47 47 0 0000-00-00 00:00:00
48 virt2 48 48 0 0000-00-00 00:00:00
49 virt2 49 49 0 0000-00-00 00:00:00
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册