未验证 提交 1ce0d4db 编写于 作者: F filimonov 提交者: GitHub

Update test.py

similar change for test_kafka_duplicates_when_commit_failed
上级 60f4e561
......@@ -2040,9 +2040,9 @@ def test_bad_reschedule(kafka_cluster):
assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8
@pytest.mark.timeout(1200)
@pytest.mark.timeout(300)
def test_kafka_duplicates_when_commit_failed(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
kafka_produce('duplicates_when_commit_failed', messages)
instance.query('''
......@@ -2055,8 +2055,16 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
kafka_topic_list = 'duplicates_when_commit_failed',
kafka_group_name = 'duplicates_when_commit_failed',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 20;
kafka_max_block_size = 20,
kafka_flush_interval_ms = 1000;
SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take different time, test rely on timing, so can flap otherwise) */
''')
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('duplicates_when_commit_failed', messages)
instance.query('''
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree()
ORDER BY key;
......@@ -2067,15 +2075,14 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
''')
# print time.strftime("%m/%d/%Y %H:%M:%S")
time.sleep(
12) # 5-6 sec to connect to kafka, do subscription, and fetch 20 rows, another 10 sec for MV, after that commit should happen
time.sleep(3) # MV will work for 10 sec, after that commit should happen, we want to pause before
# print time.strftime("%m/%d/%Y %H:%M:%S")
kafka_cluster.pause_container('kafka1')
# that timeout it VERY important, and picked after lot of experiments
# when too low (<30sec) librdkafka will not report any timeout (alternative is to decrease the default session timeouts for librdkafka)
# when too high (>50sec) broker will decide to remove us from the consumer group, and will start answering "Broker: Unknown member"
time.sleep(40)
time.sleep(42)
# print time.strftime("%m/%d/%Y %H:%M:%S")
kafka_cluster.unpause_container('kafka1')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册