StorageRabbitMQ.cpp 39.2 KB
Newer Older
1
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
K
kssenii 已提交
2
#include <DataStreams/IBlockInputStream.h>
A
alesapin 已提交
3
#include <DataStreams/ConvertingBlockInputStream.h>
K
kssenii 已提交
4 5 6 7 8 9 10 11 12 13 14 15
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
16
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
K
kssenii 已提交
17 18
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
19
#include <Storages/RabbitMQ/RabbitMQHandler.h>
K
kssenii 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/parseAddress.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <amqpcpp.h>

K
kssenii 已提交
36 37
namespace DB
{

/// Pause between connection-readiness polls in restoreConnection(), in milliseconds.
static constexpr int CONNECT_SLEEP = 200;
/// Upper bound on event-loop iterations while waiting for a connection to open or to close.
static constexpr int RETRIES_MAX = 20;
/// Lower bound for the per-queue "x-max-length" argument (queue_size is max(QUEUE_SIZE, max block size)).
static constexpr uint32_t QUEUE_SIZE = 100000;
/// NOTE(review): not referenced in this view; presumably used by the streaming code elsewhere in the file.
static constexpr int MAX_FAILED_READ_ATTEMPTS = 10;
/// Delay before connection_task retries a failed reconnection, in milliseconds.
static constexpr int RESCHEDULE_MS = 500;
/// NOTE(review): not referenced in this view; presumably bounds a streaming iteration elsewhere in the file.
static constexpr int MAX_THREAD_WORK_DURATION_MS = 60000;

namespace ErrorCodes
{
    /// Declarations only: the definitions live in another translation unit.
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
    extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
    extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
    extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

namespace ExchangeType
{
    /// Accepted values of the exchange type setting; they are matched in defineExchangeType().
    /// DEFAULT is this implementation's default (mapped to fanout there), not a default of RabbitMQ itself.
    static const String DEFAULT = "default";

    static const String FANOUT = "fanout";
    static const String DIRECT = "direct";
    static const String TOPIC = "topic";
    static const String HEADERS = "headers";
    /// Consistent-hash exchange type.
    static const String HASH = "consistent_hash";
}


/* Builds the storage from table settings (macros expanded), opens the first connection attempt and prepares
 * (deactivated) background tasks. Also derives the names of the local sharding/bridge exchanges and the queue base.
 */
StorageRabbitMQ::StorageRabbitMQ(
        const StorageID & table_id_,
        const Context & context_,
        const ColumnsDescription & columns_,
        std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
        : IStorage(table_id_)
        , global_context(context_.getGlobalContext())
        , rabbitmq_settings(std::move(rabbitmq_settings_))
        , exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
        , format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
        , exchange_type(defineExchangeType(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type.value)))
        , routing_keys(parseRoutingKeys(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list.value)))
        , row_delimiter(rabbitmq_settings->rabbitmq_row_delimiter.value)
        , schema_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_schema.value))
        , num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
        , num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
        , queue_base(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base.value))
        , deadletter_exchange(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_deadletter_exchange.value))
        , persistent(rabbitmq_settings->rabbitmq_persistent.value)
        , hash_exchange(num_consumers > 1 || num_queues > 1)
        , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
        , address(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
        , parsed_address(parseAddress(address, 5672))
        , login_password(std::make_pair(
                    global_context.getConfigRef().getString("rabbitmq.username"),
                    global_context.getConfigRef().getString("rabbitmq.password")))
        , semaphore(0, num_consumers)
        , unique_strbase(getRandomName())
        , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
        , rabbit_is_ready(false)
{
    /// The loop and its handler must exist first: restoreConnection() pumps the loop through event_handler.
    loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());
    event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
    restoreConnection(false);

    StorageInMemoryMetadata in_memory_metadata;
    in_memory_metadata.setColumns(columns_);
    setInMemoryMetadata(in_memory_metadata);

    rabbitmq_context = addSettings(global_context);
    rabbitmq_context->makeQueryContext();

    /// All consumers share one connection, hence one handler and one event loop, so a single looping task serves them all.
    event_handler->updateLoopState(Loop::STOP);

    looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
    streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
    connection_task = global_context.getSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });

    /// Nothing runs until startup() (or read()) activates the tasks.
    looping_task->deactivate();
    streaming_task->deactivate();
    connection_task->deactivate();

    const bool derive_names_from_table = queue_base.empty();

    /* Sharding exchange name:
     *  - without queue_base it is derived from the table, so it is unique per table, differs from the client's exchange
     *    and is declared identically whenever the same table is created again (a random name would not be);
     *  - with queue_base, tables that register consumers on the same (shared) queues must also share the sharding
     *    exchange and the bridge exchange, so the name is built from exchange_name and queue_base.
     */
    sharding_exchange = derive_names_from_table
        ? getTableBasedName(exchange_name, table_id_)
        : exchange_name + "_" + queue_base;

    /* Without queue_base, queue names are not left to the library either: they are derived from the table, so a
     * recreated table reuses its queues. queue_base is the base of the queue names declared later in bindQueue().
     */
    if (derive_names_from_table)
        queue_base = getTableBasedName("", table_id_);

    bridge_exchange = sharding_exchange + "_bridge";
}


