Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3ba5d8e9
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3ba5d8e9
编写于
6月 28, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact del data
上级
fbb9ebfd
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
195 addition
and
186 deletion
+195
-186
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+11
-9
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+68
-82
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+2
-8
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+114
-87
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
3ba5d8e9
...
@@ -236,14 +236,14 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
...
@@ -236,14 +236,14 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
// SDelFWriter
// SDelFWriter
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
S
MapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
);
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
S
Array
*
aDelData
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
);
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
);
// SDelFReader
// SDelFReader
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
);
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
);
int32_t
tsdbDelFReaderClose
(
SDelFReader
*
pReader
);
int32_t
tsdbDelFReaderClose
(
SDelFReader
*
pReader
);
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
S
MapData
*
pDelDataMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
S
Array
*
aDelData
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
);
// tsdbCache
// tsdbCache
int32_t
tsdbOpenCache
(
STsdb
*
pTsdb
);
int32_t
tsdbOpenCache
(
STsdb
*
pTsdb
);
...
@@ -470,10 +470,6 @@ struct SDelIdx {
...
@@ -470,10 +470,6 @@ struct SDelIdx {
struct
SDelFile
{
struct
SDelFile
{
int64_t
commitID
;
int64_t
commitID
;
TSKEY
minKey
;
TSKEY
maxKey
;
int64_t
minVersion
;
int64_t
maxVersion
;
int64_t
size
;
int64_t
size
;
int64_t
offset
;
int64_t
offset
;
};
};
...
@@ -557,6 +553,12 @@ struct STsdbFS {
...
@@ -557,6 +553,12 @@ struct STsdbFS {
STsdbFSState
*
nState
;
STsdbFSState
*
nState
;
};
};
struct
SDelFWriter
{
STsdb
*
pTsdb
;
SDelFile
fDel
;
TdFilePtr
pWriteH
;
};
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
3ba5d8e9
...
@@ -45,13 +45,10 @@ typedef struct {
...
@@ -45,13 +45,10 @@ typedef struct {
STSchema
*
pTSchema
;
STSchema
*
pTSchema
;
/* commit del */
/* commit del */
SDelFReader
*
pDelFReader
;
SDelFReader
*
pDelFReader
;
SMapData
oDelIdxMap
;
// SMapData<SDelIdx>, old
SMapData
oDelDataMap
;
// SMapData<SDelData>, old
SDelFWriter
*
pDelFWriter
;
SDelFWriter
*
pDelFWriter
;
SMapData
nDelIdxMap
;
// SMapData<SDelIdx>, new
SArray
*
aDelIdx
;
// SArray<SDelIdx>
SMapData
nDelDataMap
;
// SMapData<SDelData>, new
SArray
*
aDelIdxN
;
// SArray<SDelIdx>
SArray
*
aDelIdx
;
SArray
*
aDelData
;
// SArray<SDelData>
SArray
*
aDelData
;
}
SCommitter
;
}
SCommitter
;
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitter
*
pCommitter
);
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitter
*
pCommitter
);
...
@@ -121,23 +118,37 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
...
@@ -121,23 +118,37 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
int32_t
code
=
0
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SDelFile
*
pDelFileR
=
NULL
;
// TODO
SDelFile
*
pDelFileW
=
NULL
;
// TODO
tMapDataReset
(
&
pCommitter
->
oDelIdxMap
);
pCommitter
->
aDelIdx
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
));
tMapDataReset
(
&
pCommitter
->
nDelIdxMap
);
if
(
pCommitter
->
aDelIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCommitter
->
aDelData
=
taosArrayInit
(
0
,
sizeof
(
SDelData
));
if
(
pCommitter
->
aDelData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
// load old
pCommitter
->
aDelIdxN
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
));
if
(
pCommitter
->
aDelIdxN
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
SDelFile
*
pDelFileR
=
pTsdb
->
fs
->
nState
->
pDelFile
;
if
(
pDelFileR
)
{
if
(
pDelFileR
)
{
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
&
pCommitter
->
oDelIdxMap
,
NULL
);
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
pCommitter
->
aDelIdx
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
// prepare new
// prepare new
code
=
tsdbDelFWriterOpen
(
&
pCommitter
->
pDelFWriter
,
pDelFileW
,
pTsdb
);
SDelFile
wDelFile
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
,
.
offset
=
0
};
code
=
tsdbDelFWriterOpen
(
&
pCommitter
->
pDelFWriter
,
&
wDelFile
,
pTsdb
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
_exit:
_exit:
...
@@ -151,60 +162,51 @@ _err:
...
@@ -151,60 +162,51 @@ _err:
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SDelData
*
pDelData
=
&
(
SDelData
){}
;
SDelData
*
pDelData
;
tb_uid_t
suid
;
tb_uid_t
suid
;
tb_uid_t
uid
;
tb_uid_t
uid
;
SDelIdx
delIdx
;
// TODO
// check no del data, just return
taosArrayClear
(
pCommitter
->
aDelData
);
if
(
pTbData
&&
pTbData
->
pHead
==
NULL
)
{
pTbData
=
NULL
;
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
// prepare
if
(
pTbData
)
{
if
(
pTbData
)
{
delIdx
.
suid
=
pTbData
->
suid
;
suid
=
pTbData
->
suid
;
delIdx
.
uid
=
pTbData
->
uid
;
uid
=
pTbData
->
uid
;
}
else
{
delIdx
.
suid
=
pDelIdx
->
suid
;
delIdx
.
uid
=
pDelIdx
->
uid
;
}
// start
if
(
pTbData
->
pHead
==
NULL
)
{
tMapDataReset
(
&
pCommitter
->
oDelDataMap
);
pTbData
=
NULL
;
tMapDataReset
(
&
pCommitter
->
nDelDataMap
);
}
}
if
(
pDelIdx
)
{
if
(
pDelIdx
)
{
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
&
pCommitter
->
oDelDataMap
,
NULL
);
suid
=
pDelIdx
->
suid
;
uid
=
pDelIdx
->
uid
;
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
pCommitter
->
aDelData
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
// disk
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
for
(
int32_t
iDelData
=
0
;
iDelData
<
pCommitter
->
oDelDataMap
.
nItem
;
iDelData
++
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelDataMap
,
iDelData
,
pDelData
,
tGetDelData
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
pDelData
,
tPutDelData
);
SDelIdx
delIdx
=
{.
suid
=
suid
,
.
uid
=
uid
};
if
(
code
)
goto
_err
;
}
// memory
// memory
pDelData
=
pTbData
?
pTbData
->
pHead
:
NULL
;
pDelData
=
pTbData
?
pTbData
->
pHead
:
NULL
;
for
(;
pDelData
;
pDelData
=
pDelData
->
pNext
)
{
for
(;
pDelData
;
pDelData
=
pDelData
->
pNext
)
{
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
pDelData
,
tPutDelData
);
if
(
taosArrayPush
(
pCommitter
->
aDelData
,
pDelData
)
==
NULL
)
{
if
(
code
)
goto
_err
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
}
ASSERT
(
pCommitter
->
nDelDataMap
.
nItem
>
0
);
// write
// write
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
nDelDataMap
,
NULL
,
&
delIdx
);
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
pCommitter
->
aDelData
,
NULL
,
&
delIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// put delIdx
// put delIdx
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelIdxMap
,
&
delIdx
,
tPutDelIdx
);
if
(
taosArrayPush
(
pCommitter
->
aDelIdx
,
&
delIdx
)
==
NULL
)
{
if
(
code
)
goto
_err
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
_exit:
_exit:
return
code
;
return
code
;
...
@@ -219,30 +221,23 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
...
@@ -219,30 +221,23 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
int32_t
iDelIdx
=
0
;
int32_t
iDelIdx
=
0
;
int32_t
nDelIdx
=
pCommitter
->
oDelIdxMap
.
nItem
;
int32_t
nDelIdx
=
taosArrayGetSize
(
pCommitter
->
aDelIdx
)
;
int32_t
iTbData
=
0
;
int32_t
iTbData
=
0
;
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
STbData
*
pTbData
;
STbData
*
pTbData
;
SDelIdx
*
pDelIdx
;
SDelIdx
*
pDelIdx
;
SDelIdx
delIdx
;
int32_t
c
;
ASSERT
(
nTbData
>
0
);
ASSERT
(
nTbData
>
0
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
if
(
iDelIdx
<
nDelIdx
)
{
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
while
(
true
)
{
while
(
true
)
{
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
break
;
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
break
;
if
(
pTbData
&&
pDelIdx
)
{
if
(
pTbData
&&
pDelIdx
)
{
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdx
);
int32_t
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdx
);
if
(
c
==
0
)
{
if
(
c
==
0
)
{
goto
_commit_mem_and_disk_del
;
goto
_commit_mem_and_disk_del
;
}
else
if
(
c
<
0
)
{
}
else
if
(
c
<
0
)
{
...
@@ -258,44 +253,27 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
...
@@ -258,44 +253,27 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
_commit_mem_del:
_commit_mem_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
NULL
);
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iTbData
++
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
)
:
NULL
;
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
continue
;
continue
;
_commit_disk_del:
_commit_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
NULL
,
pDelIdx
);
code
=
tsdbCommitTableDel
(
pCommitter
,
NULL
,
pDelIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iDelIdx
++
;
iDelIdx
++
;
if
(
iDelIdx
<
nDelIdx
)
{
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
continue
;
continue
;
_commit_mem_and_disk_del:
_commit_mem_and_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
pDelIdx
);
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
pDelIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iTbData
++
;
iTbData
++
;
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
)
:
NULL
;
iDelIdx
++
;
iDelIdx
++
;
if
(
iTbData
<
nTbData
)
{
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
if
(
iDelIdx
<
nDelIdx
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
continue
;
continue
;
}
}
...
@@ -308,11 +286,15 @@ _err:
...
@@ -308,11 +286,15 @@ _err:
static
int32_t
tsdbCommitDelEnd
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitDelEnd
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
nDelIdxMap
,
NULL
);
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
pCommitter
->
aDelIdxN
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
,
NULL
);
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFSStateUpsertDelFile
(
pTsdb
->
fs
->
nState
,
&
pCommitter
->
pDelFWriter
->
fDel
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
pCommitter
->
pDelFWriter
,
1
);
code
=
tsdbDelFWriterClose
(
pCommitter
->
pDelFWriter
,
1
);
...
@@ -323,6 +305,10 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
...
@@ -323,6 +305,10 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
taosArrayDestroy
(
pCommitter
->
aDelIdx
);
taosArrayDestroy
(
pCommitter
->
aDelData
);
taosArrayDestroy
(
pCommitter
->
aDelIdxN
);
return
code
;
return
code
;
_err:
_err:
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
3ba5d8e9
...
@@ -283,10 +283,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
...
@@ -283,10 +283,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelFile
->
minKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
commitID
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelFile
->
maxKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
minVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
maxVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
size
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
size
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
offset
);
...
@@ -296,10 +293,7 @@ int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
...
@@ -296,10 +293,7 @@ int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
n
+=
tGetI64
(
p
+
n
,
&
pDelFile
->
minKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
commitID
);
n
+=
tGetI64
(
p
+
n
,
&
pDelFile
->
maxKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
minVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
maxVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
size
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
size
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
offset
);
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
3ba5d8e9
...
@@ -18,43 +18,45 @@
...
@@ -18,43 +18,45 @@
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
// SDelFWriter ====================================================
// SDelFWriter ====================================================
struct
SDelFWriter
{
STsdb
*
pTsdb
;
SDelFile
*
pFile
;
TdFilePtr
pWriteH
;
};
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
char
*
fname
=
NULL
;
// TODO
char
fname
[
TSDB_FILENAME_LEN
];
char
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
SDelFWriter
*
pDelFWriter
;
SDelFWriter
*
pDelFWriter
;
int64_t
n
;
// alloc
pDelFWriter
=
(
SDelFWriter
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pDelFWriter
));
pDelFWriter
=
(
SDelFWriter
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pDelFWriter
));
if
(
pDelFWriter
==
NULL
)
{
if
(
pDelFWriter
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
goto
_err
;
}
}
pDelFWriter
->
pTsdb
=
pTsdb
;
pDelFWriter
->
pTsdb
=
pTsdb
;
pDelFWriter
->
pFile
=
pFile
;
pDelFWriter
->
fDel
=
*
pFile
;
tsdbDelFileName
(
pTsdb
,
pFile
,
fname
);
pDelFWriter
->
pWriteH
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
|
TD_FILE_CREATE
);
pDelFWriter
->
pWriteH
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
|
TD_FILE_CREATE
);
if
(
pDelFWriter
->
pWriteH
==
NULL
)
{
if
(
pDelFWriter
->
pWriteH
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
if
(
taosLSeekFile
(
pDelFWriter
->
pWriteH
,
TSDB_FHDR_SIZE
,
SEEK_SET
)
<
0
)
{
// update header
n
=
taosWriteFile
(
pDelFWriter
->
pWriteH
,
&
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pDelFWriter
->
pFile
->
size
=
TSDB_FHDR_SIZE
;
pDelFWriter
->
fDel
.
size
=
TSDB_FHDR_SIZE
;
pDelFWriter
->
pFile
->
offset
=
0
;
pDelFWriter
->
fDel
.
size
=
0
;
*
ppWriter
=
pDelFWriter
;
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d failed to open del file writer since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d failed to open del file writer since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
ppWriter
=
NULL
;
return
code
;
return
code
;
}
}
...
@@ -80,28 +82,33 @@ _err:
...
@@ -80,28 +82,33 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
)
{
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SArray
*
aDelData
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
uint8_t
*
pBuf
=
NULL
;
uint8_t
*
pBuf
=
NULL
;
int64_t
size
=
0
;
int64_t
size
;
int64_t
n
=
0
;
int64_t
n
;
SBlockDataHdr
hdr
=
{.
delimiter
=
TSDB_FILE_DLMT
,
.
suid
=
pDelIdx
->
suid
,
.
uid
=
pDelIdx
->
uid
};
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// prepare
// prepare
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
=
sizeof
(
hdr
);
size
+=
tPutI64
(
NULL
,
pDelIdx
->
suid
);
for
(
int32_t
iDelData
=
0
;
iDelData
<
taosArrayGetSize
(
aDelData
);
iDelData
++
)
{
size
+=
tPutI64
(
NULL
,
pDelIdx
->
uid
);
size
+=
tPutDelData
(
NULL
,
taosArrayGet
(
aDelData
,
iDelData
));
size
=
size
+
tPutMapData
(
NULL
,
pDelDataMap
)
+
sizeof
(
TSCKSUM
);
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// build
// build
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
=
0
;
n
+=
tPutI64
(
*
ppBuf
+
n
,
pDelIdx
->
suid
);
*
(
SBlockDataHdr
*
)(
*
ppBuf
)
=
hdr
;
n
+=
tPutI64
(
*
ppBuf
+
n
,
pDelIdx
->
uid
);
n
+=
sizeof
(
hdr
);
n
+=
tPutMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
for
(
int32_t
iDelData
=
0
;
iDelData
<
taosArrayGetSize
(
aDelData
);
iDelData
++
)
{
size
+=
tPutDelData
(
*
ppBuf
+
n
,
taosArrayGet
(
aDelData
,
iDelData
));
}
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
...
@@ -116,10 +123,9 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t **
...
@@ -116,10 +123,9 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t **
ASSERT
(
n
==
size
);
ASSERT
(
n
==
size
);
// update
// update
pDelIdx
->
offset
=
pWriter
->
pFile
->
size
;
pDelIdx
->
offset
=
pWriter
->
fDel
.
size
;
pDelIdx
->
size
=
size
;
pDelIdx
->
size
=
size
;
pWriter
->
pFile
->
offset
=
pWriter
->
pFile
->
size
;
pWriter
->
fDel
.
size
+=
size
;
pWriter
->
pFile
->
size
+=
size
;
tsdbFree
(
pBuf
);
tsdbFree
(
pBuf
);
return
code
;
return
code
;
...
@@ -130,24 +136,33 @@ _err:
...
@@ -130,24 +136,33 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
size
=
0
;
int64_t
size
;
int64_t
n
=
0
;
int64_t
n
;
uint8_t
*
pBuf
=
NULL
;
uint8_t
*
pBuf
=
NULL
;
SDelIdx
*
pDelIdx
;
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// prepare
// prepare
size
=
0
;
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
=
size
+
tPutMapData
(
NULL
,
pDelIdxMap
)
+
sizeof
(
TSCKSUM
);
for
(
int32_t
iDelIdx
=
0
;
iDelIdx
<
taosArrayGetSize
(
aDelIdx
);
iDelIdx
++
)
{
size
+=
tPutDelIdx
(
NULL
,
taosArrayGet
(
aDelIdx
,
iDelIdx
));
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// build
// build
n
=
0
;
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutMapData
(
*
ppBuf
+
n
,
pDelIdxMap
);
for
(
int32_t
iDelIdx
=
0
;
iDelIdx
<
taosArrayGetSize
(
aDelIdx
);
iDelIdx
++
)
{
n
+=
tPutDelIdx
(
*
ppBuf
+
n
,
taosArrayGet
(
aDelIdx
,
iDelIdx
));
}
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
...
@@ -159,11 +174,9 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **pp
...
@@ -159,11 +174,9 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **pp
goto
_err
;
goto
_err
;
}
}
ASSERT
(
n
==
size
);
// update
// update
pWriter
->
pFile
->
offset
=
pWriter
->
pFile
->
size
;
pWriter
->
fDel
.
offset
=
pWriter
->
fDel
.
size
;
pWriter
->
pFile
->
size
+=
size
;
pWriter
->
fDel
.
size
+=
size
;
tsdbFree
(
pBuf
);
tsdbFree
(
pBuf
);
return
code
;
return
code
;
...
@@ -174,23 +187,16 @@ _err:
...
@@ -174,23 +187,16 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
uint8_t
*
pBuf
=
NULL
;
char
hdr
[
TSDB_FHDR_SIZE
];
int64_t
size
=
TSDB_FHDR_SIZE
;
int64_t
size
=
TSDB_FHDR_SIZE
;
int64_t
n
;
int64_t
n
;
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
// build
// build
memset
(
*
ppBuf
,
0
,
size
);
memset
(
hdr
,
0
,
size
);
n
=
tPutDelFile
(
*
ppBuf
,
pWriter
->
pFile
);
tPutDelFile
(
hdr
,
&
pWriter
->
fDel
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
hdr
,
size
);
ASSERT
(
n
<=
size
-
sizeof
(
TSCKSUM
));
// seek
// seek
if
(
taosLSeekFile
(
pWriter
->
pWriteH
,
0
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pWriter
->
pWriteH
,
0
,
SEEK_SET
)
<
0
)
{
...
@@ -199,30 +205,29 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) {
...
@@ -199,30 +205,29 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) {
}
}
// write
// write
if
(
taosWriteFile
(
pWriter
->
pWriteH
,
*
ppBuf
,
size
)
<
size
)
{
n
=
taosWriteFile
(
pWriter
->
pWriteH
,
hdr
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
tsdbFree
(
pBuf
);
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d update del file hdr failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d update del file hdr failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
}
}
// SDelFReader ====================================================
// SDelFReader ====================================================
struct
SDelFReader
{
struct
SDelFReader
{
STsdb
*
pTsdb
;
STsdb
*
pTsdb
;
SDelFile
*
pFile
;
SDelFile
fDel
;
TdFilePtr
pReadH
;
TdFilePtr
pReadH
;
};
};
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
char
*
fname
=
NULL
;
// todo
char
fname
[
TSDB_FILENAME_LEN
];
SDelFReader
*
pDelFReader
;
SDelFReader
*
pDelFReader
;
int64_t
n
;
int64_t
n
;
...
@@ -235,7 +240,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
...
@@ -235,7 +240,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
// open impl
// open impl
pDelFReader
->
pTsdb
=
pTsdb
;
pDelFReader
->
pTsdb
=
pTsdb
;
pDelFReader
->
pFile
=
pFile
;
pDelFReader
->
fDel
=
*
pFile
;
tsdbDelFileName
(
pTsdb
,
pFile
,
fname
);
pDelFReader
->
pReadH
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
pDelFReader
->
pReadH
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pDelFReader
==
NULL
)
{
if
(
pDelFReader
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
@@ -243,6 +250,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
...
@@ -243,6 +250,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
goto
_err
;
goto
_err
;
}
}
#if 0
// load and check hdr if buffer is given
// load and check hdr if buffer is given
if (ppBuf) {
if (ppBuf) {
code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE);
code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE);
...
@@ -266,6 +274,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
...
@@ -266,6 +274,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
// TODO: check the content
// TODO: check the content
}
}
#endif
_exit:
_exit:
*
ppReader
=
pDelFReader
;
*
ppReader
=
pDelFReader
;
...
@@ -292,66 +301,75 @@ _exit:
...
@@ -292,66 +301,75 @@ _exit:
return
code
;
return
code
;
}
}
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SArray
*
aDelData
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
n
;
int64_t
offset
=
pDelIdx
->
offset
;
uint32_t
delimiter
;
int64_t
size
=
pDelIdx
->
size
;
tb_uid_t
suid
;
int64_t
n
;
tb_uid_t
uid
;
uint8_t
*
pBuf
=
NULL
;
SBlockDataHdr
*
pHdr
;
SDelData
*
pDelData
=
&
(
SDelData
){
0
};
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// seek
// seek
if
(
taosLSeekFile
(
pReader
->
pReadH
,
pDelIdx
->
offset
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pReader
->
pReadH
,
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pDelDataMap
->
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
pDelIdx
->
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// read
// read
n
=
taosReadFile
(
pReader
->
pReadH
,
*
ppBuf
,
pDelIdx
->
size
);
n
=
taosReadFile
(
pReader
->
pReadH
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
else
if
(
n
<
pDelIdx
->
size
)
{
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
goto
_err
;
}
}
// check
// check
if
(
!
taosCheckChecksumWhole
(
*
ppBuf
,
pDelIdx
->
size
))
{
if
(
!
taosCheckChecksumWhole
(
*
ppBuf
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
goto
_err
;
}
}
// // decode
// // decode
n
=
0
;
n
=
0
;
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
pHdr
=
(
SBlockDataHdr
*
)(
*
ppBuf
+
n
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pHdr
->
delimiter
==
TSDB_FILE_DLMT
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
suid
);
ASSERT
(
pHdr
->
suid
==
pDelIdx
->
suid
);
ASSERT
(
suid
==
pDelIdx
->
suid
);
ASSERT
(
pHdr
->
uid
==
pDelIdx
->
uid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
uid
);
n
+=
sizeof
(
*
pHdr
);
ASSERT
(
uid
==
pDelIdx
->
uid
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
n
+=
tGetMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
n
+=
tGetDelData
(
*
ppBuf
+
n
,
pDelData
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
pDelIdx
->
size
);
}
ASSERT
(
n
==
size
-
sizeof
(
TSCKSUM
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d read del data failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d read del data failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
}
}
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
n
;
int32_t
n
;
int64_t
offset
=
pReader
->
pFile
->
offset
;
int64_t
offset
=
pReader
->
fDel
.
offset
;
int64_t
size
=
pReader
->
pFile
->
size
-
offset
;
int64_t
size
=
pReader
->
fDel
.
size
-
offset
;
uint32_t
delimiter
;
uint32_t
delimiter
;
uint8_t
*
pBuf
=
NULL
;
SDelIdx
*
pDelIdx
=
&
(
SDelIdx
){};
ASSERT
(
ppBuf
&&
*
ppBuf
)
;
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// seek
// seek
if
(
taosLSeekFile
(
pReader
->
pReadH
,
offset
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pReader
->
pReadH
,
offset
,
SEEK_SET
)
<
0
)
{
...
@@ -360,7 +378,6 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
...
@@ -360,7 +378,6 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
}
}
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pDelIdxMap
->
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
...
@@ -384,8 +401,18 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
...
@@ -384,8 +401,18 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
n
=
0
;
n
=
0
;
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
n
+=
tGetMapData
(
*
ppBuf
+
n
,
pDelIdxMap
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
taosArrayClear
(
aDelIdx
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
n
+=
tGetDelIdx
(
*
ppBuf
+
n
,
pDelIdx
);
if
(
taosArrayPush
(
aDelIdx
,
pDelIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
ASSERT
(
n
==
size
-
sizeof
(
TSCKSUM
));
return
code
;
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录