WriteBufferToRabbitMQProducer.cpp 7.8 KB
Newer Older
K
kssenii 已提交
1 2 3 4 5 6
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <common/logger_useful.h>
#include <amqpcpp.h>
#include <uv.h>
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>
K
kssenii 已提交
11 12 13 14 15


namespace DB
{

16
/// Error codes referenced in this file; they are only declared here (extern), not defined.
namespace ErrorCodes
{
    extern const int CANNOT_CONNECT_RABBITMQ;
}

K
kssenii 已提交
21
/// Capacity of the payload queue per configured queue (multiplied by num_queues in the constructor).
static constexpr int QUEUE_SIZE = 50000;
/// Pause, in milliseconds, between event loop iterations while waiting for the connection to become ready.
static constexpr int CONNECT_SLEEP = 200;
/// Upper bound on the number of iterations of the waiting loops (connection setup, transaction commit).
static constexpr int RETRIES_MAX = 1000;
/// Pause, in milliseconds, between event loop iterations while waiting for the transaction answer.
static constexpr int LOOP_WAIT = 10;
/// The event loop is iterated every time the publish counter is a multiple of this value.
static constexpr int BATCH = 10000;
K
kssenii 已提交
26 27

/** Creates a producer with its own event loop, connection and channel.
  *
  * @param parsed_address             host and port, passed to AMQP::Address
  * @param global_context             provides the schedule pool on which the writing task is created
  * @param login_password_            user name and password, passed to AMQP::Login
  * @param routing_keys_              routing keys; for a headers exchange each entry must look like key=value
  * @param exchange_name_             exchange to publish to
  * @param exchange_type_             exchange type; selects how messages are published in writingFunc()
  * @param log_                       logger for diagnostics
  * @param num_queues_                scales the capacity of the payload queue (QUEUE_SIZE * num_queues_)
  * @param use_transactional_channel_ start a transaction on the channel instead of enabling publisher confirms
  * @param persistent_                publish messages with delivery mode 2
  * @param delimiter                  optional row delimiter, stripped from the end of a payload in countRow()
  * @param rows_per_message           number of rows that form one payload
  * @param chunk_size_                size of each buffer chunk allocated in nextImpl()
  *
  * Throws Exception with CANNOT_CONNECT_RABBITMQ if the connection does not become ready in time.
  */
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
        std::pair<String, UInt16> & parsed_address,
        Context & global_context,
        const std::pair<String, String> & login_password_,
        const Names & routing_keys_,
        const String & exchange_name_,
        const AMQP::ExchangeType exchange_type_,
        Poco::Logger * log_,
        size_t num_queues_,
        const bool use_transactional_channel_,
        const bool persistent_,
        std::optional<char> delimiter,
        size_t rows_per_message,
        size_t chunk_size_)
        : WriteBuffer(nullptr, 0)
        , login_password(login_password_)
        , routing_keys(routing_keys_)
        , exchange_name(exchange_name_)
        , exchange_type(exchange_type_)
        , num_queues(num_queues_)
        , use_transactional_channel(use_transactional_channel_)
        , persistent(persistent_)
        /// Sized from the parameter, so the result does not depend on the declaration order of the members.
        , payloads(QUEUE_SIZE * num_queues_)
        , log(log_)
        , delim(delimiter)
        , max_rows(rows_per_message)
        , chunk_size(chunk_size_)
{
    loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());

    event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
    connection = std::make_unique<AMQP::TcpConnection>(
        event_handler.get(),
        AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));

    /// New connection for each publisher because cannot publish from different threads.
    /// (https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086)
    size_t cnt_retries = 0;
    while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    if (!connection->ready())
        throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);

    producer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    producer_channel->onError([this](const char * message)
    {
        LOG_ERROR(log, "Producer error: {}", message);
    });

    if (use_transactional_channel)
    {
        producer_channel->startTransaction();
    }
    else
    {
        /// Publisher confirms: remember the highest acknowledged delivery tag.
        producer_channel->confirmSelect()
        .onAck([this](uint64_t deliveryTag, bool /* multiple */)
        {
            if (deliveryTag > last_processed)
                last_processed = deliveryTag;
        })
        .onNack([](uint64_t /* deliveryTag */, bool /* multiple */, bool /* requeue */)
        {
            /// Negative acknowledgements are not handled.
        });
    }

    /// The task is created in deactivated state.
    writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
    writing_task->deactivate();

    if (exchange_type == AMQP::ExchangeType::headers)
    {
        /// For a headers exchange every routing key is a "key=value" pair that goes into the message headers.
        std::vector<String> matching;
        for (const auto & header : routing_keys)
        {
            matching.clear();
            boost::split(matching, header, [](char c){ return c == '='; });

            /// Without '=' there is no value part; indexing matching[1] would read out of bounds.
            if (matching.size() < 2)
            {
                LOG_WARNING(log, "Ignoring malformed header '{}': expected the form key=value", header);
                continue;
            }

            key_arguments[matching[0]] = matching[1];
        }
    }
}


/// Deactivates the writing task and then closes the connection.
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
    /// The task is deactivated first, then the connection it uses is closed.
    writing_task->deactivate();
    connection->close();

    /// By this point every counted row must have been turned into a payload.
    assert(rows == 0 && chunks.empty());
}


