Commit 5c90d7d4 authored by Alexey Milovidov

Merge

Parent 085dd9f9
#pragma once
#include <DB/Interpreters/ClusterProxy/IQueryConstructor.h>
namespace DB
{
namespace ClusterProxy
{
/// Builds the per-shard input streams for a distributed ALTER query
/// (one strategy implementation of IQueryConstructor, used by ClusterProxy::Query).
class AlterQueryConstructor final : public IQueryConstructor
{
public:
AlterQueryConstructor() = default;
/// Create a stream that executes the ALTER on this server for the given shard address.
BlockInputStreamPtr createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address) override;
/// Create a stream that sends the ALTER through a single remote connection pool.
BlockInputStreamPtr createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
/// Create a stream that sends the ALTER through several connection pools at once.
BlockInputStreamPtr createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
/// NOTE(review): semantics of "inclusive" are defined by IQueryConstructor — see its declaration.
bool isInclusive() const override;
};
}
}
#pragma once
#include <DB/Interpreters/ClusterProxy/IQueryConstructor.h>
namespace DB
{
namespace ClusterProxy
{
/// Builds the per-shard input streams for a distributed DESCRIBE query
/// (one strategy implementation of IQueryConstructor, used by ClusterProxy::Query).
class DescribeQueryConstructor final : public IQueryConstructor
{
public:
DescribeQueryConstructor() = default;
/// Create a stream that executes the DESCRIBE on this server for the given shard address.
BlockInputStreamPtr createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address) override;
/// Create a stream that sends the DESCRIBE through a single remote connection pool.
BlockInputStreamPtr createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
/// Create a stream that sends the DESCRIBE through several connection pools at once.
BlockInputStreamPtr createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
/// NOTE(review): semantics of "inclusive" are defined by IQueryConstructor — see its declaration.
bool isInclusive() const override;
};
}
}
#pragma once
#include <DB/Interpreters/Cluster.h>
#include <DB/Parsers/IAST.h>
#include <DB/Storages/IStorage.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
class Settings;
class Context;
class Cluster;
class IInterpreter;
class RemoteBlockInputStream;
class Throttler;
namespace ClusterProxy
{
/// Strategy interface used by ClusterProxy::Query to build the per-shard
/// input streams of a distributed query (ALTER/DESCRIBE/SELECT variants exist).
class IQueryConstructor
{
public:
virtual ~IQueryConstructor() {}
/// Create a stream that executes the query on this server for the given shard address.
virtual BlockInputStreamPtr createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address) = 0;
/// Create a stream that sends the query through a single remote connection pool.
virtual BlockInputStreamPtr createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) = 0;
/// Create a stream that sends the query through several connection pools at once.
virtual BlockInputStreamPtr createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & new_settings, ThrottlerPtr throttler, const Context & context) = 0;
/// NOTE(review): exact meaning of "inclusive" is not visible from this header —
/// presumably whether the local server participates in the query; confirm in Query::execute.
virtual bool isInclusive() const = 0;
};
}
}
#pragma once
#include <DB/Parsers/IAST.h>
#include <DB/Storages/IStorage.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
class Settings;
class Context;
class Cluster;
namespace ClusterProxy
{
class IQueryConstructor;
/// Executes a query across the shards of 'cluster', delegating the construction
/// of local/remote streams to the supplied IQueryConstructor.
class Query
{
public:
Query(IQueryConstructor & query_constructor_, Cluster & cluster_,
ASTPtr query_ast_, const Context & context_, const Settings & settings_, bool enable_shard_multiplexing_);
/// Build and return the input streams covering all shards of the cluster.
BlockInputStreams execute();
private:
/// NOTE(review): the reference members below are bound to the constructor
/// arguments; callers must keep those objects alive while this Query exists.
IQueryConstructor & query_constructor;
Cluster & cluster;
ASTPtr query_ast;
const Context & context;
const Settings & settings;
bool enable_shard_multiplexing;
};
}
}
#pragma once
#include <DB/Interpreters/ClusterProxy/IQueryConstructor.h>
#include <DB/Core/QueryProcessingStage.h>
#include <DB/Storages/IStorage.h>
namespace DB
{
namespace ClusterProxy
{
/// Builds the per-shard input streams for a distributed SELECT query
/// (one strategy implementation of IQueryConstructor, used by ClusterProxy::Query).
class SelectQueryConstructor final : public IQueryConstructor
{
public:
/// processed_stage — up to which stage each shard processes the query;
/// external_tables — presumably temporary tables shipped to remote servers (confirm in implementation).
SelectQueryConstructor(const QueryProcessingStage::Enum & processed_stage, const Tables & external_tables);
/// Create a stream that executes the SELECT on this server for the given shard address.
BlockInputStreamPtr createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address) override;
/// Create a stream that sends the SELECT through a single remote connection pool.
BlockInputStreamPtr createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
/// Create a stream that sends the SELECT through several connection pools at once.
BlockInputStreamPtr createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
/// NOTE(review): semantics of "inclusive" are defined by IQueryConstructor — see its declaration.
bool isInclusive() const override;
private:
/// NOTE(review): both members are references bound to the constructor arguments;
/// the arguments must outlive this object (a temporary passed in would dangle).
const QueryProcessingStage::Enum & processed_stage;
const Tables & external_tables;
};
}
}
......@@ -30,7 +30,6 @@ class Dictionaries;
class ExternalDictionaries;
class InterserverIOHandler;
class BackgroundProcessingPool;
class ReshardingWorker;
class MergeList;
class Cluster;
class Compiler;
......@@ -251,8 +250,6 @@ public:
BackgroundProcessingPool & getBackgroundPool();
ReshardingWorker & getReshardingWorker();
/** Очистить кэши разжатых блоков и засечек.
* Обычно это делается при переименовании таблиц, изменении типа столбцов, удалении таблицы.
* - так как кэши привязаны к именам файлов, и становятся некорректными.
......
......@@ -16,7 +16,7 @@ namespace DB
class InterpreterAlterQuery : public IInterpreter
{
public:
InterpreterAlterQuery(ASTPtr query_ptr_, const Context & context_);
InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_);
BlockIO execute() override;
......@@ -28,8 +28,7 @@ public:
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const Context & context);
Context & context);
private:
struct PartitionCommand
{
......@@ -39,7 +38,6 @@ private:
ATTACH_PARTITION,
FETCH_PARTITION,
FREEZE_PARTITION,
RESHARD_PARTITION
};
Type type;
......@@ -52,11 +50,6 @@ private:
String from; /// Для FETCH PARTITION - путь в ZK к шарду, с которого скачивать партицию.
/// Для RESHARD PARTITION.
Field last_partition;
WeightedZooKeeperPaths weighted_zookeeper_paths;
String sharding_key;
static PartitionCommand dropPartition(const Field & partition, bool detach, bool unreplicated)
{
return {DROP_PARTITION, partition, detach, unreplicated};
......@@ -76,12 +69,6 @@ private:
{
return {FREEZE_PARTITION, partition};
}
static PartitionCommand reshardPartitions(const Field & first_partition_, const Field & last_partition_,
const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const String & sharding_key_)
{
return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_};
}
};
typedef std::vector<PartitionCommand> PartitionCommands;
......
......@@ -23,7 +23,7 @@ namespace DB
class InterpreterDescribeQuery : public IInterpreter
{
public:
InterpreterDescribeQuery(ASTPtr query_ptr_, const Context & context_)
InterpreterDescribeQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {}
BlockIO execute() override
......
......@@ -2,10 +2,6 @@
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Core/Types.h>
#include <map>
#include <atomic>
......@@ -20,50 +16,12 @@ namespace ErrorCodes
extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT;
}
/** Местонахождение сервиса.
*/
struct InterserverIOEndpointLocation
{
public:
InterserverIOEndpointLocation(const std::string & name_, const std::string & host_, UInt16 port_)
: name(name_), host(host_), port(port_)
{
}
/// Создаёт местонахождение на основе его сериализованного представления.
InterserverIOEndpointLocation(const std::string & serialized_location)
{
ReadBufferFromString buf(serialized_location);
readBinary(name, buf);
readBinary(host, buf);
readBinary(port, buf);
assertEOF(buf);
}
/// Сериализует местонахождение.
std::string toString() const
{
std::string serialized_location;
WriteBufferFromString buf(serialized_location);
writeBinary(name, buf);
writeBinary(host, buf);
writeBinary(port, buf);
buf.next();
return serialized_location;
}
public:
std::string name;
std::string host;
UInt16 port;
};
/** Обработчик запросов от других серверов.
*/
class InterserverIOEndpoint
{
public:
virtual std::string getId(const std::string & path) const = 0;
virtual void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) = 0;
virtual ~InterserverIOEndpoint() {}
......
#pragma once
#include <DB/Parsers/IAST.h>
#include <mysqlxx/Manip.h>
namespace DB
......@@ -17,7 +18,6 @@ namespace ErrorCodes
* DROP COLUMN col_drop,
* MODIFY COLUMN col_name type,
* DROP PARTITION partition
* RESHARD PARTITION partition TO /path/to/zookeeper/table WEIGHT w, ... USING column
* ...
*/
......@@ -33,14 +33,12 @@ public:
ATTACH_PARTITION,
FETCH_PARTITION,
FREEZE_PARTITION,
RESHARD_PARTITION,
NO_TYPE
};
struct Parameters
{
Parameters();
Parameters() : type(NO_TYPE) {}
int type = NO_TYPE;
/** В запросе ADD COLUMN здесь хранится имя и тип добавляемого столбца
......@@ -54,7 +52,7 @@ public:
*/
ASTPtr column;
/** В запросах DROP PARTITION и RESHARD PARTITION здесь хранится имя partition'а.
/** В запросе DROP PARTITION здесь хранится имя partition'а.
*/
ASTPtr partition;
bool detach = false; /// true для DETACH PARTITION.
......@@ -66,33 +64,126 @@ public:
*/
String from;
/** Для RESHARD PARTITION.
*/
ASTPtr last_partition;
ASTPtr weighted_zookeeper_paths;
String sharding_key;
/// deep copy
void clone(Parameters & p) const;
void clone(Parameters & p) const
{
p = *this;
if (col_decl) p.col_decl = col_decl->clone();
if (column) p.column = column->clone();
if (partition) p.partition = partition->clone();
}
};
typedef std::vector<Parameters> ParameterContainer;
ParameterContainer parameters;
String database;
String table;
void addParameters(const Parameters & params);
void addParameters(const Parameters & params)
{
parameters.push_back(params);
if (params.col_decl)
children.push_back(params.col_decl);
if (params.column)
children.push_back(params.column);
if (params.partition)
children.push_back(params.partition);
}
ASTAlterQuery(StringRange range_ = StringRange());
ASTAlterQuery(StringRange range_ = StringRange()) : IAST(range_) {};
/** Получить текст, который идентифицирует этот элемент. */
String getID() const override;
String getID() const override { return ("AlterQuery_" + database + "_" + table); };
ASTPtr clone() const override;
ASTPtr clone() const override
{
ASTAlterQuery * res = new ASTAlterQuery(*this);
for (ParameterContainer::size_type i = 0; i < parameters.size(); ++i)
parameters[i].clone(res->parameters[i]);
return res;
}
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");
if (!table.empty())
{
if (!database.empty())
{
settings.ostr << indent_str << database;
settings.ostr << ".";
}
settings.ostr << indent_str << table;
}
settings.ostr << settings.nl_or_ws;
for (size_t i = 0; i < parameters.size(); ++i)
{
const ASTAlterQuery::Parameters & p = parameters[i];
if (p.type == ASTAlterQuery::ADD_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (settings.hilite ? hilite_none : "");
p.col_decl->formatImpl(settings, state, frame);
/// AFTER
if (p.column)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " AFTER " << (settings.hilite ? hilite_none : "");
p.column->formatImpl(settings, state, frame);
}
}
else if (p.type == ASTAlterQuery::DROP_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DROP COLUMN " << (settings.hilite ? hilite_none : "");
p.column->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::MODIFY_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (settings.hilite ? hilite_none : "");
p.col_decl->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::DROP_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (p.detach ? "DETACH" : "DROP") << " PARTITION "
<< (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::ATTACH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH " << (p.unreplicated ? "UNREPLICATED " : "")
<< (p.part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH " << (p.unreplicated ? "UNREPLICATED " : "")
<< "PARTITION " << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << mysqlxx::quote << p.from;
}
else if (p.type == ASTAlterQuery::FREEZE_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
std::string comma = (i < (parameters.size() -1) ) ? "," : "";
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << comma << (settings.hilite ? hilite_none : "");
settings.ostr << settings.nl_or_ws;
}
}
};
}
#pragma once
#include <DB/Parsers/IAST.h>
#include <DB/Core/Types.h>
#include <mysqlxx/Manip.h>
namespace DB
{
/** AST node for one weighted ZooKeeper path of a RESHARD PARTITION query.
  * Formats as: 'path' WEIGHT weight (path is quoted).
  */
class ASTWeightedZooKeeperPath : public IAST
{
public:
    ASTWeightedZooKeeperPath() = default;
    ASTWeightedZooKeeperPath(StringRange range_) : IAST(range_) {}

