StorageBuffer.cpp 29.0 KB
Newer Older
1
#include <boost/range/algorithm_ext/erase.hpp>
2 3 4
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
5
#include <Interpreters/castColumn.h>
6
#include <Interpreters/evaluateConstantExpression.h>
7
#include <Processors/Transforms/AddingMissedTransform.h>
8
#include <DataStreams/IBlockInputStream.h>
9
#include <Storages/StorageBuffer.h>
10
#include <Storages/StorageFactory.h>
11
#include <Storages/AlterCommands.h>
12 13
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h>
14
#include <Parsers/ASTLiteral.h>
15 16 17
#include <Parsers/ASTExpressionList.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
18
#include <Common/MemoryTracker.h>
19
#include <Common/FieldVisitors.h>
20
#include <Common/quoteString.h>
21
#include <Common/typeid_cast.h>
22
#include <Common/ProfileEvents.h>
A
alexey-milovidov 已提交
23
#include <common/logger_useful.h>
A
Alexey Milovidov 已提交
24
#include <common/getThreadId.h>
25
#include <ext/range.h>
26 27 28
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
N
Nikolai Kochetov 已提交
29
#include <Processors/Sources/SourceFromInputStream.h>
30

31

32 33
namespace ProfileEvents
{
    /// Incremented by StorageBuffer::flushBuffer for every flush attempt and for every failed write to the destination.
    extern const Event StorageBufferFlush;
    extern const Event StorageBufferErrorOnFlush;

    /// Incremented by StorageBuffer::checkThresholdsImpl, one per threshold condition that was found satisfied.
    extern const Event StorageBufferPassedAllMinThresholds;
    extern const Event StorageBufferPassedTimeMaxThreshold;
    extern const Event StorageBufferPassedRowsMaxThreshold;
    extern const Event StorageBufferPassedBytesMaxThreshold;
}

namespace CurrentMetrics
{
    /// Rows and bytes currently held in memory by all Buffer tables
    /// (added when a block is appended to a buffer, subtracted when a buffer is flushed).
    extern const Metric StorageBufferRows;
    extern const Metric StorageBufferBytes;
}


49 50 51
namespace DB
{

52 53
namespace ErrorCodes
{
    extern const int INFINITE_LOOP;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

60

61 62 63 64
/// Creates a Buffer table with `num_shards_` independent in-memory buffers.
/// Data is flushed according to `min_thresholds_` / `max_thresholds_` into the table identified by
/// `destination_id_`; when that id is empty, flushed data is simply dropped.
StorageBuffer::StorageBuffer(
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    Context & context_,
    size_t num_shards_,
    const Thresholds & min_thresholds_,
    const Thresholds & max_thresholds_,
    const StorageID & destination_id_,
    bool allow_materialized_)
    : IStorage(table_id_)
    , global_context(context_)
    , num_shards(num_shards_)
    , buffers(num_shards_)
    , min_thresholds(min_thresholds_)
    , max_thresholds(max_thresholds_)
    , destination_id(destination_id_)
    , allow_materialized(allow_materialized_)
    , log(&Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
{
    setColumns(columns_);
    setConstraints(constraints_);
}

P
proller 已提交
84 85 86 87 88 89 90 91 92 93
StorageBuffer::~StorageBuffer()
{
    /// shutdown() normally stops and joins the flush thread already; this only covers the case it was not called.
    if (!flush_thread.joinable())
        return;

    shutdown_event.set();
    flush_thread.join();
}

94

F
f1yegor 已提交
95
/// Reads from one buffer (from one block) under its mutex.
96
class BufferSource : public SourceWithProgress
97 98
{
public:
99
    BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage)
N
Nikolai Kochetov 已提交
100
        : SourceWithProgress(storage.getSampleBlockForColumns(column_names_))
101
        , column_names(column_names_.begin(), column_names_.end()), buffer(buffer_) {}
102

103
    String getName() const override { return "Buffer"; }
104 105

protected:
106
    Chunk generate() override
107
    {
108
        Chunk res;
109

110 111 112
        if (has_been_read)
            return res;
        has_been_read = true;
113

A
Alexey Milovidov 已提交
114
        std::lock_guard lock(buffer.mutex);
115

116 117
        if (!buffer.data.rows())
            return res;
118

119 120 121
        Columns columns;
        columns.reserve(column_names.size());

122
        for (const auto & name : column_names)
123 124 125 126
            columns.push_back(buffer.data.getByName(name).column);

        UInt64 size = columns.at(0)->size();
        res.setColumns(std::move(columns), size);
127

128 129
        return res;
    }
130 131

private:
132
    Names column_names;
133 134
    StorageBuffer::Buffer & buffer;
    bool has_been_read = false;
135 136 137
};


A
alexey-milovidov 已提交
138
/// With a destination table, the processing stage is whatever the destination supports for this query.
/// Without one, only the in-memory buffers are read, at the FetchColumns stage.
/// Throws INFINITE_LOOP if the destination is this table itself.
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
    if (!destination_id)
        return QueryProcessingStage::FetchColumns;

