StorageRabbitMQ.cpp 39.1 KB
Newer Older
1
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
K
kssenii 已提交
2
#include <DataStreams/IBlockInputStream.h>
A
alesapin 已提交
3
#include <DataStreams/ConvertingBlockInputStream.h>
K
kssenii 已提交
4 5 6 7 8 9 10 11
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
12
#include <Interpreters/Context.h>
K
kssenii 已提交
13 14 15 16
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
17
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
K
kssenii 已提交
18 19
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
20
#include <Storages/RabbitMQ/RabbitMQHandler.h>
K
kssenii 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/parseAddress.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <amqpcpp.h>

K
kssenii 已提交
37 38
namespace DB
{
39

40
/// Milliseconds slept between connection readiness checks in restoreConnection().
static constexpr int CONNECT_SLEEP = 200;
/// Upper bound on the retry counters used while waiting for a connection to open or close.
static constexpr int RETRIES_MAX = 20;
/// Lower bound for the per-queue size limit (see queue_size in the constructor).
static constexpr uint32_t QUEUE_SIZE = 100000;
/// NOTE(review): not referenced in the visible part of the file - presumably limits failed reads while streaming; confirm.
static constexpr int MAX_FAILED_READ_ATTEMPTS = 10;
/// Delay passed to scheduleAfter() when the connection task has to retry.
static constexpr int RESCHEDULE_MS = 500;
/// NOTE(review): not referenced in the visible part of the file - presumably caps one streaming run; confirm.
static constexpr int MAX_THREAD_WORK_DURATION_MS = 60000;
46

K
kssenii 已提交
47 48 49 50
/// Error codes used by this translation unit; only declared here, the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
    extern const int CANNOT_CONNECT_RABBITMQ;
    extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
    extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
    extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

59 60 61 62 63 64 65 66 67 68
/// Textual exchange type names accepted in the rabbitmq_exchange_type setting (see defineExchangeType()).
namespace ExchangeType
{
    /// "default" is the default of this implementation, not a default of RabbitMQ itself.
    static const String DEFAULT{"default"};
    static const String FANOUT{"fanout"};
    static const String DIRECT{"direct"};
    static const String TOPIC{"topic"};
    static const String HASH{"consistent_hash"};
    static const String HEADERS{"headers"};
}
69

K
kssenii 已提交
70

K
kssenii 已提交
71 72
/** Builds the storage from table settings: expands macros in the settings, opens the first connection,
  * creates (deactivated) background tasks and derives the names of the sharding and bridge exchanges.
  * The order of the member initializers is kept as is, because later members read earlier ones.
  */
StorageRabbitMQ::StorageRabbitMQ(
        const StorageID & table_id_,
        const Context & context_,
        const ColumnsDescription & columns_,
        std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
        : IStorage(table_id_)
        , global_context(context_.getGlobalContext())
        , rabbitmq_settings(std::move(rabbitmq_settings_))
        , exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
        , format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
        , exchange_type(defineExchangeType(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type.value)))
        , routing_keys(parseRoutingKeys(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list.value)))
        , row_delimiter(rabbitmq_settings->rabbitmq_row_delimiter.value)
        , schema_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_schema.value))
        , num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
        , num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
        , queue_base(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base.value))
        , deadletter_exchange(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_deadletter_exchange.value))
        , persistent(rabbitmq_settings->rabbitmq_persistent.value)
        , hash_exchange(num_consumers > 1 || num_queues > 1)
        , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
        , address(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
        , parsed_address(parseAddress(address, 5672))
        , login_password(std::make_pair(
                    global_context.getConfigRef().getString("rabbitmq.username"),
                    global_context.getConfigRef().getString("rabbitmq.password")))
        , vhost(global_context.getConfigRef().getString("rabbitmq.vhost", "/"))
        , semaphore(0, num_consumers)
        , unique_strbase(getRandomName())
        , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
{
    /// The event loop and its handler must exist before the first connection attempt.
    loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());
    event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
    restoreConnection(false);

    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    setInMemoryMetadata(storage_metadata);

    rabbitmq_context = addSettings(global_context);
    rabbitmq_context->makeQueryContext();

    /// A single looping task serves all consumers: they share one connection, hence one handler and one event loop.
    event_handler->updateLoopState(Loop::STOP);
    looping_task = global_context.getMessageBrokerSchedulePool().createTask(
        "RabbitMQLoopingTask", [this] { loopingFunc(); });
    looping_task->deactivate();

    streaming_task = global_context.getMessageBrokerSchedulePool().createTask(
        "RabbitMQStreamingTask", [this] { streamingToViewsFunc(); });
    streaming_task->deactivate();

    connection_task = global_context.getMessageBrokerSchedulePool().createTask(
        "RabbitMQConnectionTask", [this] { connectionFunc(); });
    connection_task->deactivate();

    if (!queue_base.empty())
    {
        /* Different tables may register consumers to the same queues (queues shared between tables). If a sharding
         * exchange is needed at the same time (several shared queues), those tables must also share the sharding
         * exchange and the bridge exchange, so the name is derived from the user-provided queue_base.
         */
        sharding_exchange = exchange_name + "_" + queue_base;
    }
    else
    {
        /* The local exchange name has to be unique per table and differ from the client's exchange name. It is
         * table-based rather than random, because local exchanges must be declared identically for the same table.
         */
        sharding_exchange = getTableBasedName(exchange_name, table_id_);

        /* Without an explicit name the library would generate queue names itself. A table-based name lets the queues
         * be reused once the table is recreated, unless the queue_base setting is given (which allows registering
         * consumers to specific queues). This value is the base for the names of the queues declared later.
         */
        queue_base = getTableBasedName("", table_id_);
    }

    bridge_exchange = sharding_exchange + "_bridge";
}
150

