// WriteBufferToRabbitMQProducer.cpp (committed by kssenii)
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <common/logger_useful.h>
#include <amqpcpp.h>
#include <atomic>
#include <chrono>
#include <iterator>
#include <memory>
#include <thread>


namespace DB
{

/// Tuning constants used by the producer below.
enum
{
    /// Sleep between event-loop polls while the constructor waits for the connection (milliseconds).
    Connection_setup_sleep = 200,

    /// Sleep between event-loop runs while finilize() waits for the commit answer (milliseconds).
    Loop_wait = 10,

    /// Bound on the retry counters of the wait loops; a loop body runs at most Loop_retries_max - 1 times.
    Loop_retries_max = 1000,

    /// After this many published messages countRow() runs the event loop so they actually get sent.
    Batch = 10000
};

WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
        std::pair<String, UInt16> & parsed_address,
        std::pair<String, String> & login_password_,
        const String & routing_key_,
        const String exchange_,
        Poco::Logger * log_,
        const size_t num_queues_,
        const bool bind_by_id_,
        const bool use_transactional_channel_,
        std::optional<char> delimiter,
        size_t rows_per_message,
        size_t chunk_size_)
        : WriteBuffer(nullptr, 0)
        , login_password(login_password_)
        , routing_key(routing_key_)
        , exchange_name(exchange_ + "_direct")
        , log(log_)
        , num_queues(num_queues_)
        , bind_by_id(bind_by_id_)
        , use_transactional_channel(use_transactional_channel_)
        , delim(delimiter)
        , max_rows(rows_per_message)
        , chunk_size(chunk_size_)
        , producerEvbase(event_base_new())
        , eventHandler(producerEvbase, log)
        , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second,
                    AMQP::Login(login_password.first, login_password.second), "/"))
{
    /* Each producer owns its own connection, because output streams publish concurrently from
     * different threads, and sharing one connection between threads leads to internal library errors:
     * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086
     */

    /// Poll the libevent base until the connection is ready or the retry budget runs out.
    for (size_t attempt = 1; !connection.ready() && attempt != Loop_retries_max; ++attempt)
    {
        event_base_loop(producerEvbase, EVLOOP_NONBLOCK | EVLOOP_ONCE);
        std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
    }

    /// A connection that never became ready is only reported; construction continues regardless.
    if (!connection.ready())
    {
        LOG_ERROR(log, "Cannot set up connection for producer!");
    }

    producer_channel = std::make_shared<AMQP::TcpChannel>(&connection);

    /// Passively check the exchange so it becomes visible from producer_channel.
    checkExchange();

    /// Wrap all publishing into a transaction; finilize() commits it.
    if (use_transactional_channel)
        producer_channel->startTransaction();
}


WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
    /// Run the event loop once more and, for transactional channels, commit before closing.
    finilize();
    connection.close();

    /// Only countRow() publishes, so every buffered row must already have gone out in a full message;
    /// rows left over here would never be sent.
    assert(rows == 0 && chunks.empty());
}


K
kssenii 已提交
86
void WriteBufferToRabbitMQProducer::countRow()
K
kssenii 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
{
    if (++rows % max_rows == 0)
    {
        const std::string & last_chunk = chunks.back();
        size_t last_chunk_size = offset();

        if (delim && last_chunk[last_chunk_size - 1] == delim)
            --last_chunk_size;

        std::string payload;
        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);

        for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
            payload.append(*i);

        payload.append(last_chunk, 0, last_chunk_size);

        rows = 0;
        chunks.clear();
        set(nullptr, 0);

K
kssenii 已提交
108 109
        next_queue = next_queue % num_queues + 1;

K
kssenii 已提交
110
        if (bind_by_id)
K
kssenii 已提交
111 112 113 114 115 116 117 118
        {
            producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
        }
        else
        {
            producer_channel->publish(exchange_name, routing_key, payload);
        }

K
kssenii 已提交
119 120
        ++message_counter;

K
kssenii 已提交
121 122
        /// run event loop to actually publish, checking exchange is just a point to stop the event loop
        if ((message_counter %= Batch) == 0)
K
kssenii 已提交
123
        {
K
kssenii 已提交
124
            checkExchange();
K
kssenii 已提交
125 126 127 128 129
        }
    }
}


K
kssenii 已提交
130
void WriteBufferToRabbitMQProducer::checkExchange()
K
kssenii 已提交
131 132 133
{
    std::atomic<bool> exchange_declared = false, exchange_error = false;

K
kssenii 已提交
134 135 136
    /* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name
     * and makes it visible from current producer_channel.
     */
K
kssenii 已提交
137
    producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive)
K
kssenii 已提交
138 139
    .onSuccess([&]()
    {
K
kssenii 已提交
140
        exchange_declared = true;
K
kssenii 已提交
141 142 143 144 145 146 147
    })
    .onError([&](const char * message)
    {
        exchange_error = true;
        LOG_ERROR(log, "Exchange was not declared: {}", message);
    });

K
kssenii 已提交
148
    /// These variables are updated in a separate thread and starting the loop blocks current thread
K
kssenii 已提交
149 150
    while (!exchange_declared && !exchange_error)
    {
K
kssenii 已提交
151
        startEventLoop();
K
kssenii 已提交
152 153 154 155
    }
}


K
kssenii 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
void WriteBufferToRabbitMQProducer::finilize()
{
    checkExchange();

    if (use_transactional_channel)
    {
        std::atomic<bool> answer_received = false;
        producer_channel->commitTransaction()
        .onSuccess([&]()
        {
            answer_received = true;
            LOG_TRACE(log, "All messages were successfully published");
        })
        .onError([&](const char * message)
        {
            answer_received = true;
            LOG_TRACE(log, "None of messages were publishd: {}", message);
            /// Probably should do something here
        });

        size_t count_retries = 0;
        while (!answer_received && ++count_retries != Loop_retries_max)
        {
            startEventLoop();
            std::this_thread::sleep_for(std::chrono::milliseconds(Loop_wait));
        }
    }
}


K
kssenii 已提交
186 187 188 189 190 191 192 193
void WriteBufferToRabbitMQProducer::nextImpl()
{
    chunks.push_back(std::string());
    chunks.back().resize(chunk_size);
    set(chunks.back().data(), chunk_size);
}


K
kssenii 已提交
194
void WriteBufferToRabbitMQProducer::startEventLoop()
K
kssenii 已提交
195
{
K
kssenii 已提交
196
    eventHandler.startProducerLoop();
K
kssenii 已提交
197 198 199
}

}