未验证 提交 7755db5e 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #9232 from ClickHouse/session-cleaner-better-build-time

Better build time (remove SessionCleaner from Context)
......@@ -273,7 +273,7 @@ void HTTPHandler::processQuery(
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
std::shared_ptr<Context> session;
std::shared_ptr<NamedSession> session;
String session_id;
std::chrono::steady_clock::duration session_timeout;
bool session_is_set = params.has("session_id");
......@@ -285,15 +285,15 @@ void HTTPHandler::processQuery(
session_timeout = parseSessionTimeout(config, params);
std::string session_check = params.get("session_check", "");
session = context.acquireSession(session_id, session_timeout, session_check == "1");
session = context.acquireNamedSession(session_id, session_timeout, session_check == "1");
context = *session;
context.setSessionContext(*session);
context = session->context;
context.setSessionContext(session->context);
}
SCOPE_EXIT({
if (session_is_set)
session->releaseSession(session_id, session_timeout);
if (session)
session->release();
});
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
......
......@@ -908,6 +908,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->enableNamedSessions();
for (auto & server : servers)
server->start();
......@@ -1020,8 +1022,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getConfigRef(), graphite_key, async_metrics));
}
SessionCleaner session_cleaner(*global_context);
waitForTerminationRequest();
}
......
......@@ -93,6 +93,182 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_SCALAR;
extern const int AUTHENTICATION_FAILED;
extern const int NOT_IMPLEMENTED;
}
class NamedSessions
{
public:
using Key = NamedSessionKey;
~NamedSessions()
{
try
{
{
std::lock_guard lock{mutex};
quit = true;
}
cond.notify_one();
thread.join();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Find existing session or create a new.
std::shared_ptr<NamedSession> acquireSession(
const String & session_id,
Context & context,
std::chrono::steady_clock::duration timeout,
bool throw_if_not_found)
{
std::unique_lock lock(mutex);
auto & user_name = context.client_info.current_user;
if (user_name.empty())
throw Exception("Empty user name.", ErrorCodes::LOGICAL_ERROR);
Key key(user_name, session_id);
auto it = sessions.find(key);
if (it == sessions.end())
{
if (throw_if_not_found)
throw Exception("Session not found.", ErrorCodes::SESSION_NOT_FOUND);
/// Create a new session from current context.
it = sessions.insert(std::make_pair(key, std::make_shared<NamedSession>(key, context, timeout, *this))).first;
}
else if (it->second->key.first != context.client_info.current_user)
{
throw Exception("Session belongs to a different user", ErrorCodes::LOGICAL_ERROR);
}
/// Use existing session.
const auto & session = it->second;
if (!session.unique())
throw Exception("Session is locked by a concurrent client.", ErrorCodes::SESSION_IS_LOCKED);
return session;
}
void releaseSession(NamedSession & session)
{
std::unique_lock lock(mutex);
scheduleCloseSession(session, lock);
}
private:
class SessionKeyHash
{
public:
size_t operator()(const Key & key) const
{
SipHash hash;
hash.update(key.first);
hash.update(key.second);
return hash.get64();
}
};
/// TODO it's very complicated. Make simple std::map with time_t or boost::multi_index.
using Container = std::unordered_map<Key, std::shared_ptr<NamedSession>, SessionKeyHash>;
using CloseTimes = std::deque<std::vector<Key>>;
Container sessions;
CloseTimes close_times;
std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1);
std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now();
UInt64 close_cycle = 0;
void scheduleCloseSession(NamedSession & session, std::unique_lock<std::mutex> &)
{
/// Push it on a queue of sessions to close, on a position corresponding to the timeout.
/// (timeout is measured from current moment of time)
const UInt64 close_index = session.timeout / close_interval + 1;
const auto new_close_cycle = close_cycle + close_index;
if (session.close_cycle != new_close_cycle)
{
session.close_cycle = new_close_cycle;
if (close_times.size() < close_index + 1)
close_times.resize(close_index + 1);
close_times[close_index].emplace_back(session.key);
}
}
void cleanThread()
{
setThreadName("SessionCleaner");
std::unique_lock lock{mutex};
while (true)
{
auto interval = closeSessions(lock);
if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
break;
}
}
/// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
std::chrono::steady_clock::duration closeSessions(std::unique_lock<std::mutex> & lock)
{
const auto now = std::chrono::steady_clock::now();
/// The time to close the next session did not come
if (now < close_cycle_time)
return close_cycle_time - now; /// Will sleep until it comes.
const auto current_cycle = close_cycle;
++close_cycle;
close_cycle_time = now + close_interval;
if (close_times.empty())
return close_interval;
auto & sessions_to_close = close_times.front();
for (const auto & key : sessions_to_close)
{
const auto session = sessions.find(key);
if (session != sessions.end() && session->second->close_cycle <= current_cycle)
{
if (!session->second.unique())
{
/// Skip but move it to close on the next cycle.
session->second->timeout = std::chrono::steady_clock::duration{0};
scheduleCloseSession(*session->second, lock);
}
else
sessions.erase(session);
}
}
close_times.pop_front();
return close_interval;
}
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> quit{false};
ThreadFromGlobalPool thread{&NamedSessions::cleanThread, this};
};
void NamedSession::release()
{
parent.releaseSession(*this);
}
......@@ -165,28 +341,7 @@ struct ContextShared
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
{
public:
size_t operator()(const Context::SessionKey & key) const
{
SipHash hash;
hash.update(key.first);
hash.update(key.second);
return hash.get64();
}
};
using Sessions = std::unordered_map<Context::SessionKey, std::shared_ptr<Context>, SessionKeyHash>;
using CloseTimes = std::deque<std::vector<Context::SessionKey>>;
mutable Sessions sessions;
mutable CloseTimes close_times;
std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1);
std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now();
UInt64 close_cycle = 0;
std::optional<NamedSessions> named_sessions; /// Controls named HTTP sessions.
/// Clusters for distributed tables
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
......@@ -360,111 +515,17 @@ Databases Context::getDatabases()
}
Context::SessionKey Context::getSessionKey(const String & session_id) const
void Context::enableNamedSessions()
{
auto & user_name = client_info.current_user;
if (user_name.empty())
throw Exception("Empty user name.", ErrorCodes::LOGICAL_ERROR);
return SessionKey(user_name, session_id);
shared->named_sessions.emplace();
}
void Context::scheduleCloseSession(const Context::SessionKey & key, std::chrono::steady_clock::duration timeout)
std::shared_ptr<NamedSession> Context::acquireNamedSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check)
{
const UInt64 close_index = timeout / shared->close_interval + 1;
const auto new_close_cycle = shared->close_cycle + close_index;
if (!shared->named_sessions)
throw Exception("Support for named sessions is not enabled", ErrorCodes::NOT_IMPLEMENTED);
if (session_close_cycle != new_close_cycle)
{
session_close_cycle = new_close_cycle;
if (shared->close_times.size() < close_index + 1)
shared->close_times.resize(close_index + 1);
shared->close_times[close_index].emplace_back(key);
}
}
std::shared_ptr<Context> Context::acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check) const
{
auto lock = getLock();
const auto & key = getSessionKey(session_id);
auto it = shared->sessions.find(key);
if (it == shared->sessions.end())
{
if (session_check)
throw Exception("Session not found.", ErrorCodes::SESSION_NOT_FOUND);
auto new_session = std::make_shared<Context>(*this);
new_session->scheduleCloseSession(key, timeout);
it = shared->sessions.insert(std::make_pair(key, std::move(new_session))).first;
}
else if (it->second->client_info.current_user != client_info.current_user)
{
throw Exception("Session belongs to a different user", ErrorCodes::LOGICAL_ERROR);
}
const auto & session = it->second;
if (session->session_is_used)
throw Exception("Session is locked by a concurrent client.", ErrorCodes::SESSION_IS_LOCKED);
session->session_is_used = true;
session->client_info = client_info;
return session;
}
void Context::releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout)
{
auto lock = getLock();
session_is_used = false;
scheduleCloseSession(getSessionKey(session_id), timeout);
}
std::chrono::steady_clock::duration Context::closeSessions() const
{
auto lock = getLock();
const auto now = std::chrono::steady_clock::now();
if (now < shared->close_cycle_time)
return shared->close_cycle_time - now;
const auto current_cycle = shared->close_cycle;
++shared->close_cycle;
shared->close_cycle_time = now + shared->close_interval;
if (shared->close_times.empty())
return shared->close_interval;
auto & sessions_to_close = shared->close_times.front();
for (const auto & key : sessions_to_close)
{
const auto session = shared->sessions.find(key);
if (session != shared->sessions.end() && session->second->session_close_cycle <= current_cycle)
{
if (session->second->session_is_used)
session->second->scheduleCloseSession(key, std::chrono::seconds(0));
else
shared->sessions.erase(session);
}
}
shared->close_times.pop_front();
return shared->close_interval;
return shared->named_sessions->acquireSession(session_id, *this, timeout, session_check);
}
......@@ -2280,40 +2341,4 @@ void Context::resetInputCallbacks()
input_blocks_reader = {};
}
SessionCleaner::~SessionCleaner()
{
try
{
{
std::lock_guard lock{mutex};
quit = true;
}
cond.notify_one();
thread.join();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void SessionCleaner::run()
{
setThreadName("SessionCleaner");
std::unique_lock lock{mutex};
while (true)
{
auto interval = context.closeSessions();
if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
break;
}
}
}
......@@ -101,14 +101,13 @@ using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
struct NamedSession;
#if USE_EMBEDDED_COMPILER
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
#endif
/// Table -> set of table-views that make SELECT from it.
......@@ -138,6 +137,7 @@ struct IHostContext
using IHostContextPtr = std::shared_ptr<IHostContext>;
/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
......@@ -180,8 +180,7 @@ private:
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
UInt64 session_close_cycle = 0;
bool session_is_used = false;
friend class NamedSessions;
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
......@@ -423,11 +422,11 @@ public:
const Databases getDatabases() const;
Databases getDatabases();
std::shared_ptr<Context> acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check) const;
void releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout);
/// Allow to use named sessions. The thread will be run to cleanup sessions after timeout has expired.
/// The method must be called at the server startup.
void enableNamedSessions();
/// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
std::chrono::steady_clock::duration closeSessions() const;
std::shared_ptr<NamedSession> acquireNamedSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check);
/// For methods below you may need to acquire a lock by yourself.
std::unique_lock<std::recursive_mutex> getLock() const;
......@@ -590,9 +589,6 @@ public:
String getFormatSchemaPath() const;
void setFormatSchemaPath(const String & path);
/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;
SampleBlockCache & getSampleBlockCache() const;
/// Query parameters for prepared statements.
......@@ -635,11 +631,6 @@ private:
StoragePtr getTableImpl(const StorageID & table_id, std::optional<Exception> * exception) const;
SessionKey getSessionKey(const String & session_id) const;
/// Session will be closed after specified timeout.
void scheduleCloseSession(const SessionKey & key, std::chrono::steady_clock::duration timeout);
void checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const;
};
......@@ -672,24 +663,26 @@ private:
};
class SessionCleaner
class NamedSessions;
/// User name and session identifier. Named sessions are local to users.
using NamedSessionKey = std::pair<String, String>;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
struct NamedSession
{
public:
SessionCleaner(Context & context_)
: context{context_}
NamedSessionKey key;
UInt64 close_cycle = 0;
Context context;
std::chrono::steady_clock::duration timeout;
NamedSessions & parent;
NamedSession(NamedSessionKey key_, Context & context_, std::chrono::steady_clock::duration timeout_, NamedSessions & parent_)
: key(key_), context(context_), timeout(timeout_), parent(parent_)
{
}
~SessionCleaner();
private:
void run();
Context & context;
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> quit{false};
ThreadFromGlobalPool thread{&SessionCleaner::run, this};
void release();
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册