Names StorageRabbitMQ::parseRoutingKeys(String routing_key_list)
{
    /// Comma-separated list -> individual keys with surrounding whitespace stripped. Empty entries are not removed.
    Names keys;
    boost::split(keys, routing_key_list, [](char symbol) { return symbol == ','; });

    for (auto & routing_key : keys)
        boost::trim(routing_key);

    return keys;
}


AMQP::ExchangeType StorageRabbitMQ::defineExchangeType(String exchange_type_)
{
    /// Maps the setting value to an AMQP exchange type. "default" (this implementation's default) resolves to fanout.
    /// Throws BAD_ARGUMENTS for any other unknown value.
    if (exchange_type_ == ExchangeType::DEFAULT || exchange_type_ == ExchangeType::FANOUT)
        return AMQP::ExchangeType::fanout;
    if (exchange_type_ == ExchangeType::DIRECT)
        return AMQP::ExchangeType::direct;
    if (exchange_type_ == ExchangeType::TOPIC)
        return AMQP::ExchangeType::topic;
    if (exchange_type_ == ExchangeType::HASH)
        return AMQP::ExchangeType::consistent_hash;
    if (exchange_type_ == ExchangeType::HEADERS)
        return AMQP::ExchangeType::headers;

    throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}


String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_id)
{
    /// "<database>_<table>", prefixed with "<name>_" when a non-empty name is given.
    return name.empty()
        ? fmt::format("{}_{}", table_id.database_name, table_id.table_name)
        : fmt::format("{}_{}_{}", name, table_id.database_name, table_id.table_name);
}


std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) const
K
kssenii 已提交
192
{
A
alesapin 已提交
193 194 195 196
    auto modified_context = std::make_shared<Context>(context);
    modified_context->setSetting("input_format_skip_unknown_fields", true);
    modified_context->setSetting("input_format_allow_errors_ratio", 0.);
    modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
K
kssenii 已提交
197 198

    if (!schema_name.empty())
A
alesapin 已提交
199
        modified_context->setSetting("format_schema", schema_name);
K
kssenii 已提交
200

A
alesapin 已提交
201
    return modified_context;
K
kssenii 已提交
202 203 204
}


205 206
void StorageRabbitMQ::loopingFunc()
{
207 208
    if (event_handler->connectionRunning())
        event_handler->startLoop();
209 210 211
}


K
kssenii 已提交
212 213 214 215 216 217 218 219 220
void StorageRabbitMQ::connectionFunc()
{
    /// Body of connection_task: retry the connection every RESCHEDULE_MS ms; set up exchanges/queues once it succeeds.
    if (!restoreConnection(true))
    {
        connection_task->scheduleAfter(RESCHEDULE_MS);
        return;
    }

    initRabbitMQ();
}


/* Deactivates `task` while holding task_mutex, optionally switching the event loop to STOP first.
 * If task_mutex is already taken, the call blocks only when `wait` is set (used from shutdown) and otherwise returns
 * without deactivating. This avoids a deadlock when shutdown first deactivates the streaming task and the streaming
 * task itself then tries to deactivate another task.
 */
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
    if (stop_loop)
        event_handler->updateLoopState(Loop::STOP);

    std::unique_lock<std::mutex> lock(task_mutex, std::try_to_lock);
    if (!lock.owns_lock())
    {
        if (!wait)
            return;

        lock.lock();
    }

    task->deactivate();
}


