提交 9bec7e4a 编写于 作者: M Michael Kolupaev

Merge

上级 7643a49b
......@@ -28,6 +28,7 @@ private:
enum Type
{
DROP_PARTITION,
ATTACH_PARTITION,
};
Type type;
......@@ -35,10 +36,18 @@ private:
Field partition;
bool detach; /// true для DETACH PARTITION.
bool unreplicated;
bool part;
static PartitionCommand dropPartition(const Field & partition, bool detach)
{
return {DROP_PARTITION, partition, detach};
}
static PartitionCommand attachPartition(const Field & partition, bool unreplicated, bool part)
{
return {ATTACH_PARTITION, partition, false, part, unreplicated};
}
};
typedef std::vector<PartitionCommand> PartitionCommands;
......
......@@ -23,6 +23,7 @@ public:
DROP_COLUMN,
MODIFY_COLUMN,
DROP_PARTITION,
ATTACH_PARTITION,
NO_TYPE
};
......@@ -47,14 +48,16 @@ public:
ASTPtr partition;
bool detach = false; /// true для DETACH PARTITION.
bool part = false; /// true для ATTACH [UNREPLICATED] PART
bool unreplicated = false; /// true для ATTACH UNREPLICATED ...
/// deep copy
void clone(Parameters & p) const
{
p.type = type;
p = *this;
p.name_type = name_type->clone();
p.column = column->clone();
p.partition = partition->clone();
p.detach = detach;
}
};
typedef std::vector<Parameters> ParameterContainer;
......
......@@ -205,13 +205,20 @@ public:
throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить запрос DROP PARTITION.
/** Выполнить запрос (DROP|DETACH) PARTITION.
*/
virtual void dropPartition(const Field & partition, bool detach)
{
throw Exception("Method dropPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить запрос ATTACH [UNREPLICATED] (PART|PARTITION).
*/
virtual void attachPartition(const Field & partition, bool unreplicated, bool part)
{
throw Exception("Method attachPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree.
* Возвращает - была ли выполнена какая-либо работа.
*/
......
......@@ -79,6 +79,7 @@ public:
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
void dropPartition(const Field & partition, bool detach) override;
void attachPartition(const Field & partition, bool unreplicated, bool part) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/
......
......@@ -43,6 +43,8 @@ void InterpreterAlterQuery::execute()
{
if (command.type == PartitionCommand::DROP_PARTITION)
table->dropPartition(command.partition, command.detach);
else if (command.type == PartitionCommand::ATTACH_PARTITION)
table->attachPartition(command.partition, command.unreplicated, command.part);
else
throw Exception("Bad PartitionCommand::Type: " + toString(command.type), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
......@@ -101,6 +103,11 @@ void InterpreterAlterQuery::parseAlter(
const Field & partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
out_partition_commands.push_back(PartitionCommand::dropPartition(partition, params.detach));
}
else if (params.type == ASTAlterQuery::ATTACH_PARTITION)
{
const Field & partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
out_partition_commands.push_back(PartitionCommand::attachPartition(partition, params.unreplicated, params.part));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
......
......@@ -24,6 +24,9 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & e
ParserString s_drop("DROP", true, true);
ParserString s_detach("DETACH", true, true);
ParserString s_attach("ATTACH", true, true);
ParserString s_unreplicated("UNREPLICATED", true, true);
ParserString s_part("PART", true, true);
ParserString s_partition("PARTITION", true, true);
ParserString s_comma(",");
......@@ -136,6 +139,28 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & e
params.type = ASTAlterQuery::DROP_PARTITION;
params.detach = true;
}
else if (s_attach.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if (s_unreplicated.ignore(pos, end, expected))
{
params.unreplicated = true;
ws.ignore(pos, end);
}
if (s_part.ignore(pos, end, expected))
params.part = true;
else if (!s_partition.ignore(pos, end, expected))
return false;
ws.ignore(pos, end);
if (!parser_literal.parse(pos, end, params.partition, expected))
return false;
params.type = ASTAlterQuery::ATTACH_PARTITION;
}
else if (s_modify.ignore(pos, end, expected))
{
ws.ignore(pos, end);
......
......@@ -745,7 +745,14 @@ void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent, bo
}
else if (p.type == ASTAlterQuery::DROP_PARTITION)
{
s << (hilite ? hilite_keyword : "") << indent_str << "DROP PARTITION " << (hilite ? hilite_none : "");
s << (hilite ? hilite_keyword : "") << indent_str << (p.detach ? "DETACH" : "DROP") << " PARTITION "
<< (hilite ? hilite_none : "");
formatAST(*p.partition, s, indent, hilite, true);
}
else if (p.type == ASTAlterQuery::ATTACH_PARTITION)
{
s << (hilite ? hilite_keyword : "") << indent_str << "ATTACH " << (p.unreplicated ? "UNREPLICATED" : "")
<< (p.part ? " PART " : " PARTITION ") << (hilite ? hilite_none : "");
formatAST(*p.partition, s, indent, hilite, true);
}
else
......
......@@ -707,8 +707,6 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
{
LOG_TRACE(log, "Removing " << remove.size() << " parts and adding " << add.size() << " parts.");
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
for (const DataPartPtr & part : remove)
......
......@@ -991,12 +991,9 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
"Waiting for " << to_wait.size() << " entries that are currently executing.");
/// Дождемся завершения операций с кусками, содержащимися в удаляемом диапазоне.
{
std::unique_lock<std::mutex> lock(queue_mutex);
for (LogEntryPtr & entry : to_wait)
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}
}
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
......@@ -1020,15 +1017,17 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
zookeeper->multi(ops);
/// Если кусок нужно удалить не нужно, надежнее удалить директорию после изменений в ZooKeeper.
/// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper.
if (!entry.detach)
data.replaceParts({part}, {}, false);
}
LOG_INFO(log, (entry.detach ? "Detached" : "Removed") << removed_parts << " parts inside " << entry.new_part_name << ".");
LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
if (unreplicated_data)
{
Poco::ScopedLock<Poco::FastMutex> unreplicated_lock(unreplicated_mutex);
removed_parts = 0;
parts = unreplicated_data->getDataParts();
for (const auto & part : parts)
......@@ -2179,18 +2178,24 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
virtual_parts.add(fake_part_name);
}
/// Наконец, добившись нужны инвариантов, можно положить запись в лог.
/// Наконец, добившись нужных инвариантов, можно положить запись в лог.
LogEntry entry;
entry.type = LogEntry::DROP_RANGE;
entry.source_replica = replica_name;
entry.new_part_name = fake_part_name;
entry.detach = detach;
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// Дождемся, пока все реплики выполнят дроп.
waitForAllReplicasToProcessLogEntry(log_znode_path, entry);
}
void StorageReplicatedMergeTree::attachPartition(const Field& partition, bool unreplicated, bool part)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
void StorageReplicatedMergeTree::drop()
{
if (!zookeeper)
......@@ -2254,12 +2259,16 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry)
{
LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
UInt64 log_index = parse<UInt64>(log_znode_path.substr(log_znode_path.size() - 10));
String log_entry_str = entry.toString();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << entry.znode_name << " to queue");
/// Дождемся, пока запись попадет в очередь реплики.
while (true)
{
......@@ -2272,6 +2281,8 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Strin
event->wait();
}
LOG_DEBUG(log, "Looking for " << entry.znode_name << " in " << replica << " queue");
/// Найдем запись в очереди реплики.
Strings queue_entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
String entry_to_wait_for;
......@@ -2291,6 +2302,8 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Strin
if (entry_to_wait_for.empty())
continue;
LOG_DEBUG(log, "Waiting for " << entry_to_wait_for << " to disappear from " << replica << " queue");
/// Дождемся, пока запись исчезнет из очереди реплики.
while (true)
{
......@@ -2305,6 +2318,8 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Strin
event->wait();
}
}
LOG_DEBUG(log, "Finished waiting for all replicas to process " << entry.znode_name);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册