提交 e5d4149c 编写于 作者: A Alexey Milovidov

Merge

上级 67a07205
#pragma once
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnVector.h>
#include <type_traits>
#if defined(__x86_64__)
#define LIBDIVIDE_USE_SSE2 1
#endif
#include <libdivide.h>
namespace DB
{
template <typename T>
struct BlockFilterCreator
{
static std::vector<IColumn::Filter> perform(const size_t num_rows, const IColumn * column,
size_t num_shards, const std::vector<size_t> & slots)
{
const auto total_weight = slots.size();
std::vector<IColumn::Filter> filters(num_shards);
/** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный остаток.
* Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые.
* Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи.
*/
using UnsignedT = typename std::make_unsigned<T>::type;
/// const columns contain only one value, therefore we do not need to read it at every iteration
if (column->isConst())
{
const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
for (size_t i = 0; i < num_shards; ++i)
filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
}
else
{
/// libdivide поддерживает только UInt32 или UInt64.
using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
libdivide::divider<TUInt32Or64> divider(total_weight);
const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
/// NOTE Может быть, стоит поменять местами циклы.
for (size_t i = 0; i < num_shards; ++i)
{
filters[i].resize(num_rows);
for (size_t j = 0; j < num_rows; ++j)
filters[i][j] = slots[
static_cast<TUInt32Or64>(data[j]) - (static_cast<TUInt32Or64>(data[j]) / divider) * total_weight] == i;
}
}
return filters;
}
};
}
......@@ -251,6 +251,7 @@ public:
BackgroundProcessingPool & getBackgroundPool();
void setReshardingWorker(std::shared_ptr<ReshardingWorker> resharding_worker);
ReshardingWorker & getReshardingWorker();
/** Очистить кэши разжатых блоков и засечек.
......
......@@ -55,7 +55,7 @@ private:
/// Для RESHARD PARTITION.
Field last_partition;
WeightedZooKeeperPaths weighted_zookeeper_paths;
String sharding_key;
ASTPtr sharding_key_expr;
static PartitionCommand dropPartition(const Field & partition, bool detach, bool unreplicated)
{
......@@ -78,9 +78,9 @@ private:
}
static PartitionCommand reshardPartitions(const Field & first_partition_, const Field & last_partition_,
const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const String & sharding_key_)
const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const ASTPtr & sharding_key_expr)
{
return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_};
return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_expr};
}
};
......
......@@ -70,7 +70,7 @@ public:
*/
ASTPtr last_partition;
ASTPtr weighted_zookeeper_paths;
String sharding_key;
ASTPtr sharding_key_expr;
/// deep copy
void clone(Parameters & p) const;
......
#pragma once
#include <DB/Storages/StorageDistributed.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/Increment.h>
#include <memory>
#include <common/Revision.h>
#include <iostream>
#include <type_traits>
#if defined(__x86_64__)
#define LIBDIVIDE_USE_SSE2 1
#endif
#include <libdivide.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Core/Block.h>
namespace DB
{
class StorageDistributed;
/** Запись асинхронная - данные сначала записываются на локальную файловую систему, а потом отправляются на удалённые серверы.
* Если Distributed таблица использует более одного шарда, то для того, чтобы поддерживалась запись,
* при создании таблицы должен быть указан дополнительный параметр у ENGINE - ключ шардирования.
......@@ -38,198 +20,22 @@ namespace DB
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
: storage(storage), query_ast(query_ast)
{
}
void write(const Block & block) override
{
if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
return writeSplit(block);
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast);
writeImpl(block);
}
void write(const Block & block) override;
private:
template <typename T>
static std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
{
const auto total_weight = cluster.slot_to_shard.size();
const auto num_shards = cluster.getShardsInfo().size();
std::vector<IColumn::Filter> filters(num_shards);
/** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный остаток.
* Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые.
* Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи.
*/
using UnsignedT = typename std::make_unsigned<T>::type;
/// const columns contain only one value, therefore we do not need to read it at every iteration
if (column->isConst())
{
const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
const auto shard_num = cluster.slot_to_shard[static_cast<UnsignedT>(data) % total_weight];
for (size_t i = 0; i < num_shards; ++i)
filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
}
else
{
/// libdivide поддерживает только UInt32 или UInt64.
using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
libdivide::divider<TUInt32Or64> divider(total_weight);
const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
/// NOTE Может быть, стоит поменять местами циклы.
for (size_t i = 0; i < num_shards; ++i)
{
filters[i].resize(num_rows);
for (size_t j = 0; j < num_rows; ++j)
filters[i][j] = cluster.slot_to_shard[
static_cast<TUInt32Or64>(data[j]) - (static_cast<TUInt32Or64>(data[j]) / divider) * total_weight] == i;
}
}
return filters;
}
std::vector<IColumn::Filter> createFilters(Block block)
{
using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);
/// hashmap of pointers to functions corresponding to each integral type
static std::unordered_map<std::string, create_filters_sig *> creators{
{ TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
{ TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
{ TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
{ TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
{ TypeName<Int8>::get(), &createFiltersImpl<Int8> },
{ TypeName<Int16>::get(), &createFiltersImpl<Int16> },
{ TypeName<Int32>::get(), &createFiltersImpl<Int32> },
{ TypeName<Int64>::get(), &createFiltersImpl<Int64> },
};
storage.getShardingKeyExpr()->execute(block);
const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
/// check that key column has valid type
const auto it = creators.find(key_column.type->getName());
return it != std::end(creators)
? (*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster)
: throw Exception{
"Sharding key expression does not evaluate to an integer type",
ErrorCodes::TYPE_MISMATCH
};
}
void writeSplit(const Block & block)
{
const auto num_cols = block.columns();
/// cache column pointers for later reuse
std::vector<const IColumn*> columns(num_cols);
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.getByPosition(i).column;
std::vector<IColumn::Filter> createFilters(Block block);
auto filters = createFilters(block);
void writeSplit(const Block & block);
const auto num_shards = storage.cluster.getShardsInfo().size();
void writeImpl(const Block & block, const size_t shard_id = 0);
ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1; /// Число 1.1 выбрано наугад.
void writeToLocal(const Block & block, const size_t repeats);
for (size_t i = 0; i < num_shards; ++i)
{
auto target_block = block.cloneEmpty();
for (size_t col = 0; col < num_cols; ++col)
target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint);
if (target_block.rowsInFirstColumn())
writeImpl(target_block, i);
}
}
void writeImpl(const Block & block, const size_t shard_id = 0)
{
const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
}
void writeToLocal(const Block & block, const size_t repeats)
{
InterpreterInsertQuery interp{query_ast, storage.context};
auto block_io = interp.execute();
block_io.out->writePrefix();
for (size_t i = 0; i < repeats; ++i)
block_io.out->write(block);
block_io.out->writeSuffix();
}
void writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
* and keep monitor thread out from reading incomplete data
*/
std::string first_file_tmp_path{};
auto first = true;
const auto & query_string = queryToString(query_ast);
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';
/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin";
const auto & block_file_path = path + file_name;
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
* the inode is not freed until we're done */
if (first)
{
first = false;
const auto & tmp_path = path + "tmp/";
Poco::File(tmp_path).createDirectory();
const auto & block_file_tmp_path = tmp_path + file_name;
first_file_tmp_path = block_file_tmp_path;
WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, Revision::get()};
writeStringBinary(query_string, out);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
}
if (link(first_file_tmp_path.data(), block_file_path.data()))
throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path);
}
/** remove the temporary file, enabling the OS to reclaim inode after all threads
* have removed their corresponding files */
Poco::File(first_file_tmp_path).remove();
}
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
private:
StorageDistributed & storage;
ASTPtr query_ast;
};
......
......@@ -242,7 +242,7 @@ public:
/** Выполнить запрос RESHARD PARTITION.
*/
virtual void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
const Settings & settings)
{
throw Exception("Method reshardPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
......
......@@ -56,6 +56,8 @@ private:
const ReshardingJob & job;
Logger * log;
std::vector<size_t> slots;
ExpressionActionsPtr sharding_key_expr;
std::string sharding_key_column_name;
};
}
......@@ -7,6 +7,8 @@
namespace DB
{
class Context;
namespace RemoteDiskSpaceMonitor
{
......@@ -15,14 +17,14 @@ namespace RemoteDiskSpaceMonitor
class Service final : public InterserverIOEndpoint
{
public:
Service(const std::string & path_);
Service(const Context & context_);
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
const std::string path;
const Context & context;
};
/** Клиент для получения информации о свободном месте на удалённом диске.
......@@ -33,7 +35,7 @@ public:
Client() = default;
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
size_t getFreeDiskSpace(const InterserverIOEndpointLocation & location) const;
size_t getFreeSpace(const InterserverIOEndpointLocation & location) const;
void cancel() { is_cancelled = true; }
private:
......
......@@ -16,7 +16,7 @@ public:
ReshardingJob(const std::string & database_name_, const std::string & table_name_,
const std::string & partition_, const WeightedZooKeeperPaths & paths_,
const std::string & sharding_key_);
const ASTPtr & sharding_key_expr_);
ReshardingJob(const ReshardingJob &) = delete;
ReshardingJob & operator=(const ReshardingJob &) = delete;
......@@ -29,7 +29,7 @@ public:
std::string table_name;
std::string partition;
WeightedZooKeeperPaths paths;
std::string sharding_key;
ASTPtr sharding_key_expr;
};
}
#pragma once
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/AlterCommands.h>
#include <common/logger_useful.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/SharedPtr.h>
#include <string>
#include <thread>
......@@ -22,7 +24,8 @@ class ReshardingJob;
class ReshardingWorker final
{
public:
ReshardingWorker(Context & context_);
ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_);
ReshardingWorker(const ReshardingWorker &) = delete;
ReshardingWorker & operator=(const ReshardingWorker &) = delete;
......@@ -37,18 +40,12 @@ public:
const std::string & table_name,
const std::string & partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const std::string & sharding_key);
/// Прислать запрос на перешардирование.
void submitJob(const ReshardingJob & job);
const ASTPtr & sharding_key_expr);
/// Был ли поток запущен?
bool isStarted() const;
private:
/// Прислать запрос на перешардирование (внутренняя версия).
void submitJobImpl(const std::string & serialized_job);
/// Следить за появлением новых задач. Выполнить их последовательно.
void pollAndExecute();
......@@ -81,18 +78,17 @@ private:
/// Принудительно завершить поток.
void abortIfRequested() const;
/// Был ли поток завершён?
bool hasAborted(const Exception & ex) const;
private:
Context & context;
Logger * log;
std::unique_ptr<MergeTreeDataMerger> merger;
std::thread polling_thread;
mutable std::mutex cancel_mutex;
std::string host_task_queue_path;
std::atomic<bool> is_started{false};
std::atomic<bool> must_stop{false};
};
using ReshardingWorkerPtr = Poco::SharedPtr<ReshardingWorker>;
using ReshardingWorkerPtr = std::shared_ptr<ReshardingWorker>;
}
......@@ -2,6 +2,7 @@
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/IO/WriteBuffer.h>
#include <common/logger_useful.h>
namespace DB
{
......@@ -24,6 +25,7 @@ public:
private:
StorageReplicatedMergeTree & storage;
Logger * log;
};
/** Клиент для отправления кусков из партиции таблицы *MergeTree.
......@@ -31,7 +33,7 @@ private:
class Client final
{
public:
Client() = default;
Client();
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
bool send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
......@@ -40,6 +42,7 @@ public:
private:
std::atomic<bool> is_cancelled{false};
Logger * log;
};
}
......
......@@ -77,7 +77,7 @@ public:
void shutdown() override;
void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
const Settings & settings) override;
/// От каждой реплики получить описание соответствующей локальной таблицы.
......
......@@ -136,7 +136,7 @@ public:
void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
void freezePartition(const Field & partition, const Settings & settings) override;
void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
const Settings & settings) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
......@@ -257,7 +257,7 @@ private:
MergeTreeDataMerger merger;
DataPartsExchange::Fetcher fetcher;
RemoteDiskSpaceMonitor::Client free_disk_space_checker;
RemoteDiskSpaceMonitor::Client disk_space_monitor_client;
ShardedPartitionSender::Client sharded_partition_sender_client;
RemoteQueryExecutor::Client remote_query_executor_client;
......
......@@ -317,13 +317,12 @@ namespace ErrorCodes
extern const int INSUFFICIENT_SPACE_FOR_RESHARDING = 311;
extern const int PARTITION_COPY_FAILED = 312;
extern const int PARTITION_ATTACH_FAILED = 313;
extern const int RESHARDING_CLEANUP_FAILED = 314;
extern const int RESHARDING_NO_WORKER = 315;
extern const int INVALID_PARTITIONS_INTERVAL = 316;
extern const int RESHARDING_INVALID_PARAMETERS = 317;
extern const int INVALID_SHARD_WEIGHT = 318;
extern const int SHARD_DOESNT_REFERENCE_TABLE = 319;
extern const int UNKNOWN_STATUS_OF_INSERT = 320;
extern const int RESHARDING_NO_WORKER = 314;
extern const int INVALID_PARTITIONS_INTERVAL = 315;
extern const int RESHARDING_INVALID_PARAMETERS = 316;
extern const int INVALID_SHARD_WEIGHT = 317;
extern const int INVALID_CONFIG_PARAMETER = 318;
extern const int UNKNOWN_STATUS_OF_INSERT = 319;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -822,16 +822,19 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return *shared->background_pool;
}
ReshardingWorker & Context::getReshardingWorker()
void Context::setReshardingWorker(std::shared_ptr<ReshardingWorker> resharding_worker)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->resharding_worker)
throw Exception("Resharding background thread has already been set.", ErrorCodes::LOGICAL_ERROR);
shared->resharding_worker = resharding_worker;
}
if (!shared->zookeeper)
throw Exception("Resharding background processing requires ZooKeeper", ErrorCodes::LOGICAL_ERROR);
ReshardingWorker & Context::getReshardingWorker()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->resharding_worker)
shared->resharding_worker = new ReshardingWorker(*this);
throw Exception("Resharding background thread not set.", ErrorCodes::LOGICAL_ERROR);
return *shared->resharding_worker;
}
......
......@@ -67,7 +67,7 @@ BlockIO InterpreterAlterQuery::execute()
break;
case PartitionCommand::RESHARD_PARTITION:
table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key, context.getSettingsRef());
table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key_expr, context.getSettingsRef());
break;
default:
......@@ -190,9 +190,8 @@ void InterpreterAlterQuery::parseAlter(
weighted_zookeeper_paths.emplace_back(weighted_zookeeper_path.path, weighted_zookeeper_path.weight);
}
const auto & sharding_key = params.sharding_key;
out_partition_commands.push_back(PartitionCommand::reshardPartitions(first_partition, last_partition, weighted_zookeeper_paths, sharding_key));
out_partition_commands.push_back(PartitionCommand::reshardPartitions(
first_partition, last_partition, weighted_zookeeper_paths, params.sharding_key_expr));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
......
......@@ -19,6 +19,7 @@ void ASTAlterQuery::Parameters::clone(Parameters & p) const
if (partition) p.partition = partition->clone();
if (last_partition) p.last_partition = last_partition->clone();
if (weighted_zookeeper_paths) p.weighted_zookeeper_paths = weighted_zookeeper_paths->clone();
if (sharding_key_expr) p.sharding_key_expr = sharding_key_expr->clone();
}
void ASTAlterQuery::addParameters(const Parameters & params)
......@@ -34,6 +35,8 @@ void ASTAlterQuery::addParameters(const Parameters & params)
children.push_back(params.last_partition);
if (params.weighted_zookeeper_paths)
children.push_back(params.weighted_zookeeper_paths);
if (params.sharding_key_expr)
children.push_back(params.sharding_key_expr);
}
ASTAlterQuery::ASTAlterQuery(StringRange range_) : IAST(range_)
......@@ -153,8 +156,9 @@ void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & st
settings.ostr << settings.nl_or_ws;
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< "USING " << (settings.hilite ? hilite_none : "")
<< p.sharding_key;
<< "USING " << (settings.hilite ? hilite_none : "");
p.sharding_key_expr->formatImpl(settings, state, frame);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
......
......@@ -255,7 +255,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
else if (s_reshard.ignore(pos, end, max_parsed_pos, expected))
{
ParserList weighted_zookeeper_paths_p(ParserPtr(new ParserWeightedZooKeeperPath), ParserPtr(new ParserString(",")), false);
ParserIdentifier sharding_key_parser;
ParserExpressionWithOptionalAlias parser_sharding_key_expr(false);
ws.ignore(pos, end);
......@@ -294,12 +294,9 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
ws.ignore(pos, end);
ASTPtr ast_sharding_key;
if (!sharding_key_parser.parse(pos, end, ast_sharding_key, max_parsed_pos, expected))
if (!parser_sharding_key_expr.parse(pos, end, params.sharding_key_expr, max_parsed_pos, expected))
return false;
params.sharding_key = typeid_cast<const ASTIdentifier &>(*ast_sharding_key).name;
ws.ignore(pos, end);
params.type = ASTAlterQuery::RESHARD_PARTITION;
......
......@@ -320,14 +320,11 @@ int Server::main(const std::vector<std::string> & args)
global_context->setCurrentDatabase(config().getString("default_database", "default"));
if (has_zookeeper)
if (has_zookeeper && config().has("resharding"))
{
zkutil::ZooKeeperPtr zookeeper = global_context->getZooKeeper();
if (!zookeeper->getTaskQueuePath().empty())
{
auto & resharding_worker = global_context->getReshardingWorker();
resharding_worker.start();
}
auto resharding_worker = std::make_shared<ReshardingWorker>(config(), "resharding", *global_context);
global_context->setReshardingWorker(resharding_worker);
resharding_worker->start();
}
SCOPE_EXIT(
......
#include <DB/Storages/Distributed/DistributedBlockOutputStream.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/BlockFilterCreator.h>
#include <DB/Common/Increment.h>
#include <memory>
#include <common/Revision.h>
#include <iostream>
namespace DB
{
namespace
{
template <typename T>
std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
{
return BlockFilterCreator<T>::perform(num_rows, column, cluster.getShardsInfo().size(), cluster.slot_to_shard);
}
}
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
: storage(storage), query_ast(query_ast)
{
}
void DistributedBlockOutputStream::write(const Block & block)
{
if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
return writeSplit(block);
writeImpl(block);
}
std::vector<IColumn::Filter> DistributedBlockOutputStream::createFilters(Block block)
{
using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);
/// hashmap of pointers to functions corresponding to each integral type
static std::unordered_map<std::string, create_filters_sig *> creators{
{ TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
{ TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
{ TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
{ TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
{ TypeName<Int8>::get(), &createFiltersImpl<Int8> },
{ TypeName<Int16>::get(), &createFiltersImpl<Int16> },
{ TypeName<Int32>::get(), &createFiltersImpl<Int32> },
{ TypeName<Int64>::get(), &createFiltersImpl<Int64> },
};
storage.getShardingKeyExpr()->execute(block);
const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
/// check that key column has valid type
const auto it = creators.find(key_column.type->getName());
return it != std::end(creators)
? (*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster)
: throw Exception{
"Sharding key expression does not evaluate to an integer type",
ErrorCodes::TYPE_MISMATCH
};
}
void DistributedBlockOutputStream::writeSplit(const Block & block)
{
const auto num_cols = block.columns();
/// cache column pointers for later reuse
std::vector<const IColumn*> columns(num_cols);
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.getByPosition(i).column;
auto filters = createFilters(block);
const auto num_shards = storage.cluster.getShardsInfo().size();
ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1; /// Число 1.1 выбрано наугад.
for (size_t i = 0; i < num_shards; ++i)
{
auto target_block = block.cloneEmpty();
for (size_t col = 0; col < num_cols; ++col)
target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint);
if (target_block.rowsInFirstColumn())
writeImpl(target_block, i);
}
}
void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
InterpreterInsertQuery interp{query_ast, storage.context};
auto block_io = interp.execute();
block_io.out->writePrefix();
for (size_t i = 0; i < repeats; ++i)
block_io.out->write(block);
block_io.out->writeSuffix();
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
* and keep monitor thread out from reading incomplete data
*/
std::string first_file_tmp_path{};
auto first = true;
const auto & query_string = queryToString(query_ast);
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';
/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin";
const auto & block_file_path = path + file_name;
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
* the inode is not freed until we're done */
if (first)
{
first = false;
const auto & tmp_path = path + "tmp/";
Poco::File(tmp_path).createDirectory();
const auto & block_file_tmp_path = tmp_path + file_name;
first_file_tmp_path = block_file_tmp_path;
WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, Revision::get()};
writeStringBinary(query_string, out);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
}
if (link(first_file_tmp_path.data(), block_file_path.data()))
throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path);
}
/** remove the temporary file, enabling the OS to reclaim inode after all threads
* have removed their corresponding files */
Poco::File(first_file_tmp_path).remove();
}
}
......@@ -4,6 +4,8 @@
#include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <DB/Common/BlockFilterCreator.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <ctime>
......@@ -16,47 +18,6 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
}
namespace
{
template <typename T>
std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, size_t num_shards, const std::vector<size_t> & slots)
{
const auto total_weight = slots.size();
std::vector<IColumn::Filter> filters(num_shards);
/** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный остаток.
* Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые.
* Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи.
*/
using UnsignedT = typename std::make_unsigned<T>::type;
/// const columns contain only one value, therefore we do not need to read it at every iteration
if (column->isConst())
{
const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
for (size_t i = 0; i < num_shards; ++i)
filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
}
else
{
const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
for (size_t i = 0; i < num_shards; ++i)
{
filters[i].resize(num_rows);
for (size_t j = 0; j < num_rows; ++j)
filters[i][j] = slots[static_cast<UnsignedT>(data[j]) % total_weight] == i;
}
}
return filters;
}
}
ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_,
size_t shard_no_, UInt16 min_date_, UInt16 max_date_)
: block(block_), shard_no(shard_no_), min_date(min_date_), max_date(max_date_)
......@@ -64,7 +25,9 @@ ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_,
}
MergeTreeSharder::MergeTreeSharder(MergeTreeData & data_, const ReshardingJob & job_)
: data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)"))
: data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)")),
sharding_key_expr(ExpressionAnalyzer(job.sharding_key_expr, data.context, nullptr, data.getColumnsList()).getActions(false)),
sharding_key_column_name(job.sharding_key_expr->getColumnName())
{
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
{
......@@ -201,19 +164,19 @@ std::vector<IColumn::Filter> MergeTreeSharder::createFilters(Block block)
using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, size_t num_shards, const std::vector<size_t> & slots);
/// hashmap of pointers to functions corresponding to each integral type
static std::unordered_map<std::string, create_filters_sig *> creators{
{ TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
{ TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
{ TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
{ TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
{ TypeName<Int8>::get(), &createFiltersImpl<Int8> },
{ TypeName<Int16>::get(), &createFiltersImpl<Int16> },
{ TypeName<Int32>::get(), &createFiltersImpl<Int32> },
{ TypeName<Int64>::get(), &createFiltersImpl<Int64> },
{ TypeName<UInt8>::get(), &BlockFilterCreator<UInt8>::perform },
{ TypeName<UInt16>::get(), &BlockFilterCreator<UInt16>::perform },
{ TypeName<UInt32>::get(), &BlockFilterCreator<UInt32>::perform },
{ TypeName<UInt64>::get(), &BlockFilterCreator<UInt64>::perform },
{ TypeName<Int8>::get(), &BlockFilterCreator<Int8>::perform },
{ TypeName<Int16>::get(), &BlockFilterCreator<Int16>::perform },
{ TypeName<Int32>::get(), &BlockFilterCreator<Int32>::perform },
{ TypeName<Int64>::get(), &BlockFilterCreator<Int64>::perform },
};
data.getPrimaryExpression()->execute(block);
sharding_key_expr->execute(block);
const auto & key_column = block.getByName(job.sharding_key);
const auto & key_column = block.getByName(sharding_key_column_name);
/// check that key column has valid type
const auto it = creators.find(key_column.type->getName());
......
#include <DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Interpreters/Context.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
......@@ -25,8 +26,8 @@ std::string getEndpointId(const std::string & node_id)
}
Service::Service(const String & path_)
: path(path_)
Service::Service(const Context & context_)
: context(context_)
{
}
......@@ -40,12 +41,12 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
if (is_cancelled)
throw Exception("RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED);
size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path);
size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(context.getPath());
writeBinary(free_space, out);
out.next();
}
size_t Client::getFreeDiskSpace(const InterserverIOEndpointLocation & location) const
size_t Client::getFreeSpace(const InterserverIOEndpointLocation & location) const
{
ReadBufferFromHTTP::Params params =
{
......
......@@ -3,10 +3,17 @@
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/ExpressionListParsers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ReshardingJob::ReshardingJob(const std::string & serialized_job)
{
ReadBufferFromString buf(serialized_job);
......@@ -14,7 +21,19 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job)
readBinary(database_name, buf);
readBinary(table_name, buf);
readBinary(partition, buf);
readBinary(sharding_key, buf);
std::string expr;
readBinary(expr, buf);
IParser::Pos pos = expr.data();
IParser::Pos max_parsed_pos = pos;
const char * end = pos + expr.size();
ParserExpressionWithOptionalAlias parser(false);
Expected expected = "";
if (!parser.parse(pos, end, sharding_key_expr, max_parsed_pos, expected))
throw Exception("ReshardingJob: Internal error", ErrorCodes::LOGICAL_ERROR);
while (!buf.eof())
{
std::string path;
......@@ -29,12 +48,12 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job)
ReshardingJob::ReshardingJob(const std::string & database_name_, const std::string & table_name_,
const std::string & partition_, const WeightedZooKeeperPaths & paths_,
const std::string & sharding_key_)
const ASTPtr & sharding_key_expr_)
: database_name(database_name_),
table_name(table_name_),
partition(partition_),
paths(paths_),
sharding_key(sharding_key_)
sharding_key_expr(sharding_key_expr_)
{
}
......@@ -46,7 +65,8 @@ std::string ReshardingJob::toString() const
writeBinary(database_name, buf);
writeBinary(table_name, buf);
writeBinary(partition, buf);
writeBinary(sharding_key, buf);
writeBinary(queryToString(sharding_key_expr), buf);
for (const auto & path : paths)
{
writeBinary(path.first, buf);
......
......@@ -11,6 +11,7 @@
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/Increment.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Context.h>
#include <common/threadpool.hpp>
......@@ -30,7 +31,8 @@ namespace ErrorCodes
extern const int UNEXPECTED_ZOOKEEPER_ERROR;
extern const int PARTITION_COPY_FAILED;
extern const int PARTITION_ATTACH_FAILED;
extern const int RESHARDING_CLEANUP_FAILED;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
}
namespace
......@@ -52,17 +54,52 @@ std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts)
return ActiveDataPartSet::getPartName(left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
}
class Arguments final
{
public:
Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (key == "task_queue_path")
{
task_queue_path = config.getString(config_name + "." + key);
if (task_queue_path.empty())
throw Exception("Invalid parameter in resharding configuration", ErrorCodes::INVALID_CONFIG_PARAMETER);
}
else
throw Exception("Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
Arguments(const Arguments &) = delete;
Arguments & operator=(const Arguments &) = delete;
std::string getTaskQueuePath() const
{
return task_queue_path;
}
private:
std::string task_queue_path;
};
}
ReshardingWorker::ReshardingWorker(Context & context_)
ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_)
: context(context_), log(&Logger::get("ReshardingWorker"))
{
Arguments arguments(config, config_name);
auto zookeeper = context.getZooKeeper();
host_task_queue_path = "/clickhouse";
zookeeper->createIfNotExists(host_task_queue_path, "");
host_task_queue_path += "/" + zookeeper->getTaskQueuePath();
host_task_queue_path += "/" + arguments.getTaskQueuePath();
zookeeper->createIfNotExists(host_task_queue_path, "");
host_task_queue_path += "/resharding";
......@@ -75,6 +112,11 @@ ReshardingWorker::ReshardingWorker(Context & context_)
ReshardingWorker::~ReshardingWorker()
{
must_stop = true;
{
std::lock_guard<std::mutex> guard(cancel_mutex);
if (merger)
merger->cancel();
}
if (polling_thread.joinable())
polling_thread.join();
}
......@@ -88,16 +130,12 @@ void ReshardingWorker::submitJob(const std::string & database_name,
const std::string & table_name,
const std::string & partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const std::string & sharding_key)
const ASTPtr & sharding_key_expr)
{
auto str = ReshardingJob(database_name, table_name, partition, weighted_zookeeper_paths, sharding_key).toString();
submitJobImpl(str);
}
void ReshardingWorker::submitJob(const ReshardingJob & job)
{
auto str = job.toString();
submitJobImpl(str);
auto serialized_job = ReshardingJob(database_name, table_name, partition, weighted_zookeeper_paths, sharding_key_expr).toString();
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(host_task_queue_path + "/task-", serialized_job,
zkutil::CreateMode::PersistentSequential);
}
bool ReshardingWorker::isStarted() const
......@@ -105,23 +143,18 @@ bool ReshardingWorker::isStarted() const
return is_started;
}
void ReshardingWorker::submitJobImpl(const std::string & serialized_job)
{
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(host_task_queue_path + "/task-", serialized_job,
zkutil::CreateMode::PersistentSequential);
}
void ReshardingWorker::pollAndExecute()
{
bool error = false;
try
{
bool old_val = false;
if (!is_started.compare_exchange_strong(old_val, true, std::memory_order_seq_cst,
std::memory_order_relaxed))
throw Exception("Resharding worker thread already started", ErrorCodes::LOGICAL_ERROR);
throw Exception("Resharding background thread already started", ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(log, "Started resharding thread.");
LOG_DEBUG(log, "Started resharding background thread.");
try
{
......@@ -129,10 +162,10 @@ void ReshardingWorker::pollAndExecute()
}
catch (const Exception & ex)
{
if ((ex.code() == ErrorCodes::RESHARDING_CLEANUP_FAILED) || hasAborted(ex))
if (ex.code() == ErrorCodes::ABORTED)
throw;
else
LOG_INFO(log, ex.message());
LOG_ERROR(log, ex.message());
}
catch (...)
{
......@@ -166,10 +199,10 @@ void ReshardingWorker::pollAndExecute()
}
catch (const Exception & ex)
{
if ((ex.code() == ErrorCodes::RESHARDING_CLEANUP_FAILED) || hasAborted(ex))
if (ex.code() == ErrorCodes::ABORTED)
throw;
else
LOG_INFO(log, ex.message());
LOG_ERROR(log, ex.message());
}
catch (...)
{
......@@ -179,11 +212,21 @@ void ReshardingWorker::pollAndExecute()
}
catch (const Exception & ex)
{
if (!hasAborted(ex))
throw;
if (ex.code() != ErrorCodes::ABORTED)
error = true;
}
catch (...)
{
error = true;
}
LOG_DEBUG(log, "Resharding thread terminated.");
if (error)
{
/// Если мы попали сюда, это значит, что где-то кроется баг.
LOG_ERROR(log, "Resharding background thread terminated with critical error.");
}
else
LOG_DEBUG(log, "Resharding background thread terminated.");
}
void ReshardingWorker::performPendingJobs()
......@@ -204,8 +247,24 @@ void ReshardingWorker::perform(const Strings & job_nodes)
std::string child_full_path = host_task_queue_path + "/" + child;
auto job_descriptor = zookeeper->get(child_full_path);
ReshardingJob job(job_descriptor);
try
{
perform(job);
}
catch (const Exception & ex)
{
if (ex.code() != ErrorCodes::ABORTED)
zookeeper->remove(child_full_path);
throw;
}
catch (...)
{
zookeeper->remove(child_full_path);
throw;
}
zookeeper->remove(child_full_path);
perform(job);
}
}
......@@ -229,12 +288,8 @@ void ReshardingWorker::perform(const ReshardingJob & job)
{
cleanup(storage, job);
if (hasAborted(ex))
{
/// Поток завершается. Сохраняем сведения о прерванной задаче.
submitJob(job);
LOG_DEBUG(log, "Resharding job cancelled then re-submitted for later processing.");
}
if (ex.code() == ErrorCodes::ABORTED)
LOG_DEBUG(log, "Resharding job cancelled.");
throw;
}
......@@ -276,15 +331,24 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
/// Для каждого шарда, куски, которые должны быть слиты.
std::unordered_map<size_t, PartsToBeMerged> to_merge;
/// Для нумерации блоков.
SimpleIncrement increment(storage.data.getMaxDataPartIndex());
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
auto zookeeper = storage.getZooKeeper();
const auto & settings = context.getSettingsRef();
(void) settings;
DayNum_t month = MergeTreeData::getMonthFromName(job.partition);
auto parts_from_partition = storage.merger.selectAllPartsFromPartition(month);
{
std::lock_guard<std::mutex> guard(cancel_mutex);
merger = std::make_unique<MergeTreeDataMerger>(storage.data);
}
auto parts_from_partition = merger->selectAllPartsFromPartition(month);
MergeTreeSharder sharder(storage.data, job);
for (const auto & part : parts_from_partition)
{
......@@ -305,8 +369,6 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
DBMS_DEFAULT_BUFFER_SIZE,
true);
MergeTreeSharder sharder(storage.data, job);
Block block;
while (block = source.read())
{
......@@ -318,57 +380,8 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
abortIfRequested();
/// Создать новый кусок соответствующий новому блоку.
std::string month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(block_with_dates.min_date)) / 100);
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name);
Int64 part_number = block_number_lock.getNumber();
MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, part_number);
/// Добавить в БД ZooKeeper информацию о новом блоке.
SipHash hash;
block_part->checksums.summaryDataChecksum(hash);
union
{
char bytes[16];
UInt64 lo;
UInt64 hi;
} hash_value;
hash.get128(hash_value.bytes);
std::string checksum(hash_value.bytes, 16);
std::string block_id = toString(hash_value.lo) + "_" + toString(hash_value.hi);
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
std::string to_path = job.paths[block_with_dates.shard_no].first;
ops.push_back(
new zkutil::Op::Create(
to_path + "/detached_sharded_blocks/" + block_id,
"",
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
to_path + "/detached_sharded_blocks/" + block_id + "/checksum",
checksum,
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
to_path + "/detached_sharded_blocks/" + block_id + "/number",
toString(part_number),
acl,
zkutil::CreateMode::Persistent));
block_number_lock.getUnlockOps(ops);
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
throw Exception("Unexpected error while adding block " + toString(part_number)
+ " with ID " + block_id + ": " + zkutil::ZooKeeper::error2string(code),
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
Int64 temp_index = increment.get();
MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, temp_index);
abortIfRequested();
......@@ -390,7 +403,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name,
job.table_name, merged_name);
MergeTreeData::MutableDataPartPtr new_part = storage.merger.mergeParts(parts, merged_name, *merge_entry,
MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry,
storage.data.context.getSettings().min_bytes_to_use_direct_io);
sharded_parts.insert(new_part);
......@@ -424,7 +437,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name,
job.table_name, merged_name);
MergeTreeData::MutableDataPartPtr new_part = storage.merger.mergeParts(parts, merged_name, *merge_entry,
MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry,
storage.data.context.getSettings().min_bytes_to_use_direct_io);
sharded_parts.insert(new_part);
......@@ -466,8 +479,6 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
auto zookeeper = storage.getZooKeeper();
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
struct TaskInfo
{
TaskInfo(const std::string & replica_path_,
......@@ -493,16 +504,20 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
/// Количество участвующих локальных реплик. Должно быть <= 1.
size_t local_count = 0;
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
for (const auto & entry : storage.data.per_shard_data_parts)
{
const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
const std::string & zookeeper_path = weighted_path.first;
size_t shard_no = entry.first;
const MergeTreeData::MutableDataParts & sharded_parts = entry.second;
if (sharded_parts.empty())
continue;
std::vector<std::string> part_names;
const MergeTreeData::MutableDataParts & sharded_parts = per_shard_data_parts.at(shard_no);
for (const MergeTreeData::DataPartPtr & sharded_part : sharded_parts)
part_names.push_back(sharded_part->name);
const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
const std::string & zookeeper_path = weighted_path.first;
auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const auto & child : children)
{
......@@ -610,9 +625,14 @@ void ReshardingWorker::applyChanges(StorageReplicatedMergeTree & storage, const
using TaskInfoList = std::vector<TaskInfo>;
TaskInfoList task_info_list;
for (size_t i = 0; i < job.paths.size(); ++i)
for (const auto & entry : storage.data.per_shard_data_parts)
{
const WeightedZooKeeperPath & weighted_path = job.paths[i];
size_t shard_no = entry.first;
const MergeTreeData::MutableDataParts & sharded_parts = entry.second;
if (sharded_parts.empty())
continue;
const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
const std::string & zookeeper_path = weighted_path.first;
auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
......@@ -668,47 +688,13 @@ void ReshardingWorker::cleanup(StorageReplicatedMergeTree & storage, const Resha
{
LOG_DEBUG(log, "Performing cleanup.");
try
{
storage.data.per_shard_data_parts.clear();
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it)
{
auto absolute_path = it.path().absolute().toString();
Poco::File(absolute_path).remove(true);
}
auto zookeeper = storage.getZooKeeper();
zkutil::Ops ops;
for (size_t i = 0; i < job.paths.size(); ++i)
{
const WeightedZooKeeperPath & weighted_path = job.paths[i];
const std::string & zookeeper_path = weighted_path.first;
storage.data.per_shard_data_parts.clear();
auto children = zookeeper->getChildren(zookeeper_path + "/detached_sharded_blocks");
if (!children.empty())
{
for (const auto & child : children)
{
ops.push_back(
new zkutil::Op::Remove(
zookeeper_path + "/detached_sharded_blocks/" + child + "/number", -1));
ops.push_back(
new zkutil::Op::Remove(
zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum", -1));
ops.push_back(
new zkutil::Op::Remove(
zookeeper_path + "/detached_sharded_blocks/" + child, -1));
}
}
}
zookeeper->multi(ops);
}
catch (...)
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it)
{
throw Exception("Failed to perform cleanup during resharding operation",
ErrorCodes::RESHARDING_CLEANUP_FAILED);
auto absolute_path = it.path().absolute().toString();
Poco::File(absolute_path).remove(true);
}
}
......@@ -718,9 +704,4 @@ void ReshardingWorker::abortIfRequested() const
throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
}
bool ReshardingWorker::hasAborted(const Exception & ex) const
{
return must_stop && (ex.code() == ErrorCodes::ABORTED);
}
}
......@@ -51,7 +51,7 @@ std::string getEndpointId(const std::string & node_id)
}
Service::Service(StorageReplicatedMergeTree & storage_)
: storage(storage_)
: storage(storage_), log(&Logger::get("ShardedPartitionSender::Service"))
{
}
......@@ -79,8 +79,18 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no);
part->is_temp = false;
const std::string new_name = "detached/" + part_name;
Poco::File(storage.full_path + part->name).renameTo(storage.full_path + new_name);
const std::string old_part_path = storage.full_path + part->name;
const std::string new_part_path = storage.full_path + "detached/" + part_name;
Poco::File new_part_dir(new_part_path);
if (new_part_dir.exists())
{
LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing.");
new_part_dir.remove(true);
}
Poco::File(old_part_path).renameTo(new_part_path);
}
bool flag = true;
......@@ -88,6 +98,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
out.next();
}
Client::Client()
: log(&Logger::get("ShardedPartitionSender::Client"))
{
}
bool Client::send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
const std::vector<std::string> & parts, size_t shard_no)
{
......
......@@ -228,7 +228,7 @@ void StorageDistributed::shutdown()
void StorageDistributed::reshardPartitions(const String & database_name, const Field & first_partition,
const Field & last_partition, const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const String & sharding_key, const Settings & settings)
const ASTPtr & sharding_key_expr, const Settings & settings)
{
/// Создать запрос ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu.
......@@ -258,7 +258,7 @@ void StorageDistributed::reshardPartitions(const String & database_name, const F
}
parameters.weighted_zookeeper_paths = expr_list;
parameters.sharding_key = sharding_key;
parameters.sharding_key_expr = sharding_key_expr;
/** Функциональность shard_multiplexing не доделана - выключаем её.
* (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.)
......
......@@ -75,7 +75,6 @@ namespace ErrorCodes
extern const int INVALID_PARTITIONS_INTERVAL;
extern const int RESHARDING_INVALID_PARAMETERS;
extern const int INVALID_SHARD_WEIGHT;
extern const int SHARD_DOESNT_REFERENCE_TABLE;
}
......@@ -335,7 +334,7 @@ StoragePtr StorageReplicatedMergeTree::create(
/// Сервисы для перешардирования.
{
InterserverIOEndpointPtr endpoint = new RemoteDiskSpaceMonitor::Service(res->full_path);
InterserverIOEndpointPtr endpoint = new RemoteDiskSpaceMonitor::Service(res->context);
res->disk_space_monitor_endpoint_holder = get_endpoint_holder(endpoint);
}
......@@ -403,8 +402,6 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/blocks", "",
acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/detached_sharded_blocks", "",
acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/block_numbers", "",
acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/nonincrement_block_numbers", "",
......@@ -2317,7 +2314,7 @@ void StorageReplicatedMergeTree::shutdown()
fetcher.cancel();
disk_space_monitor_endpoint_holder = nullptr;
free_disk_space_checker.cancel();
disk_space_monitor_client.cancel();
sharded_partition_sender_endpoint_holder = nullptr;
sharded_partition_sender_client.cancel();
......@@ -2891,56 +2888,7 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
}
std::string log_msg = "Adding attaches to log";
if (is_leader_node)
{
/// Если ATTACH PART выполняется в рамках перешардирования, обновляем информацию о блоках на шарде.
auto children = zookeeper->getChildren(zookeeper_path + "/detached_sharded_blocks");
if (!children.empty())
{
log_msg += ". Updating information about blocks in the context of the resharding operation.";
auto acl = zookeeper->getDefaultACL();
for (const auto & child : children)
{
std::string checksum = zookeeper->get(zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum");
std::string number = zookeeper->get(zookeeper_path + "/detached_sharded_blocks/" + child + "/number");
ops.push_back(
new zkutil::Op::Create(
zookeeper_path + "/blocks/" + child,
"",
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
zookeeper_path + "/blocks/" + child + "/checksum",
checksum,
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
zookeeper_path + "/blocks/" + child + "/number",
number,
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Remove(
zookeeper_path + "/detached_sharded_blocks/" + child + "/number", -1));
ops.push_back(
new zkutil::Op::Remove(
zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum", -1));
ops.push_back(
new zkutil::Op::Remove(
zookeeper_path + "/detached_sharded_blocks/" + child, -1));
}
}
}
LOG_DEBUG(log, log_msg);
LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops);
......@@ -3481,12 +3429,12 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
}
void StorageReplicatedMergeTree::reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
const Settings & settings)
{
auto & resharding_worker = context.getReshardingWorker();
if (!resharding_worker.isStarted())
throw Exception("Resharding worker is not running.", ErrorCodes::RESHARDING_NO_WORKER);
throw Exception("Resharding background thread is not running.", ErrorCodes::RESHARDING_NO_WORKER);
for (const auto & weighted_path : weighted_zookeeper_paths)
{
......@@ -3495,14 +3443,6 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
throw Exception("Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT);
}
for (const auto & weighted_path : weighted_zookeeper_paths)
{
const std::string & path = weighted_path.first;
if ((path.length() <= getTableName().length()) ||
(path.substr(path.length() - getTableName().length()) != getTableName()))
throw Exception("Shard does not reference table", ErrorCodes::SHARD_DOESNT_REFERENCE_TABLE);
}
DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();
......@@ -3548,7 +3488,7 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
/// Зарегистрировать фоновые задачи перешардирования.
for (const auto & partition : partition_list)
resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key);
resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key_expr);
}
void StorageReplicatedMergeTree::enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths)
......@@ -3641,8 +3581,8 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
InterserverIOEndpointLocation location(replica_path, address.host, address.replication_port);
tasks[i] = Tasks::value_type(std::bind(&RemoteDiskSpaceMonitor::Client::getFreeDiskSpace,
&free_disk_space_checker, location));
tasks[i] = Tasks::value_type(std::bind(&RemoteDiskSpaceMonitor::Client::getFreeSpace,
&disk_space_monitor_client, location));
pool.schedule([i, &tasks]{ tasks[i](); });
}
}
......
......@@ -173,8 +173,6 @@ public:
*/
void waitForDisappear(const std::string & path);
std::string getTaskQueuePath() const;
/** Асинхронный интерфейс (реализовано небольшое подмножество операций).
*
* Использование:
......@@ -300,7 +298,7 @@ private:
friend struct WatchWithEvent;
friend class EphemeralNodeHolder;
void init(const std::string & hosts, int32_t session_timeout_ms, const std::string & task_queue_path_ = "");
void init(const std::string & hosts, int32_t session_timeout_ms);
void removeChildrenRecursive(const std::string & path);
void tryRemoveChildrenRecursive(const std::string & path);
void * watchForEvent(EventPtr event);
......@@ -343,7 +341,6 @@ private:
int32_t existsImpl(const std::string & path, Stat * stat_, EventPtr watch = nullptr);
std::string hosts;
std::string task_queue_path;
int32_t session_timeout_ms;
std::mutex mutex;
......
......@@ -61,13 +61,12 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p
}
}
void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_, const std::string & task_queue_path_)
void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
{
log = &Logger::get("ZooKeeper");
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
hosts = hosts_;
session_timeout_ms = session_timeout_ms_;
task_queue_path = task_queue_path_;
impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
......@@ -105,10 +104,6 @@ struct ZooKeeperArgs
{
session_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "task_queue_path")
{
task_queue_path = config.getString(config_name + "." + key);
}
else throw KeeperException(std::string("Unknown key ") + key + " in config file");
}
......@@ -125,13 +120,12 @@ struct ZooKeeperArgs
std::string hosts;
size_t session_timeout_ms;
std::string task_queue_path;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, args.session_timeout_ms, args.task_queue_path);
init(args.hosts, args.session_timeout_ms);
}
void * ZooKeeper::watchForEvent(EventPtr event)
......@@ -584,11 +578,6 @@ void ZooKeeper::waitForDisappear(const std::string & path)
}
}
std::string ZooKeeper::getTaskQueuePath() const
{
return task_queue_path;
}
ZooKeeper::~ZooKeeper()
{
LOG_INFO(&Logger::get("~ZooKeeper"), "Closing ZooKeeper session");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册