size_t StorageRabbitMQ::getMaxBlockSize() const
K
kssenii 已提交
244 245 246 247 248 249 250
 {
     return rabbitmq_settings->rabbitmq_max_block_size.changed
         ? rabbitmq_settings->rabbitmq_max_block_size.value
         : (global_context.getSettingsRef().max_insert_block_size.value / num_consumers);
 }


K
kssenii 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
void StorageRabbitMQ::initRabbitMQ()
{
    /// Declares and binds exchanges and queues on a temporary setup channel, then marks the storage as ready.
    setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());

    initExchange();
    bindExchange();

    /// Queue ids are 1-based; bindQueue() also uses the id as the binding key.
    for (size_t queue_id = 1; queue_id <= num_queues; ++queue_id)
        bindQueue(queue_id);

    LOG_TRACE(log, "RabbitMQ setup completed");

    rabbit_is_ready = true;
    setup_channel->close();
}


void StorageRabbitMQ::initExchange()
{
K
Better  
kssenii 已提交
270
    /* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
K
kssenii 已提交
271
     * -> sharding exchange (only if needed) -> queues
272 273 274 275
     */
    setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
    .onError([&](const char * message)
    {
K
kssenii 已提交
276 277 278 279 280 281 282 283
        /* This error can be a result of attempt to declare exchange if it was already declared but
         * 1) with different exchange type. In this case can
         * - manually delete previously declared exchange and create a new one.
         * - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself
         *   if it is not needed anymore or use another exchange name.
         * 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and
         * specified its own settings, which differ from this implementation.
         */
A
alesapin 已提交
284 285
        throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
                + std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
286 287
    });

K
kssenii 已提交
288
    /// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
289 290 291
    setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
    .onError([&](const char * message)
    {
K
kssenii 已提交
292
        /// This error is not supposed to happen as this exchange name is always unique to type and its settings
A
alesapin 已提交
293 294
        throw Exception(
            ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
295 296 297 298 299 300 301 302
    });

    if (!hash_exchange)
    {
        consumer_exchange = bridge_exchange;
        return;
    }

K
Better  
kssenii 已提交
303
    /* Change hash property because by default it will be routing key, which has to be an integer, but with support for any exchange
K
kssenii 已提交
304
     * type - routing keys might be of any type
K
Better  
kssenii 已提交
305
     */
306 307 308
    AMQP::Table binding_arguments;
    binding_arguments["hash-property"] = "message_id";

K
Better  
kssenii 已提交
309
    /// Declare exchange for sharding.
310
    setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
311 312
    .onError([&](const char * message)
    {
K
kssenii 已提交
313 314 315 316
        /* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
         * to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
         * is bad.
         */
A
alesapin 已提交
317
        throw Exception(
K
kssenii 已提交
318 319
           ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
           "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
320 321
    });

322
    setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
323 324
    .onError([&](const char * message)
    {
A
alesapin 已提交
325 326 327 328 329 330
        throw Exception(
            ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
            "Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
            bridge_exchange,
            sharding_exchange,
            std::string(message));
331 332
    });

333
    consumer_exchange = sharding_exchange;
334 335 336 337 338 339
}


void StorageRabbitMQ::bindExchange()
{
    std::atomic<bool> binding_created = false;
K
kssenii 已提交
340
    size_t bound_keys = 0;
341 342 343

    if (exchange_type == AMQP::ExchangeType::headers)
    {
344 345 346 347 348 349 350 351
        AMQP::Table bind_headers;
        for (const auto & header : routing_keys)
        {
            std::vector<String> matching;
            boost::split(matching, header, [](char c){ return c == '='; });
            bind_headers[matching[0]] = matching[1];
        }

352
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
A
alesapin 已提交
353
        .onSuccess([&]() { binding_created = true; })
354 355
        .onError([&](const char * message)
        {
A
alesapin 已提交
356 357 358
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
K
kssenii 已提交
359
                exchange_name, bridge_exchange, std::string(message));
360 361 362 363 364
        });
    }
    else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
    {
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
A
alesapin 已提交
365
        .onSuccess([&]() { binding_created = true; })
366 367
        .onError([&](const char * message)
        {
A
alesapin 已提交
368 369 370
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
K
kssenii 已提交
371
                exchange_name, bridge_exchange, std::string(message));
372 373 374 375 376 377 378 379 380
        });
    }
    else
    {
        for (const auto & routing_key : routing_keys)
        {
            setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
            .onSuccess([&]()
            {
K
kssenii 已提交
381 382 383
                ++bound_keys;
                if (bound_keys == routing_keys.size())
                    binding_created = true;
384 385 386
            })
            .onError([&](const char * message)
            {
A
alesapin 已提交
387 388 389
                throw Exception(
                    ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                    "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
K
kssenii 已提交
390
                    exchange_name, bridge_exchange, std::string(message));
391 392 393 394 395 396 397 398 399 400 401
            });
        }
    }

    while (!binding_created)
    {
        event_handler->iterateLoop();
    }
}


