StorageBuffer.cpp 35.5 KB
Newer Older
1
#include <boost/range/algorithm_ext/erase.hpp>
2 3 4
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
5
#include <Interpreters/castColumn.h>
6
#include <Interpreters/evaluateConstantExpression.h>
7
#include <Processors/QueryPlan/AddingMissedStep.h>
8
#include <DataStreams/IBlockInputStream.h>
9
#include <Storages/StorageBuffer.h>
10
#include <Storages/StorageFactory.h>
11
#include <Storages/AlterCommands.h>
12 13
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h>
14
#include <Parsers/ASTLiteral.h>
15 16
#include <Parsers/ASTExpressionList.h>
#include <Common/CurrentMetrics.h>
17
#include <Common/MemoryTracker.h>
18
#include <Common/FieldVisitors.h>
19
#include <Common/quoteString.h>
20
#include <Common/typeid_cast.h>
21
#include <Common/ProfileEvents.h>
A
alexey-milovidov 已提交
22
#include <common/logger_useful.h>
A
Alexey Milovidov 已提交
23
#include <common/getThreadId.h>
24
#include <ext/range.h>
25
#include <Processors/QueryPlan/ExpressionStep.h>
26 27
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
N
Nikolai Kochetov 已提交
28
#include <Processors/Sources/SourceFromInputStream.h>
29 30 31
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/UnionStep.h>
32

33

34 35
/// Profile event counters incremented by the Buffer storage (defined elsewhere).
namespace ProfileEvents
{
    /// Flush bookkeeping.
    extern const Event StorageBufferFlush;
    extern const Event StorageBufferErrorOnFlush;

    /// Which threshold check reported that a buffer has to be flushed.
    extern const Event StorageBufferPassedAllMinThresholds;
    extern const Event StorageBufferPassedTimeMaxThreshold;
    extern const Event StorageBufferPassedRowsMaxThreshold;
    extern const Event StorageBufferPassedBytesMaxThreshold;
}

/// Gauges that are increased when rows are appended to a buffer and decreased when a buffer is flushed.
namespace CurrentMetrics
{
    extern const Metric StorageBufferRows;
    extern const Metric StorageBufferBytes;
}


51 52 53
namespace DB
{

54 55
/// Error codes used by this file (defined elsewhere), in alphabetical order.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int INFINITE_LOOP;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

63

64 65 66 67
/** Creates a Buffer table with `num_shards_` independent in-memory buffers.
  *
  * The thresholds, the destination table id and the `allow_materialized_` flag are copied into members;
  * the columns and constraints become the in-memory metadata of the table.
  * The global context (not the passed one) is remembered and is also where the flush schedule pool comes from.
  */
StorageBuffer::StorageBuffer(
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const Context & context_,
    size_t num_shards_,
    const Thresholds & min_thresholds_,
    const Thresholds & max_thresholds_,
    const StorageID & destination_id_,
    bool allow_materialized_)
    : IStorage(table_id_)
    , global_context(context_.getGlobalContext())
    , num_shards(num_shards_)
    , buffers(num_shards_)
    , min_thresholds(min_thresholds_)
    , max_thresholds(max_thresholds_)
    , destination_id(destination_id_)
    , allow_materialized(allow_materialized_)
    , log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
    , bg_pool(global_context.getBufferFlushSchedulePool())
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns_);
    metadata.setConstraints(constraints_);
    setInMemoryMetadata(metadata);
}


F
f1yegor 已提交
91
/// Reads from one buffer (from one block) under its mutex.
92
class BufferSource : public SourceWithProgress
93 94
{
public:
95 96
    BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage, const StorageMetadataPtr & metadata_snapshot)
        : SourceWithProgress(
A
alesapin 已提交
97
            metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
98 99
        , column_names(column_names_.begin(), column_names_.end())
        , buffer(buffer_) {}
100

101
    String getName() const override { return "Buffer"; }
102 103

protected:
104
    Chunk generate() override
105
    {
106
        Chunk res;
107

108 109 110
        if (has_been_read)
            return res;
        has_been_read = true;
111

A
Alexey Milovidov 已提交
112
        std::lock_guard lock(buffer.mutex);
113

114 115
        if (!buffer.data.rows())
            return res;
116

117 118 119
        Columns columns;
        columns.reserve(column_names.size());

120
        for (const auto & name : column_names)
121 122 123 124
            columns.push_back(buffer.data.getByName(name).column);

        UInt64 size = columns.at(0)->size();
        res.setColumns(std::move(columns), size);
125

126 127
        return res;
    }
128 129

private:
130
    Names column_names;
131 132
    StorageBuffer::Buffer & buffer;
    bool has_been_read = false;
133 134 135
};