K
kssenii 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

/// Splits a comma-separated list of routing keys and trims whitespace around every key.
Names StorageRabbitMQ::parseRoutingKeys(String routing_key_list)
{
    const auto is_comma = [](char symbol) { return symbol == ','; };

    Names keys;
    boost::split(keys, routing_key_list, is_comma);

    for (auto & routing_key : keys)
        boost::trim(routing_key);

    return keys;
}


/// Maps the textual exchange type from the table settings to the AMQP enum.
/// The implementation default ("default") is treated as fanout; an unknown name throws BAD_ARGUMENTS.
AMQP::ExchangeType StorageRabbitMQ::defineExchangeType(String exchange_type_)
{
    if (exchange_type_ == ExchangeType::DEFAULT)
        return AMQP::ExchangeType::fanout;

    if (exchange_type_ == ExchangeType::FANOUT)
        return AMQP::ExchangeType::fanout;
    if (exchange_type_ == ExchangeType::DIRECT)
        return AMQP::ExchangeType::direct;
    if (exchange_type_ == ExchangeType::TOPIC)
        return AMQP::ExchangeType::topic;
    if (exchange_type_ == ExchangeType::HASH)
        return AMQP::ExchangeType::consistent_hash;
    if (exchange_type_ == ExchangeType::HEADERS)
        return AMQP::ExchangeType::headers;

    throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}


/// Builds "<database>_<table>" or, when a non-empty prefix is given, "<name>_<database>_<table>".
String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_id)
{
    const auto & database = table_id.database_name;
    const auto & table = table_id.table_name;

    if (!name.empty())
        return fmt::format("{}_{}_{}", name, database, table);

    return fmt::format("{}_{}", database, table);
}


A
alesapin 已提交
193
std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) const
K
kssenii 已提交
194
{
A
alesapin 已提交
195 196 197 198
    auto modified_context = std::make_shared<Context>(context);
    modified_context->setSetting("input_format_skip_unknown_fields", true);
    modified_context->setSetting("input_format_allow_errors_ratio", 0.);
    modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
K
kssenii 已提交
199 200

    if (!schema_name.empty())
A
alesapin 已提交
201
        modified_context->setSetting("format_schema", schema_name);
K
kssenii 已提交
202

K
kssenii 已提交
203 204 205 206 207 208 209 210 211
    for (const auto & setting : *rabbitmq_settings)
    {
        const auto & setting_name = setting.getName();

        /// check for non-rabbitmq-related settings
        if (!setting_name.starts_with("rabbitmq_"))
            modified_context->setSetting(setting_name, setting.getValue());
    }

A
alesapin 已提交
212
    return modified_context;
K
kssenii 已提交
213 214 215
}


216 217
void StorageRabbitMQ::loopingFunc()
{
218 219
    if (event_handler->connectionRunning())
        event_handler->startLoop();
220 221 222
}