K
kssenii 已提交
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
void StorageRabbitMQ::bindQueue(size_t queue_id)
{
    /* Declares the queue with the given id, binds it to consumer_exchange with the id as binding key, and pumps the
     * event loop until the binding is confirmed. The declared queue name is appended to `queues`.
     * Throws BAD_ARGUMENTS if the queue cannot be declared, CANNOT_CREATE_RABBITMQ_QUEUE_BINDING if binding fails.
     */
    std::atomic<bool> binding_created = false;

    auto success_callback = [&](const std::string &  queue_name, int msgcount, int /* consumercount */)
    {
        queues.emplace_back(queue_name);
        LOG_DEBUG(log, "Queue {} is declared", queue_name);

        if (msgcount)
            LOG_INFO(log, "Queue {} is non-empty. Non-consumed messages will also be delivered", queue_name);

        /* Bind either to the sharding exchange (consistent-hash) or to the bridge exchange (fanout). All routing-key
         * bindings are made between the client's exchange and the local bridge exchange. The binding key must be an
         * integer string for the hash exchange; for fanout it can be arbitrary.
         */
        setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
        .onSuccess([&] { binding_created = true; })
        .onError([&](const char * message)
        {
            /// Report consumer_exchange: it is the exchange the queue is actually being bound to.
            throw Exception(
                ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
                "Failed to create queue binding for exchange {}. Reason: {}", consumer_exchange, std::string(message));
        });
    };

    auto error_callback([&](const char * message)
    {
        /* Most likely the queue was declared before with different arguments: for this queue name either
         * deadletter_exchange or queue_size (i.e. the max_block_size setting) changed. Solution: use a different
         * queue_base, or delete the previously declared queues manually.
         * Adjacent literals (not backslash-newline continuations) keep source indentation out of the message.
         */
        throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. "
                "Try specifying those settings differently, use a different queue_base, or manually delete previously declared "
                "queues which were declared with the same names. ERROR reason: "
                + std::string(message), ErrorCodes::BAD_ARGUMENTS);
    });

    AMQP::Table queue_settings;

    queue_settings["x-max-length"] = queue_size;

    /// Either attach the dead-letter exchange or, without one, use the "reject-publish" overflow mode.
    if (!deadletter_exchange.empty())
        queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
    else
        queue_settings["x-overflow"] = "reject-publish";

    /* Without sharding the queue name is queue_base itself. This keeps the name simple and also allows resuming
     * reading from one specific queue whose name is given in the queue_base setting.
     */
    const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
    setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);

    while (!binding_created)
        event_handler->iterateLoop();
}


bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
    size_t cnt_retries = 0;

    if (reconnecting)
    {
        connection->close(); /// Connection might be unusable, but not closed

        /* Connection is not closed immediately (firstly, all pending operations are completed, and then
472
         * an AMQP closing-handshake is  performed). But cannot open a new connection until previous one is properly closed
K
kssenii 已提交
473 474 475 476 477 478 479 480
         */
        while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
            event_handler->iterateLoop();

        /// This will force immediate closure if not yet closed
        if (!connection->closed())
            connection->close(true);

K
Fixes  
kssenii 已提交
481
        LOG_TRACE(log, "Trying to restore connection to " + address);
K
kssenii 已提交
482 483
    }