136
/// The processing stage is whatever the destination table reports;
/// a Buffer table without a destination only fetches columns.
/// Throws INFINITE_LOOP if the destination resolves to this very table.
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const
{
    if (!destination_id)
        return QueryProcessingStage::FetchColumns;

    auto destination = DatabaseCatalog::instance().getTable(destination_id, context);
    if (destination.get() == this)
        throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

    return destination->getQueryProcessingStage(context, to_stage, query_info);
}

N
Nikolai Kochetov 已提交
151

N
Nikolai Kochetov 已提交
152
/// Pipe-returning overload: builds a query plan with the QueryPlan overload and converts it to a pipe.
Pipe StorageBuffer::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan query_plan;
    read(query_plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
    return query_plan.convertToPipe();
}

void StorageBuffer::read(
    QueryPlan & query_plan,
N
Nikolai Kochetov 已提交
168
    const Names & column_names,
169
    const StorageMetadataPtr & metadata_snapshot,
170
    SelectQueryInfo & query_info,
N
Nikolai Kochetov 已提交
171 172 173 174 175
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned num_streams)
{
176
    if (destination_id)
177
    {
178
        auto destination = DatabaseCatalog::instance().getTable(destination_id, context);
179

180 181
        if (destination.get() == this)
            throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
182

A
alesapin 已提交
183
        auto destination_lock = destination->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
184

A
alesapin 已提交
185 186
        auto destination_metadata_snapshot = destination->getInMemoryMetadataPtr();

187
        const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination_metadata_snapshot](const String& column_name)
188
        {
189
            const auto & dest_columns = destination_metadata_snapshot->getColumns();
190
            const auto & our_columns = metadata_snapshot->getColumns();
191 192
            return dest_columns.hasPhysical(column_name) &&
                   dest_columns.get(column_name).type->equals(*our_columns.get(column_name).type);
193 194 195 196
        });

        if (dst_has_same_structure)
        {
D
Dmitry 已提交
197
            if (query_info.order_optimizer)
198
                query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination_metadata_snapshot);
199

200
            /// The destination table has the same structure of the requested columns and we can simply read blocks from there.
201 202
            destination->read(
                query_plan, column_names, destination_metadata_snapshot, query_info,
203
                context, processed_stage, max_block_size, num_streams);
204 205
        }
        else
