提交 ad2392ee 编写于 作者: A Alexey Milovidov

dbms: fixed error with StorageBuffer [#METR-15105].

上级 b5105ecd
......@@ -48,6 +48,9 @@ public:
*/
virtual size_t sizeOfField() const { throw Exception("Cannot get sizeOfField() for column " + getName(), ErrorCodes::CANNOT_GET_SIZE_OF_FIELD); }
/** Создать столбец с такими же данными. */
virtual SharedPtr<IColumn> clone() const { return cut(0, size()); }
/** Создать пустой столбец такого же типа */
virtual SharedPtr<IColumn> cloneEmpty() const { return cloneResized(0); }
......
......@@ -111,6 +111,9 @@ public:
/** Получить такой же блок, но пустой. */
Block cloneEmpty() const;
/** Получить блок со столбцами, переставленными в порядке их имён. */
Block sortColumns() const;
/** Заменяет столбцы смещений внутри вложенных таблиц на один общий для таблицы.
* Кидает исключение, если эти смещения вдруг оказались неодинаковы.
*/
......
......@@ -81,6 +81,7 @@ void Block::insert(const ColumnWithNameAndType & elem)
index_by_position.push_back(it);
}
void Block::insertDefault(const String & name, const DataTypePtr & type)
{
insert({
......@@ -279,6 +280,17 @@ Block Block::cloneEmpty() const
}
Block Block::sortColumns() const
{
Block sorted_block;
for (const auto & name : index_by_name)
sorted_block.insert(*name.second);
return sorted_block;
}
ColumnsWithNameAndType Block::getColumns() const
{
return ColumnsWithNameAndType(data.begin(), data.end());
......
......@@ -71,11 +71,10 @@ protected:
if (!buffer.data)
return res;
for (size_t i = 0, size = buffer.data.columns(); i < size; ++i)
for (const auto & name : column_names)
{
auto & col = buffer.data.unsafeGetByPosition(i);
if (column_names.count(col.name))
res.insert(col);
auto & col = buffer.data.getByName(name);
res.insert(ColumnWithNameAndType(col.column->clone(), col.type, name));
}
return res;
......@@ -213,14 +212,17 @@ private:
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
{
/// Сортируем столбцы в блоке. Это нужно, чтобы было проще потом конкатенировать блоки.
Block sorted_block = block.sortColumns();
if (!buffer.data)
{
buffer.first_write_time = time(0);
buffer.data = block.cloneEmpty();
buffer.data = sorted_block.cloneEmpty();
}
/// Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
if (storage.checkThresholds(buffer, time(0), block.rowsInFirstColumn(), block.bytes()))
if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
{
/// Вытащим из буфера блок, заменим буфер на пустой. После этого можно разблокировать mutex.
Block block_to_write;
......@@ -230,13 +232,13 @@ private:
if (!storage.no_destination)
{
appendBlock(block, block_to_write);
appendBlock(sorted_block, block_to_write);
storage.writeBlockToDestination(block_to_write,
storage.context.tryGetTable(storage.destination_database, storage.destination_table));
}
}
else
appendBlock(block, buffer.data);
appendBlock(sorted_block, buffer.data);
}
};
......
......@@ -23,19 +23,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & ta
void SetOrJoinBlockOutputStream::write(const Block & block)
{
/// Сортируем столбцы в блоке. Это нужно, так как Set и Join рассчитывают на одинаковый порядок столбцов в разных блоках.
size_t columns = block.columns();
std::vector<std::string> names(columns);
for (size_t i = 0; i < columns; ++i)
names[i] = block.unsafeGetByPosition(i).name;
std::sort(names.begin(), names.end());
Block sorted_block;
for (const auto & name : names)
sorted_block.insert(block.getByName(name)); /// NOTE Можно чуть-чуть оптимальнее.
Block sorted_block = block.sortColumns();
table.insertBlock(sorted_block);
backup_stream.write(sorted_block);
......
1 2 [3]
2 [3] 1
[3] 1 2
1 [3] 2
2 1 [3]
[3] 2 1
1 2
2 [3]
[3] 1
1 [3]
2 1
[3] 2
1
2
[3]
1 2 [3]
9 8 [7]
2 [3] 1
8 [7] 9
[3] 1 2
[7] 9 8
1 [3] 2
9 [7] 8
2 1 [3]
8 9 [7]
[3] 2 1
[7] 8 9
1 2
9 8
2 [3]
8 [7]
[3] 1
[7] 9
1 [3]
9 [7]
2 1
8 9
[3] 2
[7] 8
1
9
2
8
[3]
[7]
1 2 [3]
9 8 [7]
11 [33]
2 [3] 1
8 [7] 9
[33] 11
[3] 1 2
[7] 9 8
[33] 11
1 [3] 2
9 [7] 8
11 [33]
2 1 [3]
8 9 [7]
11 [33]
[3] 2 1
[7] 8 9
[33] 11
1 2
9 8
11
2 [3]
8 [7]
[33]
[3] 1
[7] 9
[33] 11
1 [3]
9 [7]
11 [33]
2 1
8 9
11
[3] 2
[7] 8
[33]
1
9
11
2
8
[3]
[7]
[33]
DROP TABLE IF EXISTS test.buffer;
DROP TABLE IF EXISTS test.null;
CREATE TABLE test.null (a UInt8, b String, c Array(UInt32)) ENGINE = Null;
CREATE TABLE test.buffer (a UInt8, b String, c Array(UInt32)) ENGINE = Buffer(test, null, 1, 1000, 1000, 1000, 1000, 1000000, 1000000);
INSERT INTO test.buffer VALUES (1, '2', [3]);
SELECT a, b, c FROM test.buffer ORDER BY a, b, c;
SELECT b, c, a FROM test.buffer ORDER BY a, b, c;
SELECT c, a, b FROM test.buffer ORDER BY a, b, c;
SELECT a, c, b FROM test.buffer ORDER BY a, b, c;
SELECT b, a, c FROM test.buffer ORDER BY a, b, c;
SELECT c, b, a FROM test.buffer ORDER BY a, b, c;
SELECT a, b FROM test.buffer ORDER BY a, b, c;
SELECT b, c FROM test.buffer ORDER BY a, b, c;
SELECT c, a FROM test.buffer ORDER BY a, b, c;
SELECT a, c FROM test.buffer ORDER BY a, b, c;
SELECT b, a FROM test.buffer ORDER BY a, b, c;
SELECT c, b FROM test.buffer ORDER BY a, b, c;
SELECT a FROM test.buffer ORDER BY a, b, c;
SELECT b FROM test.buffer ORDER BY a, b, c;
SELECT c FROM test.buffer ORDER BY a, b, c;
INSERT INTO test.buffer (c, b, a) VALUES ([7], '8', 9);
SELECT a, b, c FROM test.buffer ORDER BY a, b, c;
SELECT b, c, a FROM test.buffer ORDER BY a, b, c;
SELECT c, a, b FROM test.buffer ORDER BY a, b, c;
SELECT a, c, b FROM test.buffer ORDER BY a, b, c;
SELECT b, a, c FROM test.buffer ORDER BY a, b, c;
SELECT c, b, a FROM test.buffer ORDER BY a, b, c;
SELECT a, b FROM test.buffer ORDER BY a, b, c;
SELECT b, c FROM test.buffer ORDER BY a, b, c;
SELECT c, a FROM test.buffer ORDER BY a, b, c;
SELECT a, c FROM test.buffer ORDER BY a, b, c;
SELECT b, a FROM test.buffer ORDER BY a, b, c;
SELECT c, b FROM test.buffer ORDER BY a, b, c;
SELECT a FROM test.buffer ORDER BY a, b, c;
SELECT b FROM test.buffer ORDER BY a, b, c;
SELECT c FROM test.buffer ORDER BY a, b, c;
INSERT INTO test.buffer (a, c) VALUES (11, [33]);
SELECT a, b, c FROM test.buffer ORDER BY a, b, c;
SELECT b, c, a FROM test.buffer ORDER BY a, b, c;
SELECT c, a, b FROM test.buffer ORDER BY a, b, c;
SELECT a, c, b FROM test.buffer ORDER BY a, b, c;
SELECT b, a, c FROM test.buffer ORDER BY a, b, c;
SELECT c, b, a FROM test.buffer ORDER BY a, b, c;
SELECT a, b FROM test.buffer ORDER BY a, b, c;
SELECT b, c FROM test.buffer ORDER BY a, b, c;
SELECT c, a FROM test.buffer ORDER BY a, b, c;
SELECT a, c FROM test.buffer ORDER BY a, b, c;
SELECT b, a FROM test.buffer ORDER BY a, b, c;
SELECT c, b FROM test.buffer ORDER BY a, b, c;
SELECT a FROM test.buffer ORDER BY a, b, c;
SELECT b FROM test.buffer ORDER BY a, b, c;
SELECT c FROM test.buffer ORDER BY a, b, c;
DROP TABLE test.buffer;
DROP TABLE test.null;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册