Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
e33e5150
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e33e5150
编写于
1月 02, 2019
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Miscellaneous [#CLICKHOUSE-2]
上级
531560a6
变更
53
隐藏空白更改
内联
并排
Showing
53 changed file
with
219 addition
and
220 deletion
+219
-220
dbms/programs/server/MetricsTransmitter.cpp
dbms/programs/server/MetricsTransmitter.cpp
+2
-2
dbms/src/Client/MultiplexedConnections.cpp
dbms/src/Client/MultiplexedConnections.cpp
+7
-7
dbms/src/Common/ArrayCache.h
dbms/src/Common/ArrayCache.h
+12
-12
dbms/src/Common/Config/ConfigReloader.cpp
dbms/src/Common/Config/ConfigReloader.cpp
+1
-1
dbms/src/Common/CounterInFile.h
dbms/src/Common/CounterInFile.h
+1
-1
dbms/src/Common/LRUCache.h
dbms/src/Common/LRUCache.h
+11
-11
dbms/src/Common/ObjectPool.h
dbms/src/Common/ObjectPool.h
+3
-3
dbms/src/Common/PODArray.h
dbms/src/Common/PODArray.h
+1
-1
dbms/src/Common/PoolBase.h
dbms/src/Common/PoolBase.h
+3
-3
dbms/src/Common/PoolWithFailoverBase.h
dbms/src/Common/PoolWithFailoverBase.h
+3
-3
dbms/src/Common/RWLock.cpp
dbms/src/Common/RWLock.cpp
+2
-2
dbms/src/Common/SimpleCache.h
dbms/src/Common/SimpleCache.h
+3
-3
dbms/src/Common/StackTrace.cpp
dbms/src/Common/StackTrace.cpp
+1
-2
dbms/src/Common/ZooKeeper/ZooKeeperHolder.cpp
dbms/src/Common/ZooKeeper/ZooKeeperHolder.cpp
+2
-2
dbms/src/Common/ZooKeeper/ZooKeeperNodeCache.cpp
dbms/src/Common/ZooKeeper/ZooKeeperNodeCache.cpp
+2
-2
dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
...eams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
+7
-7
dbms/src/DataStreams/ParallelInputsProcessor.h
dbms/src/DataStreams/ParallelInputsProcessor.h
+4
-4
dbms/src/DataStreams/RemoteBlockInputStream.cpp
dbms/src/DataStreams/RemoteBlockInputStream.cpp
+2
-2
dbms/src/Databases/DatabaseOrdinary.cpp
dbms/src/Databases/DatabaseOrdinary.cpp
+4
-4
dbms/src/Databases/DatabasesCommon.cpp
dbms/src/Databases/DatabasesCommon.cpp
+8
-8
dbms/src/Functions/transform.cpp
dbms/src/Functions/transform.cpp
+1
-1
dbms/src/IO/AIOContextPool.cpp
dbms/src/IO/AIOContextPool.cpp
+3
-3
dbms/src/IO/HTTPCommon.cpp
dbms/src/IO/HTTPCommon.cpp
+1
-1
dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
+3
-3
dbms/src/Interpreters/ActionLocksManager.cpp
dbms/src/Interpreters/ActionLocksManager.cpp
+5
-5
dbms/src/Interpreters/Aggregator.cpp
dbms/src/Interpreters/Aggregator.cpp
+5
-5
dbms/src/Interpreters/Aggregator.h
dbms/src/Interpreters/Aggregator.h
+1
-1
dbms/src/Interpreters/AsynchronousMetrics.cpp
dbms/src/Interpreters/AsynchronousMetrics.cpp
+4
-4
dbms/src/Interpreters/CatBoostModel.cpp
dbms/src/Interpreters/CatBoostModel.cpp
+1
-1
dbms/src/Interpreters/Compiler.cpp
dbms/src/Interpreters/Compiler.cpp
+2
-2
dbms/src/Interpreters/Context.cpp
dbms/src/Interpreters/Context.cpp
+13
-13
dbms/src/Interpreters/EmbeddedDictionaries.cpp
dbms/src/Interpreters/EmbeddedDictionaries.cpp
+1
-1
dbms/src/Interpreters/ExpressionJIT.cpp
dbms/src/Interpreters/ExpressionJIT.cpp
+1
-1
dbms/src/Interpreters/ExternalLoader.cpp
dbms/src/Interpreters/ExternalLoader.cpp
+8
-8
dbms/src/Interpreters/InterserverIOHandler.h
dbms/src/Interpreters/InterserverIOHandler.h
+3
-3
dbms/src/Interpreters/ProcessList.cpp
dbms/src/Interpreters/ProcessList.cpp
+6
-6
dbms/src/Interpreters/ProcessList.h
dbms/src/Interpreters/ProcessList.h
+1
-1
dbms/src/Interpreters/QueryPriorities.h
dbms/src/Interpreters/QueryPriorities.h
+3
-3
dbms/src/Interpreters/Quota.cpp
dbms/src/Interpreters/Quota.cpp
+1
-1
dbms/src/Interpreters/tests/internal_iotop.cpp
dbms/src/Interpreters/tests/internal_iotop.cpp
+4
-4
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
+3
-3
dbms/src/Storages/Kafka/StorageKafka.cpp
dbms/src/Storages/Kafka/StorageKafka.cpp
+2
-2
dbms/src/Storages/MergeTree/DiskSpaceMonitor.h
dbms/src/Storages/MergeTree/DiskSpaceMonitor.h
+6
-6
dbms/src/Storages/MergeTree/MergeList.h
dbms/src/Storages/MergeTree/MergeList.h
+3
-3
dbms/src/Storages/MergeTree/MergeTreeData.cpp
dbms/src/Storages/MergeTree/MergeTreeData.cpp
+17
-17
dbms/src/Storages/MergeTree/MergeTreeData.h
dbms/src/Storages/MergeTree/MergeTreeData.h
+3
-3
dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp
dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp
+2
-2
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
...Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+6
-6
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+10
-10
dbms/src/Storages/StorageBuffer.cpp
dbms/src/Storages/StorageBuffer.cpp
+4
-4
dbms/src/Storages/StorageMemory.cpp
dbms/src/Storages/StorageMemory.cpp
+4
-4
dbms/src/Storages/StorageMergeTree.cpp
dbms/src/Storages/StorageMergeTree.cpp
+3
-3
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+10
-10
未找到文件。
dbms/programs/server/MetricsTransmitter.cpp
浏览文件 @
e33e5150
...
...
@@ -21,7 +21,7 @@ MetricsTransmitter::~MetricsTransmitter()
try
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
std
::
lock_guard
lock
{
mutex
};
quit
=
true
;
}
...
...
@@ -56,7 +56,7 @@ void MetricsTransmitter::run()
std
::
vector
<
ProfileEvents
::
Count
>
prev_counters
(
ProfileEvents
::
end
());
std
::
unique_lock
<
std
::
mutex
>
lock
{
mutex
};
std
::
unique_lock
lock
{
mutex
};
while
(
true
)
{
...
...
dbms/src/Client/MultiplexedConnections.cpp
浏览文件 @
e33e5150
...
...
@@ -52,7 +52,7 @@ MultiplexedConnections::MultiplexedConnections(
void
MultiplexedConnections
::
sendExternalTablesData
(
std
::
vector
<
ExternalTablesData
>
&
data
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
if
(
!
sent_query
)
throw
Exception
(
"Cannot send external tables data: query not yet sent."
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
@@ -79,7 +79,7 @@ void MultiplexedConnections::sendQuery(
const
ClientInfo
*
client_info
,
bool
with_pending_data
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
if
(
sent_query
)
throw
Exception
(
"Query already sent."
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
@@ -121,14 +121,14 @@ void MultiplexedConnections::sendQuery(
Connection
::
Packet
MultiplexedConnections
::
receivePacket
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
Connection
::
Packet
packet
=
receivePacketUnlocked
();
return
packet
;
}
void
MultiplexedConnections
::
disconnect
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
for
(
ReplicaState
&
state
:
replica_states
)
{
...
...
@@ -143,7 +143,7 @@ void MultiplexedConnections::disconnect()
void
MultiplexedConnections
::
sendCancel
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
if
(
!
sent_query
||
cancelled
)
throw
Exception
(
"Cannot cancel. Either no query sent or already cancelled."
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
@@ -160,7 +160,7 @@ void MultiplexedConnections::sendCancel()
Connection
::
Packet
MultiplexedConnections
::
drain
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
if
(
!
cancelled
)
throw
Exception
(
"Cannot drain connections: cancel first."
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
@@ -195,7 +195,7 @@ Connection::Packet MultiplexedConnections::drain()
std
::
string
MultiplexedConnections
::
dumpAddresses
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
cancel_mutex
);
std
::
lock_guard
lock
(
cancel_mutex
);
return
dumpAddressesUnlocked
();
}
...
...
dbms/src/Common/ArrayCache.h
浏览文件 @
e33e5150
...
...
@@ -231,7 +231,7 @@ public:
~
Holder
()
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
cache
.
mutex
);
std
::
lock_guard
cache_lock
(
cache
.
mutex
);
if
(
--
region
.
refcount
==
0
)
cache
.
lru_list
.
push_back
(
region
);
cache
.
total_size_in_use
-=
region
.
size
;
...
...
@@ -279,14 +279,14 @@ private:
InsertTokenHolder
()
=
default
;
void
acquire
(
const
Key
*
key_
,
const
std
::
shared_ptr
<
InsertToken
>
&
token_
,
[[
maybe_unused
]]
std
::
lock_guard
<
std
::
mutex
>
&
cache_lock
)
void
acquire
(
const
Key
*
key_
,
const
std
::
shared_ptr
<
InsertToken
>
&
token_
,
[[
maybe_unused
]]
std
::
lock_guard
&
cache_lock
)
{
key
=
key_
;
token
=
token_
;
++
token
->
refcount
;
}
void
cleanup
([[
maybe_unused
]]
std
::
lock_guard
<
std
::
mutex
>
&
token_lock
,
[[
maybe_unused
]]
std
::
lock_guard
<
std
::
mutex
>
&
cache_lock
)
void
cleanup
([[
maybe_unused
]]
std
::
lock_guard
&
token_lock
,
[[
maybe_unused
]]
std
::
lock_guard
&
cache_lock
)
{
token
->
cache
.
insert_tokens
.
erase
(
*
key
);
token
->
cleaned_up
=
true
;
...
...
@@ -301,12 +301,12 @@ private:
if
(
cleaned_up
)
return
;
std
::
lock_guard
<
std
::
mutex
>
token_lock
(
token
->
mutex
);
std
::
lock_guard
token_lock
(
token
->
mutex
);
if
(
token
->
cleaned_up
)
return
;
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
token
->
cache
.
mutex
);
std
::
lock_guard
cache_lock
(
token
->
cache
.
mutex
);
--
token
->
refcount
;
if
(
token
->
refcount
==
0
)
...
...
@@ -536,7 +536,7 @@ public:
~
ArrayCache
()
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
key_map
.
clear
();
lru_list
.
clear
();
...
...
@@ -563,7 +563,7 @@ public:
{
InsertTokenHolder
token_holder
;
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
auto
it
=
key_map
.
find
(
key
,
RegionCompareByKey
());
if
(
key_map
.
end
()
!=
it
)
...
...
@@ -584,7 +584,7 @@ public:
InsertToken
*
token
=
token_holder
.
token
.
get
();
std
::
lock_guard
<
std
::
mutex
>
token_lock
(
token
->
mutex
);
std
::
lock_guard
token_lock
(
token
->
mutex
);
token_holder
.
cleaned_up
=
token
->
cleaned_up
;
...
...
@@ -605,7 +605,7 @@ public:
RegionMetadata
*
region
;
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
region
=
allocate
(
size
);
}
...
...
@@ -626,14 +626,14 @@ public:
catch
(...)
{
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
freeRegion
(
*
region
);
}
throw
;
}
}
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
try
{
...
...
@@ -692,7 +692,7 @@ public:
Statistics
getStatistics
()
const
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
Statistics
res
;
res
.
total_chunks_size
=
total_chunks_size
;
...
...
dbms/src/Common/Config/ConfigReloader.cpp
浏览文件 @
e33e5150
...
...
@@ -78,7 +78,7 @@ void ConfigReloader::run()
void
ConfigReloader
::
reloadIfNewer
(
bool
force
,
bool
throw_on_error
,
bool
fallback_to_preprocessed
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
reload_mutex
);
std
::
lock_guard
lock
(
reload_mutex
);
FilesChangesTracker
new_files
=
getNewFileList
();
if
(
force
||
need_reload_from_zk
||
new_files
.
isDifferOrNewerThan
(
files
))
...
...
dbms/src/Common/CounterInFile.h
浏览文件 @
e33e5150
...
...
@@ -54,7 +54,7 @@ public:
template
<
typename
Callback
>
Int64
add
(
Int64
delta
,
Callback
&&
locked_callback
,
bool
create_if_need
=
false
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
Int64
res
=
-
1
;
...
...
dbms/src/Common/LRUCache.h
浏览文件 @
e33e5150
...
...
@@ -48,7 +48,7 @@ public:
MappedPtr
get
(
const
Key
&
key
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
auto
res
=
getImpl
(
key
,
lock
);
if
(
res
)
...
...
@@ -61,7 +61,7 @@ public:
void
set
(
const
Key
&
key
,
const
MappedPtr
&
mapped
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
setImpl
(
key
,
mapped
,
lock
);
}
...
...
@@ -79,7 +79,7 @@ public:
{
InsertTokenHolder
token_holder
;
{
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
auto
val
=
getImpl
(
key
,
cache_lock
);
if
(
val
)
...
...
@@ -97,7 +97,7 @@ public:
InsertToken
*
token
=
token_holder
.
token
.
get
();
std
::
lock_guard
<
std
::
mutex
>
token_lock
(
token
->
mutex
);
std
::
lock_guard
token_lock
(
token
->
mutex
);
token_holder
.
cleaned_up
=
token
->
cleaned_up
;
...
...
@@ -111,7 +111,7 @@ public:
++
misses
;
token
->
value
=
load_func
();
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
mutex
);
std
::
lock_guard
cache_lock
(
mutex
);
/// Insert the new value only if the token is still in present in insert_tokens.
/// (The token may be absent because of a concurrent reset() call).
...
...
@@ -131,26 +131,26 @@ public:
void
getStats
(
size_t
&
out_hits
,
size_t
&
out_misses
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
out_hits
=
hits
;
out_misses
=
misses
;
}
size_t
weight
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
current_size
;
}
size_t
count
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
cells
.
size
();
}
void
reset
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
queue
.
clear
();
cells
.
clear
();
insert_tokens
.
clear
();
...
...
@@ -234,12 +234,12 @@ private:
if
(
cleaned_up
)
return
;
std
::
lock_guard
<
std
::
mutex
>
token_lock
(
token
->
mutex
);
std
::
lock_guard
token_lock
(
token
->
mutex
);
if
(
token
->
cleaned_up
)
return
;
std
::
lock_guard
<
std
::
mutex
>
cache_lock
(
token
->
cache
.
mutex
);
std
::
lock_guard
cache_lock
(
token
->
cache
.
mutex
);
--
token
->
refcount
;
if
(
token
->
refcount
==
0
)
...
...
dbms/src/Common/ObjectPool.h
浏览文件 @
e33e5150
...
...
@@ -38,7 +38,7 @@ protected:
void
operator
()(
T
*
owning_ptr
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
parent
->
mutex
};
std
::
lock_guard
lock
{
parent
->
mutex
};
parent
->
stack
.
emplace
(
owning_ptr
);
}
};
...
...
@@ -51,7 +51,7 @@ public:
template
<
typename
Factory
>
Pointer
get
(
Factory
&&
f
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
if
(
stack
.
empty
())
{
...
...
@@ -94,7 +94,7 @@ public:
template
<
typename
Factory
>
Pointer
get
(
const
Key
&
key
,
Factory
&&
f
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
auto
it
=
container
.
find
(
key
);
if
(
container
.
end
()
==
it
)
...
...
dbms/src/Common/PODArray.h
浏览文件 @
e33e5150
...
...
@@ -128,7 +128,7 @@ protected:
c_end
=
c_start
+
end_diff
;
c_end_of_storage
=
c_start
+
bytes
-
pad_right
-
pad_left
;
if
(
pad_left
)
if
(
pad_left
)
/// TODO Do we need it?
memset
(
c_start
-
ELEMENT_SIZE
,
0
,
ELEMENT_SIZE
);
}
...
...
dbms/src/Common/PoolBase.h
浏览文件 @
e33e5150
...
...
@@ -54,7 +54,7 @@ private:
PoolEntryHelper
(
PooledObject
&
data_
)
:
data
(
data_
)
{
data
.
in_use
=
true
;
}
~
PoolEntryHelper
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
data
.
pool
.
mutex
);
std
::
unique_lock
lock
(
data
.
pool
.
mutex
);
data
.
in_use
=
false
;
data
.
pool
.
available
.
notify_one
();
}
...
...
@@ -107,7 +107,7 @@ public:
/** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
Entry
get
(
Poco
::
Timespan
::
TimeDiff
timeout
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
while
(
true
)
{
...
...
@@ -133,7 +133,7 @@ public:
void
reserve
(
size_t
count
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
while
(
items
.
size
()
<
count
)
items
.
emplace_back
(
std
::
make_shared
<
PooledObject
>
(
allocObject
(),
*
this
));
...
...
dbms/src/Common/PoolWithFailoverBase.h
浏览文件 @
e33e5150
...
...
@@ -195,7 +195,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
/// At exit update shared error counts with error counts occured during this call.
SCOPE_EXIT
(
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
pool_states_mutex
);
std
::
lock_guard
lock
(
pool_states_mutex
);
for
(
const
ShuffledPool
&
pool
:
shuffled_pools
)
shared_pool_states
[
pool
.
index
].
error_count
+=
pool
.
error_count
;
});
...
...
@@ -300,7 +300,7 @@ void PoolWithFailoverBase<TNestedPool>::reportError(const Entry & entry)
{
if
(
nested_pools
[
i
]
->
contains
(
entry
))
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
pool_states_mutex
);
std
::
lock_guard
lock
(
pool_states_mutex
);
++
shared_pool_states
[
i
].
error_count
;
return
;
}
...
...
@@ -338,7 +338,7 @@ PoolWithFailoverBase<TNestedPool>::updatePoolStates()
result
.
reserve
(
nested_pools
.
size
());
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
pool_states_mutex
);
std
::
lock_guard
lock
(
pool_states_mutex
);
for
(
auto
&
state
:
shared_pool_states
)
state
.
randomize
();
...
...
dbms/src/Common/RWLock.cpp
浏览文件 @
e33e5150
...
...
@@ -70,7 +70,7 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type)
GroupsContainer
::
iterator
it_group
;
ClientsContainer
::
iterator
it_client
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
/// Check if the same thread is acquiring previously acquired lock
auto
it_handler
=
thread_to_handler
.
find
(
this_thread_id
);
...
...
@@ -139,7 +139,7 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type)
RWLockImpl
::
LockHandlerImpl
::~
LockHandlerImpl
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
parent
->
mutex
);
std
::
unique_lock
lock
(
parent
->
mutex
);
/// Remove weak_ptr to the handler, since there are no owners of the current lock
parent
->
thread_to_handler
.
erase
(
it_handler
);
...
...
dbms/src/Common/SimpleCache.h
浏览文件 @
e33e5150
...
...
@@ -32,7 +32,7 @@ public:
Result
operator
()
(
Args
&&
...
args
)
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
Key
key
{
std
::
forward
<
Args
>
(
args
)...};
auto
it
=
cache
.
find
(
key
);
...
...
@@ -45,7 +45,7 @@ public:
Result
res
=
f
(
std
::
forward
<
Args
>
(
args
)...);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
cache
.
emplace
(
std
::
forward_as_tuple
(
args
...),
res
);
}
...
...
@@ -55,7 +55,7 @@ public:
void
drop
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
cache
.
clear
();
}
};
dbms/src/Common/StackTrace.cpp
浏览文件 @
e33e5150
...
...
@@ -32,11 +32,10 @@ StackTrace::StackTrace()
std
::
string
StackTrace
::
toStringImpl
(
const
Frames
&
frames
,
size_t
frames_size
)
{
char
**
symbols
=
backtrace_symbols
(
frames
.
data
(),
frames_size
);
std
::
stringstream
res
;
if
(
!
symbols
)
return
"Cannot get symbols for stack trace.
\n
"
;
std
::
stringstream
res
;
try
{
for
(
size_t
i
=
0
,
size
=
frames_size
;
i
<
size
;
++
i
)
...
...
dbms/src/Common/ZooKeeper/ZooKeeperHolder.cpp
浏览文件 @
e33e5150
...
...
@@ -14,7 +14,7 @@ using namespace zkutil;
ZooKeeperHolder
::
UnstorableZookeeperHandler
ZooKeeperHolder
::
getZooKeeper
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
return
UnstorableZookeeperHandler
(
ptr
);
}
...
...
@@ -25,7 +25,7 @@ void ZooKeeperHolder::initFromInstance(const ZooKeeper::Ptr & zookeeper_ptr)
bool
ZooKeeperHolder
::
replaceZooKeeperSessionToNewOne
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
if
(
ptr
.
unique
())
{
...
...
dbms/src/Common/ZooKeeper/ZooKeeperNodeCache.cpp
浏览文件 @
e33e5150
...
...
@@ -22,7 +22,7 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coor
{
std
::
unordered_set
<
std
::
string
>
invalidated_paths
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
context
->
mutex
);
std
::
lock_guard
lock
(
context
->
mutex
);
if
(
context
->
all_paths_invalidated
)
{
...
...
@@ -57,7 +57,7 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coor
bool
changed
=
false
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
owned_context
->
mutex
);
std
::
lock_guard
lock
(
owned_context
->
mutex
);
if
(
response
.
type
!=
Coordination
::
SESSION
)
changed
=
owned_context
->
invalidated_paths
.
emplace
(
response
.
path
).
second
;
...
...
dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
浏览文件 @
e33e5150
...
...
@@ -127,7 +127,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
if
(
parallel_merge_data
)
{
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
std
::
unique_lock
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
parallel_merge_data
->
finish
=
true
;
}
parallel_merge_data
->
merged_blocks_changed
.
notify_one
();
/// readImpl method must stop waiting and exit.
...
...
@@ -219,7 +219,7 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
while
(
true
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
std
::
unique_lock
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
parallel_merge_data
->
merged_blocks_changed
.
wait
(
lock
,
[
this
]
{
...
...
@@ -323,7 +323,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
* - or, if no next blocks, set 'exhausted' flag.
*/
{
std
::
lock_guard
<
std
::
mutex
>
lock_next_blocks
(
parallel_merge_data
->
get_next_blocks_mutex
);
std
::
lock_guard
lock_next_blocks
(
parallel_merge_data
->
get_next_blocks_mutex
);
if
(
parallel_merge_data
->
exhausted
||
parallel_merge_data
->
finish
)
break
;
...
...
@@ -333,7 +333,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
if
(
!
blocks_to_merge
||
blocks_to_merge
->
empty
())
{
{
std
::
unique_lock
<
std
::
mutex
>
lock_merged_blocks
(
parallel_merge_data
->
merged_blocks_mutex
);
std
::
unique_lock
lock_merged_blocks
(
parallel_merge_data
->
merged_blocks_mutex
);
parallel_merge_data
->
exhausted
=
true
;
}
...
...
@@ -347,7 +347,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
:
blocks_to_merge
->
front
().
info
.
bucket_num
;
{
std
::
unique_lock
<
std
::
mutex
>
lock_merged_blocks
(
parallel_merge_data
->
merged_blocks_mutex
);
std
::
unique_lock
lock_merged_blocks
(
parallel_merge_data
->
merged_blocks_mutex
);
parallel_merge_data
->
have_space
.
wait
(
lock_merged_blocks
,
[
this
]
{
...
...
@@ -370,7 +370,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
Block
res
=
aggregator
.
mergeBlocks
(
*
blocks_to_merge
,
final
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
std
::
lock_guard
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
if
(
parallel_merge_data
->
finish
)
break
;
...
...
@@ -385,7 +385,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
catch
(...)
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
std
::
lock_guard
lock
(
parallel_merge_data
->
merged_blocks_mutex
);
parallel_merge_data
->
exception
=
std
::
current_exception
();
parallel_merge_data
->
finish
=
true
;
}
...
...
dbms/src/DataStreams/ParallelInputsProcessor.h
浏览文件 @
e33e5150
...
...
@@ -191,7 +191,7 @@ private:
{
InputData
unprepared_input
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
unprepared_inputs_mutex
);
std
::
lock_guard
lock
(
unprepared_inputs_mutex
);
if
(
unprepared_inputs
.
empty
())
break
;
...
...
@@ -203,7 +203,7 @@ private:
unprepared_input
.
in
->
readPrefix
();
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
available_inputs_mutex
);
std
::
lock_guard
lock
(
available_inputs_mutex
);
available_inputs
.
push
(
unprepared_input
);
}
}
...
...
@@ -257,7 +257,7 @@ private:
/// Select the next source.
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
available_inputs_mutex
);
std
::
lock_guard
lock
(
available_inputs_mutex
);
/// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.)
if
(
available_inputs
.
empty
())
...
...
@@ -278,7 +278,7 @@ private:
/// If this source is not run out yet, then put the resulting block in the ready queue.
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
available_inputs_mutex
);
std
::
lock_guard
lock
(
available_inputs_mutex
);
if
(
block
)
{
...
...
dbms/src/DataStreams/RemoteBlockInputStream.cpp
浏览文件 @
e33e5150
...
...
@@ -104,7 +104,7 @@ void RemoteBlockInputStream::cancel(bool kill)
return
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
external_tables_mutex
);
std
::
lock_guard
lock
(
external_tables_mutex
);
/// Stop sending external data.
for
(
auto
&
vec
:
external_tables_data
)
...
...
@@ -124,7 +124,7 @@ void RemoteBlockInputStream::sendExternalTables()
size_t
count
=
multiplexed_connections
->
size
();
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
external_tables_mutex
);
std
::
lock_guard
lock
(
external_tables_mutex
);
external_tables_data
.
reserve
(
count
);
...
...
dbms/src/Databases/DatabaseOrdinary.cpp
浏览文件 @
e33e5150
...
...
@@ -273,7 +273,7 @@ void DatabaseOrdinary::createTable(
/// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
tables
.
find
(
table_name
)
!=
tables
.
end
())
throw
Exception
(
"Table "
+
name
+
"."
+
table_name
+
" already exists."
,
ErrorCodes
::
TABLE_ALREADY_EXISTS
);
}
...
...
@@ -298,7 +298,7 @@ void DatabaseOrdinary::createTable(
{
/// Add a table to the map of known tables.
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
!
tables
.
emplace
(
table_name
,
table
).
second
)
throw
Exception
(
"Table "
+
name
+
"."
+
table_name
+
" already exists."
,
ErrorCodes
::
TABLE_ALREADY_EXISTS
);
}
...
...
@@ -492,7 +492,7 @@ void DatabaseOrdinary::shutdown()
Tables
tables_snapshot
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
tables_snapshot
=
tables
;
}
...
...
@@ -501,7 +501,7 @@ void DatabaseOrdinary::shutdown()
kv
.
second
->
shutdown
();
}
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
tables
.
clear
();
}
...
...
dbms/src/Databases/DatabasesCommon.cpp
浏览文件 @
e33e5150
...
...
@@ -88,7 +88,7 @@ bool DatabaseWithOwnTablesBase::isTableExist(
const
Context
&
/*context*/
,
const
String
&
table_name
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
tables
.
find
(
table_name
)
!=
tables
.
end
();
}
...
...
@@ -96,7 +96,7 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(
const
Context
&
/*context*/
,
const
String
&
table_name
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
auto
it
=
tables
.
find
(
table_name
);
if
(
it
==
tables
.
end
())
return
{};
...
...
@@ -105,13 +105,13 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(
DatabaseIteratorPtr
DatabaseWithOwnTablesBase
::
getIterator
(
const
Context
&
/*context*/
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
std
::
make_unique
<
DatabaseSnapshotIterator
>
(
tables
);
}
bool
DatabaseWithOwnTablesBase
::
empty
(
const
Context
&
/*context*/
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
tables
.
empty
();
}
...
...
@@ -119,7 +119,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
{
StoragePtr
res
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
auto
it
=
tables
.
find
(
table_name
);
if
(
it
==
tables
.
end
())
throw
Exception
(
"Table "
+
name
+
"."
+
table_name
+
" doesn't exist."
,
ErrorCodes
::
UNKNOWN_TABLE
);
...
...
@@ -132,7 +132,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
void
DatabaseWithOwnTablesBase
::
attachTable
(
const
String
&
table_name
,
const
StoragePtr
&
table
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
!
tables
.
emplace
(
table_name
,
table
).
second
)
throw
Exception
(
"Table "
+
name
+
"."
+
table_name
+
" already exists."
,
ErrorCodes
::
TABLE_ALREADY_EXISTS
);
}
...
...
@@ -144,7 +144,7 @@ void DatabaseWithOwnTablesBase::shutdown()
Tables
tables_snapshot
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
tables_snapshot
=
tables
;
}
...
...
@@ -153,7 +153,7 @@ void DatabaseWithOwnTablesBase::shutdown()
kv
.
second
->
shutdown
();
}
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
tables
.
clear
();
}
...
...
dbms/src/Functions/transform.cpp
浏览文件 @
e33e5150
...
...
@@ -742,7 +742,7 @@ private:
if
(
0
==
size
)
throw
Exception
{
"Empty arrays are illegal in function "
+
getName
(),
ErrorCodes
::
BAD_ARGUMENTS
};
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
initialized
)
return
;
...
...
dbms/src/IO/AIOContextPool.cpp
浏览文件 @
e33e5150
...
...
@@ -78,7 +78,7 @@ void AIOContextPool::fulfillPromises(const io_event events[], const int num_even
if
(
num_events
==
0
)
return
;
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
const
std
::
lock_guard
lock
{
mutex
};
/// look at returned events and find corresponding promise, set result and erase promise from map
for
(
const
auto
&
event
:
boost
::
make_iterator_range
(
events
,
events
+
num_events
))
...
...
@@ -114,7 +114,7 @@ void AIOContextPool::notifyProducers(const int num_producers) const
void
AIOContextPool
::
reportExceptionToAnyProducer
()
{
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
const
std
::
lock_guard
lock
{
mutex
};
const
auto
any_promise_it
=
std
::
begin
(
promises
);
any_promise_it
->
second
.
set_exception
(
std
::
current_exception
());
...
...
@@ -123,7 +123,7 @@ void AIOContextPool::reportExceptionToAnyProducer()
std
::
future
<
AIOContextPool
::
BytesRead
>
AIOContextPool
::
post
(
struct
iocb
&
iocb
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
{
mutex
};
std
::
unique_lock
lock
{
mutex
};
/// get current id and increment it by one
const
auto
request_id
=
next_id
;
...
...
dbms/src/IO/HTTPCommon.cpp
浏览文件 @
e33e5150
...
...
@@ -141,7 +141,7 @@ namespace
public:
Entry
getSession
(
const
Poco
::
URI
&
uri
,
const
ConnectionTimeouts
&
timeouts
,
size_t
max_connections_per_endpoint
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
const
std
::
string
&
host
=
uri
.
getHost
();
UInt16
port
=
uri
.
getPort
();
bool
https
=
isHTTPS
(
uri
);
...
...
dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
浏览文件 @
e33e5150
...
...
@@ -68,7 +68,7 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders()
void
WriteBufferFromHTTPServerResponse
::
nextImpl
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
startSendHeaders
();
...
...
@@ -147,7 +147,7 @@ WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
void
WriteBufferFromHTTPServerResponse
::
onProgress
(
const
Progress
&
progress
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
/// Cannot add new headers if body was started to send.
if
(
headers_finished_sending
)
...
...
@@ -181,7 +181,7 @@ void WriteBufferFromHTTPServerResponse::finalize()
else
{
/// If no remaining data, just send headers.
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
startSendHeaders
();
finishSendHeaders
();
}
...
...
dbms/src/Interpreters/ActionLocksManager.cpp
浏览文件 @
e33e5150
...
...
@@ -33,7 +33,7 @@ void ActionLocksManager::add(StorageActionBlockType action_type)
if
(
!
action_lock
.
expired
())
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
storage_locks
[
table
.
get
()][
action_type
]
=
std
::
move
(
action_lock
);
}
});
...
...
@@ -47,7 +47,7 @@ void ActionLocksManager::add(const String & database_name, const String & table_
if
(
!
action_lock
.
expired
())
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
storage_locks
[
table
.
get
()][
action_type
]
=
std
::
move
(
action_lock
);
}
}
...
...
@@ -55,7 +55,7 @@ void ActionLocksManager::add(const String & database_name, const String & table_
void
ActionLocksManager
::
remove
(
StorageActionBlockType
action_type
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
for
(
auto
&
storage_elem
:
storage_locks
)
storage_elem
.
second
.
erase
(
action_type
);
...
...
@@ -65,7 +65,7 @@ void ActionLocksManager::remove(const String & database_name, const String & tab
{
if
(
auto
table
=
global_context
.
tryGetTable
(
database_name
,
table_name
))
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
storage_locks
.
count
(
table
.
get
()))
storage_locks
[
table
.
get
()].
erase
(
action_type
);
...
...
@@ -74,7 +74,7 @@ void ActionLocksManager::remove(const String & database_name, const String & tab
void
ActionLocksManager
::
cleanExpired
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
for
(
auto
it_storage
=
storage_locks
.
begin
();
it_storage
!=
storage_locks
.
end
();)
{
...
...
dbms/src/Interpreters/Aggregator.cpp
浏览文件 @
e33e5150
...
...
@@ -196,7 +196,7 @@ Aggregator::Aggregator(const Params & params_)
void
Aggregator
::
compileIfPossible
(
AggregatedDataVariants
::
Type
type
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
compiled_if_possible
)
return
;
...
...
@@ -966,7 +966,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
double
uncompressed_bytes
=
compressed_buf
.
count
();
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
temporary_files
.
mutex
);
std
::
lock_guard
lock
(
temporary_files
.
mutex
);
temporary_files
.
files
.
emplace_back
(
std
::
move
(
file
));
temporary_files
.
sum_size_uncompressed
+=
uncompressed_bytes
;
temporary_files
.
sum_size_compressed
+=
compressed_bytes
;
...
...
@@ -1819,7 +1819,7 @@ protected:
while
(
true
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
parallel_merge_data
->
mutex
);
std
::
unique_lock
lock
(
parallel_merge_data
->
mutex
);
if
(
parallel_merge_data
->
exception
)
std
::
rethrow_exception
(
parallel_merge_data
->
exception
);
...
...
@@ -1909,12 +1909,12 @@ private:
APPLY_FOR_VARIANTS_TWO_LEVEL
(
M
)
#undef M
std
::
lock_guard
<
std
::
mutex
>
lock
(
parallel_merge_data
->
mutex
);
std
::
lock_guard
lock
(
parallel_merge_data
->
mutex
);
parallel_merge_data
->
ready_blocks
[
bucket_num
]
=
std
::
move
(
block
);
}
catch
(...)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parallel_merge_data
->
mutex
);
std
::
lock_guard
lock
(
parallel_merge_data
->
mutex
);
if
(
!
parallel_merge_data
->
exception
)
parallel_merge_data
->
exception
=
std
::
current_exception
();
}
...
...
dbms/src/Interpreters/Aggregator.h
浏览文件 @
e33e5150
...
...
@@ -1486,7 +1486,7 @@ public:
bool
empty
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
files
.
empty
();
}
};
...
...
dbms/src/Interpreters/AsynchronousMetrics.cpp
浏览文件 @
e33e5150
...
...
@@ -42,7 +42,7 @@ AsynchronousMetrics::~AsynchronousMetrics()
try
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
wait_mutex
};
std
::
lock_guard
lock
{
wait_mutex
};
quit
=
true
;
}
...
...
@@ -58,14 +58,14 @@ AsynchronousMetrics::~AsynchronousMetrics()
AsynchronousMetrics
::
Container
AsynchronousMetrics
::
getValues
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
container_mutex
};
std
::
lock_guard
lock
{
container_mutex
};
return
container
;
}
void
AsynchronousMetrics
::
set
(
const
std
::
string
&
name
,
Value
value
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
container_mutex
};
std
::
lock_guard
lock
{
container_mutex
};
container
[
name
]
=
value
;
}
...
...
@@ -74,7 +74,7 @@ void AsynchronousMetrics::run()
{
setThreadName
(
"AsyncMetrics"
);
std
::
unique_lock
<
std
::
mutex
>
lock
{
wait_mutex
};
std
::
unique_lock
lock
{
wait_mutex
};
/// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter.
const
auto
get_next_minute
=
[]
...
...
dbms/src/Interpreters/CatBoostModel.cpp
浏览文件 @
e33e5150
...
...
@@ -488,7 +488,7 @@ std::shared_ptr<CatBoostLibHolder> getCatBoostWrapperHolder(const std::string &
static
std
::
weak_ptr
<
CatBoostLibHolder
>
ptr
;
static
std
::
mutex
mutex
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
auto
result
=
ptr
.
lock
();
if
(
!
result
||
result
->
getCurrentPath
()
!=
lib_path
)
...
...
dbms/src/Interpreters/Compiler.cpp
浏览文件 @
e33e5150
...
...
@@ -87,7 +87,7 @@ SharedLibraryPtr Compiler::getOrCount(
{
HashedKey
hashed_key
=
getHash
(
key
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
UInt32
count
=
++
counts
[
hashed_key
];
...
...
@@ -306,7 +306,7 @@ void Compiler::compile(
SharedLibraryPtr
lib
(
new
SharedLibrary
(
so_file_path
));
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
libraries
[
hashed_key
]
=
lib
;
}
...
...
dbms/src/Interpreters/Context.cpp
浏览文件 @
e33e5150
...
...
@@ -908,7 +908,7 @@ DDLGuard::DDLGuard(Map & map_, std::unique_lock<std::mutex> guards_lock_, const
it
=
map
.
emplace
(
elem
,
Entry
{
std
::
make_unique
<
std
::
mutex
>
(),
0
}).
first
;
++
it
->
second
.
counter
;
guards_lock
.
unlock
();
table_lock
=
std
::
unique_lock
<
std
::
mutex
>
(
*
it
->
second
.
mutex
);
table_lock
=
std
::
unique_lock
(
*
it
->
second
.
mutex
);
}
DDLGuard
::~
DDLGuard
()
...
...
@@ -924,7 +924,7 @@ DDLGuard::~DDLGuard()
std
::
unique_ptr
<
DDLGuard
>
Context
::
getDDLGuard
(
const
String
&
database
,
const
String
&
table
)
const
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
shared
->
ddl_guards_mutex
);
std
::
unique_lock
lock
(
shared
->
ddl_guards_mutex
);
return
std
::
make_unique
<
DDLGuard
>
(
shared
->
ddl_guards
[
database
],
std
::
move
(
lock
),
table
);
}
...
...
@@ -1177,7 +1177,7 @@ ExternalModels & Context::getExternalModels()
EmbeddedDictionaries
&
Context
::
getEmbeddedDictionariesImpl
(
const
bool
throw_on_error
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
embedded_dictionaries_mutex
);
std
::
lock_guard
lock
(
shared
->
embedded_dictionaries_mutex
);
if
(
!
shared
->
embedded_dictionaries
)
{
...
...
@@ -1195,7 +1195,7 @@ EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_
ExternalDictionaries
&
Context
::
getExternalDictionariesImpl
(
const
bool
throw_on_error
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
external_dictionaries_mutex
);
std
::
lock_guard
lock
(
shared
->
external_dictionaries_mutex
);
if
(
!
shared
->
external_dictionaries
)
{
...
...
@@ -1215,7 +1215,7 @@ ExternalDictionaries & Context::getExternalDictionariesImpl(const bool throw_on_
ExternalModels
&
Context
::
getExternalModelsImpl
(
bool
throw_on_error
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
external_models_mutex
);
std
::
lock_guard
lock
(
shared
->
external_models_mutex
);
if
(
!
shared
->
external_models
)
{
...
...
@@ -1372,7 +1372,7 @@ DDLWorker & Context::getDDLWorker() const
zkutil
::
ZooKeeperPtr
Context
::
getZooKeeper
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
zookeeper_mutex
);
std
::
lock_guard
lock
(
shared
->
zookeeper_mutex
);
if
(
!
shared
->
zookeeper
)
shared
->
zookeeper
=
std
::
make_shared
<
zkutil
::
ZooKeeper
>
(
getConfigRef
(),
"zookeeper"
);
...
...
@@ -1465,7 +1465,7 @@ void Context::reloadClusterConfig()
{
ConfigurationPtr
cluster_config
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
clusters_mutex
);
std
::
lock_guard
lock
(
shared
->
clusters_mutex
);
cluster_config
=
shared
->
clusters_config
;
}
...
...
@@ -1473,7 +1473,7 @@ void Context::reloadClusterConfig()
auto
new_clusters
=
std
::
make_unique
<
Clusters
>
(
config
,
settings
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
clusters_mutex
);
std
::
lock_guard
lock
(
shared
->
clusters_mutex
);
if
(
shared
->
clusters_config
.
get
()
==
cluster_config
.
get
())
{
shared
->
clusters
=
std
::
move
(
new_clusters
);
...
...
@@ -1488,7 +1488,7 @@ void Context::reloadClusterConfig()
Clusters
&
Context
::
getClusters
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
clusters_mutex
);
std
::
lock_guard
lock
(
shared
->
clusters_mutex
);
if
(
!
shared
->
clusters
)
{
auto
&
config
=
shared
->
clusters_config
?
*
shared
->
clusters_config
:
getConfigRef
();
...
...
@@ -1502,7 +1502,7 @@ Clusters & Context::getClusters() const
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
void
Context
::
setClustersConfig
(
const
ConfigurationPtr
&
config
,
const
String
&
config_name
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
clusters_mutex
);
std
::
lock_guard
lock
(
shared
->
clusters_mutex
);
shared
->
clusters_config
=
config
;
...
...
@@ -1515,7 +1515,7 @@ void Context::setClustersConfig(const ConfigurationPtr & config, const String &
void
Context
::
setCluster
(
const
String
&
cluster_name
,
const
std
::
shared_ptr
<
Cluster
>
&
cluster
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
shared
->
clusters_mutex
);
std
::
lock_guard
lock
(
shared
->
clusters_mutex
);
if
(
!
shared
->
clusters
)
throw
Exception
(
"Clusters are not set"
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
@@ -1846,7 +1846,7 @@ SessionCleaner::~SessionCleaner()
try
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
std
::
lock_guard
lock
{
mutex
};
quit
=
true
;
}
...
...
@@ -1864,7 +1864,7 @@ void SessionCleaner::run()
{
setThreadName
(
"HTTPSessionCleaner"
);
std
::
unique_lock
<
std
::
mutex
>
lock
{
mutex
};
std
::
unique_lock
lock
{
mutex
};
while
(
true
)
{
...
...
dbms/src/Interpreters/EmbeddedDictionaries.cpp
浏览文件 @
e33e5150
...
...
@@ -64,7 +64,7 @@ bool EmbeddedDictionaries::reloadDictionary(
bool
EmbeddedDictionaries
::
reloadImpl
(
const
bool
throw_on_error
,
const
bool
force_reload
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
/** If you can not update the directories, then despite this, do not throw an exception (use the old directories).
* If there are no old correct directories, then when using functions that depend on them,
...
...
dbms/src/Interpreters/ExpressionJIT.cpp
浏览文件 @
e33e5150
...
...
@@ -626,7 +626,7 @@ size_t CompiledExpressionCache::weight() const
{
#if LLVM_VERSION_MAJOR >= 6
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
size_t
result
{
0
};
std
::
unordered_set
<
size_t
>
seen
;
for
(
const
auto
&
cell
:
cells
)
...
...
dbms/src/Interpreters/ExternalLoader.cpp
浏览文件 @
e33e5150
...
...
@@ -92,7 +92,7 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
/// list of recreated loadable objects to perform delayed removal from unordered_map
std
::
list
<
std
::
string
>
recreated_failed_loadable_objects
;
std
::
unique_lock
<
std
::
mutex
>
all_lock
(
all_mutex
);
std
::
unique_lock
all_lock
(
all_mutex
);
/// retry loading failed loadable objects
for
(
auto
&
failed_loadable_object
:
failed_loadable_objects
)
...
...
@@ -122,7 +122,7 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
}
else
{
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
map_mutex
};
const
std
::
lock_guard
lock
{
map_mutex
};
const
auto
&
lifetime
=
loadable_ptr
->
getLifetime
();
std
::
uniform_int_distribution
<
UInt64
>
distribution
{
lifetime
.
min_sec
,
lifetime
.
max_sec
};
...
...
@@ -253,7 +253,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
}
else
{
std
::
unique_lock
<
std
::
mutex
>
all_lock
(
all_mutex
);
std
::
unique_lock
all_lock
(
all_mutex
);
auto
modification_time_it
=
last_modification_times
.
find
(
config_path
);
if
(
modification_time_it
==
std
::
end
(
last_modification_times
))
...
...
@@ -305,7 +305,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
decltype
(
loadable_objects
.
begin
())
object_it
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
map_mutex
};
std
::
lock_guard
lock
{
map_mutex
};
object_it
=
loadable_objects
.
find
(
name
);
}
...
...
@@ -342,7 +342,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
}
}
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
map_mutex
};
const
std
::
lock_guard
lock
{
map_mutex
};
/// add new loadable object or update an existing version
if
(
object_it
==
std
::
end
(
loadable_objects
))
...
...
@@ -365,7 +365,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
/// If the loadable object could not load data or even failed to initialize from the config.
/// - all the same we insert information into the `loadable_objects`, with the zero pointer `loadable`.
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
map_mutex
};
const
std
::
lock_guard
lock
{
map_mutex
};
const
auto
exception_ptr
=
std
::
current_exception
();
const
auto
loadable_it
=
loadable_objects
.
find
(
name
);
...
...
@@ -397,14 +397,14 @@ void ExternalLoader::reload(const std::string & name)
reloadFromConfigFiles
(
true
,
true
,
name
);
/// Check that specified object was loaded
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
map_mutex
};
const
std
::
lock_guard
lock
{
map_mutex
};
if
(
!
loadable_objects
.
count
(
name
))
throw
Exception
(
"Failed to load "
+
object_name
+
" '"
+
name
+
"' during the reload process"
,
ErrorCodes
::
BAD_ARGUMENTS
);
}
ExternalLoader
::
LoadablePtr
ExternalLoader
::
getLoadableImpl
(
const
std
::
string
&
name
,
bool
throw_on_error
)
const
{
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
map_mutex
};
const
std
::
lock_guard
lock
{
map_mutex
};
const
auto
it
=
loadable_objects
.
find
(
name
);
if
(
it
==
std
::
end
(
loadable_objects
))
...
...
dbms/src/Interpreters/InterserverIOHandler.h
浏览文件 @
e33e5150
...
...
@@ -84,7 +84,7 @@ class InterserverIOHandler
public:
void
addEndpoint
(
const
String
&
name
,
InterserverIOEndpointPtr
endpoint
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
bool
inserted
=
endpoint_map
.
try_emplace
(
name
,
std
::
move
(
endpoint
)).
second
;
if
(
!
inserted
)
throw
Exception
(
"Duplicate interserver IO endpoint: "
+
name
,
ErrorCodes
::
DUPLICATE_INTERSERVER_IO_ENDPOINT
);
...
...
@@ -92,7 +92,7 @@ public:
void
removeEndpoint
(
const
String
&
name
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
!
endpoint_map
.
erase
(
name
))
throw
Exception
(
"No interserver IO endpoint named "
+
name
,
ErrorCodes
::
NO_SUCH_INTERSERVER_IO_ENDPOINT
);
}
...
...
@@ -100,7 +100,7 @@ public:
InterserverIOEndpointPtr
getEndpoint
(
const
String
&
name
)
try
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
endpoint_map
.
at
(
name
);
}
catch
(...)
...
...
dbms/src/Interpreters/ProcessList.cpp
浏览文件 @
e33e5150
...
...
@@ -283,7 +283,7 @@ QueryStatus::~QueryStatus() = default;
void
QueryStatus
::
setQueryStreams
(
const
BlockIO
&
io
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
std
::
lock_guard
lock
(
query_streams_mutex
);
query_stream_in
=
io
.
in
;
query_stream_out
=
io
.
out
;
...
...
@@ -296,7 +296,7 @@ void QueryStatus::releaseQueryStreams()
BlockOutputStreamPtr
out
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
std
::
lock_guard
lock
(
query_streams_mutex
);
query_streams_status
=
QueryStreamsStatus
::
Released
;
in
=
std
::
move
(
query_stream_in
);
...
...
@@ -308,14 +308,14 @@ void QueryStatus::releaseQueryStreams()
bool
QueryStatus
::
streamsAreReleased
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
std
::
lock_guard
lock
(
query_streams_mutex
);
return
query_streams_status
==
QueryStreamsStatus
::
Released
;
}
bool
QueryStatus
::
tryGetQueryStreams
(
BlockInputStreamPtr
&
in
,
BlockOutputStreamPtr
&
out
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
std
::
lock_guard
lock
(
query_streams_mutex
);
if
(
query_streams_status
!=
QueryStreamsStatus
::
Initialized
)
return
false
;
...
...
@@ -358,7 +358,7 @@ QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query
ProcessList
::
CancellationCode
ProcessList
::
sendCancelToQuery
(
const
String
&
current_query_id
,
const
String
&
current_user
,
bool
kill
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
QueryStatus
*
elem
=
tryGetProcessListElement
(
current_query_id
,
current_user
);
...
...
@@ -431,7 +431,7 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
{
Info
per_query_infos
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
per_query_infos
.
reserve
(
processes
.
size
());
for
(
const
auto
&
process
:
processes
)
...
...
dbms/src/Interpreters/ProcessList.h
浏览文件 @
e33e5150
...
...
@@ -308,7 +308,7 @@ public:
void
setMaxSize
(
size_t
max_size_
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
max_size
=
max_size_
;
}
...
...
dbms/src/Interpreters/QueryPriorities.h
浏览文件 @
e33e5150
...
...
@@ -60,7 +60,7 @@ private:
std
::
chrono
::
nanoseconds
cur_timeout
=
timeout
;
Stopwatch
watch
(
CLOCK_MONOTONIC_COARSE
);
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
std
::
unique_lock
lock
(
mutex
);
while
(
true
)
{
...
...
@@ -109,7 +109,7 @@ public:
~
HandleImpl
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parent
.
mutex
);
std
::
lock_guard
lock
(
parent
.
mutex
);
--
value
.
second
;
}
parent
.
condvar
.
notify_all
();
...
...
@@ -132,7 +132,7 @@ public:
if
(
0
==
priority
)
return
{};
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
auto
it
=
container
.
emplace
(
priority
,
0
).
first
;
++
it
->
second
;
return
std
::
make_shared
<
HandleImpl
>
(
*
this
,
*
it
);
...
...
dbms/src/Interpreters/Quota.cpp
浏览文件 @
e33e5150
...
...
@@ -298,7 +298,7 @@ QuotaForIntervalsPtr Quota::get(const String & quota_key, const String & user_na
?
quota_key
:
user_name
));
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
Container
::
iterator
it
=
quota_for_keys
.
find
(
quota_key_hashed
);
if
(
quota_for_keys
.
end
()
==
it
)
...
...
dbms/src/Interpreters/tests/internal_iotop.cpp
浏览文件 @
e33e5150
...
...
@@ -52,7 +52,7 @@ void do_io(size_t id)
TaskStatsInfoGetter
get_info
;
get_info
.
getStat
(
stat
,
tid
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
std
::
cerr
<<
"#"
<<
id
<<
", tid "
<<
tid
<<
", intitial
\n
"
<<
stat
<<
"
\n
"
;
size_t
copy_size
=
1048576
*
(
1
+
id
);
...
...
@@ -67,7 +67,7 @@ void do_io(size_t id)
get_info
.
getStat
(
stat
,
tid
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
std
::
cerr
<<
"#"
<<
id
<<
", tid "
<<
tid
<<
", step1
\n
"
<<
stat
<<
"
\n
"
;
}
...
...
@@ -79,7 +79,7 @@ void do_io(size_t id)
get_info
.
getStat
(
stat
,
tid
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
std
::
cerr
<<
"#"
<<
id
<<
", tid "
<<
tid
<<
", step2
\n
"
<<
stat
<<
"
\n
"
;
}
...
...
@@ -91,7 +91,7 @@ void do_io(size_t id)
get_info
.
getStat
(
stat
,
tid
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
std
::
cerr
<<
"#"
<<
id
<<
", tid "
<<
tid
<<
", step3
\n
"
<<
stat
<<
"
\n
"
;
}
...
...
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
浏览文件 @
e33e5150
...
...
@@ -109,7 +109,7 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
{
quit
=
true
;
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
std
::
lock_guard
lock
{
mutex
};
}
cond
.
notify_one
();
thread
.
join
();
...
...
@@ -123,7 +123,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
{
quit
=
true
;
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
std
::
lock_guard
lock
{
mutex
};
}
cond
.
notify_one
();
thread
.
join
();
...
...
@@ -137,7 +137,7 @@ void StorageDistributedDirectoryMonitor::run()
{
setThreadName
(
"DistrDirMonitor"
);
std
::
unique_lock
<
std
::
mutex
>
lock
{
mutex
};
std
::
unique_lock
lock
{
mutex
};
const
auto
quit_requested
=
[
this
]
{
return
quit
.
load
(
std
::
memory_order_relaxed
);
};
...
...
dbms/src/Storages/Kafka/StorageKafka.cpp
浏览文件 @
e33e5150
...
...
@@ -427,7 +427,7 @@ StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
semaphore
.
wait
();
// Take the first available consumer from the list
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
auto
consumer
=
consumers
.
back
();
consumers
.
pop_back
();
return
consumer
;
...
...
@@ -435,7 +435,7 @@ StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
void
StorageKafka
::
pushConsumer
(
StorageKafka
::
ConsumerPtr
c
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
consumers
.
push_back
(
c
);
semaphore
.
set
();
}
...
...
dbms/src/Storages/MergeTree/DiskSpaceMonitor.h
浏览文件 @
e33e5150
...
...
@@ -41,7 +41,7 @@ public:
{
try
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
DiskSpaceMonitor
::
mutex
);
std
::
lock_guard
lock
(
DiskSpaceMonitor
::
mutex
);
if
(
DiskSpaceMonitor
::
reserved_bytes
<
size
)
{
DiskSpaceMonitor
::
reserved_bytes
=
0
;
...
...
@@ -70,7 +70,7 @@ public:
/// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked.
void
update
(
UInt64
new_size
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
DiskSpaceMonitor
::
mutex
);
std
::
lock_guard
lock
(
DiskSpaceMonitor
::
mutex
);
DiskSpaceMonitor
::
reserved_bytes
-=
size
;
size
=
new_size
;
DiskSpaceMonitor
::
reserved_bytes
+=
size
;
...
...
@@ -84,7 +84,7 @@ public:
Reservation
(
UInt64
size_
)
:
size
(
size_
),
metric_increment
(
CurrentMetrics
::
DiskSpaceReservedForMerge
,
size
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
DiskSpaceMonitor
::
mutex
);
std
::
lock_guard
lock
(
DiskSpaceMonitor
::
mutex
);
DiskSpaceMonitor
::
reserved_bytes
+=
size
;
++
DiskSpaceMonitor
::
reservation_count
;
}
...
...
@@ -108,7 +108,7 @@ public:
/// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df.
res
-=
std
::
min
(
res
,
static_cast
<
UInt64
>
(
30
*
(
1ul
<<
20
)));
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
reserved_bytes
>
res
)
res
=
0
;
...
...
@@ -120,13 +120,13 @@ public:
static
UInt64
getReservedSpace
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
reserved_bytes
;
}
static
UInt64
getReservationCount
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
return
reservation_count
;
}
...
...
dbms/src/Storages/MergeTree/MergeList.h
浏览文件 @
e33e5150
...
...
@@ -124,13 +124,13 @@ public:
template
<
typename
...
Args
>
EntryPtr
insert
(
Args
&&
...
args
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
std
::
lock_guard
lock
{
mutex
};
return
std
::
make_unique
<
Entry
>
(
*
this
,
merges
.
emplace
(
merges
.
end
(),
std
::
forward
<
Args
>
(
args
)...));
}
info_container_t
get
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
std
::
lock_guard
lock
{
mutex
};
info_container_t
res
;
for
(
const
auto
&
merge_element
:
merges
)
res
.
emplace_back
(
merge_element
.
getInfo
());
...
...
@@ -141,7 +141,7 @@ public:
inline
MergeListEntry
::~
MergeListEntry
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
list
.
mutex
};
std
::
lock_guard
lock
{
list
.
mutex
};
list
.
merges
.
erase
(
it
);
}
...
...
dbms/src/Storages/MergeTree/MergeTreeData.cpp
浏览文件 @
e33e5150
...
...
@@ -558,7 +558,7 @@ String MergeTreeData::MergingParams::getModeName() const
Int64
MergeTreeData
::
getMaxBlockNumber
()
{
std
::
lock_guard
<
std
::
mutex
>
lock_all
(
data_parts_mutex
);
std
::
lock_guard
lock_all
(
data_parts_mutex
);
Int64
max_block_num
=
0
;
for
(
const
DataPartPtr
&
part
:
data_parts_by_info
)
...
...
@@ -587,7 +587,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
DataPartsVector
broken_parts_to_detach
;
size_t
suspicious_broken_parts
=
0
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
data_parts_indexes
.
clear
();
for
(
const
String
&
file_name
:
part_file_names
)
...
...
@@ -766,7 +766,7 @@ static bool isOldPartDirectory(Poco::File & directory, time_t threshold)
void
MergeTreeData
::
clearOldTemporaryDirectories
(
ssize_t
custom_directories_lifetime_seconds
)
{
/// If the method is already called from another thread, then we don't need to do anything.
std
::
unique_lock
<
std
::
mutex
>
lock
(
clear_old_temporary_directories_mutex
,
std
::
defer_lock
);
std
::
unique_lock
lock
(
clear_old_temporary_directories_mutex
,
std
::
defer_lock
);
if
(
!
lock
.
try_lock
())
return
;
...
...
@@ -805,7 +805,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
DataPartsVector
res
;
/// If the method is already called from another thread, then we don't need to do anything.
std
::
unique_lock
<
std
::
mutex
>
lock
(
grab_old_parts_mutex
,
std
::
defer_lock
);
std
::
unique_lock
lock
(
grab_old_parts_mutex
,
std
::
defer_lock
);
if
(
!
lock
.
try_lock
())
return
res
;
...
...
@@ -813,7 +813,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
std
::
vector
<
DataPartIteratorByStateAndInfo
>
parts_to_delete
;
{
std
::
lock_guard
<
std
::
mutex
>
lock_parts
(
data_parts_mutex
);
std
::
lock_guard
lock_parts
(
data_parts_mutex
);
auto
outdated_parts_range
=
getDataPartsStateRange
(
DataPartState
::
Outdated
);
for
(
auto
it
=
outdated_parts_range
.
begin
();
it
!=
outdated_parts_range
.
end
();
++
it
)
...
...
@@ -847,7 +847,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
void
MergeTreeData
::
rollbackDeletingParts
(
const
MergeTreeData
::
DataPartsVector
&
parts
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
for
(
auto
&
part
:
parts
)
{
/// We should modify it under data_parts_mutex
...
...
@@ -859,7 +859,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
void
MergeTreeData
::
removePartsFinally
(
const
MergeTreeData
::
DataPartsVector
&
parts
)
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
/// TODO: use data_parts iterators instead of pointers
for
(
auto
&
part
:
parts
)
...
...
@@ -926,7 +926,7 @@ void MergeTreeData::dropAllData()
{
LOG_TRACE
(
log
,
"dropAllData: waiting for locks."
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
LOG_TRACE
(
log
,
"dropAllData: removing data from memory."
);
...
...
@@ -1630,7 +1630,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
DataPartsVector
covered_parts
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
unique_lock
lock
(
data_parts_mutex
);
renameTempPartAndReplace
(
part
,
increment
,
out_transaction
,
lock
,
&
covered_parts
);
}
return
covered_parts
;
...
...
@@ -1844,7 +1844,7 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
{
DataPartPtr
part_to_delete
;
{
std
::
lock_guard
<
std
::
mutex
>
lock_parts
(
data_parts_mutex
);
std
::
lock_guard
lock_parts
(
data_parts_mutex
);
LOG_TRACE
(
log
,
"Trying to immediately remove part "
<<
part
->
getNameWithState
());
...
...
@@ -1880,7 +1880,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
{
size_t
res
=
0
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
for
(
auto
&
part
:
getDataPartsStateRange
(
DataPartState
::
Committed
))
res
+=
part
->
bytes_on_disk
;
...
...
@@ -1892,7 +1892,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
size_t
MergeTreeData
::
getMaxPartsCountForPartition
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
size_t
res
=
0
;
size_t
cur_count
=
0
;
...
...
@@ -2016,7 +2016,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg
{
DataPartStateAndPartitionID
state_with_partition
{
state
,
partition_id
};
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
return
DataPartsVector
(
data_parts_by_state_and_info
.
lower_bound
(
state_with_partition
),
data_parts_by_state_and_info
.
upper_bound
(
state_with_partition
));
...
...
@@ -2025,7 +2025,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg
MergeTreeData
::
DataPartPtr
MergeTreeData
::
getPartIfExists
(
const
MergeTreePartInfo
&
part_info
,
const
MergeTreeData
::
DataPartStates
&
valid_states
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
auto
it
=
data_parts_by_info
.
find
(
part_info
);
if
(
it
==
data_parts_by_info
.
end
())
...
...
@@ -2266,7 +2266,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartS
DataPartsVector
res
;
DataPartsVector
buf
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
for
(
auto
state
:
affordable_states
)
{
...
...
@@ -2292,7 +2292,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
{
DataPartsVector
res
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
res
.
assign
(
data_parts_by_info
.
begin
(),
data_parts_by_info
.
end
());
if
(
out_states
!=
nullptr
)
...
...
@@ -2310,7 +2310,7 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo
{
DataParts
res
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
data_parts_mutex
);
std
::
lock_guard
lock
(
data_parts_mutex
);
for
(
auto
state
:
affordable_states
)
{
auto
range
=
getDataPartsStateRange
(
state
);
...
...
dbms/src/Storages/MergeTree/MergeTreeData.h
浏览文件 @
e33e5150
...
...
@@ -531,7 +531,7 @@ public:
size_t
getColumnCompressedSize
(
const
std
::
string
&
name
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
data_parts_mutex
};
std
::
lock_guard
lock
{
data_parts_mutex
};
const
auto
it
=
column_sizes
.
find
(
name
);
return
it
==
std
::
end
(
column_sizes
)
?
0
:
it
->
second
.
data_compressed
;
...
...
@@ -540,14 +540,14 @@ public:
using
ColumnSizeByName
=
std
::
unordered_map
<
std
::
string
,
DataPart
::
ColumnSize
>
;
ColumnSizeByName
getColumnSizes
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
data_parts_mutex
};
std
::
lock_guard
lock
{
data_parts_mutex
};
return
column_sizes
;
}
/// Calculates column sizes in compressed form for the current state of data_parts.
void
recalculateColumnSizes
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
{
data_parts_mutex
};
std
::
lock_guard
lock
{
data_parts_mutex
};
calculateColumnSizesImpl
();
}
...
...
dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp
浏览文件 @
e33e5150
...
...
@@ -41,7 +41,7 @@ MergeTreeReadPool::MergeTreeReadPool(
MergeTreeReadTaskPtr
MergeTreeReadPool
::
getTask
(
const
size_t
min_marks_to_read
,
const
size_t
thread
,
const
Names
&
ordered_names
)
{
const
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
const
std
::
lock_guard
lock
{
mutex
};
/// If number of threads was lowered due to backoff, then will assign work only for maximum 'backoff_state.current_threads' threads.
if
(
thread
>=
backoff_state
.
current_threads
)
...
...
@@ -164,7 +164,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
if
(
info
.
nanoseconds
<
backoff_settings
.
min_read_latency_ms
*
1000000
)
return
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
if
(
backoff_state
.
current_threads
<=
1
)
return
;
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
浏览文件 @
e33e5150
...
...
@@ -38,7 +38,7 @@ ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
void
ReplicatedMergeTreePartCheckThread
::
start
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
start_stop_mutex
);
std
::
lock_guard
lock
(
start_stop_mutex
);
need_stop
=
false
;
task
->
activateAndSchedule
();
}
...
...
@@ -48,14 +48,14 @@ void ReplicatedMergeTreePartCheckThread::stop()
//based on discussion on https://github.com/yandex/ClickHouse/pull/1489#issuecomment-344756259
//using the schedule pool there is no problem in case stop is called two time in row and the start multiple times
std
::
lock_guard
<
std
::
mutex
>
lock
(
start_stop_mutex
);
std
::
lock_guard
lock
(
start_stop_mutex
);
need_stop
=
true
;
task
->
deactivate
();
}
void
ReplicatedMergeTreePartCheckThread
::
enqueuePart
(
const
String
&
name
,
time_t
delay_to_check_seconds
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parts_mutex
);
std
::
lock_guard
lock
(
parts_mutex
);
if
(
parts_set
.
count
(
name
))
return
;
...
...
@@ -68,7 +68,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
size_t
ReplicatedMergeTreePartCheckThread
::
size
()
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parts_mutex
);
std
::
lock_guard
lock
(
parts_mutex
);
return
parts_set
.
size
();
}
...
...
@@ -295,7 +295,7 @@ void ReplicatedMergeTreePartCheckThread::run()
time_t
min_check_time
=
std
::
numeric_limits
<
time_t
>::
max
();
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parts_mutex
);
std
::
lock_guard
lock
(
parts_mutex
);
if
(
parts_queue
.
empty
())
{
...
...
@@ -331,7 +331,7 @@ void ReplicatedMergeTreePartCheckThread::run()
/// Remove the part from check queue.
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parts_mutex
);
std
::
lock_guard
lock
(
parts_mutex
);
if
(
parts_queue
.
empty
())
{
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
浏览文件 @
e33e5150
...
...
@@ -294,7 +294,7 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
size_t
queue_size
=
0
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
unique_lock
lock
(
state_mutex
);
/// Remove the job from the queue in the RAM.
/// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
...
...
@@ -335,7 +335,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
std
::
optional
<
time_t
>
max_processed_insert_time_changed
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
unique_lock
lock
(
state_mutex
);
virtual_parts
.
remove
(
part_name
);
...
...
@@ -370,7 +370,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
bool
ReplicatedMergeTreeQueue
::
removeFromVirtualParts
(
const
MergeTreePartInfo
&
part_info
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
unique_lock
lock
(
state_mutex
);
return
virtual_parts
.
remove
(
part_info
);
}
...
...
@@ -711,7 +711,7 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(zkutil::ZooKeeperPt
std
::
optional
<
time_t
>
max_processed_insert_time_changed
;
/// Remove operations with parts, contained in the range to be deleted, from the queue.
std
::
unique_lock
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
unique_lock
lock
(
state_mutex
);
for
(
Queue
::
iterator
it
=
queue
.
begin
();
it
!=
queue
.
end
();)
{
auto
type
=
(
*
it
)
->
type
;
...
...
@@ -785,7 +785,7 @@ size_t ReplicatedMergeTreeQueue::getConflictsCountForRange(
void
ReplicatedMergeTreeQueue
::
checkThereAreNoConflictsInRange
(
const
MergeTreePartInfo
&
range
,
const
LogEntry
&
entry
)
{
String
conflicts_description
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
lock_guard
lock
(
state_mutex
);
if
(
0
!=
getConflictsCountForRange
(
range
,
entry
,
&
conflicts_description
,
lock
))
throw
Exception
(
conflicts_description
,
ErrorCodes
::
UNFINISHED
);
...
...
@@ -1013,7 +1013,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
{
LogEntryPtr
entry
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
lock_guard
lock
(
state_mutex
);
for
(
auto
it
=
queue
.
begin
();
it
!=
queue
.
end
();
++
it
)
{
...
...
@@ -1635,8 +1635,8 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge
ReplicatedMergeTreeQueue
::
SubscriberHandler
ReplicatedMergeTreeQueue
::
addSubscriber
(
ReplicatedMergeTreeQueue
::
SubscriberCallBack
&&
callback
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
state_mutex
);
std
::
lock_guard
<
std
::
mutex
>
lock_subscribers
(
subscribers_mutex
);
std
::
lock_guard
lock
(
state_mutex
);
std
::
lock_guard
lock_subscribers
(
subscribers_mutex
);
auto
it
=
subscribers
.
emplace
(
subscribers
.
end
(),
std
::
move
(
callback
));
...
...
@@ -1648,13 +1648,13 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
ReplicatedMergeTreeQueue
::
SubscriberHandler
::~
SubscriberHandler
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue
.
subscribers_mutex
);
std
::
lock_guard
lock
(
queue
.
subscribers_mutex
);
queue
.
subscribers
.
erase
(
it
);
}
void
ReplicatedMergeTreeQueue
::
notifySubscribers
(
size_t
new_queue_size
)
{
std
::
lock_guard
<
std
::
mutex
>
lock_subscribers
(
subscribers_mutex
);
std
::
lock_guard
lock_subscribers
(
subscribers_mutex
);
for
(
auto
&
subscriber_callback
:
subscribers
)
subscriber_callback
(
new_queue_size
);
}
...
...
dbms/src/Storages/StorageBuffer.cpp
浏览文件 @
e33e5150
...
...
@@ -99,7 +99,7 @@ protected:
return
res
;
has_been_read
=
true
;
std
::
lock_guard
<
std
::
mutex
>
lock
(
buffer
.
mutex
);
std
::
lock_guard
lock
(
buffer
.
mutex
);
if
(
!
buffer
.
data
.
rows
())
return
res
;
...
...
@@ -336,7 +336,7 @@ public:
for
(
size_t
try_no
=
0
;
try_no
<
storage
.
num_shards
;
++
try_no
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
storage
.
buffers
[
shard_num
].
mutex
,
std
::
try_to_lock
);
std
::
unique_lock
lock
(
storage
.
buffers
[
shard_num
].
mutex
,
std
::
try_to_lock
);
if
(
lock
.
owns_lock
())
{
...
...
@@ -356,7 +356,7 @@ public:
if
(
!
least_busy_buffer
)
{
least_busy_buffer
=
&
storage
.
buffers
[
start_shard_num
];
least_busy_lock
=
std
::
unique_lock
<
std
::
mutex
>
(
least_busy_buffer
->
mutex
);
least_busy_lock
=
std
::
unique_lock
(
least_busy_buffer
->
mutex
);
}
insertIntoBuffer
(
block
,
*
least_busy_buffer
);
}
...
...
@@ -527,7 +527,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
size_t
bytes
=
0
;
time_t
time_passed
=
0
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
buffer
.
mutex
,
std
::
defer_lock
);
std
::
unique_lock
lock
(
buffer
.
mutex
,
std
::
defer_lock
);
if
(
!
locked
)
lock
.
lock
();
...
...
dbms/src/Storages/StorageMemory.cpp
浏览文件 @
e33e5150
...
...
@@ -66,7 +66,7 @@ public:
void
write
(
const
Block
&
block
)
override
{
storage
.
check
(
block
,
true
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
storage
.
mutex
);
std
::
lock_guard
lock
(
storage
.
mutex
);
storage
.
data
.
push_back
(
block
);
}
private:
...
...
@@ -90,7 +90,7 @@ BlockInputStreams StorageMemory::read(
{
check
(
column_names
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
size_t
size
=
data
.
size
();
...
...
@@ -123,13 +123,13 @@ BlockOutputStreamPtr StorageMemory::write(
void
StorageMemory
::
drop
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
data
.
clear
();
}
void
StorageMemory
::
truncate
(
const
ASTPtr
&
,
const
Context
&
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
std
::
lock_guard
lock
(
mutex
);
data
.
clear
();
}
...
...
dbms/src/Storages/StorageMergeTree.cpp
浏览文件 @
e33e5150
...
...
@@ -277,7 +277,7 @@ struct CurrentlyMergingPartsTagger
~
CurrentlyMergingPartsTagger
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
storage
->
currently_merging_mutex
);
std
::
lock_guard
lock
(
storage
->
currently_merging_mutex
);
for
(
const
auto
&
part
:
parts
)
{
...
...
@@ -386,7 +386,7 @@ bool StorageMergeTree::merge(
std
::
optional
<
CurrentlyMergingPartsTagger
>
merging_tagger
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
currently_merging_mutex
);
std
::
lock_guard
lock
(
currently_merging_mutex
);
auto
can_merge
=
[
this
,
&
lock
]
(
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
,
String
*
)
{
...
...
@@ -492,7 +492,7 @@ bool StorageMergeTree::tryMutatePart()
{
auto
disk_space
=
DiskSpaceMonitor
::
getUnreservedFreeSpace
(
full_path
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
currently_merging_mutex
);
std
::
lock_guard
lock
(
currently_merging_mutex
);
if
(
current_mutations_by_version
.
empty
())
return
false
;
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
e33e5150
...
...
@@ -174,13 +174,13 @@ thread_local
void
StorageReplicatedMergeTree
::
setZooKeeper
(
zkutil
::
ZooKeeperPtr
zookeeper
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
current_zookeeper_mutex
);
std
::
lock_guard
lock
(
current_zookeeper_mutex
);
current_zookeeper
=
zookeeper
;
}
zkutil
::
ZooKeeperPtr
StorageReplicatedMergeTree
::
tryGetZooKeeper
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
current_zookeeper_mutex
);
std
::
lock_guard
lock
(
current_zookeeper_mutex
);
return
current_zookeeper
;
}
...
...
@@ -2144,7 +2144,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
{
/// We must select parts for merge under merge_selecting_mutex because other threads
/// (OPTIMIZE queries) can assign new merges.
std
::
lock_guard
<
std
::
mutex
>
merge_selecting_lock
(
merge_selecting_mutex
);
std
::
lock_guard
merge_selecting_lock
(
merge_selecting_mutex
);
auto
zookeeper
=
getZooKeeper
();
...
...
@@ -2644,7 +2644,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
}
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
currently_fetching_parts_mutex
);
std
::
lock_guard
lock
(
currently_fetching_parts_mutex
);
if
(
!
currently_fetching_parts
.
insert
(
part_name
).
second
)
{
LOG_DEBUG
(
log
,
"Part "
<<
part_name
<<
" is already fetching right now"
);
...
...
@@ -2654,7 +2654,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
SCOPE_EXIT
({
std
::
lock_guard
<
std
::
mutex
>
lock
(
currently_fetching_parts_mutex
);
std
::
lock_guard
lock
(
currently_fetching_parts_mutex
);
currently_fetching_parts
.
erase
(
part_name
);
});
...
...
@@ -2948,7 +2948,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
{
/// We must select parts for merge under merge_selecting_mutex because other threads
/// (merge_selecting_thread or OPTIMIZE queries) could assign new merges.
std
::
lock_guard
<
std
::
mutex
>
merge_selecting_lock
(
merge_selecting_mutex
);
std
::
lock_guard
merge_selecting_lock
(
merge_selecting_mutex
);
auto
zookeeper
=
getZooKeeper
();
ReplicatedMergeTreeMergePredicate
can_merge
=
queue
.
getMergePredicate
(
zookeeper
);
...
...
@@ -3620,7 +3620,7 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str
bool
StorageReplicatedMergeTree
::
existsNodeCached
(
const
std
::
string
&
path
)
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
existing_nodes_cache_mutex
);
std
::
lock_guard
lock
(
existing_nodes_cache_mutex
);
if
(
existing_nodes_cache
.
count
(
path
))
return
true
;
}
...
...
@@ -3629,7 +3629,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
if
(
res
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
existing_nodes_cache_mutex
);
std
::
lock_guard
lock
(
existing_nodes_cache_mutex
);
existing_nodes_cache
.
insert
(
path
);
}
...
...
@@ -4666,7 +4666,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
/// It does not provides strong guarantees, but is suitable for intended use case (assume merges are quite rare).
{
std
::
lock_guard
<
std
::
mutex
>
merge_selecting_lock
(
merge_selecting_mutex
);
std
::
lock_guard
merge_selecting_lock
(
merge_selecting_mutex
);
queue
.
disableMergesInRange
(
drop_range_fake_part_name
);
}
}
...
...
@@ -4913,7 +4913,7 @@ bool StorageReplicatedMergeTree::dropPartsInPartition(
*/
String
drop_range_fake_part_name
=
getPartNamePossiblyFake
(
data
.
format_version
,
drop_range_info
);
{
std
::
lock_guard
<
std
::
mutex
>
merge_selecting_lock
(
merge_selecting_mutex
);
std
::
lock_guard
merge_selecting_lock
(
merge_selecting_mutex
);
queue
.
disableMergesInRange
(
drop_range_fake_part_name
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录