未验证 提交 1d9d586e 编写于 作者: A Amos Bird

Make global_context consistent.

上级 bde805cb
......@@ -44,10 +44,10 @@ namespace
}
}
/// Constructs a dictionary-backed database with the given name.
/// Any Context may be passed in; the constructor normalizes it via
/// getGlobalContext() so the stored reference is always the long-lived
/// global context (the database can outlive the query-local context that
/// created it). The parameter was renamed from `global_context_` to
/// `context_` to reflect that callers need not pass the global one.
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context_)
    : IDatabase(name_)
    , log(&Poco::Logger::get("DatabaseDictionary(" + database_name + ")"))
    , global_context(context_.getGlobalContext())
{
}
......
......@@ -22,7 +22,7 @@ namespace DB
class DatabaseDictionary final : public IDatabase
{
public:
DatabaseDictionary(const String & name_, const Context & global_context);
DatabaseDictionary(const String & name_, const Context & context_);
String getEngineName() const override
{
......
......@@ -13,6 +13,7 @@
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/Operators.h>
# include <Interpreters/Context.h>
# include <Parsers/ASTCreateQuery.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ParserCreateQuery.h>
......
......@@ -120,7 +120,7 @@ XDBCDictionarySource::XDBCDictionarySource(
, invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
, bridge_helper{bridge_}
, timeouts{ConnectionTimeouts::getHTTPTimeouts(context_)}
, global_context(context_)
, global_context(context_.getGlobalContext())
{
bridge_url = bridge_helper->getMainURI();
......
......@@ -336,9 +336,9 @@ struct ContextShared
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
......@@ -484,7 +484,7 @@ Context Context::createGlobal(ContextShared * shared)
/// One-time initialization of singletons that need the global Context.
/// Called on the freshly created global context (see Context::createGlobal).
/// Both singletons are now handed `*this` by reference, making the
/// "uninitialized null context" state unrepresentable in their APIs.
void Context::initGlobal()
{
    DatabaseCatalog::init(*this);
    TemporaryLiveViewCleaner::init(*this);
}
......@@ -1401,7 +1401,7 @@ void Context::dropCaches() const
shared->mark_cache->reset();
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool()
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
......@@ -1443,7 +1443,7 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
return task_settings;
}
BackgroundSchedulePool & Context::getSchedulePool()
BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
......@@ -1454,7 +1454,7 @@ BackgroundSchedulePool & Context::getSchedulePool()
return *shared->schedule_pool;
}
BackgroundSchedulePool & Context::getDistributedSchedulePool()
BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
auto lock = getLock();
if (!shared->distributed_schedule_pool)
......
......@@ -519,9 +519,9 @@ public:
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;
BackgroundSchedulePool & getBufferFlushSchedulePool();
BackgroundSchedulePool & getSchedulePool();
BackgroundSchedulePool & getDistributedSchedulePool();
BackgroundSchedulePool & getBufferFlushSchedulePool() const;
BackgroundSchedulePool & getSchedulePool() const;
BackgroundSchedulePool & getDistributedSchedulePool() const;
/// Has distributed_ddl configuration or not.
bool hasDistributedDDL() const;
......
......@@ -39,7 +39,7 @@ namespace ErrorCodes
TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
: global_context(&context_.getGlobalContext())
: global_context(context_.getGlobalContext())
, temporary_tables(DatabaseCatalog::instance().getDatabaseForTemporaryTables().get())
{
ASTPtr original_create;
......@@ -62,7 +62,7 @@ TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
}
auto table_id = StorageID(DatabaseCatalog::TEMPORARY_DATABASE, global_name, id);
auto table = creator(table_id);
temporary_tables->createTable(*global_context, global_name, table, original_create);
temporary_tables->createTable(global_context, global_name, table, original_create);
table->startup();
}
......@@ -107,7 +107,7 @@ TemporaryTableHolder & TemporaryTableHolder::operator = (TemporaryTableHolder &&
/// Drops the owned temporary table on holder destruction.
/// `id == UUIDHelpers::Nil` means the holder was default-constructed or
/// moved-from, in which case there is nothing to drop. `global_context`
/// is now a reference (not a pointer), so it is passed directly.
TemporaryTableHolder::~TemporaryTableHolder()
{
    if (id != UUIDHelpers::Nil)
        temporary_tables->dropTable(global_context, "_tmp_" + toString(id));
}
StorageID TemporaryTableHolder::getGlobalTableID() const
......@@ -117,7 +117,7 @@ StorageID TemporaryTableHolder::getGlobalTableID() const
StoragePtr TemporaryTableHolder::getTable() const
{
auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), *global_context);
auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), global_context);
if (!table)
throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR);
return table;
......@@ -126,13 +126,13 @@ StoragePtr TemporaryTableHolder::getTable() const
void DatabaseCatalog::loadDatabases()
{
drop_delay_sec = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
drop_delay_sec = global_context.getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, *global_context);
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, global_context);
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
loadMarkedAsDroppedTables();
auto task_holder = global_context->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
auto task_holder = global_context.getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(task_holder));
(*drop_task)->activate();
std::lock_guard lock{tables_marked_dropped_mutex};
......@@ -328,11 +328,11 @@ DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool d
if (drop)
{
/// Delete the database.
db->drop(*global_context);
db->drop(global_context);
/// Old ClickHouse versions did not store database.sql files
Poco::File database_metadata_file(
global_context->getPath() + "metadata/" + escapeForFileName(database_name) + ".sql");
global_context.getPath() + "metadata/" + escapeForFileName(database_name) + ".sql");
if (database_metadata_file.exists())
database_metadata_file.remove(false);
}
......@@ -505,14 +505,12 @@ void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database,
std::unique_ptr<DatabaseCatalog> DatabaseCatalog::database_catalog;
DatabaseCatalog::DatabaseCatalog(Context * global_context_)
DatabaseCatalog::DatabaseCatalog(Context & global_context_)
: global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog"))
{
if (!global_context)
throw Exception("DatabaseCatalog is not initialized. It's a bug.", ErrorCodes::LOGICAL_ERROR);
}
DatabaseCatalog & DatabaseCatalog::init(Context * global_context_)
DatabaseCatalog & DatabaseCatalog::init(Context & global_context_)
{
if (database_catalog)
{
......@@ -651,7 +649,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
/// we should load them and enqueue cleanup to remove data from store/ and metadata from ZooKeeper
std::map<String, StorageID> dropped_metadata;
String path = global_context->getPath() + "metadata_dropped/";
String path = global_context.getPath() + "metadata_dropped/";
if (!std::filesystem::exists(path))
{
......@@ -706,7 +704,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
String DatabaseCatalog::getPathForDroppedMetadata(const StorageID & table_id) const
{
return global_context->getPath() + "metadata_dropped/" +
return global_context.getPath() + "metadata_dropped/" +
escapeForFileName(table_id.getDatabaseName()) + "." +
escapeForFileName(table_id.getTableName()) + "." +
toString(table_id.uuid) + ".sql";
......@@ -729,7 +727,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
{
/// Try load table from metadata to drop it correctly (e.g. remove metadata from zk or remove data from all volumes)
LOG_INFO(log, "Trying load partially dropped table {} from {}", table_id.getNameForLogs(), dropped_metadata_path);
ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, *global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false);
ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false);
auto * create = typeid_cast<ASTCreateQuery *>(ast.get());
assert(!create || create->uuid == table_id.uuid);
......@@ -740,7 +738,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
create->table = table_id.table_name;
try
{
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, *global_context, false).second;
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, global_context, false).second;
table->is_dropped = true;
}
catch (...)
......@@ -867,7 +865,7 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
/// Even if table is not loaded, try remove its data from disk.
/// TODO remove data from all volumes
String data_path = global_context->getPath() + "store/" + getPathForUUID(table.table_id.uuid);
String data_path = global_context.getPath() + "store/" + getPathForUUID(table.table_id.uuid);
Poco::File table_data_dir{data_path};
if (table_data_dir.exists())
{
......@@ -901,7 +899,7 @@ String DatabaseCatalog::resolveDictionaryName(const String & name) const
String maybe_database_name = name.substr(0, pos);
String maybe_table_name = name.substr(pos + 1);
auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, *global_context);
auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, global_context);
if (!db_and_table.first)
return name;
assert(db_and_table.second);
......
......@@ -73,7 +73,6 @@ struct TemporaryTableHolder : boost::noncopyable
{
typedef std::function<StoragePtr(const StorageID &)> Creator;
TemporaryTableHolder() = default;
TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {});
/// Creates temporary table with Engine=Memory
......@@ -95,7 +94,7 @@ struct TemporaryTableHolder : boost::noncopyable
operator bool () const { return id != UUIDHelpers::Nil; }
const Context * global_context = nullptr;
const Context & global_context;
IDatabase * temporary_tables = nullptr;
UUID id = UUIDHelpers::Nil;
};
......@@ -111,7 +110,7 @@ public:
static constexpr const char * TEMPORARY_DATABASE = "_temporary_and_external_tables";
static constexpr const char * SYSTEM_DATABASE = "system";
static DatabaseCatalog & init(Context * global_context_);
static DatabaseCatalog & init(Context & global_context_);
static DatabaseCatalog & instance();
static void shutdown();
......@@ -199,7 +198,7 @@ private:
// make emplace(global_context_) compile with private constructor ¯\_(ツ)_/¯.
static std::unique_ptr<DatabaseCatalog> database_catalog;
DatabaseCatalog(Context * global_context_);
DatabaseCatalog(Context & global_context_);
void assertDatabaseExistsUnlocked(const String & database_name) const;
void assertDatabaseDoesntExistUnlocked(const String & database_name) const;
......@@ -240,7 +239,7 @@ private:
using UUIDToDatabaseMap = std::unordered_map<UUID, DatabasePtr>;
/// For some reason Context is required to get Storage from Database object
Context * global_context;
Context & global_context;
mutable std::mutex databases_mutex;
ViewDependencies view_dependencies;
......
......@@ -42,8 +42,8 @@ namespace
}
ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & global_context_)
: global_context(global_context_.getGlobalContext())
ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & context_)
: global_context(context_.getGlobalContext())
, database_name(database_.getDatabaseName())
, database(database_)
{
......
......@@ -83,13 +83,13 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
: storage(storage_)
, pool(std::move(pool_))
, path{path_ + '/'}
, should_batch_inserts(storage.global_context->getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context->getSettingsRef().min_insert_block_size_bytes)
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
, max_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, log{&Poco::Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
......@@ -249,7 +249,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
auto pools = createPoolsForAddresses(name, pool_factory);
const auto settings = storage.global_context->getSettings();
const auto settings = storage.global_context.getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
......@@ -308,7 +308,7 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `{}`", file_path);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context->getSettingsRef());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
try
{
......@@ -483,7 +483,7 @@ struct StorageDistributedDirectoryMonitor::Batch
Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
}
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context->getSettingsRef());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
auto connection = parent.pool->get(timeouts);
bool batch_broken = false;
......
......@@ -6,6 +6,7 @@
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <common/logger_useful.h>
#include <Interpreters/Context.h>
namespace DB
{
......
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
......
#include "KafkaBlockOutputStream.h"
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
......
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
......
......@@ -33,6 +33,7 @@
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <librdkafka/rdkafka.h>
#include <common/getFQDNOrHostName.h>
......@@ -169,7 +170,7 @@ namespace
StorageKafka::StorageKafka(
const StorageID & table_id_,
Context & context_,
const Context & context_,
const ColumnsDescription & columns_,
std::unique_ptr<KafkaSettings> kafka_settings_)
: IStorage(table_id_)
......
......@@ -4,7 +4,6 @@
#include <Storages/IStorage.h>
#include <Storages/Kafka/Buffer_fwd.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Interpreters/Context.h>
#include <Common/SettingsChanges.h>
#include <Poco/Semaphore.h>
......@@ -69,13 +68,13 @@ public:
protected:
StorageKafka(
const StorageID & table_id_,
Context & context_,
const Context & context_,
const ColumnsDescription & columns_,
std::unique_ptr<KafkaSettings> kafka_settings_);
private:
// Configuration and state
Context & global_context;
const Context & global_context;
std::unique_ptr<KafkaSettings> kafka_settings;
const Names topics;
const String brokers;
......
......@@ -134,7 +134,7 @@ MergeTreeData::MergeTreeData(
bool attach,
BrokenPartCallback broken_part_callback_)
: IStorage(table_id_)
, global_context(context_)
, global_context(context_.getGlobalContext())
, merging_params(merging_params_)
, require_part_metadata(require_part_metadata_)
, relative_data_path(relative_data_path_)
......
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
namespace ErrorCodes
......
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
......@@ -38,7 +37,7 @@ public:
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
Context context;
const Context & context;
Names column_names;
const size_t max_block_size;
bool ack_in_suffix;
......
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
......@@ -23,7 +22,7 @@ public:
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
Context context;
const Context & context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
};
......
......@@ -69,7 +69,7 @@ namespace ExchangeType
StorageRabbitMQ::StorageRabbitMQ(
const StorageID & table_id_,
Context & context_,
const Context & context_,
const ColumnsDescription & columns_,
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
: IStorage(table_id_)
......
......@@ -67,12 +67,12 @@ public:
protected:
StorageRabbitMQ(
const StorageID & table_id_,
Context & context_,
const Context & context_,
const ColumnsDescription & columns_,
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_);
private:
Context global_context;
const Context & global_context;
Context rabbitmq_context;
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
......
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <Core/Block.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#include <amqpcpp.h>
#include <uv.h>
......@@ -25,7 +27,7 @@ namespace ErrorCodes
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address_,
Context & global_context,
const Context & global_context,
const std::pair<String, String> & login_password_,
const Names & routing_keys_,
const String & exchange_name_,
......
......@@ -9,7 +9,7 @@
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Core/Names.h>
namespace DB
{
......@@ -19,7 +19,7 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer
public:
WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address_,
Context & global_context,
const Context & global_context,
const std::pair<String, String> & login_password_,
const Names & routing_keys_,
const String & exchange_name_,
......
......@@ -65,14 +65,14 @@ StorageBuffer::StorageBuffer(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const Context & context_,
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
, global_context(context_)
, global_context(context_.getGlobalContext())
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
......
......@@ -9,7 +9,6 @@
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Event.h>
#include <Interpreters/Context.h>
namespace Poco { class Logger; }
......@@ -82,7 +81,13 @@ public:
void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.
void shutdown() override;
bool optimize(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Context & context) override;
bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override
......@@ -112,7 +117,7 @@ public:
private:
Context global_context;
const Context & global_context;
struct Buffer
{
......@@ -165,7 +170,7 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const Context & context_,
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
......
......@@ -366,10 +366,10 @@ StorageDistributed::StorageDistributed(
: IStorage(id_)
, remote_database(remote_database_)
, remote_table(remote_table_)
, global_context(std::make_unique<Context>(context_))
, global_context(context_.getGlobalContext())
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, owned_cluster(std::move(owned_cluster_))
, cluster_name(global_context->getMacros()->expand(cluster_name_))
, cluster_name(global_context.getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
{
......@@ -380,14 +380,14 @@ StorageDistributed::StorageDistributed(
if (sharding_key_)
{
sharding_key_expr = buildShardingKeyExpression(sharding_key_, *global_context, storage_metadata.getColumns().getAllPhysical(), false);
sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, storage_metadata.getColumns().getAllPhysical(), false);
sharding_key_column_name = sharding_key_->getColumnName();
sharding_key_is_deterministic = isExpressionActionsDeterministics(sharding_key_expr);
}
if (!relative_data_path.empty())
{
storage_policy = global_context->getStoragePolicy(storage_policy_name_);
storage_policy = global_context.getStoragePolicy(storage_policy_name_);
data_volume = storage_policy->getVolume(0);
if (storage_policy->getVolumes().size() > 1)
LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
......@@ -397,7 +397,7 @@ StorageDistributed::StorageDistributed(
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
{
size_t num_local_shards = global_context->getCluster(cluster_name)->getLocalShardCount();
size_t num_local_shards = global_context.getCluster(cluster_name)->getLocalShardCount();
if (num_local_shards && remote_database == id_.database_name && remote_table == id_.table_name)
throw Exception("Distributed table " + id_.table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
}
......@@ -719,7 +719,7 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(
{
node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
*this, path, node_data.connection_pool, monitors_blocker, global_context->getDistributedSchedulePool());
*this, path, node_data.connection_pool, monitors_blocker, global_context.getDistributedSchedulePool());
}
return *node_data.directory_monitor;
}
......@@ -741,7 +741,7 @@ size_t StorageDistributed::getShardCount() const
/// Returns the cluster this Distributed table fans queries out to.
/// A cluster injected at construction time (`owned_cluster`, used by
/// TableFunctionRemote) takes precedence; otherwise the named cluster is
/// looked up through the global context. `global_context` is a reference
/// now, hence member access with `.` instead of `->`.
ClusterPtr StorageDistributed::getCluster() const
{
    return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const
......
......@@ -130,7 +130,7 @@ public:
String remote_table;
ASTPtr remote_table_function_ptr;
std::unique_ptr<Context> global_context;
const Context & global_context;
Poco::Logger * log;
/// Used to implement TableFunctionRemote.
......
......@@ -5,6 +5,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/Context.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
......@@ -76,7 +77,7 @@ StorageMerge::StorageMerge(
: IStorage(table_id_)
, source_database(source_database_)
, table_name_regexp(table_name_regexp_)
, global_context(context_)
, global_context(context_.getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
......
......@@ -4,7 +4,6 @@
#include <Common/OptimizedRegularExpression.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
namespace DB
......@@ -50,7 +49,7 @@ public:
private:
String source_database;
OptimizedRegularExpression table_name_regexp;
Context global_context;
const Context & global_context;
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
......
......@@ -34,8 +34,7 @@ StorageMongoDB::StorageMongoDB(
const std::string & username_,
const std::string & password_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
const ConstraintsDescription & constraints_)
: IStorage(table_id_)
, host(host_)
, port(port_)
......@@ -43,7 +42,6 @@ StorageMongoDB::StorageMongoDB(
, collection_name(collection_name_)
, username(username_)
, password(password_)
, global_context(context_)
, connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
{
StorageInMemoryMetadata storage_metadata;
......@@ -114,8 +112,7 @@ void registerStorageMongoDB(StorageFactory & factory)
username,
password,
args.columns,
args.constraints,
args.context);
args.constraints);
},
{
.source_access_type = AccessType::MONGO,
......
......@@ -3,7 +3,6 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Poco/MongoDB/Connection.h>
......@@ -28,8 +27,7 @@ public:
const std::string & username_,
const std::string & password_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
const ConstraintsDescription & constraints_);
std::string getName() const override { return "MongoDB"; }
......@@ -51,7 +49,6 @@ private:
std::string username;
std::string password;
Context global_context;
std::shared_ptr<Poco::MongoDB::Connection> connection;
};
......
......@@ -55,7 +55,7 @@ StorageMySQL::StorageMySQL(
, replace_query{replace_query_}
, on_duplicate_clause{on_duplicate_clause_}
, pool(std::move(pool_))
, global_context(context_)
, global_context(context_.getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
......
......@@ -8,7 +8,6 @@
# include <ext/shared_ptr_helper.h>
# include <Interpreters/Context.h>
# include <Storages/IStorage.h>
# include <mysqlxx/Pool.h>
......@@ -57,7 +56,7 @@ private:
std::string on_duplicate_clause;
mysqlxx::Pool pool;
Context global_context;
const Context & global_context;
};
}
......
......@@ -194,17 +194,17 @@ StorageS3::StorageS3(
UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const Context & context_,
const String & compression_method_)
: IStorage(table_id_)
, uri(uri_)
, context_global(context_)
, global_context(context_.getGlobalContext())
, format_name(format_name_)
, min_upload_part_size(min_upload_part_size_)
, compression_method(compression_method_)
, name(uri_.storage_name)
{
context_global.getRemoteHostFilter().checkURL(uri_.uri);
global_context.getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
......@@ -325,7 +325,7 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMet
{
return std::make_shared<StorageS3BlockOutputStream>(
format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(),
context_global, chooseCompressionMethod(uri.endpoint, compression_method),
global_context, chooseCompressionMethod(uri.endpoint, compression_method),
client, uri.bucket, uri.key);
}
......
......@@ -33,7 +33,7 @@ public:
UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const Context & context_,
const String & compression_method_ = "");
String getName() const override
......@@ -56,7 +56,7 @@ public:
private:
S3::URI uri;
const Context & context_global;
const Context & global_context;
String format_name;
UInt64 min_upload_part_size;
......
......@@ -10,6 +10,7 @@
# include <DataTypes/convertMySQLDataType.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/Operators.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTLiteral.h>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册