    String getID() const override { return "Weighted_ZooKeeper_Path"; }

    ASTPtr clone() const override { return new ASTWeightedZooKeeperPath(*this); }

public:
    String path;
    /// Shard weight. Default-initialized to 0 so that a default-constructed node
    /// does not format an indeterminate value (was previously uninitialized).
    UInt64 weight = 0;

protected:
    void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
    {
        std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
        settings.ostr << settings.nl_or_ws << indent_str << mysqlxx::quote << path << " WEIGHT " << weight;
    }
};
}
......@@ -216,13 +216,5 @@ protected:
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
/** Путь шарда в ZooKeeper вместе с весом.
*/
class ParserWeightedZooKeeperPath : public IParserBase
{
protected:
const char * getName() const { return "weighted ZooKeeper path"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
}
......@@ -13,7 +13,6 @@ namespace DB
* [DROP|DETACH|ATTACH [UNREPLICATED] PARTITION|PART partition, ...]
* [FETCH PARTITION partition FROM ...]
* [FREEZE PARTITION]
* [RESHARD PARTITION partition TO zookeeper/path/to/partition [WEIGHT w] [, ...] USING sharding_key]
*/
class ParserAlterQuery : public IParserBase
{
......
......@@ -6,10 +6,6 @@
namespace DB
{
/// Для RESHARD PARTITION.
using WeightedZooKeeperPath = std::pair<String, UInt64>;
using WeightedZooKeeperPaths = std::vector<WeightedZooKeeperPath>;
/// Операция из запроса ALTER (кроме манипуляции с PART/PARTITION). Добавление столбцов типа Nested не развернуто в добавление отдельных столбцов.
struct AlterCommand
{
......
......@@ -206,7 +206,7 @@ public:
* Этот метод должен полностью выполнить запрос ALTER, самостоятельно заботясь о блокировках.
* Для обновления метаданных таблицы на диске этот метод должен вызвать InterpreterAlterQuery::updateMetadata.
*/
virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
......@@ -239,15 +239,6 @@ public:
throw Exception("Method freezePartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить запрос RESHARD PARTITION.
*/
virtual void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const Settings & settings)
{
throw Exception("Method reshardPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree.
* Возвращает - была ли выполнена какая-либо работа.
*/
......
......@@ -90,8 +90,6 @@ namespace ErrorCodes
class MergeTreeData : public ITableDeclaration
{
friend class ReshardingWorker;
public:
/// Функция, которую можно вызвать, если есть подозрение, что данные куска испорчены.
typedef std::function<void (const String &)> BrokenPartCallback;
......@@ -248,10 +246,6 @@ public:
/// Если true, деструктор удалит директорию с куском.
bool is_temp = false;
/// Для перешардирования.
bool is_sharded = false;
size_t shard_no = 0;
/// Первичный ключ. Всегда загружается в оперативку.
typedef std::vector<Field> Index;
Index index;
......@@ -286,15 +280,13 @@ public:
{
try
{
std::string path = storage.full_path + (is_sharded ? ("reshard/" + toString(shard_no) + "/") : "") + name;
Poco::File dir(path);
Poco::File dir(storage.full_path + name);
if (!dir.exists())
return;
if (name.substr(0, strlen("tmp")) != "tmp")
{
LOG_ERROR(storage.log, "~DataPart() should remove part " << path
LOG_ERROR(storage.log, "~DataPart() should remove part " << storage.full_path + name
<< " but its name doesn't start with tmp. Too suspicious, keeping the part.");
return;
}
......@@ -549,10 +541,9 @@ public:
bool hasColumnFiles(const String & column) const
{
String prefix = storage.full_path + (is_sharded ? ("reshard/" + toString(shard_no) + "/") : "") + name + "/";
String escaped_column = escapeForFileName(column);
return Poco::File(prefix + escaped_column + ".bin").exists() &&
Poco::File(prefix + escaped_column + ".mrk").exists();
return Poco::File(storage.full_path + name + "/" + escaped_column + ".bin").exists() &&
Poco::File(storage.full_path + name + "/" + escaped_column + ".mrk").exists();
}
};
......@@ -563,9 +554,6 @@ public:
typedef std::set<DataPartPtr, DataPartPtrLess> DataParts;
typedef std::vector<DataPartPtr> DataPartsVector;
/// Для перешардирования.
using MutableDataParts = std::set<MutableDataPartPtr, DataPartPtrLess>;
using PerShardDataParts = std::unordered_map<size_t, MutableDataParts>;
/// Некоторые операции над множеством кусков могут возвращать такой объект.
/// Если не был вызван commit или rollback, деструктор откатывает операцию.
......@@ -679,7 +667,7 @@ public:
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_,
const Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
......@@ -764,7 +752,6 @@ public:
/** Возвращает кусок с таким именем (активный или не активный). Если нету, nullptr.
*/
DataPartPtr getPartIfExists(const String & part_name);
DataPartPtr getShardedPartIfExists(const String & part_name, size_t shard_no);
/** Переименовывает временный кусок в постоянный и добавляет его в рабочий набор.
* Если increment != nullptr, индекс куска берется из инкремента. Иначе индекс куска не меняется.
......@@ -854,10 +841,6 @@ public:
*/
void freezePartition(const std::string & prefix);
/** Возвращает размер заданной партиции в байтах.
*/
size_t getPartitionSize(const std::string & partition_name) const;
size_t getColumnSize(const std::string & name) const
{
Poco::ScopedLock<Poco::FastMutex> lock{data_parts_mutex};
......@@ -873,13 +856,11 @@ public:
return column_sizes;
}
/// Для ATTACH/DETACH/DROP/RESHARD PARTITION.
/// Для ATTACH/DETACH/DROP PARTITION.
static String getMonthName(const Field & partition);
static String getMonthName(DayNum_t month);
static DayNum_t getMonthDayNum(const Field & partition);
static DayNum_t getMonthFromName(const String & month_name);
Context & context;
const Context & context;
const String date_column_name;
const ASTPtr sampling_expression;
const size_t index_granularity;
......@@ -925,10 +906,6 @@ private:
DataParts all_data_parts;
mutable Poco::FastMutex all_data_parts_mutex;
/** Для каждого шарда множество шардированных кусков.
*/
PerShardDataParts per_shard_data_parts;
/** Выражение, преобразующее типы столбцов.
* Если преобразований типов нет, out_expression=nullptr.
* out_rename_map отображает файлы-столбцы на выходе выражения в новые файлы таблицы.
......
......@@ -8,7 +8,6 @@ namespace DB
{
class MergeListEntry;
class ReshardingJob;
/** Умеет выбирать куски для слияния и сливать их.
......@@ -40,15 +39,11 @@ public:
bool only_small,
const AllowedMergingPredicate & can_merge);
/** Выбрать все куски принадлежащие одной партиции.
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition);
/** Сливает куски.
* Если reservation != nullptr, то и дело уменьшает размер зарезервированного места
* приблизительно пропорционально количеству уже выписанных данных.
*/
MergeTreeData::MutableDataPartPtr mergeParts(
MergeTreeData::DataPartPtr mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeListEntry & merge_entry,
size_t aio_threshold, MergeTreeData::Transaction * out_transaction = nullptr,
DiskSpaceMonitor::Reservation * disk_reservation = nullptr);
......
#pragma once
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Interpreters/sortBlock.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Core/Block.h>
namespace DB
{
/// A block of data destined for one shard, together with the min/max date
/// covered by its rows. Non-copyable.
struct ShardedBlockWithDateInterval final
{
ShardedBlockWithDateInterval(const Block & block_, size_t shard_no_, UInt16 min_date_, UInt16 max_date_);
ShardedBlockWithDateInterval(const ShardedBlockWithDateInterval &) = delete;
ShardedBlockWithDateInterval & operator=(const ShardedBlockWithDateInterval &) = delete;
Block block;
size_t shard_no; /// Index of the destination shard.
UInt16 min_date; /// NOTE(review): presumably DayNum of the earliest row — confirm in MergeTreeSharder.
UInt16 max_date;
};
using ShardedBlocksWithDateIntervals = std::list<ShardedBlockWithDateInterval>;
class ReshardingJob;
/** Creates new sharded parts with data (used by resharding). Non-copyable.
  */
class MergeTreeSharder final
{
public:
MergeTreeSharder(MergeTreeData & data_, const ReshardingJob & job_);
MergeTreeSharder(const MergeTreeSharder &) = delete;
MergeTreeSharder & operator=(const MergeTreeSharder &) = delete;
/** Splits a block into blocks by the sharding key, each of which has to be
  * written to a separate part. Works deterministically: given the same input
  * block, it produces the same output blocks in the same order.
  */
ShardedBlocksWithDateIntervals shardBlock(const Block & block);
/** All rows must belong to the same month.
  * temp_index is the left/right value for the new part; it can be changed on rename.
  * Returns a temporary part with a name starting with tmp_.
  */
MergeTreeData::MutableDataPartPtr writeTempPart(ShardedBlockWithDateInterval & sharded_block_with_dates, Int64 temp_index);
private:
/// NOTE(review): presumably builds one row filter per shard — confirm in the implementation.
std::vector<IColumn::Filter> createFilters(Block block);
private:
MergeTreeData & data;
const ReshardingJob & job;
Logger * log;
std::vector<size_t> slots; /// NOTE(review): presumably maps hash slots to shard numbers — confirm in the implementation.
};
}
#pragma once
#include <DB/Core/Types.h>
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/IO/WriteBuffer.h>
namespace DB
{
namespace RemoteDiskSpaceMonitor
{
/** Service that provides information about free disk space. Non-copyable.
  */
class Service final : public InterserverIOEndpoint
{
public:
Service(const std::string & path_);
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
const std::string path; /// Filesystem path whose free space is reported.
};
/** Client for obtaining information about free space on a remote disk. Non-copyable.
  */
class Client final
{
public:
Client() = default;
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
/// Query the given endpoint for its free disk space.
size_t getFreeDiskSpace(const InterserverIOEndpointLocation & location) const;
/// Request cancellation of any in-flight request.
void cancel() { is_cancelled = true; }
private:
std::atomic<bool> is_cancelled{false};
};
}
}
#pragma once
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/IO/WriteBuffer.h>
namespace DB
{
class Context;
namespace RemoteQueryExecutor
{
/** Service for executing SQL queries on behalf of other servers. Non-copyable.
  */
class Service final : public InterserverIOEndpoint
{
public:
Service(Context & context_);
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
Context & context; /// Bound to the constructor argument; must outlive the service.
};
/** Client for remote execution of SQL queries. Non-copyable.
  */
class Client final
{
public:
Client() = default;
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
/// Execute the query on the remote endpoint; returns whether it succeeded.
bool executeQuery(const InterserverIOEndpointLocation & location, const std::string & query);
/// Request cancellation of any in-flight request.
void cancel() { is_cancelled = true; }
private:
std::atomic<bool> is_cancelled{false};
};
}
}
......@@ -12,42 +12,33 @@ namespace DB
class StorageReplicatedMergeTree;
namespace DataPartsExchange
{
/** Сервис для отправки кусков из таблицы *MergeTree.
*/
class Service final : public InterserverIOEndpoint
class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint
{
public:
Service(MergeTreeData & data_, StorageReplicatedMergeTree & storage_) : data(data_),
storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StorageReplicatedMergeTree & storage_) : data(data_),
storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {}
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
MergeTreeData::DataPartPtr findPart(const String & name);
MergeTreeData::DataPartPtr findShardedPart(const String & name, size_t shard_no);
private:
MergeTreeData & data;
StorageReplicatedMergeTree & storage;
Logger * log;
MergeTreeData::DataPartPtr findPart(const String & name)
{
MergeTreeData::DataPartPtr part = data.getPartIfExists(name);
if (part)
return part;
throw Exception("No part " + name + " in table");
}
};
/** Клиент для получения кусков из таблицы *MergeTree.
*/
class Fetcher final
class ReplicatedMergeTreePartsFetcher
{
public:
Fetcher(MergeTreeData & data_) : data(data_), log(&Logger::get("Fetcher")) {}
Fetcher(const Fetcher &) = delete;
Fetcher & operator=(const Fetcher &) = delete;
ReplicatedMergeTreePartsFetcher(MergeTreeData & data_) : data(data_), log(&Logger::get("ReplicatedMergeTreePartsFetcher")) {}
/// Скачивает кусок в tmp_директорию. Если to_detached - скачивает в директорию detached.
MergeTreeData::MutableDataPartPtr fetchPart(
......@@ -57,31 +48,15 @@ public:
int port,
bool to_detached = false);
/// Метод для перешардирования. Скачивает шардированный кусок
/// из заданного шарда в папку to_detached.
MergeTreeData::MutableDataPartPtr fetchShardedPart(
const InterserverIOEndpointLocation & location,
const String & part_name,
size_t shard_no);
void cancel() { is_cancelled = true; }
private:
MergeTreeData::MutableDataPartPtr fetchPartImpl(
const String & part_name,
const String & replica_path,
const String & host,
int port,
const String & shard_no,
bool to_detached);
private:
MergeTreeData & data;
/// Нужно остановить передачу данных.
std::atomic<bool> is_cancelled {false};
Logger * log;
};
}
}
#pragma once
#include <DB/Storages/AlterCommands.h>
#include <string>
namespace DB
{
/** Description of a resharding job. Non-copyable.
  */
struct ReshardingJob final
{
public:
/// Create the description from its serialized representation (see toString()).
ReshardingJob(const std::string & serialized_job);
ReshardingJob(const std::string & database_name_, const std::string & table_name_,
const std::string & partition_, const WeightedZooKeeperPaths & paths_,
const std::string & sharding_key_);
ReshardingJob(const ReshardingJob &) = delete;
ReshardingJob & operator=(const ReshardingJob &) = delete;
/// Serialize the job description.
std::string toString() const;
public:
std::string database_name;
std::string table_name;
std::string partition;
WeightedZooKeeperPaths paths; /// Weighted ZooKeeper paths of the destination shards.
std::string sharding_key;
};
}
#pragma once
#include <DB/Storages/AlterCommands.h>
#include <common/logger_useful.h>
#include <Poco/SharedPtr.h>
#include <string>
#include <thread>
#include <atomic>
namespace DB
{
class Context;
class StorageReplicatedMergeTree;
class ReshardingJob;
/** Executor of resharding jobs.
  * Works in the background inside a single thread.
  * Watches for the appearance of jobs and schedules them for execution.
  * Jobs are executed sequentially.
  */
class ReshardingWorker final
{
public:
ReshardingWorker(Context & context_);
ReshardingWorker(const ReshardingWorker &) = delete;
ReshardingWorker & operator=(const ReshardingWorker &) = delete;
~ReshardingWorker();
/// Start the thread that executes resharding jobs.
void start();
/// Submit a resharding request.
void submitJob(const std::string & database_name,
const std::string & table_name,
const std::string & partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const std::string & sharding_key);
/// Submit a resharding request.
void submitJob(const ReshardingJob & job);
/// Has the worker thread been started?
bool isStarted() const;
private:
/// Submit a resharding request (internal version).
void submitJobImpl(const std::string & serialized_job);
/// Watch for the appearance of new jobs; execute them sequentially.
void pollAndExecute();
/// Execute the jobs that were already queued when this node started.
void performPendingJobs();
/// Execute the jobs identified by the given paths in ZooKeeper.
void perform(const Strings & job_nodes);
/// Execute a single job.
void perform(const ReshardingJob & job);
/// Split the parts belonging to the partition into several, according to the sharding key.
/// At the same time, regroup those parts by shard and merge the parts in each group.
/// When this process completes, a new partition is created for each shard.
void createShardedPartitions(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
/// Copy all partitions obtained by resharding to every replica of the
/// corresponding shards.
void publishShardedPartitions(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
/// For each shard, attach the data of that shard's new partition to the table on all
/// replicas belonging to that shard. On the local node that performs the resharding
/// job, delete the data of the original partition.
void applyChanges(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
/// Remove temporary data from the local node and from ZooKeeper.
void cleanup(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
/// Forcibly stop, if termination was requested.
void abortIfRequested() const;
/// Was the thread terminated (based on the given exception)?
bool hasAborted(const Exception & ex) const;
private:
Context & context;
Logger * log;
std::thread polling_thread;
std::string host_task_queue_path;
std::atomic<bool> is_started{false};
std::atomic<bool> must_stop{false};
};
using ReshardingWorkerPtr = Poco::SharedPtr<ReshardingWorker>;
}
#pragma once
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/IO/WriteBuffer.h>
namespace DB
{
class StorageReplicatedMergeTree;
namespace ShardedPartitionSender
{
/** Service for receiving parts from a partition of a *MergeTree table. Non-copyable.
  */
class Service final : public InterserverIOEndpoint
{
public:
Service(StorageReplicatedMergeTree & storage_);
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
StorageReplicatedMergeTree & storage; /// Bound to the constructor argument; must outlive the service.
};
/** Клиент для отправления кусков из партиции таблицы *MergeTree.
*/
class Client final
{
public:
Client() = default;
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
bool send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
const std::vector<std::string> & parts, size_t shard_no);
void cancel() { is_cancelled = true; }
private:
std::atomic<bool> is_cancelled{false};
};
}
}
......@@ -85,7 +85,7 @@ public:
bool supportsParallelReplicas() const override { return true; }
/// Структура подчинённой таблицы не проверяется и не изменяется.
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
private:
String name;
......
......@@ -72,14 +72,10 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
/// в подтаблицах добавлять и удалять столбы нужно вручную
/// структура подтаблиц не проверяется
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
void shutdown() override;
void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const Settings & settings) override;
/// От каждой реплики получить описание соответствующей локальной таблицы.
BlockInputStreams describe(const Context & context, const Settings & settings);
......
......@@ -62,7 +62,7 @@ public:
/// в подтаблицах добавлять и удалять столбы нужно вручную
/// структура подтаблиц не проверяется
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
private:
String name;
......
......@@ -97,7 +97,7 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
bool supportsIndexForIn() const override { return true; }
......
......@@ -7,14 +7,11 @@
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/DataPartsExchange.h>
#include <DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/ShardedPartitionSender.h>
#include <DB/Storages/MergeTree/RemoteQueryExecutor.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <zkutil/ZooKeeper.h>
#include <zkutil/LeaderElection.h>
......@@ -129,15 +126,12 @@ public:
bool optimize(const Settings & settings) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
void dropPartition(ASTPtr query, const Field & partition, bool detach, bool unreplicated, const Settings & settings) override;
void attachPartition(ASTPtr query, const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
void freezePartition(const Field & partition, const Settings & settings) override;
void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
const Settings & settings) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/
......@@ -187,11 +181,6 @@ private:
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeCleanupThread;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReshardingWorker;
friend class ShardedPartitionSender::Client;
friend class ShardedPartitionSender::Service;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
......@@ -247,20 +236,12 @@ private:
bool is_leader_node = false;
InterserverIOEndpointHolderPtr endpoint_holder;
InterserverIOEndpointHolderPtr disk_space_monitor_endpoint_holder;
InterserverIOEndpointHolderPtr sharded_partition_sender_endpoint_holder;
InterserverIOEndpointHolderPtr remote_query_executor_endpoint_holder;
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMerger merger;
DataPartsExchange::Fetcher fetcher;
RemoteDiskSpaceMonitor::Client free_disk_space_checker;
ShardedPartitionSender::Client sharded_partition_sender_client;
RemoteQueryExecutor::Client remote_query_executor_client;
ReplicatedMergeTreePartsFetcher fetcher;
zkutil::LeaderElectionPtr leader_election;
/// Для чтения данных из директории unreplicated.
......@@ -442,91 +423,12 @@ private:
/// Кинуть исключение, если таблица readonly.
void assertNotReadonly() const;
/** Получить блокировку, которая защищает заданную партицию от задачи слияния.
* Блокировка является рекурсивной.
*/
std::string acquirePartitionMergeLock(const std::string & partition_name);
/** Заявить, что больше не ссылаемся на блокировку соответствующую заданной
* партиции. Если ссылок больше нет, блокировка уничтожается.
*/
void releasePartitionMergeLock(const std::string & partition_name);
/// Проверить наличие узла в ZK. Если он есть - запомнить эту информацию, и затем сразу отвечать true.
std::unordered_set<std::string> existing_nodes_cache;
std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path);
/// Перешардирование.
struct ReplicaSpaceInfo
{
long double factor = 0.0;
size_t available_size = 0;
};
using ReplicaToSpaceInfo = std::map<std::string, ReplicaSpaceInfo>;
struct PartitionMergeLockInfo
{
PartitionMergeLockInfo(const std::string & fake_part_name_)
: fake_part_name(fake_part_name_), ref_count(1)
{
}
std::string fake_part_name;
unsigned int ref_count;
};
using PartitionToMergeLock = std::map<std::string, PartitionMergeLockInfo>;
/** Проверяет, что структуры локальной и реплицируемых таблиц совпадают.
*/
void enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths);
/** Получить информацию о свободном месте на репликах + дополнительную информацию
* для функции checkSpaceForResharding.
*/
ReplicaToSpaceInfo gatherReplicaSpaceInfo(const WeightedZooKeeperPaths & weighted_zookeeper_paths);
/** Проверяет, что имеется достаточно свободного места локально и на всех репликах.
*/
bool checkSpaceForResharding(const ReplicaToSpaceInfo & replica_to_space_info, size_t partition_size) const;
std::mutex mutex_partition_to_merge_lock;
PartitionToMergeLock partition_to_merge_lock;
};
/** Recursive lock that protects the given partition from the merge job.
  */
class ScopedPartitionMergeLock final
{
public:
    /// Acquires the partition merge lock on construction (RAII).
    ScopedPartitionMergeLock(StorageReplicatedMergeTree & storage_, const std::string & partition_name_)
        : storage(storage_), partition_name(partition_name_)
    {
        fake_part_name = storage.acquirePartitionMergeLock(partition_name);
    }
    /// Non-copyable: each instance owns one reference to the lock.
    ScopedPartitionMergeLock(const ScopedPartitionMergeLock &) = delete;
    ScopedPartitionMergeLock & operator=(const ScopedPartitionMergeLock &) = delete;
    /// Get the unique name of the lock.
    std::string getId() const
    {
        return fake_part_name;
    }
    /// Releases the reference to the lock; the lock itself is destroyed
    /// once no references to it remain.
    ~ScopedPartitionMergeLock()
    {
        storage.releasePartitionMergeLock(partition_name);
    }
private:
    StorageReplicatedMergeTree & storage;
    const std::string partition_name;
    std::string fake_part_name;
};
}
......@@ -313,16 +313,6 @@ namespace ErrorCodes
extern const int TOO_MUCH_BYTES = 307;
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER = 308;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS = 309;
extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS = 310;
extern const int INSUFFICIENT_SPACE_FOR_RESHARDING = 311;
extern const int PARTITION_COPY_FAILED = 312;
extern const int PARTITION_ATTACH_FAILED = 313;
extern const int RESHARDING_CLEANUP_FAILED = 314;
extern const int RESHARDING_NO_WORKER = 315;
extern const int INVALID_PARTITIONS_INTERVAL = 316;
extern const int RESHARDING_INVALID_PARAMETERS = 317;
extern const int INVALID_SHARD_WEIGHT = 318;
extern const int SHARD_DOESNT_REFERENCE_TABLE = 319;
extern const int UNKNOWN_STATUS_OF_INSERT = 320;
extern const int KEEPER_EXCEPTION = 999;
......
#include <DB/Interpreters/ClusterProxy/AlterQueryConstructor.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/BlockExtraInfoInputStream.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace ClusterProxy
{
BlockInputStreamPtr AlterQueryConstructor::createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address)
{
    /// Run the ALTER on this server through the ordinary interpreter and
    /// hand back the resulting input stream.
    return InterpreterAlterQuery(query_ast, context).execute().in;
}
BlockInputStreamPtr AlterQueryConstructor::createRemote(IConnectionPool * pool, const std::string & query,
    const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
    /// Ship the ALTER text to a remote shard over a single connection pool.
    BlockInputStreamPtr remote_stream = new RemoteBlockInputStream(pool, query, &settings, throttler);
    return remote_stream;
}
BlockInputStreamPtr AlterQueryConstructor::createRemote(ConnectionPoolsPtr & pools, const std::string & query,
    const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
    /// Same as the single-pool overload, but one stream serves several pools
    /// (shard multiplexing).
    BlockInputStreamPtr remote_stream = new RemoteBlockInputStream(pools, query, &settings, throttler);
    return remote_stream;
}
bool AlterQueryConstructor::isInclusive() const
{
    /// false: for each shard the ALTER is executed either locally or remotely,
    /// never both (see the `create_remote_queries` selection in Query::execute).
    return false;
}
}
}
#include <DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h>
#include <DB/Interpreters/InterpreterDescribeQuery.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/BlockExtraInfoInputStream.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace
{
BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
{
    /// Package the shard address into per-block metadata that gets attached
    /// to the result blocks.
    BlockExtraInfo info;
    info.is_valid = true;
    info.host = address.host_name;
    info.resolved_address = address.resolved_address.toString();
    info.port = address.port;
    info.user = address.user;
    return info;
}
}
namespace ClusterProxy
{
BlockInputStreamPtr DescribeQueryConstructor::createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address)
{
    InterpreterDescribeQuery interpreter(query_ast, context);

    /** Materialization is needed because constants arrive from remote servers
      * already materialized. Without it, different threads would produce
      * different column kinds (Const and non-Const) for the same data, which is
      * not allowed: all the code assumes that every block in a stream carries
      * the same types.
      */
    BlockInputStreamPtr materialized = new MaterializingBlockInputStream(interpreter.execute().in);

    /// Tag each block with information about the server that produced it.
    return new BlockExtraInfoInputStream(materialized, toBlockExtraInfo(address));
}
BlockInputStreamPtr DescribeQueryConstructor::createRemote(IConnectionPool * pool, const std::string & query,
    const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
    /// Send the query in broadcast mode and request per-block extra info
    /// describing the server each block came from.
    RemoteBlockInputStream * remote_stream = new RemoteBlockInputStream(pool, query, &settings, throttler);
    remote_stream->doBroadcast();
    remote_stream->appendExtraInfo();
    return remote_stream;
}
BlockInputStreamPtr DescribeQueryConstructor::createRemote(ConnectionPoolsPtr & pools, const std::string & query,
    const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
    /// Multi-pool variant: broadcast within every pool and tag each block
    /// with info about its source server.
    RemoteBlockInputStream * remote_stream = new RemoteBlockInputStream(pools, query, &settings, throttler);
    remote_stream->doBroadcast();
    remote_stream->appendExtraInfo();
    return remote_stream;
}
bool DescribeQueryConstructor::isInclusive() const
{
    /// true: the query is sent both to local replicas and to the remote
    /// replicas of the same shard (each server describes its own local table).
    return true;
}
}
}
#include <DB/Interpreters/ClusterProxy/Query.h>
#include <DB/Interpreters/ClusterProxy/IQueryConstructor.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace ClusterProxy
{
/// Captures everything needed to run a query over the cluster; the actual
/// work happens in execute().
Query::Query(IQueryConstructor & query_constructor_, Cluster & cluster_,
    ASTPtr query_ast_, const Context & context_, const Settings & settings_, bool enable_shard_multiplexing_)
    : query_constructor(query_constructor_), cluster(cluster_), query_ast(query_ast_),
    context(context_), settings(settings_), enable_shard_multiplexing(enable_shard_multiplexing_)
{
}
/// Build one input stream per unit of work: one stream per local replica,
/// plus one stream per processing thread for remote shards (several pools may
/// be multiplexed into a single stream). Returns all streams together.
BlockInputStreams Query::execute()
{
    BlockInputStreams res;

    const std::string query = queryToString(query_ast);

    Settings new_settings = settings;
    new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);

    /// Makes no sense on remote servers, since the query is usually sent under a different user.
    new_settings.max_concurrent_queries_for_user = 0;

    /// Network traffic limit, if needed.
    ThrottlerPtr throttler;
    if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
        throttler.reset(new Throttler(
            settings.limits.max_network_bandwidth,
            settings.limits.max_network_bytes,
            "Limit for bytes to send or receive over network exceeded."));

    /// Spread the shards evenly across threads. For an "inclusive" query a shard
    /// counts as remote when it has remote connections even if it also has a
    /// local replica; otherwise shards with a local replica are excluded.
    size_t remote_count = 0;
    if (query_constructor.isInclusive())
    {
        for (const auto & shard_info : cluster.getShardsInfo())
        {
            if (shard_info.hasRemoteConnections())
                ++remote_count;
        }
    }
    else
        remote_count = cluster.getRemoteShardCount();

    size_t thread_count;
    if (!enable_shard_multiplexing)
        thread_count = remote_count;
    else if (remote_count == 0)
        thread_count = 0;
    else if (settings.max_distributed_processing_threads == 0)
        thread_count = 1;
    else
        thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));

    /// Each thread handles pools_per_thread shards; the first `remainder`
    /// threads take one extra shard to absorb the division remainder.
    size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
    size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;

    ConnectionPoolsPtr pools;
    bool do_init = true;

    /// Loop over the shards.
    size_t current_thread = 0;
    for (const auto & shard_info : cluster.getShardsInfo())
    {
        bool create_local_queries = shard_info.isLocal();
        bool create_remote_queries = query_constructor.isInclusive() ? shard_info.hasRemoteConnections() : !create_local_queries;

        if (create_local_queries)
        {
            /// Add queries to the local ClickHouse.
            DB::Context new_context = context;
            new_context.setSettings(new_settings);
            for (const auto & address : shard_info.local_addresses)
            {
                BlockInputStreamPtr stream = query_constructor.createLocal(query_ast, new_context, address);
                /// A constructor may legitimately return no stream; skip it then.
                if (stream)
                    res.emplace_back(stream);
            }
        }

        if (create_remote_queries)
        {
            size_t excess = (current_thread < remainder) ? 1 : 0;
            size_t actual_pools_per_thread = pools_per_thread + excess;

            if (actual_pools_per_thread == 1)
            {
                /// One shard per thread: create the remote stream directly on this shard's pool.
                res.emplace_back(query_constructor.createRemote(shard_info.pool, query, new_settings, throttler, context));
                ++current_thread;
            }
            else
            {
                /// Multiplexing: accumulate pools until this thread's quota is
                /// reached, then create a single remote stream over the batch.
                if (do_init)
                {
                    pools = new ConnectionPools;
                    do_init = false;
                }

                pools->push_back(shard_info.pool);
                if (pools->size() == actual_pools_per_thread)
                {
                    res.emplace_back(query_constructor.createRemote(pools, query, new_settings, throttler, context));
                    do_init = true;
                    ++current_thread;
                }
            }
        }
    }

    return res;
}
}
}
#include <DB/Interpreters/ClusterProxy/SelectQueryConstructor.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
namespace DB
{
namespace ClusterProxy
{
/// Remembers the query-processing stage and the external (temporary) tables
/// that must accompany the SELECT to remote servers.
SelectQueryConstructor::SelectQueryConstructor(const QueryProcessingStage::Enum & processed_stage_,
    const Tables & external_tables_)
    : processed_stage(processed_stage_), external_tables(external_tables_)
{
}
BlockInputStreamPtr SelectQueryConstructor::createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address)
{
    InterpreterSelectQuery select_interpreter(query_ast, context, processed_stage);

    /** Materialization is needed because constants arrive from remote servers
      * already materialized. Without it, different threads would produce
      * different column kinds (Const and non-Const) for the same data, which is
      * not allowed: all the code assumes that every block in a stream carries
      * the same types.
      */
    BlockInputStreamPtr local_stream = select_interpreter.execute().in;
    return new MaterializingBlockInputStream(local_stream);
}
BlockInputStreamPtr SelectQueryConstructor::createRemote(IConnectionPool * pool, const std::string & query,
    const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
    /// Remote SELECTs also carry the external (temporary) tables and the stage
    /// up to which processing should happen on the remote side.
    BlockInputStreamPtr remote_stream =
        new RemoteBlockInputStream(pool, query, &settings, throttler, external_tables, processed_stage, context);
    return remote_stream;
}
BlockInputStreamPtr SelectQueryConstructor::createRemote(ConnectionPoolsPtr & pools, const std::string & query,
    const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
    /// Multi-pool variant of the remote SELECT; otherwise identical to the
    /// single-pool overload above.
    BlockInputStreamPtr remote_stream =
        new RemoteBlockInputStream(pools, query, &settings, throttler, external_tables, processed_stage, context);
    return remote_stream;
}
bool SelectQueryConstructor::isInclusive() const
{
    /// false: for each shard the SELECT is executed either on a local replica
    /// or remotely, never both (see the selection logic in Query::execute).
    return false;
}
}
}
......@@ -17,7 +17,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/Storages/MarkCache.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/Storages/MergeTree/MergeTreeSettings.h>
#include <DB/Storages/CompressionMethodSelector.h>
......@@ -103,7 +102,6 @@ struct ContextShared
ConfigurationPtr users_config; /// Конфиг с секциями users, profiles и quotas.
InterserverIOHandler interserver_io_handler; /// Обработчик для межсерверной передачи данных.
BackgroundProcessingPoolPtr background_pool; /// Пул потоков для фоновой работы, выполняемой таблицами.
ReshardingWorkerPtr resharding_worker;
Macros macros; /// Подстановки из конфига.
std::unique_ptr<Compiler> compiler; /// Для динамической компиляции частей запроса, при необходимости.
std::unique_ptr<QueryLog> query_log; /// Для логгирования запросов.
......@@ -822,19 +820,6 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return *shared->background_pool;
}
ReshardingWorker & Context::getReshardingWorker()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->zookeeper)
throw Exception("Resharding background processing requires ZooKeeper", ErrorCodes::LOGICAL_ERROR);
if (!shared->resharding_worker)
shared->resharding_worker = new ReshardingWorker(*this);
return *shared->resharding_worker;
}
void Context::resetCaches() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
......
......@@ -6,7 +6,6 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTWeightedZooKeeperPath.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/IO/copyData.h>
......@@ -30,7 +29,7 @@ namespace ErrorCodes
}
InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, const Context & context_)
InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}
......@@ -66,10 +65,6 @@ BlockIO InterpreterAlterQuery::execute()
table->freezePartition(command.partition, context.getSettingsRef());
break;
case PartitionCommand::RESHARD_PARTITION:
table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key, context.getSettingsRef());
break;
default:
throw Exception("Bad PartitionCommand::Type: " + toString(command.type), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
......@@ -169,31 +164,6 @@ void InterpreterAlterQuery::parseAlter(
const Field & partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
out_partition_commands.push_back(PartitionCommand::freezePartition(partition));
}
else if (params.type == ASTAlterQuery::RESHARD_PARTITION)
{
Field first_partition;
if (params.partition)
first_partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
Field last_partition;
if (params.last_partition)
last_partition = dynamic_cast<const ASTLiteral &>(*params.last_partition).value;
else
last_partition = first_partition;
WeightedZooKeeperPaths weighted_zookeeper_paths;
const ASTs & ast_weighted_zookeeper_paths = typeid_cast<const ASTExpressionList &>(*params.weighted_zookeeper_paths).children;
for (size_t i = 0; i < ast_weighted_zookeeper_paths.size(); ++i)
{
const auto & weighted_zookeeper_path = typeid_cast<const ASTWeightedZooKeeperPath &>(*ast_weighted_zookeeper_paths[i]);
weighted_zookeeper_paths.emplace_back(weighted_zookeeper_path.path, weighted_zookeeper_path.weight);
}
const auto & sharding_key = params.sharding_key;
out_partition_commands.push_back(PartitionCommand::reshardPartitions(first_partition, last_partition, weighted_zookeeper_paths, sharding_key));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
......@@ -206,7 +176,7 @@ void InterpreterAlterQuery::updateMetadata(
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const Context & context)
Context & context)
{
String path = context.getPath();
......
#include <DB/Parsers/ASTAlterQuery.h>
#include <mysqlxx/Manip.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_AST_STRUCTURE;
}
/// A freshly created parameter set has no ALTER operation assigned yet.
ASTAlterQuery::Parameters::Parameters() : type(NO_TYPE) {}
void ASTAlterQuery::Parameters::clone(Parameters & p) const
{
    /// Start from a shallow copy, then deep-copy every AST member so the
    /// clone does not share mutable subtrees with the source.
    p = *this;

    auto deep_copy = [](ASTPtr & ast)
    {
        if (ast)
            ast = ast->clone();
    };

    deep_copy(p.col_decl);
    deep_copy(p.column);
    deep_copy(p.partition);
    deep_copy(p.last_partition);
    deep_copy(p.weighted_zookeeper_paths);
}
void ASTAlterQuery::addParameters(const Parameters & params)
{
parameters.push_back(params);
if (params.col_decl)
children.push_back(params.col_decl);
if (params.column)
children.push_back(params.column);
if (params.partition)
children.push_back(params.partition);
if (params.last_partition)
children.push_back(params.last_partition);
if (params.weighted_zookeeper_paths)
children.push_back(params.weighted_zookeeper_paths);
}
/// Only forwards the source text range to the IAST base; the query's content
/// is filled in later via addParameters and the public fields.
ASTAlterQuery::ASTAlterQuery(StringRange range_) : IAST(range_)
{
}
/** Text that identifies this AST node. */
String ASTAlterQuery::getID() const
{
    String id = "AlterQuery_";
    id += database;
    id += "_";
    id += table;
    return id;
}
/// Deep copy of the ALTER query AST.
ASTPtr ASTAlterQuery::clone() const
{
    ASTAlterQuery * res = new ASTAlterQuery(*this);

    /// The copy constructor copied `children` as pointers to the ORIGINAL
    /// child nodes, while cloning the parameters produces fresh nodes — so a
    /// plain per-parameter clone would leave `children` and `parameters`
    /// referencing different trees (and the clone aliasing the source).
    /// Rebuild both lists so they consistently point at the cloned ASTs.
    res->parameters.clear();
    res->children.clear();

    for (const auto & param : parameters)
    {
        Parameters cloned_param;
        param.clone(cloned_param);
        res->addParameters(cloned_param);
    }

    return res;
}
/// Render the ALTER query back to text. Each parameter becomes one clause;
/// clauses are separated by commas and by settings.nl_or_ws (newline or space,
/// depending on one-line mode).
void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
    frame.need_parens = false;

    std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');

    settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");

    /// Optionally qualified table name: [database.]table
    if (!table.empty())
    {
        if (!database.empty())
        {
            settings.ostr << indent_str << database;
            settings.ostr << ".";
        }
        settings.ostr << indent_str << table;
    }
    settings.ostr << settings.nl_or_ws;

    for (size_t i = 0; i < parameters.size(); ++i)
    {
        const ASTAlterQuery::Parameters & p = parameters[i];

        if (p.type == ASTAlterQuery::ADD_COLUMN)
        {
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (settings.hilite ? hilite_none : "");
            p.col_decl->formatImpl(settings, state, frame);

            /// AFTER
            if (p.column)
            {
                settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " AFTER " << (settings.hilite ? hilite_none : "");
                p.column->formatImpl(settings, state, frame);
            }
        }
        else if (p.type == ASTAlterQuery::DROP_COLUMN)
        {
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DROP COLUMN " << (settings.hilite ? hilite_none : "");
            p.column->formatImpl(settings, state, frame);
        }
        else if (p.type == ASTAlterQuery::MODIFY_COLUMN)
        {
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (settings.hilite ? hilite_none : "");
            p.col_decl->formatImpl(settings, state, frame);
        }
        else if (p.type == ASTAlterQuery::DROP_PARTITION)
        {
            /// DROP_PARTITION doubles as DETACH when p.detach is set.
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (p.detach ? "DETACH" : "DROP") << " PARTITION "
            << (settings.hilite ? hilite_none : "");
            p.partition->formatImpl(settings, state, frame);
        }
        else if (p.type == ASTAlterQuery::ATTACH_PARTITION)
        {
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH " << (p.unreplicated ? "UNREPLICATED " : "")
            << (p.part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
            p.partition->formatImpl(settings, state, frame);
        }
        else if (p.type == ASTAlterQuery::FETCH_PARTITION)
        {
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH " << (p.unreplicated ? "UNREPLICATED " : "")
            << "PARTITION " << (settings.hilite ? hilite_none : "");
            p.partition->formatImpl(settings, state, frame);
            settings.ostr << (settings.hilite ? hilite_keyword : "")
            << " FROM " << (settings.hilite ? hilite_none : "") << mysqlxx::quote << p.from;
        }
        else if (p.type == ASTAlterQuery::FREEZE_PARTITION)
        {
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (settings.hilite ? hilite_none : "");
            p.partition->formatImpl(settings, state, frame);
        }
        else if (p.type == ASTAlterQuery::RESHARD_PARTITION)
        {
            /// RESHARD [PARTITION first[..last]] TO <weighted paths> USING <sharding key>
            /// The partition range is optional; without it the whole table is meant.
            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RESHARD ";
            if (p.partition)
                settings.ostr << "PARTITION ";
            settings.ostr << (settings.hilite ? hilite_none : "");

            if (p.partition)
                p.partition->formatImpl(settings, state, frame);

            if (p.partition && p.last_partition)
                settings.ostr << "..";

            if (p.last_partition)
                p.last_partition->formatImpl(settings, state, frame);

            std::string ws = p.partition ? " " : "";
            settings.ostr << (settings.hilite ? hilite_keyword : "") << ws
            << "TO " << (settings.hilite ? hilite_none : "");

            FormatStateStacked frame_with_indent = frame;
            ++frame_with_indent.indent;
            p.weighted_zookeeper_paths->formatImpl(settings, state, frame_with_indent);

            settings.ostr << settings.nl_or_ws;

            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
            << "USING " << (settings.hilite ? hilite_none : "")
            << p.sharding_key;
        }
        else
            throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);

        /// Comma after every clause except the last one.
        std::string comma = (i < (parameters.size() -1) ) ? "," : "";
        settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << comma << (settings.hilite ? hilite_none : "");

        settings.ostr << settings.nl_or_ws;
    }
}
}
......@@ -12,7 +12,6 @@
#include <DB/Parsers/ASTOrderByElement.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTSubquery.h>
#include <DB/Parsers/ASTWeightedZooKeeperPath.h>
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionListParsers.h>
......@@ -792,45 +791,6 @@ bool ParserOrderByElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & ma
return true;
}
/** Parse "'path' [WEIGHT n]" into an ASTWeightedZooKeeperPath.
  * The weight defaults to 1 when the WEIGHT clause is absent or its number
  * does not parse.
  */
