#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/parseAddress.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <amqpcpp.h>

namespace DB
{
/// Pause between two connection-readiness polls in restoreConnection(), in milliseconds.
static constexpr auto CONNECT_SLEEP = 200;
/// Upper bound on the number of polls while waiting for a connection to become ready or closed.
static constexpr auto RETRIES_MAX = 20;
/// Lower bound for the queue size: the constructor uses max(QUEUE_SIZE, max block size).
static constexpr uint32_t QUEUE_SIZE = 100000;
/// NOTE(review): the three constants below are not referenced in the part of the file visible here;
/// presumably they tune the streaming-to-views loop - confirm against the rest of the file.
static constexpr auto MAX_FAILED_READ_ATTEMPTS = 10;
static constexpr auto RESCHEDULE_MS = 500;
static constexpr auto MAX_THREAD_WORK_DURATION_MS = 60000;
/// Error codes used by this storage; only declared here, their definitions live outside this file.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_CONNECT_RABBITMQ;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
    extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
    extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
    extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}

/// Textual exchange type names that defineExchangeType() compares the table setting against.
namespace ExchangeType
{
    /// "default" is the default of this implementation, not the default of RabbitMQ settings
    static const String DEFAULT{"default"};
    static const String FANOUT{"fanout"};
    static const String DIRECT{"direct"};
    static const String TOPIC{"topic"};
    static const String HASH{"consistent_hash"};
    static const String HEADERS{"headers"};
}
/** Builds the storage from the table settings:
  * - expands macros in the string settings and stores them in members;
  * - initializes the libuv loop and the event handler, then opens the connection
  *   (throws CANNOT_CONNECT_RABBITMQ if restoreConnection() reports failure);
  * - stores the column description as in-memory metadata and prepares rabbitmq_context;
  * - creates the looping and streaming background tasks, both deactivated for now;
  * - derives the names of the local (sharding and bridge) exchanges and, if not given, the queue base.
  */
StorageRabbitMQ::StorageRabbitMQ(
        const StorageID & table_id_,
        Context & context_,
        const ColumnsDescription & columns_,
        std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
        : IStorage(table_id_)
        , global_context(context_.getGlobalContext())
        , rabbitmq_context(Context(global_context))
        , rabbitmq_settings(std::move(rabbitmq_settings_))
        , exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
        , format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
        , exchange_type(defineExchangeType(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type.value)))
        , routing_keys(parseRoutingKeys(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list.value)))
        , row_delimiter(rabbitmq_settings->rabbitmq_row_delimiter.value)
        , schema_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_schema.value))
        , num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
        , num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
        , queue_base(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base.value))
        , deadletter_exchange(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_deadletter_exchange.value))
        , persistent(rabbitmq_settings->rabbitmq_persistent.value)
        /// A sharding (consistent-hash) exchange is used as soon as there is more than one consumer or queue
        , hash_exchange(num_consumers > 1 || num_queues > 1)
        , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
        , address(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
        , parsed_address(parseAddress(address, 5672))
        , login_password(std::make_pair(
                    global_context.getConfigRef().getString("rabbitmq.username"),
                    global_context.getConfigRef().getString("rabbitmq.password")))
        , semaphore(0, num_consumers)
        , unique_strbase(getRandomName())
        , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
{
    loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());
    event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);

    if (!restoreConnection(false))
    {
        /// Force closure of the half-open connection before giving up
        if (!connection->closed())
            connection->close(true);

        throw Exception("Cannot connect to RabbitMQ " + address, ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    }

    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    setInMemoryMetadata(storage_metadata);

    rabbitmq_context.makeQueryContext();
    rabbitmq_context = addSettings(rabbitmq_context);

    /// One looping task for all consumers as they share the same connection == the same handler == the same event loop
    event_handler->updateLoopState(Loop::STOP);
    looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
    looping_task->deactivate();

    streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
    streaming_task->deactivate();

    if (queue_base.empty())
    {
        /* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
         * be table-based and not just a random string, because local exchanges should be declared the same for same tables
         */
        sharding_exchange = getTableBasedName(exchange_name, table_id_);

        /* By default without a specified queue name in queue's declaration - its name will be generated by the library, but its better
         * to specify it unique for each table to reuse them once the table is recreated. So it means that queues remain the same for every
         * table unless queue_base table setting is specified (which allows to register consumers to specific queues). Now this is a base
         * for the names of later declared queues
         */
        queue_base = getTableBasedName("", table_id_);
    }
    else
    {
        /* In case different tables are used to register multiple consumers to the same queues (so queues are shared between tables) and
         * at the same time sharding exchange is needed (if there are multiple shared queues), then those tables also need to share
         * sharding exchange and bridge exchange
         */
        sharding_exchange = exchange_name + "_" + queue_base;
    }

    bridge_exchange = sharding_exchange + "_bridge";
}

