提交 038c9a88 编写于 作者: N Nikita Mikhaylov

something works

上级 bf9ed2c0
......@@ -313,6 +313,79 @@ static ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
}
}
/*
* Choosing a Primary Key that Differs from the Sorting Key
* It is possible to specify a primary key (an expression with values that are written in the index file for each mark)
* that is different from the sorting key (an expression for sorting the rows in data parts).
* In this case the primary key expression tuple must be a prefix of the sorting key expression tuple.
* This feature is helpful when using the SummingMergeTree and AggregatingMergeTree table engines.
* In a common case when using these engines, the table has two types of columns: dimensions and measures.
* Typical queries aggregate values of measure columns with arbitrary GROUP BY and filtering by dimensions.
* Because SummingMergeTree and AggregatingMergeTree aggregate rows with the same value of the sorting key,
* it is natural to add all dimensions to it. As a result, the key expression consists of a long list of columns
* and this list must be frequently updated with newly added dimensions.
* In this case it makes sense to leave only a few columns in the primary key that will provide efficient
* range scans and add the remaining dimension columns to the sorting key tuple.
* ALTER of the sorting key is a lightweight operation because when a new column is simultaneously added
* to the table and to the sorting key, existing data parts don't need to be changed.
* Since the old sorting key is a prefix of the new sorting key and there is no data in the newly added column,
* the data is sorted by both the old and new sorting keys at the moment of table modification.
*
* */
/// Returns a clone of the primary key expression of the given storage definition,
/// falling back to a clone of its ORDER BY expression when no primary key is set.
///
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS when:
///  - the engine name does not end with "MergeTree";
///  - the storage is not an extended definition storage;
///  - neither a primary key nor an ORDER BY expression is present.
[[maybe_unused]] static ASTPtr extractPrimaryKeyOrOrderBy(const ASTPtr & storage_ast)
{
    /// Textual form of the storage definition, used only in error messages.
    const String storage_str = queryToString(storage_ast);
    const auto & storage = storage_ast->as<ASTStorage &>();
    const auto & engine = storage.engine->as<ASTFunction &>();

    if (!endsWith(engine.name, "MergeTree"))
    {
        throw Exception("Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
                        ErrorCodes::BAD_ARGUMENTS);
    }

    /// FIXME
    if (!isExtendedDefinitionStorage(storage_ast))
    {
        throw Exception("Is not extended definition storage " + storage_str + " Will be fixed later.",
                        ErrorCodes::BAD_ARGUMENTS);
    }

    if (storage.primary_key)
        return storage.primary_key->clone();

    /// Report a missing ORDER BY explicitly instead of dereferencing a null pointer.
    if (!storage.order_by)
    {
        throw Exception("Neither PRIMARY KEY nor ORDER BY was specified in " + storage_str,
                        ErrorCodes::BAD_ARGUMENTS);
    }

    return storage.order_by->clone();
}
/// Joins the given strings into a single string, separating neighbouring elements with ", ".
/// Returns an empty string when the input is empty.
[[maybe_unused]] static String createCommaSeparatedStringFrom(const Strings & strings)
{
    String answer;
    bool is_first = true;
    for (const auto & string : strings)
    {
        /// The separator goes before every element except the first,
        /// so there is no trailing separator to strip (and nothing to pop from an empty result).
        if (!is_first)
            answer += ", ";
        answer += string;
        is_first = false;
    }
    return answer;
}
/// Returns the column name of every argument of the primary key (or ORDER BY) expression
/// of the given storage definition, in argument order.
/// The key expression is accessed as an ASTFunction.
[[maybe_unused]] static Strings extractPrimaryKeyString(const ASTPtr & storage_ast)
{
    /// Hold the extracted key in a local so that it stays alive while it is inspected.
    const ASTPtr key_ast = extractPrimaryKeyOrOrderBy(storage_ast);
    const auto & key_function = key_ast->as<ASTFunction &>();

    /// Iterate over a clone of the argument list rather than the list itself.
    const ASTPtr cloned_arguments = key_function.arguments->clone();
    const ASTs & key_columns = cloned_arguments->children;

    Strings column_names;
    column_names.reserve(key_columns.size());
    for (const auto & key_column : key_columns)
        column_names.push_back(key_column->getColumnName());

    return column_names;
}
static ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random)
......
......@@ -36,7 +36,7 @@ struct ShardPartitionPiece {
String ShardPartitionPiece::getPartitionPiecePath() const
{
return shard_partition.getPartitionPath() + "/piece" + std::to_string(current_piece_number);
return shard_partition.getPartitionPath() + "/piece_" + std::to_string(current_piece_number);
}
String ShardPartitionPiece::getPartitionPieceCleanStartPath() const
......
......@@ -53,7 +53,7 @@ void TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration &config, con
Poco::Util::AbstractConfiguration::Keys tables_keys;
config.keys(prefix + "tables", tables_keys);
for (const auto &table_key : tables_keys) {
for (const auto & table_key : tables_keys) {
table_tasks.emplace_back(*this, config, prefix + "tables", table_key);
}
}
......
......@@ -2,26 +2,50 @@
#include "Internals.h"
#include "TaskCluster.h"
#include "ext/range.h"
namespace DB {
struct TaskTable {
TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguration &config, const String &prefix,
const String &table_key);
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
const String & table_key);
TaskCluster &task_cluster;
TaskCluster & task_cluster;
String getPartitionPath(const String &partition_name) const;
/// These functions used in checkPartitionIsDone() or checkPartitionPieceIsDone()
/// They are implemented here not to call task_table.tasks_shard[partition_name].second.pieces[current_piece_number] etc.
[[maybe_unused]] String
getPartitionPiecePath(const String &partition_name, size_t current_piece_number) const;
String getPartitionPath(const String & partition_name) const;
String getCertainPartitionIsDirtyPath(const String &partition_name) const;
[[maybe_unused]] String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionIsCleanedPath(const String &partition_name) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionTaskStatusPath(const String &partition_name) const;
/// Placeholder: ignores both arguments and returns a fixed marker string.
[[maybe_unused]] String getCertainPartitionPieceIsDirtyPath(
    [[maybe_unused]] const String & partition_name, [[maybe_unused]] const size_t piece_number) const
{
    return "Not Implemented";
}
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
/// Placeholder: ignores both arguments and returns a fixed marker string.
[[maybe_unused]] String getCertainPartitionPieceIsCleanedPath(
    [[maybe_unused]] const String & partition_name, [[maybe_unused]] const size_t piece_number) const
{
    return "Not implemented";
}
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
/// Placeholder: ignores both arguments and returns a fixed marker string.
[[maybe_unused]] String getCertainPartitionPieceTaskStatusPath(
    [[maybe_unused]] const String & partition_name, [[maybe_unused]] const size_t piece_number) const
{
    return "Not implemented";
}
/// Partitions will be split into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
......@@ -32,6 +56,9 @@ struct TaskTable {
/// Used as task ID
String table_id;
/// Column names in primary key
String primary_key_comma_separated;
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
......@@ -41,14 +68,37 @@ struct TaskTable {
DatabaseAndTableName table_push;
/// Storage of destination table
/// (tables that are stored on each shard of target cluster)
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
/// A Distributed table definition used to split data
/*
* A Distributed table definition used to split data
* Distributed table will be created on each shard of default
* cluster to perform data copying and resharding
* */
String sharding_key_str;
ASTPtr sharding_key_ast;
ASTPtr engine_split_ast;
ASTPtr main_engine_split_ast;
/*
* Auxiliary table engines used to perform partition piece copying.
* Each AST represents the table engine for a certain piece number.
* After a partition piece has been copied successfully, it will be moved to the main
* target table. All these tables are stored on each shard, like the main table.
* We have to use separate tables for partition pieces because of the atomicity of copying.
* Also if we want to move some partition to another table, the partition keys have to be the same.
* */
/*
* To copy a partition piece from one cluster to another we have to use a Distributed table.
* Since a separate table (engine_push) is used for each partition piece,
* we have to use many Distributed tables.
* */
ASTs auxiliary_engine_split_asts;
/// Additional WHERE expression to filter input data
String where_condition_str;
......@@ -63,9 +113,11 @@ struct TaskTable {
Strings enabled_partitions;
NameSet enabled_partitions_set;
/// Prioritized list of shards
/// all_shards contains information about all shards in the table.
/// So we have to check whether particular shard have current partiton or not while processing.
/**
* Prioritized list of shards
* all_shards contains information about all shards in the table.
* So we have to check whether a particular shard has the current partition or not while processing.
*/
TasksShard all_shards;
TasksShard local_shards;
......@@ -76,7 +128,7 @@ struct TaskTable {
/// Partition names to process in user-specified order
Strings ordered_partition_names;
ClusterPartition &getClusterPartition(const String &partition_name) {
ClusterPartition & getClusterPartition(const String &partition_name) {
auto it = cluster_partitions.find(partition_name);
if (it == cluster_partitions.end())
throw Exception("There are no cluster partition " + partition_name + " in " + table_id,
......@@ -99,10 +151,10 @@ String TaskTable::getPartitionPath(const String &partition_name) const {
+ "/" + escapeForFileName(partition_name); // 201701
}
String TaskTable::getPartitionPiecePath(const String &partition_name, size_t current_piece_number) const {
assert(current_piece_number < number_of_splits);
String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const {
assert(piece_number < number_of_splits);
return getPartitionPath(partition_name) + "/" +
std::to_string(current_piece_number); // 1...number_of_splits
std::to_string(piece_number); // 1...number_of_splits
}
String TaskTable::getCertainPartitionIsDirtyPath(const String &partition_name) const {
......@@ -131,9 +183,8 @@ String TaskShard::getHostNameExample() const {
return replicas.at(0).readableString();
}
TaskTable::TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguration &config,
const String &prefix_,
const String &table_key)
TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
const String & prefix_, const String & table_key)
: task_cluster(parent) {
String table_prefix = prefix_ + "." + table_key + ".";
......@@ -160,14 +211,26 @@ TaskTable::TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguratio
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
primary_key_comma_separated = createCommaSeparatedStringFrom(extractPrimaryKeyString(engine_push_ast));
}
sharding_key_str = config.getString(table_prefix + "sharding_key");
auxiliary_engine_split_asts.reserve(number_of_splits);
{
ParserExpressionWithOptionalAlias parser_expression(false);
sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0);
engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
main_engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
sharding_key_ast);
for (const auto piece_number : ext::range(0, number_of_splits))
{
auxiliary_engine_split_asts.emplace_back
(
createASTStorageDistributed(cluster_push_name, table_push.first,
table_push.second + ".piece_" + toString(piece_number), sharding_key_ast)
);
}
}
where_condition_str = config.getString(table_prefix + "where_condition", "");
......@@ -209,7 +272,7 @@ TaskTable::TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguratio
template<typename RandomEngine>
void TaskTable::initShards(RandomEngine &&random_engine) {
const String &fqdn_name = getFQDNOrHostName();
const String & fqdn_name = getFQDNOrHostName();
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册