bool ParserWeightedZooKeeperPath::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
    ParserString s_weight("WEIGHT", true, true);
    ParserStringLiteral path_p;
    ParserUnsignedInteger weight_p;
    ParserWhiteSpaceOrComments ws;

    /// `node` takes ownership of the freshly allocated AST right away.
    auto weighted_zookeeper_path = new ASTWeightedZooKeeperPath;
    node = weighted_zookeeper_path;

    ws.ignore(pos, end);

    /// Mandatory quoted ZooKeeper path.
    ASTPtr path_node;
    if (!path_p.parse(pos, end, path_node, max_parsed_pos, expected))
        return false;

    weighted_zookeeper_path->path = typeid_cast<const ASTLiteral &>(*path_node).value.get<const String &>();

    ws.ignore(pos, end);

    /// Optional "WEIGHT <unsigned>" clause.
    bool is_weight_set = false;

    if (s_weight.ignore(pos, end, max_parsed_pos, expected))
    {
        ws.ignore(pos, end);

        ASTPtr weight_node;
        if (weight_p.parse(pos, end, weight_node, max_parsed_pos, expected))
        {
            is_weight_set = true;
            weighted_zookeeper_path->weight = typeid_cast<const ASTLiteral &>(*weight_node).value.get<const UInt64 &>();
        }
    }

    if (!is_weight_set)
        weighted_zookeeper_path->weight = 1;

    return true;
}
}
......@@ -9,7 +9,6 @@
namespace DB
{
bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
Pos begin = pos;
......@@ -23,7 +22,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
ParserString s_column("COLUMN", true, true);
ParserString s_after("AFTER", true, true);
ParserString s_modify("MODIFY", true, true);
ParserString s_reshard("RESHARD", true, true);
ParserString s_drop("DROP", true, true);
ParserString s_detach("DETACH", true, true);
ParserString s_attach("ATTACH", true, true);
......@@ -33,17 +32,12 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
ParserString s_part("PART", true, true);
ParserString s_partition("PARTITION", true, true);
ParserString s_from("FROM", true, true);
ParserString s_to("TO", true, true);
ParserString s_using("USING", true, true);
ParserString s_key("KEY", true, true);
ParserString s_comma(",");
ParserString s_doubledot("..");
ParserIdentifier table_parser;
ParserCompoundIdentifier parser_name;
ParserCompoundColumnDeclaration parser_col_decl;
ParserLiteral parser_literal;
ParserUnsignedInteger parser_uint;
ParserStringLiteral parser_string_literal;
ASTPtr table;
......@@ -252,58 +246,6 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
params.type = ASTAlterQuery::MODIFY_COLUMN;
}
else if (s_reshard.ignore(pos, end, max_parsed_pos, expected))
{
ParserList weighted_zookeeper_paths_p(ParserPtr(new ParserWeightedZooKeeperPath), ParserPtr(new ParserString(",")), false);
ParserIdentifier sharding_key_parser;
ws.ignore(pos, end);
if (s_partition.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
if (!parser_uint.parse(pos, end, params.partition, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (s_doubledot.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
if (!parser_uint.parse(pos, end, params.last_partition, max_parsed_pos, expected))
return false;
}
}
ws.ignore(pos, end);
if (!s_to.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (!weighted_zookeeper_paths_p.parse(pos, end, params.weighted_zookeeper_paths, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (!s_using.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
ASTPtr ast_sharding_key;
if (!sharding_key_parser.parse(pos, end, ast_sharding_key, max_parsed_pos, expected))
return false;
params.sharding_key = typeid_cast<const ASTIdentifier &>(*ast_sharding_key).name;
ws.ignore(pos, end);
params.type = ASTAlterQuery::RESHARD_PARTITION;
}
else
return false;
......
......@@ -35,8 +35,6 @@
#include <DB/Storages/System/StorageSystemFunctions.h>
#include <DB/Storages/System/StorageSystemClusters.h>
#include <DB/Storages/System/StorageSystemMetrics.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <zkutil/ZooKeeper.h>
......@@ -320,16 +318,6 @@ int Server::main(const std::vector<std::string> & args)
global_context->setCurrentDatabase(config().getString("default_database", "default"));
if (has_zookeeper)
{
zkutil::ZooKeeperPtr zookeeper = global_context->getZooKeeper();
if (!zookeeper->getTaskQueuePath().empty())
{
auto & resharding_worker = global_context->getReshardingWorker();
resharding_worker.start();
}
}
SCOPE_EXIT(
LOG_DEBUG(log, "Closed all connections.");
......
......@@ -33,7 +33,7 @@ MergeTreeData::MergeTreeData(
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_,
const Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_, const ASTPtr & sampling_expression_,
size_t index_granularity_,
......@@ -1039,19 +1039,6 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
return nullptr;
}
MergeTreeData::DataPartPtr MergeTreeData::getShardedPartIfExists(const String & part_name, size_t shard_no)
{
MutableDataPartPtr tmp_part(new DataPart(*this));
ActiveDataPartSet::parsePartName(part_name, *tmp_part);
const MutableDataParts & sharded_parts = per_shard_data_parts.at(shard_no);
MutableDataParts::const_iterator it = sharded_parts.lower_bound(tmp_part);
if ((it != sharded_parts.end()) && ((*it)->name == part_name))
return *it;
return nullptr;
}
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
......@@ -1352,31 +1339,6 @@ void MergeTreeData::freezePartition(const std::string & prefix)
LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
}
size_t MergeTreeData::getPartitionSize(const std::string & partition_name) const
{
size_t size = 0;
Poco::DirectoryIterator end;
Poco::DirectoryIterator end2;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
const auto filename = it.name();
if (!ActiveDataPartSet::isPartDirectory(filename))
continue;
if (0 != filename.compare(0, partition_name.size(), partition_name))
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end2; ++it2)
{
const auto part_file_path = it2.path().absolute().toString();
size += Poco::File(part_file_path).getSize();
}
}
return size;
}
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
{
......@@ -1404,26 +1366,9 @@ String MergeTreeData::getMonthName(const Field & partition)
return getMonthNameAndDayNum(partition).first;
}
String MergeTreeData::getMonthName(DayNum_t month)
{
return toString(DateLUT::instance().toNumYYYYMMDD(month) / 100);
}
DayNum_t MergeTreeData::getMonthDayNum(const Field & partition)
{
return getMonthNameAndDayNum(partition).second;
}
DayNum_t MergeTreeData::getMonthFromName(const String & month_name)
{
DayNum_t date = DateLUT::instance().YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));
/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
if (month_name != toString(DateLUT::instance().toNumYYYYMMDD(date) / 100))
throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
ErrorCodes::INVALID_PARTITION_NAME);
return date;
}
}
......@@ -3,7 +3,6 @@
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/Storages/MergeTree/ReshardingJob.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
......@@ -279,59 +278,18 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
return found;
}
MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(DayNum_t partition)
{
MergeTreeData::DataPartsVector parts_from_partition;
MergeTreeData::DataParts data_parts = data.getDataParts();
for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
{
const MergeTreeData::DataPartPtr & current_part = *it;
DayNum_t month = current_part->month;
if (month != partition)
continue;
parts_from_partition.push_back(*it);
}
return parts_from_partition;
}
/// parts должны быть отсортированы.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, MergeTreeData::Transaction * out_transaction,
DiskSpaceMonitor::Reservation * disk_reservation)
{
bool is_sharded = parts[0]->is_sharded;
for (size_t i = 1; i < parts.size(); ++i)
{
if (parts[i]->is_sharded != is_sharded)
throw Exception("Inconsistent set of parts provided for merging", ErrorCodes::LOGICAL_ERROR);
}
size_t shard_no = 0;
if (is_sharded)
{
shard_no = parts[0]->shard_no;
for (size_t i = 1; i < parts.size(); ++i)
{
if (parts[i]->shard_no != shard_no)
throw Exception("Inconsistent set of parts provided for merging", ErrorCodes::LOGICAL_ERROR);
}
}
merge_entry->num_parts = parts.size();
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
String merged_dir;
if (is_sharded)
merged_dir = data.getFullPath() + "reshard/" + toString(shard_no) + merged_name;
else
merged_dir = data.getFullPath() + merged_name;
String merged_dir = data.getFullPath() + merged_name;
if (Poco::File(merged_dir).exists())
throw Exception("Directory " + merged_dir + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
......@@ -375,14 +333,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
String part_path;
if (is_sharded)
part_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/" + parts[i]->name + '/';
else
part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
......@@ -436,12 +388,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
}
String new_part_tmp_path;
if (is_sharded)
new_part_tmp_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/tmp_" + merged_name + "/";
else
new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
const String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
auto compression_method = data.context.chooseCompressionMethod(
merge_entry->total_size_bytes_compressed,
......@@ -483,45 +430,40 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
new_data_part->size = to.marksCount();
new_data_part->modification_time = time(0);
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part_tmp_path);
new_data_part->is_sharded = is_sharded;
new_data_part->shard_no = shard_no;
if (!is_sharded)
{
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
if (new_data_part->name != merged_name)
throw Exception("Unexpected part name: " + new_data_part->name + " instead of " + merged_name, ErrorCodes::LOGICAL_ERROR);
if (new_data_part->name != merged_name)
throw Exception("Unexpected part name: " + new_data_part->name + " instead of " + merged_name, ErrorCodes::LOGICAL_ERROR);
/// Проверим, что удалились все исходные куски и только они.
if (replaced_parts.size() != parts.size())
{
/** Это нормально, хотя такое бывает редко.
* Ситуация - было заменено 0 кусков вместо N может быть, например, в следующем случае:
* - у нас был кусок A, но не было куска B и C;
* - в очереди был мердж A, B -> AB, но его не делали, так как куска B нет;
* - в очереди был мердж AB, C -> ABC, но его не делали, так как куска AB и C нет;
* - мы выполнили задачу на скачивание куска B;
* - мы начали делать мердж A, B -> AB, так как все куски появились;
* - мы решили скачать с другой реплики кусок ABC, так как невозможно было сделать мердж AB, C -> ABC;
* - кусок ABC появился, при его добавлении, были удалены старые куски A, B, C;
* - мердж AB закончился. Добавился кусок AB. Но это устаревший кусок. В логе будет сообщение Obsolete part added,
* затем попадаем сюда.
* Ситуация - было заменено M > N кусков тоже нормальная.
*
* Хотя это должно предотвращаться проверкой в методе StorageReplicatedMergeTree::shouldExecuteLogEntry.
*/
LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
if (parts[i]->name != replaced_parts[i]->name)
throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
}
/// Проверим, что удалились все исходные куски и только они.
if (replaced_parts.size() != parts.size())
{
/** Это нормально, хотя такое бывает редко.
* Ситуация - было заменено 0 кусков вместо N может быть, например, в следующем случае:
* - у нас был кусок A, но не было куска B и C;
* - в очереди был мердж A, B -> AB, но его не делали, так как куска B нет;
* - в очереди был мердж AB, C -> ABC, но его не делали, так как куска AB и C нет;
* - мы выполнили задачу на скачивание куска B;
* - мы начали делать мердж A, B -> AB, так как все куски появились;
* - мы решили скачать с другой реплики кусок ABC, так как невозможно было сделать мердж AB, C -> ABC;
* - кусок ABC появился, при его добавлении, были удалены старые куски A, B, C;
* - мердж AB закончился. Добавился кусок AB. Но это устаревший кусок. В логе будет сообщение Obsolete part added,
* затем попадаем сюда.
* Ситуация - было заменено M > N кусков тоже нормальная.
*
* Хотя это должно предотвращаться проверкой в методе ReplicatedMergeTreeQueue::shouldExecuteLogEntry.
*/
LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
if (parts[i]->name != replaced_parts[i]->name)
throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
}
LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
......
#include <DB/Storages/MergeTree/MergeTreeSharder.h>
#include <DB/Storages/MergeTree/ReshardingJob.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <ctime>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TYPE_MISMATCH;
}
namespace
{
/** Builds one row filter per shard for a sharding key column of integral type T.
  * `slots` maps weight slot -> shard index: each shard occupies as many consecutive
  * entries as its weight, so (key % slots.size()) distributes rows proportionally
  * to the shard weights.
  */
template <typename T>
std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, size_t num_shards, const std::vector<size_t> & slots)
{
    const auto total_weight = slots.size();
    std::vector<IColumn::Filter> filters(num_shards);

    /** In C++, dividing a negative number with remainder by a positive one yields
      * a negative remainder, which is unsuitable for this task. Therefore signed
      * types are treated as unsigned. The result no longer resembles mathematical
      * division with remainder, but it is adequate for distributing rows over shards.
      */
    using UnsignedT = typename std::make_unsigned<T>::type;

    /// const columns contain only one value, therefore we do not need to read it at every iteration
    if (column->isConst())
    {
        const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
        const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];

        /// Every row goes to the same shard: fill its filter with 1s, all others with 0s.
        for (size_t i = 0; i < num_shards; ++i)
            filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
    }
    else
    {
        const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();

        for (size_t i = 0; i < num_shards; ++i)
        {
            filters[i].resize(num_rows);
            for (size_t j = 0; j < num_rows; ++j)
                filters[i][j] = slots[static_cast<UnsignedT>(data[j]) % total_weight] == i;
        }
    }

    return filters;
}
}
/// A block destined for one shard, together with the [min_date, max_date]
/// interval covered by the rows of its date column.
ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_,
    size_t shard_no_, UInt16 min_date_, UInt16 max_date_)
    : block(block_), shard_no(shard_no_), min_date(min_date_), max_date(max_date_)
{
}
/// Builds the slot table used for weighted shard selection: shard number `shard_no`
/// occupies as many consecutive slots as the weight of its ZooKeeper path, so that
/// (key % slots.size()) later picks shards proportionally to their weights.
MergeTreeSharder::MergeTreeSharder(MergeTreeData & data_, const ReshardingJob & job_)
    : data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)"))
{
    for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
    {
        const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
        /// weighted_path.second is the weight; repeat this shard's index that many times.
        slots.insert(slots.end(), weighted_path.second, shard_no);
    }
}
/// Splits a block into per-shard sub-blocks according to the sharding key.
/// Each non-empty sub-block is returned together with the min/max of its
/// date column values. Shards that receive no rows are omitted.
ShardedBlocksWithDateIntervals MergeTreeSharder::shardBlock(const Block & block)
{
    ShardedBlocksWithDateIntervals result;

    const size_t column_count = block.columns();

    /// Cache the source column pointers so each one is fetched only once.
    std::vector<const IColumn *> source_columns(column_count);
    for (size_t pos = 0; pos < column_count; ++pos)
        source_columns[pos] = block.getByPosition(pos).column;

    auto filters = createFilters(block);

    const size_t shard_count = job.paths.size();

    /// Rough per-shard row estimate; the factor 1.1 was chosen arbitrarily.
    ssize_t size_hint = ((block.rowsInFirstColumn() + shard_count - 1) / shard_count) * 1.1;

    for (size_t shard_index = 0; shard_index < shard_count; ++shard_index)
    {
        auto shard_block = block.cloneEmpty();
        for (size_t pos = 0; pos < column_count; ++pos)
            shard_block.getByPosition(pos).column = source_columns[pos]->filter(filters[shard_index], size_hint);

        if (!shard_block.rowsInFirstColumn())
            continue;

        /// Extract the date column of this sub-block.
        const ColumnUInt16::Container_t & dates =
            typeid_cast<const ColumnUInt16 &>(*shard_block.getByName(data.date_column_name).column).getData();

        /// Compute the minimal and maximal date.
        UInt16 min_date = std::numeric_limits<UInt16>::max();
        UInt16 max_date = std::numeric_limits<UInt16>::min();
        for (const UInt16 date : dates)
        {
            if (date < min_date)
                min_date = date;
            if (date > max_date)
                max_date = date;
        }

        result.emplace_back(shard_block, shard_index, min_date, max_date);
    }

    return result;
}
/** Writes one sharded sub-block to a temporary part directory under
  * <data_path>/reshard/<shard_no>/ and returns the in-memory description of the
  * new temporary part. Throws LOGICAL_ERROR if the block spans more than one month.
  */
