提交 e6bfd9d5 编写于 作者: P Peng Jian

1. Add new setting for Kafka engine, named kafka_thread_per_consumer which...

 1. Add new setting for Kafka engine, named kafka_thread_per_consumer which default value is false. 2. Create separate thread pool for Kafka engine.
上级 de0a40ae
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \ M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \ M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \ M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \ M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \
...@@ -38,6 +39,7 @@ ...@@ -38,6 +39,7 @@
M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \ M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \
M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \ M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \
M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \ M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \
M(MemoryTrackingInBackgroundMessageBrokerSchedulePool, "Total amount of memory (bytes) allocated in background message broker pool (that is dedicated for background message streaming).") \
M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \ M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \ M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \ M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
......
...@@ -72,6 +72,7 @@ class IColumn; ...@@ -72,6 +72,7 @@ class IColumn;
M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \ M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \ M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for kafka streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \ M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
\ \
M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \ M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
......
...@@ -82,6 +82,9 @@ namespace CurrentMetrics ...@@ -82,6 +82,9 @@ namespace CurrentMetrics
extern const Metric BackgroundDistributedSchedulePoolTask; extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric MemoryTrackingInBackgroundDistributedSchedulePool; extern const Metric MemoryTrackingInBackgroundDistributedSchedulePool;
extern const Metric BackgroundMessageBrokerSchedulePoolTask;
extern const Metric MemoryTrackingInBackgroundMessageBrokerSchedulePool;
} }
...@@ -341,6 +344,7 @@ struct ContextShared ...@@ -341,6 +344,7 @@ struct ContextShared
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables. std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used in kafka streaming)
MultiVersion<Macros> macros; /// Substitutions extracted from config. MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk. std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part. /// Rules for selecting the compression settings, depending on the size of the part.
...@@ -441,6 +445,7 @@ struct ContextShared ...@@ -441,6 +445,7 @@ struct ContextShared
schedule_pool.reset(); schedule_pool.reset();
distributed_schedule_pool.reset(); distributed_schedule_pool.reset();
ddl_worker.reset(); ddl_worker.reset();
message_broker_schedule_pool.reset();
/// Stop trace collector if any /// Stop trace collector if any
trace_collector.reset(); trace_collector.reset();
...@@ -1421,6 +1426,18 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() ...@@ -1421,6 +1426,18 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool()
return *shared->distributed_schedule_pool; return *shared->distributed_schedule_pool;
} }
BackgroundSchedulePool & Context::getMessageBrokerSchedulePool()
{
auto lock = getLock();
if (!shared->message_broker_schedule_pool)
shared->message_broker_schedule_pool.emplace(
settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
CurrentMetrics::MemoryTrackingInBackgroundMessageBrokerSchedulePool,
"BgMBSchPool");
return *shared->message_broker_schedule_pool;
}
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker) void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
{ {
auto lock = getLock(); auto lock = getLock();
......
...@@ -503,6 +503,7 @@ public: ...@@ -503,6 +503,7 @@ public:
BackgroundProcessingPool & getBackgroundPool(); BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool(); BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool(); BackgroundSchedulePool & getSchedulePool();
BackgroundSchedulePool & getMessageBrokerSchedulePool();
BackgroundSchedulePool & getDistributedSchedulePool(); BackgroundSchedulePool & getDistributedSchedulePool();
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker); void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
......
...@@ -28,7 +28,8 @@ class ASTStorage; ...@@ -28,7 +28,8 @@ class ASTStorage;
M(String, kafka_format, "", "The message format for Kafka engine.", 0) \ M(String, kafka_format, "", "The message format for Kafka engine.", 0) \
M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \ M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \ M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0)
/** TODO: */ /** TODO: */
/* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */ /* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */
......
...@@ -138,11 +138,13 @@ StorageKafka::StorageKafka( ...@@ -138,11 +138,13 @@ StorageKafka::StorageKafka(
, semaphore(0, num_consumers) , semaphore(0, num_consumers)
, intermediate_commit(kafka_settings->kafka_commit_every_batch.value) , intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
, settings_adjustments(createSettingsAdjustments()) , settings_adjustments(createSettingsAdjustments())
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_); storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
for (size_t i = 0; i < num_consumers; i++) auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
{ {
auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); }); auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
task->deactivate(); task->deactivate();
...@@ -258,20 +260,24 @@ void StorageKafka::startup() ...@@ -258,20 +260,24 @@ void StorageKafka::startup()
{ {
tryLogCurrentException(log); tryLogCurrentException(log);
} }
}
// Start the reader thread // Start the reader thread
for (size_t i = 0; i < tasks.size(); ++i)
{
tasks[i]->holder->activateAndSchedule(); tasks[i]->holder->activateAndSchedule();
} }
} }
void StorageKafka::shutdown() void StorageKafka::shutdown()
{ {
// Interrupt streaming thread for (size_t i = 0; i < tasks.size(); ++i)
LOG_TRACE(log, "Waiting for cleanup");
for (size_t i = 0; i < num_consumers; i++)
{ {
// Interrupt streaming thread
tasks[i]->stream_cancelled = true; tasks[i]->stream_cancelled = true;
LOG_TRACE(log, "Waiting for cleanup");
tasks[i]->holder->deactivate(); tasks[i]->holder->deactivate();
} }
...@@ -374,8 +380,12 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number) ...@@ -374,8 +380,12 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
if (thread_per_consumer)
{
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled; auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics); return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
}
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
} }
size_t StorageKafka::getMaxBlockSize() const size_t StorageKafka::getMaxBlockSize() const
...@@ -473,7 +483,8 @@ bool StorageKafka::checkDependencies(const StorageID & table_id) ...@@ -473,7 +483,8 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)
void StorageKafka::threadFunc(size_t idx) void StorageKafka::threadFunc(size_t idx)
{ {
auto& stream_cancelled = tasks[idx]->stream_cancelled; assert(idx < tasks.size());
auto task = tasks[idx];
try try
{ {
auto table_id = getStorageID(); auto table_id = getStorageID();
...@@ -484,7 +495,7 @@ void StorageKafka::threadFunc(size_t idx) ...@@ -484,7 +495,7 @@ void StorageKafka::threadFunc(size_t idx)
auto start_time = std::chrono::steady_clock::now(); auto start_time = std::chrono::steady_clock::now();
// Keep streaming as long as there are attached views and streaming is not cancelled // Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0) while (!task->stream_cancelled && num_created_consumers > 0)
{ {
if (!checkDependencies(table_id)) if (!checkDependencies(table_id))
break; break;
...@@ -515,10 +526,8 @@ void StorageKafka::threadFunc(size_t idx) ...@@ -515,10 +526,8 @@ void StorageKafka::threadFunc(size_t idx)
} }
// Wait for attached views // Wait for attached views
if (!stream_cancelled) if (!task->stream_cancelled)
{ task->holder->scheduleAfter(RESCHEDULE_MS);
tasks[idx]->holder->scheduleAfter(RESCHEDULE_MS);
}
} }
...@@ -545,7 +554,15 @@ bool StorageKafka::streamToViews() ...@@ -545,7 +554,15 @@ bool StorageKafka::streamToViews()
InterpreterInsertQuery interpreter(insert, *kafka_context, false, true, true); InterpreterInsertQuery interpreter(insert, *kafka_context, false, true, true);
auto block_io = interpreter.execute(); auto block_io = interpreter.execute();
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
streams.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false); auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL // Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits; IBlockInputStream::LocalLimits limits;
...@@ -556,16 +573,26 @@ bool StorageKafka::streamToViews() ...@@ -556,16 +573,26 @@ bool StorageKafka::streamToViews()
limits.timeout_overflow_mode = OverflowMode::BREAK; limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits); stream->setLimits(limits);
}
// Join multiple streams if necessary
BlockInputStreamPtr in;
if (streams.size() > 1)
in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
else
in = streams[0];
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff. // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
// It will be cancelled on underlying layer (kafka buffer) // It will be cancelled on underlying layer (kafka buffer)
std::atomic<bool> stub = {false}; std::atomic<bool> stub = {false};
copyData(*stream, *block_io.out, &stub); copyData(*in, *block_io.out, &stub);
bool some_stream_is_stalled = false; bool some_stream_is_stalled = false;
some_stream_is_stalled = some_stream_is_stalled || stream->isStalled(); for (auto & stream : streams)
stream->commit(); {
some_stream_is_stalled = some_stream_is_stalled || stream->as<KafkaBlockInputStream>()->isStalled();
stream->as<KafkaBlockInputStream>()->commit();
}
return some_stream_is_stalled; return some_stream_is_stalled;
} }
......
...@@ -103,6 +103,7 @@ private: ...@@ -103,6 +103,7 @@ private:
} }
}; };
std::vector<std::shared_ptr<TaskContext>> tasks; std::vector<std::shared_ptr<TaskContext>> tasks;
bool thread_per_consumer = false;
SettingsChanges createSettingsAdjustments(); SettingsChanges createSettingsAdjustments();
ConsumerBufferPtr createReadBuffer(const size_t consumer_number); ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册