Unverified commit 9bfefc99 authored by alexey-milovidov, committed by GitHub

Merge pull request #11599 from filimonov/better_kafka_states

Kafka work with formats based on PeekableReadBuffer and other improvements
......@@ -20,6 +20,19 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
checkStateCorrect();
}
void PeekableReadBuffer::reset()
{
peeked_size = 0;
checkpoint = nullptr;
checkpoint_in_own_memory = false;
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
}
bool PeekableReadBuffer::peekNext()
{
checkStateCorrect();
......
......@@ -12,7 +12,8 @@ namespace ErrorCodes
/// Also allows to set checkpoint at some position in stream and come back to this position later.
/// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer
/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer.
/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless
/// you reset() the state of peekable buffer after each change of underlying buffer)
/// If position() of peekable buffer is explicitly set to some position before checkpoint
/// (e.g. by istr.position() = prev_pos), behavior is undefined.
class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
......@@ -38,6 +39,11 @@ public:
peeked_size = 0;
}
checkpoint = pos;
// FIXME: we check for checkpoint existence in a few places (rollbackToCheckpoint/dropCheckpoint)
// with a simple if (checkpoint), but checkpoint can be nullptr after
// setCheckpoint is called on an empty (non-initialized/eof) buffer,
// so a simple if (checkpoint) is not sufficient
}
/// Forget checkpoint and all data between checkpoint and position
......@@ -68,6 +74,10 @@ public:
/// This data will be lost after destruction of peekable buffer.
bool hasUnreadData() const;
// for streaming reading (like in Kafka) we need to restore the initial state of the buffer
// without recreating the buffer.
void reset();
private:
bool nextImpl() override;
......
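To illustrate the intended use of reset() described above, here is a minimal sketch (not part of the diff) of how a streaming format could reuse one PeekableReadBuffer across messages; the checkpoint calls follow the API referenced in the comments, and tryParseRow is a hypothetical helper.

#include <IO/ReadBuffer.h>
#include <IO/PeekableReadBuffer.h>

// Hypothetical helper: tries to parse one row, returns false to request a rollback.
bool tryParseRow(DB::PeekableReadBuffer & buf);

void parseOneMessage(DB::PeekableReadBuffer & buf)
{
    buf.setCheckpoint();               // remember the current position
    if (!tryParseRow(buf))
        buf.rollbackToCheckpoint();    // go back to the saved position
    buf.dropCheckpoint();              // forget the data saved for the checkpoint
}

void consumeStream(DB::ReadBuffer & kafka_in)
{
    DB::PeekableReadBuffer buf(kafka_in);
    while (!kafka_in.eof())
    {
        parseOneMessage(buf);
        buf.reset();                   // restore the initial state without recreating the buffer
    }
}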
......@@ -21,6 +21,12 @@ JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, Re
}
}
void JSONAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
}
void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
{
PeekableReadBufferCheckpoint checkpoint{buf};
......
......@@ -20,6 +20,7 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "JSONAsStringRowInputFormat"; }
void resetParser() override;
private:
void readJSONObject(IColumn & column);
......
......@@ -22,8 +22,6 @@ JSONCompactEachRowRowInputFormat::JSONCompactEachRowRowInputFormat(ReadBuffer &
bool with_names_)
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_)
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
const auto & sample = getPort().getHeader();
size_t num_columns = sample.columns();
......@@ -39,8 +37,18 @@ JSONCompactEachRowRowInputFormat::JSONCompactEachRowRowInputFormat(ReadBuffer &
}
}
void JSONCompactEachRowRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
column_indexes_for_input_fields.clear();
not_seen_columns.clear();
}
void JSONCompactEachRowRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
if (with_names)
{
size_t num_columns = getPort().getHeader().columns();
......
......@@ -26,7 +26,7 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
void addInputColumn(const String & column_name);
......
......@@ -26,6 +26,13 @@ namespace ErrorCodes
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in), parser(visitor), data_types(header_.getDataTypes()) {}
void MsgPackRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
visitor.reset();
}
void MsgPackVisitor::set_info(IColumn & column, DataTypePtr type) // NOLINT
{
while (!info_stack.empty())
......@@ -35,6 +42,11 @@ void MsgPackVisitor::set_info(IColumn & column, DataTypePtr type) // NOLINT
info_stack.push(Info{column, type});
}
void MsgPackVisitor::reset()
{
info_stack = {};
}
void MsgPackVisitor::insert_integer(UInt64 value) // NOLINT
{
Info & info = info_stack.top();
......
......@@ -37,6 +37,8 @@ public:
void insert_integer(UInt64 value);
void reset();
private:
/// Stack is needed to process nested arrays
std::stack<Info> info_stack;
......@@ -49,13 +51,15 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "MagPackRowInputFormat"; }
void resetParser() override;
private:
bool readObject();
PeekableReadBuffer buf;
MsgPackVisitor visitor;
msgpack::detail::parse_helper<MsgPackVisitor> parser;
DataTypes data_types;
const DataTypes data_types;
};
}
......@@ -15,7 +15,11 @@ namespace ErrorCodes
RegexpRowInputFormat::RegexpRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in_), format_settings(format_settings_), regexp(format_settings_.regexp.regexp)
: IRowInputFormat(header_, in_, std::move(params_))
, buf(in_)
, format_settings(format_settings_)
, field_format(stringToFormat(format_settings_.regexp.escaping_rule))
, regexp(format_settings_.regexp.regexp)
{
size_t fields_count = regexp.NumberOfCapturingGroups();
matched_fields.resize(fields_count);
......@@ -28,8 +32,13 @@ RegexpRowInputFormat::RegexpRowInputFormat(
// Save pointer to argument.
re2_arguments_ptrs[i] = &re2_arguments[i];
}
}
field_format = stringToFormat(format_settings_.regexp.escaping_rule);
void RegexpRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
}
RegexpRowInputFormat::ColumnFormat RegexpRowInputFormat::stringToFormat(const String & format)
......
......@@ -32,6 +32,7 @@ public:
String getName() const override { return "RegexpRowInputFormat"; }
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void resetParser() override;
private:
bool readField(size_t index, MutableColumns & columns);
......@@ -40,9 +41,9 @@ private:
PeekableReadBuffer buf;
const FormatSettings format_settings;
ColumnFormat field_format;
const ColumnFormat field_format;
RE2 regexp;
const RE2 regexp;
// The vector of fields extracted from line using regexp.
std::vector<re2::StringPiece> matched_fields;
// These two vectors are needed to use RE2::FullMatchN (function for extracting fields).
......
......@@ -340,8 +340,9 @@ void TabSeparatedRowInputFormat::syncAfterError()
void TabSeparatedRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
const auto & sample = getPort().getHeader();
read_columns.assign(sample.columns(), false);
column_indexes_for_input_fields.clear();
read_columns.clear();
columns_to_fill_with_default_values.clear();
}
......
......@@ -498,6 +498,7 @@ void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
buf.reset();
}
void registerInputFormatProcessorTemplate(FormatFactory & factory)
......
......@@ -50,19 +50,19 @@ private:
private:
PeekableReadBuffer buf;
DataTypes data_types;
const DataTypes data_types;
FormatSettings settings;
const bool ignore_spaces;
ParsedTemplateFormatString format;
ParsedTemplateFormatString row_format;
const ParsedTemplateFormatString format;
const ParsedTemplateFormatString row_format;
size_t format_data_idx;
bool end_of_stream = false;
std::vector<size_t> always_default_columns;
char default_csv_delimiter;
const char default_csv_delimiter;
std::string row_between_delimiter;
const std::string row_between_delimiter;
};
}
......@@ -416,6 +416,7 @@ void ValuesBlockInputFormat::resetParser()
IInputFormat::resetParser();
// I'm not resetting parser modes here.
// There is a good chance that all messages have the same format.
buf.reset();
total_rows = 0;
}
......
......@@ -70,12 +70,12 @@ private:
private:
PeekableReadBuffer buf;
RowInputFormatParams params;
const RowInputFormatParams params;
std::unique_ptr<Context> context; /// pimpl
const FormatSettings format_settings;
size_t num_columns;
const size_t num_columns;
size_t total_rows = 0;
std::vector<ParserType> parser_type_for_column;
......@@ -87,7 +87,7 @@ private:
ConstantExpressionTemplates templates;
ConstantExpressionTemplate::Cache templates_cache;
DataTypes types;
const DataTypes types;
BlockMissingValues block_missing_values;
};
......
......@@ -5,6 +5,7 @@
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <common/logger_useful.h>
namespace DB
{
......@@ -12,11 +13,17 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
// with the default poll timeout (500ms) this gives about a 5 second delay for 10 retries
// when selecting from an empty topic
const auto MAX_FAILED_POLL_ATTEMPTS = 10;
KafkaBlockInputStream::KafkaBlockInputStream(
StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, Poco::Logger * log_, size_t max_block_size_, bool commit_in_suffix_)
: storage(storage_)
, context(context_)
, column_names(columns)
, log(log_)
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage.getSampleBlockNonMaterialized())
......@@ -117,68 +124,75 @@ Block KafkaBlockInputStream::readImpl()
};
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
while (true)
{
// some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
// throw an exception from readPrefix when the buffer is empty
if (buffer->eof())
break;
auto new_rows = read_kafka_message();
auto new_rows = buffer->poll() ? read_kafka_message() : 0;
buffer->storeLastReadMessageOffset();
if (new_rows)
{
buffer->storeLastReadMessageOffset();
auto topic = buffer->currentTopic();
auto key = buffer->currentKey();
auto offset = buffer->currentOffset();
auto partition = buffer->currentPartition();
auto timestamp_raw = buffer->currentTimestamp();
auto header_list = buffer->currentHeaderList();
auto topic = buffer->currentTopic();
auto key = buffer->currentKey();
auto offset = buffer->currentOffset();
auto partition = buffer->currentPartition();
auto timestamp_raw = buffer->currentTimestamp();
auto header_list = buffer->currentHeaderList();
Array headers_names;
Array headers_values;
Array headers_names;
Array headers_values;
if (!header_list.empty())
{
headers_names.reserve(header_list.size());
headers_values.reserve(header_list.size());
for (const auto & header : header_list)
if (!header_list.empty())
{
headers_names.emplace_back(header.get_name());
headers_values.emplace_back(static_cast<std::string>(header.get_value()));
headers_names.reserve(header_list.size());
headers_values.reserve(header_list.size());
for (const auto & header : header_list)
{
headers_names.emplace_back(header.get_name());
headers_values.emplace_back(static_cast<std::string>(header.get_value()));
}
}
}
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(topic);
virtual_columns[1]->insert(key);
virtual_columns[2]->insert(offset);
virtual_columns[3]->insert(partition);
if (timestamp_raw)
for (size_t i = 0; i < new_rows; ++i)
{
auto ts = timestamp_raw->get_timestamp();
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(ts).count());
virtual_columns[5]->insert(DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(),3));
}
else
{
virtual_columns[4]->insertDefault();
virtual_columns[5]->insertDefault();
virtual_columns[0]->insert(topic);
virtual_columns[1]->insert(key);
virtual_columns[2]->insert(offset);
virtual_columns[3]->insert(partition);
if (timestamp_raw)
{
auto ts = timestamp_raw->get_timestamp();
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(ts).count());
virtual_columns[5]->insert(DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(),3));
}
else
{
virtual_columns[4]->insertDefault();
virtual_columns[5]->insertDefault();
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
}
total_rows = total_rows + new_rows;
buffer->allowNext();
if (buffer->hasMorePolledMessages())
total_rows = total_rows + new_rows;
}
else if (buffer->isStalled())
{
continue;
++failed_poll_attempts;
}
if (total_rows >= max_block_size || !checkTimeLimit())
else if (buffer->polledDataUnusable())
{
break;
}
else
{
LOG_WARNING(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
}
if (!buffer->hasMorePolledMessages()
&& (total_rows >= max_block_size || !checkTimeLimit() || failed_poll_attempts >= MAX_FAILED_POLL_ATTEMPTS))
{
break;
}
......
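A condensed restatement (not from the patch) of the new loop-exit logic above: the stream keeps polling until the already-polled batch is drained and either the block is full, the time limit is hit, or the topic stayed empty for MAX_FAILED_POLL_ATTEMPTS consecutive polls.

// Sketch of the exit condition, using the same names as in the diff.
bool shouldStopReading(
    size_t total_rows, size_t max_block_size,
    size_t failed_poll_attempts, size_t max_failed_poll_attempts,
    bool has_more_polled_messages, bool time_limit_exceeded)
{
    if (has_more_polled_messages)
        return false;  // finish the already-polled batch first
    return total_rows >= max_block_size
        || time_limit_exceeded
        || failed_poll_attempts >= max_failed_poll_attempts;
}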
......@@ -7,6 +7,10 @@
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
namespace Poco
{
class Logger;
}
namespace DB
{
......@@ -14,7 +18,7 @@ class KafkaBlockInputStream : public IBlockInputStream
{
public:
KafkaBlockInputStream(
StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix = true);
StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, Poco::Logger * log_, size_t max_block_size_, bool commit_in_suffix = true);
~KafkaBlockInputStream() override;
String getName() const override { return storage.getName(); }
......@@ -31,6 +35,7 @@ private:
StorageKafka & storage;
const std::shared_ptr<Context> context;
Names column_names;
Poco::Logger * log;
UInt64 max_block_size;
ConsumerBufferPtr buffer;
......
......@@ -15,6 +15,7 @@ namespace ErrorCodes
using namespace std::chrono_literals;
const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
const std::size_t POLL_TIMEOUT_WO_ASSIGNMENT_MS = 50;
const auto DRAIN_TIMEOUT_MS = 5000ms;
......@@ -57,12 +58,11 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
// * clean buffered non-committed messages
// * set flag / flush
messages.clear();
current = messages.begin();
BufferBase::set(nullptr, 0, 0);
cleanUnprocessed();
rebalance_happened = true;
stalled_status = REBALANCE_HAPPENED;
assignment.clear();
waited_for_assignment = 0;
// for now we use slower (but reliable) sync commit in main loop, so no need to repeat
// try
......@@ -225,7 +225,6 @@ void ReadBufferFromKafkaConsumer::commit()
}
offsets_stored = 0;
stalled = false;
}
void ReadBufferFromKafkaConsumer::subscribe()
......@@ -252,18 +251,26 @@ void ReadBufferFromKafkaConsumer::subscribe()
}
}
stalled = false;
rebalance_happened = false;
offsets_stored = 0;
cleanUnprocessed();
allowed = false;
// we can reset any flags (except CONSUMER_STOPPED) before attempting to read a new block of data
if (stalled_status != CONSUMER_STOPPED)
stalled_status = NO_MESSAGES_RETURNED;
}
void ReadBufferFromKafkaConsumer::unsubscribe()
void ReadBufferFromKafkaConsumer::cleanUnprocessed()
{
LOG_TRACE(log, "Re-joining claimed consumer after failure");
messages.clear();
current = messages.begin();
BufferBase::set(nullptr, 0, 0);
offsets_stored = 0;
}
void ReadBufferFromKafkaConsumer::unsubscribe()
{
LOG_TRACE(log, "Re-joining claimed consumer after failure");
cleanUnprocessed();
// it should not raise exception as used in destructor
try
......@@ -284,12 +291,6 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
}
bool ReadBufferFromKafkaConsumer::hasMorePolledMessages() const
{
return (!stalled) && (current != messages.end());
}
void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
{
if (assignment.empty())
......@@ -302,102 +303,131 @@ void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
LOG_TRACE(log, "{} Returned to committed position: {}", msg, committed_offset);
}
/// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl()
// it does the poll when needed
bool ReadBufferFromKafkaConsumer::poll()
{
resetIfStopped();
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get broken.
if (polledDataUnusable())
return false;
// we can react to stop only while fetching data;
// after a block is formed (i.e. while copying data to MV / committing) we ignore stop attempts
if (stopped)
if (hasMorePolledMessages())
{
was_stopped = true;
offsets_stored = 0;
return false;
allowed = true;
return true;
}
if (stalled || was_stopped || !allowed || rebalance_happened)
return false;
if (intermediate_commit)
commit();
if (current == messages.end())
while (true)
{
if (intermediate_commit)
commit();
size_t waited_for_assignment = 0;
while (true)
stalled_status = NO_MESSAGES_RETURNED;
// we have already waited long enough for assignment in the past,
// so let's make polls shorter and not block other consumers
// which can work successfully in parallel
// POLL_TIMEOUT_WO_ASSIGNMENT_MS (50ms) is 100% enough just to check if we got assignment
// (see https://github.com/ClickHouse/ClickHouse/issues/11218)
auto actual_poll_timeout_ms = (waited_for_assignment >= MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
? std::min(POLL_TIMEOUT_WO_ASSIGNMENT_MS,poll_timeout)
: poll_timeout;
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size,
std::chrono::milliseconds(actual_poll_timeout_ms));
resetIfStopped();
if (stalled_status == CONSUMER_STOPPED)
{
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
if (stopped)
return false;
}
else if (stalled_status == REBALANCE_HAPPENED)
{
if (!new_messages.empty())
{
was_stopped = true;
offsets_stored = 0;
return false;
}
else if (rebalance_happened)
{
if (!new_messages.empty())
{
// we have polled something just after rebalance.
// we will not use current batch, so we need to return to last committed position
// otherwise we will continue polling from that position
resetToLastCommitted("Rewind last poll after rebalance.");
}
offsets_stored = 0;
return false;
// we have polled something just after rebalance.
// we will not use current batch, so we need to return to last committed position
// otherwise we will continue polling from that position
resetToLastCommitted("Rewind last poll after rebalance.");
}
return false;
}
if (new_messages.empty())
if (new_messages.empty())
{
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select then it's better to get something after a wait than immediately nothing.
if (assignment.empty())
{
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select then it's better to get something after a wait than immediately nothing.
if (assignment.empty())
waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough calculation is ok.
if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
{
waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough calculation is ok.
if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
{
continue;
}
else
{
LOG_TRACE(log, "Can't get assignment");
stalled = true;
return false;
}
continue;
}
else
{
LOG_TRACE(log, "Stalled");
stalled = true;
LOG_WARNING(log, "Can't get assignment. It can be caused by some issue with consumer group (not enough partitions?). Will keep trying.");
stalled_status = NO_ASSIGNMENT;
return false;
}
}
else
{
messages = std::move(new_messages);
current = messages.begin();
LOG_TRACE(log, "Polled batch of {} messages. Offset position: {}", messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
break;
LOG_TRACE(log, "Stalled");
return false;
}
}
else
{
messages = std::move(new_messages);
current = messages.begin();
LOG_TRACE(log, "Polled batch of {} messages. Offset position: {}", messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
break;
}
}
if (auto err = current->get_error())
while (auto err = current->get_error())
{
++current;
// TODO: should throw exception instead
LOG_ERROR(log, "Consumer error: {}", err);
return false;
if (current == messages.end())
{
LOG_ERROR(log, "No actual messages polled, errors only.");
stalled_status = ERRORS_RETURNED;
return false;
}
}
stalled_status = NOT_STALLED;
allowed = true;
return true;
}
void ReadBufferFromKafkaConsumer::resetIfStopped()
{
// we can react to stop only while fetching data;
// after a block is formed (i.e. while copying data to MV / committing) we ignore stop attempts
if (stopped)
{
stalled_status = CONSUMER_STOPPED;
cleanUnprocessed();
}
}
/// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl()
{
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get broken.
resetIfStopped();
if (!allowed || !hasMorePolledMessages())
return false;
// XXX: very fishy place with const casting.
auto * new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
......@@ -411,7 +441,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
{
if (!stalled && !was_stopped && !rebalance_happened)
if (!isStalled())
{
consumer->store_offset(*(current - 1));
++offsets_stored;
......
......@@ -29,20 +29,33 @@ public:
const Names & _topics
);
~ReadBufferFromKafkaConsumer() override;
void allowNext() { allowed = true; } // Allow to read next message.
void commit(); // Commit all processed messages.
void subscribe(); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
auto pollTimeout() const { return poll_timeout; }
bool hasMorePolledMessages() const;
bool polledDataUnusable() const { return (was_stopped || rebalance_happened); }
bool isStalled() const { return stalled; }
inline bool hasMorePolledMessages() const
{
return (stalled_status == NOT_STALLED) && (current != messages.end());
}
inline bool polledDataUnusable() const
{
return (stalled_status != NOT_STALLED) && (stalled_status != NO_MESSAGES_RETURNED);
}
inline bool isStalled() const { return stalled_status != NOT_STALLED; }
void storeLastReadMessageOffset();
void resetToLastCommitted(const char * msg);
// Polls a batch of messages from Kafka or allows reading a consecutive message by nextImpl.
// Returns true if there are some messages to process;
// returns false (and sets the stalled status) if there are no messages to process.
bool poll();
// Return values for the message that's being read.
String currentTopic() const { return current[-1].get_topic(); }
String currentKey() const { return current[-1].get_key(); }
......@@ -54,14 +67,27 @@ public:
private:
using Messages = std::vector<cppkafka::Message>;
enum StalledStatus
{
NOT_STALLED,
NO_MESSAGES_RETURNED,
REBALANCE_HAPPENED,
CONSUMER_STOPPED,
NO_ASSIGNMENT,
ERRORS_RETURNED
};
ConsumerPtr consumer;
Poco::Logger * log;
const size_t batch_size = 1;
const size_t poll_timeout = 0;
size_t offsets_stored = 0;
bool stalled = false;
StalledStatus stalled_status = NO_MESSAGES_RETURNED;
bool intermediate_commit = true;
bool allowed = true;
size_t waited_for_assignment = 0;
const std::atomic<bool> & stopped;
......@@ -69,15 +95,13 @@ private:
Messages messages;
Messages::const_iterator current;
bool rebalance_happened = false;
bool was_stopped = false;
// order is important, need to be destructed before consumer
cppkafka::TopicPartitionList assignment;
const Names topics;
void drain();
void cleanUnprocessed();
void resetIfStopped();
bool nextImpl() override;
};
......
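A rough usage sketch (not part of the patch) of the caller protocol implied by the declarations above; it assumes only the method names declared in ReadBufferFromKafkaConsumer and mirrors, in simplified form, how KafkaBlockInputStream::readImpl drives the buffer.

// Sketch: driving the consumer buffer until the current polled batch is exhausted.
void drainConsumer(DB::ReadBufferFromKafkaConsumer & buffer)
{
    while (buffer.poll())                       // polls Kafka or reuses already-polled messages
    {
        // ... read rows from `buffer` via an input format, then:
        buffer.storeLastReadMessageOffset();    // remember the offset of the message just read
        if (!buffer.hasMorePolledMessages())
            break;                              // nothing left in the current batch
    }
}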
......@@ -69,7 +69,10 @@ namespace
for (const auto & key : keys)
{
const String key_path = path + "." + key;
const String key_name = boost::replace_all_copy(key, "_", ".");
// log_level has a valid underscore; the rest of the librdkafka settings use dot.separated.format,
// which is not acceptable for XML.
// See also https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String key_name = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
conf.set(key_name, config.getString(key_path));
}
}
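A minimal sketch (not part of the patch) of the key mapping described in the comment above; it assumes only the boost::replace_all_copy behavior already used in the loop.

#include <string>
#include <boost/algorithm/string/replace.hpp>

// Map an XML-safe config key to the librdkafka name: "log_level" is kept as-is,
// every other underscore becomes a dot, e.g. "auto_offset_reset" -> "auto.offset.reset".
std::string toLibrdkafkaKey(const std::string & key)
{
    return (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
}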
......@@ -221,7 +224,7 @@ Pipes StorageKafka::read(
/// TODO: probably that leads to awful performance.
/// FIXME: it seems that this doesn't help with extra reading and committing unprocessed messages.
/// TODO: rewrite KafkaBlockInputStream to KafkaSource. Now it is used in other place.
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, modified_context, column_names, 1)));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, modified_context, column_names, log, 1)));
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
......@@ -535,7 +538,7 @@ bool StorageKafka::streamToViews()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream
= std::make_shared<KafkaBlockInputStream>(*this, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
= std::make_shared<KafkaBlockInputStream>(*this, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
......@@ -668,6 +671,16 @@ void registerStorageKafka(StorageFactory & factory)
throw Exception("Number of consumers can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
}
if (kafka_settings->kafka_max_block_size.changed && kafka_settings->kafka_max_block_size.value < 1)
{
throw Exception("kafka_max_block_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
}
if (kafka_settings->kafka_poll_max_batch_size.changed && kafka_settings->kafka_poll_max_batch_size.value < 1)
{
throw Exception("kafka_poll_max_batch_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
}
return StorageKafka::create(args.table_id, args.context, args.columns, std::move(kafka_settings));
};
......
(id = ${id:Escaped}, blockNo = ${blockNo:Escaped}, val1 = ${val1:CSV}, val2 = ${val2:Escaped}, val3 = ${val3:Escaped})
\ No newline at end of file
{
"type": "record",
"name": "row",
"fields": [
{"name": "id", "type": "long"},
{"name": "blockNo", "type": "int"},
{"name": "val1", "type": "string"},
{"name": "val2", "type": "float"},
{"name": "val3", "type": "int"}
]
}
\ No newline at end of file
@0x99f75f775fe63dae;
struct TestRecordStruct
{
id @0 : Int64;
blockNo @1 : UInt16;
val1 @2 : Text;
val2 @3 : Float32;
val3 @4 : UInt8;
}
\ No newline at end of file
syntax = "proto3";
message TestMessage {
int64 id = 1;
uint32 blockNo = 2;
string val1 = 3;
float val2 = 4;
uint32 val3 = 5;
};