WriteBufferToRabbitMQProducer.cpp 6.5 KB
Newer Older
K
kssenii 已提交
1 2 3 4 5 6
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <common/logger_useful.h>
#include <amqpcpp.h>
7
#include <uv.h>
K
kssenii 已提交
8 9
#include <chrono>
#include <thread>
K
kssenii 已提交
10
#include <atomic>
K
kssenii 已提交
11 12 13 14 15


namespace DB
{

16
namespace ErrorCodes
{
    extern const int CANNOT_CONNECT_RABBITMQ;
}

/// Capacity of the payload queue per target queue; the queue is created with QUEUE_SIZE * num_queues slots.
static constexpr auto QUEUE_SIZE = 50000;
/// Pause (milliseconds) between attempts to bring the connection up in the constructor.
static constexpr auto CONNECT_SLEEP = 200;
/// Bound on the number of iterations of the waiting loops (connection setup, transaction commit).
static constexpr auto RETRIES_MAX = 1000;
/// Pause (milliseconds) between event loop iterations while waiting for the transaction answer.
static constexpr auto LOOP_WAIT = 10;
K
kssenii 已提交
25 26

WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
        std::pair<String, UInt16> & parsed_address,
        Context & global_context,
        const std::pair<String, String> & login_password_,
        const String & routing_key_,
        const String & exchange_,
        Poco::Logger * log_,
        size_t num_queues_,
        bool bind_by_id_,
        bool use_transactional_channel_,
        std::optional<char> delimiter,
        size_t rows_per_message,
        size_t chunk_size_)
        : WriteBuffer(nullptr, 0)
        , login_password(login_password_)
        , routing_key(routing_key_)
        , exchange_name(exchange_ + "_direct")
        , bind_by_id(bind_by_id_)
        , num_queues(num_queues_)
        , use_transactional_channel(use_transactional_channel_)
        , payloads(QUEUE_SIZE * num_queues)
        , log(log_)
        , delim(delimiter)
        , max_rows(rows_per_message)
        , chunk_size(chunk_size_)
{
    /// Every producer drives its own libuv loop through its own event handler.
    loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());
    event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);

    /* Each concurrent producer gets a connection of its own, see
     * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from
     * different threads (as outputStreams are asynchronous) over one shared connection leads to internal library errors.
     */
    const auto & [host, port] = parsed_address;
    AMQP::Login login(login_password.first, login_password.second);
    connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(host, port, login, "/"));

    /// Poll the event loop until the connection reports ready, giving up after RETRIES_MAX - 1 waits.
    for (size_t attempt = 1; !connection->ready() && attempt != RETRIES_MAX; ++attempt)
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    if (!connection->ready())
        throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);

    producer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    checkExchange();

    /// Publishing is wrapped into a transaction only on request.
    if (use_transactional_channel)
        producer_channel->startTransaction();

    /// The writing task is created in a deactivated state.
    writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
    writing_task->deactivate();
}


WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
    /// Ask writingFunc() to stop; it returns only after the payload queue has been drained.
    stop_loop = true;
    /// NOTE(review): relies on deactivate() waiting for a writingFunc() run in progress — confirm.
    writing_task->deactivate();

    /// One more round-trip on the channel before the connection goes away
    /// (the same call finilizeProducer() makes to be sure everything is published).
    checkExchange();

    connection->close();

    /// No rows may remain buffered: countRow() must have turned every batch into a payload.
    assert(rows == 0 && chunks.empty());
}


K
kssenii 已提交
100
void WriteBufferToRabbitMQProducer::countRow()
K
kssenii 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
{
    if (++rows % max_rows == 0)
    {
        const std::string & last_chunk = chunks.back();
        size_t last_chunk_size = offset();

        if (delim && last_chunk[last_chunk_size - 1] == delim)
            --last_chunk_size;

        std::string payload;
        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);

        for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
            payload.append(*i);

        payload.append(last_chunk, 0, last_chunk_size);

        rows = 0;
        chunks.clear();
        set(nullptr, 0);

122 123 124
        payloads.push(payload);
    }
}
K
kssenii 已提交
125

K
kssenii 已提交
126