206
        {
207
            /// There is a struct mismatch and we need to convert read blocks from the destination table.
208
            const Block header = metadata_snapshot->getSampleBlock();
209 210
            Names columns_intersection = column_names;
            Block header_after_adding_defaults = header;
211 212
            const auto & dest_columns = destination_metadata_snapshot->getColumns();
            const auto & our_columns = metadata_snapshot->getColumns();
213
            for (const String & column_name : column_names)
214
            {
215
                if (!dest_columns.hasPhysical(column_name))
216
                {
A
Alexey Milovidov 已提交
217
                    LOG_WARNING(log, "Destination table {} doesn't have column {}. The default values are used.", destination_id.getNameForLogs(), backQuoteIfNeed(column_name));
218 219 220
                    boost::range::remove_erase(columns_intersection, column_name);
                    continue;
                }
A
alesapin 已提交
221 222
                const auto & dst_col = dest_columns.getPhysical(column_name);
                const auto & col = our_columns.getPhysical(column_name);
223
                if (!dst_col.type->equals(*col.type))
224
                {
A
Alexey Milovidov 已提交
225
                    LOG_WARNING(log, "Destination table {} has different type of column {} ({} != {}). Data from destination table are converted.", destination_id.getNameForLogs(), backQuoteIfNeed(column_name), dst_col.type->getName(), col.type->getName());
226
                    header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
227 228
                }
            }
229 230

            if (columns_intersection.empty())
231
            {
A
Alexey Milovidov 已提交
232
                LOG_WARNING(log, "Destination table {} has no common columns with block in buffer. Block of data is skipped.", destination_id.getNameForLogs());
233
            }
234
            else
235
            {
236 237 238
                destination->read(
                        query_plan, columns_intersection, destination_metadata_snapshot, query_info,
                        context, processed_stage, max_block_size, num_streams);
239

N
Nikolai Kochetov 已提交
240 241 242 243 244 245
                if (query_plan.isInitialized())
                {

                    auto adding_missed = std::make_unique<AddingMissedStep>(
                            query_plan.getCurrentDataStream(),
                            header_after_adding_defaults,
N
Nikolai Kochetov 已提交
246
                            metadata_snapshot->getColumns(), context);
247

N
Nikolai Kochetov 已提交
248 249
                    adding_missed->setStepDescription("Add columns missing in destination table");
                    query_plan.addStep(std::move(adding_missed));
250

251 252 253 254 255 256
                    auto actions_dag = ActionsDAG::makeConvertingActions(
                            query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
                            header.getColumnsWithTypeAndName(),
                            ActionsDAG::MatchColumnsMode::Name);

                    auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), actions_dag);
257

N
Nikolai Kochetov 已提交
258 259 260
                    converting->setStepDescription("Convert destination table columns to Buffer table structure");
                    query_plan.addStep(std::move(converting));
                }
261 262
            }
        }
263

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
        if (query_plan.isInitialized())
        {
            StreamLocalLimits limits;
            SizeLimits leaf_limits;

            /// Add table lock for destination table.
            auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
                    query_plan.getCurrentDataStream(),
                    destination,
                    std::move(destination_lock),
                    limits,
                    leaf_limits,
                    nullptr,
                    nullptr);

            adding_limits_and_quota->setStepDescription("Lock destination table for Buffer");
            query_plan.addStep(std::move(adding_limits_and_quota));
        }
282
    }
283

N
Nikolai Kochetov 已提交
284 285 286 287 288 289 290 291 292
    Pipe pipe_from_buffers;
    {
        Pipes pipes_from_buffers;
        pipes_from_buffers.reserve(num_shards);
        for (auto & buf : buffers)
            pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, *this, metadata_snapshot));

        pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
    }
293

294 295 296 297
    if (pipe_from_buffers.empty())
        return;

    QueryPlan buffers_plan;
298

299 300 301 302
    /** If the sources from the table were processed before some non-initial stage of query execution,
      * then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
      */
    if (processed_stage > QueryProcessingStage::FetchColumns)
303
    {
304 305 306 307 308 309 310 311
        auto interpreter = InterpreterSelectQuery(
                query_info.query, context, std::move(pipe_from_buffers),
                SelectQueryOptions(processed_stage));
        interpreter.buildQueryPlan(buffers_plan);
    }
    else
    {
        if (query_info.prewhere_info)
312
        {
N
Nikolai Kochetov 已提交
313 314
            pipe_from_buffers.addSimpleTransform([&](const Block & header)
            {
315 316 317
                return std::make_shared<FilterTransform>(
                        header, query_info.prewhere_info->prewhere_actions,
                        query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
N
Nikolai Kochetov 已提交
318
            });
319 320 321 322 323 324 325 326

            if (query_info.prewhere_info->alias_actions)
            {
                pipe_from_buffers.addSimpleTransform([&](const Block & header)
                {
                    return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
                });
            }
327
        }
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344

        auto read_from_buffers = std::make_unique<ReadFromPreparedSource>(std::move(pipe_from_buffers));
        read_from_buffers->setStepDescription("Read from buffers of Buffer table");
        buffers_plan.addStep(std::move(read_from_buffers));
    }

    if (!query_plan.isInitialized())
    {
        query_plan = std::move(buffers_plan);
        return;
    }

    auto result_header = buffers_plan.getCurrentDataStream().header;

    /// Convert structure from table to structure from buffer.
    if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
    {
345 346 347 348 349 350
        auto convert_actions_dag = ActionsDAG::makeConvertingActions(
                query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
                result_header.getColumnsWithTypeAndName(),
                ActionsDAG::MatchColumnsMode::Name);

        auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), convert_actions_dag);
