提交 34076005 编写于 作者: A Andrey Mironov

dbms: save all columns in a single ZooKeeper node [#METR-12739]

上级 2798a199
......@@ -21,7 +21,6 @@ namespace DB
*/
class Context;
struct ColumnDefaults;
class Block
{
......
......@@ -64,50 +64,86 @@ namespace DB
return lhs.type == rhs.type && queryToString(lhs.expression) == queryToString(rhs.expression);
}
struct ColumnDefaults : public std::unordered_map<String, ColumnDefault>
using ColumnDefaults = std::unordered_map<String, ColumnDefault>;
template <bool store>
struct ColumnsDescription
{
using std::unordered_map<String, ColumnDefault>::unordered_map;
template <typename T> using by_value_or_cref = typename std::conditional<store, T, const T &>::type;
by_value_or_cref<NamesAndTypesList> columns;
by_value_or_cref<NamesAndTypesList> materialized;
by_value_or_cref<NamesAndTypesList> alias;
by_value_or_cref<ColumnDefaults> defaults;
/// @todo implement (de)serialization
String toString() const
{
String s;
WriteBufferFromString buf{s};
writeString("column defaults format version: 1\n", buf);
writeText(size(), buf);
writeString("columns format version: 1\n", buf);
writeText(columns.size() + materialized.size() + alias.size(), buf);
writeString(" columns:\n", buf);
for (const auto & column_default : *this)
{
writeBackQuotedString(column_default.first, buf);
writeChar(' ', buf);
writeString(DB::toString(column_default.second.type), buf);
writeChar('\t', buf);
writeString(queryToString(column_default.second.expression), buf);
writeChar('\n', buf);
}
const auto write_columns = [this, &buf] (const NamesAndTypesList & columns) {
for (const auto & column : columns)
{
const auto it = defaults.find(column.name);
writeBackQuotedString(column.name, buf);
writeChar(' ', buf);
writeString(column.type->getName(), buf);
if (it == std::end(defaults))
{
writeChar('\n', buf);
continue;
}
else
writeChar('\t', buf);
writeString(DB::toString(it->second.type), buf);
writeChar('\t', buf);
writeString(queryToString(it->second.expression), buf);
writeChar('\n', buf);
}
};
write_columns(columns);
write_columns(materialized);
write_columns(alias);
return s;
}
static ColumnDefaults parse(const String & str) {
static ColumnsDescription parse(const String & str, const DataTypeFactory & data_type_factory)
{
ReadBufferFromString buf{str};
ColumnDefaults defaults{};
assertString("column defaults format version: 1\n", buf);
assertString("columns format version: 1\n", buf);
size_t count{};
readText(count, buf);
assertString(" columns:\n", buf);
ParserTernaryOperatorExpression expr_parser;
ColumnsDescription result{};
for (size_t i = 0; i < count; ++i)
{
String column_name;
readBackQuotedString(column_name, buf);
assertString(" ", buf);
String type_name;
readString(type_name, buf);
auto type = data_type_factory.get(type_name);
if (*buf.position() == '\n')
{
assertString("\n", buf);
result.columns.emplace_back(column_name, std::move(type));
continue;
}
assertString("\t", buf);
String default_type_str;
readString(default_type_str, buf);
const auto default_type = columnDefaultTypeFromString(default_type_str);
......@@ -124,12 +160,19 @@ namespace DB
if (!expr_parser.parse(begin, end, default_expr, expected))
throw Exception{"Could not parse default expression", DB::ErrorCodes::CANNOT_PARSE_TEXT};
defaults.emplace(column_name, ColumnDefault{default_type, default_expr});
if (ColumnDefaultType::Default == default_type)
result.columns.emplace_back(column_name, std::move(type));
else if (ColumnDefaultType::Materialized == default_type)
result.materialized.emplace_back(column_name, std::move(type));
else if (ColumnDefaultType::Alias == default_type)
result.alias.emplace_back(column_name, std::move(type));
result.defaults.emplace(column_name, ColumnDefault{default_type, default_expr});
}
assertEOF(buf);
return defaults;
return result;
}
};
}
......@@ -207,13 +207,9 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/metadata", metadata.str(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/columns", data.getColumnsListNonMaterialized().toString(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/materialized_columns", data.materialized_columns.toString(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/alias_columns", data.alias_columns.toString(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/column_defaults", data.column_defaults.toString(),
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/columns", ColumnsDescription<false>{
data.getColumnsListNonMaterialized(), data.materialized_columns,
data.alias_columns, data.column_defaults}.toString(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/log", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
......@@ -261,13 +257,15 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
assertEOF(buf);
zkutil::Stat stat;
auto columns = NamesAndTypesList::parse(zookeeper->get(zookeeper_path + "/columns", &stat), context.getDataTypeFactory());
auto materialized_columns = NamesAndTypesList::parse(
zookeeper->get(zookeeper_path + "/materialized_columns", &stat), context.getDataTypeFactory());
auto alias_columns = NamesAndTypesList::parse(
zookeeper->get(zookeeper_path + "/alias_columns", &stat), context.getDataTypeFactory());
auto column_defaults = ColumnDefaults::parse(zookeeper->get(zookeeper_path + "/column_defaults", &stat));
auto columns_desc = ColumnsDescription<true>::parse(
zookeeper->get(zookeeper_path + "/columns", &stat), context.getDataTypeFactory());
auto & columns = columns_desc.columns;
auto & materialized_columns = columns_desc.materialized;
auto & alias_columns = columns_desc.alias;
auto & column_defaults = columns_desc.defaults;
columns_version = stat.version;
if (columns != data.getColumnsListNonMaterialized() ||
materialized_columns != data.materialized_columns ||
alias_columns != data.alias_columns ||
......@@ -424,10 +422,12 @@ void StorageReplicatedMergeTree::createReplica()
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
}
zookeeper->create(replica_path + "/columns", data.getColumnsListNonMaterialized().toString(), zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/materialized_columns", data.materialized_columns.toString(), zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/alias_columns", data.alias_columns.toString(), zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/column_defaults", data.column_defaults.toString(), zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/columns", ColumnsDescription<false>{
data.getColumnsListNonMaterialized(),
data.materialized_columns,
data.alias_columns,
data.column_defaults
}.toString(), zkutil::CreateMode::Persistent);
}
void StorageReplicatedMergeTree::activateReplica()
......@@ -649,15 +649,6 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData:
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/columns",
expected_columns_version));
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/materialized_columns",
expected_columns_version));
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/alias_columns",
expected_columns_version));
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/column_defaults",
expected_columns_version));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name,
"",
......@@ -1615,17 +1606,12 @@ void StorageReplicatedMergeTree::alterThread()
zkutil::Stat stat;
const String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
const String materialized_columns_str = zookeeper->get(zookeeper_path + "/materialized_columns",
&stat, alter_thread_event);
const String alias_columns_str = zookeeper->get(zookeeper_path + "/alias_columns",
&stat, alter_thread_event);
const String column_defaults_str = zookeeper->get(zookeeper_path + "/column_defaults",
&stat, alter_thread_event);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_str, context.getDataTypeFactory());
NamesAndTypesList materialized_columns = NamesAndTypesList::parse(
materialized_columns_str, context.getDataTypeFactory());
NamesAndTypesList alias_columns = NamesAndTypesList::parse(alias_columns_str, context.getDataTypeFactory());
ColumnDefaults column_defaults = ColumnDefaults::parse(column_defaults_str);
auto columns_desc = ColumnsDescription<true>::parse(columns_str, context.getDataTypeFactory());
auto & columns = columns_desc.columns;
auto & materialized_columns = columns_desc.materialized;
auto & alias_columns = columns_desc.alias;
auto & column_defaults = columns_desc.defaults;
bool changed_version = (stat.version != columns_version);
......@@ -1744,10 +1730,7 @@ void StorageReplicatedMergeTree::alterThread()
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns.toString());
zookeeper->set(replica_path + "/materialized_columns", materialized_columns.toString());
zookeeper->set(replica_path + "/alias_columns", alias_columns.toString());
zookeeper->set(replica_path + "/column_defaults", column_defaults.toString());
zookeeper->set(replica_path + "/columns", columns_str);
if (changed_version)
{
......@@ -2389,9 +2372,6 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
NamesAndTypesList new_alias_columns;
ColumnDefaults new_column_defaults;
String new_columns_str;
String new_materialized_columns_str;
String new_alias_columns_str;
String new_column_defaults_str;
int new_columns_version;
zkutil::Stat stat;
......@@ -2409,16 +2389,13 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
new_column_defaults = data.column_defaults;
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
new_columns_str = new_columns.toString();
new_materialized_columns_str = new_materialized_columns.toString();
new_alias_columns_str = new_alias_columns.toString();
new_column_defaults_str = new_column_defaults.toString();
new_columns_str = ColumnsDescription<false>{
new_columns, new_materialized_columns,
new_alias_columns, new_column_defaults
}.toString();
/// Делаем ALTER.
zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
zookeeper->set(zookeeper_path + "/materialized_columns", new_materialized_columns_str, -1, &stat);
zookeeper->set(zookeeper_path + "/alias_columns", new_alias_columns_str, -1, &stat);
zookeeper->set(zookeeper_path + "/column_defaults", new_column_defaults_str, -1, &stat);
new_columns_version = stat.version;
}
......@@ -2446,18 +2423,9 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
while (!shutdown_called)
{
String replica_columns_str;
String replica_materialized_columns_str;
String replica_alias_columns_str;
String replica_column_defaults_str;
/// Реплику могли успеть удалить.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat) ||
!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/materialized_columns",
replica_materialized_columns_str, &stat) ||
!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/alias_columns",
replica_alias_columns_str, &stat) ||
!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/column_defaults",
replica_column_defaults_str, &stat))
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
{
LOG_WARNING(log, replica << " was removed");
break;
......@@ -2465,24 +2433,12 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
int replica_columns_version = stat.version;
if (replica_columns_str == new_columns_str &&
replica_materialized_columns_str == new_materialized_columns_str &&
replica_alias_columns_str == new_alias_columns_str &&
replica_column_defaults_str == new_column_defaults_str)
if (replica_columns_str == new_columns_str)
break;
if (!zookeeper->exists(zookeeper_path + "/columns", &stat))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (!zookeeper->exists(zookeeper_path + "/materialized_columns", &stat))
throw Exception(zookeeper_path + "/materialized_columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (!zookeeper->exists(zookeeper_path + "/alias_columns", &stat))
throw Exception(zookeeper_path + "/alias_columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (!zookeeper->exists(zookeeper_path + "/column_defaults", &stat))
throw Exception(zookeeper_path + "/column_defaults doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
{
LOG_WARNING(log, zookeeper_path + "/columns changed before ALTER finished; "
......@@ -2490,13 +2446,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
return;
}
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/materialized_columns",
&stat, alter_query_event) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/alias_columns",
&stat, alter_query_event) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/column_defaults",
&stat, alter_query_event))
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
{
LOG_WARNING(log, replica << " was removed");
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册