Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d3d03d8c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d3d03d8c
编写于
8月 19, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
adjust more code
上级
cc116b21
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
102 addition
and
103 deletion
+102
-103
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+2
-5
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+100
-98
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
d3d03d8c
...
...
@@ -574,7 +574,7 @@ struct SDelFWriter {
SDelFile
fDel
;
TdFilePtr
pWriteH
;
uint8_t
*
pBuf1
;
uint8_t
*
aBuf
[
1
]
;
};
struct
SDataFWriter
{
...
...
@@ -591,10 +591,7 @@ struct SDataFWriter {
SLastFile
fLast
;
SSmaFile
fSma
;
uint8_t
*
pBuf1
;
uint8_t
*
pBuf2
;
uint8_t
*
pBuf3
;
uint8_t
*
pBuf4
;
uint8_t
*
aBuf
[
4
];
};
struct
STsdbReadSnap
{
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
d3d03d8c
...
...
@@ -75,7 +75,9 @@ int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
goto
_err
;
}
tFree
(
pWriter
->
pBuf1
);
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
(
pWriter
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
(
pWriter
->
aBuf
[
iBuf
]);
}
taosMemoryFree
(
pWriter
);
*
ppWriter
=
NULL
;
...
...
@@ -99,21 +101,21 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelId
size
+=
sizeof
(
TSCKSUM
);
// alloc
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// build
n
=
0
;
n
+=
tPutU32
(
pWriter
->
pBuf1
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iDelData
=
0
;
iDelData
<
taosArrayGetSize
(
aDelData
);
iDelData
++
)
{
n
+=
tPutDelData
(
pWriter
->
pBuf1
+
n
,
taosArrayGet
(
aDelData
,
iDelData
));
n
+=
tPutDelData
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aDelData
,
iDelData
));
}
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf1
,
size
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
]
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pWriteH
,
pWriter
->
pBuf1
,
size
);
n
=
taosWriteFile
(
pWriter
->
pWriteH
,
pWriter
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -147,21 +149,21 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
size
+=
sizeof
(
TSCKSUM
);
// alloc
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// build
n
=
0
;
n
+=
tPutU32
(
pWriter
->
pBuf1
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iDelIdx
=
0
;
iDelIdx
<
taosArrayGetSize
(
aDelIdx
);
iDelIdx
++
)
{
n
+=
tPutDelIdx
(
pWriter
->
pBuf1
+
n
,
taosArrayGet
(
aDelIdx
,
iDelIdx
));
n
+=
tPutDelIdx
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aDelIdx
,
iDelIdx
));
}
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf1
,
size
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
]
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pWriteH
,
pWriter
->
pBuf1
,
size
);
n
=
taosWriteFile
(
pWriter
->
pWriteH
,
pWriter
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -215,7 +217,7 @@ struct SDelFReader {
SDelFile
fDel
;
TdFilePtr
pReadH
;
uint8_t
*
pBuf1
;
uint8_t
*
aBuf
[
1
]
;
};
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
...
...
@@ -262,7 +264,9 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_exit
;
}
tFree
(
pReader
->
pBuf1
);
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
(
pReader
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
(
pReader
->
aBuf
[
iBuf
]);
}
taosMemoryFree
(
pReader
);
}
*
ppReader
=
NULL
;
...
...
@@ -286,11 +290,11 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
}
// alloc
code
=
tRealloc
(
&
pReader
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// read
n
=
taosReadFile
(
pReader
->
pReadH
,
pReader
->
pBuf1
,
size
);
n
=
taosReadFile
(
pReader
->
pReadH
,
pReader
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -300,7 +304,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
pBuf1
,
size
))
{
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
]
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
...
...
@@ -309,10 +313,10 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
n
=
0
;
uint32_t
delimiter
;
n
+=
tGetU32
(
pReader
->
pBuf1
+
n
,
&
delimiter
);
n
+=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SDelData
delData
;
n
+=
tGetDelData
(
pReader
->
pBuf1
+
n
,
&
delData
);
n
+=
tGetDelData
(
pReader
->
aBuf
[
0
]
+
n
,
&
delData
);
if
(
taosArrayPush
(
aDelData
,
&
delData
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -344,11 +348,11 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
}
// alloc
code
=
tRealloc
(
&
pReader
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// read
n
=
taosReadFile
(
pReader
->
pReadH
,
pReader
->
pBuf1
,
size
);
n
=
taosReadFile
(
pReader
->
pReadH
,
pReader
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -358,7 +362,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
pBuf1
,
size
))
{
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
]
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
...
...
@@ -366,13 +370,13 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
// decode
n
=
0
;
uint32_t
delimiter
;
n
+=
tGetU32
(
pReader
->
pBuf1
+
n
,
&
delimiter
);
n
+=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SDelIdx
delIdx
;
n
+=
tGetDelIdx
(
pReader
->
pBuf1
+
n
,
&
delIdx
);
n
+=
tGetDelIdx
(
pReader
->
aBuf
[
0
]
+
n
,
&
delIdx
);
if
(
taosArrayPush
(
aDelIdx
,
&
delIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -398,9 +402,7 @@ struct SDataFReader {
TdFilePtr
pLastFD
;
TdFilePtr
pSmaFD
;
uint8_t
*
pBuf1
;
uint8_t
*
pBuf2
;
uint8_t
*
pBuf3
;
uint8_t
*
aBuf
[
3
];
};
int32_t
tsdbDataFReaderOpen
(
SDataFReader
**
ppReader
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
)
{
...
...
@@ -483,9 +485,10 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
goto
_err
;
}
tFree
((
*
ppReader
)
->
pBuf1
);
tFree
((
*
ppReader
)
->
pBuf2
);
tFree
((
*
ppReader
)
->
pBuf3
);
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
((
*
ppReader
)
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
((
*
ppReader
)
->
aBuf
[
iBuf
]);
}
taosMemoryFree
(
*
ppReader
);
_exit:
...
...
@@ -510,7 +513,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
}
// alloc
code
=
tRealloc
(
&
pReader
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// seek
...
...
@@ -520,7 +523,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
}
// read
n
=
taosReadFile
(
pReader
->
pHeadFD
,
pReader
->
pBuf1
,
size
);
n
=
taosReadFile
(
pReader
->
pHeadFD
,
pReader
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -530,19 +533,19 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
pBuf1
,
size
))
{
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
]
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
n
=
0
;
n
=
tGetU32
(
pReader
->
pBuf1
+
n
,
&
delimiter
);
n
=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SBlockIdx
blockIdx
;
n
+=
tGetBlockIdx
(
pReader
->
pBuf1
+
n
,
&
blockIdx
);
n
+=
tGetBlockIdx
(
pReader
->
aBuf
[
0
]
+
n
,
&
blockIdx
);
if
(
taosArrayPush
(
aBlockIdx
,
&
blockIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -573,7 +576,7 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) {
}
// alloc
code
=
tRealloc
(
&
pReader
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// seek
...
...
@@ -583,7 +586,7 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) {
}
// read
n
=
taosReadFile
(
pReader
->
pLastFD
,
pReader
->
pBuf1
,
size
);
n
=
taosReadFile
(
pReader
->
pLastFD
,
pReader
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -593,19 +596,19 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) {
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
pBuf1
,
size
))
{
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
]
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
n
=
0
;
n
=
tGetU32
(
pReader
->
pBuf1
+
n
,
&
delimiter
);
n
=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SBlockL
blockl
;
n
+=
tGetBlockL
(
pReader
->
pBuf1
+
n
,
&
blockl
);
n
+=
tGetBlockL
(
pReader
->
aBuf
[
0
]
+
n
,
&
blockl
);
if
(
taosArrayPush
(
aBlockL
,
&
blockl
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -631,7 +634,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
int64_t
tn
;
// alloc
code
=
tRealloc
(
&
pReader
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// seek
...
...
@@ -641,7 +644,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
}
// read
n
=
taosReadFile
(
pReader
->
pHeadFD
,
pReader
->
pBuf1
,
size
);
n
=
taosReadFile
(
pReader
->
pHeadFD
,
pReader
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -651,7 +654,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
pBuf1
,
size
))
{
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
]
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
...
...
@@ -660,10 +663,10 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
n
=
0
;
uint32_t
delimiter
;
n
+=
tGetU32
(
pReader
->
pBuf1
+
n
,
&
delimiter
);
n
+=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
tn
=
tGetMapData
(
pReader
->
pBuf1
+
n
,
mBlock
);
tn
=
tGetMapData
(
pReader
->
aBuf
[
0
]
+
n
,
mBlock
);
if
(
tn
<
0
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -688,11 +691,11 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
// alloc
int32_t
size
=
pSmaInfo
->
size
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pReader
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// read
int64_t
n
=
taosReadFile
(
pReader
->
pSmaFD
,
pReader
->
pBuf1
,
size
);
int64_t
n
=
taosReadFile
(
pReader
->
pSmaFD
,
pReader
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -702,7 +705,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
pBuf1
,
size
))
{
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
]
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
...
...
@@ -712,7 +715,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
while
(
n
<
pSmaInfo
->
size
)
{
SColumnDataAgg
sma
;
n
+=
tGetColumnDataAgg
(
pReader
->
pBuf1
+
n
,
&
sma
);
n
+=
tGetColumnDataAgg
(
pReader
->
aBuf
[
0
]
+
n
,
&
sma
);
if
(
taosArrayPush
(
aColumnDataAgg
,
&
sma
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -735,10 +738,10 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
TdFilePtr
pFD
=
fromLast
?
pReader
->
pLastFD
:
pReader
->
pDataFD
;
// uid + version + tskey
code
=
tsdbReadAndCheck
(
pFD
,
pBlkInfo
->
offset
,
&
pReader
->
pBuf1
,
pBlkInfo
->
szKey
,
1
);
code
=
tsdbReadAndCheck
(
pFD
,
pBlkInfo
->
offset
,
&
pReader
->
aBuf
[
0
]
,
pBlkInfo
->
szKey
,
1
);
if
(
code
)
goto
_err
;
SDiskDataHdr
hdr
;
uint8_t
*
p
=
pReader
->
pBuf1
+
tGetDiskDataHdr
(
pReader
->
pBuf1
,
&
hdr
);
uint8_t
*
p
=
pReader
->
aBuf
[
0
]
+
tGetDiskDataHdr
(
pReader
->
aBuf
[
0
]
,
&
hdr
);
ASSERT
(
hdr
.
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pBlockData
->
suid
==
hdr
.
suid
);
...
...
@@ -750,7 +753,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
if
(
hdr
.
uid
==
0
)
{
ASSERT
(
hdr
.
szUid
);
code
=
tsdbDecmprData
(
p
,
hdr
.
szUid
,
TSDB_DATA_TYPE_BIGINT
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aUid
,
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
pBuf2
);
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]
);
if
(
code
)
goto
_err
;
}
else
{
ASSERT
(
!
hdr
.
szUid
);
...
...
@@ -759,24 +762,24 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
// version
code
=
tsdbDecmprData
(
p
,
hdr
.
szVer
,
TSDB_DATA_TYPE_BIGINT
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
pBuf2
);
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]
);
if
(
code
)
goto
_err
;
p
+=
hdr
.
szVer
;
// TSKEY
code
=
tsdbDecmprData
(
p
,
hdr
.
szKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
hdr
.
nRow
,
&
pReader
->
pBuf2
);
sizeof
(
TSKEY
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]
);
if
(
code
)
goto
_err
;
p
+=
hdr
.
szKey
;
ASSERT
(
p
-
pReader
->
pBuf1
==
pBlkInfo
->
szKey
-
sizeof
(
TSCKSUM
));
ASSERT
(
p
-
pReader
->
aBuf
[
0
]
==
pBlkInfo
->
szKey
-
sizeof
(
TSCKSUM
));
// read and decode columns
if
(
taosArrayGetSize
(
pBlockData
->
aIdx
)
==
0
)
goto
_exit
;
if
(
hdr
.
szBlkCol
>
0
)
{
int64_t
offset
=
pBlkInfo
->
offset
+
pBlkInfo
->
szKey
;
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
pBuf1
,
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
),
1
);
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
aBuf
[
0
]
,
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
),
1
);
if
(
code
)
goto
_err
;
}
...
...
@@ -789,7 +792,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
while
(
pBlockCol
&&
pBlockCol
->
cid
<
pColData
->
cid
)
{
if
(
n
<
hdr
.
szBlkCol
)
{
n
+=
tGetBlockCol
(
pReader
->
pBuf1
+
n
,
pBlockCol
);
n
+=
tGetBlockCol
(
pReader
->
aBuf
[
0
]
+
n
,
pBlockCol
);
}
else
{
ASSERT
(
n
==
hdr
.
szBlkCol
);
pBlockCol
=
NULL
;
...
...
@@ -817,10 +820,10 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
int64_t
offset
=
pBlkInfo
->
offset
+
pBlkInfo
->
szKey
+
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
)
+
pBlockCol
->
offset
;
int32_t
size
=
pBlockCol
->
szBitmap
+
pBlockCol
->
szOffset
+
pBlockCol
->
szValue
+
sizeof
(
TSCKSUM
);
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
pBuf2
,
size
,
0
);
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
aBuf
[
1
]
,
size
,
0
);
if
(
code
)
goto
_err
;
code
=
tsdbDecmprColData
(
pReader
->
pBuf2
,
pBlockCol
,
hdr
.
cmprAlg
,
hdr
.
nRow
,
pColData
,
&
pReader
->
pBuf3
);
code
=
tsdbDecmprColData
(
pReader
->
aBuf
[
1
],
pBlockCol
,
hdr
.
cmprAlg
,
hdr
.
nRow
,
pColData
,
&
pReader
->
aBuf
[
2
]
);
if
(
code
)
goto
_err
;
}
}
...
...
@@ -1096,10 +1099,9 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
goto
_err
;
}
tFree
((
*
ppWriter
)
->
pBuf1
);
tFree
((
*
ppWriter
)
->
pBuf2
);
tFree
((
*
ppWriter
)
->
pBuf3
);
tFree
((
*
ppWriter
)
->
pBuf4
);
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
((
*
ppWriter
)
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
((
*
ppWriter
)
->
aBuf
[
iBuf
]);
}
taosMemoryFree
(
*
ppWriter
);
_exit:
*
ppWriter
=
NULL
;
...
...
@@ -1210,21 +1212,21 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
size
+=
sizeof
(
TSCKSUM
);
// alloc
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// build
n
=
0
;
n
=
tPutU32
(
pWriter
->
pBuf1
+
n
,
TSDB_FILE_DLMT
);
n
=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iBlockIdx
=
0
;
iBlockIdx
<
taosArrayGetSize
(
aBlockIdx
);
iBlockIdx
++
)
{
n
+=
tPutBlockIdx
(
pWriter
->
pBuf1
+
n
,
taosArrayGet
(
aBlockIdx
,
iBlockIdx
));
n
+=
tPutBlockIdx
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aBlockIdx
,
iBlockIdx
));
}
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf1
,
size
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
]
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
pWriter
->
pBuf1
,
size
);
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
pWriter
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1254,19 +1256,19 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc
// alloc
size
=
sizeof
(
uint32_t
)
+
tPutMapData
(
NULL
,
mBlock
)
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// build
n
=
0
;
n
+=
tPutU32
(
pWriter
->
pBuf1
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutMapData
(
pWriter
->
pBuf1
+
n
,
mBlock
);
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf1
,
size
);
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutMapData
(
pWriter
->
aBuf
[
0
]
+
n
,
mBlock
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
]
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
pWriter
->
pBuf1
,
size
);
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
pWriter
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1308,21 +1310,21 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
size
+=
sizeof
(
TSCKSUM
);
// alloc
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
// encode
n
=
0
;
n
+=
tPutU32
(
pWriter
->
pBuf1
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iBlockL
=
0
;
iBlockL
<
taosArrayGetSize
(
aBlockL
);
iBlockL
++
)
{
n
+=
tPutBlockL
(
pWriter
->
pBuf1
+
n
,
taosArrayGet
(
aBlockL
,
iBlockL
));
n
+=
tPutBlockL
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aBlockL
,
iBlockL
));
}
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf1
,
size
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
]
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
pWriter
->
pBuf1
,
size
);
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
pWriter
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1381,21 +1383,21 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
SColumnDataAgg
sma
;
tsdbCalcColDataSMA
(
pColData
,
&
sma
);
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
pSmaInfo
->
size
+
tPutColumnDataAgg
(
NULL
,
&
sma
));
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
pSmaInfo
->
size
+
tPutColumnDataAgg
(
NULL
,
&
sma
));
if
(
code
)
goto
_err
;
pSmaInfo
->
size
+=
tPutColumnDataAgg
(
pWriter
->
pBuf1
+
pSmaInfo
->
size
,
&
sma
);
pSmaInfo
->
size
+=
tPutColumnDataAgg
(
pWriter
->
aBuf
[
0
]
+
pSmaInfo
->
size
,
&
sma
);
}
// write
if
(
pSmaInfo
->
size
)
{
int32_t
size
=
pSmaInfo
->
size
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pWriter
->
pBuf1
,
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
]
,
size
);
if
(
code
)
goto
_err
;
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf1
,
size
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
]
,
size
);
int64_t
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
pWriter
->
pBuf1
,
size
);
int64_t
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
pWriter
->
aBuf
[
0
]
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1447,57 +1449,57 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
.
szOrigin
=
pColData
->
nData
};
if
(
pColData
->
flag
!=
HAS_NULL
)
{
code
=
tsdbCmprColData
(
pColData
,
cmprAlg
,
&
blockCol
,
&
pWriter
->
pBuf1
,
nBuf1
,
&
pWriter
->
pBuf3
);
code
=
tsdbCmprColData
(
pColData
,
cmprAlg
,
&
blockCol
,
&
pWriter
->
aBuf
[
0
],
nBuf1
,
&
pWriter
->
aBuf
[
2
]
);
if
(
code
)
goto
_err
;
blockCol
.
offset
=
nBuf1
;
nBuf1
=
nBuf1
+
blockCol
.
szBitmap
+
blockCol
.
szOffset
+
blockCol
.
szValue
+
sizeof
(
TSCKSUM
);
}
code
=
tRealloc
(
&
pWriter
->
pBuf2
,
hdr
.
szBlkCol
+
tPutBlockCol
(
NULL
,
&
blockCol
));
code
=
tRealloc
(
&
pWriter
->
aBuf
[
1
]
,
hdr
.
szBlkCol
+
tPutBlockCol
(
NULL
,
&
blockCol
));
if
(
code
)
goto
_err
;
hdr
.
szBlkCol
+=
tPutBlockCol
(
pWriter
->
pBuf2
+
hdr
.
szBlkCol
,
&
blockCol
);
hdr
.
szBlkCol
+=
tPutBlockCol
(
pWriter
->
aBuf
[
1
]
+
hdr
.
szBlkCol
,
&
blockCol
);
}
int32_t
nBuf2
=
0
;
if
(
hdr
.
szBlkCol
>
0
)
{
nBuf2
=
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pWriter
->
pBuf2
,
nBuf2
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
1
]
,
nBuf2
);
if
(
code
)
goto
_err
;
taosCalcChecksumAppend
(
0
,
pWriter
->
pBuf2
,
nBuf2
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
1
]
,
nBuf2
);
}
// uid + version + tskey
int32_t
nBuf3
=
0
;
if
(
pBlockData
->
uid
==
0
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
pBlockData
->
aUid
,
sizeof
(
int64_t
)
*
pBlockData
->
nRow
,
TSDB_DATA_TYPE_BIGINT
,
cmprAlg
,
&
pWriter
->
pBuf3
,
nBuf3
,
&
hdr
.
szUid
,
&
pWriter
->
pBuf4
);
&
pWriter
->
aBuf
[
2
],
nBuf3
,
&
hdr
.
szUid
,
&
pWriter
->
aBuf
[
3
]
);
if
(
code
)
goto
_err
;
}
nBuf3
+=
hdr
.
szUid
;
code
=
tsdbCmprData
((
uint8_t
*
)
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
pBlockData
->
nRow
,
TSDB_DATA_TYPE_BIGINT
,
cmprAlg
,
&
pWriter
->
pBuf3
,
nBuf3
,
&
hdr
.
szVer
,
&
pWriter
->
pBuf4
);
cmprAlg
,
&
pWriter
->
aBuf
[
2
],
nBuf3
,
&
hdr
.
szVer
,
&
pWriter
->
aBuf
[
3
]
);
if
(
code
)
goto
_err
;
nBuf3
+=
hdr
.
szVer
;
code
=
tsdbCmprData
((
uint8_t
*
)
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
pBlockData
->
nRow
,
TSDB_DATA_TYPE_TIMESTAMP
,
cmprAlg
,
&
pWriter
->
pBuf3
,
nBuf3
,
&
hdr
.
szKey
,
&
pWriter
->
pBuf4
);
cmprAlg
,
&
pWriter
->
aBuf
[
2
],
nBuf3
,
&
hdr
.
szKey
,
&
pWriter
->
aBuf
[
3
]
);
if
(
code
)
goto
_err
;
nBuf3
+=
hdr
.
szKey
;
nBuf3
+=
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pWriter
->
pBuf3
,
nBuf3
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
2
]
,
nBuf3
);
if
(
code
)
goto
_err
;
// hdr
int32_t
nBuf4
=
tPutDiskDataHdr
(
NULL
,
&
hdr
);
code
=
tRealloc
(
&
pWriter
->
pBuf4
,
nBuf4
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
3
]
,
nBuf4
);
if
(
code
)
goto
_err
;
tPutDiskDataHdr
(
pWriter
->
pBuf4
,
&
hdr
);
taosCalcChecksumAppend
(
taosCalcChecksum
(
0
,
pWriter
->
pBuf4
,
nBuf4
),
pWriter
->
pBuf3
,
nBuf3
);
tPutDiskDataHdr
(
pWriter
->
aBuf
[
3
]
,
&
hdr
);
taosCalcChecksumAppend
(
taosCalcChecksum
(
0
,
pWriter
->
aBuf
[
3
],
nBuf4
),
pWriter
->
aBuf
[
2
]
,
nBuf3
);
// write =================
TdFilePtr
pFD
=
toLast
?
pWriter
->
pLastFD
:
pWriter
->
pDataFD
;
...
...
@@ -1505,20 +1507,20 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
pBlkInfo
->
szKey
=
nBuf4
+
nBuf3
;
pBlkInfo
->
szBlock
=
nBuf1
+
nBuf2
+
nBuf3
+
nBuf4
;
int64_t
n
=
taosWriteFile
(
pFD
,
pWriter
->
pBuf4
,
nBuf4
);
int64_t
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
3
]
,
nBuf4
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pFD
,
pWriter
->
pBuf3
,
nBuf3
);
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
2
]
,
nBuf3
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
nBuf2
)
{
n
=
taosWriteFile
(
pFD
,
pWriter
->
pBuf2
,
nBuf2
);
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
1
]
,
nBuf2
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1526,7 +1528,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
}
if
(
nBuf1
)
{
n
=
taosWriteFile
(
pFD
,
pWriter
->
pBuf1
,
nBuf1
);
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
0
]
,
nBuf1
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录