K
kssenii 已提交
223 224 225 226 227 228 229 230 231
/// Body of the connection task: tries to reconnect; on success sets up RabbitMQ, otherwise retries after RESCHEDULE_MS.
void StorageRabbitMQ::connectionFunc()
{
    const bool connected = restoreConnection(true);
    if (!connected)
    {
        connection_task->scheduleAfter(RESCHEDULE_MS);
        return;
    }

    initRabbitMQ();
}


K
kssenii 已提交
232 233 234 235 236
/* Tasks are deactivated through this helper because otherwise a deadlock is possible: shutdown deactivates the
 * streaming task first, and the streaming task itself may try to deactivate another task.
 *
 * stop_loop - additionally switch the event loop state to STOP before deactivating.
 * wait      - if task_mutex is busy, block until it is free (used from shutdown); otherwise give up silently.
 */
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
    if (stop_loop)
        event_handler->updateLoopState(Loop::STOP);

    std::unique_lock<std::mutex> lock(task_mutex, std::try_to_lock);
    if (!lock.owns_lock())
    {
        /// Someone else is deactivating right now; only shutdown is allowed to wait for them
        if (!wait)
            return;

        lock.lock();
    }

    task->deactivate();
}


A
alesapin 已提交
254
size_t StorageRabbitMQ::getMaxBlockSize() const
K
kssenii 已提交
255 256 257 258 259 260 261
 {
     return rabbitmq_settings->rabbitmq_max_block_size.changed
         ? rabbitmq_settings->rabbitmq_max_block_size.value
         : (global_context.getSettingsRef().max_insert_block_size.value / num_consumers);
 }


K
kssenii 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
/// Declares and binds exchanges and queues over a temporary setup channel, then marks the storage as ready.
void StorageRabbitMQ::initRabbitMQ()
{
    setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());

    initExchange();
    bindExchange();

    /// Queue ids are 1-based
    for (size_t queue_index = 0; queue_index < num_queues; ++queue_index)
        bindQueue(queue_index + 1);

    LOG_TRACE(log, "RabbitMQ setup completed");

    rabbit_is_ready = true;
    setup_channel->close();
}


279 280
void StorageRabbitMQ::initExchange()
{
K
Better  
kssenii 已提交
281
    /* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
K
kssenii 已提交
282
     * -> sharding exchange (only if needed) -> queues
283 284 285 286
     */
    setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
    .onError([&](const char * message)
    {
K
kssenii 已提交
287 288 289 290 291 292 293 294
        /* This error can be a result of attempt to declare exchange if it was already declared but
         * 1) with different exchange type. In this case can
         * - manually delete previously declared exchange and create a new one.
         * - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself
         *   if it is not needed anymore or use another exchange name.
         * 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and
         * specified its own settings, which differ from this implementation.
         */
A
alesapin 已提交
295 296
        throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
                + std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
297 298
    });

K
kssenii 已提交
299
    /// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
300 301 302
    setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
    .onError([&](const char * message)
    {
K
kssenii 已提交
303
        /// This error is not supposed to happen as this exchange name is always unique to type and its settings
A
alesapin 已提交
304 305
        throw Exception(
            ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
306 307 308 309 310 311 312 313
    });

    if (!hash_exchange)
    {
        consumer_exchange = bridge_exchange;
        return;
    }

K
Better  
kssenii 已提交
314
    /* Change hash property because by default it will be routing key, which has to be an integer, but with support for any exchange
K
kssenii 已提交
315
     * type - routing keys might be of any type
K
Better  
kssenii 已提交
316
     */
317 318 319
    AMQP::Table binding_arguments;
    binding_arguments["hash-property"] = "message_id";

K
Better  
kssenii 已提交
320
    /// Declare exchange for sharding.
321
    setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
322 323
    .onError([&](const char * message)
    {
K
kssenii 已提交
324 325 326 327
        /* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
         * to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
         * is bad.
         */
A
alesapin 已提交
328
        throw Exception(
K
kssenii 已提交
329 330
           ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
           "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
331 332
    });

333
    setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
334 335
    .onError([&](const char * message)
    {
A
alesapin 已提交
336 337 338 339 340 341
        throw Exception(
            ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
            "Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
            bridge_exchange,
            sharding_exchange,
            std::string(message));
342 343
    });

344
    consumer_exchange = sharding_exchange;
345 346 347 348 349 350
}


void StorageRabbitMQ::bindExchange()
{
    std::atomic<bool> binding_created = false;
K
kssenii 已提交
351
    size_t bound_keys = 0;
352 353 354

    if (exchange_type == AMQP::ExchangeType::headers)
    {
355 356 357 358 359 360 361 362
        AMQP::Table bind_headers;
        for (const auto & header : routing_keys)
        {
            std::vector<String> matching;
            boost::split(matching, header, [](char c){ return c == '='; });
            bind_headers[matching[0]] = matching[1];
        }

363
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
A
alesapin 已提交
364
        .onSuccess([&]() { binding_created = true; })
365 366
        .onError([&](const char * message)
        {
A
alesapin 已提交
367 368 369
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
K
kssenii 已提交
370
                exchange_name, bridge_exchange, std::string(message));
371 372 373 374 375
        });
    }
    else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
    {
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
A
alesapin 已提交
376
        .onSuccess([&]() { binding_created = true; })
377 378
        .onError([&](const char * message)
        {
A
alesapin 已提交
379 380 381
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
K
kssenii 已提交
382
                exchange_name, bridge_exchange, std::string(message));
383 384 385 386 387 388 389 390 391
        });
    }
    else
    {
        for (const auto & routing_key : routing_keys)
        {
            setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
            .onSuccess([&]()
            {
K
kssenii 已提交
392 393 394
                ++bound_keys;
                if (bound_keys == routing_keys.size())
                    binding_created = true;
395 396 397
            })
            .onError([&](const char * message)
            {
A
alesapin 已提交
398 399 400
                throw Exception(
                    ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                    "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
K
kssenii 已提交
401
                    exchange_name, bridge_exchange, std::string(message));
402 403 404 405 406 407 408 409 410 411 412
            });
        }
    }

    while (!binding_created)
    {
        event_handler->iterateLoop();
    }
}


