提交 21383c54 编写于 作者: A Alexey Milovidov

dbms: FETCH PARTITION: development [#METR-13153].

上级 54636994
......@@ -261,6 +261,9 @@ namespace ErrorCodes
NO_SUCH_REPLICA,
TOO_MUCH_PARTS,
REPLICA_IS_ALREADY_EXIST,
NO_ACTIVE_REPLICAS,
TOO_MUCH_RETRIES_TO_FETCH_PARTS,
PARTITION_ALREADY_EXISTS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,
......
......@@ -62,6 +62,8 @@ public:
};
void add(const String & name);
/// Если не найдено - возвращает пустую строку.
String getContainingPart(const String & name) const;
Strings getParts() const; /// В порядке возрастания месяца и номера блока.
......
......@@ -40,12 +40,13 @@ class ReplicatedMergeTreePartsFetcher
public:
ReplicatedMergeTreePartsFetcher(MergeTreeData & data_) : data(data_), log(&Logger::get("ReplicatedMergeTreePartsFetcher")) {}
/// Скачивает кусок в tmp_директорию.
/// Скачивает кусок в tmp_директорию. Если to_detached - скачивает в директорию detached.
MergeTreeData::MutableDataPartPtr fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port);
int port,
bool to_detached = false);
private:
MergeTreeData & data;
......
......@@ -473,8 +473,9 @@ private:
String findReplicaHavingPart(const String & part_name, bool active);
/** Скачать указанный кусок с указанной реплики.
* Если to_detached, то кусок помещается в директорию detached.
*/
void fetchPart(const String & part_name, const String & replica_name);
void fetchPart(const String & part_name, const String & replica_path, bool to_detached = false);
AbandonableLockInZooKeeper allocateBlockNumber(const String & month_name);
......
......@@ -710,10 +710,10 @@ void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent, bo
{
if (!ast.database.empty())
{
s << (hilite ? hilite_keyword : "") << indent_str << ast.database << (hilite ? hilite_none : "");
s << indent_str << ast.database;
s << ".";
}
s << (hilite ? hilite_keyword : "") << indent_str << ast.table << (hilite ? hilite_none : "");
s << indent_str << ast.table;
}
s << nl_or_ws;
......@@ -755,6 +755,14 @@ void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent, bo
<< (p.part ? "PART " : "PARTITION ") << (hilite ? hilite_none : "");
formatAST(*p.partition, s, indent, hilite, true);
}
else if (p.type == ASTAlterQuery::FETCH_PARTITION)
{
s << (hilite ? hilite_keyword : "") << indent_str << "FETCH " << (p.unreplicated ? "UNREPLICATED " : "")
<< "PARTITION " << (hilite ? hilite_none : "");
formatAST(*p.partition, s, indent, hilite, true);
s << (hilite ? hilite_keyword : "") << " FROM " << (hilite ? hilite_none : "")
<< mysqlxx::quote << p.from;
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
......
......@@ -111,19 +111,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
data_parts.clear();
Strings all_file_names;
Strings part_file_names;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
all_file_names.push_back(it.name());
Strings part_file_names;
for (const String & file_name : all_file_names)
{
/// Удаляем временные директории старше суток.
if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
/// Пропускаем временные директории старше суток.
if (0 == it.name().compare(0, strlen("tmp_"), "tmp_"))
continue;
part_file_names.push_back(file_name);
part_file_names.push_back(it.name());
}
DataPartsVector broken_parts_to_remove;
......
......@@ -64,7 +64,8 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port)
int port,
bool to_detached)
{
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
......@@ -72,7 +73,7 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
std::make_pair("compress", "false")};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
String part_path = data.getFullPath() + (to_detached ? "detached/" : "") + "tmp_" + part_name + "/";
Poco::File part_file(part_path);
if (part_file.exists())
......
......@@ -993,7 +993,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
fetchPart(entry.new_part_name, replica);
fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
......@@ -1955,16 +1955,18 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
return "";
}
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached)
{
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path);
auto table_lock = lockStructure(true);
TableStructureReadLockPtr table_lock;
if (!to_detached)
table_lock = lockStructure(true);
String host;
int port;
String host_port_str = zookeeper->get(zookeeper_path + "/replicas/" + replica_name + "/host");
String host_port_str = zookeeper->get(replica_path + "/host");
ReadBufferFromString buf(host_port_str);
assertString("host: ", buf);
readString(host, buf);
......@@ -1973,27 +1975,34 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
assertString("\n", buf);
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, replica_path, host, port, to_detached);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, part_name);
if (!to_detached)
{
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, part_name);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zookeeper->multi(ops);
transaction.commit();
merge_selecting_event.set();
zookeeper->multi(ops);
transaction.commit();
merge_selecting_event.set();
for (const auto & removed_part : removed_parts)
for (const auto & removed_part : removed_parts)
{
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
}
else
{
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
Poco::File(data.getFullPath() + "detached/tmp_" + part_name).renameTo(data.getFullPath() + "detached/" + part_name);
}
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_name);
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_name << (to_detached ? " (to 'detached' directory)" : ""));
}
void StorageReplicatedMergeTree::shutdown()
......@@ -2841,6 +2850,14 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, bool un
LOG_INFO(log, "Will fetch partition " << partition_str << " from shard " << from_);
/** Проверим, что в директории detached (куда мы будем записывать скаченные куски) ещё нет такой партиции.
* Ненадёжно (есть race condition) - такая партиция может появиться чуть позже.
*/
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it{data.getFullPath() + "detached/"}; dir_it != dir_end; ++dir_it)
if (0 == dir_it.name().compare(0, partition_str.size(), partition_str))
throw Exception("Detached partition " + partition_str + " is already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
if (unreplicated)
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); /// TODO
......@@ -2856,7 +2873,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, bool un
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception("No active replicas for shard " + from/* TODO ErrorCodes */);
throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
/** Надо выбрать лучшую (наиболее актуальную) реплику.
* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
......@@ -2870,13 +2887,13 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, bool un
for (const String & replica : active_replicas)
{
String replica_path = from + "/replicas/" + replica;
String current_replica_path = from + "/replicas/" + replica;
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
zkutil::Stat stat;
zookeeper->get(replica_path + "/queue", &stat);
zookeeper->get(current_replica_path + "/queue", &stat);
size_t queue_size = stat.numChildren;
if (log_pointer > max_log_pointer
......@@ -2894,13 +2911,74 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, bool un
LOG_INFO(log, "Found " << replicas.size() << " replicas, " << active_replicas.size() << " of them are active."
<< " Selected " << best_replica << " to fetch from.");
String best_replica_path = from + "/replicas/" + best_replica;
/// Выясним, какие куски есть на лучшей реплике.
Strings parts = zookeeper->getChildren(replica_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
Strings active_parts = active_parts_set.getParts();
/** Пытаемся скачать эти куски.
* Часть из них могла удалиться из-за мерджа.
* В этом случае, обновляем информацию о доступных кусках и пробуем снова.
*/
unsigned try_no = 0;
Strings missing_parts;
do
{
if (try_no)
LOG_INFO(log, "Some of parts (" << missing_parts.size() << ") are missing. Will try to fetch covering parts.");
if (try_no >= 5)
throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);
Strings parts = zookeeper->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
Strings parts_to_fetch;
if (missing_parts.empty())
{
parts_to_fetch = active_parts_set.getParts();
/// Оставляем только куски нужной партиции.
Strings parts_to_fetch_partition;
for (const String & part : parts_to_fetch)
if (0 == part.compare(0, partition_str.size(), partition_str))
parts_to_fetch_partition.push_back(part);
parts_to_fetch = std::move(parts_to_fetch_partition);
}
else
{
for (const String & missing_part : missing_parts)
{
String containing_part = active_parts_set.getContainingPart(missing_part);
if (!containing_part.empty())
parts_to_fetch.push_back(containing_part);
else
LOG_WARNING(log, "Part " << missing_part << " on replica " << best_replica_path << " has been vanished.");
}
}
LOG_INFO(log, "Parts to fetch: " << parts_to_fetch.size());
missing_parts.clear();
for (const String & part : parts_to_fetch)
{
try
{
fetchPart(part, best_replica_path, true);
}
catch (const DB::Exception & e)
{
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER)
throw;
LOG_INFO(log, e.displayText());
missing_parts.push_back(part);
}
}
/// TODO
++try_no;
} while (!missing_parts.empty());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册