    const auto destination = DatabaseCatalog::instance().getTable(destination_id);
    if (destination.get() == this)
        throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

    return destination->getQueryProcessingStage(context, to_stage, query_ptr);
}

N
Nikolai Kochetov 已提交
153

154
/// Reads from the destination table (if any) and from every in-memory buffer shard.
///
/// If every requested column exists in the destination with an identical type, the destination is read directly.
/// Otherwise only the common columns are read from it: missing columns are filled with defaults and columns with
/// different types are converted to this table's types. Pipes over the buffers are processed up to
/// `processed_stage` and get the PREWHERE filter, if any. Destination pipes come first in the result.
/// Throws INFINITE_LOOP if the destination is this table itself.
Pipes StorageBuffer::read(
    const Names & column_names,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned num_streams)
{
    Pipes result;

    if (destination_id)
    {
        auto destination = DatabaseCatalog::instance().getTable(destination_id);

        if (destination.get() == this)
            throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

        /// Structure share lock on the destination; it is attached to every destination pipe below.
        auto destination_lock = destination->lockStructureForShare(
                false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);

        /// True when every requested column exists in the destination with exactly the same type.
        bool same_structure = true;
        for (const String & column_name : column_names)
        {
            if (!destination->hasColumn(column_name)
                || !destination->getColumn(column_name).type->equals(*getColumn(column_name).type))
            {
                same_structure = false;
                break;
            }
        }

        if (same_structure)
        {
            if (query_info.order_by_optimizer)
                query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);

            /// The destination table has the same structure of the requested columns and we can simply read blocks from there.
            result = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
        }
        else
        {
            /// There is a struct mismatch and we need to convert read blocks from the destination table.
            const Block header = getSampleBlock();
            Names columns_intersection = column_names;
            Block header_after_adding_defaults = header;

            for (const String & column_name : column_names)
            {
                if (!destination->hasColumn(column_name))
                {
                    LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                        << " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used.");
                    boost::range::remove_erase(columns_intersection, column_name);
                }
                else
                {
                    const auto & dst_col = destination->getColumn(column_name);
                    const auto & col = getColumn(column_name);
                    if (!dst_col.type->equals(*col.type))
                    {
                        LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                            << " has different type of column " << backQuoteIfNeed(column_name) << " ("
                            << dst_col.type->getName() << " != " << col.type->getName() << "). Data from destination table are converted.");
                        /// Expect the destination type here; ConvertingTransform turns it into this table's type later.
                        header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
                    }
                }
            }

            if (columns_intersection.empty())
            {
                LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                    << " has no common columns with block in buffer. Block of data is skipped.");
            }
            else
            {
                result = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
                for (auto & pipe : result)
                {
                    /// First add default values for columns missing in the destination...
                    pipe.addSimpleTransform(std::make_shared<AddingMissedTransform>(
                            pipe.getHeader(), header_after_adding_defaults, getColumns().getDefaults(), context));

                    /// ...then cast everything to this table's header, matching columns by name.
                    pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
                            pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name));
                }
            }
        }

        for (auto & pipe : result)
            pipe.addTableLock(destination_lock);
    }

    /// One source per buffer shard.
    Pipes buffer_pipes;
    buffer_pipes.reserve(num_shards);
    for (auto & shard_buffer : buffers)
        buffer_pipes.emplace_back(std::make_shared<BufferSource>(column_names, shard_buffer, *this));

    /** If the sources from the table were processed before some non-initial stage of query execution,
      * then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
      */
    if (processed_stage > QueryProcessingStage::FetchColumns)
    {
        for (auto & pipe : buffer_pipes)
            pipe = InterpreterSelectQuery(query_info.query, context, std::move(pipe), SelectQueryOptions(processed_stage)).executeWithProcessors().getPipe();
    }

    if (query_info.prewhere_info)
    {
        const auto & prewhere = *query_info.prewhere_info;

        for (auto & pipe : buffer_pipes)
            pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), prewhere.prewhere_actions,
                    prewhere.prewhere_column_name, prewhere.remove_prewhere_column));

        if (prewhere.alias_actions)
        {
            for (auto & pipe : buffer_pipes)
                pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), prewhere.alias_actions));
        }
    }

    for (auto & pipe : buffer_pipes)
        result.emplace_back(std::move(pipe));

    return result;
}


