提交 f5652b44 编写于 作者: A Alexey Milovidov

Modifications after removing libzookeeper; initialize ZooKeeper session lazily [#CLICKHOUSE-2]

上级 08170d0d
......@@ -345,7 +345,7 @@ void read(String & s, ReadBuffer & in)
static constexpr int32_t max_string_size = 1 << 20;
int32_t size = 0;
read(size, in);
if (size < 0) /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string.
if (size < 0) /// TODO Actually it means that zookeeper node has NULL value. Maybe better to treat it like empty string.
throw Exception("Negative size while reading string from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
if (size > max_string_size)
throw Exception("Too large string size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
......@@ -862,8 +862,6 @@ void ZooKeeper::receiveEvent()
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), ZRUNTIMEINCONSISTENCY);
response = std::make_shared<HeartbeatResponse>();
// std::cerr << "Received heartbeat\n";
}
else if (xid == watch_xid)
{
......@@ -896,8 +894,6 @@ void ZooKeeper::receiveEvent()
watches.erase(it);
}
};
// std::cerr << "Received watch\n";
}
else
{
......@@ -912,8 +908,6 @@ void ZooKeeper::receiveEvent()
operations.erase(it);
}
// std::cerr << "Received response: " << request_info.request->getOpNum() << "\n";
response = request_info.request->makeResponse();
}
......
......@@ -515,12 +515,6 @@ void Context::setConfig(const ConfigurationPtr & config)
shared->config = config;
}
ConfigurationPtr Context::getConfig() const
{
auto lock = getLock();
return shared->config;
}
Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
......@@ -1326,21 +1320,13 @@ DDLWorker & Context::getDDLWorker() const
return *shared->ddl_worker;
}
void Context::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard<std::mutex> lock(shared->zookeeper_mutex);
if (shared->zookeeper)
throw Exception("ZooKeeper client has already been set.", ErrorCodes::LOGICAL_ERROR);
shared->zookeeper = std::move(zookeeper);
}
zkutil::ZooKeeperPtr Context::getZooKeeper() const
{
std::lock_guard<std::mutex> lock(shared->zookeeper_mutex);
if (shared->zookeeper && shared->zookeeper->expired())
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(getConfigRef(), "zookeeper");
else if (shared->zookeeper->expired())
shared->zookeeper = shared->zookeeper->startNewSession();
return shared->zookeeper;
......
......@@ -140,7 +140,6 @@ public:
/// Global application configuration settings.
void setConfig(const ConfigurationPtr & config);
ConfigurationPtr getConfig() const;
Poco::Util::AbstractConfiguration & getConfigRef() const;
/** Take the list of users, quotas and configuration profiles from this config.
......@@ -300,7 +299,6 @@ public:
MergeList & getMergeList();
const MergeList & getMergeList() const;
void setZooKeeper(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Has ready or expired ZooKeeper
......
......@@ -716,13 +716,11 @@ class ClusterCopier
{
public:
ClusterCopier(const ConfigurationPtr & zookeeper_config_,
const String & task_path_,
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
Context & context_)
:
zookeeper_config(zookeeper_config_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
......@@ -733,7 +731,7 @@ public:
void init()
{
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
task_description_watch_callback = [this] (const ZooKeeperImpl::ZooKeeper::WatchResponse &)
{
......@@ -763,8 +761,8 @@ public:
/// Do not initialize tables, will make deferred initialization in process()
getZooKeeper()->createAncestors(getWorkersPathVersion() + "/");
getZooKeeper()->createAncestors(getWorkersPath() + "/");
zookeeper->createAncestors(getWorkersPathVersion() + "/");
zookeeper->createAncestors(getWorkersPath() + "/");
}
template <typename T>
......@@ -891,7 +889,7 @@ public:
void reloadTaskDescription()
{
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
task_description_watch_zookeeper = zookeeper;
String task_config_str;
......@@ -1088,7 +1086,7 @@ protected:
{
LOG_DEBUG(log, "Check that all shards processed partition " << partition_name << " successfully");
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
Strings status_paths;
for (auto & shard : shards_with_partition)
......@@ -1460,7 +1458,7 @@ protected:
TaskTable & task_table = task_shard.task_table;
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String current_task_is_active_path = task_partition.getActiveWorkerPath();
......@@ -1996,21 +1994,7 @@ protected:
return successful_shards;
}
zkutil::ZooKeeperPtr getZooKeeper()
{
auto zookeeper = context.getZooKeeper();
if (!zookeeper)
{
context.setZooKeeper(std::make_shared<zkutil::ZooKeeper>(*zookeeper_config, "zookeeper"));
zookeeper = context.getZooKeeper();
}
return zookeeper;
}
private:
ConfigurationPtr zookeeper_config;
String task_zookeeper_path;
String task_description_path;
String host_id;
......@@ -2153,6 +2137,7 @@ void ClusterCopierApp::mainImpl()
auto context = std::make_unique<Context>(Context::createGlobal());
SCOPE_EXIT(context->shutdown());
context->setConfig(zookeeper_configuration);
context->setGlobalContext(*context);
context->setApplicationType(Context::ApplicationType::LOCAL);
context->setPath(process_path);
......@@ -2166,8 +2151,7 @@ void ClusterCopierApp::mainImpl()
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
std::unique_ptr<ClusterCopier> copier(new ClusterCopier(
zookeeper_configuration, task_path, host_id, default_database, *context));
std::unique_ptr<ClusterCopier> copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
......
......@@ -102,12 +102,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setGlobalContext(*global_context);
global_context->setApplicationType(Context::ApplicationType::SERVER);
bool has_zookeeper = false;
if (config().has("zookeeper"))
{
global_context->setZooKeeper(std::make_shared<zkutil::ZooKeeper>(config(), "zookeeper"));
has_zookeeper = true;
}
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
if (loaded_config.has_zk_includes)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册