MergeTreeData::MutableDataPartPtr MergeTreeSharder::writeTempPart(
    ShardedBlockWithDateInterval & sharded_block_with_dates, Int64 temp_index)
{
    Block & block = sharded_block_with_dates.block;
    UInt16 min_date = sharded_block_with_dates.min_date;
    UInt16 max_date = sharded_block_with_dates.max_date;
    size_t shard_no = sharded_block_with_dates.shard_no;

    const auto & date_lut = DateLUT::instance();

    /// A part may not span more than one calendar month.
    DayNum_t min_month = date_lut.toFirstDayNumOfMonth(DayNum_t(min_date));
    DayNum_t max_month = date_lut.toFirstDayNumOfMonth(DayNum_t(max_date));

    if (min_month != max_month)
        throw Exception("Logical error: part spans more than one month.", ErrorCodes::LOGICAL_ERROR);

    /// Number of index granules needed for this block's rows (rounded up).
    size_t part_size = (block.rows() + data.index_granularity - 1) / data.index_granularity;

    String tmp_part_name = "tmp_" + ActiveDataPartSet::getPartName(
        DayNum_t(min_date), DayNum_t(max_date),
        temp_index, temp_index, 0);

    String part_tmp_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/" + tmp_part_name + "/";

    Poco::File(part_tmp_path).createDirectories();

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
    new_data_part->name = tmp_part_name;
    new_data_part->is_temp = true;

    /// If some columns must be computed in order to sort — do it now.
    if (data.mode != MergeTreeData::Unsorted)
        data.getPrimaryExpression()->execute(block);

    SortDescription sort_descr = data.getSortDescription();

    /// Sort by the primary key, unless the block is already sorted.
    IColumn::Permutation * perm_ptr = nullptr;
    IColumn::Permutation perm;
    if (data.mode != MergeTreeData::Unsorted)
    {
        if (!isAlreadySorted(block, sort_descr))
        {
            stableGetPermutation(block, sort_descr, perm);
            perm_ptr = &perm;
        }
    }

    NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
    MergedBlockOutputStream out(data, part_tmp_path, columns, CompressionMethod::LZ4);

    out.getIndex().reserve(part_size * sort_descr.size());

    out.writePrefix();
    out.writeWithPermutation(block, perm_ptr);
    MergeTreeData::DataPart::Checksums checksums = out.writeSuffixAndGetChecksums();

    /// Fill in the metadata of the freshly written part.
    new_data_part->left_date = DayNum_t(min_date);
    new_data_part->right_date = DayNum_t(max_date);
    new_data_part->left = temp_index;
    new_data_part->right = temp_index;
    new_data_part->level = 0;
    new_data_part->size = part_size;
    new_data_part->modification_time = std::time(0);
    new_data_part->month = min_month;
    new_data_part->columns = columns;
    new_data_part->checksums = checksums;
    new_data_part->index.swap(out.getIndex());
    new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(part_tmp_path);
    new_data_part->is_sharded = true;
    new_data_part->shard_no = sharded_block_with_dates.shard_no;

    return new_data_part;
}
/// Evaluates the primary expression on a copy of the block, then builds one
/// row filter per shard from the sharding key column. The key must evaluate
/// to an integral type; otherwise TYPE_MISMATCH is thrown.
std::vector<IColumn::Filter> MergeTreeSharder::createFilters(Block block)
{
    using CreatorFn = std::vector<IColumn::Filter>(size_t, const IColumn *, size_t num_shards, const std::vector<size_t> & slots);

    /// Dispatch table: type name -> instantiation of createFiltersImpl for that type.
    static std::unordered_map<std::string, CreatorFn *> creators{
        { TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
        { TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
        { TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
        { TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
        { TypeName<Int8>::get(), &createFiltersImpl<Int8> },
        { TypeName<Int16>::get(), &createFiltersImpl<Int16> },
        { TypeName<Int32>::get(), &createFiltersImpl<Int32> },
        { TypeName<Int64>::get(), &createFiltersImpl<Int64> },
    };

    data.getPrimaryExpression()->execute(block);

    const auto & key_column = block.getByName(job.sharding_key);

    /// Check that the key column has a valid (integral) type.
    const auto found = creators.find(key_column.type->getName());
    if (found == creators.end())
        throw Exception{
            "Sharding key expression does not evaluate to an integer type",
            ErrorCodes::TYPE_MISMATCH
        };

    return (*found->second)(block.rowsInFirstColumn(), key_column.column.get(), job.paths.size(), slots);
}
}
#include <DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
}
namespace RemoteDiskSpaceMonitor
{
namespace
{
/// Forms the interserver endpoint identifier for the given node.
std::string getEndpointId(const std::string & node_id)
{
    std::string endpoint_id = "RemoteDiskSpaceMonitor:";
    endpoint_id += node_id;
    return endpoint_id;
}
}
/// path: local filesystem path whose free disk space this service reports.
Service::Service(const String & path_)
    : path(path_)
{
}
/// Returns the interserver endpoint identifier under which this service is registered.
std::string Service::getId(const std::string & node_id) const
{
    return getEndpointId(node_id);
}
/// Replies with the amount of unreserved free disk space at `path`,
/// written as a single binary-encoded value. Throws ABORTED if the
/// service has been cancelled.
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
    if (is_cancelled)
        throw Exception("RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED);

    size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path);
    writeBinary(free_space, out);
    out.next();  /// Flush the reply.
}
/// Asks the RemoteDiskSpaceMonitor service at `location` (over interserver HTTP)
/// for the amount of unreserved free disk space on its data path.
size_t Client::getFreeDiskSpace(const InterserverIOEndpointLocation & location) const
{
    ReadBufferFromHTTP::Params params =
    {
        {"endpoint", getEndpointId(location.name) },
        {"compress", "false"}
    };

    ReadBufferFromHTTP in(location.host, location.port, params);

    size_t free_disk_space;
    readBinary(free_disk_space, in);
    assertEOF(in);  /// The reply must contain exactly one value.

    return free_disk_space;
}
}
}
#include <DB/Storages/MergeTree/RemoteQueryExecutor.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
}
namespace RemoteQueryExecutor
{
namespace
{
/// Forms the interserver endpoint identifier for the given node.
std::string getEndpointId(const std::string & node_id)
{
    std::string endpoint_id{"RemoteQueryExecutor:"};
    endpoint_id.append(node_id);
    return endpoint_id;
}
}
/// context: server context in which received queries will be executed.
Service::Service(Context & context_)
    : context(context_)
{
}
/// Returns the interserver endpoint identifier under which this service is registered.
std::string Service::getId(const std::string & node_id) const
{
    return getEndpointId(node_id);
}
/// Executes the query passed in the "query" HTTP parameter in the server's
/// context and replies with a single binary-encoded success flag.
/// Throws ABORTED if the service has been cancelled.
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
    if (is_cancelled)
        throw Exception("RemoteQueryExecutor service terminated", ErrorCodes::ABORTED);

