提交 9ebf0b63 编写于 作者: A Alexey Milovidov

Added IStorage::startup method to avoid starting replication and merging...

Added IStorage::startup method to avoid starting replication and merging threads too early during server startup [#CLICKHOUSE-2].
上级 8e261e34
......@@ -23,6 +23,8 @@ struct CollectTables;
* SELECT array FROM t ARRAY JOIN array array -> array
* SELECT nested.elem FROM t ARRAY JOIN nested nested -> nested
* SELECT elem FROM t ARRAY JOIN [1, 2, 3] AS elem elem -> [1, 2, 3]
*
* Does not analyze arrayJoin functions.
*/
struct AnalyzeArrayJoins
{
......
......@@ -32,8 +32,8 @@ try
auto system_database = std::make_shared<DatabaseMemory>("system");
context.addDatabase("system", system_database);
context.setCurrentDatabase("system");
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
AnalyzeResultOfQuery analyzer;
analyzer.process(ast, context);
......
......@@ -33,8 +33,8 @@ try
auto system_database = std::make_shared<DatabaseMemory>("system");
context.addDatabase("system", system_database);
context.setCurrentDatabase("system");
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
CollectAliases collect_aliases;
collect_aliases.process(ast);
......
......@@ -38,8 +38,8 @@ try
auto system_database = std::make_shared<DatabaseMemory>("system");
context.addDatabase("system", system_database);
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
context.setCurrentDatabase("system");
AnalyzeLambdas analyze_lambdas;
......
......@@ -39,8 +39,8 @@ try
auto system_database = std::make_shared<DatabaseMemory>("system");
context.addDatabase("system", system_database);
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
context.setCurrentDatabase("system");
AnalyzeLambdas analyze_lambdas;
......
......@@ -198,6 +198,7 @@ public:
/// Create table
NamesAndTypesListPtr columns = std::make_shared<NamesAndTypesList>(sample_block.getColumnsList());
StoragePtr storage = StorageMemory::create(data.second, columns);
storage->startup();
context.addExternalTable(data.second, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
......
......@@ -106,6 +106,7 @@ int main(int argc, char ** argv)
/// create an object of an existing hit log table
StoragePtr table = StorageLog::create("./", "HitLog", std::make_shared<NamesAndTypesList>(names_and_types_list));
table->startup();
/// read from it, apply the expression, filter, and write in tsv form to the console
......
......@@ -96,6 +96,7 @@ try
/// create an object of an existing hit log table
StoragePtr table = StorageLog::create("./", "HitLog", std::make_shared<NamesAndTypesList>(names_and_types_list));
table->startup();
/// read from it
if (argc == 2 && 0 == strcmp(argv[1], "read"))
......
......@@ -108,6 +108,7 @@ try
/// create an object of an existing hit log table
StoragePtr table = StorageLog::create("./", "HitLog", std::make_shared<NamesAndTypesList>(names_and_types_list));
table->startup();
/// read from it, sort it, and write it in tsv form to the console
......
......@@ -414,6 +414,8 @@ StoragePtr DatabaseCloud::tryGetTable(const String & table_name)
definition, name, data_path, context, false,
"in zookeeper node " + zookeeper_path + "/table_definitions/" + hashToHex(table_hash));
table->startup();
local_tables_cache.emplace(table_name, table);
return table;
}
......
......@@ -181,6 +181,60 @@ void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, b
task();
}
if (thread_pool)
thread_pool->wait();
/// After all tables have been basically initialized, start them up.
startupTables(thread_pool);
}
/** Calls startup() on every table of this database.
  * Tables are split into bunches of TABLES_PARALLEL_LOAD_BUNCH_SIZE. Each bunch is scheduled
  * into thread_pool when one is given, or processed in the calling thread otherwise.
  * When a pool is used, it is waited on before returning.
  */