/// Splits a comma-separated list of routing keys and trims whitespace around every key.
Names StorageRabbitMQ::parseRoutingKeys(String routing_key_list)
{
    const auto is_separator = [](char symbol) { return symbol == ','; };

    Names keys;
    boost::split(keys, routing_key_list, is_separator);

    for (auto & routing_key : keys)
        boost::trim(routing_key);

    return keys;
}


/** Maps the textual exchange type from the table settings to the AMQP enum.
  * "default" (the default of this implementation) is treated as fanout.
  * Throws BAD_ARGUMENTS for any other unknown name.
  */
AMQP::ExchangeType StorageRabbitMQ::defineExchangeType(String exchange_type_)
{
    if (exchange_type_ == ExchangeType::DEFAULT || exchange_type_ == ExchangeType::FANOUT)
        return AMQP::ExchangeType::fanout;

    if (exchange_type_ == ExchangeType::DIRECT)
        return AMQP::ExchangeType::direct;

    if (exchange_type_ == ExchangeType::TOPIC)
        return AMQP::ExchangeType::topic;

    if (exchange_type_ == ExchangeType::HASH)
        return AMQP::ExchangeType::consistent_hash;

    if (exchange_type_ == ExchangeType::HEADERS)
        return AMQP::ExchangeType::headers;

    throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}


/** Builds a name that is stable for a given table: "<database>_<table>", prefixed with "<name>_"
  * when `name` is not empty.
  */
String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_id)
{
    /// Plain concatenation gives the same result as streaming the parts into a stringstream
    const String table_part = table_id.database_name + "_" + table_id.table_name;

    if (name.empty())
        return table_part;

    return name + "_" + table_part;
}


/** Returns the given context (taken by value) with the input format settings of this table applied:
  * unknown fields are skipped, the allowed error ratio is zero and the allowed number of errors is
  * taken from rabbitmq_skip_broken_messages.
  */
Context StorageRabbitMQ::addSettings(Context context) const
{
    context.setSetting("input_format_skip_unknown_fields", true);
    context.setSetting("input_format_allow_errors_ratio", 0.);
    context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);

    /// format_schema is set only when a schema was specified in the table settings
    if (!schema_name.empty())
        context.setSetting("format_schema", schema_name);

    return context;
}


void StorageRabbitMQ::loopingFunc()
{
216 217
    if (event_handler->connectionRunning())
        event_handler->startLoop();
218 219 220
}


/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then
 * inside streaming task try to deactivate any other task.
 *
 * task      - background task to deactivate;
 * wait      - if task_mutex is busy: block until it is free (true) or skip the deactivation altogether (false);
 * stop_loop - additionally switch the event loop state to STOP before deactivating.
 */
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
    if (stop_loop)
        event_handler->updateLoopState(Loop::STOP);

    std::unique_lock<std::mutex> lock(task_mutex, std::try_to_lock);
    if (!lock.owns_lock())
    {
        /// Wait only if deactivating from shutdown
        if (!wait)
            return;

        lock.lock();
    }

    /// The mutex is held here on both paths and is released when `lock` goes out of scope
    task->deactivate();
}


