提交 49439aa2 编写于 作者: A Alexey Zatelepin

write mutations to ZK [#CLICKHOUSE-3747]

上级 4ca3bf65
......@@ -372,6 +372,7 @@ namespace ErrorCodes
extern const int FUNCTION_THROW_IF_VALUE_IS_NON_ZERO = 395;
extern const int TOO_MANY_ROWS_OR_BYTES = 396;
extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW = 397;
extern const int UNKNOWN_MUTATION_COMMAND = 398;
extern const int KEEPER_EXCEPTION = 999;
......
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const
{
out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "source replica: " << source_replica << "\n"
<< "block numbers count: " << block_numbers.size() << "\n";
for (const auto & kv : block_numbers)
{
const String & partition_id = kv.first;
Int64 number = kv.second;
out << partition_id << "\t" << number << "\n";
}
out << "mutation commands:\n";
commands.writeText(out);
}
void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
{
in >> "format version: 1\n";
LocalDateTime create_time_dt;
in >> "create time: " >> create_time_dt >> "\n";
create_time = create_time_dt;
in >> "source replica: " >> source_replica >> "\n";
size_t count;
in >> "block numbers count: " >> count >> "\n";
for (size_t i = 0; i < count; ++i)
{
String partition_id;
Int64 number;
in >> partition_id >> "\t" >> number >> "\n";
block_numbers[partition_id] = number;
}
in >> "mutation commands:\n";
commands.readText(in);
}
String ReplicatedMergeTreeMutationEntry::toString() const
{
WriteBufferFromOwnString out;
writeText(out);
return out.str();
}
ReplicatedMergeTreeMutationEntry ReplicatedMergeTreeMutationEntry::parse(const String & str)
{
ReadBufferFromString in(str);
ReplicatedMergeTreeMutationEntry res;
res.readText(in);
assertEOF(in);
return res;
}
}
#pragma once
#include <Common/Exception.h>
#include <Core/Types.h>
#include <IO/WriteHelpers.h>
#include <Storages/MutationCommands.h>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
struct ReplicatedMergeTreeMutationEntry
{
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const;
static ReplicatedMergeTreeMutationEntry parse(const String & str);
String znode_name;
time_t create_time = 0;
String source_replica;
std::unordered_map<String, Int64> block_numbers;
MutationCommands commands;
};
}
......@@ -2,11 +2,68 @@
#include <Storages/IStorage.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Columns/FilterDescription.h>
#include <IO/Operators.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_MUTATION_COMMAND;
}
static String typeToString(MutationCommand::Type type)
{
switch (type)
{
case MutationCommand::DELETE: return "DELETE";
default:
throw Exception("Bad mutation type: " + toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
}
void MutationCommand::writeText(WriteBuffer & out) const
{
out << typeToString(type) << "\n";
switch (type)
{
case MutationCommand::DELETE:
{
std::stringstream ss;
formatAST(*predicate, ss, /* hilite = */ false, /* one_line = */ true);
out << "predicate: " << ss.str() << "\n";
break;
}
default:
throw Exception("Bad mutation type: " + toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
}
void MutationCommand::readText(ReadBuffer & in)
{
String type_str;
in >> type_str >> "\n";
if (type_str == "DELETE")
{
type = DELETE;
String predicate_str;
in >> "predicate: " >> predicate_str >> "\n";
ParserExpressionWithOptionalAlias p_expr(false);
predicate = parseQuery(
p_expr, predicate_str.data(), predicate_str.data() + predicate_str.length(), "mutation predicate", 0);
}
else
throw Exception("Unknown mutation command: `" + type_str + "'", ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
void MutationCommands::validate(const IStorage & table, const Context & context)
{
auto all_columns = table.getColumns().getAll();
......@@ -29,4 +86,29 @@ void MutationCommands::validate(const IStorage & table, const Context & context)
}
}
void MutationCommands::writeText(WriteBuffer & out) const
{
out << "format version: 1\n"
<< "count: " << commands.size() << "\n";
for (const MutationCommand & command : commands)
{
command.writeText(out);
}
}
void MutationCommands::readText(ReadBuffer & in)
{
in >> "format version: 1\n";
size_t count;
in >> "count: " >> count >> "\n";
for (size_t i = 0; i < count; ++i)
{
MutationCommand command;
command.readText(in);
commands.push_back(std::move(command));
}
}
}
#pragma once
#include <Parsers/IAST.h>
#include <IO/WriteHelpers.h>
namespace DB
......@@ -28,6 +29,9 @@ struct MutationCommand
res.predicate = predicate;
return res;
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
};
struct MutationCommands
......@@ -35,6 +39,9 @@ struct MutationCommands
std::vector<MutationCommand> commands;
void validate(const IStorage & table, const Context & context);
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
};
}
......@@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
......@@ -278,13 +279,16 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
auto zookeeper = getZooKeeper();
/// Working with quorum.
zookeeper->createIfNotExists(zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
/// Tracking lag of replicas.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", "");
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", "");
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String());
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", String());
/// Mutations
zookeeper->createIfNotExists(zookeeper_path + "/mutations", String());
}
......@@ -3392,6 +3396,53 @@ void StorageReplicatedMergeTree::freezePartition(const ASTPtr & partition, const
}
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context &)
{
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;
entry.commands = commands;
String mutations_path = zookeeper_path + "/mutations";
/// Update the mutations_path node when creating the mutation and check its version to ensure that
/// nodes for mutations are created in the same order as the corresponding block numbers.
/// Should work well if the number of concurrent mutation requests is small.
while (true)
{
auto zookeeper = getZooKeeper();
zkutil::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
EphemeralLocksInAllPartitions block_number_locks(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
for (const auto & lock : block_number_locks.getLocks())
entry.block_numbers[lock.partition_id] = lock.number;
entry.create_time = time(nullptr);
zkutil::Requests requests;
requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));
zkutil::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses);
if (rc == ZooKeeperImpl::ZooKeeper::ZOK)
break;
else if (rc == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
{
LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying...");
continue;
}
else
throw zkutil::KeeperException("Unable to create a mutation znode", rc);
}
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
......
......@@ -117,6 +117,8 @@ public:
void fetchPartition(const ASTPtr & partition, const String & from, const Context & context) override;
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
void drop() override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册