Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
f54435e7
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f54435e7
编写于
5月 18, 2020
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix clang-tidy
上级
b0a5ce77
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
99 addition
and
78 deletion
+99
-78
programs/benchmark/Benchmark.cpp
programs/benchmark/Benchmark.cpp
+1
-1
programs/client/Client.cpp
programs/client/Client.cpp
+2
-2
programs/client/ConnectionParameters.cpp
programs/client/ConnectionParameters.cpp
+1
-1
programs/copier/ClusterCopier.cpp
programs/copier/ClusterCopier.cpp
+2
-2
programs/copier/ClusterCopierApp.cpp
programs/copier/ClusterCopierApp.cpp
+1
-1
programs/copier/Internals.cpp
programs/copier/Internals.cpp
+2
-2
programs/obfuscator/Obfuscator.cpp
programs/obfuscator/Obfuscator.cpp
+2
-2
programs/server/HTTPHandler.cpp
programs/server/HTTPHandler.cpp
+2
-2
programs/server/HTTPHandlerFactory.cpp
programs/server/HTTPHandlerFactory.cpp
+78
-57
programs/server/ReplicasStatusHandler.cpp
programs/server/ReplicasStatusHandler.cpp
+1
-1
src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+6
-6
utils/zookeeper-adjust-block-numbers-to-parts/main.cpp
utils/zookeeper-adjust-block-numbers-to-parts/main.cpp
+1
-1
未找到文件。
programs/benchmark/Benchmark.cpp
浏览文件 @
f54435e7
...
...
@@ -289,7 +289,7 @@ private:
connection_entries
.
emplace_back
(
std
::
make_shared
<
Entry
>
(
connection
->
get
(
ConnectionTimeouts
::
getTCPTimeoutsWithoutFailover
(
settings
))));
pool
.
scheduleOrThrowOnError
(
std
::
bind
(
&
Benchmark
::
thread
,
this
,
connection_entries
)
);
pool
.
scheduleOrThrowOnError
(
[
this
,
connection_entries
]()
mutable
{
thread
(
connection_entries
);
}
);
}
}
catch
(...)
...
...
programs/client/Client.cpp
浏览文件 @
f54435e7
...
...
@@ -485,7 +485,7 @@ private:
history_file
=
config
().
getString
(
"history_file"
);
else
{
auto
history_file_from_env
=
getenv
(
"CLICKHOUSE_HISTORY_FILE"
);
auto
*
history_file_from_env
=
getenv
(
"CLICKHOUSE_HISTORY_FILE"
);
if
(
history_file_from_env
)
history_file
=
history_file_from_env
;
else
if
(
!
home_path
.
empty
())
...
...
@@ -1480,7 +1480,7 @@ private:
"
\033
[1m↗
\033
[0m"
,
};
auto
indicator
=
indicators
[
increment
%
8
];
const
char
*
indicator
=
indicators
[
increment
%
8
];
if
(
!
send_logs
&&
written_progress_chars
)
message
<<
'\r'
;
...
...
programs/client/ConnectionParameters.cpp
浏览文件 @
f54435e7
...
...
@@ -51,7 +51,7 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
{
std
::
string
prompt
{
"Password for user ("
+
user
+
"): "
};
char
buf
[
1000
]
=
{};
if
(
auto
result
=
readpassphrase
(
prompt
.
c_str
(),
buf
,
sizeof
(
buf
),
0
))
if
(
auto
*
result
=
readpassphrase
(
prompt
.
c_str
(),
buf
,
sizeof
(
buf
),
0
))
password
=
result
;
}
...
...
programs/copier/ClusterCopier.cpp
浏览文件 @
f54435e7
...
...
@@ -442,7 +442,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
/// Collect all shards that contain partition piece number piece_number.
Strings
piece_status_paths
;
for
(
auto
&
shard
:
shards_with_partition
)
for
(
const
auto
&
shard
:
shards_with_partition
)
{
ShardPartition
&
task_shard_partition
=
shard
->
partition_tasks
.
find
(
partition_name
)
->
second
;
ShardPartitionPiece
&
shard_partition_piece
=
task_shard_partition
.
pieces
[
piece_number
];
...
...
@@ -702,7 +702,7 @@ ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast
auto
new_columns_list
=
std
::
make_shared
<
ASTColumns
>
();
new_columns_list
->
set
(
new_columns_list
->
columns
,
new_columns
);
if
(
auto
indices
=
query_ast
->
as
<
ASTCreateQuery
>
()
->
columns_list
->
indices
)
if
(
const
auto
*
indices
=
query_ast
->
as
<
ASTCreateQuery
>
()
->
columns_list
->
indices
)
new_columns_list
->
set
(
new_columns_list
->
indices
,
indices
->
clone
());
new_query
.
replace
(
new_query
.
columns_list
,
new_columns_list
);
...
...
programs/copier/ClusterCopierApp.cpp
浏览文件 @
f54435e7
...
...
@@ -94,7 +94,7 @@ void ClusterCopierApp::mainImpl()
StatusFile
status_file
(
process_path
+
"/status"
);
ThreadStatus
thread_status
;
auto
log
=
&
logger
();
auto
*
log
=
&
logger
();
LOG_INFO
(
log
,
"Starting clickhouse-copier ("
<<
"id "
<<
process_id
<<
", "
<<
"host_id "
<<
host_id
<<
", "
...
...
programs/copier/Internals.cpp
浏览文件 @
f54435e7
...
...
@@ -260,7 +260,7 @@ ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std
return
res
;
res
.
is_remote
=
1
;
for
(
auto
&
replica
:
replicas
)
for
(
const
auto
&
replica
:
replicas
)
{
if
(
isLocalAddress
(
DNSResolver
::
instance
().
resolveHost
(
replica
.
host_name
)))
{
...
...
@@ -270,7 +270,7 @@ ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std
}
res
.
hostname_difference
=
std
::
numeric_limits
<
size_t
>::
max
();
for
(
auto
&
replica
:
replicas
)
for
(
const
auto
&
replica
:
replicas
)
{
size_t
difference
=
getHostNameDifference
(
local_hostname
,
replica
.
host_name
);
res
.
hostname_difference
=
std
::
min
(
difference
,
res
.
hostname_difference
);
...
...
programs/obfuscator/Obfuscator.cpp
浏览文件 @
f54435e7
...
...
@@ -937,10 +937,10 @@ public:
if
(
typeid_cast
<
const
DataTypeFixedString
*>
(
&
data_type
))
return
std
::
make_unique
<
FixedStringModel
>
(
seed
);
if
(
auto
type
=
typeid_cast
<
const
DataTypeArray
*>
(
&
data_type
))
if
(
const
auto
*
type
=
typeid_cast
<
const
DataTypeArray
*>
(
&
data_type
))
return
std
::
make_unique
<
ArrayModel
>
(
get
(
*
type
->
getNestedType
(),
seed
,
markov_model_params
));
if
(
auto
type
=
typeid_cast
<
const
DataTypeNullable
*>
(
&
data_type
))
if
(
const
auto
*
type
=
typeid_cast
<
const
DataTypeNullable
*>
(
&
data_type
))
return
std
::
make_unique
<
NullableModel
>
(
get
(
*
type
->
getNestedType
(),
seed
,
markov_model_params
));
throw
Exception
(
"Unsupported data type"
,
ErrorCodes
::
NOT_IMPLEMENTED
);
...
...
programs/server/HTTPHandler.cpp
浏览文件 @
f54435e7
...
...
@@ -195,7 +195,7 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
std
::
vector
<
ReadBufferPtr
>
read_buffers
;
std
::
vector
<
ReadBuffer
*>
read_buffers_raw_ptr
;
auto
cascade_buffer
=
typeid_cast
<
CascadeWriteBuffer
*>
(
used_output
.
out_maybe_delayed_and_compressed
.
get
());
auto
*
cascade_buffer
=
typeid_cast
<
CascadeWriteBuffer
*>
(
used_output
.
out_maybe_delayed_and_compressed
.
get
());
if
(
!
cascade_buffer
)
throw
Exception
(
"Expected CascadeWriteBuffer"
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
@@ -383,7 +383,7 @@ void HTTPHandler::processQuery(
{
auto
push_memory_buffer_and_continue
=
[
next_buffer
=
used_output
.
out_maybe_compressed
]
(
const
WriteBufferPtr
&
prev_buf
)
{
auto
prev_memory_buffer
=
typeid_cast
<
MemoryWriteBuffer
*>
(
prev_buf
.
get
());
auto
*
prev_memory_buffer
=
typeid_cast
<
MemoryWriteBuffer
*>
(
prev_buf
.
get
());
if
(
!
prev_memory_buffer
)
throw
Exception
(
"Expected MemoryWriteBuffer"
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
programs/server/HTTPHandlerFactory.cpp
浏览文件 @
f54435e7
...
...
@@ -28,7 +28,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string &
{
}
Poco
::
Net
::
HTTPRequestHandler
*
HTTPRequestHandlerFactoryMain
::
createRequestHandler
(
const
Poco
::
Net
::
HTTPServerRequest
&
request
)
// override
Poco
::
Net
::
HTTPRequestHandler
*
HTTPRequestHandlerFactoryMain
::
createRequestHandler
(
const
Poco
::
Net
::
HTTPServerRequest
&
request
)
{
LOG_TRACE
(
log
,
"HTTP Request for "
<<
name
<<
". "
<<
"Method: "
<<
request
.
getMethod
()
...
...
@@ -40,7 +40,7 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand
for
(
auto
&
handler_factory
:
child_factories
)
{
auto
handler
=
handler_factory
->
createRequestHandler
(
request
);
auto
*
handler
=
handler_factory
->
createRequestHandler
(
request
);
if
(
handler
!=
nullptr
)
return
handler
;
}
...
...
@@ -72,80 +72,96 @@ HTTPRequestHandlerFactoryMain::TThis * HTTPRequestHandlerFactoryMain::addHandler
static
inline
auto
createHandlersFactoryFromConfig
(
IServer
&
server
,
const
std
::
string
&
name
,
const
String
&
prefix
)
{
auto
main_handler_factory
=
new
HTTPRequestHandlerFactoryMain
(
name
);
auto
main_handler_factory
=
std
::
make_unique
<
HTTPRequestHandlerFactoryMain
>
(
name
);
try
{
Poco
::
Util
::
AbstractConfiguration
::
Keys
keys
;
server
.
config
().
keys
(
prefix
,
keys
);
Poco
::
Util
::
AbstractConfiguration
::
Keys
keys
;
server
.
config
().
keys
(
prefix
,
keys
);
for
(
const
auto
&
key
:
keys
)
{
if
(
!
startsWith
(
key
,
"rule"
))
throw
Exception
(
"Unknown element in config: "
+
prefix
+
"."
+
key
+
", must be 'rule'"
,
ErrorCodes
::
UNKNOWN_ELEMENT_IN_CONFIG
);
const
auto
&
handler_type
=
server
.
config
().
getString
(
prefix
+
"."
+
key
+
".handler.type"
,
""
);
if
(
handler_type
==
"static"
)
main_handler_factory
->
addHandler
(
createStaticHandlerFactory
(
server
,
prefix
+
"."
+
key
));
else
if
(
handler_type
==
"dynamic_query_handler"
)
main_handler_factory
->
addHandler
(
createDynamicHandlerFactory
(
server
,
prefix
+
"."
+
key
));
else
if
(
handler_type
==
"predefined_query_handler"
)
main_handler_factory
->
addHandler
(
createPredefinedHandlerFactory
(
server
,
prefix
+
"."
+
key
));
else
if
(
handler_type
.
empty
())
throw
Exception
(
"Handler type in config is not specified here: "
+
prefix
+
"."
+
key
+
".handler.type"
,
ErrorCodes
::
INVALID_CONFIG_PARAMETER
);
else
throw
Exception
(
"Unknown handler type '"
+
handler_type
+
"' in config here: "
+
prefix
+
"."
+
key
+
".handler.type"
,
ErrorCodes
::
INVALID_CONFIG_PARAMETER
);
}
return
main_handler_factory
;
}
catch
(...)
for
(
const
auto
&
key
:
keys
)
{
delete
main_handler_factory
;
throw
;
if
(
!
startsWith
(
key
,
"rule"
))
throw
Exception
(
"Unknown element in config: "
+
prefix
+
"."
+
key
+
", must be 'rule'"
,
ErrorCodes
::
UNKNOWN_ELEMENT_IN_CONFIG
);
const
auto
&
handler_type
=
server
.
config
().
getString
(
prefix
+
"."
+
key
+
".handler.type"
,
""
);
if
(
handler_type
==
"static"
)
main_handler_factory
->
addHandler
(
createStaticHandlerFactory
(
server
,
prefix
+
"."
+
key
));
else
if
(
handler_type
==
"dynamic_query_handler"
)
main_handler_factory
->
addHandler
(
createDynamicHandlerFactory
(
server
,
prefix
+
"."
+
key
));
else
if
(
handler_type
==
"predefined_query_handler"
)
main_handler_factory
->
addHandler
(
createPredefinedHandlerFactory
(
server
,
prefix
+
"."
+
key
));
else
if
(
handler_type
.
empty
())
throw
Exception
(
"Handler type in config is not specified here: "
+
prefix
+
"."
+
key
+
".handler.type"
,
ErrorCodes
::
INVALID_CONFIG_PARAMETER
);
else
throw
Exception
(
"Unknown handler type '"
+
handler_type
+
"' in config here: "
+
prefix
+
"."
+
key
+
".handler.type"
,
ErrorCodes
::
INVALID_CONFIG_PARAMETER
);
}
return
main_handler_factory
.
release
();
}
static
const
auto
ping_response_expression
=
"Ok.
\n
"
;
static
const
auto
root_response_expression
=
"config://http_server_default_response"
;
static
inline
Poco
::
Net
::
HTTPRequestHandlerFactory
*
createHTTPHandlerFactory
(
IServer
&
server
,
const
std
::
string
&
name
,
AsynchronousMetrics
&
async_metrics
)
static
inline
Poco
::
Net
::
HTTPRequestHandlerFactory
*
createHTTPHandlerFactory
(
IServer
&
server
,
const
std
::
string
&
name
,
AsynchronousMetrics
&
async_metrics
)
{
if
(
server
.
config
().
has
(
"http_handlers"
))
return
createHandlersFactoryFromConfig
(
server
,
name
,
"http_handlers"
);
else
{
auto
factory
=
(
new
HTTPRequestHandlerFactoryMain
(
name
))
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>
(
server
,
root_response_expression
))
->
attachStrictPath
(
"/"
)
->
allowGetAndHeadRequest
())
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>
(
server
,
ping_response_expression
))
->
attachStrictPath
(
"/ping"
)
->
allowGetAndHeadRequest
())
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
ReplicasStatusHandler
>
(
server
))
->
attachNonStrictPath
(
"/replicas_status"
)
->
allowGetAndHeadRequest
())
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
DynamicQueryHandler
>
(
server
,
"query"
))
->
allowPostAndGetParamsRequest
());
auto
factory
=
std
::
make_unique
<
HTTPRequestHandlerFactoryMain
>
(
name
);
auto
root_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>>
(
server
,
root_response_expression
);
root_handler
->
attachStrictPath
(
"/"
)
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
root_handler
.
release
());
auto
ping_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>>
(
server
,
ping_response_expression
);
ping_handler
->
attachStrictPath
(
"/ping"
)
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
ping_handler
.
release
());
auto
replicas_status_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
ReplicasStatusHandler
>>
(
server
);
replicas_status_handler
->
attachNonStrictPath
(
"/replicas_status"
)
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
replicas_status_handler
.
release
());
auto
query_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
DynamicQueryHandler
>>
(
server
,
"query"
);
query_handler
->
allowPostAndGetParamsRequest
();
factory
->
addHandler
(
query_handler
.
release
());
if
(
server
.
config
().
has
(
"prometheus"
)
&&
server
.
config
().
getInt
(
"prometheus.port"
,
0
)
==
0
)
factory
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
PrometheusRequestHandler
>
(
server
,
PrometheusMetricsWriter
(
server
.
config
(),
"prometheus"
,
async_metrics
)))
->
attachStrictPath
(
server
.
config
().
getString
(
"prometheus.endpoint"
,
"/metrics"
))
->
allowGetAndHeadRequest
());
{
auto
prometheus_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
PrometheusRequestHandler
>>
(
server
,
PrometheusMetricsWriter
(
server
.
config
(),
"prometheus"
,
async_metrics
));
prometheus_handler
->
attachStrictPath
(
server
.
config
().
getString
(
"prometheus.endpoint"
,
"/metrics"
))
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
prometheus_handler
.
release
());
}
return
factory
;
return
factory
.
release
()
;
}
}
static
inline
Poco
::
Net
::
HTTPRequestHandlerFactory
*
createInterserverHTTPHandlerFactory
(
IServer
&
server
,
const
std
::
string
&
name
)
{
return
(
new
HTTPRequestHandlerFactoryMain
(
name
))
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>
(
server
,
root_response_expression
))
->
attachStrictPath
(
"/"
)
->
allowGetAndHeadRequest
())
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>
(
server
,
ping_response_expression
))
->
attachStrictPath
(
"/ping"
)
->
allowGetAndHeadRequest
())
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
ReplicasStatusHandler
>
(
server
))
->
attachNonStrictPath
(
"/replicas_status"
)
->
allowGetAndHeadRequest
())
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
InterserverIOHTTPHandler
>
(
server
))
->
allowPostAndGetParamsRequest
());
auto
factory
=
std
::
make_unique
<
HTTPRequestHandlerFactoryMain
>
(
name
);
auto
root_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>>
(
server
,
root_response_expression
);
root_handler
->
attachStrictPath
(
"/"
)
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
root_handler
.
release
());
auto
ping_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
StaticRequestHandler
>>
(
server
,
ping_response_expression
);
ping_handler
->
attachStrictPath
(
"/ping"
)
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
ping_handler
.
release
());
auto
replicas_status_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
ReplicasStatusHandler
>>
(
server
);
replicas_status_handler
->
attachNonStrictPath
(
"/replicas_status"
)
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
replicas_status_handler
.
release
());
auto
main_handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
InterserverIOHTTPHandler
>>
(
server
);
main_handler
->
allowPostAndGetParamsRequest
();
factory
->
addHandler
(
main_handler
.
release
());
return
factory
.
release
();
}
Poco
::
Net
::
HTTPRequestHandlerFactory
*
createHandlerFactory
(
IServer
&
server
,
AsynchronousMetrics
&
async_metrics
,
const
std
::
string
&
name
)
...
...
@@ -155,9 +171,14 @@ Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, As
else
if
(
name
==
"InterserverIOHTTPHandler-factory"
||
name
==
"InterserverIOHTTPSHandler-factory"
)
return
createInterserverHTTPHandlerFactory
(
server
,
name
);
else
if
(
name
==
"PrometheusHandler-factory"
)
return
(
new
HTTPRequestHandlerFactoryMain
(
name
))
->
addHandler
((
new
HandlingRuleHTTPHandlerFactory
<
PrometheusRequestHandler
>
(
server
,
PrometheusMetricsWriter
(
server
.
config
(),
"prometheus"
,
async_metrics
)))
->
attachStrictPath
(
server
.
config
().
getString
(
"prometheus.endpoint"
,
"/metrics"
))
->
allowGetAndHeadRequest
());
{
auto
factory
=
std
::
make_unique
<
HTTPRequestHandlerFactoryMain
>
(
name
);
auto
handler
=
std
::
make_unique
<
HandlingRuleHTTPHandlerFactory
<
PrometheusRequestHandler
>>
(
server
,
PrometheusMetricsWriter
(
server
.
config
(),
"prometheus"
,
async_metrics
));
handler
->
attachStrictPath
(
server
.
config
().
getString
(
"prometheus.endpoint"
,
"/metrics"
))
->
allowGetAndHeadRequest
();
factory
->
addHandler
(
handler
.
release
());
return
factory
.
release
();
}
throw
Exception
(
"LOGICAL ERROR: Unknown HTTP handler factory name."
,
ErrorCodes
::
LOGICAL_ERROR
);
}
...
...
programs/server/ReplicasStatusHandler.cpp
浏览文件 @
f54435e7
...
...
@@ -46,7 +46,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
for
(
auto
iterator
=
db
.
second
->
getTablesIterator
();
iterator
->
isValid
();
iterator
->
next
())
{
auto
&
table
=
iterator
->
table
();
const
auto
&
table
=
iterator
->
table
();
StorageReplicatedMergeTree
*
table_replicated
=
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
table
.
get
());
if
(
!
table_replicated
)
...
...
src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
浏览文件 @
f54435e7
...
...
@@ -331,18 +331,13 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
void
ReplicatedMergeTreeQueue
::
removeProcessedEntry
(
zkutil
::
ZooKeeperPtr
zookeeper
,
LogEntryPtr
&
entry
)
{
auto
code
=
zookeeper
->
tryRemove
(
replica_path
+
"/queue/"
+
entry
->
znode_name
);
if
(
code
)
LOG_ERROR
(
log
,
"Couldn't remove "
<<
replica_path
<<
"/queue/"
<<
entry
->
znode_name
<<
": "
<<
zkutil
::
ZooKeeper
::
error2string
(
code
)
<<
". This shouldn't happen often."
);
std
::
optional
<
time_t
>
min_unprocessed_insert_time_changed
;
std
::
optional
<
time_t
>
max_processed_insert_time_changed
;
bool
found
=
false
;
size_t
queue_size
=
0
;
/// First remove from memory then from ZooKeeper
{
std
::
unique_lock
lock
(
state_mutex
);
...
...
@@ -372,6 +367,11 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
notifySubscribers
(
queue_size
);
auto
code
=
zookeeper
->
tryRemove
(
replica_path
+
"/queue/"
+
entry
->
znode_name
);
if
(
code
)
LOG_ERROR
(
log
,
"Couldn't remove "
<<
replica_path
<<
"/queue/"
<<
entry
->
znode_name
<<
": "
<<
zkutil
::
ZooKeeper
::
error2string
(
code
)
<<
". This shouldn't happen often."
);
updateTimesInZooKeeper
(
zookeeper
,
min_unprocessed_insert_time_changed
,
max_processed_insert_time_changed
);
}
...
...
utils/zookeeper-adjust-block-numbers-to-parts/main.cpp
浏览文件 @
f54435e7
...
...
@@ -199,7 +199,7 @@ void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, Int
create_ephemeral_nodes
(
1
);
/// Firstly try to create just a single node.
/// Create other nodes in batches of 50 nodes.
while
(
current_block_number
+
50
<=
new_current_block_number
)
while
(
current_block_number
+
50
<=
new_current_block_number
)
// NOLINT: clang-tidy thinks that the loop is infinite
create_ephemeral_nodes
(
50
);
create_ephemeral_nodes
(
new_current_block_number
-
current_block_number
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录