Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
46127946
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
46127946
编写于
8月 28, 2020
作者:
A
Alexander Kuzmenkov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
opentelemetry context propagation
上级
2bef4062
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
244 addition
and
21 deletion
+244
-21
src/Core/Defines.h
src/Core/Defines.h
+4
-1
src/Interpreters/ClientInfo.cpp
src/Interpreters/ClientInfo.cpp
+140
-0
src/Interpreters/ClientInfo.h
src/Interpreters/ClientInfo.h
+14
-3
src/Interpreters/Context.cpp
src/Interpreters/Context.cpp
+17
-3
src/Interpreters/executeQuery.cpp
src/Interpreters/executeQuery.cpp
+24
-9
src/Interpreters/ya.make
src/Interpreters/ya.make
+1
-0
src/Server/HTTPHandler.cpp
src/Server/HTTPHandler.cpp
+21
-2
src/Server/TCPHandler.cpp
src/Server/TCPHandler.cpp
+6
-2
src/Storages/StorageURL.cpp
src/Storages/StorageURL.cpp
+17
-1
未找到文件。
src/Core/Defines.h
浏览文件 @
46127946
...
...
@@ -67,8 +67,11 @@
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54227
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 5422
6
#define DBMS_TCP_PROTOCOL_VERSION 5422
7
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
...
...
src/Interpreters/ClientInfo.cpp
浏览文件 @
46127946
...
...
@@ -60,6 +60,18 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if
(
server_protocol_revision
>=
DBMS_MIN_REVISION_WITH_VERSION_PATCH
)
writeVarUInt
(
client_version_patch
,
out
);
}
if
(
server_protocol_revision
>=
DBMS_MIN_REVISION_WITH_OPENTELEMETRY
)
{
// No point writing these numbers with variable length, because they
// are random and will probably require the full length anyway.
writeBinary
(
opentelemetry_trace_id
,
out
);
writeBinary
(
opentelemetry_span_id
,
out
);
writeBinary
(
opentelemetry_parent_span_id
,
out
);
writeBinary
(
opentelemetry_tracestate
,
out
);
writeBinary
(
opentelemetry_trace_flags
,
out
);
std
::
cerr
<<
fmt
::
format
(
"wrote {:x}, {}, {}
\n
"
,
opentelemetry_trace_id
,
opentelemetry_span_id
,
opentelemetry_parent_span_id
)
<<
StackTrace
().
toString
()
<<
std
::
endl
;
}
}
...
...
@@ -113,6 +125,17 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
else
client_version_patch
=
client_revision
;
}
if
(
client_protocol_revision
>=
DBMS_MIN_REVISION_WITH_OPENTELEMETRY
)
{
readBinary
(
opentelemetry_trace_id
,
in
);
readBinary
(
opentelemetry_span_id
,
in
);
readBinary
(
opentelemetry_parent_span_id
,
in
);
readBinary
(
opentelemetry_tracestate
,
in
);
readBinary
(
opentelemetry_trace_flags
,
in
);
std
::
cerr
<<
fmt
::
format
(
"read {:x}, {}, {}
\n
"
,
opentelemetry_trace_id
,
opentelemetry_span_id
,
opentelemetry_parent_span_id
)
<<
StackTrace
().
toString
()
<<
std
::
endl
;
}
}
...
...
@@ -123,6 +146,123 @@ void ClientInfo::setInitialQuery()
client_name
=
(
DBMS_NAME
" "
)
+
client_name
;
}
template
<
typename
T
>
bool
readLowercaseHexDigits
(
const
char
*&
begin
,
const
char
*
end
,
T
&
dest_value
,
std
::
string
&
error
)
{
char
*
dest_begin
=
reinterpret_cast
<
char
*>
(
&
dest_value
);
char
*
dest_end
=
dest_begin
+
sizeof
(
dest_value
);
bool
odd_character
=
true
;
for
(;;)
{
if
(
begin
==
end
)
{
if
(
dest_begin
==
dest_end
)
{
return
true
;
}
error
=
fmt
::
format
(
"Not enough charaters in the input, got {}, need {} more"
,
end
-
begin
,
dest_end
-
dest_begin
);
return
false
;
}
if
(
dest_begin
==
dest_end
)
{
return
true
;
}
int
cur
=
0
;
if
(
*
begin
>=
'0'
&&
*
begin
<=
'9'
)
{
cur
=
*
begin
-
'0'
;
}
else
if
(
*
begin
>=
'a'
&&
*
begin
<=
'f'
)
{
cur
=
10
+
*
begin
-
'a'
;
}
else
{
error
=
fmt
::
format
(
"Encountered '{}' which is not a lowercase hexadecimal digit"
,
*
begin
);
return
false
;
}
// Two characters per byte, little-endian.
if
(
odd_character
)
{
*
(
dest_end
-
1
)
=
cur
;
}
else
{
*
(
dest_end
-
1
)
=
*
(
dest_end
-
1
)
<<
8
|
cur
;
--
dest_end
;
}
begin
++
;
odd_character
=
!
odd_character
;
}
}
bool
ClientInfo
::
setOpenTelemetryTraceparent
(
const
std
::
string
&
traceparent
,
std
::
string
&
error
)
{
uint8_t
version
=
-
1
;
__uint128_t
trace_id
=
0
;
uint64_t
trace_parent
=
0
;
uint8_t
trace_flags
=
0
;
const
char
*
begin
=
&
traceparent
[
0
];
const
char
*
end
=
begin
+
traceparent
.
length
();
#define CHECK_CONDITION(condition, ...) \
((condition) || (error = fmt::format(__VA_ARGS__), false))
#define CHECK_DELIMITER \
(begin >= end \
? (error = fmt::format( \
"Expected '-' delimiter, got EOL at position {}", \
begin - &traceparent[0]), \
false) \
: *begin != '-' \
? (error = fmt::format( \
"Expected '-' delimiter, got '{}' at position {}", \
*begin, begin - &traceparent[0]), \
false) \
: (++begin, true))
bool
result
=
readLowercaseHexDigits
(
begin
,
end
,
version
,
error
)
&&
CHECK_CONDITION
(
version
==
0
,
"Expected version 00, got {}"
,
version
)
&&
CHECK_DELIMITER
&&
readLowercaseHexDigits
(
begin
,
end
,
trace_id
,
error
)
&&
CHECK_DELIMITER
&&
readLowercaseHexDigits
(
begin
,
end
,
trace_parent
,
error
)
&&
CHECK_DELIMITER
&&
readLowercaseHexDigits
(
begin
,
end
,
trace_flags
,
error
)
&&
CHECK_CONDITION
(
begin
==
end
,
"Expected end of string, got {} at position {}"
,
*
begin
,
end
-
begin
);
#undef CHECK
#undef CHECK_DELIMITER
if
(
!
result
)
{
return
false
;
}
opentelemetry_trace_id
=
trace_id
;
opentelemetry_parent_span_id
=
trace_parent
;
opentelemetry_trace_flags
=
trace_flags
;
return
true
;
}
std
::
string
ClientInfo
::
getOpenTelemetryTraceparentForChild
()
const
{
// This span is a parent for its children (so deep...), so we specify
// this span_id as a parent id.
return
fmt
::
format
(
"00-{:032x}-{:016x}-{:02x}"
,
opentelemetry_trace_id
,
opentelemetry_span_id
,
// This cast is because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast
<
uint8_t
>
(
opentelemetry_trace_flags
));
}
void
ClientInfo
::
fillOSUserHostNameAndVersionInfo
()
{
...
...
src/Interpreters/ClientInfo.h
浏览文件 @
46127946
...
...
@@ -59,9 +59,17 @@ public:
String
initial_query_id
;
Poco
::
Net
::
SocketAddress
initial_address
;
__uint128_t
trace_id
;
UInt64
span_id
;
UInt64
parent_span_id
;
// OpenTelemetry things
__uint128_t
opentelemetry_trace_id
=
0
;
// Span ID is not strictly the client info, but convenient to keep here.
// The span id we get the in the incoming client info becomes our parent span
// id, and the span id we send becomes downstream parent span id.
UInt64
opentelemetry_span_id
=
0
;
UInt64
opentelemetry_parent_span_id
=
0
;
// the incoming tracestate header, we just pass it downstream.
// https://www.w3.org/TR/trace-context/
String
opentelemetry_tracestate
;
UInt8
opentelemetry_trace_flags
;
/// All below are parameters related to initial query.
...
...
@@ -95,6 +103,9 @@ public:
/// Initialize parameters on client initiating query.
void
setInitialQuery
();
bool
setOpenTelemetryTraceparent
(
const
std
::
string
&
traceparent
,
std
::
string
&
error
);
std
::
string
getOpenTelemetryTraceparentForChild
()
const
;
private:
void
fillOSUserHostNameAndVersionInfo
();
};
...
...
src/Interpreters/Context.cpp
浏览文件 @
46127946
...
...
@@ -1093,9 +1093,23 @@ void Context::setCurrentQueryId(const String & query_id)
random
.
words
.
a
=
thread_local_rng
();
//-V656
random
.
words
.
b
=
thread_local_rng
();
//-V656
client_info
.
trace_id
=
random
.
uuid
;
client_info
.
span_id
=
1
;
client_info
.
parent_span_id
=
0
;
fmt
::
print
(
stderr
,
"traceid {}, ==0 {}
\n
"
,
client_info
.
opentelemetry_trace_id
,
client_info
.
opentelemetry_trace_id
==
0
);
if
(
client_info
.
opentelemetry_trace_id
==
0
)
{
// If trace_id is not initialized, it means that this is an initial query
// without any parent OpenTelemetry trace. Use the randomly generated
// default query id as the new trace id.
client_info
.
opentelemetry_trace_id
=
random
.
uuid
;
client_info
.
opentelemetry_parent_span_id
=
0
;
client_info
.
opentelemetry_span_id
=
thread_local_rng
();
}
else
{
// The incoming span id becomes our parent span id.
client_info
.
opentelemetry_parent_span_id
=
client_info
.
opentelemetry_span_id
;
client_info
.
opentelemetry_span_id
=
thread_local_rng
();
}
fmt
::
print
(
stderr
,
"traceid {}, ==0 {}
\n
{}
\n
"
,
client_info
.
opentelemetry_trace_id
,
client_info
.
opentelemetry_trace_id
==
0
,
StackTrace
().
toString
());
String
query_id_to_set
=
query_id
;
if
(
query_id_to_set
.
empty
())
/// If the user did not submit his query_id, then we generate it ourselves.
...
...
src/Interpreters/executeQuery.cpp
浏览文件 @
46127946
...
...
@@ -149,9 +149,11 @@ static void logQuery(const String & query, const Context & context, bool interna
joinLines
(
query
));
LOG_TRACE
(
&
Poco
::
Logger
::
get
(
"executeQuery"
),
"OpenTelemetry trace id {:x}, span id {:x}, parent span id {:x}"
,
context
.
getClientInfo
().
trace_id
,
context
.
getClientInfo
().
span_id
,
context
.
getClientInfo
().
parent_span_id
);
"OpenTelemetry trace id {:x}, span id {}, parent span id {}"
,
context
.
getClientInfo
().
opentelemetry_trace_id
,
context
.
getClientInfo
().
opentelemetry_span_id
,
context
.
getClientInfo
().
opentelemetry_parent_span_id
);
std
::
cerr
<<
StackTrace
().
toString
()
<<
std
::
endl
;
}
}
...
...
@@ -225,9 +227,9 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
if
(
auto
opentelemetry_log
=
context
.
getOpenTelemetryLog
())
{
OpenTelemetrySpanLogElement
span
;
span
.
trace_id
=
context
.
getClientInfo
().
trace_id
;
span
.
span_id
=
context
.
getClientInfo
().
span_id
;
span
.
parent_span_id
=
context
.
getClientInfo
().
parent_span_id
;
span
.
trace_id
=
context
.
getClientInfo
().
opentelemetry_
trace_id
;
span
.
span_id
=
context
.
getClientInfo
().
opentelemetry_
span_id
;
span
.
parent_span_id
=
context
.
getClientInfo
().
opentelemetry_
parent_span_id
;
span
.
operation_name
=
"query"
;
span
.
start_time
=
current_time
;
span
.
finish_time
=
current_time
;
...
...
@@ -242,6 +244,13 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
span
.
attribute_names
.
push_back
(
"query_id"
);
span
.
attribute_values
.
push_back
(
elem
.
client_info
.
current_query_id
);
if
(
!
context
.
getClientInfo
().
opentelemetry_tracestate
.
empty
())
{
span
.
attribute_names
.
push_back
(
"tracestate"
);
span
.
attribute_values
.
push_back
(
context
.
getClientInfo
().
opentelemetry_tracestate
);
}
opentelemetry_log
->
add
(
span
);
}
...
...
@@ -617,9 +626,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if
(
auto
opentelemetry_log
=
context
.
getOpenTelemetryLog
())
{
OpenTelemetrySpanLogElement
span
;
span
.
trace_id
=
context
.
getClientInfo
().
trace_id
;
span
.
span_id
=
context
.
getClientInfo
().
span_id
;
span
.
parent_span_id
=
context
.
getClientInfo
().
parent_span_id
;
span
.
trace_id
=
context
.
getClientInfo
().
opentelemetry_
trace_id
;
span
.
span_id
=
context
.
getClientInfo
().
opentelemetry_
span_id
;
span
.
parent_span_id
=
context
.
getClientInfo
().
opentelemetry_
parent_span_id
;
span
.
operation_name
=
"query"
;
span
.
start_time
=
elem
.
query_start_time
;
span
.
finish_time
=
time
(
nullptr
);
// current time
...
...
@@ -633,6 +642,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
span
.
attribute_names
.
push_back
(
"query_id"
);
span
.
attribute_values
.
push_back
(
elem
.
client_info
.
current_query_id
);
if
(
!
context
.
getClientInfo
().
opentelemetry_tracestate
.
empty
())
{
span
.
attribute_names
.
push_back
(
"tracestate"
);
span
.
attribute_values
.
push_back
(
context
.
getClientInfo
().
opentelemetry_tracestate
);
}
opentelemetry_log
->
add
(
span
);
}
...
...
src/Interpreters/ya.make
浏览文件 @
46127946
...
...
@@ -113,6 +113,7 @@ SRCS(
MutationsInterpreter.cpp
MySQL/InterpretersMySQLDDLQuery.cpp
NullableUtils.cpp
OpenTelemetryLog.cpp
OptimizeIfChains.cpp
OptimizeIfWithConstantConditionVisitor.cpp
PartLog.cpp
...
...
src/Server/HTTPHandler.cpp
浏览文件 @
46127946
...
...
@@ -96,6 +96,7 @@ namespace ErrorCodes
extern
const
int
WRONG_PASSWORD
;
extern
const
int
REQUIRED_PASSWORD
;
extern
const
int
BAD_REQUEST_PARAMETER
;
extern
const
int
INVALID_SESSION_TIMEOUT
;
extern
const
int
HTTP_LENGTH_REQUIRED
;
}
...
...
@@ -279,9 +280,7 @@ void HTTPHandler::processQuery(
}
}
std
::
string
query_id
=
params
.
get
(
"query_id"
,
""
);
context
.
setUser
(
user
,
password
,
request
.
clientAddress
());
context
.
setCurrentQueryId
(
query_id
);
if
(
!
quota_key
.
empty
())
context
.
setQuotaKey
(
quota_key
);
...
...
@@ -311,6 +310,26 @@ void HTTPHandler::processQuery(
session
->
release
();
});
std
::
string
query_id
=
params
.
get
(
"query_id"
,
""
);
context
.
setCurrentQueryId
(
query_id
);
if
(
request
.
has
(
"traceparent"
))
{
std
::
string
opentelemetry_traceparent
=
request
.
get
(
"traceparent"
);
std
::
string
error
;
if
(
!
context
.
getClientInfo
().
setOpenTelemetryTraceparent
(
opentelemetry_traceparent
,
error
))
{
throw
Exception
(
ErrorCodes
::
BAD_REQUEST_PARAMETER
,
"Failed to parse OpenTelemetry traceparent header '{}': {}"
,
opentelemetry_traceparent
,
error
);
}
context
.
getClientInfo
().
opentelemetry_tracestate
=
request
.
get
(
"tracestate"
,
""
);
}
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String
http_response_compression_methods
=
request
.
get
(
"Accept-Encoding"
,
""
);
CompressionMethod
http_response_compression_method
=
CompressionMethod
::
None
;
...
...
src/Server/TCPHandler.cpp
浏览文件 @
46127946
...
...
@@ -845,13 +845,17 @@ void TCPHandler::receiveQuery()
state
.
is_empty
=
false
;
readStringBinary
(
state
.
query_id
,
*
in
);
query_context
->
setCurrentQueryId
(
state
.
query_id
);
/// Client info
ClientInfo
&
client_info
=
query_context
->
getClientInfo
();
if
(
client_revision
>=
DBMS_MIN_REVISION_WITH_CLIENT_INFO
)
client_info
.
read
(
*
in
,
client_revision
);
// It is convenient to generate default OpenTelemetry trace id and default
// query id together. ClientInfo might contain upstream trace id, so we
// decide whether to use the default ids after we have received the ClientInfo.
// We also set up the parent span id while we're at it.
query_context
->
setCurrentQueryId
(
state
.
query_id
);
/// For better support of old clients, that does not send ClientInfo.
if
(
client_info
.
query_kind
==
ClientInfo
::
QueryKind
::
NO_QUERY
)
{
...
...
src/Storages/StorageURL.cpp
浏览文件 @
46127946
...
...
@@ -67,6 +67,22 @@ namespace
const
CompressionMethod
compression_method
)
:
SourceWithProgress
(
sample_block
),
name
(
std
::
move
(
name_
))
{
ReadWriteBufferFromHTTP
::
HTTPHeaderEntries
header
;
// Propagate OpenTelemetry trace context, if any, downstream.
auto
&
client_info
=
context
.
getClientInfo
();
if
(
client_info
.
opentelemetry_trace_id
)
{
header
.
emplace_back
(
"traceparent"
,
client_info
.
getOpenTelemetryTraceparentForChild
());
if
(
!
client_info
.
opentelemetry_tracestate
.
empty
())
{
header
.
emplace_back
(
"tracestate"
,
client_info
.
opentelemetry_tracestate
);
}
}
read_buf
=
wrapReadBufferWithCompressionMethod
(
std
::
make_unique
<
ReadWriteBufferFromHTTP
>
(
uri
,
...
...
@@ -76,7 +92,7 @@ namespace
context
.
getSettingsRef
().
max_http_get_redirects
,
Poco
::
Net
::
HTTPBasicCredentials
{},
DBMS_DEFAULT_BUFFER_SIZE
,
ReadWriteBufferFromHTTP
::
HTTPHeaderEntries
{}
,
header
,
context
.
getRemoteHostFilter
()),
compression_method
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录