void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
{
    LOG_INFO(log, "Starting up tables.");

    StopwatchWithLock watch;
    std::atomic<size_t> tables_processed {0};
    const size_t total_tables = tables.size();

    /// Starts up the tables in [first, last), logging the overall progress from time to time.
    auto startup_range = [&](Tables::iterator first, Tables::iterator last)
    {
        while (first != last)
        {
            /// Report either every N-th table or when enough time has passed since the last report.
            const bool count_milestone = (++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0;
            if (count_milestone || watch.lockTestAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
            {
                LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
                watch.restart();
            }

            first->second->startup();
            ++first;
        }
    };

    const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
    const size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;

    Tables::iterator bunch_begin = tables.begin();
    for (size_t bunch = 0; bunch != num_bunches; ++bunch)
    {
        /// The last bunch takes whatever is left; every other bunch holds exactly bunch_size tables.
        Tables::iterator bunch_end = tables.end();
        if (bunch + 1 != num_bunches)
        {
            bunch_end = bunch_begin;
            std::advance(bunch_end, bunch_size);
        }

        auto task = [startup_range, bunch_begin, bunch_end] { startup_range(bunch_begin, bunch_end); };

        if (thread_pool)
            thread_pool->schedule(task);
        else
            task();

        bunch_begin = bunch_end;
    }

    if (thread_pool)
        thread_pool->wait();
}
......
......@@ -45,6 +45,9 @@ public:
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier) override;
private:
void startupTables(ThreadPool * thread_pool);
};
}
......@@ -24,6 +24,7 @@ String getTableDefinitionFromCreateQuery(const ASTPtr & query);
/** Create a table by its definition, without using InterpreterCreateQuery.
* (InterpreterCreateQuery has more complex functionality, and it can not be used if the database has not been created yet)
* Returns the table name and the table itself.
* You must subsequently call IStorage::startup method to use the table.
*/
std::pair<String, StoragePtr> createTableFromDefinition(
const String & definition,
......
......@@ -534,6 +534,7 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name_or_t
NamesAndTypesListPtr columns = std::make_shared<NamesAndTypesList>(sample.getColumnsList());
StoragePtr external_storage = StorageMemory::create(external_table_name, columns);
external_storage->startup();
/** There are two ways to perform distributed GLOBAL subqueries.
*
......
......@@ -534,6 +534,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
context.getDatabase(database_name)->createTable(table_name, res, query_ptr, storage_name, context.getSettingsRef());
}
res->startup();
/// If this is a CREATE ... SELECT query, insert the data into the table
if (create.select && storage_name != "View" && (storage_name != "MaterializedView" || create.is_populate))
{
......
......@@ -40,7 +40,7 @@ try
DatabasePtr system = std::make_shared<DatabaseOrdinary>("system", "./metadata/system/");
context.addDatabase("system", system);
system->loadTables(context, nullptr, false);
system->attachTable("one", StorageSystemOne::create("one"));
system->attachTable("one", StorageSystemOne::create("one"));
system->attachTable("numbers", StorageSystemNumbers::create("numbers"));
context.setCurrentDatabase("default");
......
......@@ -630,6 +630,7 @@ bool TCPHandler::receiveData()
{
NamesAndTypesListPtr columns = std::make_shared<NamesAndTypesList>(block.getColumnsList());
storage = StorageMemory::create(external_table_name, columns);
storage->startup();
query_context.addExternalTable(external_table_name, storage);
}
/// The data will be written directly to the table.
......
......@@ -74,8 +74,8 @@ using TableFullWriteLockPtr = std::pair<TableDataWriteLockPtr, TableStructureWri
/** Storage. Responsible for
* - storage of the table data;
* - the definition in which file (or not the file) the data is stored;
* - search for data and update data;
* - the definition in which files (or not in files) the data is stored;
* - data lookups and appends;
* - data storage structure (compression, etc.)
* - concurrent access to data (locks, etc.)
*/
......@@ -85,7 +85,7 @@ public:
/// The main name of the table type (for example, StorageMergeTree).
virtual std::string getName() const = 0;
/** Returns true if the store receives data from a remote server or servers. */
/** Returns true if the storage receives data from a remote server or servers. */
virtual bool isRemote() const { return false; }
/** Returns true if the storage supports queries with the SAMPLE section. */
......@@ -97,7 +97,7 @@ public:
/** Returns true if the storage supports queries with the PREWHERE section. */
virtual bool supportsPrewhere() const { return false; }
/** Returns true if the storage supports multiple replicas. */
/** Returns true if the storage supports read from multiple replicas. Assumed isRemote. */
virtual bool supportsParallelReplicas() const { return false; }
/** Does not allow you to change the structure or name of the table.
......@@ -268,9 +268,19 @@ public:
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** If you have to do some complicated work when destroying an object - do it in advance.
/** If the table has to do some complicated work on startup,
* that must be postponed after creation of table object
* (like launching some background threads),
* do it in this method.
* You should call this method after creation of object.
* By default, does nothing.
* Cannot be called simultaneously by multiple threads.
*/
virtual void startup() {}
/** If the table has to do some complicated work when destroying an object - do it in advance.
* For example, if the table contains any threads for background work - ask them to complete and wait for completion.
* By default, do nothing.
* By default, does nothing.
* Can be called simultaneously from different threads, even after a call to drop().
*/
virtual void shutdown() {}
......
......@@ -85,7 +85,7 @@ private:
/** Parts for which you want to check one of two:
* - If we have the part, check, its data with its checksums, and them with ZooKeeper.
* - If we do not have a part, check to see if it (or the part covering it) exists anywhere.
* - If we do not have a part, check to see if it (or the part covering it) exists anywhere on other replicas.
*/
StringSet parts_set;
......
......@@ -70,8 +70,7 @@ StorageBuffer::StorageBuffer(const std::string & name_, NamesAndTypesListPtr col
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
destination_database(destination_database_), destination_table(destination_table_),
no_destination(destination_database.empty() && destination_table.empty()),
log(&Logger::get("StorageBuffer (" + name + ")")),
flush_thread(&StorageBuffer::flushThread, this)
log(&Logger::get("StorageBuffer (" + name + ")"))
{
}
......@@ -346,6 +345,12 @@ BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & query, const Settings &
}
void StorageBuffer::startup()
{
flush_thread = std::thread(&StorageBuffer::flushThread, this);
}
void StorageBuffer::shutdown()
{
shutdown_event.set();
......
......@@ -25,15 +25,15 @@ class Context;
* The buffer is a set of num_shards blocks.
* When writing, select the block number by the remainder of the `ThreadNumber` division by `num_shards` (or one of the others),
* and add rows to the corresponding block.
* When using a block, it is blocked by some mutex. If during write the corresponding block is already occupied
* - try to block the next block clockwise, and so no more than `num_shards` times (further blocked).
* When using a block, it is locked by some mutex. If during write the corresponding block is already occupied
* - try to lock the next block in a round-robin fashion, and so no more than `num_shards` times (then wait for lock).
* Thresholds are checked on insertion, and, periodically, in the background thread (to implement time thresholds).
* Thresholds act independently for each shard. Each shard can be flushed independently of the others.
* If a block is inserted into the table, which itself exceeds the max-thresholds, it is written directly to the subordinate table without buffering.
* Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows,
* and a part of 800,000 lines is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table
* and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table.
*
* When you destroy a Buffer type table and when you quit, all data is discarded.
* When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
* The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost.
*/
class StorageBuffer : private ext::shared_ptr_helper<StorageBuffer>, public IStorage
......@@ -52,7 +52,7 @@ public:
};
/** num_shards - the level of internal parallelism (the number of independent buffers)
* The buffer is reset if all minimum thresholds or at least one of the maximum thresholds are exceeded.
* The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded.
*/
static StoragePtr create(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
......@@ -77,7 +77,8 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
/// Resets all buffers to the subordinate table.
void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.
void shutdown() override;
bool optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) override;
......
......@@ -138,8 +138,6 @@ StorageDistributed::StorageDistributed(
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
}
......@@ -163,8 +161,6 @@ StorageDistributed::StorageDistributed(
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
}
......@@ -186,8 +182,7 @@ StoragePtr StorageDistributed::create(
materialized_columns_, alias_columns_, column_defaults_,
remote_database_, remote_table_,
cluster_name_, context_,
sharding_key_, data_path_
);
sharding_key_, data_path_);
}
......@@ -201,8 +196,7 @@ StoragePtr StorageDistributed::create(
{
auto res = make_shared(
name_, columns_, remote_database_,
remote_table_, String{}, context_
);
remote_table_, String{}, context_);
res->owned_cluster = owned_cluster_;
......@@ -288,6 +282,13 @@ void StorageDistributed::alter(const AlterCommands & params, const String & data
}
/// Performs the initialization that is postponed until after the table object is created:
/// creates the directory monitors and initializes file_names_increment for `path`.
void StorageDistributed::startup()
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
}
void StorageDistributed::shutdown()
{
directory_monitors.clear();
......
......@@ -82,6 +82,7 @@ public:
/// the structure of the sub-table is not checked
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void startup() override;
void shutdown() override;
void reshardPartitions(
......@@ -136,9 +137,6 @@ private:
ClusterPtr getCluster() const;
/// Get monotonically increasing string to name files with data to be written to remote servers.
String getMonotonicFileName();
String name;
NamesAndTypesListPtr columns;
......
......@@ -10,8 +10,9 @@ namespace DB
class Context;
/** Allows you to create a table by the name of the engine.
/** Allows you to create a table by the name and parameters of the engine.
* In 'columns', 'materialized_columns', etc., Nested data structures must be flattened.
* You should subsequently call IStorage::startup method to work with table.
*/
class StorageFactory : public Singleton<StorageFactory>
{
......
......@@ -58,7 +58,7 @@ StorageMergeTree::StorageMergeTree(
data.loadDataParts(has_force_restore_data_flag);
data.clearOldParts();
/// Temporary directories contain unfinalized results of Merges (after forced restart)
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
data.clearOldTemporaryDirectories(0);
......@@ -81,15 +81,18 @@ StoragePtr StorageMergeTree::create(
bool has_force_restore_data_flag_,
const MergeTreeSettings & settings_)
{
auto res = make_shared(
return make_shared(
path_, database_name_, table_name_,
columns_, materialized_columns_, alias_columns_, column_defaults_, attach,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_, has_force_restore_data_flag_, settings_
);
res->merge_task_handle = res->background_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res.get()));
}
return res;
/// Registers mergeTask() in background_pool and remembers the returned handle
/// in merge_task_handle. Done here rather than at creation of the table object.
void StorageMergeTree::startup()
{
    auto merge_job = [this] { return mergeTask(); };
    merge_task_handle = background_pool.addTask(merge_job);
}
......
......@@ -48,6 +48,7 @@ public:
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
void startup() override;
void shutdown() override;
~StorageMergeTree() override;
......
......@@ -221,8 +221,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }
),
[this] () { clearOldPartsAndRemoveFromZK(); }),
reader(data), writer(data, context), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this),
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
......@@ -296,16 +295,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
createNewZooKeeperNodes();
queue.initialize(
zookeeper_path, replica_path,
database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
data.getDataParts(), current_zookeeper);
queue.pullLogsToQueue(current_zookeeper, nullptr);
/// In this thread replica will be activated.
restarting_thread = std::make_unique<ReplicatedMergeTreeRestartingThread>(*this);
}
......@@ -2202,6 +2191,23 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
}
/// Initializes the replication queue, pulls log entries into it and creates
/// the ReplicatedMergeTreeRestartingThread object.
/// Does nothing when is_readonly is set.
void StorageReplicatedMergeTree::startup()
{
    if (!is_readonly)
    {
        const auto queue_log_name = database_name + "." + table_name + " (ReplicatedMergeTreeQueue)";

        queue.initialize(zookeeper_path, replica_path, queue_log_name, data.getDataParts(), current_zookeeper);
        queue.pullLogsToQueue(current_zookeeper, nullptr);

        /// In this thread replica will be activated.
        restarting_thread = std::make_unique<ReplicatedMergeTreeRestartingThread>(*this);
    }
}
void StorageReplicatedMergeTree::shutdown()
{
/** This must be done before waiting for restarting_thread.
......
......@@ -74,7 +74,7 @@ class StorageReplicatedMergeTree : private ext::shared_ptr_helper<StorageReplica
friend class ext::shared_ptr_helper<StorageReplicatedMergeTree>;
public:
/** If !attach, either creates a new table in ZK, or adds a replica to an existing table.
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
static StoragePtr create(
const String & zookeeper_path_,
......@@ -94,6 +94,7 @@ public:
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
void startup() override;
void shutdown() override;
~StorageReplicatedMergeTree() override;
......
......@@ -77,8 +77,7 @@ StorageTrivialBuffer::StorageTrivialBuffer(const std::string & name_, NamesAndTy
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
destination_database(destination_database_), destination_table(destination_table_),
no_destination(destination_database.empty() && destination_table.empty()),
log(&Logger::get("TrivialBuffer (" + name + ")")),
flush_thread(&StorageTrivialBuffer::flushThread, this)
log(&Logger::get("TrivialBuffer (" + name + ")"))
{
zookeeper->createAncestors(path_in_zk_for_deduplication);
zookeeper->createOrUpdate(path_in_zk_for_deduplication, {}, zkutil::CreateMode::Persistent);
......@@ -357,6 +356,12 @@ BlockOutputStreamPtr StorageTrivialBuffer::write(const ASTPtr & query, const Set
return std::make_shared<TrivialBufferBlockOutputStream>(*this);
}
void StorageTrivialBuffer::startup()
{
flush_thread = std::thread(&StorageTrivialBuffer::flushThread, this);
}
void StorageTrivialBuffer::shutdown()
{
shutdown_event.set();
......@@ -421,7 +426,7 @@ bool StorageTrivialBuffer::checkThresholds(
}
bool StorageTrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const
const time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
......
......@@ -74,7 +74,9 @@ public:
bool checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const;
/// Writes all the blocks in buffer into the destination table.
/// Start flushing thread.
void startup() override;
/// Writes all the blocks in buffer into the destination table. Stop flushing thread.
void shutdown() override;
bool optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) override;
......@@ -89,7 +91,7 @@ public:
/// Does not check or alter the structure of dependent table.
void alter(const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context) override;
const String & table_name, const Context & context) override;
class ZookeeperDeduplicationController
{
......
......@@ -99,6 +99,7 @@ try
/// create a hit log table
StoragePtr table = StorageLog::create("./", "HitLog", std::make_shared<NamesAndTypesList>(names_and_types_list));
table->startup();
/// create a description of how to read data from the tab separated dump
......
......@@ -48,6 +48,7 @@ try
names_and_types, {}, {}, ColumnDefaults{}, false,
context, primary_expr, "d",
nullptr, 101, params, false, {});
table->startup();
/// write into it
{
......
......@@ -25,6 +25,7 @@ try
names_and_types->push_back(NameAndTypePair("b", std::make_shared<DataTypeUInt8>()));
StoragePtr table = StorageLog::create("./", "test", names_and_types);
table->startup();
/// write into it
{
......
......@@ -75,12 +75,14 @@ StoragePtr TableFunctionMerge::execute(const ASTPtr & ast_function, const Contex
String source_database = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
String table_name_regexp = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
return StorageMerge::create(
auto res = StorageMerge::create(
getName(),
std::make_shared<NamesAndTypesList>(chooseColumns(source_database, table_name_regexp, context)),
source_database,
table_name_regexp,
context);
res->startup();
return res;
}
}
......@@ -274,13 +274,15 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
auto cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password);
return StorageDistributed::create(
auto res = StorageDistributed::create(
getName(),
std::make_shared<NamesAndTypesList>(getStructureOfRemoteTable(*cluster, remote_database, remote_table, context)),
remote_database,
remote_table,
cluster,
context);
res->startup();
return res;
}
}
......@@ -71,13 +71,15 @@ StoragePtr TableFunctionShardByHash::execute(const ASTPtr & ast_function, const
std::shared_ptr<Cluster> shard(cluster->getClusterWithSingleShard(shard_index).release());
return StorageDistributed::create(
auto res = StorageDistributed::create(
getName(),
std::make_shared<NamesAndTypesList>(getStructureOfRemoteTable(*shard, remote_database, remote_table, context)),
remote_database,
remote_table,
shard,
context);
res->startup();
return res;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册