    std::string query = params.get("query");

    bool flag = true;

    try
    {
        (void) executeQuery(query, context, true);
    }
    catch (...)
    {
        /// Deliberate best-effort: any failure is reported to the caller as
        /// flag == false rather than propagated. NOTE(review): the exception
        /// details are lost here — confirm whether they should be logged.
        flag = false;
    }

    writeBinary(flag, out);
    out.next();
}
/// Sends `query` to the RemoteQueryExecutor service at `location` for execution
/// and returns the success flag reported by the remote side.
bool Client::executeQuery(const InterserverIOEndpointLocation & location, const std::string & query)
{
    ReadBufferFromHTTP::Params params =
    {
        {"endpoint", getEndpointId(location.name)},
        {"compress", "false"},
        {"query", query}
    };

    ReadBufferFromHTTP in(location.host, location.port, params);

    bool flag;
    readBinary(flag, in);
    assertEOF(in);  /// The reply must contain exactly one flag.

    return flag;
}
}
}
#include <DB/Storages/MergeTree/DataPartsExchange.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/CurrentMetrics.h>
......@@ -9,52 +9,22 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
}
namespace DataPartsExchange
{
namespace
{
std::string getEndpointId(const std::string & node_id)
{
return "DataPartsExchange:" + node_id;
}
}
std::string Service::getId(const std::string & node_id) const
{
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
if (is_cancelled)
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String part_name = params.get("part");
String shard_str = params.get("shard");
bool send_sharded_part = !shard_str.empty();
LOG_TRACE(log, "Sending part " << part_name);
try
{
auto storage_lock = storage.lockStructure(false);
MergeTreeData::DataPartPtr part;
if (send_sharded_part)
{
size_t shard_no = std::stoul(shard_str);
part = findShardedPart(part_name, shard_no);
}
else
part = findPart(part_name);
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
......@@ -73,13 +43,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
{
String file_name = it.first;
String path;
if (send_sharded_part)
path = data.getFullPath() + "reshard/" + shard_str + "/" + part_name + "/" + file_name;
else
path = data.getFullPath() + part_name + "/" + file_name;
String path = data.getFullPath() + part_name + "/" + file_name;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
......@@ -111,53 +75,17 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
}
}
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
MergeTreeData::DataPartPtr part = data.getPartIfExists(name);
if (part)
return part;
throw Exception("No part " + name + " in table");
}
MergeTreeData::DataPartPtr Service::findShardedPart(const String & name, size_t shard_no)
{
MergeTreeData::DataPartPtr part = data.getShardedPartIfExists(name, shard_no);
if (part)
return part;
throw Exception("No part " + name + " in table");
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port,
bool to_detached)
{
return fetchPartImpl(part_name, replica_path, host, port, "", to_detached);
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchShardedPart(
const InterserverIOEndpointLocation & location,
const String & part_name,
size_t shard_no)
{
return fetchPartImpl(part_name, location.name, location.host, location.port, toString(shard_no), true);
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
const String & part_name,
const String & replica_path,
const String & host,
int port,
const String & shard_no,
bool to_detached)
{
ReadBufferFromHTTP::Params params =
{
{"endpoint", getEndpointId(replica_path)},
{"endpoint", "ReplicatedMergeTree:" + replica_path},
{"part", part_name},
{"shard", shard_no},
{"compress", "false"}
};
......@@ -222,12 +150,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
new_data_part->loadColumns(true);
new_data_part->loadChecksums(true);
new_data_part->loadIndex();
new_data_part->is_sharded = false;
new_data_part->checksums.checkEqual(checksums, false);
return new_data_part;
}
}
}
......@@ -161,16 +161,6 @@ void ReplicatedMergeTreeRestartingThread::run()
{
storage.endpoint_holder->cancel();
storage.endpoint_holder = nullptr;
storage.disk_space_monitor_endpoint_holder->cancel();
storage.disk_space_monitor_endpoint_holder = nullptr;
storage.sharded_partition_sender_endpoint_holder->cancel();
storage.sharded_partition_sender_endpoint_holder = nullptr;
storage.remote_query_executor_endpoint_holder->cancel();
storage.remote_query_executor_endpoint_holder = nullptr;
partialShutdown();
}
catch (...)
......
#include <DB/Storages/MergeTree/ReshardingJob.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
/// Deserializes a job from the binary representation produced by toString():
/// database, table, partition and sharding key, followed by (path, weight)
/// pairs read until the end of the buffer.
ReshardingJob::ReshardingJob(const std::string & serialized_job)
{
    ReadBufferFromString buf(serialized_job);

    readBinary(database_name, buf);
    readBinary(table_name, buf);
    readBinary(partition, buf);
    readBinary(sharding_key, buf);

    while (!buf.eof())
    {
        std::string path;
        readBinary(path, buf);

        UInt64 weight;
        readBinary(weight, buf);

        paths.emplace_back(path, weight);
    }
}
/// Constructs a resharding job for the given table partition, the weighted
/// ZooKeeper paths of the target shards, and the sharding key expression name.
ReshardingJob::ReshardingJob(const std::string & database_name_, const std::string & table_name_,
    const std::string & partition_, const WeightedZooKeeperPaths & paths_,
    const std::string & sharding_key_)
    : database_name(database_name_),
    table_name(table_name_),
    partition(partition_),
    paths(paths_),
    sharding_key(sharding_key_)
{
}
/// Serializes the job to a binary string; the exact mirror of the
/// deserializing constructor ReshardingJob(const std::string &).
std::string ReshardingJob::toString() const
{
    std::string serialized_job;
    WriteBufferFromString buf(serialized_job);

    writeBinary(database_name, buf);
    writeBinary(table_name, buf);
    writeBinary(partition, buf);
    writeBinary(sharding_key, buf);

    for (const auto & path : paths)
    {
        writeBinary(path.first, buf);   /// ZooKeeper path.
        writeBinary(path.second, buf);  /// Weight.
    }
    buf.next();

    return serialized_job;
}
}
#include <DB/Storages/MergeTree/ShardedPartitionSender.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
}
namespace
{
/// Joins the given names into one string, separated by `delim`.
/// No delimiter is appended before the first name or after the last.
std::string glue(const std::vector<std::string> & names, char delim)
{
    std::string joined;

    for (size_t i = 0; i < names.size(); ++i)
    {
        if (i != 0)
            joined.append(1, delim);
        joined.append(names[i]);
    }

    return joined;
}
}
namespace ShardedPartitionSender
{
namespace
{
/// Forms the interserver endpoint identifier for the given node.
std::string getEndpointId(const std::string & node_id)
{
    return std::string("ShardedPartitionSender:").append(node_id);
}
}
/// storage: table on whose behalf sharded parts are fetched and attached.
Service::Service(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
{
}
/// Returns the interserver endpoint identifier under which this service is registered.
std::string Service::getId(const std::string & node_id) const
{
    return getEndpointId(node_id);
}
/// Receives a comma-separated list of sharded part names ("parts" parameter)
/// together with the source replica location and shard number, fetches each
/// part from the source, and moves it into this table's detached/ directory.
/// Replies with a single binary-encoded success flag.
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
    if (is_cancelled)
        throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);