size_t StorageRabbitMQ::getMaxBlockSize() const
K
kssenii 已提交
244 245 246 247 248 249 250
 {
     return rabbitmq_settings->rabbitmq_max_block_size.changed
         ? rabbitmq_settings->rabbitmq_max_block_size.value
         : (global_context.getSettingsRef().max_insert_block_size.value / num_consumers);
 }


void StorageRabbitMQ::initExchange()
{
K
Better  
kssenii 已提交
253
    /* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
K
kssenii 已提交
254
     * -> sharding exchange (only if needed) -> queues
255 256 257 258
     */
    setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
    .onError([&](const char * message)
    {
K
kssenii 已提交
259 260 261 262 263 264 265 266
        /* This error can be a result of attempt to declare exchange if it was already declared but
         * 1) with different exchange type. In this case can
         * - manually delete previously declared exchange and create a new one.
         * - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself
         *   if it is not needed anymore or use another exchange name.
         * 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and
         * specified its own settings, which differ from this implementation.
         */
A
alesapin 已提交
267 268
        throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
                + std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
269 270
    });

K
kssenii 已提交
271
    /// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
272 273 274
    setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
    .onError([&](const char * message)
    {
K
kssenii 已提交
275
        /// This error is not supposed to happen as this exchange name is always unique to type and its settings
A
alesapin 已提交
276 277
        throw Exception(
            ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
278 279 280 281 282 283 284 285
    });

    if (!hash_exchange)
    {
        consumer_exchange = bridge_exchange;
        return;
    }

K
Better  
kssenii 已提交
286
    /* Change hash property because by default it will be routing key, which has to be an integer, but with support for any exchange
K
kssenii 已提交
287
     * type - routing keys might be of any type
K
Better  
kssenii 已提交
288
     */
289 290 291
    AMQP::Table binding_arguments;
    binding_arguments["hash-property"] = "message_id";

K
Better  
kssenii 已提交
292
    /// Declare exchange for sharding.
293
    setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
294 295
    .onError([&](const char * message)
    {
K
kssenii 已提交
296 297 298 299
        /* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
         * to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
         * is bad.
         */
A
alesapin 已提交
300 301
        throw Exception(
           ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
302 303
    });

304
    setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
305 306
    .onError([&](const char * message)
    {
A
alesapin 已提交
307 308 309 310 311 312
        throw Exception(
            ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
            "Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
            bridge_exchange,
            sharding_exchange,
            std::string(message));
313 314
    });

315
    consumer_exchange = sharding_exchange;
316 317 318 319 320 321
}


void StorageRabbitMQ::bindExchange()
{
    std::atomic<bool> binding_created = false;
K
kssenii 已提交
322
    size_t bound_keys = 0;
323 324 325

    if (exchange_type == AMQP::ExchangeType::headers)
    {
326 327 328 329 330 331 332 333
        AMQP::Table bind_headers;
        for (const auto & header : routing_keys)
        {
            std::vector<String> matching;
            boost::split(matching, header, [](char c){ return c == '='; });
            bind_headers[matching[0]] = matching[1];
        }

334
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
A
alesapin 已提交
335
        .onSuccess([&]() { binding_created = true; })
336 337
        .onError([&](const char * message)
        {
A
alesapin 已提交
338 339 340 341 342 343
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
                exchange_name,
                bridge_exchange,
                std::string(message));
344 345 346 347 348
        });
    }
    else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
    {
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
A
alesapin 已提交
349
        .onSuccess([&]() { binding_created = true; })
350 351
        .onError([&](const char * message)
        {
A
alesapin 已提交
352 353 354 355 356 357
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
                exchange_name,
                bridge_exchange,
                std::string(message));
358 359 360 361 362 363 364 365 366
        });
    }
    else
    {
        for (const auto & routing_key : routing_keys)
        {
            setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
            .onSuccess([&]()
            {
K
kssenii 已提交
367 368 369
                ++bound_keys;
                if (bound_keys == routing_keys.size())
                    binding_created = true;
370 371 372
            })
            .onError([&](const char * message)
            {
A
alesapin 已提交
373 374 375 376 377 378
                throw Exception(
                    ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                    "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
                    exchange_name,
                    bridge_exchange,
                    std::string(message));
379 380 381 382 383 384 385 386 387 388 389
            });
        }
    }

    while (!binding_created)
    {
        event_handler->iterateLoop();
    }
}


