Unverified commit 21854258, authored by Ivan and committed via GitHub

Merge pull request #19673 from azat/distributed-bytes_to_throw_insert

Add ability to throttle INSERT into Distributed
......@@ -31,6 +31,12 @@ Also it accept the following settings:
- `fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshes directory metadata after operations related to asynchronous inserts on a Distributed table (after insert, after sending the data to the shard, etc).
- `bytes_to_throw_insert` - if more than this number of compressed bytes are pending for async INSERT, an exception will be thrown. 0 - do not throw. Default 0.
- `bytes_to_delay_insert` - if more than this number of compressed bytes are pending for async INSERT, the query will be delayed. 0 - do not delay. Default 0.
- `max_delay_to_insert` - maximum delay, in seconds, for inserting data into a Distributed table when there are a lot of bytes pending for async send. Default 60.
!!! note "Note"
**Durability settings** (`fsync_...`):
......@@ -39,6 +45,12 @@ Also it accept the following settings:
- May significantly decrease insert performance
- Affect writing of the data stored inside the Distributed table folder on the **node which accepted your insert**. If you need guarantees of writing data to the underlying MergeTree tables, see the durability settings (`...fsync...`) in `system.merge_tree_settings`
For **Insert limit settings** (`..._insert`) see also:
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
- `bytes_to_throw_insert` is handled before `bytes_to_delay_insert`, so you should not set it to a value less than `bytes_to_delay_insert`; see the sketch below the example
Example:
``` sql
......
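For illustration, here is a minimal sketch of a Distributed table combining the durability and insert limit settings above (the cluster, database, and table names are hypothetical):

``` sql
CREATE TABLE dist_hits AS hits
ENGINE = Distributed(my_cluster, default, hits, rand())
SETTINGS
    fsync_after_insert = 1,
    bytes_to_delay_insert = 500000000,   -- delay async INSERTs once ~500 MB is pending
    bytes_to_throw_insert = 1000000000,  -- throw once ~1 GB is pending; must be greater than bytes_to_delay_insert
    max_delay_to_insert = 60;            -- each delayed INSERT waits at most 60 seconds
```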
......@@ -100,6 +100,12 @@ namespace CurrentMetrics
amount -= value;
}
void add(Value value = 1)
{
what->fetch_add(value, std::memory_order_relaxed);
amount += value;
}
/// Subtract value before destructor.
void destroy()
{
......
......@@ -540,6 +540,7 @@
M(571, DATABASE_REPLICATION_FAILED) \
M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \
M(573, EPOLL_ERROR) \
M(574, DISTRIBUTED_TOO_MANY_PENDING_BYTES) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
......
......@@ -68,6 +68,9 @@
M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \
M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(DistributedDelayedInserts, "Number of times the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
M(DistributedRejectedInserts, "Number of times the INSERT of a block to a Distributed table was rejected with 'Too many bytes' exception due to high number of pending bytes.") \
M(DistributedDelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
M(DuplicatedInsertedBlocks, "Number of times the INSERTed block to a ReplicatedMergeTree table was deduplicated.") \
\
M(ZooKeeperInit, "") \
......
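Once any of these new events has fired at least once, a quick sanity check is to read them back from `system.events` (a sketch; counters appear there only after they are first incremented):

``` sql
SELECT event, value
FROM system.events
WHERE event IN ('DistributedDelayedInserts',
                'DistributedRejectedInserts',
                'DistributedDelayedInsertsMilliseconds');
```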
......@@ -208,7 +208,7 @@ BlockIO InterpreterInsertQuery::execute()
auto storage_dst = std::dynamic_pointer_cast<StorageDistributed>(table);
if (storage_src && storage_dst && storage_src->cluster_name == storage_dst->cluster_name)
if (storage_src && storage_dst && storage_src->getClusterName() == storage_dst->getClusterName())
{
is_distributed_insert_select = true;
......
......@@ -46,7 +46,7 @@ std::string getClusterName(const IAST & node)
}
String getClusterNameAndMakeLiteral(ASTPtr & node)
std::string getClusterNameAndMakeLiteral(ASTPtr & node)
{
String cluster_name = getClusterName(*node);
node = std::make_shared<ASTLiteral>(cluster_name);
......
......@@ -14,8 +14,8 @@ namespace DB
* This name will be parsed as an expression with an operator minus - not at all what you need.
* Therefore, consider this case separately.
*/
String getClusterName(const IAST & node);
std::string getClusterName(const IAST & node);
String getClusterNameAndMakeLiteral(ASTPtr & node);
std::string getClusterNameAndMakeLiteral(ASTPtr & node);
}
......@@ -276,7 +276,7 @@ void StorageDistributedDirectoryMonitor::flushAllData()
if (quit)
return;
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
const auto & files = getFiles();
if (!files.empty())
......@@ -303,7 +303,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
void StorageDistributedDirectoryMonitor::run()
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
bool do_sleep = false;
while (!quit)
......@@ -320,12 +320,12 @@ void StorageDistributedDirectoryMonitor::run()
{
do_sleep = !processFiles(files);
std::unique_lock metrics_lock(metrics_mutex);
std::lock_guard metrics_lock(metrics_mutex);
last_exception = std::exception_ptr{};
}
catch (...)
{
std::unique_lock metrics_lock(metrics_mutex);
std::lock_guard metrics_lock(metrics_mutex);
do_sleep = true;
++error_count;
......@@ -344,7 +344,7 @@ void StorageDistributedDirectoryMonitor::run()
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
std::unique_lock metrics_lock(metrics_mutex);
std::lock_guard metrics_lock(metrics_mutex);
error_count /= 2;
last_decrease_time = now;
......@@ -453,10 +453,15 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
}
}
metric_pending_files.changeTo(files.size());
{
std::unique_lock metrics_lock(metrics_mutex);
std::lock_guard metrics_lock(metrics_mutex);
if (files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count);
if (bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count);
metric_pending_files.changeTo(files.size());
files_count = files.size();
bytes_count = new_bytes_count;
}
......@@ -509,9 +514,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
}
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
Poco::File{file_path}.remove();
metric_pending_files.sub();
markAsSend(file_path);
LOG_TRACE(log, "Finished processing `{}`", file_path);
}
......@@ -661,7 +664,7 @@ struct StorageDistributedDirectoryMonitor::Batch
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
for (UInt64 file_index : file_indices)
Poco::File{file_index_to_path.at(file_index)}.remove();
parent.markAsSend(file_index_to_path.at(file_index));
}
else
{
......@@ -747,16 +750,24 @@ BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(con
return std::make_shared<DirectoryMonitorBlockInputStream>(file_name);
}
bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms)
{
if (quit)
return false;
{
std::lock_guard metrics_lock(metrics_mutex);
metric_pending_files.add();
bytes_count += file_size;
++files_count;
}
return task_handle->scheduleAfter(ms, false);
}
StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus() const
StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus()
{
std::unique_lock metrics_lock(metrics_mutex);
std::lock_guard metrics_lock(metrics_mutex);
return Status{
path,
......@@ -780,7 +791,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
batch.readText(in);
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
......@@ -850,7 +860,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
if (batch.isEnoughSize())
{
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
}
......@@ -858,7 +867,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
{
Batch & batch = kv.second;
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
{
......@@ -871,7 +879,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
}
}
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) const
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path)
{
const auto last_path_separator_pos = file_path.rfind('/');
const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
......@@ -884,12 +892,37 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path + "/broken/");
Poco::File{file_path}.renameTo(broken_file_path);
Poco::File file(file_path);
{
std::lock_guard metrics_lock(metrics_mutex);
size_t file_size = file.getSize();
--files_count;
bytes_count -= file_size;
}
file.renameTo(broken_file_path);
LOG_ERROR(log, "Renamed `{}` to `{}`", file_path, broken_file_path);
}
void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_path)
{
Poco::File file(file_path);
size_t file_size = file.getSize();
{
std::lock_guard metrics_lock(metrics_mutex);
metric_pending_files.sub();
--files_count;
bytes_count -= file_size;
}
file.remove();
}
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e)
{
/// mark file as broken if necessary
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
......@@ -912,7 +945,7 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_rela
std::lock_guard lock{mutex};
{
std::unique_lock metrics_lock(metrics_mutex);
std::lock_guard metrics_lock(metrics_mutex);
relative_path = new_relative_path;
path = disk->getPath() + relative_path + '/';
}
......
......@@ -48,7 +48,7 @@ public:
static BlockInputStreamPtr createStreamFromFile(const String & file_name);
/// For scheduling via DistributedBlockOutputStream
bool scheduleAfter(size_t ms);
bool addAndSchedule(size_t file_size, size_t ms);
/// system.distribution_queue interface
struct Status
......@@ -60,7 +60,7 @@ public:
size_t bytes_count;
bool is_blocked;
};
Status getStatus() const;
Status getStatus();
private:
void run();
......@@ -70,8 +70,9 @@ private:
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
void markAsBroken(const std::string & file_path) const;
bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const;
void markAsBroken(const std::string & file_path);
void markAsSend(const std::string & file_path);
bool maybeMarkAsBroken(const std::string & file_path, const Exception & e);
std::string getLoggerName() const;
......@@ -91,7 +92,7 @@ private:
struct BatchHeader;
struct Batch;
mutable std::mutex metrics_mutex;
std::mutex metrics_mutex;
size_t error_count = 0;
size_t files_count = 0;
size_t bytes_count = 0;
......
......@@ -114,6 +114,7 @@ Block DistributedBlockOutputStream::getHeader() const
void DistributedBlockOutputStream::writePrefix()
{
storage.delayInsertOrThrowIfNeeded();
}
......@@ -717,6 +718,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
auto dir_sync_guard = make_directory_sync_guard(*it);
}
auto file_size = Poco::File(first_file_tmp_path).getSize();
/// remove the temporary file, enabling the OS to reclaim inode after all threads
/// have removed their corresponding files
Poco::File(first_file_tmp_path).remove();
......@@ -726,7 +728,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
}
}
......
......@@ -17,6 +17,10 @@ class ASTStorage;
#define LIST_OF_DISTRIBUTED_SETTINGS(M) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted block. Will decrease performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \
/** Insert settings. */ \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes are pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes are pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \
M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
......
......@@ -14,10 +14,12 @@
#include <Columns/ColumnConst.h>
#include <Common/Macros.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Common/formatReadable.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExpressionList.h>
......@@ -69,6 +71,13 @@ const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
}
namespace ProfileEvents
{
extern const Event DistributedRejectedInserts;
extern const Event DistributedDelayedInserts;
extern const Event DistributedDelayedInsertsMilliseconds;
}
namespace DB
{
......@@ -85,6 +94,8 @@ namespace ErrorCodes
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
extern const int INVALID_SHARD_ID;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace ActionLocks
......@@ -768,6 +779,14 @@ std::vector<StorageDistributedDirectoryMonitor::Status> StorageDistributed::getD
return statuses;
}
std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
{
UInt64 total_bytes = 0;
for (const auto & status : getDirectoryMonitorsStatuses())
total_bytes += status.bytes_count;
return total_bytes;
}
size_t StorageDistributed::getShardCount() const
{
return getCluster()->getShardCount();
......@@ -967,6 +986,54 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
relative_data_path = new_path_to_table_data;
}
void StorageDistributed::delayInsertOrThrowIfNeeded() const
{
if (!distributed_settings.bytes_to_throw_insert &&
!distributed_settings.bytes_to_delay_insert)
return;
UInt64 total_bytes = *totalBytes(global_context.getSettingsRef());
if (distributed_settings.bytes_to_throw_insert && total_bytes > distributed_settings.bytes_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
"Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})",
formatReadableSizeWithBinarySuffix(total_bytes),
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert));
}
if (distributed_settings.bytes_to_delay_insert && total_bytes > distributed_settings.bytes_to_delay_insert)
{
/// Step is 5% of the delay and minimal one second.
/// NOTE: max_delay_to_insert is in seconds, and step is in ms.
const size_t step_ms = std::max<double>(1'000., double(distributed_settings.max_delay_to_insert) * 1'000 * 0.05);
UInt64 delayed_ms = 0;
do
{
    delayed_ms += step_ms;
    std::this_thread::sleep_for(std::chrono::milliseconds(step_ms));
} while (*totalBytes(global_context.getSettingsRef()) > distributed_settings.bytes_to_delay_insert
    && delayed_ms < distributed_settings.max_delay_to_insert * 1000);
ProfileEvents::increment(ProfileEvents::DistributedDelayedInserts);
ProfileEvents::increment(ProfileEvents::DistributedDelayedInsertsMilliseconds, delayed_ms);
UInt64 new_total_bytes = *totalBytes(global_context.getSettingsRef());
LOG_INFO(log, "Too many bytes pending for async INSERT: was {}, now {}, INSERT was delayed to {} ms",
formatReadableSizeWithBinarySuffix(total_bytes),
formatReadableSizeWithBinarySuffix(new_total_bytes),
delayed_ms);
if (new_total_bytes > distributed_settings.bytes_to_delay_insert)
{
ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
"Too many bytes pending for async INSERT: {} (bytes_to_delay_insert={})",
formatReadableSizeWithBinarySuffix(new_total_bytes),
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_delay_insert));
}
}
}
void registerStorageDistributed(StorageFactory & factory)
{
......@@ -1033,6 +1100,17 @@ void registerStorageDistributed(StorageFactory & factory)
distributed_settings.loadFromQuery(*args.storage_def);
}
if (distributed_settings.max_delay_to_insert < 1)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"max_delay_to_insert cannot be less then 1");
if (distributed_settings.bytes_to_throw_insert && distributed_settings.bytes_to_delay_insert &&
distributed_settings.bytes_to_throw_insert <= distributed_settings.bytes_to_delay_insert)
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)");
}
return StorageDistributed::create(
args.table_id, args.columns, args.constraints,
remote_database, remote_table, cluster_name,
......
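As a hedged illustration of the registration-time validation above (the table names are hypothetical, and `ARGUMENT_OUT_OF_BOUND` is assumed to map to server error 69, following the `-- { serverError N }` convention used in the tests later in this commit):

``` sql
-- Rejected at CREATE time: the throw limit is checked before the delay limit,
-- so bytes_to_throw_insert must be greater than bytes_to_delay_insert.
create table dist_bad (key Int)
engine=Distributed(test_shard_localhost, currentDatabase(), data)
settings bytes_to_throw_insert=1, bytes_to_delay_insert=10; -- { serverError 69 }
```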
......@@ -41,6 +41,7 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute
friend struct ext::shared_ptr_helper<StorageDistributed>;
friend class DistributedBlockOutputStream;
friend class StorageDistributedDirectoryMonitor;
friend class StorageSystemDistributionQueue;
public:
~StorageDistributed() override;
......@@ -76,6 +77,7 @@ public:
unsigned /*num_streams*/) override;
bool supportsParallelInsert() const override { return true; }
std::optional<UInt64> totalBytes(const Settings &) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
......@@ -83,7 +85,6 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
void renameOnDisk(const String & new_path_to_table_data);
void checkAlterIsPossible(const AlterCommands & commands, const Context & context) const override;
......@@ -98,26 +99,70 @@ public:
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override;
ActionLock getActionLock(StorageActionBlockType type) override;
NamesAndTypesList getVirtuals() const override;
/// Used by InterpreterInsertQuery
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
/// Returns an empty string if the table is used by TableFunctionRemote
std::string getClusterName() const { return cluster_name; }
ClusterPtr getCluster() const;
/// Used by InterpreterSystemQuery
void flushClusterNodesAllData(const Context & context);
/// Used by ClusterCopier
size_t getShardCount() const;
private:
StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
ClusterPtr owned_cluster_ = {});
StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach,
ClusterPtr owned_cluster_ = {});
void renameOnDisk(const String & new_path_to_table_data);
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
size_t getShardCount() const;
const String & getRelativeDataPath() const { return relative_data_path; }
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
std::string getClusterName() const { return cluster_name; } /// Returns an empty string if the table is used by TableFunctionRemote
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const DiskPtr & disk);
/// ensure directory monitor thread and connection pool creation by disk and subdirectory name
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name);
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. not until at least one INSERT has been executed)
///
/// Used by StorageSystemDistributionQueue
std::vector<StorageDistributedDirectoryMonitor::Status> getDirectoryMonitorsStatuses() const;
void flushClusterNodesAllData(const Context & context);
ClusterPtr getCluster() const;
static IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result);
/// Apply the following settings:
/// - optimize_skip_unused_shards
......@@ -125,14 +170,13 @@ public:
ClusterPtr getOptimizedCluster(const Context &, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const;
ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const StorageMetadataPtr & metadata_snapshot, const Context & context) const;
ActionLock getActionLock(StorageActionBlockType type) override;
NamesAndTypesList getVirtuals() const override;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }
void delayInsertOrThrowIfNeeded() const;
private:
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
......@@ -156,36 +200,6 @@ public:
ActionBlocker monitors_blocker;
protected:
StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
ClusterPtr owned_cluster_ = {});
StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach,
ClusterPtr owned_cluster_ = {});
String relative_data_path;
/// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
......
SELECT 'Check total_bytes/total_rows for Distributed';
CREATE TABLE check_system_tables_null (key Int) Engine=Null();
CREATE TABLE check_system_tables AS check_system_tables_null Engine=Distributed(test_shard_localhost, currentDatabase(), check_system_tables_null);
SYSTEM STOP DISTRIBUTED SENDS check_system_tables;
SELECT total_bytes, total_rows FROM system.tables WHERE database = currentDatabase() AND name = 'check_system_tables';
INSERT INTO check_system_tables SELECT * FROM numbers(1) SETTINGS prefer_localhost_replica=0;
SELECT total_bytes>0, total_rows FROM system.tables WHERE database = currentDatabase() AND name = 'check_system_tables';
SYSTEM FLUSH DISTRIBUTED check_system_tables;
SELECT total_bytes, total_rows FROM system.tables WHERE database = currentDatabase() AND name = 'check_system_tables';
DROP TABLE check_system_tables_null;
DROP TABLE check_system_tables;
......@@ -10,20 +10,15 @@ select * from system.distribution_queue;
select 'INSERT';
system stop distributed sends dist_01293;
insert into dist_01293 select * from numbers(10);
-- metrics are updated only after distributed_directory_monitor_sleep_time_ms
set distributed_directory_monitor_sleep_time_ms=10;
-- 1 second should guarantee a metrics update
-- XXX: this is a quirk; it would be much better to account for these metrics without any delays.
select sleep(1) format Null;
select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue;
select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue where database = currentDatabase();
system flush distributed dist_01293;
select 'FLUSH';
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue;
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select 'UNBLOCK';
system start distributed sends dist_01293;
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue;
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
drop table null_01293;
drop table dist_01293;
drop table if exists dist_01670;
drop table if exists data_01670;
create table data_01670 (key Int) engine=Null();
create table dist_01670 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01670) settings bytes_to_throw_insert=1;
system stop distributed sends dist_01670;
-- first batch is always OK, since there are no pending bytes yet
insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0;
-- second will fail, because of bytes_to_throw_insert=1
-- (the previous block definitely takes more, since it has a header)
insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 }
system flush distributed dist_01670;
drop table dist_01670;
drop table data_01670;
max_delay_to_insert will throw
max_delay_to_insert will succeed
flushed
#!/usr/bin/env bash
# NOTE: $SECONDS has one-second accuracy, so we need some delta, hence the -1 in the time conditions.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
max_delay_to_insert=5
${CLICKHOUSE_CLIENT} -nq "
drop table if exists dist_01675;
drop table if exists data_01675;
"
${CLICKHOUSE_CLIENT} -nq "
create table data_01675 (key Int) engine=Null();
create table dist_01675 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01675) settings bytes_to_delay_insert=1, max_delay_to_insert=$max_delay_to_insert;
system stop distributed sends dist_01675;
"
#
# Case 1: max_delay_to_insert will throw.
#
echo "max_delay_to_insert will throw"
start_seconds=$SECONDS
${CLICKHOUSE_CLIENT} --testmode -nq "
-- first batch is always OK, since there are no pending bytes yet
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0;
-- second will fail, because of bytes_to_delay_insert=1 and max_delay_to_insert=5,
-- while distributed sends are stopped.
--
-- (the previous block definitely takes more, since it has a header)
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 }
system flush distributed dist_01675;
"
end_seconds=$SECONDS
if (( (end_seconds-start_seconds)<(max_delay_to_insert-1) )); then
echo "max_delay_to_insert was not satisfied ($end_seconds-$start_seconds)"
fi
#
# Case 2: max_delay_to_insert will eventually succeed.
#
echo "max_delay_to_insert will succeed"
max_delay_to_insert=10
${CLICKHOUSE_CLIENT} -nq "
drop table dist_01675;
create table dist_01675 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01675) settings bytes_to_delay_insert=1, max_delay_to_insert=$max_delay_to_insert;
system stop distributed sends dist_01675;
"
flush_delay=4
function flush_distributed_worker()
{
sleep $flush_delay
${CLICKHOUSE_CLIENT} -q "system flush distributed dist_01675"
echo flushed
}
flush_distributed_worker &
start_seconds=$SECONDS
${CLICKHOUSE_CLIENT} --testmode -nq "
-- first batch is always OK, since there are no pending bytes yet
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0;
-- second will succeed, due to SYSTEM FLUSH DISTRIBUTED in the background.
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0;
"
end_seconds=$SECONDS
wait
if (( (end_seconds-start_seconds)<(flush_delay-1) )); then
echo "max_delay_to_insert was not wait flush_delay ($end_seconds-$start_seconds)"
fi
if (( (end_seconds-start_seconds)>=(max_delay_to_insert-1) )); then
echo "max_delay_to_insert was overcommited ($end_seconds-$start_seconds)"
fi
${CLICKHOUSE_CLIENT} -nq "
drop table dist_01675;
drop table data_01675;
"
......@@ -15,6 +15,7 @@
00732_base64_functions
00746_hashing_tuples
00751_hashing_ints
00753_distributed_system_columns_and_system_tables
00800_function_java_hash
00800_low_cardinality_distributed_insert
00821_distributed_storage_with_join_on
......@@ -197,7 +198,9 @@
01181_db_atomic_drop_on_cluster
01658_test_base64Encode_mysql_compatibility
01659_test_base64Decode_mysql_compatibility
01670_distributed_bytes_to_throw_insert
01674_htm_xml_coarse_parse
01675_distributed_bytes_to_delay_insert
01675_data_type_coroutine
01676_clickhouse_client_autocomplete
01671_aggregate_function_group_bitmap_data
......