Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
b3a6f6b3
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b3a6f6b3
编写于
3月 06, 2020
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Revert "Simplification"
This reverts commit
f5518c0c
.
上级
49d6fec3
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
69 addition
and
29 deletion
+69
-29
dbms/programs/server/HTTPHandler.cpp
dbms/programs/server/HTTPHandler.cpp
+5
-5
dbms/src/Interpreters/Context.cpp
dbms/src/Interpreters/Context.cpp
+60
-21
dbms/src/Interpreters/Context.h
dbms/src/Interpreters/Context.h
+4
-3
未找到文件。
dbms/programs/server/HTTPHandler.cpp
浏览文件 @
b3a6f6b3
...
...
@@ -141,15 +141,15 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
}
static
uint32_t
parseSessionTimeout
(
static
std
::
chrono
::
steady_clock
::
duration
parseSessionTimeout
(
const
Poco
::
Util
::
AbstractConfiguration
&
config
,
const
HTMLForm
&
params
)
{
u
int32_t
session_timeout
=
config
.
getInt
(
"default_session_timeout"
,
60
);
u
nsigned
session_timeout
=
config
.
getInt
(
"default_session_timeout"
,
60
);
if
(
params
.
has
(
"session_timeout"
))
{
u
int32_t
max_session_timeout
=
config
.
getUInt
(
"max_session_timeout"
,
3600
);
u
nsigned
max_session_timeout
=
config
.
getUInt
(
"max_session_timeout"
,
3600
);
std
::
string
session_timeout_str
=
params
.
get
(
"session_timeout"
);
ReadBufferFromString
buf
(
session_timeout_str
);
...
...
@@ -162,7 +162,7 @@ static uint32_t parseSessionTimeout(
ErrorCodes
::
INVALID_SESSION_TIMEOUT
);
}
return
s
ession_timeout
;
return
s
td
::
chrono
::
seconds
(
session_timeout
)
;
}
...
...
@@ -275,7 +275,7 @@ void HTTPHandler::processQuery(
std
::
shared_ptr
<
NamedSession
>
session
;
String
session_id
;
uint32_t
session_timeout
;
std
::
chrono
::
steady_clock
::
duration
session_timeout
;
bool
session_is_set
=
params
.
has
(
"session_id"
);
const
auto
&
config
=
server
.
config
();
...
...
dbms/src/Interpreters/Context.cpp
浏览文件 @
b3a6f6b3
...
...
@@ -124,7 +124,7 @@ public:
std
::
shared_ptr
<
NamedSession
>
acquireSession
(
const
String
&
session_id
,
Context
&
context
,
uint32_t
timeout
,
std
::
chrono
::
steady_clock
::
duration
timeout
,
bool
throw_if_not_found
)
{
std
::
unique_lock
lock
(
mutex
);
...
...
@@ -162,7 +162,7 @@ public:
void
releaseSession
(
NamedSession
&
session
)
{
std
::
unique_lock
lock
(
mutex
);
close_times
.
emplace
(
time
(
nullptr
)
+
session
.
timeout
,
session
.
key
);
scheduleCloseSession
(
session
,
lock
);
}
private:
...
...
@@ -178,11 +178,31 @@ private:
}
};
/// TODO it's very complicated. Make simple std::map with time_t or boost::multi_index.
using
Container
=
std
::
unordered_map
<
Key
,
std
::
shared_ptr
<
NamedSession
>
,
SessionKeyHash
>
;
using
CloseTimes
=
std
::
multimap
<
time_t
,
Key
>
;
using
CloseTimes
=
std
::
deque
<
std
::
vector
<
Key
>>
;
Container
sessions
;
CloseTimes
close_times
;
std
::
chrono
::
steady_clock
::
duration
close_interval
=
std
::
chrono
::
seconds
(
1
);
std
::
chrono
::
steady_clock
::
time_point
close_cycle_time
=
std
::
chrono
::
steady_clock
::
now
();
UInt64
close_cycle
=
0
;
void
scheduleCloseSession
(
NamedSession
&
session
,
std
::
unique_lock
<
std
::
mutex
>
&
)
{
/// Push it on a queue of sessions to close, on a position corresponding to the timeout.
/// (timeout is measured from current moment of time)
const
UInt64
close_index
=
session
.
timeout
/
close_interval
+
1
;
const
auto
new_close_cycle
=
close_cycle
+
close_index
;
if
(
session
.
close_cycle
!=
new_close_cycle
)
{
session
.
close_cycle
=
new_close_cycle
;
if
(
close_times
.
size
()
<
close_index
+
1
)
close_times
.
resize
(
close_index
+
1
);
close_times
[
close_index
].
emplace_back
(
session
.
key
);
}
}
void
cleanThread
()
{
...
...
@@ -192,32 +212,51 @@ private:
while
(
true
)
{
closeSessions
(
lock
);
if
(
cond
.
wait_for
(
lock
,
std
::
chrono
::
seconds
(
1
),
[
this
]()
->
bool
{
return
quit
;
}))
auto
interval
=
closeSessions
(
lock
);
if
(
cond
.
wait_for
(
lock
,
interval
,
[
this
]()
->
bool
{
return
quit
;
}))
break
;
}
}
/// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
void
closeSessions
(
std
::
unique_lock
<
std
::
mutex
>
&
)
std
::
chrono
::
steady_clock
::
duration
closeSessions
(
std
::
unique_lock
<
std
::
mutex
>
&
lock
)
{
time_t
now
=
time
(
nullptr
);
for
(
auto
it
=
close_times
.
begin
();
it
!=
close_times
.
end
();)
{
if
(
it
->
first
>=
now
)
break
;
const
auto
now
=
std
::
chrono
::
steady_clock
::
now
();
/// The time to close the next session did not come
if
(
now
<
close_cycle_time
)
return
close_cycle_time
-
now
;
/// Will sleep until it comes.
const
auto
current_cycle
=
close_cycle
;
const
auto
session_it
=
sessions
.
find
(
it
->
second
)
;
it
=
close_times
.
erase
(
it
)
;
++
close_cycle
;
close_cycle_time
=
now
+
close_interval
;
if
(
session_it
==
sessions
.
end
())
continue
;
if
(
close_times
.
empty
())
return
close_interval
;
if
(
session_it
->
second
.
unique
())
sessions
.
erase
(
session_it
);
else
close_times
.
emplace
(
now
+
session_it
->
second
->
timeout
,
session_it
->
second
->
key
);
/// Does not invalidate iterators.
auto
&
sessions_to_close
=
close_times
.
front
();
for
(
const
auto
&
key
:
sessions_to_close
)
{
const
auto
session
=
sessions
.
find
(
key
);
if
(
session
!=
sessions
.
end
()
&&
session
->
second
->
close_cycle
<=
current_cycle
)
{
if
(
!
session
->
second
.
unique
())
{
/// Skip but move it to close on the next cycle.
session
->
second
->
timeout
=
std
::
chrono
::
steady_clock
::
duration
{
0
};
scheduleCloseSession
(
*
session
->
second
,
lock
);
}
else
sessions
.
erase
(
session
);
}
}
close_times
.
pop_front
();
return
close_interval
;
}
std
::
mutex
mutex
;
...
...
@@ -481,7 +520,7 @@ void Context::enableNamedSessions()
shared
->
named_sessions
.
emplace
();
}
std
::
shared_ptr
<
NamedSession
>
Context
::
acquireNamedSession
(
const
String
&
session_id
,
uint32_t
timeout
,
bool
session_check
)
std
::
shared_ptr
<
NamedSession
>
Context
::
acquireNamedSession
(
const
String
&
session_id
,
std
::
chrono
::
steady_clock
::
duration
timeout
,
bool
session_check
)
{
if
(
!
shared
->
named_sessions
)
throw
Exception
(
"Support for named sessions is not enabled"
,
ErrorCodes
::
NOT_IMPLEMENTED
);
...
...
dbms/src/Interpreters/Context.h
浏览文件 @
b3a6f6b3
...
...
@@ -426,7 +426,7 @@ public:
/// The method must be called at the server startup.
void
enableNamedSessions
();
std
::
shared_ptr
<
NamedSession
>
acquireNamedSession
(
const
String
&
session_id
,
uint32_t
timeout
,
bool
session_check
);
std
::
shared_ptr
<
NamedSession
>
acquireNamedSession
(
const
String
&
session_id
,
std
::
chrono
::
steady_clock
::
duration
timeout
,
bool
session_check
);
/// For methods below you may need to acquire a lock by yourself.
std
::
unique_lock
<
std
::
recursive_mutex
>
getLock
()
const
;
...
...
@@ -672,11 +672,12 @@ using NamedSessionKey = std::pair<String, String>;
struct
NamedSession
{
NamedSessionKey
key
;
UInt64
close_cycle
=
0
;
Context
context
;
uint32_t
timeout
;
std
::
chrono
::
steady_clock
::
duration
timeout
;
NamedSessions
&
parent
;
NamedSession
(
NamedSessionKey
key_
,
Context
&
context_
,
uint32_t
timeout_
,
NamedSessions
&
parent_
)
NamedSession
(
NamedSessionKey
key_
,
Context
&
context_
,
std
::
chrono
::
steady_clock
::
duration
timeout_
,
NamedSessions
&
parent_
)
:
key
(
key_
),
context
(
context_
),
timeout
(
timeout_
),
parent
(
parent_
)
{
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录