Unverified commit a2485ef6, authored by Nikolai Kochetov, committed by GitHub

Merge pull request #11519 from filimonov/20.1_kafka_backports

20.1 kafka backports
Subproject commit 9b184d881c15cc50784b28688c7c99d3d764db24
Subproject commit f555ee36aaa74d17ca0dab3ce472070a610b2966
......@@ -29,7 +29,7 @@ KafkaBlockInputStream::KafkaBlockInputStream(
KafkaBlockInputStream::~KafkaBlockInputStream()
{
if (!claimed)
if (!buffer)
return;
if (broken)
......@@ -47,7 +47,6 @@ void KafkaBlockInputStream::readPrefixImpl()
{
auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
claimed = !!buffer;
if (!buffer)
return;
......@@ -169,7 +168,7 @@ Block KafkaBlockInputStream::readImpl()
}
}
if (buffer->rebalanceHappened() || total_rows == 0)
if (buffer->polledDataUnusable() || total_rows == 0)
return Block();
/// MATERIALIZED columns can be added here, but I think
......
......@@ -25,6 +25,7 @@ public:
void readSuffixImpl() override;
void commit();
bool isStalled() const { return buffer->isStalled(); }
private:
StorageKafka & storage;
......@@ -33,9 +34,12 @@ private:
UInt64 max_block_size;
ConsumerBufferPtr buffer;
bool broken = true, finished = false, claimed = false, commit_in_suffix;
bool broken = true;
bool finished = false;
bool commit_in_suffix;
const Block non_virtual_header, virtual_header;
const Block non_virtual_header;
const Block virtual_header;
};
}
......@@ -23,7 +23,7 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.", 0) \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.", 0) \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum batch size for poll.", 0) \
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block", 0)
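As a quick, hedged illustration (not part of this diff), these per-table settings surface in the SETTINGS clause of the Kafka engine. The sketch below follows the style of the integration tests further down this page and assumes the `instance` fixture defined there; the topic and group names are placeholders.

```python
# Hypothetical example: topic/group names are placeholders; the broker address
# matches the test cluster used elsewhere on this page.
instance.query('''
    CREATE TABLE test.kafka_settings_example (key UInt64, value UInt64)
        ENGINE = Kafka
        SETTINGS kafka_broker_list = 'kafka1:19092',
                 kafka_topic_list = 'settings_example',
                 kafka_group_name = 'settings_example',
                 kafka_format = 'JSONEachRow',
                 kafka_max_block_size = 1000,
                 kafka_skip_broken_messages = 1,
                 kafka_commit_every_batch = 1;
''')
```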
......
......@@ -10,6 +10,7 @@ namespace DB
using namespace std::chrono_literals;
const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
const auto DRAIN_TIMEOUT_MS = 5000ms;
ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
......@@ -77,21 +78,71 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
/// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
try
{
if (!consumer->get_subscription().empty())
consumer->unsubscribe();
if (!assignment.empty())
consumer->unassign();
while (consumer->get_consumer_queue().next_event(100ms));
{
try
{
consumer->unsubscribe();
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error during unsubscribe: " << e.what());
}
drain();
}
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer destructor: " << e.what());
LOG_ERROR(log, "Error while destructing consumer: " << e.what());
}
}
// Needed to drain the rest of the messages / queued callback calls from the consumer
// after unsubscribe; otherwise the consumer will hang on destruction
// see https://github.com/edenhill/librdkafka/issues/2077
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
void ReadBufferFromKafkaConsumer::drain()
{
auto start_time = std::chrono::steady_clock::now();
cppkafka::Error last_error(RD_KAFKA_RESP_ERR_NO_ERROR);
while (true)
{
auto msg = consumer->poll(100ms);
if (!msg)
break;
auto error = msg.get_error();
if (error)
{
if (msg.is_eof() || error == last_error)
{
break;
}
else
{
LOG_ERROR(log, "Error during draining: " << error);
}
}
// We don't stop draining on the first error;
// we stop only if the same error repeats twice in a row
last_error = error;
auto ts = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time) > DRAIN_TIMEOUT_MS)
{
LOG_ERROR(log, "Timeout during draining.");
break;
}
}
}
void ReadBufferFromKafkaConsumer::commit()
{
auto PrintOffsets = [this] (const char * prefix, const cppkafka::TopicPartitionList & offsets)
......@@ -196,8 +247,13 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
// it should not raise an exception, as it is used in the destructor
try
{
if (!consumer->get_subscription().empty())
consumer->unsubscribe();
// From docs: Any previous subscription will be unassigned and unsubscribed first.
consumer->subscribe(topics);
// We avoid an explicit unsubscribe here, as it requires draining the messages
// to close the consumer safely afterwards
// see https://github.com/edenhill/librdkafka/issues/2077
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
}
catch (const cppkafka::HandleException & e)
{
......@@ -229,12 +285,24 @@ void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
/// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl()
{
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get broken.
if (stalled || stopped || !allowed || rebalance_happened)
// We can react to a stop request only while fetching data;
// after a block is formed (i.e. while copying data to the MV / committing) we ignore stop attempts
if (stopped)
{
was_stopped = true;
offsets_stored = 0;
return false;
}
if (stalled || was_stopped || !allowed || rebalance_happened)
return false;
if (current == messages.end())
{
if (intermediate_commit)
......@@ -246,7 +314,13 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
if (rebalance_happened)
if (stopped)
{
was_stopped = true;
offsets_stored = 0;
return false;
}
else if (rebalance_happened)
{
if (!new_messages.empty())
{
......@@ -317,7 +391,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
{
if (!stalled && !rebalance_happened)
if (!stalled && !was_stopped && !rebalance_happened)
{
consumer->store_offset(*(current - 1));
++offsets_stored;
......
......@@ -29,7 +29,6 @@ public:
const Names & _topics
);
~ReadBufferFromKafkaConsumer() override;
void allowNext() { allowed = true; } // Allow to read next message.
void commit(); // Commit all processed messages.
void subscribe(); // Subscribe internal consumer to topics.
......@@ -38,7 +37,8 @@ public:
auto pollTimeout() const { return poll_timeout; }
bool hasMorePolledMessages() const;
auto rebalanceHappened() const { return rebalance_happened; }
bool polledDataUnusable() const { return (was_stopped || rebalance_happened); }
bool isStalled() const { return stalled; }
void storeLastReadMessageOffset();
void resetToLastCommitted(const char * msg);
......@@ -64,13 +64,20 @@ private:
const std::atomic<bool> & stopped;
// order is important, need to be destructed before consumer
Messages messages;
Messages::const_iterator current;
bool rebalance_happened = false;
bool was_stopped = false;
// order is important, need to be destructed before consumer
cppkafka::TopicPartitionList assignment;
const Names topics;
void drain();
bool nextImpl() override;
};
......
......@@ -54,6 +54,7 @@ namespace
{
const auto RESCHEDULE_MS = 500;
const auto CLEANUP_TIMEOUT_MS = 3000;
const auto MAX_THREAD_WORK_DURATION_MS = 60000; // reschedule at least once per minute (we can't keep threads in the pool locked forever)
/// Configuration prefix
const String CONFIG_PREFIX = "kafka";
......@@ -182,17 +183,14 @@ void StorageKafka::shutdown()
// Interrupt streaming thread
stream_cancelled = true;
LOG_TRACE(log, "Waiting for cleanup");
task->deactivate();
// Close all consumers
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto buffer = popReadBuffer();
// FIXME: not sure if we really close consumers here, and if we really need to close them here.
}
LOG_TRACE(log, "Waiting for cleanup");
rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);
task->deactivate();
}
......@@ -263,17 +261,23 @@ ProducerBufferPtr StorageKafka::createWriteBuffer()
ConsumerBufferPtr StorageKafka::createReadBuffer()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
updateConfiguration(conf);
// These settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream has successfully finished
conf.set("enable.auto.offset.store", "false"); // Offsets are stored explicitly (via store_offset), so they can be committed all at once
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
updateConfiguration(conf);
// Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(conf);
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
// Limit the number of batched messages to allow early cancellations
const Settings & settings = global_context.getSettingsRef();
......@@ -337,6 +341,8 @@ void StorageKafka::threadFunc()
// Check if at least one direct dependency is attached
auto dependencies = global_context.getDependencies(database_name, table_name);
auto start_time = std::chrono::steady_clock::now();
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0 && dependencies.size() > 0)
{
......@@ -345,9 +351,21 @@ void StorageKafka::threadFunc()
LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");
// Reschedule if not limited
if (!streamToViews())
// Exit the loop & reschedule if some stream stalled
auto some_stream_is_stalled = streamToViews();
if (some_stream_is_stalled)
{
LOG_TRACE(log, "Stream(s) stalled. Reschedule.");
break;
}
auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
break;
}
}
}
catch (...)
......@@ -405,17 +423,19 @@ bool StorageKafka::streamToViews()
else
in = streams[0];
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
// It will be cancelled on the underlying layer (the kafka buffer)
std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);
bool some_stream_is_stalled = false;
for (auto & stream : streams)
{
some_stream_is_stalled = some_stream_is_stalled || stream->as<KafkaBlockInputStream>()->isStalled();
stream->as<KafkaBlockInputStream>()->commit();
}
// Check whether the limits were applied during query execution
bool limits_applied = false;
const BlockStreamProfileInfo & info = in->getProfileInfo();
limits_applied = info.hasAppliedLimit();
return limits_applied;
return some_stream_is_stalled;
}
void registerStorageKafka(StorageFactory & factory)
......
<yandex>
<kafka>
<auto_offset_reset>earliest</auto_offset_reset>
<!-- Debugging of possible issues, like:
- https://github.com/edenhill/librdkafka/issues/2077
- https://github.com/edenhill/librdkafka/issues/1778
- #5615
XXX: for now these messages will appear in stderr.
-->
<debug>cgrp,consumer,topic,protocol</debug>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</yandex>
......@@ -28,7 +28,6 @@ import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.
# TODO: modify tests to respect `skip_broken_messages` setting.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
......@@ -199,6 +198,98 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'consumer_hang',
kafka_group_name = 'consumer_hang',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 8,
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();
CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT * FROM test.kafka;
''')
time.sleep(10)
instance.query('SELECT * FROM test.view')
# This should trigger a heartbeat failure,
# which will trigger REBALANCE_IN_PROGRESS
# and can lead to a consumer hang.
kafka_cluster.pause_container('kafka1')
time.sleep(0.5)
kafka_cluster.unpause_container('kafka1')
# print("Attempt to drop")
instance.query('DROP TABLE test.kafka')
#kafka_cluster.open_bash_shell('instance')
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
# the original problem appeared as a sequence of the following messages in librdkafka logs:
# BROKERFAIL -> |ASSIGN| -> REBALANCE_IN_PROGRESS -> "waiting for rebalance_cb" (repeated forever)
# i.e. the consumer was waiting forever for the application to execute the queued rebalance callback
# from a user perspective: we expect no hanging 'drop' queries
# 'dr'||'op' to avoid self matching
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
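For readability, the same probe could be wrapped in a small helper. This is a hypothetical sketch, not part of the diff; it keeps the 'dr'||'op' concatenation so the query never matches itself in system.processes.

```python
# Hypothetical helper, assuming the same `instance` fixture used by the tests above.
def count_hanging_drop_queries(instance):
    # 'dr'||'op' is concatenated server-side, so this probe never matches its own query text
    return int(instance.query(
        "select count() from system.processes where position(lower(query),'dr'||'op')>0"))

# usage: assert count_hanging_drop_queries(instance) == 0
```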
@pytest.mark.timeout(180)
def test_kafka_consumer_hang2(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'consumer_hang2',
kafka_group_name = 'consumer_hang2',
kafka_format = 'JSONEachRow';
CREATE TABLE test.kafka2 (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'consumer_hang2',
kafka_group_name = 'consumer_hang2',
kafka_format = 'JSONEachRow';
''')
# the first consumer subscribes to the topic, tries to poll some data, and goes idle
instance.query('SELECT * FROM test.kafka')
# the second consumer does the same, leading to a rebalance in the first
# consumer; it tries to poll some data
instance.query('SELECT * FROM test.kafka2')
#echo 'SELECT * FROM test.kafka; SELECT * FROM test.kafka2; DROP TABLE test.kafka;' | clickhouse client -mn &
# kafka_cluster.open_bash_shell('instance')
# the first consumer has a pending rebalance callback left unprocessed (no poll after the select)
# one of those queries was failing because of
# https://github.com/edenhill/librdkafka/issues/2077
# https://github.com/edenhill/librdkafka/issues/2898
instance.query('DROP TABLE test.kafka')
instance.query('DROP TABLE test.kafka2')
# from a user perspective: we expect no hanging 'drop' queries
# 'dr'||'op' to avoid self matching
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
instance.query('''
......@@ -877,8 +968,11 @@ def test_kafka_flush_by_block_size(kafka_cluster):
time.sleep(1)
result = instance.query('SELECT count() FROM test.view')
print(result)
# TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
# the second flush happens earlier than expected, so we have 2 parts here instead of one;
# flush by block size itself works correctly, so the feature checked by this test is working correctly
result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
# print(result)
# kafka_cluster.open_bash_shell('instance')
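A stricter variant of this check, once the extra flush from #11216 is gone, could count the active parts directly. This is only a sketch, assuming the standard system.parts columns:

```python
# Hypothetical stricter check: with a single flush by block size there would be exactly one part.
parts = int(instance.query(
    "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND active"))
# assert parts == 1  # not enabled while #11216 still causes an early second flush
```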
......@@ -1034,6 +1128,7 @@ def test_kafka_rebalance(kafka_cluster):
print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))
# Some queries to debug...
# SELECT * FROM test.destination where key in (SELECT key FROM test.destination group by key having count() <> 1)
# select number + 1 as key from numbers(4141) left join test.destination using (key) where test.destination.key = 0;
# SELECT * FROM test.destination WHERE key between 2360 and 2370 order by key;
......@@ -1041,6 +1136,18 @@ def test_kafka_rebalance(kafka_cluster):
# select toUInt64(0) as _partition, number + 1 as _offset from numbers(400) left join test.destination using (_partition,_offset) where test.destination.key = 0 order by _offset;
# SELECT * FROM test.destination WHERE _partition = 0 and _offset between 220 and 240 order by _offset;
# CREATE TABLE test.reference (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092',
# kafka_topic_list = 'topic_with_multiple_partitions',
# kafka_group_name = 'rebalance_test_group_reference',
# kafka_format = 'JSONEachRow',
# kafka_max_block_size = 100000;
#
# CREATE MATERIALIZED VIEW test.reference_mv Engine=Log AS
# SELECT key, value, _topic,_key,_offset, _partition, _timestamp, 'reference' as _consumed_by
# FROM test.reference;
#
# select * from test.reference_mv left join test.destination using (key,_topic,_offset,_partition) where test.destination._consumed_by = '';
result = int(instance.query('SELECT count() == uniqExact(key) FROM test.destination'))
for consumer_index in range(NUMBER_OF_CONSURRENT_CONSUMERS):
......@@ -1141,10 +1248,196 @@ def test_exception_from_destructor(kafka_cluster):
DROP TABLE test.kafka;
''')
kafka_cluster.open_bash_shell('instance')
#kafka_cluster.open_bash_shell('instance')
assert TSV(instance.query('SELECT 1')) == TSV('1')
@pytest.mark.timeout(120)
def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(1)]
kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
instance.query('''
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination (
key UInt64,
value UInt64,
_topic String,
_key String,
_offset UInt64,
_partition UInt64,
_timestamp Nullable(DateTime),
_consumed_by LowCardinality(String)
)
ENGINE = MergeTree()
ORDER BY key;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.kafka_consumer TO test.destination AS
SELECT
key,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.kafka;
''')
while int(instance.query("SELECT count() FROM test.destination")) == 0:
print("Waiting for test.kafka_consumer to start consume")
time.sleep(1)
cancel = threading.Event()
i = [2]
def produce():
while not cancel.is_set():
messages = []
for _ in range(113):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
time.sleep(1)
kafka_thread = threading.Thread(target=produce)
kafka_thread.start()
time.sleep(12)
instance.query('''
DROP TABLE test.kafka;
''')
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10000;
''')
cancel.set()
time.sleep(15)
#kafka_cluster.open_bash_shell('instance')
# SELECT key, _timestamp, _offset FROM test.destination where runningDifference(key) <> 1 ORDER BY key;
result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.destination')
print(result)
instance.query('''
DROP TABLE test.kafka_consumer;
DROP TABLE test.destination;
''')
kafka_thread.join()
assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(i[0]-1)), 'Missing data!'
@pytest.mark.timeout(120)
def test_bad_reschedule(kafka_cluster):
messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(20000)]
kafka_produce('test_bad_reschedule', messages)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.destination Engine=Log AS
SELECT
key,
now() as consume_ts,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.kafka;
''')
while int(instance.query("SELECT count() FROM test.destination")) < 20000:
print("Waiting for consume")
time.sleep(1)
assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8
@pytest.mark.timeout(1200)
def test_kafka_duplicates_when_commit_failed(kafka_cluster):
messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('duplicates_when_commit_failed', messages)
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'duplicates_when_commit_failed',
kafka_group_name = 'duplicates_when_commit_failed',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 20;
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka
WHERE NOT sleepEachRow(0.5);
''')
#print time.strftime("%m/%d/%Y %H:%M:%S")
time.sleep(12) # 5-6 sec to connect to kafka, subscribe, and fetch 20 rows; another ~10 sec for the MV; after that the commit should happen
#print time.strftime("%m/%d/%Y %H:%M:%S")
kafka_cluster.pause_container('kafka1')
# that timeout is VERY important, and was picked after a lot of experiments:
# when it is too low (<30 sec), librdkafka will not report any timeout (an alternative is to decrease librdkafka's default session timeouts)
# when it is too high (>50 sec), the broker will decide to remove us from the consumer group and will start answering "Broker: Unknown member"
time.sleep(40)
#print time.strftime("%m/%d/%Y %H:%M:%S")
kafka_cluster.unpause_container('kafka1')
#kafka_cluster.open_bash_shell('instance')
# the connection is restored, and it will take a while until the next block is flushed
# it takes years on CI :\
time.sleep(30)
# as it's a bit tricky to hit the proper moment, let's check in the logs whether we did it correctly
assert instance.contains_in_log("Local: Waiting for coordinator")
result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
print(result)
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
assert TSV(result) == TSV('22\t22\t22')
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
......