351
        query_plan.addStep(std::move(converting));
352 353
    }

354 355 356 357 358 359 360 361 362 363 364 365
    DataStreams input_streams;
    input_streams.emplace_back(query_plan.getCurrentDataStream());
    input_streams.emplace_back(buffers_plan.getCurrentDataStream());

    std::vector<std::unique_ptr<QueryPlan>> plans;
    plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
    plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
    query_plan = QueryPlan();

    auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
    union_step->setStepDescription("Unite sources from Buffer table");
    query_plan.unitePlans(std::move(union_step), std::move(plans));
366 367 368
}


369
static void appendBlock(const Block & from, Block & to)
370
{
371 372 373
    if (!to)
        throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);

374
    assertBlocksHaveEqualStructure(from, to, "Buffer");
375

376 377 378 379 380 381 382 383 384 385 386
    from.checkNumberOfRows();
    to.checkNumberOfRows();

    size_t rows = from.rows();
    size_t bytes = from.bytes();

    CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
    CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, bytes);

    size_t old_rows = to.rows();

387
    MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
388

389 390 391 392
    try
    {
        for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
        {
393
            const IColumn & col_from = *from.getByPosition(column_no).column.get();
394
            MutableColumnPtr col_to = IColumn::mutate(std::move(to.getByPosition(column_no).column));
395

396
            col_to->insertRangeFrom(col_from, 0, rows);
397

398
            to.getByPosition(column_no).column = std::move(col_to);
399 400 401 402 403 404 405 406 407
        }
    }
    catch (...)
    {
        /// Rollback changes.
        try
        {
            for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
            {
408
                ColumnPtr & col_to = to.getByPosition(column_no).column;
409
                if (col_to->size() != old_rows)
410
                    col_to = col_to->cut(0, old_rows);
411 412 413 414 415 416 417 418 419 420
            }
        }
        catch (...)
        {
            /// In case when we cannot rollback, do not leave incorrect state in memory.
            std::terminate();
        }

        throw;
    }
421 422 423
}


424 425 426
class BufferBlockOutputStream : public IBlockOutputStream
{
public:
427 428 429 430 431 432
    explicit BufferBlockOutputStream(
        StorageBuffer & storage_,
        const StorageMetadataPtr & metadata_snapshot_)
        : storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
    {}
433

434
    Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
435 436 437 438 439 440

    void write(const Block & block) override
    {
        if (!block)
            return;

441
        // Check table structure.
A
alesapin 已提交
442
        metadata_snapshot->check(block, true);
443

444 445 446 447 448
        size_t rows = block.rows();
        if (!rows)
            return;

        StoragePtr destination;
449
        if (storage.destination_id)
450
        {
451
            destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.global_context);
452 453
            if (destination.get() == &storage)
                throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
454 455 456 457
        }