A
alesapin 已提交
484
    connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
K
kssenii 已提交
485 486 487
            AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));

    cnt_retries = 0;
K
kssenii 已提交
488
    while (!connection->ready() && !stream_cancelled && ++cnt_retries != RETRIES_MAX)
K
kssenii 已提交
489 490 491 492 493 494 495 496 497 498 499
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    return event_handler->connectionRunning();
}


void StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
    /// Under conn_mutex: give `channel` a fresh channel on the current connection, or reset it if the connection is down.
    std::lock_guard lock(conn_mutex);

    if (!event_handler->connectionRunning())
    {
        channel = nullptr;
        return;
    }

    channel = std::make_shared<AMQP::TcpChannel>(connection.get());
}


void StorageRabbitMQ::unbindExchange()
{
K
kssenii 已提交
510 511 512
    /* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
     * on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
     * As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
K
Better  
kssenii 已提交
513 514 515
     * consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
     * input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
     * externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
K
kssenii 已提交
516
     * queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
K
kssenii 已提交
517 518
     * bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
     * not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
K
Better  
kssenii 已提交
519
     */
K
kssenii 已提交
520
    std::call_once(flag, [&]()
521
    {
K
kssenii 已提交
522
        streaming_task->deactivate();
K
kssenii 已提交
523 524 525
        event_handler->updateLoopState(Loop::STOP);
        looping_task->deactivate();

K
kssenii 已提交
526
        setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
527 528 529 530 531 532 533
        setup_channel->removeExchange(bridge_exchange)
        .onSuccess([&]()
        {
            exchange_removed.store(true);
        })
        .onError([&](const char * message)
        {
K
kssenii 已提交
534
            throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE);
535 536
        });

K
kssenii 已提交
537
        while (!exchange_removed.load())
538 539 540
        {
            event_handler->iterateLoop();
        }
K
kssenii 已提交
541 542

        setup_channel->close();
K
kssenii 已提交
543
    });
544 545 546
}


N
Nikolai Kochetov 已提交
547
Pipe StorageRabbitMQ::read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        SelectQueryInfo & /* query_info */,
        const Context & context,
        QueryProcessingStage::Enum /* processed_stage */,
        size_t /* max_block_size */,
        unsigned /* num_streams */)
{
    /* Builds one RabbitMQ source per created consumer (converted to the requested sample block) and unites them.
     * Reconnects first if the connection is down, refreshes stale channels, and (re)starts the looping task when the
     * connection is usable. Returns an empty pipe if no consumer was created.
     */
    if (num_created_consumers == 0)
        return {};

    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
    auto modified_context = addSettings(context);
    const auto block_size = getMaxBlockSize();

    /// Non-blocking stop of the looping task (if it runs) before touching the connection or channels here.
    auto stop_looping_task = [this]
    {
        if (event_handler->loopRunning())
            deactivateTask(looping_task, false, true);
    };

    /// After a successful reconnect every stream needs a new channel.
    bool reconnected = false;
    if (!event_handler->connectionRunning())
    {
        stop_looping_task();
        reconnected = restoreConnection(true);
    }

    Pipes pipes;
    pipes.reserve(num_created_consumers);

    for (size_t stream_num = 0; stream_num < num_created_consumers; ++stream_num)
    {
        auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, modified_context, column_names, block_size);

        /* Rarely a channel may end up in an error state without the connection being closed, so it is refreshed
         * manually when needed. This is done here (and in streamToViews()) rather than in readPrefix, because the
         * looping task has to be stopped first to avoid race conditions inside the library.
         */
        if (event_handler->connectionRunning() && (reconnected || rabbit_stream->needChannelUpdate()))
        {
            stop_looping_task();
            rabbit_stream->updateChannel();
        }

        auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
            rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
        pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
    }

    /// Make sure the event loop is driven whenever the connection is up.
    if (!event_handler->loopRunning() && event_handler->connectionRunning())
        looping_task->activateAndSchedule();

    LOG_DEBUG(log, "Starting reading {} streams", pipes.size());

    Pipe united_pipe = Pipe::unitePipes(std::move(pipes));
    united_pipe.addInterpreterContext(modified_context);
    return united_pipe;
}


BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
    /// Each call creates a new RabbitMQ output stream for this storage.
    BlockOutputStreamPtr output_stream = std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, context);
    return output_stream;
}


void StorageRabbitMQ::startup()
{
K
kssenii 已提交
618 619 620 621
    if (event_handler->connectionRunning())
        initRabbitMQ();
    else
        connection_task->activateAndSchedule();
K
kssenii 已提交
622

623 624 625 626 627 628 629
    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
K
kssenii 已提交
630
        catch (const AMQP::Exception & e)
631
        {
A
alesapin 已提交
632
            LOG_ERROR(log, "Got AMQ exception {}", e.what());
K
kssenii 已提交
633
            throw;
634 635 636
        }
    }

K
kssenii 已提交
637
    event_handler->updateLoopState(Loop::RUN);
638
    streaming_task->activateAndSchedule();
639 640 641 642 643 644
}


void StorageRabbitMQ::shutdown()
{
    stream_cancelled = true;
A
alesapin 已提交
645
    wait_confirm = false;
646

K
kssenii 已提交
647 648
    deactivateTask(streaming_task, true, false);
    deactivateTask(looping_task, true, true);
K
kssenii 已提交
649

K
Better  
kssenii 已提交
650 651 652
    connection->close();

    size_t cnt_retries = 0;
K
kssenii 已提交
653
    while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
K
Better  
kssenii 已提交
654
        event_handler->iterateLoop();
655

K
kssenii 已提交
656
    /// Should actually force closure, if not yet closed, but it generates distracting error logs
K
Better  
kssenii 已提交
657 658 659 660 661
    //if (!connection->closed())
    //    connection->close(true);

    for (size_t i = 0; i < num_created_consumers; ++i)
        popReadBuffer();
662 663 664 665 666
}


void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
    /// Returns a consumer buffer to the pool and signals the semaphore so that a
    /// popReadBuffer() caller can take it.
    std::lock_guard lock(buffers_mutex);
    /// `buffer` is a by-value sink parameter: move it into the container instead of copying,
    /// which avoids an extra atomic reference-count increment/decrement.
    buffers.push_back(std::move(buffer));
    semaphore.set();
}


ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
{
    /// A zero timeout selects the blocking variant: wait until a buffer is available.
    const std::chrono::milliseconds wait_without_limit{0};
    return popReadBuffer(wait_without_limit);
}


ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
{
    /// timeout == 0: block until a buffer is available.
    /// Otherwise wait at most `timeout` and return nullptr if nothing became available.
    const bool block_until_available = (timeout == std::chrono::milliseconds::zero());
    if (block_until_available)
        semaphore.wait();
    else if (!semaphore.tryWait(timeout.count()))
        return nullptr;

    /// Every successful wait is paired with an earlier pushReadBuffer(), so the list is non-empty here.
    /// The most recently pushed buffer is taken (LIFO order).
    std::lock_guard lock(buffers_mutex);
    auto taken = buffers.back();
    buffers.pop_back();
    return taken;
}


ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
    /// Open a dedicated channel only while the connection is running; otherwise the consumer is
    /// created with an empty channel pointer (presumably attached later via updateChannel — TODO confirm).
    ChannelPtr channel_for_consumer = event_handler->connectionRunning()
        ? ChannelPtr(std::make_shared<AMQP::TcpChannel>(connection.get()))
        : ChannelPtr{};

    /// Each call assigns the next consumer_id.
    return std::make_shared<ReadBufferFromRabbitMQConsumer>(
        channel_for_consumer, event_handler, queues, ++consumer_id,
        unique_strbase, log, row_delimiter, queue_size, stream_cancelled);
}


K
kssenii 已提交
711 712
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
    /// A zero row_delimiter means "no delimiter" and is passed as std::nullopt.
    std::optional<char> delimiter;
    if (row_delimiter)
        delimiter = row_delimiter;

    /// Each producer takes a fresh producer_id. The trailing 1 and 1024 are passed through unchanged;
    /// their meaning is defined by WriteBufferToRabbitMQProducer (presumably rows per message and chunk size — TODO confirm).
    return std::make_shared<WriteBufferToRabbitMQProducer>(
        parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
        producer_id.fetch_add(1), persistent, wait_confirm, log,
        delimiter, 1, 1024);
}