270 271
static void appendBlock(const Block & from, Block & to)
{
272 273 274
    if (!to)
        throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);

275
    assertBlocksHaveEqualStructure(from, to, "Buffer");
276

277 278 279 280 281 282 283 284 285 286 287
    from.checkNumberOfRows();
    to.checkNumberOfRows();

    size_t rows = from.rows();
    size_t bytes = from.bytes();

    CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
    CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, bytes);

    size_t old_rows = to.rows();

288 289
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

290 291 292 293
    try
    {
        for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
        {
294
            const IColumn & col_from = *from.getByPosition(column_no).column.get();
295
            MutableColumnPtr col_to = (*std::move(to.getByPosition(column_no).column)).mutate();
296

297
            col_to->insertRangeFrom(col_from, 0, rows);
298

299
            to.getByPosition(column_no).column = std::move(col_to);
300 301 302 303 304 305 306 307 308
        }
    }
    catch (...)
    {
        /// Rollback changes.
        try
        {
            for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
            {
309
                ColumnPtr & col_to = to.getByPosition(column_no).column;
310
                if (col_to->size() != old_rows)
311
                    col_to = (*std::move(col_to)).mutate()->cut(0, old_rows);
312 313 314 315 316 317 318 319 320 321
            }
        }
        catch (...)
        {
            /// In case when we cannot rollback, do not leave incorrect state in memory.
            std::terminate();
        }

        throw;
    }
322 323 324
}


325 326 327
class BufferBlockOutputStream : public IBlockOutputStream
{
public:
328
    explicit BufferBlockOutputStream(StorageBuffer & storage_) : storage(storage_) {}
329 330

    Block getHeader() const override { return storage.getSampleBlock(); }
331 332 333 334 335 336

    void write(const Block & block) override
    {
        if (!block)
            return;

337 338 339
        // Check table structure.
        storage.check(block, true);

340 341 342 343 344
        size_t rows = block.rows();
        if (!rows)
            return;

        StoragePtr destination;
345
        if (storage.destination_id)
346
        {
347
            destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id);
348 349
            if (destination.get() == &storage)
                throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
350 351 352 353 354 355 356
        }

        size_t bytes = block.bytes();

        /// If the block already exceeds the maximum limit, then we skip the buffer.
        if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
        {
357
            if (storage.destination_id)
358 359 360
            {
                LOG_TRACE(storage.log, "Writing block with " << rows << " rows, " << bytes << " bytes directly.");
                storage.writeBlockToDestination(block, destination);
361
            }
362 363 364 365
            return;
        }

        /// We distribute the load on the shards by the stream number.
A
Alexey Milovidov 已提交
366
        const auto start_shard_num = getThreadId() % storage.num_shards;
367 368 369 370 371 372 373 374 375 376

        /// We loop through the buffers, trying to lock mutex. No more than one lap.
        auto shard_num = start_shard_num;

        StorageBuffer::Buffer * least_busy_buffer = nullptr;
        std::unique_lock<std::mutex> least_busy_lock;
        size_t least_busy_shard_rows = 0;

        for (size_t try_no = 0; try_no < storage.num_shards; ++try_no)
        {
A
Alexey Milovidov 已提交
377
            std::unique_lock lock(storage.buffers[shard_num].mutex, std::try_to_lock);
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394

            if (lock.owns_lock())
            {
                size_t num_rows = storage.buffers[shard_num].data.rows();
                if (!least_busy_buffer || num_rows < least_busy_shard_rows)
                {
                    least_busy_buffer = &storage.buffers[shard_num];
                    least_busy_lock = std::move(lock);
                    least_busy_shard_rows = num_rows;
                }
            }

            shard_num = (shard_num + 1) % storage.num_shards;
        }