        size_t bytes = block.bytes();

458 459 460
        storage.writes.rows += rows;
        storage.writes.bytes += bytes;

461 462 463
        /// If the block already exceeds the maximum limit, then we skip the buffer.
        if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
        {
464
            if (storage.destination_id)
465
            {
A
Alexey Milovidov 已提交
466
                LOG_TRACE(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
467
                storage.writeBlockToDestination(block, destination);
468
            }
469 470 471 472
            return;
        }

        /// We distribute the load on the shards by the stream number.
A
Alexey Milovidov 已提交
473
        const auto start_shard_num = getThreadId() % storage.num_shards;
474 475 476 477 478 479 480 481 482 483

        /// We loop through the buffers, trying to lock mutex. No more than one lap.
        auto shard_num = start_shard_num;

        StorageBuffer::Buffer * least_busy_buffer = nullptr;
        std::unique_lock<std::mutex> least_busy_lock;
        size_t least_busy_shard_rows = 0;

        for (size_t try_no = 0; try_no < storage.num_shards; ++try_no)
        {
A
Alexey Milovidov 已提交
484
            std::unique_lock lock(storage.buffers[shard_num].mutex, std::try_to_lock);
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501

            if (lock.owns_lock())
            {
                size_t num_rows = storage.buffers[shard_num].data.rows();
                if (!least_busy_buffer || num_rows < least_busy_shard_rows)
                {
                    least_busy_buffer = &storage.buffers[shard_num];
                    least_busy_lock = std::move(lock);
                    least_busy_shard_rows = num_rows;
                }
            }

            shard_num = (shard_num + 1) % storage.num_shards;
        }

        /// If you still can not lock anything at once, then we'll wait on mutex.
        if (!least_busy_buffer)
A
Amos Bird 已提交
502 503
        {
            least_busy_buffer = &storage.buffers[start_shard_num];
A
Alexey Milovidov 已提交
504
            least_busy_lock = std::unique_lock(least_busy_buffer->mutex);
A
Amos Bird 已提交
505 506
        }
        insertIntoBuffer(block, *least_busy_buffer);
507 508 509
        least_busy_lock.unlock();

        storage.reschedule();
510
    }
511
private:
512
    StorageBuffer & storage;
513
    StorageMetadataPtr metadata_snapshot;
514

A
Amos Bird 已提交
515
    void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
516
    {
517
        time_t current_time = time(nullptr);
518 519 520 521

        /// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
        Block sorted_block = block.sortColumns();

A
alesapin 已提交
522
        if (!buffer.data)
523 524 525 526 527 528 529 530 531 532
        {
            buffer.data = sorted_block.cloneEmpty();
        }
        else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
        {
            /** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
              * This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
              *  an exception will be thrown, and new data will not be added to the buffer.
              */

533
            storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
534 535 536 537 538
        }

        if (!buffer.first_write_time)
            buffer.first_write_time = current_time;

539
        appendBlock(sorted_block, buffer.data);
540
    }
541 542 543
};


544
/// Returns an output stream that writes into the buffers of this table; the query and the context are not used.
BlockOutputStreamPtr StorageBuffer::write(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & metadata_snapshot,
    const Context & /*context*/)
{
    auto output = std::make_shared<BufferBlockOutputStream>(*this, metadata_snapshot);
    return output;
}


550 551
bool StorageBuffer::mayBenefitFromIndexForIn(
    const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & /*metadata_snapshot*/) const
A
Alexey Milovidov 已提交
552
{
553
    if (!destination_id)
A
Alexey Milovidov 已提交
554 555
        return false;

556
    auto destination = DatabaseCatalog::instance().getTable(destination_id, query_context);
A
Alexey Milovidov 已提交
557 558 559 560

    if (destination.get() == this)
        throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

561
    return destination->mayBenefitFromIndexForIn(left_in_operand, query_context, destination->getInMemoryMetadataPtr());
A
Alexey Milovidov 已提交
562 563 564
}


565 566
void StorageBuffer::startup()
{
567
    if (global_context.getSettingsRef().readonly)
568
    {
A
Alexey Milovidov 已提交
569
        LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName());
570 571
    }

572 573
    flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ flushBack(); });
    flush_handle->activateAndSchedule();
574 575 576
}


577 578
void StorageBuffer::shutdown()
{
579 580 581
    if (!flush_handle)
        return;

582
    flush_handle->deactivate();
583 584 585

    try
    {
A
alesapin 已提交
586
        optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context);
587 588 589 590 591
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
592 593 594
}


595 596 597 598 599 600 601 602 603 604
/** NOTE If you do OPTIMIZE after insertion,
  * it does not guarantee, that all data will be in destination table at the time of next SELECT just after OPTIMIZE.
  *
  * Because in case if there was already running flushBuffer method,
  *  then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly,
  *  but at the same time, the already running flushBuffer method possibly is not finished,
  *  so next SELECT will observe missing data.
  *
  * This kind of race condition make very hard to implement proper tests.
  */
A
alesapin 已提交
605 606 607 608 609 610 611
bool StorageBuffer::optimize(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    const ASTPtr & partition,
    bool final,
    bool deduplicate,
    const Context & /*context*/)
