提交 a029034d 编写于 作者: V Vitaly Baranov

CLICKHOUSE-4127: Fix ALTER of destination table for the BUFFER engine.

上级 b21aa60e
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageBuffer.h>
......@@ -145,7 +147,34 @@ BlockInputStreams StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
const Block structure_of_destination_table =
allow_materialized ? destination->getSampleBlock() : destination->getSampleBlockNonMaterialized();
bool can_read_from_destination = true;
for (const String& column_name : column_names)
{
if (!structure_of_destination_table.has(column_name))
{
LOG_WARNING(log, "Destination table " << destination_database << "." << destination_table
<< " doesn't have column " << column_name << ". Data from destination table is skipped.");
can_read_from_destination = false;
break;
}
auto col = getColumn(column_name);
auto dst_col = structure_of_destination_table.getByName(column_name);
if (!dst_col.type->equals(*col.type))
{
LOG_WARNING(log, "Destination table " << destination_database << "." << destination_table
<< " have different type of column " << column_name << " ("
<< col.type->getName() << " != " << dst_col.type->getName()
<< "). Data from destination table is skipped.");
can_read_from_destination = false;
break;
}
}
if (can_read_from_destination)
streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockInputStreams streams_from_buffers;
......@@ -233,6 +262,9 @@ public:
if (!block)
return;
// Check table structure.
storage.check(block, true);
size_t rows = block.rows();
if (!rows)
return;
......@@ -241,23 +273,8 @@ public:
if (!storage.no_destination)
{
destination = storage.context.tryGetTable(storage.destination_database, storage.destination_table);
if (destination)
{
if (destination.get() == &storage)
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
/// Check table structure.
try
{
destination->check(block, true);
}
catch (Exception & e)
{
e.addMessage("(when looking at destination table " + storage.destination_database + "." + storage.destination_table + ")");
throw;
}
}
if (destination.get() == &storage)
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
}
size_t bytes = block.bytes();
......@@ -561,48 +578,46 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
* This will support some of the cases (but not all) when the table structure does not match.
*/
Block structure_of_destination_table = allow_materialized ? table->getSampleBlock() : table->getSampleBlockNonMaterialized();
Names columns_intersection;
columns_intersection.reserve(block.columns());
Block block_to_write;
for (size_t i : ext::range(0, structure_of_destination_table.columns()))
{
auto dst_col = structure_of_destination_table.getByPosition(i);
if (block.has(dst_col.name))
{
if (!block.getByName(dst_col.name).type->equals(*dst_col.type))
auto column = block.getByName(dst_col.name);
if (!column.type->equals(*dst_col.type))
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table
<< " have different type of column " << dst_col.name << " ("
<< block.getByName(dst_col.name).type->getName() << " != " << dst_col.type->getName()
<< "). Block of data is discarded.");
return;
LOG_WARNING(log, "Destination table " << destination_database << "." << destination_table
<< " have different type of column " << column.name << " ("
<< column.type->getName() << " != " << dst_col.type->getName()
<< "). Block of data is converted.");
column.column = castColumn(column, dst_col.type, context);
column.type = dst_col.type;
}
columns_intersection.push_back(dst_col.name);
block_to_write.insert(column);
}
}
if (columns_intersection.empty())
if (block_to_write.columns() == 0)
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded.");
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table
<< " have no common columns with block in buffer. Block of data is discarded.");
return;
}
if (columns_intersection.size() != block.columns())
if (block_to_write.columns() != block.columns())
LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
<< destination_database << "." << destination_table << ". Some columns are discarded.");
auto list_of_columns = std::make_shared<ASTExpressionList>();
insert->columns = list_of_columns;
list_of_columns->children.reserve(columns_intersection.size());
for (const String & column : columns_intersection)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column));
list_of_columns->children.reserve(block_to_write.columns());
for (const auto& column : block_to_write)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
InterpreterInsertQuery interpreter{insert, context, allow_materialized};
Block block_to_write;
for (const auto & name : columns_intersection)
block_to_write.insert(block.getByName(name));
auto block_io = interpreter.execute();
block_io.out->writePrefix();
block_io.out->write(block_to_write);
......
init
1 100
2 200
-
1 100
2 200
3 300
alt
100 DEFZ
200 DEFZ
-
3 300
4 400
opt
100 DEFZ
200 DEFZ
300 DEFZ
400 DEFZ
-
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.buffer;
SET send_logs_level = 'error';
CREATE TABLE test.dst (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE test.buffer (x UInt64, y UInt64) ENGINE = Buffer(test, dst, 1, 99999, 99999, 1, 1, 99999, 99999);
INSERT INTO test.buffer VALUES (1, 100);
INSERT INTO test.buffer VALUES (2, 200);
INSERT INTO test.buffer VALUES (3, 300);
SELECT 'init';
SELECT * FROM test.dst ORDER BY x;
SELECT '-';
SELECT * FROM test.buffer ORDER BY x;
ALTER TABLE test.dst DROP COLUMN x, MODIFY COLUMN y String, ADD COLUMN z String DEFAULT 'DEFZ';
INSERT INTO test.buffer VALUES (4, 400);
SELECT 'alt';
SELECT * FROM test.dst ORDER BY y;
SELECT '-';
SELECT * FROM test.buffer ORDER BY x;
OPTIMIZE TABLE test.buffer;
SELECT 'opt';
SELECT * FROM test.dst ORDER BY y;
SELECT '-';
SELECT * FROM test.buffer ORDER BY x;
SET send_logs_level = 'warning';
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.buffer;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册