K
kssenii 已提交
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
/** Declares one durable queue and binds it to consumer_exchange, iterating the event loop until the binding
  * is confirmed.
  * queue_id is used as the binding key (converted to string) and, when hash_exchange is set, as a prefix of the
  * queue name.
  * A failed queue declaration throws BAD_ARGUMENTS, a failed binding throws CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
  * both are thrown from the callbacks.
  */
void StorageRabbitMQ::bindQueue(size_t queue_id)
{
    /// Set from the bindQueue success callback; polled by the loop at the end of this function
    std::atomic<bool> binding_created = false;

    /// Called once the queue is declared: remembers the queue name and requests the binding
    auto success_callback = [&](const std::string &  queue_name, int msgcount, int /* consumercount */)
    {
        queues.emplace_back(queue_name);
        LOG_DEBUG(log, "Queue {} is declared", queue_name);

        if (msgcount)
            LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);

        /* Here we bind either to the sharding exchange (consistent-hash) or to the bridge exchange (fanout). All bindings
         * to routing keys are done between the client's exchange and the local bridge exchange. The binding key must be a
         * string integer in case of a hash exchange, for a fanout exchange it can be arbitrary
         */
        setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
        .onSuccess([&] { binding_created = true; })
        .onError([&](const char * message)
        {
            throw Exception(
                ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
                "Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
        });
    };

    auto error_callback([&](const char * message)
    {
        /* This error is most likely a result of an attempt to declare a queue with different settings if it was declared
         * before. So for a given queue name either the deadletter_exchange parameter changed or queue_size changed, i.e.
         * the table was declared with a different max_block_size parameter. Solution: the client should specify a different
         * queue_base parameter or manually delete previously declared queues via any of the various cli tools.
         */
        throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
                specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
                which  were declared with the same names. ERROR reason: "
                + std::string(message), ErrorCodes::BAD_ARGUMENTS);
    });

    AMQP::Table queue_settings;

    queue_settings["x-max-length"] = queue_size;

    /// Either a dead letter exchange is configured, or the overflow policy is set to reject-publish
    if (!deadletter_exchange.empty())
        queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
    else
        queue_settings["x-overflow"] = "reject-publish";

    /* The first option not only simplifies queue_name, but also makes it possible to resume reading from one
     * specific queue when its name is specified in the queue_base setting
     */
    const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
    setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);

    /// The callbacks above run from inside iterateLoop()
    while (!binding_created)
    {
        event_handler->iterateLoop();
    }
}