    InterserverIOEndpointLocation from_location(params.get("from_location"));
    std::string glued_parts = params.get("parts");
    size_t shard_no = std::stoul(params.get("shard"));

    std::vector<std::string> parts;
    boost::split(parts, glued_parts, boost::is_any_of(","));

    for (const auto & part_name : parts)
    {
        /// Re-check cancellation between potentially long fetches.
        if (is_cancelled)
            throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);

        MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no);
        part->is_temp = false;  /// Keep the fetched data; do not remove it on part destruction.
        const std::string new_name = "detached/" + part_name;
        /// NOTE(review): assumes part->name equals part_name after the fetch —
        /// confirm against fetchShardedPart's naming of downloaded parts.
        Poco::File(storage.full_path + part->name).renameTo(storage.full_path + new_name);
    }

    bool flag = true;
    writeBinary(flag, out);
    out.next();
}
/// Asks the ShardedPartitionSender service at `to_location` to pull the listed
/// parts of shard `shard_no` from `from_location`. The part names are glued
/// into one comma-separated HTTP parameter. Returns the remote success flag.
bool Client::send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
    const std::vector<std::string> & parts, size_t shard_no)
{
    std::string glued_parts = glue(parts, ',');

    ReadBufferFromHTTP::Params params =
    {
        {"endpoint", getEndpointId(to_location.name)},
        {"from_location", from_location.toString()},
        {"compress", "false"},
        {"parts", glued_parts},
        {"shard", toString(shard_no)}
    };

    ReadBufferFromHTTP in(to_location.host, to_location.port, params);

    bool flag;
    readBinary(flag, in);
    assertEOF(in);  /// The reply must contain exactly one flag.

    return flag;
}
}
}
......@@ -490,7 +490,7 @@ void StorageBuffer::flushThread()
}
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
auto lock = lockStructureForAlter();
......
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/BlockExtraInfoInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/Storages/VirtualColumnFactory.h>
......@@ -12,19 +11,11 @@
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ParserAlterQuery.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Parsers/ASTWeightedZooKeeperPath.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/InterpreterDescribeQuery.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Interpreters/ClusterProxy/Query.h>
#include <DB/Interpreters/ClusterProxy/SelectQueryConstructor.h>
#include <DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h>
#include <DB/Interpreters/ClusterProxy/AlterQueryConstructor.h>
#include <DB/Core/Field.h>
......@@ -68,6 +59,17 @@ namespace
return modified_query_ast;
}
BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
{
BlockExtraInfo block_extra_info;
block_extra_info.host = address.host_name;
block_extra_info.resolved_address = address.resolved_address.toString();
block_extra_info.port = address.port;
block_extra_info.user = address.user;
block_extra_info.is_valid = true;
return block_extra_info;
}
}
......@@ -168,20 +170,39 @@ BlockInputStreams StorageDistributed::read(
const size_t max_block_size,
const unsigned threads)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
/// Не имеет смысла на удалённых серверах, так как запрос отправляется обычно с другим user-ом.
new_settings.max_concurrent_queries_for_user = 0;
size_t result_size = (cluster.getRemoteShardCount() * settings.max_parallel_replicas) + cluster.getLocalShardCount();
processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
BlockInputStreams res;
const auto & modified_query_ast = rewriteSelectQuery(
query, remote_database, remote_table);
const auto & modified_query = queryToString(modified_query_ast);
/// Ограничение сетевого трафика, если нужно.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
throttler.reset(new Throttler(
settings.limits.max_network_bandwidth,
settings.limits.max_network_bytes,
"Limit for bytes to send or receive over network exceeded."));
Tables external_tables;
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
external_tables = context.getExternalTables();
/// Распределить шарды равномерно по потокам.
size_t remote_count = cluster.getRemoteShardCount();
/// Отключаем мультиплексирование шардов, если есть ORDER BY без GROUP BY.
//const ASTSelectQuery & ast = *(static_cast<const ASTSelectQuery *>(modified_query_ast.get()));
......@@ -192,10 +213,79 @@ BlockInputStreams StorageDistributed::read(
//bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list);
bool enable_shard_multiplexing = false;
ClusterProxy::SelectQueryConstructor select_query_constructor(processed_stage, external_tables);
size_t thread_count;
if (!enable_shard_multiplexing)
thread_count = remote_count;
else if (remote_count == 0)
thread_count = 0;
else if (settings.max_distributed_processing_threads == 0)
thread_count = 1;
else
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
return ClusterProxy::Query(select_query_constructor, cluster, modified_query_ast,
context, settings, enable_shard_multiplexing).execute();
ConnectionPoolsPtr pools;
bool do_init = true;
/// Цикл по шардам.
size_t current_thread = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
if (shard_info.isLocal())
{
/// Добавляем запросы к локальному ClickHouse.
DB::Context new_context = context;
new_context.setSettings(new_settings);
for (size_t i = 0; i < shard_info.local_addresses.size(); ++i)
{
InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
/** Материализация нужна, так как с удалённых серверов константы приходят материализованными.
* Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов,
* а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые.
*/
res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
}
}
else
{
size_t excess = (current_thread < remainder) ? 1 : 0;
size_t actual_pools_per_thread = pools_per_thread + excess;
if (actual_pools_per_thread == 1)
{
res.emplace_back(new RemoteBlockInputStream{
shard_info.pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
++current_thread;
}
else
{
if (do_init)
{
pools = new ConnectionPools;
do_init = false;
}
pools->push_back(shard_info.pool);
if (pools->size() == actual_pools_per_thread)
{
res.emplace_back(new RemoteBlockInputStream{
pools, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
do_init = true;
++current_thread;
}
}
}
}
return res;
}
BlockOutputStreamPtr StorageDistributed::write(ASTPtr query, const Settings & settings)
......@@ -213,7 +303,7 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query, const Settings & se
};
}
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
auto lock = lockStructureForAlter();
params.apply(*columns, materialized_columns, alias_columns, column_defaults);
......@@ -226,83 +316,120 @@ void StorageDistributed::shutdown()
directory_monitors.clear();
}
void StorageDistributed::reshardPartitions(const String & database_name, const Field & first_partition,
const Field & last_partition, const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const String & sharding_key, const Settings & settings)
BlockInputStreams StorageDistributed::describe(const Context & context, const Settings & settings)
{
/// Создать запрос ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu.
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
/// Не имеет смысла на удалённых серверах, так как запрос отправляется обычно с другим user-ом.
new_settings.max_concurrent_queries_for_user = 0;
ASTPtr alter_query_ptr = new ASTAlterQuery;
auto & alter_query = static_cast<ASTAlterQuery &>(*alter_query_ptr);
/// Создать запрос DESCRIBE TABLE.
auto describe_query = new ASTDescribeQuery;
describe_query->database = remote_database;
describe_query->table = remote_table;
alter_query.database = remote_database;
alter_query.table = remote_table;
ASTPtr ast = describe_query;
const auto query = queryToString(ast);
alter_query.parameters.emplace_back();
ASTAlterQuery::Parameters & parameters = alter_query.parameters.back();
/// Ограничение сетевого трафика, если нужно.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
throttler.reset(new Throttler(
settings.limits.max_network_bandwidth,
settings.limits.max_network_bytes,
"Limit for bytes to send or receive over network exceeded."));
parameters.type = ASTAlterQuery::RESHARD_PARTITION;
if (!first_partition.isNull())
parameters.partition = new ASTLiteral({}, first_partition);
if (!last_partition.isNull())
parameters.last_partition = new ASTLiteral({}, last_partition);
BlockInputStreams res;
ASTPtr expr_list = new ASTExpressionList;
for (const auto & entry : weighted_zookeeper_paths)
/// Распределить шарды равномерно по потокам.
size_t remote_count = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
ASTPtr weighted_path_ptr = new ASTWeightedZooKeeperPath;
auto & weighted_path = static_cast<ASTWeightedZooKeeperPath &>(*weighted_path_ptr);
weighted_path.path = entry.first;
weighted_path.weight = entry.second;
expr_list->children.push_back(weighted_path_ptr);
if (shard_info.hasRemoteConnections())
++remote_count;
}
parameters.weighted_zookeeper_paths = expr_list;
parameters.sharding_key = sharding_key;
size_t thread_count;
/** Функциональность shard_multiplexing не доделана - выключаем её.
* (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.)
* Подробнее смотрите в https://███████████.yandex-team.ru/METR-18300
*/
bool enable_shard_multiplexing = false;
ClusterProxy::AlterQueryConstructor alter_query_constructor;
/* if (remote_count == 0)
thread_count = 0;
else if (settings.max_distributed_processing_threads == 0)
thread_count = 1;
else
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
*/
thread_count = remote_count;
BlockInputStreams streams = ClusterProxy::Query(alter_query_constructor, cluster, alter_query_ptr,
context, settings, enable_shard_multiplexing).execute();
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections);
streams.resize(1);
ConnectionPoolsPtr pools;
bool do_init = true;
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
if (stream_ptr == nullptr)
throw Exception("StorageDistributed: Internal error", ErrorCodes::LOGICAL_ERROR);
auto & stream = *stream_ptr;
while (!stream.isCancelled() && stream.read())
;
}
BlockInputStreams StorageDistributed::describe(const Context & context, const Settings & settings)
{
/// Создать запрос DESCRIBE TABLE.
ASTPtr describe_query_ptr = new ASTDescribeQuery;
auto & describe_query = static_cast<ASTDescribeQuery &>(*describe_query_ptr);
describe_query.database = remote_database;
describe_query.table = remote_table;
/** Функциональность shard_multiplexing не доделана - выключаем её.
* (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.)
* Подробнее смотрите в https://███████████.yandex-team.ru/METR-18300
*/
bool enable_shard_multiplexing = false;
ClusterProxy::DescribeQueryConstructor describe_query_constructor;
/// Цикл по шардам.
size_t current_thread = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
if (shard_info.isLocal())
{
/// Добавляем запросы к локальному ClickHouse.
DB::Context new_context = context;
new_context.setSettings(new_settings);
for (const auto & address : shard_info.local_addresses)
{
InterpreterDescribeQuery interpreter(ast, new_context);
BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
res.emplace_back(stream);
}
}
if (shard_info.hasRemoteConnections())
{
size_t excess = (current_thread < remainder) ? 1 : 0;
size_t actual_pools_per_thread = pools_per_thread + excess;
if (actual_pools_per_thread == 1)
{
auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler};
stream->doBroadcast();
stream->appendExtraInfo();
res.emplace_back(stream);
++current_thread;
}
else
{
if (do_init)
{
pools = new ConnectionPools;
do_init = false;
}
pools->push_back(shard_info.pool);
if (pools->size() == actual_pools_per_thread)
{
auto stream = new RemoteBlockInputStream{pools, query, &new_settings, throttler};
stream->doBroadcast();
stream->appendExtraInfo();
res.emplace_back(stream);
do_init = true;
++current_thread;
}
}
}
}
return ClusterProxy::Query(describe_query_constructor, cluster, describe_query_ptr,
context, settings, enable_shard_multiplexing).execute();
return res;
}
NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
......
......@@ -215,7 +215,7 @@ void StorageMerge::getSelectedTables(StorageVector & selected_tables) const
}
void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
auto lock = lockStructureForAlter();
params.apply(*columns, materialized_columns, alias_columns, column_defaults);
......
......@@ -161,7 +161,7 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_
/// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger.
}
void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
/// NOTE: Здесь так же как в ReplicatedMergeTree можно сделать ALTER, не блокирующий запись данных надолго.
const MergeTreeMergeBlocker merge_blocker{merger};
......
......@@ -173,7 +173,6 @@ public:
*/
void waitForDisappear(const std::string & path);
std::string getTaskQueuePath() const;
/** Асинхронный интерфейс (реализовано небольшое подмножество операций).
*
......@@ -300,7 +299,7 @@ private:
friend struct WatchWithEvent;
friend class EphemeralNodeHolder;
void init(const std::string & hosts, int32_t session_timeout_ms, const std::string & task_queue_path_ = "");
void init(const std::string & hosts, int32_t session_timeout_ms);
void removeChildrenRecursive(const std::string & path);
void tryRemoveChildrenRecursive(const std::string & path);
void * watchForEvent(EventPtr event);
......@@ -343,7 +342,6 @@ private:
int32_t existsImpl(const std::string & path, Stat * stat_, EventPtr watch = nullptr);
std::string hosts;
std::string task_queue_path;
int32_t session_timeout_ms;
std::mutex mutex;
......
......@@ -61,13 +61,12 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p
}
}
void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_, const std::string & task_queue_path_)
void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
{
log = &Logger::get("ZooKeeper");
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
hosts = hosts_;
session_timeout_ms = session_timeout_ms_;
task_queue_path = task_queue_path_;
impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
......@@ -105,10 +104,6 @@ struct ZooKeeperArgs
{
session_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "task_queue_path")
{
task_queue_path = config.getString(config_name + "." + key);
}
else throw KeeperException(std::string("Unknown key ") + key + " in config file");
}
......@@ -125,13 +120,12 @@ struct ZooKeeperArgs
std::string hosts;
size_t session_timeout_ms;
std::string task_queue_path;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, args.session_timeout_ms, args.task_queue_path);
init(args.hosts, args.session_timeout_ms);
}
void * ZooKeeper::watchForEvent(EventPtr event)
......@@ -584,10 +578,6 @@ void ZooKeeper::waitForDisappear(const std::string & path)
}
}
std::string ZooKeeper::getTaskQueuePath() const
{
return task_queue_path;
}
ZooKeeper::~ZooKeeper()
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册