/** Declares one durable queue and binds it to consumer_exchange, iterating the event loop until the
  * binding is confirmed. The queue name is queue_base, prefixed with "<queue_id>_" when sharding is used.
  * Throws BAD_ARGUMENTS if the queue cannot be declared and CANNOT_CREATE_RABBITMQ_QUEUE_BINDING if it
  * cannot be bound.
  */
void StorageRabbitMQ::bindQueue(size_t queue_id)
{
    std::atomic<bool> binding_created = false;

    auto success_callback = [&](const std::string &  queue_name, int msgcount, int /* consumercount */)
    {
        queues.emplace_back(queue_name);
        LOG_DEBUG(log, "Queue {} is declared", queue_name);

        if (msgcount)
            LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);

       /* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
        * done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
        * fanout exchange it can be arbitrary
        */
        setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
        .onSuccess([&] { binding_created = true; })
        .onError([&](const char * message)
        {
            throw Exception(
                ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
                "Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
        });
    };

    auto error_callback([&](const char * message)
    {
        /* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
         * given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
         * max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
         * declared queues via any of the various cli tools.
         */
        /// Adjacent literals are used instead of backslash continuations, which would embed the source indentation into the message
        throw Exception(
            "Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt "
            "specifying differently those settings or use a different queue_base or manually delete previously declared queues, "
            "which were declared with the same names. ERROR reason: "
            + std::string(message), ErrorCodes::BAD_ARGUMENTS);
    });

    AMQP::Table queue_settings;

    queue_settings["x-max-length"] = queue_size;

    if (!deadletter_exchange.empty())
        queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
    else
        queue_settings["x-overflow"] = "reject-publish";

    /* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
     * specific queue when its name is specified in queue_base setting
     */
    const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
    setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);

    /// Both callbacks are invoked from inside iterateLoop()
    while (!binding_created)
    {
        event_handler->iterateLoop();
    }
}


bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
    size_t cnt_retries = 0;

    if (reconnecting)
    {
        connection->close(); /// Connection might be unusable, but not closed

        /* Connection is not closed immediately (firstly, all pending operations are completed, and then
460
         * an AMQP closing-handshake is  performed). But cannot open a new connection until previous one is properly closed
K
kssenii 已提交
461 462 463 464 465 466 467 468
         */
        while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
            event_handler->iterateLoop();

        /// This will force immediate closure if not yet closed
        if (!connection->closed())
            connection->close(true);

K
Fixes  
kssenii 已提交
469
        LOG_TRACE(log, "Trying to restore connection to " + address);
K
kssenii 已提交
470 471
    }

A
alesapin 已提交
472
    connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
K
kssenii 已提交
473 474 475
            AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));

    cnt_retries = 0;
K
kssenii 已提交
476
    while (!connection->ready() && !stream_cancelled && ++cnt_retries != RETRIES_MAX)
K
kssenii 已提交
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    return event_handler->connectionRunning();
}


/// Replaces the given channel with a newly created one on the current connection.
void StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
    auto fresh_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    channel = std::move(fresh_channel);
}


void StorageRabbitMQ::unbindExchange()
{
K
kssenii 已提交
494 495 496
    /* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
     * on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
     * As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
K
Better  
kssenii 已提交
497 498 499
     * consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
     * input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
     * externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
K
kssenii 已提交
500
     * queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
K
kssenii 已提交
501 502
     * bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
     * not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
K
Better  
kssenii 已提交
503
     */
K
kssenii 已提交
504
    std::call_once(flag, [&]()
505
    {
K
kssenii 已提交
506
        streaming_task->deactivate();
K
kssenii 已提交
507 508 509
        event_handler->updateLoopState(Loop::STOP);
        looping_task->deactivate();

K
kssenii 已提交
510
        setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
511 512 513 514 515 516 517
        setup_channel->removeExchange(bridge_exchange)
        .onSuccess([&]()
        {
            exchange_removed.store(true);
        })
        .onError([&](const char * message)
        {
K
kssenii 已提交
518
            throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE);
519 520
        });

K
kssenii 已提交
521
        while (!exchange_removed.load())
522 523 524
        {
            event_handler->iterateLoop();
        }
K
kssenii 已提交
525 526

        setup_channel->close();
K
kssenii 已提交
527
    });
