Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d4308489
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d4308489
编写于
7月 31, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
7月 31, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15610 from taosdata/refact/tsdb_last
refact: prepare for last optimize
上级
b9f95d70
aba95a7b
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
93 addition
and
64 deletion
+93
-64
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+24
-0
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+69
-64
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
d4308489
...
...
@@ -43,6 +43,7 @@ typedef struct STbDataIter STbDataIter;
typedef
struct
SMapData
SMapData
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlockL
SBlockL
;
typedef
struct
SColData
SColData
;
typedef
struct
SBlockDataHdr
SBlockDataHdr
;
typedef
struct
SBlockData
SBlockData
;
...
...
@@ -414,6 +415,29 @@ struct SBlock {
SSubBlock
aSubBlock
[
TSDB_MAX_SUBBLOCKS
];
};
struct
SBlockL
{
struct
{
int64_t
uid
;
int64_t
version
;
TSKEY
ts
;
}
minKey
;
struct
{
int64_t
uid
;
int64_t
version
;
TSKEY
ts
;
}
maxKey
;
int64_t
minVer
;
int64_t
maxVer
;
int32_t
nRow
;
int8_t
cmprAlg
;
int64_t
offset
;
int32_t
szBlock
;
int32_t
szBlockCol
;
int32_t
szUid
;
int32_t
szVer
;
int32_t
szTSKEY
;
};
struct
SColData
{
int16_t
cid
;
int8_t
type
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
d4308489
...
...
@@ -36,16 +36,20 @@ typedef struct {
TSKEY
minKey
;
TSKEY
maxKey
;
// commit file data
SDataFReader
*
pReader
;
SArray
*
aBlockIdx
;
// SArray<SBlockIdx>
SMapData
oBlockMap
;
// SMapData<SBlock>, read from reader
SBlockData
oBlockData
;
SDataFWriter
*
pWriter
;
SArray
*
aBlockIdxN
;
// SArray<SBlockIdx>
SMapData
nBlockMap
;
// SMapData<SBlock>
SBlockData
nBlockData
;
SSkmInfo
skmTable
;
SSkmInfo
skmRow
;
struct
{
SDataFReader
*
pReader
;
SArray
*
aBlockIdx
;
// SArray<SBlockIdx>
SMapData
mBlock
;
// SMapData<SBlock>, read from reader
SBlockData
bData
;
}
dReader
;
struct
{
SDataFWriter
*
pWriter
;
SArray
*
aBlockIdx
;
// SArray<SBlockIdx>
SMapData
mBlock
;
// SMapData<SBlock>
SBlockData
bData
;
}
dWriter
;
SSkmInfo
skmTable
;
SSkmInfo
skmRow
;
/* commit del */
SDelFReader
*
pDelFReader
;
SDelFWriter
*
pDelFWriter
;
...
...
@@ -276,16 +280,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter
->
nextKey
=
TSKEY_MAX
;
// old
taosArrayClear
(
pCommitter
->
aBlockIdx
);
tMapDataReset
(
&
pCommitter
->
oBlockMap
);
tBlockDataReset
(
&
pCommitter
->
oBlock
Data
);
taosArrayClear
(
pCommitter
->
dReader
.
aBlockIdx
);
tMapDataReset
(
&
pCommitter
->
dReader
.
mBlock
);
tBlockDataReset
(
&
pCommitter
->
dReader
.
b
Data
);
pRSet
=
(
SDFileSet
*
)
taosArraySearch
(
pCommitter
->
fs
.
aDFileSet
,
&
(
SDFileSet
){.
fid
=
pCommitter
->
commitFid
},
tDFileSetCmprFn
,
TD_EQ
);
if
(
pRSet
)
{
code
=
tsdbDataFReaderOpen
(
&
pCommitter
->
pReader
,
pTsdb
,
pRSet
);
code
=
tsdbDataFReaderOpen
(
&
pCommitter
->
dReader
.
pReader
,
pTsdb
,
pRSet
);
if
(
code
)
goto
_err
;
code
=
tsdbReadBlockIdx
(
pCommitter
->
pReader
,
pCommitter
->
aBlockIdx
,
NULL
);
code
=
tsdbReadBlockIdx
(
pCommitter
->
dReader
.
pReader
,
pCommitter
->
dReader
.
aBlockIdx
,
NULL
);
if
(
code
)
goto
_err
;
}
...
...
@@ -296,9 +300,9 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
SSmaFile
fSma
;
SDFileSet
wSet
=
{.
pHeadF
=
&
fHead
,
.
pDataF
=
&
fData
,
.
pLastF
=
&
fLast
,
.
pSmaF
=
&
fSma
};
taosArrayClear
(
pCommitter
->
aBlockIdxN
);
tMapDataReset
(
&
pCommitter
->
nBlockMap
);
tBlockDataReset
(
&
pCommitter
->
nBlock
Data
);
taosArrayClear
(
pCommitter
->
dWriter
.
aBlockIdx
);
tMapDataReset
(
&
pCommitter
->
dWriter
.
mBlock
);
tBlockDataReset
(
&
pCommitter
->
dWriter
.
b
Data
);
if
(
pRSet
)
{
wSet
.
diskId
=
pRSet
->
diskId
;
wSet
.
fid
=
pCommitter
->
commitFid
;
...
...
@@ -320,7 +324,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
fLast
=
(
SLastFile
){.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
};
fSma
=
(
SSmaFile
){.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
};
}
code
=
tsdbDataFWriterOpen
(
&
pCommitter
->
pWriter
,
pTsdb
,
&
wSet
);
code
=
tsdbDataFWriterOpen
(
&
pCommitter
->
dWriter
.
pWriter
,
pTsdb
,
&
wSet
);
if
(
code
)
goto
_err
;
_exit:
...
...
@@ -391,10 +395,11 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat
}
}
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockData
,
NULL
,
NULL
,
pBlockIdx
,
pBlock
,
pCommitter
->
cmprAlg
);
code
=
tsdbWriteBlockData
(
pCommitter
->
dWriter
.
pWriter
,
pBlockData
,
NULL
,
NULL
,
pBlockIdx
,
pBlock
,
pCommitter
->
cmprAlg
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockMap
,
pBlock
,
tPutBlock
);
code
=
tMapDataPutItem
(
&
pCommitter
->
dWriter
.
mBlock
,
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
return
code
;
...
...
@@ -407,8 +412,8 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
int8_t
toDataOnly
)
{
int32_t
code
=
0
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
pIter
->
pTbData
->
suid
,
.
uid
=
pIter
->
pTbData
->
uid
};
SBlockData
*
pBlockDataMerge
=
&
pCommitter
->
oBlock
Data
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlock
Data
;
SBlockData
*
pBlockDataMerge
=
&
pCommitter
->
dReader
.
b
Data
;
SBlockData
*
pBlockData
=
&
pCommitter
->
dWriter
.
b
Data
;
SBlock
block
;
SBlock
*
pBlock
=
&
block
;
TSDBROW
*
pRow1
;
...
...
@@ -416,7 +421,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
TSDBROW
*
pRow2
=
&
row2
;
// read SBlockData
code
=
tsdbReadBlockData
(
pCommitter
->
pReader
,
pBlockIdx
,
pBlockMerge
,
pBlockDataMerge
,
NULL
,
NULL
);
code
=
tsdbReadBlockData
(
pCommitter
->
dReader
.
pReader
,
pBlockIdx
,
pBlockMerge
,
pBlockDataMerge
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
code
=
tBlockDataSetSchema
(
pBlockData
,
pCommitter
->
skmTable
.
pTSchema
);
...
...
@@ -513,7 +518,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
TSDBROW
*
pRow
;
SBlock
block
;
SBlock
*
pBlock
=
&
block
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlock
Data
;
SBlockData
*
pBlockData
=
&
pCommitter
->
dWriter
.
b
Data
;
int64_t
suid
=
pIter
->
pTbData
->
suid
;
int64_t
uid
=
pIter
->
pTbData
->
uid
;
...
...
@@ -575,14 +580,14 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S
SBlock
block
;
if
(
pBlock
->
last
)
{
code
=
tsdbReadBlockData
(
pCommitter
->
pReader
,
pBlockIdx
,
pBlock
,
&
pCommitter
->
oBlock
Data
,
NULL
,
NULL
);
code
=
tsdbReadBlockData
(
pCommitter
->
dReader
.
pReader
,
pBlockIdx
,
pBlock
,
&
pCommitter
->
dReader
.
b
Data
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
tBlockReset
(
&
block
);
code
=
tsdbCommitBlockData
(
pCommitter
,
&
pCommitter
->
oBlock
Data
,
&
block
,
pBlockIdx
,
0
);
code
=
tsdbCommitBlockData
(
pCommitter
,
&
pCommitter
->
dReader
.
b
Data
,
&
block
,
pBlockIdx
,
0
);
if
(
code
)
goto
_err
;
}
else
{
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockMap
,
pBlock
,
tPutBlock
);
code
=
tMapDataPutItem
(
&
pCommitter
->
dWriter
.
mBlock
,
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
...
...
@@ -598,10 +603,10 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int6
SBlockIdx
blockIdx
=
{.
suid
=
suid
,
.
uid
=
uid
};
SBlockIdx
*
pBlockIdx
=
&
blockIdx
;
code
=
tsdbWriteBlock
(
pCommitter
->
pWriter
,
&
pCommitter
->
nBlockMap
,
NULL
,
pBlockIdx
);
code
=
tsdbWriteBlock
(
pCommitter
->
dWriter
.
pWriter
,
&
pCommitter
->
dWriter
.
mBlock
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
if
(
taosArrayPush
(
pCommitter
->
aBlockIdxN
,
pBlockIdx
)
==
NULL
)
{
if
(
taosArrayPush
(
pCommitter
->
dWriter
.
aBlockIdx
,
pBlockIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
...
...
@@ -643,7 +648,7 @@ static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
static
int32_t
tsdbMergeAsSubBlock
(
SCommitter
*
pCommitter
,
STbDataIter
*
pIter
,
SBlock
*
pBlock
)
{
int32_t
code
=
0
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlock
Data
;
SBlockData
*
pBlockData
=
&
pCommitter
->
dWriter
.
b
Data
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
pIter
->
pTbData
->
suid
,
.
uid
=
pIter
->
pTbData
->
uid
};
SBlock
block
;
TSDBROW
*
pRow
;
...
...
@@ -711,10 +716,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
}
if
(
pBlockIdx
)
{
code
=
tsdbReadBlock
(
pCommitter
->
pReader
,
pBlockIdx
,
&
pCommitter
->
oBlockMap
,
NULL
);
code
=
tsdbReadBlock
(
pCommitter
->
dReader
.
pReader
,
pBlockIdx
,
&
pCommitter
->
dReader
.
mBlock
,
NULL
);
if
(
code
)
goto
_err
;
nBlock
=
pCommitter
->
oBlockMap
.
nItem
;
nBlock
=
pCommitter
->
dReader
.
mBlock
.
nItem
;
ASSERT
(
nBlock
>
0
);
suid
=
pBlockIdx
->
suid
;
...
...
@@ -726,13 +731,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if
(
pRow
==
NULL
&&
nBlock
==
0
)
goto
_exit
;
// start ===========
tMapDataReset
(
&
pCommitter
->
nBlockMap
);
tMapDataReset
(
&
pCommitter
->
dWriter
.
mBlock
);
SBlock
block
;
SBlock
*
pBlock
=
&
block
;
iBlock
=
0
;
if
(
iBlock
<
nBlock
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockMap
,
iBlock
,
pBlock
,
tGetBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pBlock
,
tGetBlock
);
}
else
{
pBlock
=
NULL
;
}
...
...
@@ -756,7 +761,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if
(
pRow
&&
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
pRow
=
NULL
;
iBlock
++
;
if
(
iBlock
<
nBlock
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockMap
,
iBlock
,
pBlock
,
tGetBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pBlock
,
tGetBlock
);
}
else
{
pBlock
=
NULL
;
}
...
...
@@ -771,7 +776,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
iBlock
++
;
if
(
iBlock
<
nBlock
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockMap
,
iBlock
,
pBlock
,
tGetBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pBlock
,
tGetBlock
);
}
else
{
pBlock
=
NULL
;
}
...
...
@@ -798,7 +803,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
SBlock
nextBlock
=
{
0
};
tBlockReset
(
&
nextBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockMap
,
iBlock
+
1
,
&
nextBlock
,
tGetBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
+
1
,
&
nextBlock
,
tGetBlock
);
toKey
=
nextBlock
.
minKey
;
}
...
...
@@ -810,7 +815,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if
(
pRow
&&
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
pRow
=
NULL
;
iBlock
++
;
if
(
iBlock
<
nBlock
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockMap
,
iBlock
,
pBlock
,
tGetBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pBlock
,
tGetBlock
);
}
else
{
pBlock
=
NULL
;
}
...
...
@@ -822,7 +827,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
iBlock
++
;
if
(
iBlock
<
nBlock
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockMap
,
iBlock
,
pBlock
,
tGetBlock
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pBlock
,
tGetBlock
);
}
else
{
pBlock
=
NULL
;
}
...
...
@@ -857,23 +862,23 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t
code
=
0
;
// write blockIdx
code
=
tsdbWriteBlockIdx
(
pCommitter
->
pWriter
,
pCommitter
->
aBlockIdxN
,
NULL
);
code
=
tsdbWriteBlockIdx
(
pCommitter
->
dWriter
.
pWriter
,
pCommitter
->
dWriter
.
aBlockIdx
,
NULL
);
if
(
code
)
goto
_err
;
// update file header
code
=
tsdbUpdateDFileSetHeader
(
pCommitter
->
pWriter
);
code
=
tsdbUpdateDFileSetHeader
(
pCommitter
->
dWriter
.
pWriter
);
if
(
code
)
goto
_err
;
// upsert SDFileSet
code
=
tsdbFSUpsertFSet
(
&
pCommitter
->
fs
,
&
pCommitter
->
pWriter
->
wSet
);
code
=
tsdbFSUpsertFSet
(
&
pCommitter
->
fs
,
&
pCommitter
->
dWriter
.
pWriter
->
wSet
);
if
(
code
)
goto
_err
;
// close and sync
code
=
tsdbDataFWriterClose
(
&
pCommitter
->
pWriter
,
1
);
code
=
tsdbDataFWriterClose
(
&
pCommitter
->
dWriter
.
pWriter
,
1
);
if
(
code
)
goto
_err
;
if
(
pCommitter
->
pReader
)
{
code
=
tsdbDataFReaderClose
(
&
pCommitter
->
pReader
);
if
(
pCommitter
->
dReader
.
pReader
)
{
code
=
tsdbDataFReaderClose
(
&
pCommitter
->
dReader
.
pReader
);
if
(
code
)
goto
_err
;
}
...
...
@@ -898,14 +903,14 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t
iTbData
=
0
;
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
int32_t
iBlockIdx
=
0
;
int32_t
nBlockIdx
=
taosArrayGetSize
(
pCommitter
->
aBlockIdx
);
int32_t
nBlockIdx
=
taosArrayGetSize
(
pCommitter
->
dReader
.
aBlockIdx
);
STbData
*
pTbData
;
SBlockIdx
*
pBlockIdx
;
ASSERT
(
nTbData
>
0
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
pBlockIdx
=
(
iBlockIdx
<
nBlockIdx
)
?
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
aBlockIdx
,
iBlockIdx
)
:
NULL
;
pBlockIdx
=
(
iBlockIdx
<
nBlockIdx
)
?
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
dReader
.
aBlockIdx
,
iBlockIdx
)
:
NULL
;
while
(
pTbData
||
pBlockIdx
)
{
if
(
pTbData
&&
pBlockIdx
)
{
int32_t
c
=
tTABLEIDCmprFn
(
pTbData
,
pBlockIdx
);
...
...
@@ -936,7 +941,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
iBlockIdx
++
;
pBlockIdx
=
(
iBlockIdx
<
nBlockIdx
)
?
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
aBlockIdx
,
iBlockIdx
)
:
NULL
;
pBlockIdx
=
(
iBlockIdx
<
nBlockIdx
)
?
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
dReader
.
aBlockIdx
,
iBlockIdx
)
:
NULL
;
continue
;
_commit_table_mem_and_disk:
...
...
@@ -944,7 +949,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
iBlockIdx
++
;
pBlockIdx
=
(
iBlockIdx
<
nBlockIdx
)
?
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
aBlockIdx
,
iBlockIdx
)
:
NULL
;
pBlockIdx
=
(
iBlockIdx
<
nBlockIdx
)
?
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
dReader
.
aBlockIdx
,
iBlockIdx
)
:
NULL
;
iTbData
++
;
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
)
:
NULL
;
continue
;
...
...
@@ -958,8 +963,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
_err:
tsdbError
(
"vgId:%d commit file data failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbDataFReaderClose
(
&
pCommitter
->
pReader
);
tsdbDataFWriterClose
(
&
pCommitter
->
pWriter
,
0
);
tsdbDataFReaderClose
(
&
pCommitter
->
dReader
.
pReader
);
tsdbDataFWriterClose
(
&
pCommitter
->
dWriter
.
pWriter
,
0
);
return
code
;
}
...
...
@@ -996,22 +1001,22 @@ _err:
static
int32_t
tsdbCommitDataStart
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
pCommitter
->
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCommitter
->
aBlockIdx
==
NULL
)
{
pCommitter
->
dReader
.
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCommitter
->
dReader
.
aBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
pCommitter
->
aBlockIdxN
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCommitter
->
aBlockIdxN
==
NULL
)
{
pCommitter
->
dWriter
.
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCommitter
->
dWriter
.
aBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
code
=
tBlockDataInit
(
&
pCommitter
->
oBlock
Data
);
code
=
tBlockDataInit
(
&
pCommitter
->
dReader
.
b
Data
);
if
(
code
)
goto
_exit
;
code
=
tBlockDataInit
(
&
pCommitter
->
nBlock
Data
);
code
=
tBlockDataInit
(
&
pCommitter
->
dWriter
.
b
Data
);
if
(
code
)
goto
_exit
;
_exit:
...
...
@@ -1019,12 +1024,12 @@ _exit:
}
static
void
tsdbCommitDataEnd
(
SCommitter
*
pCommitter
)
{
taosArrayDestroy
(
pCommitter
->
aBlockIdx
);
tMapDataClear
(
&
pCommitter
->
oBlockMap
);
tBlockDataClear
(
&
pCommitter
->
oBlock
Data
,
1
);
taosArrayDestroy
(
pCommitter
->
aBlockIdxN
);
tMapDataClear
(
&
pCommitter
->
nBlockMap
);
tBlockDataClear
(
&
pCommitter
->
nBlock
Data
,
1
);
taosArrayDestroy
(
pCommitter
->
dReader
.
aBlockIdx
);
tMapDataClear
(
&
pCommitter
->
dReader
.
mBlock
);
tBlockDataClear
(
&
pCommitter
->
dReader
.
b
Data
,
1
);
taosArrayDestroy
(
pCommitter
->
dWriter
.
aBlockIdx
);
tMapDataClear
(
&
pCommitter
->
dWriter
.
mBlock
);
tBlockDataClear
(
&
pCommitter
->
dWriter
.
b
Data
,
1
);
tTSchemaDestroy
(
pCommitter
->
skmTable
.
pTSchema
);
tTSchemaDestroy
(
pCommitter
->
skmRow
.
pTSchema
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录