K
kssenii 已提交
474 475 476 477 478 479 480 481 482
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
    size_t cnt_retries = 0;

    if (reconnecting)
    {
        connection->close(); /// Connection might be unusable, but not closed

        /* Connection is not closed immediately (firstly, all pending operations are completed, and then
483
         * an AMQP closing-handshake is  performed). But cannot open a new connection until previous one is properly closed
K
kssenii 已提交
484 485 486 487 488 489 490 491
         */
        while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
            event_handler->iterateLoop();

        /// This will force immediate closure if not yet closed
        if (!connection->closed())
            connection->close(true);

K
Fixes  
kssenii 已提交
492
        LOG_TRACE(log, "Trying to restore connection to " + address);
K
kssenii 已提交
493 494
    }

A
alesapin 已提交
495
    connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
K
kssenii 已提交
496 497 498
            AMQP::Address(
                parsed_address.first, parsed_address.second,
                AMQP::Login(login_password.first, login_password.second), vhost));
K
kssenii 已提交
499 500

    cnt_retries = 0;
K
kssenii 已提交
501
    while (!connection->ready() && !stream_cancelled && ++cnt_retries != RETRIES_MAX)
K
kssenii 已提交
502 503 504 505 506 507 508 509 510
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    return event_handler->connectionRunning();
}


K
Better  
kssenii 已提交
511
/// Replaces `channel` with a new channel on the current connection and returns true.
/// If the connection is not running, resets `channel` to nullptr and returns false.
bool StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
    if (!event_handler->connectionRunning())
    {
        channel = nullptr;
        return false;
    }

    channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    return true;
}


524 525
void StorageRabbitMQ::unbindExchange()
{
K
kssenii 已提交
526 527 528
    /* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
     * on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
     * As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
K
Better  
kssenii 已提交
529 530 531
     * consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
     * input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
     * externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
K
kssenii 已提交
532
     * queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
K
kssenii 已提交
533 534
     * bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
     * not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
K
Better  
kssenii 已提交
535
     */
K
kssenii 已提交
536
    std::call_once(flag, [&]()
537
    {
K
kssenii 已提交
538
        streaming_task->deactivate();
K
kssenii 已提交
539 540 541
        event_handler->updateLoopState(Loop::STOP);
        looping_task->deactivate();

K
kssenii 已提交
542
        setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
543 544 545 546 547 548 549
        setup_channel->removeExchange(bridge_exchange)
        .onSuccess([&]()
        {
            exchange_removed.store(true);
        })
        .onError([&](const char * message)
        {
K
kssenii 已提交
550
            throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE);
551 552
        });

K
kssenii 已提交
553
        while (!exchange_removed.load())
554 555 556
        {
            event_handler->iterateLoop();
        }
K
kssenii 已提交
557 558

        setup_channel->close();
K
kssenii 已提交
559
    });
560 561 562
}


N
Nikolai Kochetov 已提交
563
/** Creates one source per created consumer and unites them into a single pipe.
  * Throws CANNOT_CONNECT_RABBITMQ if the RabbitMQ setup has not finished; returns an empty pipe if there are no consumers.
  * If the connection is not running, one reconnect is attempted before the sources are created.
  */
Pipe StorageRabbitMQ::read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        SelectQueryInfo & /* query_info */,
        const Context & context,
        QueryProcessingStage::Enum /* processed_stage */,
        size_t /* max_block_size */,
        unsigned /* num_streams */)
{
    if (!rabbit_is_ready)
        throw Exception("RabbitMQ setup not finished. Connection might be lost", ErrorCodes::CANNOT_CONNECT_RABBITMQ);

    if (num_created_consumers == 0)
        return {};

    auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
    auto read_context = addSettings(context);
    auto max_rows_in_block = getMaxBlockSize();

    const bool connection_lost = !event_handler->connectionRunning();
    if (connection_lost)
    {
        /// The loop has to be stopped before the connection is replaced
        if (event_handler->loopRunning())
            deactivateTask(looping_task, false, true);

        restoreConnection(true);
    }

    Pipes sources;
    sources.reserve(num_created_consumers);

    for (size_t consumer = 0; consumer < num_created_consumers; ++consumer)
    {
        auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, read_context, column_names, max_rows_in_block);

        /// Convert blocks of the stream to the requested header, matching columns by name
        auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
            rabbit_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);

        sources.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
    }

    if (!event_handler->loopRunning() && event_handler->connectionRunning())
        looping_task->activateAndSchedule();

    LOG_DEBUG(log, "Starting reading {} streams", sources.size());

    auto result_pipe = Pipe::unitePipes(std::move(sources));
    result_pipe.addInterpreterContext(read_context);
    return result_pipe;
}