528 529 530
}


/** Creates one source per created consumer and returns them united into a single pipe
  * (an empty pipe if no consumers were created).
  * Before the streams are handed out the connection is restored if it is not running and consumer
  * channels are updated where needed; the looping task is stopped for that and started again at the end.
  * query_info, processed_stage, max_block_size and num_streams are not used.
  */
Pipe StorageRabbitMQ::read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        SelectQueryInfo & /* query_info */,
        const Context & context,
        QueryProcessingStage::Enum /* processed_stage */,
        size_t /* max_block_size */,
        unsigned /* num_streams */)
{
    if (num_created_consumers == 0)
        return {};

    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());

    auto modified_context = addSettings(context);
    auto block_size = getMaxBlockSize();

    bool update_channels = false;
    if (!event_handler->connectionRunning())
    {
        if (event_handler->loopRunning())
            deactivateTask(looping_task, false, true);

        /// After a successful reconnect every consumer channel has to be recreated
        update_channels = restoreConnection(true);
    }

    Pipes pipes;
    pipes.reserve(num_created_consumers);

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, modified_context, column_names, block_size);

        /* It is a possible but rare case when channel gets into error state and does not also close connection, so need manual update.
         * But I believe that in current context and with local rabbitmq settings this will never happen and any channel error will also
         * close connection, but checking anyway (in second condition of if statement). This must be done here (and also in streamToViews())
         * and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library
         */
        if (event_handler->connectionRunning() && (update_channels || rabbit_stream->needChannelUpdate()))
        {
            if (event_handler->loopRunning())
            {
                deactivateTask(looping_task, false, true);
            }

            rabbit_stream->updateChannel();
        }

        /// Convert (matching columns by name) to the header expected for the requested columns
        auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
            rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
        pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
    }

    if (!event_handler->loopRunning() && event_handler->connectionRunning())
        looping_task->activateAndSchedule();

    LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
    return Pipe::unitePipes(std::move(pipes));
}


/// Returns a RabbitMQBlockOutputStream bound to this storage; the query AST argument is not used.
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
    return std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, context);
}


void StorageRabbitMQ::startup()
{
601 602 603 604
    setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    initExchange();
    bindExchange();

K
kssenii 已提交
605 606 607 608 609 610 611
    for (size_t i = 1; i <= num_queues; ++i)
    {
        bindQueue(i);
    }

    setup_channel->close();

612 613 614 615 616 617 618
    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
K
kssenii 已提交
619
        catch (const AMQP::Exception & e)
620
        {
A
alesapin 已提交
621
            LOG_ERROR(log, "Got AMQ exception {}", e.what());
K
kssenii 已提交
622
            throw;
623 624 625
        }
    }

K
kssenii 已提交
626
    event_handler->updateLoopState(Loop::RUN);
627
    streaming_task->activateAndSchedule();
628 629 630 631 632 633
}


void StorageRabbitMQ::shutdown()
{
    stream_cancelled = true;
A
alesapin 已提交
634
    wait_confirm = false;
635

K
kssenii 已提交
636 637
    deactivateTask(streaming_task, true, false);
    deactivateTask(looping_task, true, true);
K
kssenii 已提交
638

K
Better  
kssenii 已提交
639 640 641
    connection->close();

    size_t cnt_retries = 0;
K
kssenii 已提交
642
    while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
K
Better  
kssenii 已提交
643
        event_handler->iterateLoop();
644

K
kssenii 已提交
645
    /// Should actually force closure, if not yet closed, but it generates distracting error logs
K
Better  
kssenii 已提交
646 647 648 649 650
    //if (!connection->closed())
    //    connection->close(true);

    for (size_t i = 0; i < num_created_consumers; ++i)
        popReadBuffer();
651 652 653 654 655
}


