提交 0f252046 编写于 作者: A Alexey Milovidov

Preparations [#CLICKHOUSE-2]

上级 0aef5f49
......@@ -35,8 +35,8 @@ try
auto system_database = std::make_shared<DatabaseMemory>("system");
context.addDatabase("system", system_database);
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
context.setCurrentDatabase("system");
AnalyzeLambdas analyze_lambdas;
......
......@@ -70,7 +70,7 @@ BlockIO InterpreterAlterQuery::execute()
case PartitionCommand::RESHARD_PARTITION:
table->reshardPartitions(query_ptr, database_name, command.partition, command.last_partition,
command.weighted_zookeeper_paths, command.sharding_key_expr, command.do_copy,
command.coordinator, context.getSettingsRef());
command.coordinator, context);
break;
case PartitionCommand::DROP_COLUMN:
......
......@@ -33,7 +33,7 @@ namespace ErrorCodes
}
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_)
InterpreterInsertQuery::InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_)
: query_ptr(query_ptr_), context(context_)
{
ProfileEvents::increment(ProfileEvents::InsertQuery);
......
......@@ -15,7 +15,7 @@ namespace DB
class InterpreterInsertQuery : public IInterpreter
{
public:
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_);
InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_);
/** Подготовить запрос к выполнению. Вернуть потоки блоков
* - поток, в который можно писать данные для выполнения запроса, если INSERT;
......
......@@ -185,7 +185,7 @@ public:
* It is guaranteed that the table structure will not change over the lifetime of the returned streams (that is, there will not be ALTER, RENAME and DROP).
*/
virtual BlockOutputStreamPtr write(
ASTPtr query,
const ASTPtr & query,
const Settings & settings)
{
throw Exception("Method write is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
......@@ -251,11 +251,12 @@ public:
/** Run the RESHARD PARTITION query.
*/
virtual void reshardPartitions(ASTPtr query, const String & database_name,
virtual void reshardPartitions(
const ASTPtr & query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, bool do_copy, const Field & coordinator,
const Settings & settings)
Context & context)
{
throw Exception("Method reshardPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
......
......@@ -347,7 +347,7 @@ private:
};
BlockOutputStreamPtr StorageBuffer::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & query, const Settings & settings)
{
return std::make_shared<BufferBlockOutputStream>(*this);
}
......
......@@ -76,7 +76,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
/// Resets all buffers to the subordinate table.
void shutdown() override;
......
......@@ -253,7 +253,7 @@ BlockInputStreams StorageDistributed::read(
}
BlockOutputStreamPtr StorageDistributed::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
{
auto cluster = context.getCluster(cluster_name);
......@@ -293,11 +293,12 @@ void StorageDistributed::shutdown()
}
void StorageDistributed::reshardPartitions(ASTPtr query, const String & database_name,
void StorageDistributed::reshardPartitions(
const ASTPtr & query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, bool do_copy, const Field & coordinator,
const Settings & settings)
Context & context)
{
auto & resharding_worker = context.getReshardingWorker();
if (!resharding_worker.isStarted())
......@@ -377,7 +378,7 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
ClusterProxy::AlterQueryConstructor alter_query_constructor;
BlockInputStreams streams = ClusterProxy::Query{alter_query_constructor, cluster, alter_query_ptr,
context, settings, enable_shard_multiplexing}.execute();
context, context.getSettingsRef(), enable_shard_multiplexing}.execute();
/// This callback is called if an exception has occurred while attempting to read
/// a block from a shard. This is to avoid a potential deadlock if other shards are
......@@ -399,7 +400,7 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
};
streams[0] = std::make_shared<UnionBlockInputStream<>>(
streams, nullptr, settings.max_distributed_connections, exception_callback);
streams, nullptr, context.getSettingsRef().max_distributed_connections, exception_callback);
streams.resize(1);
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
......
......@@ -75,7 +75,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void drop() override {}
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
......@@ -85,11 +85,12 @@ public:
void shutdown() override;
void reshardPartitions(ASTPtr query, const String & database_name,
void reshardPartitions(
const ASTPtr & query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, bool do_copy, const Field & coordinator,
const Settings & settings) override;
Context & context) override;
/// From each replica, get a description of the corresponding local table.
BlockInputStreams describe(const Context & context, const Settings & settings);
......@@ -145,7 +146,7 @@ private:
String remote_database;
String remote_table;
Context & context;
const Context & context;
Logger * log = &Logger::get("StorageDistributed");
/// Used to implement TableFunctionRemote.
......
......@@ -218,7 +218,7 @@ private:
};
BlockOutputStreamPtr StorageFile::write(
ASTPtr query,
const ASTPtr & query,
const Settings & settings)
{
return std::make_shared<StorageFileBlockOutputStream>(*this);
......
......@@ -82,7 +82,7 @@ public:
unsigned threads = 1) override;
BlockOutputStreamPtr write(
ASTPtr query,
const ASTPtr & query,
const Settings & settings) override;
void drop() override;
......
......@@ -919,7 +919,7 @@ BlockInputStreams StorageLog::read(
BlockOutputStreamPtr StorageLog::write(
ASTPtr query, const Settings & settings)
const ASTPtr & query, const Settings & settings)
{
loadMarks();
return std::make_shared<LogBlockOutputStream>(*this);
......
......@@ -78,7 +78,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
......
......@@ -119,7 +119,7 @@ BlockInputStreams StorageMaterializedView::read(
return getInnerTable()->read(column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageMaterializedView::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Settings & settings)
{
return getInnerTable()->write(query, settings);
}
......
......@@ -37,7 +37,7 @@ public:
bool supportsParallelReplicas() const override { return getInnerTable()->supportsParallelReplicas(); }
bool supportsIndexForIn() const override { return getInnerTable()->supportsIndexForIn(); }
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void drop() override;
bool optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) override;
......
......@@ -149,7 +149,7 @@ BlockInputStreams StorageMemory::read(
BlockOutputStreamPtr StorageMemory::write(
ASTPtr query, const Settings & settings)
const ASTPtr & query, const Settings & settings)
{
return std::make_shared<MemoryBlockOutputStream>(*this);
}
......
......@@ -54,7 +54,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void drop() override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
......
......@@ -124,7 +124,7 @@ BlockInputStreams StorageMergeTree::read(
return reader.read(column_names, query, context, settings, processed_stage, max_block_size, threads, nullptr, 0);
}
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & query, const Settings & settings)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this);
}
......
......@@ -83,7 +83,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
/** Perform the next step in combining the parts.
*/
......
......@@ -46,7 +46,7 @@ public:
return { std::make_shared<NullBlockInputStream>() };
}
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override
{
return std::make_shared<NullBlockOutputStream>();
}
......
......@@ -2409,7 +2409,7 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
}
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & query, const Settings & settings)
{
assertNotReadonly();
......@@ -3543,11 +3543,12 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
}
void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String & database_name,
void StorageReplicatedMergeTree::reshardPartitions(
const ASTPtr & query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, bool do_copy, const Field & coordinator,
const Settings & settings)
Context & context)
{
auto & resharding_worker = context.getReshardingWorker();
if (!resharding_worker.isStarted())
......
......@@ -131,7 +131,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
bool optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) override;
......@@ -142,11 +142,12 @@ public:
void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
void freezePartition(const Field & partition, const String & with_name, const Settings & settings) override;
void reshardPartitions(ASTPtr query, const String & database_name,
void reshardPartitions(
const ASTPtr & query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, bool do_copy, const Field & coordinator,
const Settings & settings) override;
Context & context) override;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
......
......@@ -66,7 +66,7 @@ void SetOrJoinBlockOutputStream::writeSuffix()
BlockOutputStreamPtr StorageSetOrJoinBase::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & query, const Settings & settings)
{
++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, path, path + "tmp/", toString(increment) + ".bin");
......
......@@ -25,7 +25,7 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
protected:
StorageSetOrJoinBase(
......
......@@ -283,7 +283,7 @@ BlockInputStreams StorageStripeLog::read(
BlockOutputStreamPtr StorageStripeLog::write(
ASTPtr query, const Settings & settings)
const ASTPtr & query, const Settings & settings)
{
return std::make_shared<StripeLogBlockOutputStream>(*this);
}
......
......@@ -53,7 +53,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
......
......@@ -363,7 +363,7 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
OffsetColumns & offset_columns, size_t level)
OffsetColumns & offset_columns, size_t level)
{
if (type.isNullable())
{
......@@ -557,7 +557,7 @@ BlockInputStreams StorageTinyLog::read(
BlockOutputStreamPtr StorageTinyLog::write(
ASTPtr query, const Settings & settings)
const ASTPtr & query, const Settings & settings)
{
return std::make_shared<TinyLogBlockOutputStream>(*this);
}
......
......@@ -53,7 +53,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void drop() override;
......
......@@ -72,8 +72,8 @@ StoragePtr TableFunctionMerge::execute(ASTPtr ast_function, Context & context) c
args[0] = evaluateConstantExpressionOrIdentidierAsLiteral(args[0], context);
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
String source_database = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
String table_name_regexp = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
String source_database = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
String table_name_regexp = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
return StorageMerge::create(
getName(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册