612
{
613
    if (partition)
614
        throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
615

616 617
    if (final)
        throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
618

619 620 621
    if (deduplicate)
        throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);

A
alesapin 已提交
622
    flushAllBuffers(false, true);
623
    return true;
624 625 626
}


627
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
628
{
629 630 631
    time_t time_passed = 0;
    if (buffer.first_write_time)
        time_passed = current_time - buffer.first_write_time;
632

633 634
    size_t rows = buffer.data.rows() + additional_rows;
    size_t bytes = buffer.data.bytes() + additional_bytes;
635

636
    return checkThresholdsImpl(rows, bytes, time_passed);
637 638 639 640 641
}


bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
{
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
    if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedAllMinThresholds);
        return true;
    }

    if (time_passed > max_thresholds.time)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeMaxThreshold);
        return true;
    }

    if (rows > max_thresholds.rows)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsMaxThreshold);
        return true;
    }

    if (bytes > max_thresholds.bytes)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesMaxThreshold);
        return true;
    }

    return false;
667 668 669
}


A
alesapin 已提交
670
void StorageBuffer::flushAllBuffers(bool check_thresholds, bool reset_blocks_structure)
671
{
672
    for (auto & buf : buffers)
A
alesapin 已提交
673
        flushBuffer(buf, check_thresholds, false, reset_blocks_structure);
674 675 676
}


A
alesapin 已提交
677
/** Takes the data out of `buffer` and writes it to the destination table, if one is configured.
  *
  * check_thresholds      - flush only if checkThresholdsImpl() reports a passed threshold;
  *                         otherwise flush whenever the buffer has at least one row.
  * locked                - the caller already holds buffer.mutex, so it is not taken here.
  * reset_block_structure - after a successful write, also clear the buffer's block.
  *
  * Without a destination the data taken out of the buffer is simply dropped.
  * If the write throws, the data is returned to the buffer and the exception is rethrown.
  */
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked, bool reset_block_structure)
{
    Block block_to_write;
    const time_t current_time = time(nullptr);

    std::unique_lock lock(buffer.mutex, std::defer_lock);
    if (!locked)
        lock.lock();

    /// An empty block with the buffer's structure; it is swapped with the buffer's data below.
    block_to_write = buffer.data.cloneEmpty();

    const size_t rows = buffer.data.rows();
    const size_t bytes = buffer.data.bytes();
    const time_t time_passed = buffer.first_write_time ? current_time - buffer.first_write_time : 0;

    const bool need_flush = check_thresholds ? checkThresholdsImpl(rows, bytes, time_passed) : rows != 0;
    if (!need_flush)
        return;

    buffer.data.swap(block_to_write);
    buffer.first_write_time = 0;

    CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
    CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());

    ProfileEvents::increment(ProfileEvents::StorageBufferFlush);

    LOG_TRACE(log, "Flushing buffer with {} rows, {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));

    if (!destination_id)
        return;

    /** For simplicity, buffer is locked during write.
      * We could unlock buffer temporary, but it would lead to too many difficulties:
      * - data, that is written, will not be visible for SELECTs;
      * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written;
      * - this could lead to infinite memory growth.
      */
    try
    {
        writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context));

        if (reset_block_structure)
            buffer.data.clear();
    }
    catch (...)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);

        /// Return the block to its place in the buffer.
        CurrentMetrics::add(CurrentMetrics::StorageBufferRows, block_to_write.rows());
        CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());

        buffer.data.swap(block_to_write);

        if (!buffer.first_write_time)
            buffer.first_write_time = current_time;

        /// After a while, the next write attempt will happen.
        throw;
    }
}

752

