提交 95ffbac2 编写于 作者: R robot-clickhouse

Backport #17309 to 20.12: Merging #16309

上级 e44562b4
......@@ -1182,7 +1182,7 @@ void InterpreterSelectQuery::executeFetchColumns(
const auto & func = desc.function;
std::optional<UInt64> num_rows{};
if (!query.prewhere() && !query.where())
num_rows = storage->totalRows();
num_rows = storage->totalRows(settings);
else // It's possible to optimize count() given only partition predicates
{
SelectQueryInfo temp_query_info;
......
......@@ -463,7 +463,7 @@ public:
/// - For total_rows column in system.tables
///
/// Does takes underlying Storage (if any) into account.
virtual std::optional<UInt64> totalRows() const { return {}; }
virtual std::optional<UInt64> totalRows(const Settings &) const { return {}; }
/// Same as above but also take partition predicate into account.
virtual std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo &, const Context &) const { return {}; }
......@@ -481,7 +481,7 @@ public:
/// Memory part should be estimated as a resident memory size.
/// In particular, alloctedBytes() is preferable over bytes()
/// when considering in-memory blocks.
virtual std::optional<UInt64> totalBytes() const { return {}; }
virtual std::optional<UInt64> totalBytes(const Settings &) const { return {}; }
/// Number of rows INSERTed since server start.
///
......
......@@ -867,13 +867,13 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
}
}
std::optional<UInt64> StorageBuffer::totalRows() const
std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
{
std::optional<UInt64> underlying_rows;
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
if (underlying)
underlying_rows = underlying->totalRows();
underlying_rows = underlying->totalRows(settings);
if (!underlying_rows)
return underlying_rows;
......@@ -886,7 +886,7 @@ std::optional<UInt64> StorageBuffer::totalRows() const
return rows + *underlying_rows;
}
std::optional<UInt64> StorageBuffer::totalBytes() const
std::optional<UInt64> StorageBuffer::totalBytes(const Settings & /*settings*/) const
{
UInt64 bytes = 0;
for (const auto & buffer : buffers)
......
......@@ -109,8 +109,8 @@ public:
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
std::optional<UInt64> lifetimeRows() const override { return writes.rows; }
std::optional<UInt64> lifetimeBytes() const override { return writes.bytes; }
......
......@@ -103,8 +103,8 @@ HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block, true); }
size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
std::optional<UInt64> StorageJoin::totalRows() const { return join->getTotalRowCount(); }
std::optional<UInt64> StorageJoin::totalBytes() const { return join->getTotalByteCount(); }
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const { return join->getTotalRowCount(); }
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const { return join->getTotalByteCount(); }
void registerStorageJoin(StorageFactory & factory)
......
......@@ -46,8 +46,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
private:
Block sample_block;
......
......@@ -216,21 +216,18 @@ void StorageMemory::truncate(
total_size_rows.store(0, std::memory_order_relaxed);
}
std::optional<UInt64> StorageMemory::totalRows() const
std::optional<UInt64> StorageMemory::totalRows(const Settings &) const
{
/// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency
/// When run concurrently we are fine with any value: "before" or "after"
return total_size_rows.load(std::memory_order_relaxed);
}
std::optional<UInt64> StorageMemory::totalBytes() const
std::optional<UInt64> StorageMemory::totalBytes(const Settings &) const
{
return total_size_bytes.load(std::memory_order_relaxed);
}
void registerStorageMemory(StorageFactory & factory)
{
factory.registerStorage("Memory", [](const StorageFactory::Arguments & args)
......
......@@ -46,8 +46,8 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> totalRows(const Settings &) const override;
std::optional<UInt64> totalBytes(const Settings &) const override;
/** Delays initialization of StorageMemory::read() until the first read is actually happen.
* Usually, fore code like this:
......
......@@ -202,7 +202,7 @@ Pipe StorageMergeTree::read(
return plan.convertToPipe();
}
std::optional<UInt64> StorageMergeTree::totalRows() const
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
{
return getTotalActiveSizeInRows();
}
......@@ -223,7 +223,7 @@ std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const Sele
return res;
}
std::optional<UInt64> StorageMergeTree::totalBytes() const
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const
{
return getTotalActiveSizeInBytes();
}
......
......@@ -56,9 +56,9 @@ public:
size_t max_block_size,
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalRows(const Settings &) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo &, const Context &) const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> totalBytes(const Settings &) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
......
......@@ -45,11 +45,11 @@ public:
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
std::optional<UInt64> totalRows() const override
std::optional<UInt64> totalRows(const Settings &) const override
{
return {0};
}
std::optional<UInt64> totalBytes() const override
std::optional<UInt64> totalBytes(const Settings &) const override
{
return {0};
}
......
......@@ -148,8 +148,8 @@ public:
bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }
std::optional<UInt64> totalBytes() const override { return getNested()->totalBytes(); }
std::optional<UInt64> totalRows(const Settings & settings) const override { return getNested()->totalRows(settings); }
std::optional<UInt64> totalBytes(const Settings & settings) const override { return getNested()->totalBytes(settings); }
std::optional<UInt64> lifetimeRows() const override { return getNested()->lifetimeRows(); }
std::optional<UInt64> lifetimeBytes() const override { return getNested()->lifetimeBytes(); }
......
......@@ -3742,27 +3742,37 @@ Pipe StorageReplicatedMergeTree::read(
template <class Func>
void StorageReplicatedMergeTree::foreachCommittedParts(const Func & func) const
void StorageReplicatedMergeTree::foreachCommittedParts(Func && func, bool select_sequential_consistency) const
{
auto max_added_blocks = getMaxAddedBlocks();
std::optional<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock> max_added_blocks = {};
/**
* Synchronously go to ZooKeeper when select_sequential_consistency enabled
*/
if (select_sequential_consistency)
max_added_blocks = getMaxAddedBlocks();
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (part->isEmpty())
continue;
auto blocks_iterator = max_added_blocks.find(part->info.partition_id);
if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second)
continue;
if (max_added_blocks)
{
auto blocks_iterator = max_added_blocks->find(part->info.partition_id);
if (blocks_iterator == max_added_blocks->end() || part->info.max_block > blocks_iterator->second)
continue;
}
func(part);
}
}
std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
std::optional<UInt64> StorageReplicatedMergeTree::totalRows(const Settings & settings) const
{
UInt64 res = 0;
foreachCommittedParts([&res](auto & part) { res += part->rows_count; });
foreachCommittedParts([&res](auto & part) { res += part->rows_count; }, settings.select_sequential_consistency);
return res;
}
......@@ -3777,14 +3787,14 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
});
}, context.getSettingsRef().select_sequential_consistency);
return res;
}
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes() const
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & settings) const
{
UInt64 res = 0;
foreachCommittedParts([&res](auto & part) { res += part->getBytesOnDisk(); });
foreachCommittedParts([&res](auto & part) { res += part->getBytesOnDisk(); }, settings.select_sequential_consistency);
return res;
}
......
......@@ -107,9 +107,9 @@ public:
size_t max_block_size,
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
......@@ -326,7 +326,7 @@ private:
const size_t replicated_fetches_pool_size;
template <class Func>
void foreachCommittedParts(const Func & func) const;
void foreachCommittedParts(Func && func, bool select_sequential_consistency) const;
/** Creates the minimum set of nodes in ZooKeeper and create first replica.
* Returns true if was created, false if exists.
......
......@@ -153,8 +153,8 @@ void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block);
void StorageSet::finishInsert() { set->finishInsert(); }
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalRows() const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalBytes() const { return set->getTotalByteCount(); }
std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
{
......
......@@ -73,8 +73,8 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
private:
SetPtr set;
......
......@@ -429,7 +429,7 @@ protected:
if (columns_mask[src_index++])
{
assert(table != nullptr);
auto total_rows = table->totalRows();
auto total_rows = table->totalRows(context.getSettingsRef());
if (total_rows)
res_columns[res_index++]->insert(*total_rows);
else
......@@ -439,7 +439,7 @@ protected:
if (columns_mask[src_index++])
{
assert(table != nullptr);
auto total_bytes = table->totalBytes();
auto total_bytes = table->totalBytes(context.getSettingsRef());
if (total_bytes)
res_columns[res_index++]->insert(*total_bytes);
else
......
......@@ -13,7 +13,7 @@
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>2000</session_timeout_ms>
<session_timeout_ms>20000</session_timeout_ms>
</zookeeper>
</yandex>
\ No newline at end of file
......@@ -62,14 +62,16 @@ def test_reload_zookeeper(start_cluster):
## stop all zookeepers, table will be readonly
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
node.query("SELECT COUNT() FROM test_table")
with pytest.raises(QueryRuntimeException):
node.query("SELECT COUNT() FROM test_table")
node.query("SELECT COUNT() FROM test_table", settings={"select_sequential_consistency" : 1})
## start zoo2, zoo3, table will be readonly too, because it only connect to zoo1
cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo2", "zoo3"])
node.query("SELECT COUNT() FROM test_table")
with pytest.raises(QueryRuntimeException):
node.query("SELECT COUNT() FROM test_table")
node.query("SELECT COUNT() FROM test_table", settings={"select_sequential_consistency" : 1})
## set config to zoo2, server will be normal
new_config = """
......
SET send_logs_level = 'fatal';
DROP TABLE IF EXISTS quorum1 SYNC;
DROP TABLE IF EXISTS quorum2 SYNC;
DROP TABLE IF EXISTS quorum3 SYNC;
CREATE TABLE quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01513/sequence_consistency', '1') ORDER BY x PARTITION BY y;
CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01513/sequence_consistency', '2') ORDER BY x PARTITION BY y;
CREATE TABLE quorum3(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01513/sequence_consistency', '3') ORDER BY x PARTITION BY y;
INSERT INTO quorum1 VALUES (1, '1990-11-15');
INSERT INTO quorum1 VALUES (2, '1990-11-15');
INSERT INTO quorum1 VALUES (3, '2020-12-16');
SYSTEM SYNC REPLICA quorum2;
SYSTEM SYNC REPLICA quorum3;
SET select_sequential_consistency=0;
SET optimize_trivial_count_query=1;
SET insert_quorum=2;
SYSTEM STOP FETCHES quorum1;
INSERT INTO quorum2 VALUES (4, toDate('2020-12-16'));
SYSTEM SYNC REPLICA quorum3;
-- Should read local committed parts instead of throwing error code: 289. DB::Exception: Replica doesn't have part 20201216_1_1_0 which was successfully written to quorum of other replicas.
SELECT count() FROM quorum1;
SELECT count() FROM quorum2;
SELECT count() FROM quorum3;
DROP TABLE quorum1 SYNC;
DROP TABLE quorum2 SYNC;
DROP TABLE quorum3 SYNC;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册