/// Puts a consumer buffer (back) into the pool and signals the semaphore that one more buffer is available.
void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
    std::lock_guard lock(buffers_mutex);
    /// `buffer` is a by-value sink parameter: move it into the pool instead of copying it
    buffers.push_back(std::move(buffer));
    semaphore.set();
}


/// Takes a buffer from the pool; a zero timeout makes the timed overload wait without a time limit.
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
{
    const auto no_timeout = std::chrono::milliseconds::zero();
    return popReadBuffer(no_timeout);
}


/** Takes a buffer from the pool.
  * With a zero timeout the call waits on the semaphore without a time limit; otherwise it waits at most
  * `timeout` and returns nullptr if no buffer became available in that time.
  */
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
{
    // Wait for the first free buffer
    if (timeout == std::chrono::milliseconds::zero())
        semaphore.wait();
    else
    {
        if (!semaphore.tryWait(timeout.count()))
            return nullptr;
    }

    // Take the first available buffer from the list
    std::lock_guard lock(buffers_mutex);
    auto buffer = buffers.back();
    buffers.pop_back();

    return buffer;
}


/// Creates a consumer buffer with its own channel on the shared connection; consumer_id is incremented for each new buffer.
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
    ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());

    return std::make_shared<ReadBufferFromRabbitMQConsumer>(
        consumer_channel, event_handler, queues, ++consumer_id,
        unique_strbase, log, row_delimiter, queue_size, stream_cancelled);
}

/** Creates a producer buffer from the parsed address and credentials of this table; every call takes
  * the next producer_id. The row delimiter is passed only if it is set (non-zero).
  */
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
    /// NOTE(review): the meaning of the trailing arguments (1, 1024) is defined by WriteBufferToRabbitMQProducer - confirm there
    return std::make_shared<WriteBufferToRabbitMQProducer>(
        parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
        producer_id.fetch_add(1), persistent, wait_confirm, log,
        row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}
}


bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
    // Check if all dependencies are attached
    auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
    if (dependencies.empty())
        return true;

    // Check the dependencies are ready?
    for (const auto & db_tab : dependencies)
    {
717
        auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734
        if (!table)
            return false;

        // If it materialized view, check it's target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab))
            return false;
    }

    return true;
}


