提交 7643a49b 编写于 作者: M Michael Kolupaev

Merge

上级 20f8e17e
......@@ -255,6 +255,8 @@ namespace ErrorCodes
INVALID_NESTED_NAME,
CORRUPTED_DATA,
INCORRECT_MARK,
INVALID_PARTITION_NAME,
NOT_LEADER,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,
......
......@@ -23,13 +23,31 @@ public:
*/
static void updateMetadata(const String & database, const String & table, const NamesAndTypesList & columns, Context & context);
private:
typedef std::vector<Field> Partitions;
struct PartitionCommand
{
enum Type
{
DROP_PARTITION,
};
Type type;
Field partition;
bool detach; /// true для DETACH PARTITION.
static PartitionCommand dropPartition(const Field & partition, bool detach)
{
return {DROP_PARTITION, partition, detach};
}
};
typedef std::vector<PartitionCommand> PartitionCommands;
ASTPtr query_ptr;
Context context;
static void parseAlter(const ASTAlterQuery::ParameterContainer & params, const DataTypeFactory & data_type_factory,
AlterCommands & out_commands, Partitions & out_partitions_to_drop);
AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands);
};
}
......@@ -29,7 +29,7 @@ public:
struct Parameters
{
Parameters() : type(NO_TYPE) {}
int type;
int type = NO_TYPE;
/** В запросе ADD COLUMN здесь хранится имя и тип добавляемого столбца
* В запросе DROP это поле не используется
......@@ -45,6 +45,7 @@ public:
/** В запросе DROP PARTITION здесь хранится имя partition'а.
*/
ASTPtr partition;
bool detach = false; /// true для DETACH PARTITION.
/// deep copy
void clone(Parameters & p) const
......@@ -53,6 +54,7 @@ public:
p.name_type = name_type->clone();
p.column = column->clone();
p.partition = partition->clone();
p.detach = detach;
}
};
typedef std::vector<Parameters> ParameterContainer;
......
......@@ -207,7 +207,7 @@ public:
/** Выполнить запрос DROP PARTITION.
*/
virtual void dropPartition(const Field & partition)
virtual void dropPartition(const Field & partition, bool detach)
{
throw Exception("Method dropPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
......
......@@ -13,7 +13,7 @@ namespace DB
* При вызове деструктора или завершении сессии в ZooKeeper, переходит в состояние ABANDONED.
* (В том числе при падении программы)
*/
class AbandonableLockInZooKeeper
class AbandonableLockInZooKeeper : private boost::noncopyable
{
public:
enum State
......@@ -34,6 +34,14 @@ public:
path = zookeeper.create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
}
AbandonableLockInZooKeeper(AbandonableLockInZooKeeper && rhs)
: zookeeper(rhs.zookeeper)
{
std::swap(path_prefix, rhs.path_prefix);
std::swap(path, rhs.path);
std::swap(holder_path, rhs.holder_path);
}
String getPath()
{
return path;
......@@ -49,6 +57,7 @@ public:
{
zookeeper.remove(path);
zookeeper.remove(holder_path);
holder_path = "";
}
/// Добавляет в список действия, эквивалентные unlock().
......@@ -60,6 +69,9 @@ public:
~AbandonableLockInZooKeeper()
{
if (holder_path.empty())
return;
try
{
zookeeper.tryRemove(holder_path);
......
......@@ -630,10 +630,10 @@ public:
*/
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
/** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора.
/** Переименовывает кусок в detached/prefix_кусок и забывает про него. Данные не будут удалены в clearOldParts.
* Если restore_covered, добавляет в рабочий набор неактивные куски, слиянием которых получен удаляемый кусок.
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered = false);
void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false);
/** Возвращает старые неактуальные куски, которые можно удалить. Одновременно удаляет их из списка кусков, но не с диска.
*/
......
......@@ -28,26 +28,7 @@ public:
time_t min_date_time = DateLUT::instance().fromDayNum(DayNum_t(current_block.min_date));
String month_name = toString(Date2OrderedIdentifier(min_date_time) / 100);
String month_path = storage.zookeeper_path + "/block_numbers/" + month_name;
if (!storage.zookeeper->exists(month_path))
{
/// Создадим в block_numbers ноду для месяца и пропустим в ней 200 значений инкремента.
/// Нужно, чтобы в будущем при необходимости можно было добавить данные в начало.
zkutil::Ops ops;
auto acl = storage.zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent));
for (size_t i = 0; i < 200; ++i)
{
ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1));
}
/// Игнорируем ошибки - не получиться могло только если кто-то еще выполнил эту строчку раньше нас.
storage.zookeeper->tryMulti(ops);
}
AbandonableLockInZooKeeper block_number_lock(
storage.zookeeper_path + "/block_numbers/" + month_name + "/block-",
storage.zookeeper_path + "/temp", *storage.zookeeper);
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name);
UInt64 part_number = block_number_lock.getNumber();
......
......@@ -6,6 +6,7 @@
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include "MergeTree/AbandonableLockInZooKeeper.h"
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <zkutil/ZooKeeper.h>
#include <zkutil/LeaderElection.h>
......@@ -77,6 +78,8 @@ public:
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
void dropPartition(const Field & partition, bool detach) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/
void drop() override;
......@@ -139,9 +142,13 @@ private:
Type type;
String source_replica; /// Пустая строка значит, что эта запись была добавлена сразу в очередь, а не скопирована из лога.
String new_part_name; /// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им.
Strings parts_to_merge;
bool detach = false; /// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached.
FuturePartTaggerPtr future_part_tagger;
bool currently_executing = false; /// Доступ под queue_mutex.
std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false.
......@@ -269,6 +276,7 @@ private:
/// Поток, выбирающий куски для слияния.
std::thread merge_selecting_thread;
Poco::Event merge_selecting_event;
std::mutex merge_selecting_mutex; /// Берется на каждую итерацию выбора кусков для слияния.
/// Поток, удаляющий старые куски, записи в логе и блоки.
std::thread cleanup_thread;
......@@ -433,6 +441,15 @@ private:
/** Скачать указанный кусок с указанной реплики.
*/
void fetchPart(const String & part_name, const String & replica_name);
///
AbandonableLockInZooKeeper allocateBlockNumber(const String & month_name);
/** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога.
* Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику.
*/
void waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry);
};
}
......@@ -33,21 +33,27 @@ void InterpreterAlterQuery::execute()
ASTAlterQuery & alter = typeid_cast<ASTAlterQuery &>(*query_ptr);
String & table_name = alter.table;
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
AlterCommands commands;
Partitions partitions_to_drop;
parseAlter(alter.parameters, context.getDataTypeFactory(), commands, partitions_to_drop);
AlterCommands alter_commands;
PartitionCommands partition_commands;
parseAlter(alter.parameters, context.getDataTypeFactory(), alter_commands, partition_commands);
StoragePtr table = context.getTable(database_name, table_name);
for (const Field & partition : partitions_to_drop)
table->dropPartition(partition);
for (const PartitionCommand & command : partition_commands)
{
if (command.type == PartitionCommand::DROP_PARTITION)
table->dropPartition(command.partition, command.detach);
else
throw Exception("Bad PartitionCommand::Type: " + toString(command.type), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
table->alter(commands, database_name, table_name, context);
if (!alter_commands.empty())
table->alter(alter_commands, database_name, table_name, context);
}
void InterpreterAlterQuery::parseAlter(
const ASTAlterQuery::ParameterContainer & params_container, const DataTypeFactory & data_type_factory,
AlterCommands & out_commands, Partitions & out_partitions_to_drop)
AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands)
{
for (const auto & params : params_container)
{
......@@ -66,7 +72,7 @@ void InterpreterAlterQuery::parseAlter(
if (params.column)
command.after_column = typeid_cast<const ASTIdentifier &>(*params.column).name;
out_commands.push_back(command);
out_alter_commands.push_back(command);
}
else if (params.type == ASTAlterQuery::DROP_COLUMN)
{
......@@ -74,7 +80,7 @@ void InterpreterAlterQuery::parseAlter(
command.type = AlterCommand::DROP;
command.column_name = typeid_cast<const ASTIdentifier &>(*(params.column)).name;
out_commands.push_back(command);
out_alter_commands.push_back(command);
}
else if (params.type == ASTAlterQuery::MODIFY_COLUMN)
{
......@@ -88,12 +94,12 @@ void InterpreterAlterQuery::parseAlter(
command.column_name = ast_name_type.name;
command.data_type = data_type_factory.get(type_string);
out_commands.push_back(command);
out_alter_commands.push_back(command);
}
else if (params.type == ASTAlterQuery::DROP_PARTITION)
{
const Field & partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
out_partitions_to_drop.push_back(partition);
out_partition_commands.push_back(PartitionCommand::dropPartition(partition, params.detach));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
......
......@@ -23,6 +23,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & e
ParserString s_modify("MODIFY", true, true);
ParserString s_drop("DROP", true, true);
ParserString s_detach("DETACH", true, true);
ParserString s_partition("PARTITION", true, true);
ParserString s_comma(",");
......@@ -115,10 +116,26 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & e
return false;
params.type = ASTAlterQuery::DROP_COLUMN;
params.detach = false;
}
else
return false;
}
else if (s_detach.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if (!s_partition.ignore(pos, end, expected))
return false;
ws.ignore(pos, end);
if (!parser_literal.parse(pos, end, params.partition, expected))
return false;
params.type = ASTAlterQuery::DROP_PARTITION;
params.detach = true;
}
else if (s_modify.ignore(pos, end, expected))
{
ws.ignore(pos, end);
......
......@@ -41,6 +41,7 @@ MergeTreeData::MergeTreeData(
{
/// создаём директорию, если её нет
Poco::File(full_path).createDirectories();
Poco::File(full_path + "detached").createDirectory();
/// инициализируем описание сортировки
sort_descr.reserve(primary_expr_ast->children.size());
......@@ -104,6 +105,7 @@ void MergeTreeData::loadDataParts()
if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
continue;
/// TODO: Это можно удалить, если нигде больше не осталось директорий old_* (их давно никто не пишет).
if (0 == file_name.compare(0, strlen("old_"), "old_"))
{
String new_file_name = file_name.substr(strlen("old_"));
......@@ -731,7 +733,7 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix,
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
data_parts.erase(part);
part->renameAddPrefix(prefix);
part->renameAddPrefix("detached/" + prefix);
if (restore_covered)
{
......
......@@ -961,7 +961,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
{
LOG_INFO(log, "Removing parts covered by " << entry.new_part_name << ".");
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
{
LogEntries to_wait;
......@@ -998,7 +998,7 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
}
}
LOG_DEBUG(log, "Removing parts.");
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
/// Удалим куски, содержащиеся в удаляемом диапазоне.
......@@ -1010,16 +1010,40 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
LOG_DEBUG(log, "Removing part " << part->name);
++removed_parts;
/// Если кусок удалять не нужно, надежнее переместить директорию до изменений в ZooKeeper.
if (entry.detach)
data.renameAndDetachPart(part);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
zookeeper->multi(ops);
data.replaceParts({part}, {}, false);
/// Если кусок нужно удалить не нужно, надежнее удалить директорию после изменений в ZooKeeper.
if (!entry.detach)
data.replaceParts({part}, {}, false);
}
LOG_INFO(log, "Finished removing parts covered by " << entry.new_part_name << ".");
LOG_INFO(log, (entry.detach ? "Detached" : "Removed") << removed_parts << " parts inside " << entry.new_part_name << ".");
if (unreplicated_data)
{
removed_parts = 0;
parts = unreplicated_data->getDataParts();
for (const auto & part : parts)
{
if (!ActiveDataPartSet::contains(entry.new_part_name, part->name))
continue;
LOG_DEBUG(log, "Removing unreplicated part " << part->name);
++removed_parts;
if (entry.detach)
unreplicated_data->renameAndDetachPart(part, "");
else
unreplicated_data->replaceParts({part}, {}, false);
}
}
return true;
}
......@@ -1143,6 +1167,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
try
{
std::unique_lock<std::mutex> merge_selecting_lock(merge_selecting_mutex);
if (need_pull)
{
/// Нужно загрузить новую запись в очередь перед тем, как выбирать куски для слияния.
......@@ -1266,6 +1292,9 @@ void StorageReplicatedMergeTree::cleanupThread()
{
clearOldParts();
if (unreplicated_data)
unreplicated_data->clearOldParts();
if (is_leader_node)
{
clearOldLogs();
......@@ -1989,7 +2018,8 @@ bool StorageReplicatedMergeTree::optimize()
return true;
}
void StorageReplicatedMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
void StorageReplicatedMergeTree::alter(const AlterCommands & params,
const String & database_name, const String & table_name, Context & context)
{
LOG_DEBUG(log, "Doing ALTER");
......@@ -2077,6 +2107,90 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, const Strin
LOG_DEBUG(log, "ALTER finished");
}
static bool isValidMonthName(const String & s)
{
if (s.size() != 6)
return false;
if (!std::all_of(s.begin(), s.end(), isdigit))
return false;
DayNum_t date = DateLUT::instance().toDayNum(OrderedIdentifier2Date(s + "01"));
/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
return s == toString(Date2OrderedIdentifier(DateLUT::instance().fromDayNum(date)) / 100);
}
/// Название воображаемого куска, покрывающего все возможные куски в указанном месяце с номерами в указанном диапазоне.
static String getFakePartNameForDrop(const String & month_name, UInt64 left, UInt64 right)
{
/// Диапазон дат - весь месяц.
DateLUT & lut = DateLUT::instance();
time_t start_time = OrderedIdentifier2Date(month_name + "01");
DayNum_t left_date = lut.toDayNum(start_time);
DayNum_t right_date = DayNum_t(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);
/// Уровень - right-left+1: кусок не мог образоваться в результате такого или большего количества слияний.
return ActiveDataPartSet::getPartName(left_date, right_date, left, right, right - left + 1);
}
void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
{
String month_name;
if (field.getType() == Field::Types::UInt64)
month_name = toString(field.get<UInt64>());
else
month_name = field.safeGet<String>();
if (!isValidMonthName(month_name))
throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
ErrorCodes::INVALID_PARTITION_NAME);
/// TODO: Делать запрос в лидера по TCP.
if (!is_leader_node)
throw Exception("DROP PARTITION can only be done on leader replica.", ErrorCodes::NOT_LEADER);
/** Пропустим один номер в block_numbers для удаляемого месяца, и будем удалять только куски до этого номера.
* Это запретит мерджи удаляемых кусков с новыми вставляемыми данными.
* Инвариант: в логе не появятся слияния удаляемых кусков с другими кусками.
* NOTE: Если понадобится аналогично поддержать запрос DROP PART, для него придется придумать какой-нибудь новый механизм,
* чтобы гарантировать этот инвариант.
*/
UInt64 right;
{
AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(month_name);
right = block_number_lock.getNumber();
block_number_lock.unlock();
}
/// Такого никогда не должно происходить.
if (right == 0)
return;
--right;
String fake_part_name = getFakePartNameForDrop(month_name, 0, right);
/** Запретим выбирать для слияния удаляемые куски - сделаем вид, что их всех уже собираются слить в fake_part_name.
* Инвариант: после появления в логе записи DROP_RANGE, в логе не появятся слияния удаляемых кусков.
*/
{
std::unique_lock<std::mutex> merge_selecting_lock(merge_selecting_mutex);
virtual_parts.add(fake_part_name);
}
/// Наконец, добившись нужны инвариантов, можно положить запись в лог.
LogEntry entry;
entry.type = LogEntry::DROP_RANGE;
entry.source_replica = replica_name;
entry.new_part_name = fake_part_name;
entry.detach = detach;
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
/// Дождемся, пока все реплики выполнят дроп.
waitForAllReplicasToProcessLogEntry(log_znode_path, entry);
}
void StorageReplicatedMergeTree::drop()
{
if (!zookeeper)
......@@ -2114,6 +2228,86 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str
/// TODO: Можно обновить названия логгеров.
}
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name)
{
String month_path = zookeeper_path + "/block_numbers/" + month_name;
if (!zookeeper->exists(month_path))
{
/// Создадим в block_numbers ноду для месяца и пропустим в ней 200 значений инкремента.
/// Нужно, чтобы в будущем при необходимости можно было добавить данные в начало.
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent));
for (size_t i = 0; i < 200; ++i)
{
ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1));
}
/// Игнорируем ошибки - не получиться могло только если кто-то еще выполнил эту строчку раньше нас.
zookeeper->tryMulti(ops);
}
return AbandonableLockInZooKeeper(
zookeeper_path + "/block_numbers/" + month_name + "/block-",
zookeeper_path + "/temp", *zookeeper);
}
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry)
{
UInt64 log_index = parse<UInt64>(log_znode_path.substr(log_znode_path.size() - 10));
String log_entry_str = entry.toString();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
/// Дождемся, пока запись попадет в очередь реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
String pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!pointer.empty() && parse<UInt64>(pointer) > log_index)
break;
event->wait();
}
/// Найдем запись в очереди реплики.
Strings queue_entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
String entry_to_wait_for;
for (const String & entry_name : queue_entries)
{
String queue_entry_str;
auto code = zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
if (code == ZOK && queue_entry_str == log_entry_str)
{
entry_to_wait_for = entry_name;
break;
}
}
/// Пока искали запись, ее уже выполнили и удалили.
if (entry_to_wait_for.empty())
continue;
/// Дождемся, пока запись исчезнет из очереди реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
String unused;
/// get вместо exists, чтобы не утек watch, если ноды уже нет.
auto code = zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_to_wait_for, unused, nullptr, event);
if (code == ZNONODE)
break;
event->wait();
}
}
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);
......@@ -2137,7 +2331,10 @@ void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
writeString(new_part_name, out);
break;
case DROP_RANGE:
writeString("drop\n", out);
if (detach)
writeString("detach\n", out);
else
writeString("drop\n", out);
writeString(new_part_name, out);
break;
}
......@@ -2174,9 +2371,10 @@ void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
}
readString(new_part_name, in);
}
else if (type_str == "drop")
else if (type_str == "drop" || type_str == "detach")
{
type = DROP_RANGE;
detach = type_str == "detach";
readString(new_part_name, in);
}
assertString("\n", in);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册