        /// If you still can not lock anything at once, then we'll wait on mutex.
        if (!least_busy_buffer)
A
Amos Bird 已提交
395 396
        {
            least_busy_buffer = &storage.buffers[start_shard_num];
A
Alexey Milovidov 已提交
397
            least_busy_lock = std::unique_lock(least_busy_buffer->mutex);
A
Amos Bird 已提交
398 399
        }
        insertIntoBuffer(block, *least_busy_buffer);
400
    }
401
private:
402 403
    StorageBuffer & storage;

A
Amos Bird 已提交
404
    void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
405
    {
406
        time_t current_time = time(nullptr);
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421

        /// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
        Block sorted_block = block.sortColumns();

        if (!buffer.data)
        {
            buffer.data = sorted_block.cloneEmpty();
        }
        else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
        {
            /** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
              * This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
              *  an exception will be thrown, and new data will not be added to the buffer.
              */

422
            storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
423 424 425 426 427 428 429
        }

        if (!buffer.first_write_time)
            buffer.first_write_time = current_time;

        appendBlock(sorted_block, buffer.data);
    }
430 431 432
};


433
/// Each INSERT gets its own stream; the stream picks a buffer shard for every block written to it.
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    auto stream = std::make_shared<BufferBlockOutputStream>(*this);
    return stream;
}


439
bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
A
Alexey Milovidov 已提交
440
{
441
    if (!destination_id)
A
Alexey Milovidov 已提交
442 443
        return false;

444
    auto destination = DatabaseCatalog::instance().getTable(destination_id);
A
Alexey Milovidov 已提交
445 446 447 448

    if (destination.get() == this)
        throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

449
    return destination->mayBenefitFromIndexForIn(left_in_operand, query_context);
A
Alexey Milovidov 已提交
450 451 452
}


453 454
void StorageBuffer::startup()
{
455
    if (global_context.getSettingsRef().readonly)
456 457
    {
        LOG_WARNING(log, "Storage " << getName() << " is run with readonly settings, it will not be able to insert data."
P
Pradeep Chhetri 已提交
458
            << " Set appropriate system_profile to fix this.");
459 460
    }

461
    flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this);
462 463 464
}


465 466
void StorageBuffer::shutdown()
{
467 468 469 470 471 472 473
    shutdown_event.set();

    if (flush_thread.joinable())
        flush_thread.join();

    try
    {
474
        optimize(nullptr /*query*/, {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context);
475 476 477 478 479
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
480 481 482
}


483 484 485 486 487 488 489 490 491 492
/** NOTE If you do OPTIMIZE after insertion,
  * it does not guarantee, that all data will be in destination table at the time of next SELECT just after OPTIMIZE.
  *
  * Because in case if there was already running flushBuffer method,
  *  then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly,
  *  but at the same time, the already running flushBuffer method possibly is not finished,
  *  so next SELECT will observe missing data.
  *
  * This kind of race condition make very hard to implement proper tests.
  */
A
Alexey Milovidov 已提交
493
bool StorageBuffer::optimize(const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & /*context*/)
494
{
495
    if (partition)
496
        throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
497

498 499
    if (final)
        throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
500

501 502 503
    if (deduplicate)
        throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);

504 505
    flushAllBuffers(false);
    return true;
506 507 508
}


509
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
510
{
511 512 513
    time_t time_passed = 0;
    if (buffer.first_write_time)
        time_passed = current_time - buffer.first_write_time;
514

515 516
    size_t rows = buffer.data.rows() + additional_rows;
    size_t bytes = buffer.data.bytes() + additional_bytes;
517

518
    return checkThresholdsImpl(rows, bytes, time_passed);
519 520 521 522 523
}


bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
{
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
    if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedAllMinThresholds);
        return true;
    }

    if (time_passed > max_thresholds.time)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeMaxThreshold);
        return true;
    }

    if (rows > max_thresholds.rows)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsMaxThreshold);
        return true;
    }

    if (bytes > max_thresholds.bytes)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesMaxThreshold);
        return true;
    }

    return false;
549 550 551
}


552 553
void StorageBuffer::flushAllBuffers(const bool check_thresholds)
{
554 555
    for (auto & buf : buffers)
        flushBuffer(buf, check_thresholds);
556 557 558
}


