Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
493ee673
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
493ee673
编写于
12月 14, 2020
作者:
A
alexey-milovidov
提交者:
GitHub
12月 14, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18052 from kitaisreal/memory-storage-read-optimization
MemoryStorage read optimization
上级
aac8b85b
1bccd6df
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
44 addition
and
52 deletion
+44
-52
src/Storages/StorageMemory.cpp
src/Storages/StorageMemory.cpp
+43
-51
src/Storages/StorageMemory.h
src/Storages/StorageMemory.h
+1
-1
未找到文件。
src/Storages/StorageMemory.cpp
浏览文件 @
493ee673
...
...
@@ -23,7 +23,7 @@ namespace ErrorCodes
class
MemorySource
:
public
SourceWithProgress
{
using
InitializerFunc
=
std
::
function
<
void
(
BlocksList
::
const_iterator
&
,
size_t
&
,
std
::
shared_ptr
<
const
BlocksList
>
&
)
>
;
using
InitializerFunc
=
std
::
function
<
void
(
std
::
shared_ptr
<
const
Blocks
>
&
)
>
;
public:
/// Blocks are stored in std::list which may be appended in another thread.
/// We use pointer to the beginning of the list and its current size.
...
...
@@ -32,17 +32,15 @@ public:
MemorySource
(
Names
column_names_
,
BlocksList
::
const_iterator
first_
,
size_t
num_blocks_
,
const
StorageMemory
&
storage
,
const
StorageMetadataPtr
&
metadata_snapshot
,
std
::
shared_ptr
<
const
BlocksList
>
data_
,
InitializerFunc
initializer_func_
=
[](
BlocksList
::
const_iterator
&
,
size_t
&
,
std
::
shared_ptr
<
const
BlocksList
>
&
)
{})
std
::
shared_ptr
<
const
Blocks
>
data_
,
std
::
shared_ptr
<
std
::
atomic
<
size_t
>>
parallel_execution_index_
,
InitializerFunc
initializer_func_
=
{})
:
SourceWithProgress
(
metadata_snapshot
->
getSampleBlockForColumns
(
column_names_
,
storage
.
getVirtuals
(),
storage
.
getStorageID
()))
,
column_names
(
std
::
move
(
column_names_
))
,
current_it
(
first_
)
,
num_blocks
(
num_blocks_
)
,
data
(
data_
)
,
parallel_execution_index
(
parallel_execution_index_
)
,
initializer_func
(
std
::
move
(
initializer_func_
))
{
}
...
...
@@ -52,16 +50,20 @@ public:
protected:
Chunk
generate
()
override
{
if
(
!
postponed_init_done
)
if
(
initializer_func
)
{
initializer_func
(
current_it
,
num_blocks
,
data
);
postponed_init_done
=
true
;
initializer_func
(
data
);
initializer_func
=
{}
;
}
if
(
current_block_idx
==
num_blocks
)
size_t
current_index
=
getAndIncrementExecutionIndex
();
if
(
current_index
>=
data
->
size
())
{
return
{};
}
const
Block
&
src
=
*
current_it
;
const
Block
&
src
=
(
*
data
)[
current_index
]
;
Columns
columns
;
columns
.
reserve
(
column_names
.
size
());
...
...
@@ -69,20 +71,26 @@ protected:
for
(
const
auto
&
name
:
column_names
)
columns
.
push_back
(
src
.
getByName
(
name
).
column
);
if
(
++
current_block_idx
<
num_blocks
)
++
current_it
;
return
Chunk
(
std
::
move
(
columns
),
src
.
rows
());
}
private:
const
Names
column_names
;
BlocksList
::
const_iterator
current_it
;
size_t
num_blocks
;
size_t
current_block_idx
=
0
;
size_t
getAndIncrementExecutionIndex
()
{
if
(
parallel_execution_index
)
{
return
(
*
parallel_execution_index
)
++
;
}
else
{
return
execution_index
++
;
}
}
std
::
shared_ptr
<
const
BlocksList
>
data
;
bool
postponed_init_done
=
false
;
const
Names
column_names
;
size_t
execution_index
=
0
;
std
::
shared_ptr
<
const
Blocks
>
data
;
std
::
shared_ptr
<
std
::
atomic
<
size_t
>>
parallel_execution_index
;
InitializerFunc
initializer_func
;
};
...
...
@@ -107,7 +115,7 @@ public:
metadata_snapshot
->
check
(
block
,
true
);
{
std
::
lock_guard
lock
(
storage
.
mutex
);
auto
new_data
=
std
::
make_unique
<
Blocks
List
>
(
*
(
storage
.
data
.
get
()));
auto
new_data
=
std
::
make_unique
<
Blocks
>
(
*
(
storage
.
data
.
get
()));
new_data
->
push_back
(
block
);
storage
.
data
.
set
(
std
::
move
(
new_data
));
...
...
@@ -123,7 +131,7 @@ private:
StorageMemory
::
StorageMemory
(
const
StorageID
&
table_id_
,
ColumnsDescription
columns_description_
,
ConstraintsDescription
constraints_
)
:
IStorage
(
table_id_
),
data
(
std
::
make_unique
<
const
Blocks
List
>
())
:
IStorage
(
table_id_
),
data
(
std
::
make_unique
<
const
Blocks
>
())
{
StorageInMemoryMetadata
storage_metadata
;
storage_metadata
.
setColumns
(
std
::
move
(
columns_description_
));
...
...
@@ -155,21 +163,17 @@ Pipe StorageMemory::read(
return
Pipe
(
std
::
make_shared
<
MemorySource
>
(
column_names
,
data
.
get
()
->
end
(),
0
,
*
this
,
metadata_snapshot
,
data
.
get
(),
[
this
](
BlocksList
::
const_iterator
&
current_it
,
size_t
&
num_blocks
,
std
::
shared_ptr
<
const
BlocksList
>
&
current_data
)
nullptr
/* data */
,
nullptr
/* parallel execution index */
,
[
this
](
std
::
shared_ptr
<
const
Blocks
>
&
data_to_initialize
)
{
current_data
=
data
.
get
();
current_it
=
current_data
->
begin
();
num_blocks
=
current_data
->
size
();
data_to_initialize
=
data
.
get
();
}));
}
auto
current_data
=
data
.
get
();
size_t
size
=
current_data
->
size
();
if
(
num_streams
>
size
)
...
...
@@ -177,23 +181,11 @@ Pipe StorageMemory::read(
Pipes
pipes
;
BlocksList
::
const_iterator
it
=
current_data
->
begin
(
);
auto
parallel_execution_index
=
std
::
make_shared
<
std
::
atomic
<
size_t
>>
(
0
);
size_t
offset
=
0
;
for
(
size_t
stream
=
0
;
stream
<
num_streams
;
++
stream
)
{
size_t
next_offset
=
(
stream
+
1
)
*
size
/
num_streams
;
size_t
num_blocks
=
next_offset
-
offset
;
assert
(
num_blocks
>
0
);
pipes
.
emplace_back
(
std
::
make_shared
<
MemorySource
>
(
column_names
,
it
,
num_blocks
,
*
this
,
metadata_snapshot
,
current_data
));
while
(
offset
<
next_offset
)
{
++
it
;
++
offset
;
}
pipes
.
emplace_back
(
std
::
make_shared
<
MemorySource
>
(
column_names
,
*
this
,
metadata_snapshot
,
current_data
,
parallel_execution_index
));
}
return
Pipe
::
unitePipes
(
std
::
move
(
pipes
));
...
...
@@ -208,7 +200,7 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag
void
StorageMemory
::
drop
()
{
data
.
set
(
std
::
make_unique
<
Blocks
List
>
());
data
.
set
(
std
::
make_unique
<
Blocks
>
());
total_size_bytes
.
store
(
0
,
std
::
memory_order_relaxed
);
total_size_rows
.
store
(
0
,
std
::
memory_order_relaxed
);
}
...
...
@@ -233,7 +225,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
auto
in
=
interpreter
->
execute
();
in
->
readPrefix
();
Blocks
List
out
;
Blocks
out
;
Block
block
;
while
((
block
=
in
->
read
()))
{
...
...
@@ -241,17 +233,17 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
}
in
->
readSuffix
();
std
::
unique_ptr
<
Blocks
List
>
new_data
;
std
::
unique_ptr
<
Blocks
>
new_data
;
// all column affected
if
(
interpreter
->
isAffectingAllColumns
())
{
new_data
=
std
::
make_unique
<
Blocks
List
>
(
out
);
new_data
=
std
::
make_unique
<
Blocks
>
(
out
);
}
else
{
/// just some of the column affected, we need update it with new column
new_data
=
std
::
make_unique
<
Blocks
List
>
(
*
(
data
.
get
()));
new_data
=
std
::
make_unique
<
Blocks
>
(
*
(
data
.
get
()));
auto
data_it
=
new_data
->
begin
();
auto
out_it
=
out
.
begin
();
...
...
@@ -284,7 +276,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
void
StorageMemory
::
truncate
(
const
ASTPtr
&
,
const
StorageMetadataPtr
&
,
const
Context
&
,
TableExclusiveLockHolder
&
)
{
data
.
set
(
std
::
make_unique
<
Blocks
List
>
());
data
.
set
(
std
::
make_unique
<
Blocks
>
());
total_size_bytes
.
store
(
0
,
std
::
memory_order_relaxed
);
total_size_rows
.
store
(
0
,
std
::
memory_order_relaxed
);
}
...
...
src/Storages/StorageMemory.h
浏览文件 @
493ee673
...
...
@@ -91,7 +91,7 @@ public:
private:
/// MultiVersion data storage, so that we can copy the list of blocks to readers.
MultiVersion
<
Blocks
List
>
data
;
MultiVersion
<
Blocks
>
data
;
mutable
std
::
mutex
mutex
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录