720 721 722 723 724 725 726 727 728 729
bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
    // Check if all dependencies are attached
    auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
    if (dependencies.empty())
        return true;

    // Check the dependencies are ready?
    for (const auto & db_tab : dependencies)
    {
730
        auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747
        if (!table)
            return false;

        // If it materialized view, check it's target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab))
            return false;
    }

    return true;
}


A
alesapin 已提交
748
void StorageRabbitMQ::streamingToViewsFunc()
749
{
K
kssenii 已提交
750
    if (rabbit_is_ready)
751
    {
K
kssenii 已提交
752
        try
753
        {
K
kssenii 已提交
754
            auto table_id = getStorageID();
755

K
kssenii 已提交
756 757
            // Check if at least one direct dependency is attached
            size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
758

K
kssenii 已提交
759 760 761
            if (dependencies_count)
            {
                auto start_time = std::chrono::steady_clock::now();
K
kssenii 已提交
762

K
kssenii 已提交
763 764
                // Keep streaming as long as there are attached views and streaming is not cancelled
                while (!stream_cancelled && num_created_consumers > 0)
K
kssenii 已提交
765
                {
K
kssenii 已提交
766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781
                    if (!checkDependencies(table_id))
                        break;

                    LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);

                    if (streamToViews())
                        break;

                    auto end_time = std::chrono::steady_clock::now();
                    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
                    if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
                    {
                        event_handler->updateLoopState(Loop::STOP);
                        LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
                        break;
                    }
K
kssenii 已提交
782
                }
783 784
            }
        }
K
kssenii 已提交
785 786 787 788
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
789 790 791 792
    }

    /// Wait for attached views
    if (!stream_cancelled)
K
kssenii 已提交
793
        streaming_task->scheduleAfter(RESCHEDULE_MS);
794 795 796 797 798 799
}


bool StorageRabbitMQ::streamToViews()
{
    auto table_id = getStorageID();
800
    auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
801 802 803 804 805 806 807
    if (!table)
        throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->table_id = table_id;

K
kssenii 已提交
808
    // Only insert into dependent views and expect that input blocks contain virtual columns
A
alesapin 已提交
809
    InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
810 811
    auto block_io = interpreter.execute();

K
kssenii 已提交
812 813 814 815
    auto metadata_snapshot = getInMemoryMetadataPtr();
    auto column_names = block_io.out->getHeader().getNames();
    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());

K
kssenii 已提交
816 817
    auto block_size = getMaxBlockSize();

818 819 820 821 822 823
    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
K
kssenii 已提交
824 825
        auto stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
K
kssenii 已提交
826
        streams.emplace_back(stream);
827 828

        // Limit read batch to maximum block size to allow DDL
N
Nikolai Kochetov 已提交
829
        StreamLocalLimits limits;
K
kssenii 已提交
830

K
kssenii 已提交
831 832 833 834
        limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
                                                  ? rabbitmq_settings->rabbitmq_flush_interval_ms
                                                  : global_context.getSettingsRef().stream_flush_interval_ms;

835
        limits.timeout_overflow_mode = OverflowMode::BREAK;
K
kssenii 已提交
836 837

        stream->setLimits(limits);
838 839 840 841 842 843 844 845 846 847
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
K
kssenii 已提交
848 849 850 851 852 853 854

    if (!event_handler->loopRunning())
    {
        event_handler->updateLoopState(Loop::RUN);
        looping_task->activateAndSchedule();
    }

855 856
    copyData(*in, *block_io.out, &stub);

K
kssenii 已提交
857 858
    /* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
     * error occurs or connection is lost while ack is being sent
K
kssenii 已提交
859
     */
K
kssenii 已提交
860 861
    deactivateTask(looping_task, false, true);
    size_t queue_empty = 0;