K
kssenii 已提交
122
void WriteBufferToRabbitMQProducer::countRow()
K
kssenii 已提交
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
{
    if (++rows % max_rows == 0)
    {
        const std::string & last_chunk = chunks.back();
        size_t last_chunk_size = offset();

        if (delim && last_chunk[last_chunk_size - 1] == delim)
            --last_chunk_size;

        std::string payload;
        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);

        for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
            payload.append(*i);

        payload.append(last_chunk, 0, last_chunk_size);

        rows = 0;
        chunks.clear();
        set(nullptr, 0);

K
kssenii 已提交
144
        ++delivery_tag;
145 146 147
        payloads.push(payload);
    }
}
K
kssenii 已提交
148

K
kssenii 已提交
149

150 151 152
void WriteBufferToRabbitMQProducer::writingFunc()
{
    String payload;
153
    current = 0;
K
kssenii 已提交
154

K
kssenii 已提交
155 156 157 158 159 160 161
    auto returned_callback = [&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */)
    {
        payloads.push(std::string(message.body(), message.size()));
        //LOG_DEBUG(log, "Message returned with code: {}, description: {}. Republishing", code, description);
    };

    while ((!payloads.empty() || wait_all) && connection->usable())
162
    {
K
kssenii 已提交
163
        while (!payloads.empty() && producer_channel->usable())
K
kssenii 已提交
164
        {
165
            payloads.pop(payload);
K
kssenii 已提交
166 167 168 169 170 171
            AMQP::Envelope envelope(payload.data(), payload.size());
            current = wait_num ? ++current % wait_num : ++current;

            /// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse.
            if (persistent)
                envelope.setDeliveryMode(2);
172

173
            if (exchange_type == AMQP::ExchangeType::consistent_hash)
174
            {
K
kssenii 已提交
175
                producer_channel->publish(exchange_name, std::to_string(current), envelope).onReturned(returned_callback);
176
            }
177 178 179
            else if (exchange_type == AMQP::ExchangeType::headers)
            {
                envelope.setHeaders(key_arguments);
K
kssenii 已提交
180
                producer_channel->publish(exchange_name, "", envelope, key_arguments).onReturned(returned_callback);
181
            }
182 183
            else
            {
K
kssenii 已提交
184
                producer_channel->publish(exchange_name, routing_keys[0], envelope).onReturned(returned_callback);
185
            }
186

K
kssenii 已提交
187 188 189
            if (current % BATCH == 0)
                iterateEventLoop();
        }
K
kssenii 已提交
190

K
kssenii 已提交
191 192 193 194 195 196 197 198 199
        if (wait_num.load() && last_processed.load() >= wait_num.load())
        {
            wait_all.store(false);
            LOG_DEBUG(log, "All messages are successfully published");
        }
        else
        {
            iterateEventLoop();
        }
K
kssenii 已提交
200 201 202 203
    }
}


204
void WriteBufferToRabbitMQProducer::finilizeProducer()
K
kssenii 已提交
205 206 207
{
    if (use_transactional_channel)
    {
K
kssenii 已提交
208
        std::atomic<bool> answer_received = false, wait_rollback = false;
K
kssenii 已提交
209 210 211 212
        producer_channel->commitTransaction()
        .onSuccess([&]()
        {
            answer_received = true;
K
kssenii 已提交
213
            wait_all.store(false);
K
kssenii 已提交
214 215
            LOG_TRACE(log, "All messages were successfully published");
        })
A
Alexey Milovidov 已提交
216
        .onError([&](const char * message1)
K
kssenii 已提交
217 218
        {
            answer_received = true;
K
kssenii 已提交
219
            wait_all.store(false);
K
kssenii 已提交
220
            wait_rollback = true;
A
Alexey Milovidov 已提交
221
            LOG_TRACE(log, "Publishing not successful: {}", message1);
K
kssenii 已提交
222 223 224 225 226
            producer_channel->rollbackTransaction()
            .onSuccess([&]()
            {
                wait_rollback = false;
            })
A
Alexey Milovidov 已提交
227
            .onError([&](const char * message2)
K
kssenii 已提交
228
            {
A
Alexey Milovidov 已提交
229
                LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
K
kssenii 已提交
230 231
                wait_rollback = false;
            });
K
kssenii 已提交
232 233 234
        });

        size_t count_retries = 0;
K
kssenii 已提交
235
        while ((!answer_received || wait_rollback) && ++count_retries != RETRIES_MAX)
K
kssenii 已提交
236
        {
A
alesapin 已提交
237
            iterateEventLoop();
238
            std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_WAIT));
K
kssenii 已提交
239 240 241 242 243
        }
    }
}


K
kssenii 已提交
244 245 246 247 248 249 250 251
/// Appends a fresh chunk of chunk_size bytes and makes it the working buffer.
void WriteBufferToRabbitMQProducer::nextImpl()
{
    /// Construct the new chunk in place, already at its full (zero-filled) size.
    chunks.emplace_back(chunk_size, '\0');
    set(chunks.back().data(), chunk_size);
}


A
alesapin 已提交
252
void WriteBufferToRabbitMQProducer::iterateEventLoop()
K
kssenii 已提交
253
{
A
alesapin 已提交
254
    event_handler->iterateLoop();
K
kssenii 已提交
255 256 257
}

}