A
alesapin 已提交
612
/// Creates the output stream used for inserts into this table; the query AST itself is not used.
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
    auto output_stream = std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, context);
    return output_stream;
}


618 619
void StorageRabbitMQ::startup()
{
K
kssenii 已提交
620 621 622 623
    if (event_handler->connectionRunning())
        initRabbitMQ();
    else
        connection_task->activateAndSchedule();
K
kssenii 已提交
624

625 626 627 628 629 630 631
    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
K
kssenii 已提交
632
        catch (const AMQP::Exception & e)
633
        {
A
alesapin 已提交
634
            LOG_ERROR(log, "Got AMQ exception {}", e.what());
K
kssenii 已提交
635
            throw;
636 637 638
        }
    }

K
kssenii 已提交
639
    event_handler->updateLoopState(Loop::RUN);
640
    streaming_task->activateAndSchedule();
641 642 643 644 645 646
}


void StorageRabbitMQ::shutdown()
{
    stream_cancelled = true;
A
alesapin 已提交
647
    wait_confirm = false;
648

K
kssenii 已提交
649 650
    deactivateTask(streaming_task, true, false);
    deactivateTask(looping_task, true, true);
K
kssenii 已提交
651
    deactivateTask(connection_task, true, false);
K
kssenii 已提交
652

K
Better  
kssenii 已提交
653 654 655
    connection->close();

    size_t cnt_retries = 0;
K
kssenii 已提交
656
    while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
K
Better  
kssenii 已提交
657
        event_handler->iterateLoop();
658

K
kssenii 已提交
659
    /// Should actually force closure, if not yet closed, but it generates distracting error logs
K
Better  
kssenii 已提交
660 661 662 663 664
    //if (!connection->closed())
    //    connection->close(true);

    for (size_t i = 0; i < num_created_consumers; ++i)
        popReadBuffer();
665 666 667 668 669
}


/// Puts a consumer buffer into the pool and signals the semaphore so that a caller
/// waiting in popReadBuffer() can proceed. Both steps happen under buffers_mutex.
void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
    std::lock_guard guard{buffers_mutex};

    buffers.emplace_back(buffer);
    semaphore.set();
}


/// Takes a consumer buffer from the pool, blocking until one is available
/// (a zero timeout selects the blocking path of the timed overload).
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
{
    const auto no_timeout = std::chrono::milliseconds::zero();
    return popReadBuffer(no_timeout);
}


/// Takes a consumer buffer from the pool.
/// With a zero timeout the call blocks on the semaphore; with any other value it does a
/// timed wait and returns nullptr when the wait does not succeed.
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
{
    const bool wait_indefinitely = (timeout == std::chrono::milliseconds::zero());

    // Wait for the first free buffer
    if (wait_indefinitely)
        semaphore.wait();
    else if (!semaphore.tryWait(timeout.count()))
        return nullptr;

    // Take the most recently pushed buffer from the list
    std::lock_guard guard{buffers_mutex};
    auto taken = buffers.back();
    buffers.pop_back();

    return taken;
}


/// Creates a new consumer read buffer.
/// A channel is opened on the current connection only when the connection is running;
/// otherwise the buffer is constructed with an empty channel pointer.
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
    ChannelPtr channel = event_handler->connectionRunning()
        ? std::make_shared<AMQP::TcpChannel>(connection.get())
        : nullptr;

    /// consumer_id is incremented for every buffer created here.
    return std::make_shared<ReadBufferFromRabbitMQConsumer>(
        channel,
        event_handler,
        queues,
        ++consumer_id,
        unique_strbase,
        log,
        row_delimiter,
        queue_size,
        stream_cancelled);
}


