Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
07d9ab9b
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
07d9ab9b
编写于
4月 14, 2014
作者:
M
Michael Kolupaev
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Merge
上级
3dc646c0
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
39 addition
and
126 deletion
+39
-126
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
+3
-14
dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
+12
-21
dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
...Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
+1
-1
dbms/src/Storages/MergeTree/MergeTreeData.cpp
dbms/src/Storages/MergeTree/MergeTreeData.cpp
+19
-84
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+4
-6
未找到文件。
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
浏览文件 @
07d9ab9b
...
...
@@ -125,26 +125,15 @@ public:
{
struct
Checksum
{
size_t
file_size
;
uint128
file_hash
;
bool
is_compressed
=
false
;
size_t
uncompressed_size
;
uint128
uncompressed_hash
;
void
checkEqual
(
const
Checksum
&
rhs
,
bool
have_uncompressed
,
const
String
&
name
)
const
;
void
checkSize
(
const
String
&
path
)
const
;
size_t
size
;
uint128
hash
;
};
typedef
std
::
map
<
String
,
Checksum
>
FileChecksums
;
FileChecksums
files
;
/// Проверяет, что множество столбцов и их контрольные суммы совпадают. Если нет - бросает исключение.
/// Если have_uncompressed, для сжатых файлов сравнивает чексуммы разжатых данных. Иначе сравнивает только чексуммы файлов.
void
checkEqual
(
const
Checksums
&
rhs
,
bool
have_uncompressed
)
const
;
/// Проверяет, что в директории есть все нужные файлы правильных размеров. Не проверяет чексуммы.
void
checkSizes
(
const
String
&
path
)
const
;
void
check
(
const
Checksums
&
rhs
)
const
;
/// Сериализует и десериализует в человекочитаемом виде.
void
readText
(
ReadBuffer
&
in
);
...
...
dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
浏览文件 @
07d9ab9b
...
...
@@ -26,19 +26,15 @@ protected:
ColumnStream
(
const
String
&
escaped_column_name_
,
const
String
&
data_path
,
const
std
::
string
&
marks_path
,
size_t
max_compress_block_size
=
DEFAULT_MAX_COMPRESS_BLOCK_SIZE
)
:
escaped_column_name
(
escaped_column_name_
),
plain_file
(
data_path
,
max_compress_block_size
,
O_TRUNC
|
O_CREAT
|
O_WRONLY
),
plain_hashing
(
plain_file
),
compressed_buf
(
plain_hashing
),
compressed
(
compressed_buf
),
marks_file
(
marks_path
,
4096
,
O_TRUNC
|
O_CREAT
|
O_WRONLY
),
marks
(
marks_file
)
{}
compressed_buf
(
plain_file
),
marks_file
(
marks_path
,
4096
,
O_TRUNC
|
O_CREAT
|
O_WRONLY
),
compressed
(
compressed_buf
),
marks
(
marks_file
)
{}
String
escaped_column_name
;
/// compressed -> compressed_buf -> plain_hashing -> plain_file
WriteBufferFromFile
plain_file
;
HashingWriteBuffer
plain_hashing
;
CompressedWriteBuffer
compressed_buf
;
HashingWriteBuffer
compressed
;
/// marks -> marks_file
WriteBufferFromFile
marks_file
;
HashingWriteBuffer
compressed
;
HashingWriteBuffer
marks
;
void
finalize
()
...
...
@@ -58,15 +54,10 @@ protected:
{
if
(
name
==
""
)
name
=
escaped_column_name
;
checksums
.
files
[
name
+
".bin"
].
is_compressed
=
true
;
checksums
.
files
[
name
+
".bin"
].
uncompressed_size
=
compressed
.
count
();
checksums
.
files
[
name
+
".bin"
].
uncompressed_hash
=
compressed
.
getHash
();
checksums
.
files
[
name
+
".bin"
].
file_size
=
plain_hashing
.
count
();
checksums
.
files
[
name
+
".bin"
].
file_hash
=
plain_hashing
.
getHash
();
checksums
.
files
[
name
+
".mrk"
].
file_size
=
marks
.
count
();
checksums
.
files
[
name
+
".mrk"
].
file_hash
=
marks
.
getHash
();
checksums
.
files
[
name
+
".bin"
].
size
=
compressed
.
count
();
checksums
.
files
[
name
+
".bin"
].
hash
=
compressed
.
getHash
();
checksums
.
files
[
name
+
".mrk"
].
size
=
marks
.
count
();
checksums
.
files
[
name
+
".mrk"
].
hash
=
marks
.
getHash
();
}
};
...
...
@@ -135,7 +126,7 @@ protected:
else
{
limit
=
storage
.
index_granularity
;
writeIntBinary
(
stream
.
plain_
hashing
.
count
(),
stream
.
marks
);
writeIntBinary
(
stream
.
plain_
file
.
count
(),
stream
.
marks
);
writeIntBinary
(
stream
.
compressed
.
offset
(),
stream
.
marks
);
}
...
...
@@ -166,7 +157,7 @@ protected:
else
{
limit
=
storage
.
index_granularity
;
writeIntBinary
(
stream
.
plain_
hashing
.
count
(),
stream
.
marks
);
writeIntBinary
(
stream
.
plain_
file
.
count
(),
stream
.
marks
);
writeIntBinary
(
stream
.
compressed
.
offset
(),
stream
.
marks
);
}
...
...
@@ -260,8 +251,8 @@ public:
MergeTreeData
::
DataPart
::
Checksums
checksums
;
index_stream
->
next
();
checksums
.
files
[
"primary.idx"
].
file_
size
=
index_stream
->
count
();
checksums
.
files
[
"primary.idx"
].
file_
hash
=
index_stream
->
getHash
();
checksums
.
files
[
"primary.idx"
].
size
=
index_stream
->
count
();
checksums
.
files
[
"primary.idx"
].
hash
=
index_stream
->
getHash
();
for
(
ColumnStreams
::
iterator
it
=
column_streams
.
begin
();
it
!=
column_streams
.
end
();
++
it
)
{
...
...
dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
浏览文件 @
07d9ab9b
...
...
@@ -37,7 +37,7 @@ public:
/// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять.
auto
expected_checksums
=
MergeTreeData
::
DataPart
::
Checksums
::
parse
(
expected_checksums_str
);
expected_checksums
.
check
Equal
(
part
->
checksums
,
true
);
expected_checksums
.
check
(
part
->
checksums
);
part
->
remove
();
...
...
dbms/src/Storages/MergeTree/MergeTreeData.cpp
浏览文件 @
07d9ab9b
...
...
@@ -427,7 +427,7 @@ static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesLis
}
/// одинаковыми считаются имена, вида "name.*"
static
bool
namesWithDotEqual
(
const
String
&
name_with_dot
,
const
NameAndTypePair
&
name_type
)
static
bool
namesWithDotEqual
(
const
String
&
name_with_dot
,
const
DB
::
NameAndTypePair
&
name_type
)
{
return
(
name_with_dot
==
name_type
.
first
.
substr
(
0
,
name_with_dot
.
length
()));
}
...
...
@@ -492,7 +492,7 @@ void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
try
{
while
(
Block
b
=
in
.
read
())
while
(
DB
::
Block
b
=
in
.
read
())
out
.
write
(
b
);
in
.
readSuffix
();
...
...
@@ -793,36 +793,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_
}
void
MergeTreeData
::
DataPart
::
Checksums
::
Checksum
::
checkEqual
(
const
Checksum
&
rhs
,
bool
have_uncompressed
,
const
String
&
name
)
const
{
if
(
is_compressed
&&
have_uncompressed
)
{
if
(
!
rhs
.
is_compressed
)
throw
Exception
(
"No uncompressed checksum for file "
+
name
,
ErrorCodes
::
CHECKSUM_DOESNT_MATCH
);
if
(
rhs
.
uncompressed_size
!=
uncompressed_size
)
throw
Exception
(
"Unexpected size of file "
+
name
+
" in data part"
,
ErrorCodes
::
BAD_SIZE_OF_FILE_IN_DATA_PART
);
if
(
rhs
.
uncompressed_hash
!=
uncompressed_hash
)
throw
Exception
(
"Checksum mismatch for file "
+
name
+
" in data part"
,
ErrorCodes
::
CHECKSUM_DOESNT_MATCH
);
return
;
}
if
(
rhs
.
file_size
!=
file_size
)
throw
Exception
(
"Unexpected size of file "
+
name
+
" in data part"
,
ErrorCodes
::
BAD_SIZE_OF_FILE_IN_DATA_PART
);
if
(
rhs
.
file_hash
!=
file_hash
)
throw
Exception
(
"Checksum mismatch for file "
+
name
+
" in data part"
,
ErrorCodes
::
CHECKSUM_DOESNT_MATCH
);
}
void
MergeTreeData
::
DataPart
::
Checksums
::
Checksum
::
checkSize
(
const
String
&
path
)
const
{
Poco
::
File
file
(
path
);
if
(
!
file
.
exists
())
throw
Exception
(
path
+
" doesn't exist"
,
ErrorCodes
::
FILE_DOESNT_EXIST
);
size_t
size
=
file
.
getSize
();
if
(
size
!=
file_size
)
throw
Exception
(
path
+
" has unexpected size: "
+
DB
::
toString
(
size
)
+
" instead of "
+
DB
::
toString
(
file_size
),
ErrorCodes
::
BAD_SIZE_OF_FILE_IN_DATA_PART
);
}
void
MergeTreeData
::
DataPart
::
Checksums
::
checkEqual
(
const
Checksums
&
rhs
,
bool
have_uncompressed
)
const
void
MergeTreeData
::
DataPart
::
Checksums
::
check
(
const
Checksums
&
rhs
)
const
{
for
(
const
auto
&
it
:
rhs
.
files
)
{
...
...
@@ -840,16 +811,14 @@ void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool
if
(
jt
==
rhs
.
files
.
end
())
throw
Exception
(
"No file "
+
name
+
" in data part"
,
ErrorCodes
::
NO_FILE_IN_DATA_PART
);
it
.
second
.
checkEqual
(
jt
->
second
,
have_uncompressed
,
name
);
}
}
const
Checksum
&
expected
=
it
.
second
;
const
Checksum
&
found
=
jt
->
second
;
void
MergeTreeData
::
DataPart
::
Checksums
::
checkSizes
(
const
String
&
path
)
const
{
for
(
const
auto
&
it
:
files
)
{
const
String
&
name
=
it
.
first
;
it
.
second
.
checkSize
(
path
+
name
);
if
(
expected
.
size
!=
found
.
size
)
throw
Exception
(
"Unexpected size of file "
+
name
+
" in data part"
,
ErrorCodes
::
BAD_SIZE_OF_FILE_IN_DATA_PART
);
if
(
expected
.
hash
!=
found
.
hash
)
throw
Exception
(
"Checksum mismatch for file "
+
name
+
" in data part"
,
ErrorCodes
::
CHECKSUM_DOESNT_MATCH
);
}
}
...
...
@@ -858,12 +827,7 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
files
.
clear
();
size_t
count
;
DB
::
assertString
(
"checksums format version: "
,
in
);
int
format_version
;
DB
::
readText
(
format_version
,
in
);
if
(
format_version
<
1
||
format_version
>
2
)
throw
Exception
(
"Bad checksums format version: "
+
DB
::
toString
(
format_version
),
ErrorCodes
::
UNKNOWN_FORMAT
);
DB
::
assertString
(
"
\n
"
,
in
);
DB
::
assertString
(
"checksums format version: 1
\n
"
,
in
);
DB
::
readText
(
count
,
in
);
DB
::
assertString
(
" files:
\n
"
,
in
);
...
...
@@ -874,27 +838,12 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
DB
::
readString
(
name
,
in
);
DB
::
assertString
(
"
\n\t
size: "
,
in
);
DB
::
readText
(
sum
.
file_
size
,
in
);
DB
::
readText
(
sum
.
size
,
in
);
DB
::
assertString
(
"
\n\t
hash: "
,
in
);
DB
::
readText
(
sum
.
file_
hash
.
first
,
in
);
DB
::
readText
(
sum
.
hash
.
first
,
in
);
DB
::
assertString
(
" "
,
in
);
DB
::
readText
(
sum
.
file_
hash
.
second
,
in
);
DB
::
readText
(
sum
.
hash
.
second
,
in
);
DB
::
assertString
(
"
\n
"
,
in
);
if
(
format_version
==
2
)
{
DB
::
assertString
(
"
\t
compressed: "
,
in
);
DB
::
readText
(
sum
.
is_compressed
,
in
);
if
(
sum
.
is_compressed
)
{
DB
::
assertString
(
"
\n\t
uncompressed size: "
,
in
);
DB
::
readText
(
sum
.
uncompressed_size
,
in
);
DB
::
assertString
(
"
\n\t
uncompressed hash: "
,
in
);
DB
::
readText
(
sum
.
uncompressed_hash
.
first
,
in
);
DB
::
assertString
(
" "
,
in
);
DB
::
readText
(
sum
.
uncompressed_hash
.
second
,
in
);
}
DB
::
assertString
(
"
\n
"
,
in
);
}
files
.
insert
(
std
::
make_pair
(
name
,
sum
));
}
...
...
@@ -902,34 +851,20 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
void
MergeTreeData
::
DataPart
::
Checksums
::
writeText
(
WriteBuffer
&
out
)
const
{
DB
::
writeString
(
"checksums format version:
2
\n
"
,
out
);
DB
::
writeString
(
"checksums format version:
1
\n
"
,
out
);
DB
::
writeText
(
files
.
size
(),
out
);
DB
::
writeString
(
" files:
\n
"
,
out
);
for
(
const
auto
&
it
:
files
)
{
const
String
&
name
=
it
.
first
;
const
Checksum
&
sum
=
it
.
second
;
DB
::
writeString
(
name
,
out
);
DB
::
writeString
(
it
.
first
,
out
);
DB
::
writeString
(
"
\n\t
size: "
,
out
);
DB
::
writeText
(
sum
.
file_
size
,
out
);
DB
::
writeText
(
it
.
second
.
size
,
out
);
DB
::
writeString
(
"
\n\t
hash: "
,
out
);
DB
::
writeText
(
sum
.
file_
hash
.
first
,
out
);
DB
::
writeText
(
it
.
second
.
hash
.
first
,
out
);
DB
::
writeString
(
" "
,
out
);
DB
::
writeText
(
sum
.
file_hash
.
second
,
out
);
DB
::
writeString
(
"
\n\t
compressed: "
,
out
);
DB
::
writeText
(
sum
.
is_compressed
,
out
);
DB
::
writeText
(
it
.
second
.
hash
.
second
,
out
);
DB
::
writeString
(
"
\n
"
,
out
);
if
(
sum
.
is_compressed
)
{
DB
::
writeString
(
"
\t
uncompressed size: "
,
out
);
DB
::
writeText
(
sum
.
uncompressed_size
,
out
);
DB
::
writeString
(
"
\n\t
uncompressed hash: "
,
out
);
DB
::
writeText
(
sum
.
uncompressed_hash
.
first
,
out
);
DB
::
writeString
(
" "
,
out
);
DB
::
writeText
(
sum
.
uncompressed_hash
.
second
,
out
);
DB
::
writeString
(
"
\n
"
,
out
);
}
}
}
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
07d9ab9b
...
...
@@ -298,12 +298,10 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
String
another_replica
=
findReplicaHavingPart
(
part
->
name
,
false
);
if
(
!
another_replica
.
empty
())
{
String
checksums_str
;
if
(
zookeeper
.
tryGet
(
zookeeper_path
+
"/replicas/"
+
another_replica
+
"/parts/"
+
part
->
name
+
"/checksums"
,
checksums_str
))
{
auto
checksums
=
MergeTreeData
::
DataPart
::
Checksums
::
parse
(
checksums_str
);
checksums
.
checkEqual
(
part
->
checksums
,
true
);
}
String
checksums_str
=
zookeeper
.
get
(
zookeeper_path
+
"/replicas/"
+
another_replica
+
"/parts/"
+
part
->
name
+
"/checksums"
);
auto
checksums
=
MergeTreeData
::
DataPart
::
Checksums
::
parse
(
checksums_str
);
checksums
.
check
(
part
->
checksums
);
}
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录