K
kssenii 已提交
862 863 864

    if (!event_handler->connectionRunning())
    {
K
kssenii 已提交
865 866 867 868
        if (stream_cancelled)
            return true;

        if (restoreConnection(true))
K
kssenii 已提交
869 870 871 872 873 874
        {
            for (auto & stream : streams)
                stream->as<RabbitMQBlockInputStream>()->updateChannel();
        }
        else
        {
K
kssenii 已提交
875 876
            LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
            return true;
K
kssenii 已提交
877 878 879 880 881 882 883
        }
    }
    else
    {
        /// Commit
        for (auto & stream : streams)
        {
K
kssenii 已提交
884 885 886
            if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
                ++queue_empty;

K
kssenii 已提交
887 888 889 890 891 892 893 894 895 896 897 898
            /* false is returned by the sendAck function in only two cases:
             * 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
             *    delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
             *    no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
             *    waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
             *    consumers. So in this case duplicates are inevitable.
             * 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
             *    common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
             *    Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
             *    the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
             *    will ever happen.
             */
K
kssenii 已提交
899 900
            if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
            {
K
Fixes  
kssenii 已提交
901 902 903 904
                /// Iterate loop to activate error callbacks if they happened
                event_handler->iterateLoop();

                if (event_handler->connectionRunning())
K
kssenii 已提交
905 906 907 908 909 910 911
                {
                    /* Almost any error with channel will lead to connection closure, but if so happens that channel errored and
                     * connection is not closed - also need to restore channels
                     */
                    if (!stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
                        stream->as<RabbitMQBlockInputStream>()->updateChannel();
                }
K
kssenii 已提交
912
                else
K
kssenii 已提交
913
                {
K
kssenii 已提交
914
                    break;
K
kssenii 已提交
915
                }
K
kssenii 已提交
916
            }
K
kssenii 已提交
917 918

            event_handler->iterateLoop();
K
kssenii 已提交
919 920 921
        }
    }

K
kssenii 已提交
922
    if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
K
kssenii 已提交
923 924 925 926 927 928 929 930 931 932 933
    {
        connection->heartbeat();
        read_attempts = 0;
        LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
        return true;
    }
    else
    {
        event_handler->updateLoopState(Loop::RUN);
        looping_task->activateAndSchedule();
    }
934

K
kssenii 已提交
935
    return false;
K
kssenii 已提交
936 937 938 939 940 941 942 943 944 945 946
}


void registerStorageRabbitMQ(StorageFactory & factory)
{
    /// Registers the "RabbitMQ" table engine. The creator validates engine arguments against
    /// SETTINGS and builds the storage from the loaded settings.
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        const size_t args_count = engine_args.size();
        const bool has_settings = args.storage_def->settings;

        auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
        if (has_settings)
            rabbitmq_settings->loadFromQuery(*args.storage_def);

        /// For each parameter:
        ///  - if it is given positionally, the same-named setting must not also be given;
        ///  - if it is not given positionally, it must be set in SETTINGS when it is one of the first three (required) ones.
        /// NOTE(review): positional argument *values* are only counted here, never read; only the settings
        /// are passed to StorageRabbitMQ. Confirm positional values are intentionally unsupported.
        #define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME)                                           \
            if (args_count >= (ARG_NUM))                                                                     \
            {                                                                                                \
                if (rabbitmq_settings->ARG_NAME.changed)                                                     \
                    throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ "                        \
                        "and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS);       \
            }                                                                                                \
            else if ((ARG_NUM) <= 3 && !rabbitmq_settings->ARG_NAME.changed)                                 \
            {                                                                                                \
                throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified",     \
                    ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);                                           \
            }

        /// Required parameters.
        CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_exchange_name)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_format)

        /// Optional parameters.
        CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent)

        CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)

        #undef CHECK_RABBITMQ_STORAGE_ARGUMENT

        return StorageRabbitMQ::create(args.table_id, args.context, args.columns, std::move(rabbitmq_settings));
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    /// Per-message metadata columns exposed by this engine, in a fixed order.
    NamesAndTypesList virtual_columns{
        {"_exchange_name", std::make_shared<DataTypeString>()},
        {"_channel_id", std::make_shared<DataTypeString>()},
        {"_delivery_tag", std::make_shared<DataTypeUInt64>()},
        {"_redelivered", std::make_shared<DataTypeUInt8>()},
        {"_message_id", std::make_shared<DataTypeString>()},
        {"_timestamp", std::make_shared<DataTypeUInt64>()}
    };
    return virtual_columns;
}

}