K
kssenii 已提交
714 715
/// Creates a new producer write buffer configured from this storage's connection and
/// exchange parameters. Every call consumes the next value of producer_id.
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
    /// A zero row_delimiter is passed on as an empty optional.
    std::optional<char> delimiter;
    if (row_delimiter)
        delimiter = row_delimiter;

    return std::make_shared<WriteBufferToRabbitMQProducer>(
        parsed_address,
        global_context,
        login_password,
        vhost,
        routing_keys,
        exchange_name,
        exchange_type,
        producer_id.fetch_add(1),
        persistent,
        wait_confirm,
        log,
        delimiter,
        1,
        1024);
}


723 724 725 726 727 728 729 730 731 732
bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
    // Check if all dependencies are attached
    auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
    if (dependencies.empty())
        return true;

    // Check the dependencies are ready?
    for (const auto & db_tab : dependencies)
    {
733
        auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750
        if (!table)
            return false;

        // If it materialized view, check it's target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab))
            return false;
    }

    return true;
}


A
alesapin 已提交
751
void StorageRabbitMQ::streamingToViewsFunc()
752
{
K
Better  
kssenii 已提交
753
    if (rabbit_is_ready && (event_handler->connectionRunning() || restoreConnection(true)))
754
    {
K
kssenii 已提交
755
        try
756
        {
K
kssenii 已提交
757
            auto table_id = getStorageID();
758

K
kssenii 已提交
759 760
            // Check if at least one direct dependency is attached
            size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
761

K
kssenii 已提交
762 763 764
            if (dependencies_count)
            {
                auto start_time = std::chrono::steady_clock::now();
K
kssenii 已提交
765

K
kssenii 已提交
766 767
                // Keep streaming as long as there are attached views and streaming is not cancelled
                while (!stream_cancelled && num_created_consumers > 0)
K
kssenii 已提交
768
                {
K
kssenii 已提交
769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784
                    if (!checkDependencies(table_id))
                        break;

                    LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);

                    if (streamToViews())
                        break;

                    auto end_time = std::chrono::steady_clock::now();
                    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
                    if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
                    {
                        event_handler->updateLoopState(Loop::STOP);
                        LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
                        break;
                    }
K
kssenii 已提交
785
                }
786 787
            }
        }
K
kssenii 已提交
788 789 790 791
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
792 793 794 795
    }

    /// Wait for attached views
    if (!stream_cancelled)
K
kssenii 已提交
796
        streaming_task->scheduleAfter(RESCHEDULE_MS);
797 798 799 800 801 802
}


bool StorageRabbitMQ::streamToViews()
{
    auto table_id = getStorageID();
803
    auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
804 805 806 807 808 809 810
    if (!table)
        throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->table_id = table_id;

K
kssenii 已提交
811
    // Only insert into dependent views and expect that input blocks contain virtual columns
A
alesapin 已提交
812
    InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
813 814
    auto block_io = interpreter.execute();

K
kssenii 已提交
815 816 817 818
    auto metadata_snapshot = getInMemoryMetadataPtr();
    auto column_names = block_io.out->getHeader().getNames();
    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());

K
kssenii 已提交
819 820
    auto block_size = getMaxBlockSize();

821 822 823 824 825 826
    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
K
kssenii 已提交
827 828
        auto stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
K
kssenii 已提交
829
        streams.emplace_back(stream);
830 831

        // Limit read batch to maximum block size to allow DDL
N
Nikolai Kochetov 已提交
832
        StreamLocalLimits limits;
K
kssenii 已提交
833

K
kssenii 已提交
834 835 836 837
        limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
                                                  ? rabbitmq_settings->rabbitmq_flush_interval_ms
                                                  : global_context.getSettingsRef().stream_flush_interval_ms;

838
        limits.timeout_overflow_mode = OverflowMode::BREAK;
K
kssenii 已提交
839 840

        stream->setLimits(limits);
841 842 843 844 845 846 847 848 849 850
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
K
kssenii 已提交
851 852 853 854 855 856 857

    if (!event_handler->loopRunning())
    {
        event_handler->updateLoopState(Loop::RUN);
        looping_task->activateAndSchedule();
    }

858 859
    copyData(*in, *block_io.out, &stub);

K
kssenii 已提交
860 861
    /* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
     * error occurs or connection is lost while ack is being sent
K
kssenii 已提交
862
     */
K
kssenii 已提交
863 864
    deactivateTask(looping_task, false, true);
    size_t queue_empty = 0;
