Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
5ec00054
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5ec00054
编写于
9月 16, 2019
作者:
R
root
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
+UpdatableSessionBase
Committer: maqroll <loteroc@gmail.com>
上级
2b5a420f
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
176 addition
and
80 deletion
+176
-80
dbms/src/IO/HTTPCommon.cpp
dbms/src/IO/HTTPCommon.cpp
+3
-5
dbms/src/IO/HTTPCommon.h
dbms/src/IO/HTTPCommon.h
+3
-1
dbms/src/IO/ReadWriteBufferFromHTTP.cpp
dbms/src/IO/ReadWriteBufferFromHTTP.cpp
+1
-29
dbms/src/IO/ReadWriteBufferFromHTTP.h
dbms/src/IO/ReadWriteBufferFromHTTP.h
+167
-43
dbms/src/IO/WriteBufferFromHTTP.cpp
dbms/src/IO/WriteBufferFromHTTP.cpp
+1
-1
dbms/src/Storages/StorageURL.cpp
dbms/src/Storages/StorageURL.cpp
+1
-1
未找到文件。
dbms/src/IO/HTTPCommon.cpp
浏览文件 @
5ec00054
...
...
@@ -217,17 +217,15 @@ PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Connecti
return
HTTPSessionPool
::
instance
().
getSession
(
uri
,
timeouts
,
per_endpoint_pool_size
);
}
bool
isRedirect
(
const
Poco
::
Net
::
HTTPResponse
::
HTTPStatus
status
)
{
return
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_MOVED_PERMANENTLY
||
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_FOUND
||
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_SEE_OTHER
||
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_TEMPORARY_REDIRECT
;
}
std
::
istream
*
receiveResponse
(
Poco
::
Net
::
HTTPClientSession
&
session
,
const
Poco
::
Net
::
HTTPRequest
&
request
,
Poco
::
Net
::
HTTPResponse
&
response
)
Poco
::
Net
::
HTTPClientSession
&
session
,
const
Poco
::
Net
::
HTTPRequest
&
request
,
Poco
::
Net
::
HTTPResponse
&
response
,
const
bool
allow_redirects
)
{
auto
istr
=
&
session
.
receiveResponse
(
response
);
auto
status
=
response
.
getStatus
();
if
((
request
.
getMethod
()
==
Poco
::
Net
::
HTTPRequest
::
HTTP_GET
)
&&
(
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_MOVED_PERMANENTLY
||
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_FOUND
||
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_SEE_OTHER
||
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_TEMPORARY_REDIRECT
))
throw
Poco
::
URIRedirection
(
response
.
get
(
"Location"
));
if
(
status
!=
Poco
::
Net
::
HTTPResponse
::
HTTP_OK
)
if
(
!
(
status
==
Poco
::
Net
::
HTTPResponse
::
HTTP_OK
||
(
isRedirect
(
status
)
&&
allow_redirects
)))
{
std
::
stringstream
error_message
;
error_message
<<
"Received error from remote server "
<<
request
.
getURI
()
<<
". HTTP status code: "
<<
status
<<
" "
...
...
dbms/src/IO/HTTPCommon.h
浏览文件 @
5ec00054
...
...
@@ -50,11 +50,13 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
/// As previous method creates session, but tooks it from pool
PooledHTTPSessionPtr
makePooledHTTPSession
(
const
Poco
::
URI
&
uri
,
const
ConnectionTimeouts
&
timeouts
,
size_t
per_endpoint_pool_size
);
bool
isRedirect
(
const
Poco
::
Net
::
HTTPResponse
::
HTTPStatus
status
);
/** Used to receive response (response headers and possibly body)
* after sending data (request headers and possibly body).
* Throws exception in case of non HTTP_OK (200) response code.
* Returned istream lives in 'session' object.
*/
std
::
istream
*
receiveResponse
(
Poco
::
Net
::
HTTPClientSession
&
session
,
const
Poco
::
Net
::
HTTPRequest
&
request
,
Poco
::
Net
::
HTTPResponse
&
response
);
Poco
::
Net
::
HTTPClientSession
&
session
,
const
Poco
::
Net
::
HTTPRequest
&
request
,
Poco
::
Net
::
HTTPResponse
&
response
,
bool
allow_redirects
);
}
dbms/src/IO/ReadWriteBufferFromHTTP.cpp
浏览文件 @
5ec00054
...
...
@@ -9,33 +9,5 @@ namespace ErrorCodes
extern
const
int
TOO_MANY_REDIRECTS
;
}
std
::
unique_ptr
<
DB
::
ReadWriteBufferFromHTTP
>
makeReadWriteBufferFromHTTP
(
const
Poco
::
URI
&
uri
,
const
std
::
string
&
method
,
std
::
function
<
void
(
std
::
ostream
&
)
>
callback
,
const
DB
::
ConnectionTimeouts
&
timeouts
,
const
DB
::
SettingUInt64
max_redirects
)
{
auto
actual_uri
=
uri
;
UInt64
redirects
=
0
;
do
{
try
{
return
std
::
make_unique
<
DB
::
ReadWriteBufferFromHTTP
>
(
actual_uri
,
method
,
callback
,
timeouts
);
}
catch
(
Poco
::
URIRedirection
&
exc
)
{
redirects
++
;
actual_uri
=
exc
.
uri
();
}
}
while
(
max_redirects
>
redirects
);
// too many redirects....
std
::
stringstream
error_message
;
error_message
<<
"Too many redirects while trying to access "
<<
uri
.
toString
()
;
throw
Exception
(
error_message
.
str
(),
ErrorCodes
::
TOO_MANY_REDIRECTS
);
}
}
dbms/src/IO/ReadWriteBufferFromHTTP.h
浏览文件 @
5ec00054
...
...
@@ -27,39 +27,78 @@ namespace DB
/** Perform HTTP POST request and provide response to read.
*/
namespace
ErrorCodes
{
extern
const
int
TOO_MANY_REDIRECTS
;
}
template
<
typename
SessionPtr
>
class
UpdatableSessionBase
{
protected:
SessionPtr
session
;
UInt64
redirects
{
0
};
Poco
::
URI
initial_uri
;
const
ConnectionTimeouts
&
timeouts
;
DB
::
SettingUInt64
max_redirects
;
public:
void
buildNewSession
(
const
Poco
::
URI
&
uri
);
explicit
UpdatableSessionBase
(
const
Poco
::
URI
uri
,
const
ConnectionTimeouts
&
timeouts_
,
SettingUInt64
max_redirects_
)
:
initial_uri
{
uri
}
,
timeouts
{
timeouts_
}
,
max_redirects
{
max_redirects_
}
{
}
SessionPtr
getSession
()
{
return
session
;
}
void
updateSession
(
const
Poco
::
URI
&
uri
)
{
if
(
redirects
++<
max_redirects
)
{
buildNewSession
(
uri
);
}
else
{
std
::
stringstream
error_message
;
error_message
<<
"Too many redirects while trying to access "
<<
initial_uri
.
toString
();
throw
Exception
(
error_message
.
str
(),
ErrorCodes
::
TOO_MANY_REDIRECTS
);
}
}
};
namespace
detail
{
template
<
typename
SessionPtr
>
template
<
typename
Updatable
SessionPtr
>
class
ReadWriteBufferFromHTTPBase
:
public
ReadBuffer
{
protected:
Poco
::
URI
uri
;
std
::
string
method
;
SessionPtr
session
;
Updatable
SessionPtr
session
;
std
::
istream
*
istr
;
/// owned by session
std
::
unique_ptr
<
ReadBuffer
>
impl
;
std
::
function
<
void
(
std
::
ostream
&
)
>
out_stream_callback
;
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials
;
public:
using
OutStreamCallback
=
std
::
function
<
void
(
std
::
ostream
&
)
>
;
explicit
ReadWriteBufferFromHTTPBase
(
SessionPtr
session_
,
Poco
::
URI
uri_
,
const
std
::
string
&
method_
=
{},
OutStreamCallback
out_stream_callback
=
{},
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials
=
{},
size_t
buffer_size_
=
DBMS_DEFAULT_BUFFER_SIZE
)
:
ReadBuffer
(
nullptr
,
0
)
,
uri
{
uri_
}
,
method
{
!
method_
.
empty
()
?
method_
:
out_stream_callback
?
Poco
::
Net
::
HTTPRequest
::
HTTP_POST
:
Poco
::
Net
::
HTTPRequest
::
HTTP_GET
}
,
session
{
session_
}
protected:
std
::
istream
*
call
(
const
Poco
::
URI
uri_
,
Poco
::
Net
::
HTTPResponse
&
response
)
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if
(
uri
.
getPath
().
empty
())
uri
.
setPath
(
"/"
);
Poco
::
Net
::
HTTPRequest
request
(
method
,
uri
.
getPathAndQuery
(),
Poco
::
Net
::
HTTPRequest
::
HTTP_1_1
);
request
.
setHost
(
uri
.
getHost
());
// use original, not resolved host name in header
Poco
::
Net
::
HTTPRequest
request
(
method
,
uri
_
.
getPathAndQuery
(),
Poco
::
Net
::
HTTPRequest
::
HTTP_1_1
);
request
.
setHost
(
uri
_
.
getHost
());
// use original, not resolved host name in header
if
(
out_stream_callback
)
request
.
setChunkedTransferEncoding
(
true
);
...
...
@@ -67,26 +106,70 @@ namespace detail
if
(
!
credentials
.
getUsername
().
empty
())
credentials
.
authenticate
(
request
);
Poco
::
Net
::
HTTPResponse
response
;
LOG_TRACE
((
&
Logger
::
get
(
"ReadWriteBufferFromHTTP"
)),
"Sending request to "
<<
uri
.
toString
());
try
{
auto
&
stream_out
=
session
->
sendRequest
(
request
);
auto
sess
=
session
->
getSession
();
auto
&
stream_out
=
sess
->
sendRequest
(
request
);
if
(
out_stream_callback
)
out_stream_callback
(
stream_out
);
istr
=
receiveResponse
(
*
session
,
request
,
response
);
try
{
istr
=
receiveResponse
(
*
sess
,
request
,
response
,
true
);
return
istr
;
}
catch
(
const
Poco
::
Exception
&
e
)
{
/// We use session data storage as storage for exception text
/// Depend on it we can deduce to reconnect session or reresolve session host
sess
->
attachSessionData
(
e
.
message
());
throw
;
}
}
public:
using
OutStreamCallback
=
std
::
function
<
void
(
std
::
ostream
&
)
>
;
explicit
ReadWriteBufferFromHTTPBase
(
UpdatableSessionPtr
session_
,
Poco
::
URI
uri_
,
const
std
::
string
&
method_
=
{},
OutStreamCallback
out_stream_callback_
=
{},
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials_
=
{},
size_t
buffer_size_
=
DBMS_DEFAULT_BUFFER_SIZE
)
:
ReadBuffer
(
nullptr
,
0
)
,
uri
{
uri_
}
,
method
{
!
method_
.
empty
()
?
method_
:
out_stream_callback
?
Poco
::
Net
::
HTTPRequest
::
HTTP_POST
:
Poco
::
Net
::
HTTPRequest
::
HTTP_GET
}
,
session
{
session_
}
,
out_stream_callback
{
out_stream_callback_
}
,
credentials
{
credentials_
}
{
Poco
::
Net
::
HTTPResponse
response
;
istr
=
call
(
uri
,
response
);
while
(
isRedirect
(
response
.
getStatus
()))
{
Poco
::
URI
uri_redirect
(
response
.
get
(
"Location"
));
session
->
updateSession
(
uri_redirect
);
istr
=
call
(
uri_redirect
,
response
);
}
try
{
impl
=
std
::
make_unique
<
ReadBufferFromIStream
>
(
*
istr
,
buffer_size_
);
}
catch
(
const
Poco
::
Exception
&
e
)
{
/// We use session data storage as storage for exception text
/// Depend on it we can deduce to reconnect session or reresolve session host
session
->
attachSessionData
(
e
.
message
());
auto
sess
=
session
->
getSession
();
sess
->
attachSessionData
(
e
.
message
());
throw
;
}
}
...
...
@@ -103,47 +186,88 @@ namespace detail
};
}
class
ReadWriteBufferFromHTTP
:
public
detail
::
ReadWriteBufferFromHTTP
Base
<
HTTPSessionPtr
>
class
UpdatableSession
:
public
UpdatableSession
Base
<
HTTPSessionPtr
>
{
using
Parent
=
detail
::
ReadWriteBufferFromHTTPBase
<
HTTPSessionPtr
>
;
using
Parent
=
UpdatableSessionBase
<
HTTPSessionPtr
>
;
public:
explicit
UpdatableSession
(
const
Poco
::
URI
uri
,
const
ConnectionTimeouts
&
timeouts_
,
const
SettingUInt64
max_redirects_
)
:
Parent
(
uri
,
timeouts_
,
max_redirects_
)
{
session
=
makeHTTPSession
(
initial_uri
,
timeouts
);
}
void
buildNewSession
(
const
Poco
::
URI
uri
)
{
session
=
makeHTTPSession
(
uri
,
timeouts
);
}
};
class
ReadWriteBufferFromHTTP
:
public
detail
::
ReadWriteBufferFromHTTPBase
<
std
::
shared_ptr
<
UpdatableSession
>>
{
using
Parent
=
detail
::
ReadWriteBufferFromHTTPBase
<
std
::
shared_ptr
<
UpdatableSession
>>
;
public:
explicit
ReadWriteBufferFromHTTP
(
Poco
::
URI
uri_
,
const
std
::
string
&
method_
=
{},
OutStreamCallback
out_stream_callback
=
{},
OutStreamCallback
out_stream_callback
_
=
{},
const
ConnectionTimeouts
&
timeouts
=
{},
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials
=
{},
const
DB
::
SettingUInt64
max_redirects
=
0
,
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials_
=
{},
size_t
buffer_size_
=
DBMS_DEFAULT_BUFFER_SIZE
)
:
Parent
(
makeHTTPSession
(
uri_
,
timeouts
),
uri_
,
method_
,
out_stream_callback
,
credentials
,
buffer_size_
)
:
Parent
(
std
::
make_shared
<
UpdatableSession
>
(
uri_
,
timeouts
,
max_redirects
),
uri_
,
method_
,
out_stream_callback_
,
credentials_
,
buffer_size_
)
{
}
};
class
UpdatablePooledSession
:
public
UpdatableSessionBase
<
PooledHTTPSessionPtr
>
{
using
Parent
=
UpdatableSessionBase
<
PooledHTTPSessionPtr
>
;
private:
size_t
per_endpoint_pool_size
;
public:
explicit
UpdatablePooledSession
(
const
Poco
::
URI
uri
,
const
ConnectionTimeouts
&
timeouts_
,
const
SettingUInt64
max_redirects_
,
size_t
per_endpoint_pool_size_
)
:
Parent
(
uri
,
timeouts_
,
max_redirects_
)
,
per_endpoint_pool_size
{
per_endpoint_pool_size_
}
{
session
=
makePooledHTTPSession
(
initial_uri
,
timeouts
,
per_endpoint_pool_size
);
}
void
buildNewSession
(
const
Poco
::
URI
uri
)
{
session
=
makePooledHTTPSession
(
uri
,
timeouts
,
per_endpoint_pool_size
);
}
};
class
PooledReadWriteBufferFromHTTP
:
public
detail
::
ReadWriteBufferFromHTTPBase
<
PooledHTTPSessionPtr
>
class
PooledReadWriteBufferFromHTTP
:
public
detail
::
ReadWriteBufferFromHTTPBase
<
std
::
shared_ptr
<
UpdatablePooledSession
>>
{
using
Parent
=
detail
::
ReadWriteBufferFromHTTPBase
<
PooledHTTPSessionPtr
>
;
using
Parent
=
detail
::
ReadWriteBufferFromHTTPBase
<
std
::
shared_ptr
<
UpdatablePooledSession
>
>
;
public:
explicit
PooledReadWriteBufferFromHTTP
(
Poco
::
URI
uri_
,
const
std
::
string
&
method_
=
{},
OutStreamCallback
out_stream_callback
=
{},
const
ConnectionTimeouts
&
timeouts
=
{},
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials
=
{},
OutStreamCallback
out_stream_callback
_
=
{},
const
ConnectionTimeouts
&
timeouts
_
=
{},
const
Poco
::
Net
::
HTTPBasicCredentials
&
credentials
_
=
{},
size_t
buffer_size_
=
DBMS_DEFAULT_BUFFER_SIZE
,
const
DB
::
SettingUInt64
max_redirects
=
0
,
size_t
max_connections_per_endpoint
=
DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT
)
:
Parent
(
makePooledHTTPSession
(
uri_
,
timeou
ts
,
max_connections_per_endpoint
),
:
Parent
(
std
::
make_shared
<
UpdatablePooledSession
>
(
uri_
,
timeouts_
,
max_redirec
ts
,
max_connections_per_endpoint
),
uri_
,
method_
,
out_stream_callback
,
credentials
,
out_stream_callback
_
,
credentials
_
,
buffer_size_
)
{
}
};
std
::
unique_ptr
<
ReadWriteBufferFromHTTP
>
makeReadWriteBufferFromHTTP
(
const
Poco
::
URI
&
uri
,
const
std
::
string
&
method
,
std
::
function
<
void
(
std
::
ostream
&
)
>
callback
,
const
ConnectionTimeouts
&
timeouts
,
const
SettingUInt64
max_redirects
);
}
dbms/src/IO/WriteBufferFromHTTP.cpp
浏览文件 @
5ec00054
...
...
@@ -22,7 +22,7 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
void
WriteBufferFromHTTP
::
finalize
()
{
receiveResponse
(
*
session
,
request
,
response
);
receiveResponse
(
*
session
,
request
,
response
,
false
);
/// TODO: Response body is ignored.
}
...
...
dbms/src/Storages/StorageURL.cpp
浏览文件 @
5ec00054
...
...
@@ -54,7 +54,7 @@ namespace
const
ConnectionTimeouts
&
timeouts
)
:
name
(
name_
)
{
read_buf
=
makeReadWriteBufferFromHTTP
(
uri
,
method
,
callback
,
timeouts
,
context
.
getSettingsRef
().
max_http_get_redirects
);
read_buf
=
std
::
make_unique
<
ReadWriteBufferFromHTTP
>
(
uri
,
method
,
callback
,
timeouts
,
context
.
getSettingsRef
().
max_http_get_redirects
);
reader
=
FormatFactory
::
instance
().
getInput
(
format
,
*
read_buf
,
sample_block
,
context
,
max_block_size
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录