Unverified commit 2e304699, authored by alexey-milovidov, committed by GitHub

Merge pull request #15757 from filimonov/fix-kafka-no-holes-flap

Fix flakiness ("flap") in no_holes_when_write_suffix_failed
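In outline, the de-flaking pattern this PR applies (a sketch over the integration-test fixtures `instance`, `kafka_produce`, and `PartitionManager`, which the test harness provides and which are not defined here):

```python
import json

# 1. Produce one message and read it back, so that Kafka subscription and
#    partition assignment complete before any timing-sensitive window opens.
kafka_produce('no_holes_when_write_suffix_failed',
              [json.dumps({'key': 1, 'value': 'x' * 300})])
instance.query("SELECT * FROM test.kafka LIMIT 1;")

# 2. Construct the network-partition helper up front; its first use would
#    otherwise start a helper container inside the measured window.
pm = PartitionManager()

# 3. Only now produce the bulk of the data and run the timed scenario.
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('no_holes_when_write_suffix_failed', messages)
```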
@@ -19,6 +19,7 @@ class PartitionManager:
     def __init__(self):
         self._iptables_rules = []
+        _NetworkManager.get()
 
     def drop_instance_zk_connections(self, instance, action='DROP'):
         self._check_instance(instance)
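For context, `_NetworkManager.get()` appears to act as a lazy singleton whose first call starts a helper container; touching it in `__init__` moves that slow start to construction time rather than the middle of a timed test. A minimal sketch of that pattern (an assumption about the helper's behavior, not its real code):

```python
class _NetworkManager:
    _instance = None

    @classmethod
    def get(cls):
        # The first call is slow (the real helper starts a Docker container
        # here); subsequent calls just return the cached instance.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance
```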
@@ -1454,8 +1454,15 @@ def test_kafka_virtual_columns2(kafka_cluster):
     assert TSV(result) == TSV(expected)
 
-@pytest.mark.timeout(240)
+@pytest.mark.timeout(120)
 def test_kafka_produce_key_timestamp(kafka_cluster):
+    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
+    topic_list = []
+    topic_list.append(NewTopic(name="insert3", num_partitions=1, replication_factor=1))
+    admin_client.create_topics(new_topics=topic_list, validate_only=False)
+
     instance.query('''
         DROP TABLE IF EXISTS test.view;
         DROP TABLE IF EXISTS test.consumer;
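The topic is now created explicitly up front instead of relying on broker auto-creation, which presumably is what allows the timeout to drop from 240 s to 120 s. A standalone sketch of the same idea with kafka-python's admin API (broker address and topic name are taken from the hunk; the try/except is an addition for idempotence):

```python
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    # One partition keeps ordering trivial for a single-consumer test.
    admin_client.create_topics(
        new_topics=[NewTopic(name="insert3", num_partitions=1, replication_factor=1)],
        validate_only=False)
except TopicAlreadyExistsError:
    pass  # makes the setup safe to re-run
```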
@@ -1810,7 +1817,7 @@ def test_kafka_rebalance(kafka_cluster):
 @pytest.mark.timeout(1200)
 def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
-    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
+    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
     kafka_produce('no_holes_when_write_suffix_failed', messages)
 
     instance.query('''
@@ -1823,8 +1830,19 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
                      kafka_topic_list = 'no_holes_when_write_suffix_failed',
                      kafka_group_name = 'no_holes_when_write_suffix_failed',
                      kafka_format = 'JSONEachRow',
-                     kafka_max_block_size = 20;
+                     kafka_max_block_size = 20,
+                     kafka_flush_interval_ms = 2000;
+
+        SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take a varying amount of time; the test relies on timing and can flap otherwise) */
+        ''')
+
+    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
+    kafka_produce('no_holes_when_write_suffix_failed', messages)
+
+    # initialize PartitionManager earlier (its constructor now starts the network-helper container)
+    pm = PartitionManager()
+
+    instance.query('''
         CREATE TABLE test.view (key UInt64, value String)
             ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
             ORDER BY key;
@@ -1833,17 +1851,18 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
             SELECT * FROM test.kafka
                 WHERE NOT sleepEachRow(1);
     ''')
 
     # The tricky part: the disconnect has to happen after the write prefix but
     # before the write suffix, hence the sleepEachRow above.
-    with PartitionManager() as pm:
-        time.sleep(12)
-        pm.drop_instance_zk_connections(instance)
-        time.sleep(20)
-        pm.heal_all
+    time.sleep(3)
+    pm.drop_instance_zk_connections(instance)
+    time.sleep(20)
+    pm.heal_all()
 
     # The connection is restored, but it takes a while until the next block
     # is flushed (and it takes ages on CI).
-    time.sleep(90)
+    time.sleep(45)
 
     # Since it is tricky to hit the proper moment, check in the logs that we did.
     assert instance.contains_in_log("ZooKeeper session has been expired.: while write prefix to view")
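One subtle fix in this hunk: the old code referenced `pm.heal_all` without parentheses, which in Python merely creates and discards a bound-method object, so the network rules were never actually restored by that line. A minimal self-contained illustration (this `PartitionManager` is a stand-in with the same method name, not the test helper):

```python
class PartitionManager:
    def heal_all(self):
        print("iptables rules removed, network healed")

pm = PartitionManager()

pm.heal_all    # bug: evaluates to a bound method and throws it away; no call
pm.heal_all()  # fix: actually invokes the method
```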
@@ -2021,9 +2040,9 @@ def test_bad_reschedule(kafka_cluster):
     assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8
 
-@pytest.mark.timeout(1200)
+@pytest.mark.timeout(300)
 def test_kafka_duplicates_when_commit_failed(kafka_cluster):
-    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
+    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
     kafka_produce('duplicates_when_commit_failed', messages)
 
     instance.query('''
@@ -2036,8 +2055,16 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
                      kafka_topic_list = 'duplicates_when_commit_failed',
                      kafka_group_name = 'duplicates_when_commit_failed',
                      kafka_format = 'JSONEachRow',
-                     kafka_max_block_size = 20;
+                     kafka_max_block_size = 20,
+                     kafka_flush_interval_ms = 1000;
+
+        SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take a varying amount of time; the test relies on timing and can flap otherwise) */
+        ''')
+
+    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
+    kafka_produce('duplicates_when_commit_failed', messages)
+
+    instance.query('''
         CREATE TABLE test.view (key UInt64, value String)
             ENGINE = MergeTree()
             ORDER BY key;
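For context on the two settings above: `kafka_max_block_size` caps the rows per block, while `kafka_flush_interval_ms` bounds how long the engine waits before flushing a smaller block, so the test's timing no longer depends on how quickly 20 rows happen to arrive. A rough sketch of that flush-on-size-or-timeout policy under assumed semantics (not ClickHouse's actual implementation):

```python
import time

def collect_block(poll, max_block_size=20, flush_interval_ms=1000):
    """Collect rows until the block is full or the flush interval elapses."""
    block = []
    deadline = time.monotonic() + flush_interval_ms / 1000.0
    while len(block) < max_block_size and time.monotonic() < deadline:
        row = poll()  # stand-in for a non-blocking consumer read; may return None
        if row is None:
            time.sleep(0.05)  # avoid busy-waiting between polls
        else:
            block.append(row)
    return block  # the caller flushes this block to the materialized view
```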
@@ -2048,15 +2075,14 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
     ''')
 
     # print time.strftime("%m/%d/%Y %H:%M:%S")
-    time.sleep(12)  # 5-6 sec to connect to Kafka, do the subscription, and fetch 20 rows; another 10 sec for the MV; after that the commit should happen
+    time.sleep(3)  # the MV will work for 10 sec; after that the commit should happen, and we want to pause before it
 
     # print time.strftime("%m/%d/%Y %H:%M:%S")
 
     kafka_cluster.pause_container('kafka1')
 
     # That timeout is VERY important, and was picked after a lot of experiments:
     # when too low (<30 sec), librdkafka will not report any timeout (an alternative is to decrease librdkafka's default session timeouts);
     # when too high (>50 sec), the broker will decide to remove us from the consumer group and will start answering "Broker: Unknown member".
-    time.sleep(40)
+    time.sleep(42)
 
     # print time.strftime("%m/%d/%Y %H:%M:%S")
     kafka_cluster.unpause_container('kafka1')
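The test's premise, which the pause window above is engineered to trigger: the block is written to the materialized view first and the Kafka offsets are committed afterwards, so a commit that fails while the broker is paused leads to re-consumption. A schematic sketch of that at-least-once loop (assumed semantics, not ClickHouse code):

```python
def consume_loop(read_block, write_to_view, commit_offsets):
    # At-least-once delivery: materialize first, commit second.
    while True:
        block = read_block()
        if block is None:
            break
        write_to_view(block)   # step 1: data is durable in the view
        commit_offsets(block)  # step 2: a failure here means the same block
                               # is read and written again -> duplicates,
                               # but never holes
```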
@@ -2,7 +2,7 @@ DROP TABLE IF EXISTS slow_log;
 DROP TABLE IF EXISTS expected_times;
 
 CREATE TABLE expected_times (QUERY_GROUP_ID String, max_query_duration_ms UInt64) Engine=Memory;
-INSERT INTO expected_times VALUES('main_dashboard_top_query', 100), ('main_dashboard_bottom_query', 100);
+INSERT INTO expected_times VALUES('main_dashboard_top_query', 500), ('main_dashboard_bottom_query', 500);
 
 SET log_queries=1;
 SELECT 1;
@@ -25,8 +25,8 @@ CREATE MATERIALIZED VIEW slow_log Engine=Memory AS
 SELECT 1 /* QUERY_GROUP_ID:main_dashboard_top_query */;
 SELECT 1 /* QUERY_GROUP_ID:main_dashboard_bottom_query */;
 
-SELECT 1 WHERE not ignore(sleep(0.105)) /* QUERY_GROUP_ID:main_dashboard_top_query */;
-SELECT 1 WHERE not ignore(sleep(0.105)) /* QUERY_GROUP_ID:main_dashboard_bottom_query */;
+SELECT 1 WHERE not ignore(sleep(0.520)) /* QUERY_GROUP_ID:main_dashboard_top_query */;
+SELECT 1 WHERE not ignore(sleep(0.520)) /* QUERY_GROUP_ID:main_dashboard_bottom_query */;
 
 SET log_queries=0;
 SYSTEM FLUSH LOGS;