K
kssenii 已提交
865 866 867

    if (!event_handler->connectionRunning())
    {
K
kssenii 已提交
868 869 870 871
        if (stream_cancelled)
            return true;

        if (restoreConnection(true))
K
kssenii 已提交
872 873 874 875 876 877
        {
            for (auto & stream : streams)
                stream->as<RabbitMQBlockInputStream>()->updateChannel();
        }
        else
        {
K
kssenii 已提交
878 879
            LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
            return true;
K
kssenii 已提交
880 881 882 883 884 885 886
        }
    }
    else
    {
        /// Commit
        for (auto & stream : streams)
        {
K
kssenii 已提交
887 888 889
            if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
                ++queue_empty;

K
kssenii 已提交
890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
            if (stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
            {
                auto buffer = stream->as<RabbitMQBlockInputStream>()->getBuffer();
                if (buffer)
                {
                    if (buffer->queuesCount() != queues.size())
                        buffer->updateQueues(queues);

                    buffer->updateAckTracker();

                    if (updateChannel(buffer->getChannel()))
                        buffer->setupChannel();
                }
            }

K
kssenii 已提交
905 906 907 908 909 910 911 912 913 914 915 916
            /* false is returned by the sendAck function in only two cases:
             * 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
             *    delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
             *    no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
             *    waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
             *    consumers. So in this case duplicates are inevitable.
             * 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
             *    common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
             *    Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
             *    the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
             *    will ever happen.
             */
K
kssenii 已提交
917 918
            if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
            {
K
Fixes  
kssenii 已提交
919 920
                /// Iterate loop to activate error callbacks if they happened
                event_handler->iterateLoop();
K
Better  
kssenii 已提交
921
                if (!event_handler->connectionRunning())
K
kssenii 已提交
922 923
                    break;
            }
K
kssenii 已提交
924 925

            event_handler->iterateLoop();
K
kssenii 已提交
926 927 928
        }
    }

K
kssenii 已提交
929
    if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
K
kssenii 已提交
930 931 932 933 934 935 936 937 938 939 940
    {
        connection->heartbeat();
        read_attempts = 0;
        LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
        return true;
    }
    else
    {
        event_handler->updateLoopState(Loop::RUN);
        looping_task->activateAndSchedule();
    }
941

K
kssenii 已提交
942
    return false;
K
kssenii 已提交
943 944 945 946 947 948 949 950 951 952 953
}


/// Registers the "RabbitMQ" table engine in the storage factory.
/// The creator loads RabbitMQSettings from the SETTINGS clause (if present), validates that
/// the first three parameters are supplied and that no parameter is given both positionally
/// and as a setting, and then creates the storage.
void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        const size_t args_count = args.engine_args.size();

        auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
        if (args.storage_def->settings)
            rabbitmq_settings->loadFromQuery(*args.storage_def);

        // Check arguments and settings
        #define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME)                                           \
        {                                                                                                    \
            const bool passed_positionally = args_count >= (ARG_NUM);                                        \
            const bool passed_as_setting = static_cast<bool>(rabbitmq_settings->ARG_NAME.changed);           \
            /* One of the three required arguments is not specified */                                       \
            if (!passed_positionally && (ARG_NUM) <= 3 && !passed_as_setting)                                \
            {                                                                                                \
                throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified",     \
                    ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);                                           \
            }                                                                                                \
            /* The same argument is given in two places */                                                   \
            if (passed_positionally && passed_as_setting)                                                    \
            {                                                                                                \
                throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ "                        \
                    "and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS);           \
            }                                                                                                \
        }

        /// Required parameters
        CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_exchange_name)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_format)

        /// Optional parameters
        CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent)

        CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
        CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)

        #undef CHECK_RABBITMQ_STORAGE_ARGUMENT

        return StorageRabbitMQ::create(args.table_id, args.context, args.columns, std::move(rabbitmq_settings));
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


/// Lists the virtual columns exposed by the engine, together with their types.
NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    NamesAndTypesList virtuals;

    virtuals.emplace_back("_exchange_name", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_channel_id", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_delivery_tag", std::make_shared<DataTypeUInt64>());
    virtuals.emplace_back("_redelivered", std::make_shared<DataTypeUInt8>());
    virtuals.emplace_back("_message_id", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_timestamp", std::make_shared<DataTypeUInt64>());

    return virtuals;
}

}