753 754
void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{
755
    if (!destination_id || !block)
756 757 758 759
        return;

    if (!table)
    {
A
Alexey Milovidov 已提交
760
        LOG_ERROR(log, "Destination table {} doesn't exist. Block of data is discarded.", destination_id.getNameForLogs());
761 762
        return;
    }
763
    auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
764

765
    MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
766

767
    auto insert = std::make_shared<ASTInsertQuery>();
768
    insert->table_id = destination_id;
769 770 771 772

    /** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
      * This will support some of the cases (but not all) when the table structure does not match.
      */
773 774
    Block structure_of_destination_table = allow_materialized ? destination_metadata_snapshot->getSampleBlock()
                                                              : destination_metadata_snapshot->getSampleBlockNonMaterialized();
775
    Block block_to_write;
776 777 778 779 780
    for (size_t i : ext::range(0, structure_of_destination_table.columns()))
    {
        auto dst_col = structure_of_destination_table.getByPosition(i);
        if (block.has(dst_col.name))
        {
781 782
            auto column = block.getByName(dst_col.name);
            if (!column.type->equals(*dst_col.type))
783
            {
A
Alexey Milovidov 已提交
784
                LOG_WARNING(log, "Destination table {} have different type of column {} ({} != {}). Block of data is converted.", destination_id.getNameForLogs(), backQuoteIfNeed(column.name), dst_col.type->getName(), column.type->getName());
A
Alexey Milovidov 已提交
785
                column.column = castColumn(column, dst_col.type);
786
                column.type = dst_col.type;
787 788
            }

789
            block_to_write.insert(column);
790 791 792
        }
    }

793
    if (block_to_write.columns() == 0)
794
    {
A
Alexey Milovidov 已提交
795
        LOG_ERROR(log, "Destination table {} have no common columns with block in buffer. Block of data is discarded.", destination_id.getNameForLogs());
796 797 798
        return;
    }

799
    if (block_to_write.columns() != block.columns())
A
Alexey Milovidov 已提交
800
        LOG_WARNING(log, "Not all columns from block in buffer exist in destination table {}. Some columns are discarded.", destination_id.getNameForLogs());
801 802 803

    auto list_of_columns = std::make_shared<ASTExpressionList>();
    insert->columns = list_of_columns;
804
    list_of_columns->children.reserve(block_to_write.columns());
A
alexey-milovidov 已提交
805
    for (const auto & column : block_to_write)
806
        list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
807

M
Mikhail Filimonov 已提交
808 809 810 811
    auto insert_context = Context(global_context);
    insert_context.makeQueryContext();

    InterpreterInsertQuery interpreter{insert, insert_context, allow_materialized};
812 813 814

    auto block_io = interpreter.execute();
    block_io.out->writePrefix();
A
Alexey Milovidov 已提交
815
    block_io.out->write(block_to_write);
816
    block_io.out->writeSuffix();
817 818 819
}


820
void StorageBuffer::flushBack()
821
{
822 823 824 825 826 827 828 829
    try
    {
        flushAllBuffers(true);
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
830

831 832 833 834 835 836 837 838 839
    reschedule();
}

/** Schedules the next run of the flush task according to the age of the oldest buffered data.
  *
  * The delay is the smaller of the time remaining until the min and the max time thresholds,
  * but never less than one second. If all buffers are empty nothing is scheduled here.
  */
void StorageBuffer::reschedule()
{
    time_t min_first_write_time = std::numeric_limits<time_t>::max();
    size_t rows = 0;

    for (auto & buffer : buffers)
    {
        std::lock_guard lock(buffer.mutex);

        /// An empty buffer has nothing to flush and its first_write_time is reset to 0,
        /// so it must not take part in the search for the oldest write.
        const size_t buffer_rows = buffer.data.rows();
        if (buffer_rows == 0)
            continue;

        /// Track the oldest write across all non-empty buffers (not just the last one visited).
        min_first_write_time = std::min<time_t>(min_first_write_time, buffer.first_write_time);
        rows += buffer_rows;
    }

    /// will be rescheduled via INSERT
    if (!rows)
        return;

    time_t current_time = time(nullptr);
    time_t time_passed = current_time - min_first_write_time;

    /// Clamp to at least one second so that already exceeded thresholds do not produce a non-positive delay.
    size_t min = std::max<ssize_t>(min_thresholds.time - time_passed, 1);
    size_t max = std::max<ssize_t>(max_thresholds.time - time_passed, 1);
    flush_handle->scheduleAfter(std::min(min, max) * 1000);
}

A
alesapin 已提交
858
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) const
A
alesapin 已提交
859 860 861 862 863 864 865 866 867 868 869
{
    for (const auto & command : commands)
    {
        if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
            && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
            throw Exception(
                "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
                ErrorCodes::NOT_IMPLEMENTED);
    }
}