A
alesapin 已提交
735
void StorageRabbitMQ::streamingToViewsFunc()
736 737 738 739
{
    try
    {
        auto table_id = getStorageID();
K
kssenii 已提交
740

741 742 743 744 745
        // Check if at least one direct dependency is attached
        size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();

        if (dependencies_count)
        {
K
kssenii 已提交
746 747
            auto start_time = std::chrono::steady_clock::now();

748 749 750 751 752 753
            // Keep streaming as long as there are attached views and streaming is not cancelled
            while (!stream_cancelled && num_created_consumers > 0)
            {
                if (!checkDependencies(table_id))
                    break;

K
kssenii 已提交
754
                LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
755

K
kssenii 已提交
756
                if (streamToViews())
757
                    break;
K
kssenii 已提交
758 759 760 761 762 763 764

                auto end_time = std::chrono::steady_clock::now();
                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
                if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
                {
                    event_handler->updateLoopState(Loop::STOP);
                    LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
765
                    break;
K
kssenii 已提交
766
                }
767 768 769 770 771 772 773 774 775 776
            }
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    /// Wait for attached views
    if (!stream_cancelled)
K
kssenii 已提交
777
        streaming_task->scheduleAfter(RESCHEDULE_MS);
778 779 780 781 782 783
}


bool StorageRabbitMQ::streamToViews()
{
    auto table_id = getStorageID();
784
    auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
785 786 787 788 789 790 791
    if (!table)
        throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->table_id = table_id;

K
kssenii 已提交
792
    // Only insert into dependent views and expect that input blocks contain virtual columns
K
kssenii 已提交
793
    InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
794 795
    auto block_io = interpreter.execute();

K
kssenii 已提交
796 797 798 799
    auto metadata_snapshot = getInMemoryMetadataPtr();
    auto column_names = block_io.out->getHeader().getNames();
    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());

K
kssenii 已提交
800 801
    auto block_size = getMaxBlockSize();

802 803 804 805 806 807
    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
K
kssenii 已提交
808 809
        auto stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
K
kssenii 已提交
810
        streams.emplace_back(stream);
811 812

        // Limit read batch to maximum block size to allow DDL
N
Nikolai Kochetov 已提交
813
        StreamLocalLimits limits;
K
kssenii 已提交
814

K
kssenii 已提交
815 816 817 818
        limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
                                                  ? rabbitmq_settings->rabbitmq_flush_interval_ms
                                                  : global_context.getSettingsRef().stream_flush_interval_ms;

819
        limits.timeout_overflow_mode = OverflowMode::BREAK;
K
kssenii 已提交
820 821

        stream->setLimits(limits);
822 823 824 825 826 827 828 829 830 831
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
K
kssenii 已提交
832 833 834 835 836 837 838

    if (!event_handler->loopRunning())
    {
        event_handler->updateLoopState(Loop::RUN);
        looping_task->activateAndSchedule();
    }

839 840
    copyData(*in, *block_io.out, &stub);

K
kssenii 已提交
841 842
    /* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
     * error occurs or connection is lost while ack is being sent
K
kssenii 已提交
843
     */
K
kssenii 已提交
844 845
    deactivateTask(looping_task, false, true);
    size_t queue_empty = 0;
K
kssenii 已提交
846 847 848

    if (!event_handler->connectionRunning())
    {
K
kssenii 已提交
849 850 851 852
        if (stream_cancelled)
            return true;

        if (restoreConnection(true))
K
kssenii 已提交
853 854 855 856 857 858
        {
            for (auto & stream : streams)
                stream->as<RabbitMQBlockInputStream>()->updateChannel();
        }
        else
        {
K
kssenii 已提交
859 860
            LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
            return true;
K
kssenii 已提交
861 862 863 864 865 866 867
        }
    }
    else
    {
        /// Commit
        for (auto & stream : streams)
        {
K
kssenii 已提交
868 869 870
            if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
                ++queue_empty;

K
kssenii 已提交
871 872 873 874 875 876 877 878 879 880 881 882
            /* false is returned by the sendAck function in only two cases:
             * 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
             *    delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
             *    no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
             *    waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
             *    consumers. So in this case duplicates are inevitable.
             * 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
             *    common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
             *    Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
             *    the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
             *    will ever happen.
             */
K
kssenii 已提交
883 884
            if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
            {
K
Fixes  
kssenii 已提交
885 886 887 888
                /// Iterate loop to activate error callbacks if they happened
                event_handler->iterateLoop();

                if (event_handler->connectionRunning())
K
kssenii 已提交
889 890 891 892 893 894 895
                {
                    /* Almost any error with channel will lead to connection closure, but if so happens that channel errored and
                     * connection is not closed - also need to restore channels
                     */
                    if (!stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
                        stream->as<RabbitMQBlockInputStream>()->updateChannel();
                }
K
kssenii 已提交
896
                else
K
kssenii 已提交
897
                {
K
kssenii 已提交
898
                    break;
K
kssenii 已提交
899
                }
K
kssenii 已提交
900
            }
K
kssenii 已提交
901 902

            event_handler->iterateLoop();
K
kssenii 已提交
903 904 905
        }
    }

K
kssenii 已提交
906
    if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
K
kssenii 已提交
907 908 909 910 911 912 913 914 915 916 917
    {
        connection->heartbeat();
        read_attempts = 0;
        LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
        return true;
    }
    else
    {
        event_handler->updateLoopState(Loop::RUN);
        looping_task->activateAndSchedule();
    }
918

K
kssenii 已提交
919
    return false;
K
kssenii 已提交
920 921 922 923 924 925 926 927 928 929 930
}


/// Registers the "RabbitMQ" table engine in the storage factory.
///
/// The creator loads RabbitMQSettings from the SETTINGS clause (if present), validates that each
/// parameter is given exactly once (positionally or as a setting) and that the first three
/// are given at all, then creates the storage from the settings.
void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        const size_t args_count = args.engine_args.size();

        auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
        if (args.storage_def->settings)
            rabbitmq_settings->loadFromQuery(*args.storage_def);

        /// POSITION is the 1-based index of the engine argument, SETTING is the matching settings field.
        /// Only the first three parameters are mandatory.
        #define VALIDATE_RABBITMQ_PARAMETER(POSITION, SETTING)                                               \
            do                                                                                               \
            {                                                                                                \
                const bool given_as_argument = args_count >= (POSITION);                                     \
                const bool given_as_setting = rabbitmq_settings->SETTING.changed;                            \
                                                                                                             \
                /* One of the three required arguments is not specified */                                   \
                if (!given_as_argument && (POSITION) <= 3 && !given_as_setting)                              \
                {                                                                                            \
                    throw Exception("Required parameter '" #SETTING "' for storage RabbitMQ not specified",  \
                        ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);                                       \
                }                                                                                            \
                                                                                                             \
                /* The same argument is given in two places */                                               \
                if (given_as_argument && given_as_setting)                                                   \
                {                                                                                            \
                    throw Exception("The argument №" #POSITION " of storage RabbitMQ "                       \
                        "and the parameter '" #SETTING "' is duplicated", ErrorCodes::BAD_ARGUMENTS);        \
                }                                                                                            \
            } while (false)

        VALIDATE_RABBITMQ_PARAMETER(1, rabbitmq_host_port);
        VALIDATE_RABBITMQ_PARAMETER(2, rabbitmq_exchange_name);
        VALIDATE_RABBITMQ_PARAMETER(3, rabbitmq_format);

        VALIDATE_RABBITMQ_PARAMETER(4, rabbitmq_exchange_type);
        VALIDATE_RABBITMQ_PARAMETER(5, rabbitmq_routing_key_list);
        VALIDATE_RABBITMQ_PARAMETER(6, rabbitmq_row_delimiter);
        VALIDATE_RABBITMQ_PARAMETER(7, rabbitmq_schema);
        VALIDATE_RABBITMQ_PARAMETER(8, rabbitmq_num_consumers);
        VALIDATE_RABBITMQ_PARAMETER(9, rabbitmq_num_queues);
        VALIDATE_RABBITMQ_PARAMETER(10, rabbitmq_queue_base);
        VALIDATE_RABBITMQ_PARAMETER(11, rabbitmq_deadletter_exchange);
        VALIDATE_RABBITMQ_PARAMETER(12, rabbitmq_persistent);

        VALIDATE_RABBITMQ_PARAMETER(13, rabbitmq_skip_broken_messages);
        VALIDATE_RABBITMQ_PARAMETER(14, rabbitmq_max_block_size);
        VALIDATE_RABBITMQ_PARAMETER(15, rabbitmq_flush_interval_ms);

        #undef VALIDATE_RABBITMQ_PARAMETER

        return StorageRabbitMQ::create(args.table_id, args.context, args.columns, std::move(rabbitmq_settings));
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


/// Lists the virtual columns of the engine together with their types, in a fixed order.
NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    NamesAndTypesList virtual_columns;

    virtual_columns.emplace_back("_exchange_name", std::make_shared<DataTypeString>());
    virtual_columns.emplace_back("_channel_id", std::make_shared<DataTypeString>());
    virtual_columns.emplace_back("_delivery_tag", std::make_shared<DataTypeUInt64>());
    virtual_columns.emplace_back("_redelivered", std::make_shared<DataTypeUInt8>());
    virtual_columns.emplace_back("_message_id", std::make_shared<DataTypeString>());
    virtual_columns.emplace_back("_timestamp", std::make_shared<DataTypeUInt64>());

    return virtual_columns;
}

}