提交 c2fb72ab 编写于 作者: K kssenii

Better mv, more comments

上级 e57d1c82
......@@ -16,11 +16,13 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
const StorageMetadataPtr & metadata_snapshot_,
Context & context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
......@@ -51,12 +53,14 @@ void RabbitMQBlockInputStream::readPrefixImpl()
}
bool RabbitMQBlockInputStream::needManualChannelUpdate()
bool RabbitMQBlockInputStream::needChannelUpdate()
{
if (!buffer)
return false;
return !buffer->channelUsable() && buffer->channelAllowed() && storage.connectionRunning();
ChannelPtr channel = buffer->getChannel();
return !channel || !channel->usable();
}
......@@ -83,7 +87,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, 1);
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
......@@ -164,7 +168,7 @@ Block RabbitMQBlockInputStream::readImpl()
buffer->allowNext();
if (buffer->queueEmpty() || !checkTimeLimit())
if (total_rows >= max_block_size || buffer->queueEmpty() || buffer->consumerStopped() || !checkTimeLimit())
break;
}
......@@ -189,7 +193,7 @@ void RabbitMQBlockInputStream::readSuffixImpl()
bool RabbitMQBlockInputStream::sendAck()
{
if (!buffer || !buffer->channelUsable())
if (!buffer)
return false;
if (!buffer->ackMessages())
......
......@@ -18,6 +18,7 @@ public:
const StorageMetadataPtr & metadata_snapshot_,
Context & context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = true);
~RabbitMQBlockInputStream() override;
......@@ -29,8 +30,8 @@ public:
Block readImpl() override;
void readSuffixImpl() override;
bool needChannelUpdate();
void updateChannel();
bool needManualChannelUpdate();
bool sendAck();
private:
......@@ -38,6 +39,7 @@ private:
StorageMetadataPtr metadata_snapshot;
Context context;
Names column_names;
const size_t max_block_size;
bool ack_in_suffix;
bool finished = false;
......
......@@ -2,7 +2,6 @@
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
......@@ -21,11 +20,11 @@ namespace DB
M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
M(String, rabbitmq_deadletter_exchange, "", "Exchange name to be passed as a dead-letter-exchange name.", 0) \
M(Bool, rabbitmq_persistent, false, "If set, delivery mode will be set to 2 (makes messages 'persistent', durable).", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
struct RabbitMQSettings : public BaseSettings<RabbitMQSettingsTraits>
{
......
......@@ -50,7 +50,6 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, row_delimiter(row_delimiter_)
, stopped(stopped_)
, received(QUEUE_SIZE * num_queues)
, last_inserted_record(AckTracker())
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
bindQueue(queue_id);
......@@ -165,10 +164,14 @@ bool ReadBufferFromRabbitMQConsumer::ackMessages()
{
/// Commit all received messages with delivery tags from last commited to last inserted
if (!consumer_channel->ack(record.delivery_tag, AMQP::multiple))
{
LOG_ERROR(log, "Failed to commit messages with delivery tags from last commited to {} on channel {}",
record.delivery_tag, channel_id);
return false;
}
prev_tag = record.delivery_tag;
LOG_TRACE(log, "Consumer acknowledged messages with deliveryTags up to {} on channel {}", record.delivery_tag, channel_id);
LOG_TRACE(log, "Consumer commited messages with deliveryTags up to {} on channel {}", record.delivery_tag, channel_id);
}
return true;
......@@ -207,6 +210,8 @@ void ReadBufferFromRabbitMQConsumer::setupChannel()
consumer_channel->onError([&](const char * message)
{
/// If here, then fatal error occured on the channel and it is not usable anymore, need to close it
consumer_channel->close();
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
......
......@@ -59,6 +59,7 @@ public:
bool channelUsable() { return !channel_error.load(); }
/// Do not allow to update channel untill current channel is properly set up and subscribed
bool channelAllowed() { return !wait_subscription.load(); }
bool consumerStopped() { return stopped; }
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
......
......@@ -107,8 +107,7 @@ StorageRabbitMQ::StorageRabbitMQ(
setInMemoryMetadata(storage_metadata);
rabbitmq_context.makeQueryContext();
if (!schema_name.empty())
rabbitmq_context.setSetting("format_schema", schema_name);
rabbitmq_context = addSettings(rabbitmq_context);
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
event_handler->updateLoopState(Loop::STOP);
......@@ -193,6 +192,19 @@ String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_i
}
Context StorageRabbitMQ::addSettings(Context context)
{
context.setSetting("input_format_skip_unknown_fields", true);
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
if (!schema_name.empty())
context.setSetting("format_schema", schema_name);
return context;
}
void StorageRabbitMQ::heartbeatFunc()
{
if (!stream_cancelled && event_handler->connectionRunning())
......@@ -215,10 +227,11 @@ void StorageRabbitMQ::loopingFunc()
*/
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
if (stop_loop)
event_handler->updateLoopState(Loop::STOP);
if (task_mutex.try_lock())
{
if (stop_loop)
event_handler->updateLoopState(Loop::STOP);
task->deactivate();
task_mutex.unlock();
......@@ -232,6 +245,14 @@ void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task,
}
size_t StorageRabbitMQ::getMaxBlockSize()
{
return rabbitmq_settings->rabbitmq_max_block_size.changed
? rabbitmq_settings->rabbitmq_max_block_size.value
: (global_context.getSettingsRef().max_insert_block_size.value / num_consumers);
}
void StorageRabbitMQ::initExchange()
{
/* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
......@@ -240,7 +261,15 @@ void StorageRabbitMQ::initExchange()
setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
/* This error can be a result of attempt to declare exchange if it was already declared but
* 1) with different exchange type. In this case can
* - manually delete previously declared exchange and create a new one.
* - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself
* if it is not needed anymore or use another exchange name.
* 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and
* specified its own settings, which differ from this implementation.
*/
throw Exception("Unable to declare exchange (1). Make sure specified exchange is not already declared. Error: "
+ std::string(message), ErrorCodes::LOGICAL_ERROR);
});
......@@ -248,7 +277,8 @@ void StorageRabbitMQ::initExchange()
setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
/// This error is not supposed to happen as this exchange name is always unique to type and its settings
throw Exception("Unable to declare exchange (2). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
if (!hash_exchange)
......@@ -267,13 +297,17 @@ void StorageRabbitMQ::initExchange()
setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
/* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
* to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
* is bad.
*/
throw Exception("Unable to declare exchange (3). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unable to bind exchange (2). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
consumer_exchange = sharding_exchange;
......@@ -302,7 +336,7 @@ void StorageRabbitMQ::bindExchange()
})
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unable to bind exchange (1). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
}
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
......@@ -314,7 +348,7 @@ void StorageRabbitMQ::bindExchange()
})
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unable to bind exchange (1). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
}
else
......@@ -330,7 +364,7 @@ void StorageRabbitMQ::bindExchange()
})
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unable to bind exchange (1). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
}
}
......@@ -348,7 +382,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
if (reconnecting)
{
deactivateTask(heartbeat_task, 0, 0);
deactivateTask(heartbeat_task, false, false);
connection->close(); /// Connection might be unusable, but not closed
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
......@@ -393,8 +427,8 @@ void StorageRabbitMQ::unbindExchange()
* input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
* externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
* queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
* bindings to remove redunadant message copies, but after that mv cannot work unless thoso bindings recreated. Recreating them is not
* difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
* bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
* not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
*/
std::call_once(flag, [&]()
{
......@@ -435,20 +469,17 @@ Pipe StorageRabbitMQ::read(
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = context;
if (!schema_name.empty())
modified_context.setSetting("format_schema", schema_name);
auto modified_context = addSettings(context);
auto block_size = getMaxBlockSize();
bool update_channels = false;
if (!event_handler->connectionRunning())
{
if (event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
}
deactivateTask(looping_task, false, true);
if ((update_channels = restoreConnection(true)))
update_channels = restoreConnection(true);
if (update_channels)
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
......@@ -457,20 +488,20 @@ Pipe StorageRabbitMQ::read(
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, modified_context, column_names);
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, modified_context, column_names, block_size);
/* It is a possible but rare case when channel gets into error state and does not also close connection, so need manual update.
* But I believe that in current context and with local rabbitmq settings this will never happen and any channel error will also
* close connection, but checking anyway (in second condition of if statement). This must be done here (and also in streamToViews())
* and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library
*/
if (update_channels || rabbit_stream->needManualChannelUpdate())
if ((update_channels || rabbit_stream->needChannelUpdate()) && connection->usable())
{
if (event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
heartbeat_task->deactivate();
deactivateTask(looping_task, false, true);
deactivateTask(heartbeat_task, false, false);
}
rabbit_stream->updateChannel();
......@@ -526,9 +557,9 @@ void StorageRabbitMQ::shutdown()
stream_cancelled = true;
wait_confirm.store(false);
deactivateTask(streaming_task, 1, 1);
deactivateTask(heartbeat_task, 1, 0);
deactivateTask(looping_task, 1, 1);
deactivateTask(streaming_task, true, false);
deactivateTask(heartbeat_task, true, false);
deactivateTask(looping_task, true, true);
connection->close();
......@@ -594,7 +625,7 @@ ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
return std::make_shared<WriteBufferToRabbitMQProducer>(
parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
producer_id.fetch_add(1), unique_strbase, persistent, wait_confirm, log,
producer_id.fetch_add(1), persistent, wait_confirm, log,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}
......@@ -683,19 +714,25 @@ bool StorageRabbitMQ::streamToViews()
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
auto block_size = getMaxBlockSize();
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names, false);
auto stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
limits.speed_limits.max_execution_time = global_context.getSettingsRef().stream_flush_interval_ms;
limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: global_context.getSettingsRef().stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits);
......@@ -715,7 +752,7 @@ bool StorageRabbitMQ::streamToViews()
* races inside the library, but only in case any error occurs or connection is lost while ack is being sent
*/
if (event_handler->loopRunning())
deactivateTask(looping_task, 0, 1);
deactivateTask(looping_task, false, true);
if (!event_handler->connectionRunning())
{
......@@ -733,20 +770,37 @@ bool StorageRabbitMQ::streamToViews()
}
else
{
deactivateTask(heartbeat_task, 0, 0);
deactivateTask(heartbeat_task, false, false);
/// Commit
for (auto & stream : streams)
{
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
* no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
* waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
* consumers. So in this case duplicates are inevitable.
* 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
* common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
* Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
* will ever happen.
*/
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
{
/* Almost any error with channel will lead to connection closure, but if so happens that channel errored and connection
* is not closed - also need to restore channels
*/
if (!stream->as<RabbitMQBlockInputStream>()->needManualChannelUpdate())
stream->as<RabbitMQBlockInputStream>()->updateChannel();
if (connection->usable())
{
/* Almost any error with channel will lead to connection closure, but if so happens that channel errored and
* connection is not closed - also need to restore channels
*/
if (!stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
stream->as<RabbitMQBlockInputStream>()->updateChannel();
}
else
{
break;
}
}
}
}
......@@ -809,8 +863,9 @@ void registerStorageRabbitMQ(StorageFactory & factory)
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange)
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent)
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_flush_interval_ms)
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT
......
......@@ -104,12 +104,13 @@ private:
std::mutex mutex, task_mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
String unique_strbase;
String unique_strbase; /// to make unique consumer channel id
String sharding_exchange, bridge_exchange, consumer_exchange;
std::once_flag flag;
size_t consumer_id = 0;
std::atomic<size_t> producer_id = 1;
std::atomic<bool> wait_confirm = true, exchange_removed = false;
std::once_flag flag; /// remove exchange only once
size_t consumer_id = 0; /// counter for consumer buffer, needed for channel id
std::atomic<size_t> producer_id = 1; /// counter for producer buffer, needed for channel id
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
std::atomic<bool> exchange_removed = false;
ChannelPtr setup_channel;
BackgroundSchedulePool::TaskHolder streaming_task;
......@@ -126,6 +127,7 @@ private:
Names parseRoutingKeys(String routing_key_list);
AMQP::ExchangeType defineExchangeType(String exchange_type_);
Context addSettings(Context context);
size_t getMaxBlockSize();
String getTableBasedName(String name, const StorageID & table_id);
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
......
......@@ -18,6 +18,11 @@ static const auto RETRIES_MAX = 20;
static const auto BATCH = 1000;
static const auto RETURNED_LIMIT = 50000;
namespace ErrorCodes
{
extern const int CANNOT_CONNECT_RABBITMQ;
}
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address_,
Context & global_context,
......@@ -26,7 +31,6 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
const String & exchange_name_,
const AMQP::ExchangeType exchange_type_,
const size_t channel_id_base_,
const String channel_base_,
const bool persistent_,
std::atomic<bool> & wait_confirm_,
Poco::Logger * log_,
......@@ -40,7 +44,6 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, exchange_name(exchange_name_)
, exchange_type(exchange_type_)
, channel_id_base(std::to_string(channel_id_base_))
, channel_base(channel_base_)
, persistent(persistent_)
, wait_confirm(wait_confirm_)
, payloads(BATCH)
......@@ -56,7 +59,16 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
if (setupConnection(false))
{
setupChannel();
}
else
{
if (!connection->closed())
connection->close(true);
throw Exception("Cannot connect to RabbitMQ", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
writing_task->deactivate();
......@@ -175,7 +187,7 @@ void WriteBufferToRabbitMQProducer::setupChannel()
producer_channel->onReady([&]()
{
channel_id = channel_id_base + std::to_string(channel_id_counter++) + "_" + channel_base;
channel_id = channel_id_base + std::to_string(channel_id_counter++);
LOG_DEBUG(log, "Producer's channel {} is ready", channel_id);
/* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails,
......@@ -187,17 +199,17 @@ void WriteBufferToRabbitMQProducer::setupChannel()
producer_channel->confirmSelect()
.onAck([&](uint64_t acked_delivery_tag, bool multiple)
{
removeConfirmed(acked_delivery_tag, multiple, false);
removeRecord(acked_delivery_tag, multiple, false);
})
.onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
{
removeConfirmed(nacked_delivery_tag, multiple, true);
removeRecord(nacked_delivery_tag, multiple, true);
});
});
}
void WriteBufferToRabbitMQProducer::removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish)
void WriteBufferToRabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish)
{
auto record_iter = delivery_record.find(received_delivery_tag);
......@@ -292,7 +304,6 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
void WriteBufferToRabbitMQProducer::writingFunc()
{
/// wait_confirm == false when shutdown is called, needed because table might be dropped before all acks are received
while ((!payloads.empty() || wait_all) && wait_confirm.load())
{
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned messages are republished
......@@ -305,10 +316,6 @@ void WriteBufferToRabbitMQProducer::writingFunc()
iterateEventLoop();
/* wait_num != 0 if there will be no new payloads pushed to payloads.queue in countRow(), delivery_record is empty if there are
* no more pending acknowldgements from the server (if receieved ack(), records are deleted, if received nack(), records are pushed
* to returned.queue and deleted, because server will attach new delivery tags to them)
*/
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
wait_all = false;
else if ((!producer_channel->usable() && event_handler->connectionRunning()) || (!event_handler->connectionRunning() && setupConnection(true)))
......
......@@ -25,7 +25,6 @@ public:
const String & exchange_name_,
const AMQP::ExchangeType exchange_type_,
const size_t channel_id_,
const String channel_base_,
const bool persistent_,
std::atomic<bool> & wait_confirm_,
Poco::Logger * log_,
......@@ -46,7 +45,7 @@ private:
void writingFunc();
bool setupConnection(bool reconnecting);
void setupChannel();
void removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish);
void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish);
void publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & message, bool republishing);
std::pair<String, UInt16> parsed_address;
......@@ -54,9 +53,12 @@ private:
const Names routing_keys;
const String exchange_name;
AMQP::ExchangeType exchange_type;
const String channel_id_base;
const String channel_base;
const String channel_id_base; /// Serial number of current producer buffer
const bool persistent;
/* false: when shutdown is called; needed because table might be dropped before all acks are received
* true: in all other cases
*/
std::atomic<bool> & wait_confirm;
AMQP::Table key_arguments;
......@@ -67,14 +69,47 @@ private:
std::unique_ptr<AMQP::TcpConnection> connection;
std::unique_ptr<AMQP::TcpChannel> producer_channel;
/// Channel errors lead to channel closure, need to count number of recreated channels to update channel id
UInt64 channel_id_counter = 0;
/// channel id which contains id of current producer buffer and serial number of recreated channel in this buffer
String channel_id;
/* payloads.queue:
* - payloads are pushed to queue in countRow and poped by another thread in writingFunc, each payload gets into queue only once
* returned.queue:
* - payloads are pushed to queue:
* 1) inside channel->onError() callback if channel becomes unusable and the record of pending acknowledgements from server
* is non-empty.
* 2) inside removeRecord() if received nack() - negative acknowledgement from the server that message failed to be written
* to disk or it was unable to reach the queue.
* - payloads are poped from the queue once republished
*/
ConcurrentBoundedQueue<std::pair<UInt64, String>> payloads, returned;
/* Counter of current delivery on a current channel. Delivery tags are scoped per channel. The server attaches a delivery tag for each
* published message - a serial number of delivery on current channel. Delivery tag is a way of server to notify publisher if it was
* able or unable to process delivery, i.e. it sends back a responce with a corresponding delivery tag.
*/
UInt64 delivery_tag = 0;
std::atomic<bool> wait_all = true;
/* false: message delivery successfully ended: publisher received confirm from server that all published
* 1) persistent messages were written to disk
* 2) non-persistent messages reached the queue
* true: continue to process deliveries and returned messages
*/
bool wait_all = true;
/* false: untill writeSuffix is called
* true: means payloads.queue will not grow anymore
*/
std::atomic<UInt64> wait_num = 0;
/// Needed to fill messageID property
UInt64 payload_counter = 0;
/// Record of pending acknowledgements from the server; its size never exceeds size of returned.queue
std::map<UInt64, std::pair<UInt64, String>> delivery_record;
UInt64 channel_id_counter = 0;
Poco::Logger * log;
const std::optional<char> delim;
......
......@@ -1547,91 +1547,6 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
assert TSV(result) == TSV(expected)
@pytest.mark.timeout(420)
def test_rabbitmq_no_loss_on_table_drop(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'queue_resume',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'queue_resume',
rabbitmq_queue_base = 'queue_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq_queue_resume;
''')
i = [0]
messages_num = 10000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
for message in messages:
channel.basic_publish(exchange='queue_resume', routing_key='queue_resume', body=message,
properties=pika.BasicProperties(delivery_mode = 2))
connection.close()
threads = []
threads_num = 20
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(1)
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
''')
for thread in threads:
thread.join()
collected = int(instance.query('SELECT count() FROM test.view'))
instance.query('''
CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'queue_resume',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'queue_resume',
rabbitmq_queue_base = 'queue_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
while True:
result1 = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result1) == messages_num * threads_num:
break
instance.query('''
DROP TABLE test.rabbitmq_queue_resume;
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
instance.query('''
......@@ -1856,6 +1771,85 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(300)
def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'block',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'block',
rabbitmq_max_block_size = 100,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
cancel = threading.Event()
i = [0]
def produce():
while not cancel.is_set():
messages = []
for _ in range(101):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
for message in messages:
channel.basic_publish(exchange='block', routing_key='', body=message)
rabbitmq_thread = threading.Thread(target=produce)
rabbitmq_thread.start()
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(1)
cancel.set()
instance.query('''
DROP TABLE test.rabbitmq;
''')
while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='rabbitmq'")) == 1:
time.sleep(1)
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'block',
rabbitmq_format = 'JSONEachRow',
rabbitmq_max_block_size = 100,
rabbitmq_queue_base = 'block',
rabbitmq_row_delimiter = '\\n';
''')
while int(instance.query('SELECT uniqExact(key) FROM test.view')) < i[0]:
time.sleep(1)
result = int(instance.query('SELECT count() == uniqExact(key) FROM test.view'))
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
rabbitmq_thread.join()
connection.close()
assert result == 1, 'Messages from RabbitMQ get duplicated!'
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册