A
Amos Bird 已提交
559
/// Moves the content of `buffer` out and writes it to the destination table.
///
/// `check_thresholds`: flush only if the thresholds are passed; otherwise flush whenever the buffer is non-empty.
/// `locked`: the caller already holds `buffer.mutex` (the insert path); otherwise the mutex is taken here.
/// Without a destination the data is simply dropped. If writing fails, the data is put back into the buffer,
/// its age restarts from now, and the exception is rethrown.
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
{
    const time_t current_time = time(nullptr);

    std::unique_lock lock(buffer.mutex, std::defer_lock);
    if (!locked)
        lock.lock();

    Block block_to_write = buffer.data.cloneEmpty();

    const size_t rows = buffer.data.rows();
    const size_t bytes = buffer.data.bytes();
    const time_t time_passed = buffer.first_write_time ? current_time - buffer.first_write_time : 0;

    const bool should_flush = check_thresholds ? checkThresholdsImpl(rows, bytes, time_passed) : rows != 0;
    if (!should_flush)
        return;

    /// Detach the accumulated data; the buffer is left with an empty block of the same structure.
    buffer.data.swap(block_to_write);
    buffer.first_write_time = 0;

    CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
    CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());

    ProfileEvents::increment(ProfileEvents::StorageBufferFlush);

    LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");

    if (!destination_id)
        return;

    /** For simplicity, buffer is locked during write.
        * We could unlock buffer temporary, but it would lead to too many difficulties:
        * - data, that is written, will not be visible for SELECTs;
        * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written;
        * - this could lead to infinite memory growth.
        */
    try
    {
        writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id));
    }
    catch (...)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);

        /// Return the block to its place in the buffer and restore the metrics subtracted above.
        CurrentMetrics::add(CurrentMetrics::StorageBufferRows, block_to_write.rows());
        CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());

        buffer.data.swap(block_to_write);

        if (!buffer.first_write_time)
            buffer.first_write_time = current_time;

        /// After a while, the next write attempt will happen.
        throw;
    }
}

632

633 634
void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{
635
    if (!destination_id || !block)
636 637 638 639
        return;

    if (!table)
    {
640
        LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs() << " doesn't exist. Block of data is discarded.");
641 642 643
        return;
    }

644 645
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

646
    auto insert = std::make_shared<ASTInsertQuery>();
647
    insert->table_id = destination_id;
648 649 650 651

    /** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
      * This will support some of the cases (but not all) when the table structure does not match.
      */
652
    Block structure_of_destination_table = allow_materialized ? table->getSampleBlock() : table->getSampleBlockNonMaterialized();
653
    Block block_to_write;
654 655 656 657 658
    for (size_t i : ext::range(0, structure_of_destination_table.columns()))
    {
        auto dst_col = structure_of_destination_table.getByPosition(i);
        if (block.has(dst_col.name))
        {
659 660
            auto column = block.getByName(dst_col.name);
            if (!column.type->equals(*dst_col.type))
661
            {
662
                LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
A
alexey-milovidov 已提交
663
                    << " have different type of column " << backQuoteIfNeed(column.name) << " ("
664
                    << dst_col.type->getName() << " != " << column.type->getName()
665
                    << "). Block of data is converted.");
A
Alexey Milovidov 已提交
666
                column.column = castColumn(column, dst_col.type);
667
                column.type = dst_col.type;
668 669
            }

670
            block_to_write.insert(column);
671 672 673
        }
    }

674
    if (block_to_write.columns() == 0)
675
    {
676
        LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs()
A
alexey-milovidov 已提交
677
            << " have no common columns with block in buffer. Block of data is discarded.");
678 679 680
        return;
    }

681
    if (block_to_write.columns() != block.columns())
682
        LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
683
            << destination_id.getNameForLogs() << ". Some columns are discarded.");
684 685 686

    auto list_of_columns = std::make_shared<ASTExpressionList>();
    insert->columns = list_of_columns;
687
    list_of_columns->children.reserve(block_to_write.columns());
A
alexey-milovidov 已提交
688
    for (const auto & column : block_to_write)
689
        list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
690

691
    InterpreterInsertQuery interpreter{insert, global_context, allow_materialized};
