Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
566160c0
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
566160c0
编写于
9月 01, 2017
作者:
V
Vitaliy Lyudvichenko
提交者:
alexey-milovidov
9月 01, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Use FIFO lock in IStorage. [#CLICKHOUSE-3246]
上级
add3f920
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
113 addition
and
74 deletion
+113
-74
dbms/src/Common/RWLockFIFO.cpp
dbms/src/Common/RWLockFIFO.cpp
+35
-7
dbms/src/Common/RWLockFIFO.h
dbms/src/Common/RWLockFIFO.h
+17
-12
dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp
dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp
+3
-2
dbms/src/DataStreams/PushingToViewsBlockOutputStream.h
dbms/src/DataStreams/PushingToViewsBlockOutputStream.h
+1
-1
dbms/src/Interpreters/InterpreterCreateQuery.cpp
dbms/src/Interpreters/InterpreterCreateQuery.cpp
+2
-2
dbms/src/Interpreters/InterpreterDescribeQuery.cpp
dbms/src/Interpreters/InterpreterDescribeQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterDropQuery.cpp
dbms/src/Interpreters/InterpreterDropQuery.cpp
+2
-2
dbms/src/Interpreters/InterpreterInsertQuery.cpp
dbms/src/Interpreters/InterpreterInsertQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterRenameQuery.cpp
dbms/src/Interpreters/InterpreterRenameQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterSelectQuery.cpp
dbms/src/Interpreters/InterpreterSelectQuery.cpp
+1
-1
dbms/src/Storages/IStorage.cpp
dbms/src/Storages/IStorage.cpp
+4
-4
dbms/src/Storages/IStorage.h
dbms/src/Storages/IStorage.h
+21
-16
dbms/src/Storages/MergeTree/DataPartsExchange.cpp
dbms/src/Storages/MergeTree/DataPartsExchange.cpp
+1
-1
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
...src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
+2
-2
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
...Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+1
-1
dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp
dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp
+1
-1
dbms/src/Storages/StorageBuffer.cpp
dbms/src/Storages/StorageBuffer.cpp
+1
-1
dbms/src/Storages/StorageDistributed.cpp
dbms/src/Storages/StorageDistributed.cpp
+1
-1
dbms/src/Storages/StorageMerge.cpp
dbms/src/Storages/StorageMerge.cpp
+2
-2
dbms/src/Storages/StorageMergeTree.cpp
dbms/src/Storages/StorageMergeTree.cpp
+6
-6
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+5
-5
dbms/src/Storages/StorageTrivialBuffer.cpp
dbms/src/Storages/StorageTrivialBuffer.cpp
+1
-1
dbms/src/Storages/System/StorageSystemColumns.cpp
dbms/src/Storages/System/StorageSystemColumns.cpp
+1
-1
dbms/src/Storages/System/StorageSystemParts.cpp
dbms/src/Storages/System/StorageSystemParts.cpp
+1
-1
未找到文件。
dbms/src/Common/RWLockFIFO.cpp
浏览文件 @
566160c0
#include <boost/core/noncopyable.hpp>
#include "RWLockFIFO.h"
#include <Common/Exception.h>
#include <iostream>
#include <Poco/Ext/ThreadNumber.h>
namespace
DB
{
...
...
@@ -12,7 +15,6 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
if
(
type
==
Type
::
Write
||
queue
.
empty
()
||
queue
.
back
().
type
==
Type
::
Write
)
{
/// Create new group of clients
...
...
@@ -38,15 +40,41 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
throw
;
}
it_client
->
thread_number
=
Poco
::
ThreadNumber
::
get
();
it_client
->
enqueue_time
=
time
(
nullptr
);
it_client
->
type
=
type
;
LockHandler
res
=
std
::
make_unique
<
LockHandlerImpl
>
(
shared_from_this
(),
it_group
,
it_client
);
/// We are first, we should not wait anything
/// If we are not the first client in the group, a notification could be already sent
if
(
it_group
==
queue
.
begin
())
{
it_client
->
start_time
=
it_client
->
enqueue_time
;
return
res
;
}
/// Wait a notification
it_group
->
cv
.
wait
(
lock
,
[
&
it_group
]
()
{
return
it_group
->
awakened
;
}
);
it_group
->
cv
.
wait
(
lock
,
[
&
]
()
{
return
it_group
==
queue
.
begin
();
}
);
it_client
->
start_time
=
time
(
nullptr
);
return
res
;
}
RWLockFIFO
::
Clients
RWLockFIFO
::
getClientsInTheQueue
()
const
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
Clients
res
;
for
(
const
auto
&
group
:
queue
)
{
for
(
const
auto
&
client
:
group
.
clients
)
{
res
.
emplace_back
(
client
);
}
}
return
res
;
}
...
...
@@ -64,17 +92,17 @@ void RWLockFIFO::LockHandlerImpl::unlock()
queue
.
erase
(
it_group
);
if
(
!
queue
.
empty
())
{
queue
.
front
().
awakened
=
true
;
queue
.
front
().
cv
.
notify_all
();
}
}
parent
.
reset
();
}
RWLockFIFO
::
LockHandlerImpl
::~
LockHandlerImpl
()
{
unlock
();
if
(
parent
)
unlock
();
}
...
...
dbms/src/Common/RWLockFIFO.h
浏览文件 @
566160c0
#pragma once
#include <boost/core/noncopyable.hpp>
#include <list>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <Common/Exception.h>
namespace
DB
{
namespace
ErrorCodes
{
extern
const
int
LOGICAL_ERROR
;
}
struct
RWLockFIFO
;
using
RWLockFIFOPtr
=
std
::
shared_ptr
<
RWLockFIFO
>
;
/// Implements shared lock with FIFO service
/// It does not work as recursive mutex, so a deadlock will occur if you try to acquire 2 locks in the same thread
class
RWLockFIFO
:
public
std
::
enable_shared_from_this
<
RWLockFIFO
>
{
public:
...
...
@@ -38,21 +33,31 @@ public:
/// Client is that who wants to acquire the lock.
struct
Client
{
explicit
Client
(
const
std
::
string
&
info
=
"Anonymous client"
)
:
info
{
info
}
{}
explicit
Client
(
const
std
::
string
&
info
=
{})
:
info
{
info
}
{}
std
::
string
info
;
int
thread_number
=
0
;
std
::
time_t
enqueue_time
=
0
;
std
::
time_t
start_time
=
0
;
Type
type
;
};
class
LockHandlerImpl
;
using
LockHandler
=
std
::
unique_ptr
<
LockHandlerImpl
>
;
/// Waits in the queue and returns appropriate lock
LockHandler
getLock
(
Type
type
,
Client
client
);
LockHandler
getLock
(
Type
type
,
Client
client
=
Client
{}
);
LockHandler
getLock
(
Type
type
,
const
std
::
string
&
who
)
{
return
getLock
(
type
,
Client
(
who
));
}
using
Clients
=
std
::
vector
<
Client
>
;
/// Returns list of executing and waiting clients
Clients
getClientsInTheQueue
()
const
;
private:
RWLockFIFO
()
=
default
;
...
...
@@ -69,7 +74,6 @@ private:
ClientsContainer
clients
;
std
::
condition_variable
cv
;
/// all clients of the group wait group condvar
bool
awakened
{
false
};
/// just only to handle spurious wake ups
explicit
Group
(
Type
type
)
:
type
{
type
}
{}
};
...
...
@@ -88,6 +92,7 @@ public:
LockHandlerImpl
(
const
LockHandlerImpl
&
other
)
=
delete
;
/// Unlocks acquired lock
void
unlock
();
~
LockHandlerImpl
();
...
...
@@ -97,7 +102,7 @@ public:
private:
std
::
mutex
mutex
;
mutable
std
::
mutex
mutex
;
GroupsContainer
queue
;
};
...
...
dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp
浏览文件 @
566160c0
...
...
@@ -24,7 +24,7 @@ static void execute_1(size_t threads, int round, int cycles)
for
(
int
i
=
0
;
i
<
cycles
;
++
i
)
{
auto
type
=
(
std
::
uniform_int_distribution
<>
(
0
,
9
)(
gen
)
>=
round
)
?
RWLockFIFO
::
Read
:
RWLockFIFO
::
Write
;
auto
sleep_for
=
std
::
chrono
::
duration
<
int
,
std
::
micro
>
(
std
::
uniform_int_distribution
<>
(
1
,
5
)(
gen
));
auto
sleep_for
=
std
::
chrono
::
duration
<
int
,
std
::
micro
>
(
std
::
uniform_int_distribution
<>
(
1
,
1000
)(
gen
));
auto
lock
=
fifo_lock
->
getLock
(
type
,
"RW"
);
...
...
@@ -57,8 +57,9 @@ static void execute_1(size_t threads, int round, int cycles)
TEST
(
Common
,
RWLockFIFO_1
)
{
constexpr
int
cycles
=
10000
;
const
std
::
vector
<
size_t
>
pool_sizes
{
1
,
2
,
4
,
8
};
for
(
size_t
pool_size
=
1
;
pool_size
<
8
;
++
pool_size
)
for
(
auto
pool_size
:
pool_sizes
)
{
for
(
int
round
=
0
;
round
<
10
;
++
round
)
{
...
...
dbms/src/DataStreams/PushingToViewsBlockOutputStream.h
浏览文件 @
566160c0
...
...
@@ -26,7 +26,7 @@ public:
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock
(
storage
->
lockStructure
(
true
));
addTableLock
(
storage
->
lockStructure
(
true
,
__PRETTY_FUNCTION__
));
Dependencies
dependencies
=
context
.
getDependencies
(
database
,
table
);
for
(
const
auto
&
database_table
:
dependencies
)
...
...
dbms/src/Interpreters/InterpreterCreateQuery.cpp
浏览文件 @
566160c0
...
...
@@ -506,7 +506,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if
(
!
as_table_name
.
empty
())
{
as_storage
=
context
.
getTable
(
as_database_name
,
as_table_name
);
as_storage_lock
=
as_storage
->
lockStructure
(
false
);
as_storage_lock
=
as_storage
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
}
/// Set and retrieve list of columns.
...
...
@@ -557,7 +557,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// If the CREATE SELECT query is, insert the data into the table
if
(
create
.
select
&&
storage_name
!=
"View"
&&
(
storage_name
!=
"MaterializedView"
||
create
.
is_populate
))
{
auto
table_lock
=
res
->
lockStructure
(
true
);
auto
table_lock
=
res
->
lockStructure
(
true
,
__PRETTY_FUNCTION__
);
/// Also see InterpreterInsertQuery.
BlockOutputStreamPtr
out
=
...
...
dbms/src/Interpreters/InterpreterDescribeQuery.cpp
浏览文件 @
566160c0
...
...
@@ -60,7 +60,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
{
StoragePtr
table
=
context
.
getTable
(
ast
.
database
,
ast
.
table
);
auto
table_lock
=
table
->
lockStructure
(
false
);
auto
table_lock
=
table
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
columns
=
table
->
getColumnsList
();
columns
.
insert
(
std
::
end
(
columns
),
std
::
begin
(
table
->
alias_columns
),
std
::
end
(
table
->
alias_columns
));
column_defaults
=
table
->
column_defaults
;
...
...
dbms/src/Interpreters/InterpreterDropQuery.cpp
浏览文件 @
566160c0
...
...
@@ -53,7 +53,7 @@ BlockIO InterpreterDropQuery::execute()
{
table
->
shutdown
();
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
table
->
lockForAlter
();
auto
table_lock
=
table
->
lockForAlter
(
__PRETTY_FUNCTION__
);
/// Delete table data
table
->
drop
();
table
->
is_dropped
=
true
;
...
...
@@ -116,7 +116,7 @@ BlockIO InterpreterDropQuery::execute()
table
.
first
->
shutdown
();
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
table
.
first
->
lockForAlter
();
auto
table_lock
=
table
.
first
->
lockForAlter
(
__PRETTY_FUNCTION__
);
String
current_table_name
=
table
.
first
->
getTableName
();
...
...
dbms/src/Interpreters/InterpreterInsertQuery.cpp
浏览文件 @
566160c0
...
...
@@ -86,7 +86,7 @@ BlockIO InterpreterInsertQuery::execute()
ASTInsertQuery
&
query
=
typeid_cast
<
ASTInsertQuery
&>
(
*
query_ptr
);
StoragePtr
table
=
getTable
();
auto
table_lock
=
table
->
lockStructure
(
true
);
auto
table_lock
=
table
->
lockStructure
(
true
,
__PRETTY_FUNCTION__
);
NamesAndTypesListPtr
required_columns
=
std
::
make_shared
<
NamesAndTypesList
>
(
table
->
getColumnsList
());
...
...
dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
浏览文件 @
566160c0
...
...
@@ -22,7 +22,7 @@ BlockIO InterpreterOptimizeQuery::execute()
throw
Exception
(
"FINAL flag for OPTIMIZE query is meaningful only with specified PARTITION"
,
ErrorCodes
::
BAD_ARGUMENTS
);
StoragePtr
table
=
context
.
getTable
(
ast
.
database
,
ast
.
table
);
auto
table_lock
=
table
->
lockStructure
(
true
);
auto
table_lock
=
table
->
lockStructure
(
true
,
__PRETTY_FUNCTION__
);
table
->
optimize
(
query_ptr
,
ast
.
partition
,
ast
.
final
,
ast
.
deduplicate
,
context
.
getSettings
());
return
{};
}
...
...
dbms/src/Interpreters/InterpreterRenameQuery.cpp
浏览文件 @
566160c0
...
...
@@ -100,7 +100,7 @@ BlockIO InterpreterRenameQuery::execute()
for
(
const
auto
&
names
:
unique_tables_from
)
if
(
auto
table
=
context
.
tryGetTable
(
names
.
database_name
,
names
.
table_name
))
locks
.
emplace_back
(
table
->
lockForAlter
());
locks
.
emplace_back
(
table
->
lockForAlter
(
__PRETTY_FUNCTION__
));
/** All tables are locked. If there are more than one rename in chain,
* we need to hold global lock while doing all renames. Order matters to avoid deadlocks.
...
...
dbms/src/Interpreters/InterpreterSelectQuery.cpp
浏览文件 @
566160c0
...
...
@@ -156,7 +156,7 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_)
storage
=
context
.
getTable
(
database_name
,
table_name
);
}
table_lock
=
storage
->
lockStructure
(
false
);
table_lock
=
storage
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
if
(
table_column_names
.
empty
())
table_column_names
=
storage
->
getColumnsListNonMaterialized
();
}
...
...
dbms/src/Storages/IStorage.cpp
浏览文件 @
566160c0
...
...
@@ -4,13 +4,13 @@
namespace
DB
{
TableStructureReadLock
::
TableStructureReadLock
(
StoragePtr
storage_
,
bool
lock_structure
,
bool
lock_data
)
:
storage
(
storage_
)
,
data_lock
(
storage
->
data_lock
,
std
::
defer_lock
),
structure_lock
(
storage
->
structure_lock
,
std
::
defer_lock
)
TableStructureReadLock
::
TableStructureReadLock
(
StoragePtr
storage_
,
bool
lock_structure
,
bool
lock_data
,
const
std
::
string
&
who
)
:
storage
(
storage_
)
{
if
(
lock_data
)
data_lock
.
lock
(
);
data_lock
=
storage
->
data_lock
->
getLock
(
RWLockFIFO
::
Read
,
who
);
if
(
lock_structure
)
structure_lock
.
lock
(
);
structure_lock
=
storage
->
structure_lock
->
getLock
(
RWLockFIFO
::
Read
,
who
);
}
}
dbms/src/Storages/IStorage.h
浏览文件 @
566160c0
...
...
@@ -2,6 +2,7 @@
#include <Core/Names.h>
#include <Common/Exception.h>
#include <Common/RWLockFIFO.h>
#include <Core/QueryProcessingStage.h>
#include <Storages/ITableDeclaration.h>
#include <Storages/SelectQueryInfo.h>
...
...
@@ -22,6 +23,9 @@ class Context;
class
IBlockInputStream
;
class
IBlockOutputStream
;
struct
RWLockFIFO
;
using
RWLockFIFOPtr
=
std
::
shared_ptr
<
RWLockFIFO
>
;
using
BlockOutputStreamPtr
=
std
::
shared_ptr
<
IBlockOutputStream
>
;
using
BlockInputStreamPtr
=
std
::
shared_ptr
<
IBlockInputStream
>
;
using
BlockInputStreams
=
std
::
vector
<
BlockInputStreamPtr
>
;
...
...
@@ -54,18 +58,19 @@ private:
StoragePtr
storage
;
/// Order is important.
std
::
shared_lock
<
std
::
shared_mutex
>
data_lock
;
std
::
shared_lock
<
std
::
shared_mutex
>
structure_lock
;
RWLockFIFO
::
LockHandler
data_lock
;
RWLockFIFO
::
LockHandler
structure_lock
;
public:
TableStructureReadLock
(
StoragePtr
storage_
,
bool
lock_structure
,
bool
lock_data
);
TableStructureReadLock
(
StoragePtr
storage_
,
bool
lock_structure
,
bool
lock_data
,
const
std
::
string
&
who
);
};
using
TableStructureReadLockPtr
=
std
::
shared_ptr
<
TableStructureReadLock
>
;
using
TableStructureReadLocks
=
std
::
vector
<
TableStructureReadLockPtr
>
;
using
TableStructureWriteLock
=
std
::
unique_lock
<
std
::
shared_mutex
>
;
using
TableDataWriteLock
=
std
::
unique_lock
<
std
::
shared_mutex
>
;
using
TableStructureWriteLock
=
RWLockFIFO
::
LockHandler
;
using
TableDataWriteLock
=
RWLockFIFO
::
LockHandler
;
using
TableFullWriteLock
=
std
::
pair
<
TableDataWriteLock
,
TableStructureWriteLock
>
;
...
...
@@ -107,9 +112,9 @@ public:
* WARNING: You need to call methods from ITableDeclaration under such a lock. Without it, they are not thread safe.
* WARNING: To avoid deadlocks, this method must not be called under lock of Context.
*/
TableStructureReadLockPtr
lockStructure
(
bool
will_modify_data
)
TableStructureReadLockPtr
lockStructure
(
bool
will_modify_data
,
const
std
::
string
&
who
=
"Anonymous"
)
{
TableStructureReadLockPtr
res
=
std
::
make_shared
<
TableStructureReadLock
>
(
shared_from_this
(),
true
,
will_modify_data
);
TableStructureReadLockPtr
res
=
std
::
make_shared
<
TableStructureReadLock
>
(
shared_from_this
(),
true
,
will_modify_data
,
who
);
if
(
is_dropped
)
throw
Exception
(
"Table is dropped"
,
ErrorCodes
::
TABLE_IS_DROPPED
);
return
res
;
...
...
@@ -117,11 +122,11 @@ public:
/** Does not allow reading the table structure. It is taken for ALTER, RENAME and DROP.
*/
TableFullWriteLock
lockForAlter
()
TableFullWriteLock
lockForAlter
(
const
std
::
string
&
who
=
"Alter"
)
{
/// The calculation order is important.
auto
data_lock
=
lockDataForAlter
();
auto
structure_lock
=
lockStructureForAlter
();
auto
data_lock
=
lockDataForAlter
(
who
);
auto
structure_lock
=
lockStructureForAlter
(
who
);
return
{
std
::
move
(
data_lock
),
std
::
move
(
structure_lock
)};
}
...
...
@@ -130,17 +135,17 @@ public:
* It is taken during write temporary data in ALTER MODIFY.
* Under this lock, you can take lockStructureForAlter() to change the structure of the table.
*/
TableDataWriteLock
lockDataForAlter
()
TableDataWriteLock
lockDataForAlter
(
const
std
::
string
&
who
=
"Alter"
)
{
std
::
unique_lock
<
std
::
shared_mutex
>
res
(
data_lock
);
auto
res
=
data_lock
->
getLock
(
RWLockFIFO
::
Write
,
who
);
if
(
is_dropped
)
throw
Exception
(
"Table is dropped"
,
ErrorCodes
::
TABLE_IS_DROPPED
);
return
res
;
}
TableStructureWriteLock
lockStructureForAlter
()
TableStructureWriteLock
lockStructureForAlter
(
const
std
::
string
&
who
=
"Alter"
)
{
std
::
unique_lock
<
std
::
shared_mutex
>
res
(
structure_lock
);
auto
res
=
structure_lock
->
getLock
(
RWLockFIFO
::
Write
,
who
);
if
(
is_dropped
)
throw
Exception
(
"Table is dropped"
,
ErrorCodes
::
TABLE_IS_DROPPED
);
return
res
;
...
...
@@ -316,7 +321,7 @@ private:
* 2) all changes to the data after releasing the lock will be based on the structure of the table at the time after the lock was released.
* You need to take for read for the entire time of the operation that changes the data.
*/
mutable
std
::
shared_mutex
data_lock
;
mutable
RWLockFIFOPtr
data_lock
=
RWLockFIFO
::
create
()
;
/** Lock for multiple columns and path to table. It is taken for write at RENAME, ALTER (for ALTER MODIFY for a while) and DROP.
* It is taken for read for the whole time of SELECT, INSERT and merge parts (for MergeTree).
...
...
@@ -325,7 +330,7 @@ private:
* That is, if this lock is taken for write, you should not worry about `parts_writing_lock`.
* parts_writing_lock is only needed for cases when you do not want to take `table_structure_lock` for long operations (ALTER MODIFY).
*/
mutable
std
::
shared_mutex
structure_lock
;
mutable
RWLockFIFOPtr
structure_lock
=
RWLockFIFO
::
create
()
;
};
/// table name -> table
...
...
dbms/src/Storages/MergeTree/DataPartsExchange.cpp
浏览文件 @
566160c0
...
...
@@ -77,7 +77,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
try
{
auto
storage_lock
=
owned_storage
->
lockStructure
(
false
);
auto
storage_lock
=
owned_storage
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
MergeTreeData
::
DataPartPtr
part
;
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
浏览文件 @
566160c0
...
...
@@ -82,7 +82,7 @@ void ReplicatedMergeTreeAlterThread::run()
LOG_INFO
(
log
,
"Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock."
);
auto
table_lock
=
storage
.
lockStructureForAlter
();
auto
table_lock
=
storage
.
lockStructureForAlter
(
__PRETTY_FUNCTION__
);
const
auto
columns_changed
=
columns
!=
storage
.
data
.
getColumnsListNonMaterialized
();
const
auto
materialized_columns_changed
=
materialized_columns
!=
storage
.
data
.
materialized_columns
;
...
...
@@ -140,7 +140,7 @@ void ReplicatedMergeTreeAlterThread::run()
/// Update parts.
if
(
changed_version
||
force_recheck_parts
)
{
auto
table_lock
=
storage
.
lockStructure
(
false
);
auto
table_lock
=
storage
.
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
if
(
changed_version
)
LOG_INFO
(
log
,
"ALTER-ing parts"
);
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
浏览文件 @
566160c0
...
...
@@ -218,7 +218,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
else
if
(
part
->
name
==
part_name
)
{
auto
zookeeper
=
storage
.
getZooKeeper
();
auto
table_lock
=
storage
.
lockStructure
(
false
);
auto
table_lock
=
storage
.
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
/// If the part is in ZooKeeper, check its data with its checksums, and them with ZooKeeper.
if
(
zookeeper
->
exists
(
storage
.
replica_path
+
"/parts/"
+
part_name
))
...
...
dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp
浏览文件 @
566160c0
...
...
@@ -143,7 +143,7 @@ bool Client::send(const std::string & part_name, size_t shard_no,
LOG_TRACE
(
log
,
"Sending part "
<<
part_name
);
auto
storage_lock
=
storage
.
lockStructure
(
false
);
auto
storage_lock
=
storage
.
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
MergeTreeData
::
DataPartPtr
part
=
findShardedPart
(
part_name
,
shard_no
);
...
...
dbms/src/Storages/StorageBuffer.cpp
浏览文件 @
566160c0
...
...
@@ -592,7 +592,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_
if
(
param
.
type
==
AlterCommand
::
MODIFY_PRIMARY_KEY
)
throw
Exception
(
"Storage engine "
+
getName
()
+
" doesn't support primary key."
,
ErrorCodes
::
NOT_IMPLEMENTED
);
auto
lock
=
lockStructureForAlter
();
auto
lock
=
lockStructureForAlter
(
__PRETTY_FUNCTION__
);
/// So that no blocks of the old structure remain.
optimize
({}
/*query*/
,
{}
/*partition_id*/
,
false
/*final*/
,
false
/*deduplicate*/
,
context
.
getSettings
());
...
...
dbms/src/Storages/StorageDistributed.cpp
浏览文件 @
566160c0
...
...
@@ -246,7 +246,7 @@ void StorageDistributed::alter(const AlterCommands & params, const String & data
if
(
param
.
type
==
AlterCommand
::
MODIFY_PRIMARY_KEY
)
throw
Exception
(
"Storage engine "
+
getName
()
+
" doesn't support primary key."
,
ErrorCodes
::
NOT_IMPLEMENTED
);
auto
lock
=
lockStructureForAlter
();
auto
lock
=
lockStructureForAlter
(
__PRETTY_FUNCTION__
);
params
.
apply
(
*
columns
,
materialized_columns
,
alias_columns
,
column_defaults
);
context
.
getDatabase
(
database_name
)
->
alterTable
(
...
...
dbms/src/Storages/StorageMerge.cpp
浏览文件 @
566160c0
...
...
@@ -335,7 +335,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
{
auto
&
table
=
iterator
->
table
();
if
(
table
.
get
()
!=
this
)
selected_tables
.
emplace_back
(
table
,
table
->
lockStructure
(
false
));
selected_tables
.
emplace_back
(
table
,
table
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
));
}
iterator
->
next
();
...
...
@@ -351,7 +351,7 @@ void StorageMerge::alter(const AlterCommands & params, const String & database_n
if
(
param
.
type
==
AlterCommand
::
MODIFY_PRIMARY_KEY
)
throw
Exception
(
"Storage engine "
+
getName
()
+
" doesn't support primary key."
,
ErrorCodes
::
NOT_IMPLEMENTED
);
auto
lock
=
lockStructureForAlter
();
auto
lock
=
lockStructureForAlter
(
__PRETTY_FUNCTION__
);
params
.
apply
(
*
columns
,
materialized_columns
,
alias_columns
,
column_defaults
);
context
.
getDatabase
(
database_name
)
->
alterTable
(
...
...
dbms/src/Storages/StorageMergeTree.cpp
浏览文件 @
566160c0
...
...
@@ -151,7 +151,7 @@ void StorageMergeTree::alter(
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto
merge_blocker
=
merger
.
cancel
();
auto
table_soft_lock
=
lockDataForAlter
();
auto
table_soft_lock
=
lockDataForAlter
(
__PRETTY_FUNCTION__
);
data
.
checkAlter
(
params
);
...
...
@@ -194,7 +194,7 @@ void StorageMergeTree::alter(
transactions
.
push_back
(
std
::
move
(
transaction
));
}
auto
table_hard_lock
=
lockStructureForAlter
();
auto
table_hard_lock
=
lockStructureForAlter
(
__PRETTY_FUNCTION__
);
IDatabase
::
ASTModifier
engine_modifier
;
if
(
primary_key_is_modified
)
...
...
@@ -293,7 +293,7 @@ bool StorageMergeTree::merge(
data
.
clearOldTemporaryDirectories
();
}
auto
structure_lock
=
lockStructure
(
true
);
auto
structure_lock
=
lockStructure
(
true
,
__PRETTY_FUNCTION__
);
size_t
disk_space
=
DiskSpaceMonitor
::
getUnreservedFreeSpace
(
full_path
);
...
...
@@ -402,8 +402,8 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & query, const Field
/// This protects against "revival" of data for a removed partition after completion of merge.
auto
merge_blocker
=
merger
.
cancel
();
auto
lock_read_structure
=
lockStructure
(
false
);
auto
lock_
write_data
=
lockDataForAlter
(
);
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto
lock_
read_structure
=
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
String
partition_id
=
data
.
getPartitionIDFromQuery
(
partition
);
MergeTreeData
::
DataParts
parts
=
data
.
getDataParts
();
...
...
@@ -452,7 +452,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const Field & partiti
/// This protects against "revival" of data for a removed partition after completion of merge.
auto
merge_blocker
=
merger
.
cancel
();
/// Waits for completion of merge and does not start new ones.
auto
lock
=
lockForAlter
();
auto
lock
=
lockForAlter
(
__PRETTY_FUNCTION__
);
String
partition_id
=
data
.
getPartitionIDFromQuery
(
partition
);
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
566160c0
...
...
@@ -1065,7 +1065,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
/// Can throw an exception.
DiskSpaceMonitor
::
ReservationPtr
reserved_space
=
DiskSpaceMonitor
::
reserve
(
full_path
,
estimated_space_for_merge
);
auto
table_lock
=
lockStructure
(
false
);
auto
table_lock
=
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
MergeList
::
EntryPtr
merge_entry
=
context
.
getMergeList
().
insert
(
database_name
,
table_name
,
entry
.
new_part_name
,
parts
);
MergeTreeData
::
Transaction
transaction
;
...
...
@@ -1430,7 +1430,7 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
/// We don't change table structure, only data in some parts
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
/// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238)
auto
lock_read_structure
=
lockStructure
(
false
);
auto
lock_read_structure
=
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
auto
zookeeper
=
getZooKeeper
();
...
...
@@ -2107,7 +2107,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
TableStructureReadLockPtr
table_lock
;
if
(
!
to_detached
)
table_lock
=
lockStructure
(
true
);
table_lock
=
lockStructure
(
true
,
__PRETTY_FUNCTION__
);
ReplicatedMergeTreeAddress
address
(
getZooKeeper
()
->
get
(
replica_path
+
"/host"
));
...
...
@@ -2408,7 +2408,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
{
/// Just to read current structure. Alter will be done in separate thread.
auto
table_lock
=
lockStructure
(
false
);
auto
table_lock
=
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
if
(
is_readonly
)
throw
Exception
(
"Can't ALTER readonly table"
,
ErrorCodes
::
TABLE_IS_READ_ONLY
);
...
...
@@ -3788,7 +3788,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
Logger
*
log
=
log_
?
log_
:
this
->
log
;
auto
table_lock
=
lockStructure
(
false
);
auto
table_lock
=
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
auto
zookeeper
=
getZooKeeper
();
MergeTreeData
::
DataPartsVector
parts
=
data
.
grabOldParts
();
...
...
dbms/src/Storages/StorageTrivialBuffer.cpp
浏览文件 @
566160c0
...
...
@@ -536,7 +536,7 @@ void StorageTrivialBuffer::alter(
throw
Exception
(
"Storage engine "
+
getName
()
+
" doesn't support primary key."
,
ErrorCodes
::
NOT_IMPLEMENTED
);
auto
lock
=
lockStructureForAlter
();
auto
lock
=
lockStructureForAlter
(
__PRETTY_FUNCTION__
);
/// To avoid presence of blocks of different structure in the buffer.
flush
(
false
);
...
...
dbms/src/Storages/System/StorageSystemColumns.cpp
浏览文件 @
566160c0
...
...
@@ -130,7 +130,7 @@ BlockInputStreams StorageSystemColumns::read(
try
{
table_lock
=
storage
->
lockStructure
(
false
);
table_lock
=
storage
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
}
catch
(
const
Exception
&
e
)
{
...
...
dbms/src/Storages/System/StorageSystemParts.cpp
浏览文件 @
566160c0
...
...
@@ -163,7 +163,7 @@ BlockInputStreams StorageSystemParts::read(
try
{
table_lock
=
storage
->
lockStructure
(
false
);
/// For table not to be dropped.
table_lock
=
storage
->
lockStructure
(
false
,
__PRETTY_FUNCTION__
);
/// For table not to be dropped.
}
catch
(
const
Exception
&
e
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录