#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int INCORRECT_DATA; extern const int UNKNOWN_EXCEPTION; extern const int CANNOT_READ_FROM_ISTREAM; extern const int INVALID_CONFIG_PARAMETER; extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int UNSUPPORTED_METHOD; extern const int UNKNOWN_SETTING; extern const int READONLY_SETTING; } namespace { const auto RESCHEDULE_MS = 500; const auto CLEANUP_TIMEOUT_MS = 3000; const auto MAX_THREAD_WORK_DURATION_MS = 60000; // once per minute leave do reschedule (we can't lock threads in pool forever) /// Configuration prefix const String CONFIG_PREFIX = "kafka"; void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path) { Poco::Util::AbstractConfiguration::Keys keys; std::vector errstr(512); config.keys(path, keys); for (const auto & key : keys) { const String key_path = path + "." + key; const String key_name = boost::replace_all_copy(key, "_", "."); conf.set(key_name, config.getString(key_path)); } } } StorageKafka::StorageKafka( const std::string & table_name_, const std::string & database_name_, Context & context_, const ColumnsDescription & columns_, const String & brokers_, const String & group_, const Names & topics_, const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken_, bool intermediate_commit_) : IStorage( ColumnsDescription({{"_topic", std::make_shared()}, {"_key", std::make_shared()}, {"_offset", std::make_shared()}, {"_partition", std::make_shared()}, {"_timestamp", std::make_shared(std::make_shared())}}, true)) , table_name(table_name_) , database_name(database_name_) , global_context(context_.getGlobalContext()) , kafka_context(Context(global_context)) , topics(global_context.getMacros()->expand(topics_)) , brokers(global_context.getMacros()->expand(brokers_)) , group(global_context.getMacros()->expand(group_)) , format_name(global_context.getMacros()->expand(format_name_)) , row_delimiter(row_delimiter_) , schema_name(global_context.getMacros()->expand(schema_name_)) , num_consumers(num_consumers_) , max_block_size(max_block_size_) , log(&Logger::get("StorageKafka (" + table_name_ + ")")) , semaphore(0, num_consumers_) , skip_broken(skip_broken_) , intermediate_commit(intermediate_commit_) { kafka_context.makeQueryContext(); setColumns(columns_); task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); }); task->deactivate(); } BlockInputStreams StorageKafka::read( const Names & column_names, const SelectQueryInfo & /* query_info */, const Context & context, QueryProcessingStage::Enum /* processed_stage */, size_t /* max_block_size */, unsigned /* num_streams */) { if (num_created_consumers == 0) return BlockInputStreams(); /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. BlockInputStreams streams; streams.reserve(num_created_consumers); // Claim as many consumers as requested, but don't block for (size_t i = 0; i < num_created_consumers; ++i) { /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block /// TODO: probably that leads to awful performance. /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages. streams.emplace_back(std::make_shared(*this, context, column_names, 1)); } LOG_DEBUG(log, "Starting reading " << streams.size() << " streams"); return streams; } BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const Context & context) { if (topics.size() > 1) throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED); return std::make_shared(*this, context); } void StorageKafka::startup() { for (size_t i = 0; i < num_consumers; ++i) { try { pushReadBuffer(createReadBuffer()); ++num_created_consumers; } catch (const cppkafka::Exception &) { tryLogCurrentException(log); } } // Start the reader thread task->activateAndSchedule(); } void StorageKafka::shutdown() { // Interrupt streaming thread stream_cancelled = true; LOG_TRACE(log, "Waiting for cleanup"); task->deactivate(); // Close all consumers for (size_t i = 0; i < num_created_consumers; ++i) auto buffer = popReadBuffer(); rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS); } void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) { table_name = new_table_name; database_name = new_database_name; } void StorageKafka::updateDependencies() { task->activateAndSchedule(); } void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer) { std::lock_guard lock(mutex); buffers.push_back(buffer); semaphore.set(); } ConsumerBufferPtr StorageKafka::popReadBuffer() { return popReadBuffer(std::chrono::milliseconds::zero()); } ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout) { // Wait for the first free buffer if (timeout == std::chrono::milliseconds::zero()) semaphore.wait(); else { if (!semaphore.tryWait(timeout.count())) return nullptr; } // Take the first available buffer from the list std::lock_guard lock(mutex); auto buffer = buffers.back(); buffers.pop_back(); return buffer; } ProducerBufferPtr StorageKafka::createWriteBuffer() { cppkafka::Configuration conf; conf.set("metadata.broker.list", brokers); conf.set("group.id", group); conf.set("client.id", VERSION_FULL); // TODO: fill required settings updateConfiguration(conf); auto producer = std::make_shared(conf); const Settings & settings = global_context.getSettingsRef(); size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds(); return std::make_shared( producer, topics[0], row_delimiter ? std::optional{row_delimiter} : std::optional(), 1, 1024, std::chrono::milliseconds(poll_timeout)); } ConsumerBufferPtr StorageKafka::createReadBuffer() { cppkafka::Configuration conf; conf.set("metadata.broker.list", brokers); conf.set("group.id", group); conf.set("client.id", VERSION_FULL); conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start updateConfiguration(conf); // those settings should not be changed by users. conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once. conf.set("enable.partition.eof", "false"); // Ignore EOF messages // Create a consumer and subscribe to topics auto consumer = std::make_shared(conf); consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); // Limit the number of batched messages to allow early cancellations const Settings & settings = global_context.getSettingsRef(); size_t batch_size = max_block_size; if (!batch_size) batch_size = settings.max_block_size.value; size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds(); /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. return std::make_shared(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled, getTopics()); } void StorageKafka::updateConfiguration(cppkafka::Configuration & conf) { // Update consumer configuration from the configuration const auto & config = global_context.getConfigRef(); if (config.has(CONFIG_PREFIX)) loadFromConfig(conf, config, CONFIG_PREFIX); // Update consumer topic-specific configuration for (const auto & topic : topics) { const auto topic_config_key = CONFIG_PREFIX + "_" + topic; if (config.has(topic_config_key)) loadFromConfig(conf, config, topic_config_key); } } bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name) { // Check if all dependencies are attached auto dependencies = global_context.getDependencies(current_database_name, current_table_name); if (dependencies.size() == 0) return true; // Check the dependencies are ready? for (const auto & db_tab : dependencies) { auto table = global_context.tryGetTable(db_tab.first, db_tab.second); if (!table) return false; // If it materialized view, check it's target table auto * materialized_view = dynamic_cast(table.get()); if (materialized_view && !materialized_view->tryGetTargetTable()) return false; // Check all its dependencies if (!checkDependencies(db_tab.first, db_tab.second)) return false; } return true; } void StorageKafka::threadFunc() { try { // Check if at least one direct dependency is attached auto dependencies = global_context.getDependencies(database_name, table_name); auto start_time = std::chrono::steady_clock::now(); // Keep streaming as long as there are attached views and streaming is not cancelled while (!stream_cancelled && num_created_consumers > 0 && dependencies.size() > 0) { if (!checkDependencies(database_name, table_name)) break; LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views"); // Exit the loop & reschedule if some stream stalled auto some_stream_is_stalled = streamToViews(); if (some_stream_is_stalled) { LOG_TRACE(log, "Stream(s) stalled. Reschedule."); break; } auto ts = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(ts-start_time); if (duration.count() > MAX_THREAD_WORK_DURATION_MS) { LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule."); break; } } } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } // Wait for attached views if (!stream_cancelled) task->scheduleAfter(RESCHEDULE_MS); } bool StorageKafka::streamToViews() { auto table = global_context.getTable(database_name, table_name); if (!table) throw Exception("Engine table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::LOGICAL_ERROR); // Create an INSERT query for streaming data auto insert = std::make_shared(); insert->database = database_name; insert->table = table_name; const Settings & settings = global_context.getSettingsRef(); size_t block_size = max_block_size; if (block_size == 0) block_size = settings.max_block_size; // Create a stream for each consumer and join them in a union stream // Only insert into dependent views and expect that input blocks contain virtual columns InterpreterInsertQuery interpreter(insert, kafka_context, false, true, true); auto block_io = interpreter.execute(); // Create a stream for each consumer and join them in a union stream BlockInputStreams streams; streams.reserve(num_created_consumers); for (size_t i = 0; i < num_created_consumers; ++i) { auto stream = std::make_shared(*this, kafka_context, block_io.out->getHeader().getNames(), block_size, false); streams.emplace_back(stream); // Limit read batch to maximum block size to allow DDL IBlockInputStream::LocalLimits limits; limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms; limits.timeout_overflow_mode = OverflowMode::BREAK; stream->setLimits(limits); } // Join multiple streams if necessary BlockInputStreamPtr in; if (streams.size() > 1) in = std::make_shared(streams, nullptr, streams.size()); else in = streams[0]; // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff. // It will be cancelled on underlying layer (kafka buffer) std::atomic stub = {false}; copyData(*in, *block_io.out, &stub); bool some_stream_is_stalled = false; for (auto & stream : streams) { some_stream_is_stalled = some_stream_is_stalled || stream->as()->isStalled(); stream->as()->commit(); } return some_stream_is_stalled; } void registerStorageKafka(StorageFactory & factory) { factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; size_t args_count = engine_args.size(); bool has_settings = args.storage_def->settings; KafkaSettings kafka_settings; if (has_settings) { kafka_settings.loadFromQuery(*args.storage_def); } /** Arguments of engine is following: * - Kafka broker list * - List of topics * - Group ID (may be a constaint expression with a string result) * - Message format (string) * - Row delimiter * - Schema (optional, if the format supports it) * - Number of consumers * - Max block size for background consumption * - Skip (at least) unreadable messages number * - Do intermediate commits when the batch consumed and handled */ // Check arguments and settings #define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \ /* One of the four required arguments is not specified */ \ if (args_count < ARG_NUM && ARG_NUM <= 4 && \ !kafka_settings.PAR_NAME.changed) \ { \ throw Exception( \ "Required parameter '" #PAR_NAME "' " \ "for storage Kafka not specified", \ ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \ } \ /* The same argument is given in two places */ \ if (has_settings && \ kafka_settings.PAR_NAME.changed && \ args_count >= ARG_NUM) \ { \ throw Exception( \ "The argument №" #ARG_NUM " of storage Kafka " \ "and the parameter '" #PAR_NAME "' " \ "in SETTINGS cannot be specified at the same time", \ ErrorCodes::BAD_ARGUMENTS); \ } CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list) CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list) CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name) CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format) CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter) CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema) CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers) CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size) CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages) CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch) #undef CHECK_KAFKA_STORAGE_ARGUMENT // Get and check broker list String brokers = kafka_settings.kafka_broker_list; if (args_count >= 1) { const auto * ast = engine_args[0]->as(); if (ast && ast->value.getType() == Field::Types::String) { brokers = safeGet(ast->value); } else { throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS); } } // Get and check topic list String topic_list = kafka_settings.kafka_topic_list.value; if (args_count >= 2) { engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context); topic_list = engine_args[1]->as().value.safeGet(); } Names topics; boost::split(topics, topic_list , [](char c){ return c == ','; }); for (String & topic : topics) { boost::trim(topic); } // Get and check group name String group = kafka_settings.kafka_group_name.value; if (args_count >= 3) { engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); group = engine_args[2]->as().value.safeGet(); } // Get and check message format name String format = kafka_settings.kafka_format.value; if (args_count >= 4) { engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context); const auto * ast = engine_args[3]->as(); if (ast && ast->value.getType() == Field::Types::String) { format = safeGet(ast->value); } else { throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS); } } // Parse row delimiter (optional) char row_delimiter = kafka_settings.kafka_row_delimiter; if (args_count >= 5) { engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context); const auto * ast = engine_args[4]->as(); String arg; if (ast && ast->value.getType() == Field::Types::String) { arg = safeGet(ast->value); } else { throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS); } if (arg.size() > 1) { throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS); } else if (arg.size() == 0) { row_delimiter = '\0'; } else { row_delimiter = arg[0]; } } // Parse format schema if supported (optional) String schema = kafka_settings.kafka_schema.value; if (args_count >= 6) { engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context); const auto * ast = engine_args[5]->as(); if (ast && ast->value.getType() == Field::Types::String) { schema = safeGet(ast->value); } else { throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS); } } // Parse number of consumers (optional) UInt64 num_consumers = kafka_settings.kafka_num_consumers; if (args_count >= 7) { const auto * ast = engine_args[6]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { num_consumers = safeGet(ast->value); } else { throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS); } } // Parse max block size (optional) UInt64 max_block_size = static_cast(kafka_settings.kafka_max_block_size); if (args_count >= 8) { const auto * ast = engine_args[7]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { max_block_size = static_cast(safeGet(ast->value)); } else { // TODO: no check if the integer is really positive throw Exception("Maximum block size must be a positive integer", ErrorCodes::BAD_ARGUMENTS); } } size_t skip_broken = static_cast(kafka_settings.kafka_skip_broken_messages); if (args_count >= 9) { const auto * ast = engine_args[8]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { skip_broken = static_cast(safeGet(ast->value)); } else { throw Exception("Number of broken messages to skip must be a non-negative integer", ErrorCodes::BAD_ARGUMENTS); } } bool intermediate_commit = static_cast(kafka_settings.kafka_commit_every_batch); if (args_count >= 10) { const auto * ast = engine_args[9]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { intermediate_commit = static_cast(safeGet(ast->value)); } else { throw Exception("Flag for committing every batch must be 0 or 1", ErrorCodes::BAD_ARGUMENTS); } } return StorageKafka::create( args.table_name, args.database_name, args.context, args.columns, brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit); }); } }