692 693 694

    auto block_io = interpreter.execute();
    block_io.out->writePrefix();
A
Alexey Milovidov 已提交
695
    block_io.out->write(block_to_write);
696
    block_io.out->writeSuffix();
697 698 699 700 701
}


void StorageBuffer::flushThread()
{
702 703 704 705 706 707 708 709 710 711 712 713 714
    setThreadName("BufferFlush");

    do
    {
        try
        {
            flushAllBuffers(true);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    } while (!shutdown_event.tryWait(1000));
715 716
}

A
alesapin 已提交
717 718 719 720 721 722 723 724 725 726 727 728
/// A Buffer table supports only column-level ALTERs: ADD, MODIFY, DROP and COMMENT COLUMN.
/// Throws NOT_IMPLEMENTED for the first command of any other type.
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
    for (const auto & command : commands)
    {
        const bool supported = command.type == AlterCommand::Type::ADD_COLUMN
            || command.type == AlterCommand::Type::MODIFY_COLUMN
            || command.type == AlterCommand::Type::DROP_COLUMN
            || command.type == AlterCommand::Type::COMMENT_COLUMN;

        if (supported)
            continue;

        throw Exception(
            "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
            ErrorCodes::NOT_IMPLEMENTED);
    }
}

729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746
std::optional<UInt64> StorageBuffer::totalRows() const
{
    std::optional<UInt64> underlying_rows;
    auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id);

    if (underlying)
        underlying_rows = underlying->totalRows();
    if (!underlying_rows)
        return underlying_rows;

    UInt64 rows = 0;
    for (auto & buffer : buffers)
    {
        std::lock_guard lock(buffer.mutex);
        rows += buffer.data.rows();
    }
    return rows + *underlying_rows;
}
747

748 749 750 751 752 753 754 755 756 757 758
std::optional<UInt64> StorageBuffer::totalBytes() const
{
    UInt64 bytes = 0;
    for (auto & buffer : buffers)
    {
        std::lock_guard lock(buffer.mutex);
        bytes += buffer.data.bytes();
    }
    return bytes;
}

759
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
760
{
761
    lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
762

763
    auto table_id = getStorageID();
A
alesapin 已提交
764
    checkAlterIsPossible(params, context.getSettingsRef());
765

766
    /// So that no blocks of the old structure remain.
767
    optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
768

A
alesapin 已提交
769 770
    StorageInMemoryMetadata metadata = getInMemoryMetadata();
    params.apply(metadata);
771
    DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
A
alesapin 已提交
772
    setColumns(std::move(metadata.columns));
773 774
}

775 776 777 778 779 780 781 782 783 784

namespace ErrorCodes
{
    /// Used by the argument validation in registerStorageBuffer below.
    extern const int BAD_ARGUMENTS;
}

void registerStorageBuffer(StorageFactory & factory)
{
    /** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
      *
      * db, table - in which table to put data from buffer.
      * num_buckets - level of parallelism; must be greater than zero.
      * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
      */

    factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() != 9)
            throw Exception("Storage Buffer requires 9 parameters: "
                " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        engine_args[0] = evaluateConstantExpressionForDatabaseName(engine_args[0], args.local_context);
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);

        String destination_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
        String destination_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();

        UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[2]->as<ASTLiteral &>().value);

        /// Inserts choose a shard as `getThreadId() % num_shards` and then index the buffers with it,
        /// so zero buckets would mean a division by zero and an out-of-bounds access on every INSERT.
        if (num_buckets == 0)
            throw Exception("Storage Buffer requires num_buckets to be greater than zero", ErrorCodes::BAD_ARGUMENTS);

        Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[3]->as<ASTLiteral &>().value);
        Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[4]->as<ASTLiteral &>().value);
        UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[5]->as<ASTLiteral &>().value);
        UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[6]->as<ASTLiteral &>().value);
        UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
        UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);

        /// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
        StorageID destination_id = StorageID::createEmpty();
        if (!destination_table.empty())
        {
            destination_id.database_name = args.context.resolveDatabase(destination_database);
            destination_id.table_name = destination_table;
        }

        return StorageBuffer::create(
            args.table_id,
            args.columns,
            args.constraints,
            args.context,
            num_buckets,
            StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
            StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
            destination_id,
            static_cast<bool>(args.local_context.getSettingsRef().insert_allow_materialized_columns));
    });
}

830
}