127 128 129
void WriteBufferToRabbitMQProducer::writingFunc()
{
    String payload;
K
kssenii 已提交
130

131 132 133
    while (!stop_loop || !payloads.empty())
    {
        while (!payloads.empty())
K
kssenii 已提交
134
        {
135 136 137 138 139 140 141 142 143 144 145
            payloads.pop(payload);
            next_queue = next_queue % num_queues + 1;

            if (bind_by_id)
            {
                producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
            }
            else
            {
                producer_channel->publish(exchange_name, routing_key, payload);
            }
K
kssenii 已提交
146
        }
A
alesapin 已提交
147
        iterateEventLoop();
K
kssenii 已提交
148 149 150 151
    }
}


K
kssenii 已提交
152
void WriteBufferToRabbitMQProducer::checkExchange()
K
kssenii 已提交
153 154 155
{
    std::atomic<bool> exchange_declared = false, exchange_error = false;

K
kssenii 已提交
156
    /// The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name.
K
kssenii 已提交
157
    producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive)
K
kssenii 已提交
158 159
    .onSuccess([&]()
    {
K
kssenii 已提交
160
        exchange_declared = true;
K
kssenii 已提交
161 162 163 164
    })
    .onError([&](const char * message)
    {
        exchange_error = true;
165
        LOG_ERROR(log, "Exchange for INSERT query was not declared. Reason: {}", message);
K
kssenii 已提交
166 167
    });

K
kssenii 已提交
168
    /// These variables are updated in a separate thread and starting the loop blocks current thread
K
kssenii 已提交
169 170
    while (!exchange_declared && !exchange_error)
    {
A
alesapin 已提交
171
        iterateEventLoop();
K
kssenii 已提交
172 173 174 175
    }
}


176
void WriteBufferToRabbitMQProducer::finilizeProducer()
K
kssenii 已提交
177
{
K
kssenii 已提交
178
    /// This will make sure everything is published
K
kssenii 已提交
179 180 181 182
    checkExchange();

    if (use_transactional_channel)
    {
K
kssenii 已提交
183
        std::atomic<bool> answer_received = false, wait_rollback = false;
K
kssenii 已提交
184 185 186 187 188 189
        producer_channel->commitTransaction()
        .onSuccess([&]()
        {
            answer_received = true;
            LOG_TRACE(log, "All messages were successfully published");
        })
A
Alexey Milovidov 已提交
190
        .onError([&](const char * message1)
K
kssenii 已提交
191 192
        {
            answer_received = true;
K
kssenii 已提交
193
            wait_rollback = true;
A
Alexey Milovidov 已提交
194
            LOG_TRACE(log, "Publishing not successful: {}", message1);
K
kssenii 已提交
195 196 197 198 199
            producer_channel->rollbackTransaction()
            .onSuccess([&]()
            {
                wait_rollback = false;
            })
A
Alexey Milovidov 已提交
200
            .onError([&](const char * message2)
K
kssenii 已提交
201
            {
A
Alexey Milovidov 已提交
202
                LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
K
kssenii 已提交
203 204
                wait_rollback = false;
            });
K
kssenii 已提交
205 206 207
        });

        size_t count_retries = 0;
K
kssenii 已提交
208
        while ((!answer_received || wait_rollback) && ++count_retries != RETRIES_MAX)
K
kssenii 已提交
209
        {
A
alesapin 已提交
210
            iterateEventLoop();
211
            std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_WAIT));
K
kssenii 已提交
212 213 214 215 216
        }
    }
}


K
kssenii 已提交
217 218 219 220 221 222 223 224
void WriteBufferToRabbitMQProducer::nextImpl()
{
    /// Append a fresh, zero-filled chunk of `chunk_size` bytes and make it the working buffer.
    /// Earlier chunks stay in `chunks` until countRow() assembles them into a payload.
    chunks.emplace_back(chunk_size, '\0');
    set(chunks.back().data(), chunk_size);
}


A
alesapin 已提交
225
void WriteBufferToRabbitMQProducer::iterateEventLoop()
K
kssenii 已提交
226
{
A
alesapin 已提交
227
    event_handler->iterateLoop();
K
kssenii 已提交
228 229 230
}

}