870
std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
871 872
{
    std::optional<UInt64> underlying_rows;
873
    auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
874 875

    if (underlying)
876
        underlying_rows = underlying->totalRows(settings);
877 878 879 880
    if (!underlying_rows)
        return underlying_rows;

    UInt64 rows = 0;
A
Alexey Milovidov 已提交
881
    for (const auto & buffer : buffers)
882 883 884 885 886 887
    {
        std::lock_guard lock(buffer.mutex);
        rows += buffer.data.rows();
    }
    return rows + *underlying_rows;
}
888

889
std::optional<UInt64> StorageBuffer::totalBytes(const Settings & /*settings*/) const
890 891
{
    UInt64 bytes = 0;
A
Alexey Milovidov 已提交
892
    for (const auto & buffer : buffers)
893 894
    {
        std::lock_guard lock(buffer.mutex);
895
        bytes += buffer.data.allocatedBytes();
896 897 898 899
    }
    return bytes;
}

A
alesapin 已提交
900
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableLockHolder &)
901
{
902
    auto table_id = getStorageID();
A
alesapin 已提交
903
    checkAlterIsPossible(params, context.getSettingsRef());
A
alesapin 已提交
904
    auto metadata_snapshot = getInMemoryMetadataPtr();
905

906 907 908
    /// Flush all buffers to storages, so that no non-empty blocks of the old
    /// structure remain. Structure of empty blocks will be updated during first
    /// insert.
A
alesapin 已提交
909
    optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
910

A
alesapin 已提交
911
    StorageInMemoryMetadata new_metadata = *metadata_snapshot;
A
alesapin 已提交
912 913
    params.apply(new_metadata, context);
    DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
914
    setInMemoryMetadata(new_metadata);
915 916
}

917 918 919 920 921 922 923 924 925 926

/** Registers the "Buffer" table engine in the storage factory.
  *
  * Engine signature:
  *   Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
  *
  * db, table - the table into which data from the buffer is put.
  * num_buckets - level of parallelism.
  * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
  */
void registerStorageBuffer(StorageFactory & factory)
{
    factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
    {
        constexpr size_t expected_arguments = 9;

        ASTs & engine_args = args.engine_args;

        if (engine_args.size() != expected_arguments)
            throw Exception("Storage Buffer requires 9 parameters: "
                " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// The database and table name arguments may be expressions; evaluate them first.
        engine_args[0] = evaluateConstantExpressionForDatabaseName(engine_args[0], args.local_context);
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);

        /// Once the expressions are evaluated, every argument has to be a literal.
        for (size_t i = 0; i < expected_arguments; ++i)
        {
            if (typeid_cast<ASTLiteral *>(engine_args[i].get()))
                continue;

            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Storage Buffer expects a literal as an argument #{}, got '{}'"
                " instead", i, engine_args[i]->formatForErrorMessage());
        }

        /// Shorthand for the value stored in the literal at the given argument position.
        auto literal_value = [&engine_args](size_t index) -> auto &
        {
            return engine_args[index]->as<ASTLiteral &>().value;
        };

        String destination_database = literal_value(0).safeGet<String>();
        String destination_table = literal_value(1).safeGet<String>();

        UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), literal_value(2));

        Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), literal_value(3));
        Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), literal_value(4));
        UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), literal_value(5));
        UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), literal_value(6));
        UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), literal_value(7));
        UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), literal_value(8));

        /// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
        StorageID destination_id = StorageID::createEmpty();
        if (!destination_table.empty())
        {
            destination_id.database_name = args.context.resolveDatabase(destination_database);
            destination_id.table_name = destination_table;
        }

        const bool allow_materialized = static_cast<bool>(args.local_context.getSettingsRef().insert_allow_materialized_columns);

        return StorageBuffer::create(
            args.table_id,
            args.columns,
            args.constraints,
            args.context,
            num_buckets,
            StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
            StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
            destination_id,
            allow_materialized);
    });
}

985
}