提交 aa133365 编写于 作者: A artpaul

sync

上级 706fe739
......@@ -16,7 +16,9 @@ namespace DB
class DDLWorker
{
public:
DDLWorker(Context * ctx, const std::string & host, int port);
DDLWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_,
const std::string & host, int port);
~DDLWorker();
private:
......@@ -26,7 +28,7 @@ private:
void run();
private:
Context * context;
Context & context;
std::string local_addr;
std::string base_path;
......
......@@ -4,11 +4,64 @@
namespace DB
{
DDLWorker::DDLWorker(Context * ctx, const std::string & host, int port)
: context(ctx)
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
}
namespace {
/// Helper class which extracts from the ClickHouse configuration file
/// the parameters we need for operating the resharding thread.
class Arguments final
{
public:
Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (key == "task_queue_path")
task_queue_path = config.getString(config_name + "." + key);
else
throw Exception{"Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
}
if (task_queue_path.empty())
throw Exception{"Resharding: missing parameter task_queue_path", ErrorCodes::INVALID_CONFIG_PARAMETER};
}
Arguments(const Arguments &) = delete;
Arguments & operator=(const Arguments &) = delete;
std::string getTaskQueuePath() const
{
return task_queue_path;
}
private:
std::string task_queue_path;
};
}
DDLWorker::DDLWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_,
const std::string & host, int port)
: context(context_)
, stop_flag(false)
, thread(&DDLWorker::run, this)
{
Arguments arguments(config, config_name);
auto zookeeper = context.getZooKeeper();
std::string root = arguments.getTaskQueuePath();
if (root.back() != '/')
root += "/";
local_addr = host + ":" + std::to_string(port);
base_path = "/clickhouse/task_queue/ddl/";
}
......@@ -26,7 +79,7 @@ void DDLWorker::processTasks() {
}
void DDLWorker::processCreate(const std::string & path) {
auto zookeeper = context->getZooKeeper();
auto zookeeper = context.getZooKeeper();
const Strings & children = zookeeper->getChildren(path);
for (const auto & name : children) {
......@@ -35,7 +88,7 @@ void DDLWorker::processCreate(const std::string & path) {
std::string value = zookeeper->get(path);
if (!value.empty()) {
executeQuery(value, *context);
executeQuery(value, context);
}
zookeeper->remove(path);
......
......@@ -377,9 +377,9 @@ int Server::main(const std::vector<std::string> & args)
has_resharding_worker = true;
}
// TODO read from config
if (has_zookeeper)
if (has_zookeeper && config().has("distributed_ddl"))
{
auto ddl_worker = std::make_shared<DDLWorker>(global_context.get(), "localhost", 9000);
auto ddl_worker = std::make_shared<DDLWorker>(config(), "distributed_ddl", *global_context, "localhost", 9000);
global_context->setDDLWorker(ddl_worker);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册