Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
017170e8
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
017170e8
编写于
6月 28, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more work
上级
89026d75
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
44 addition
and
328 deletion
+44
-328
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+44
-328
未找到文件。
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
017170e8
...
...
@@ -431,162 +431,6 @@ _exit:
return
code
;
}
static
int32_t
tsdbCommitMemoryData
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STbDataIter
*
pIter
=
&
(
STbDataIter
){
0
};
TSDBKEY
key
=
{.
ts
=
pCommitter
->
minKey
,
.
version
=
VERSION_MIN
};
TSDBROW
row
;
TSDBROW
*
pRow
;
// create iter
tsdbTbDataIterOpen
(
pTbData
,
&
key
,
0
,
pIter
);
pRow
=
tsdbTbDataIterGet
(
pIter
);
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
goto
_exit
;
// main loop
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
pTbData
->
suid
,
.
uid
=
pTbData
->
uid
};
SMapData
*
mBlock
=
&
pCommitter
->
nBlockMap
;
SBlock
*
pBlock
=
&
pCommitter
->
nBlock
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlockData
;
TSKEY
lastTS
;
tBlockIdxReset
(
pBlockIdx
);
tMapDataReset
(
mBlock
);
tBlockReset
(
pBlock
);
tBlockDataReset
(
pBlockData
);
lastTS
=
TSKEY_MIN
;
while
(
1
)
{
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
{
if
(
pBlockData
->
nRow
>
0
)
{
goto
_write_block
;
}
else
{
break
;
}
}
// update schema
code
=
tsdbCommitterUpdateSchema
(
pCommitter
,
pTbData
->
suid
,
pTbData
->
uid
,
TSDBROW_SVERSION
(
pRow
));
if
(
code
)
goto
_err
;
// append
code
=
tBlockDataAppendRow
(
pBlockData
,
pRow
,
pCommitter
->
pTSchema
);
if
(
code
)
goto
_err
;
// update
pBlock
->
minVersion
=
TMIN
(
pBlock
->
minVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
maxVersion
=
TMAX
(
pBlock
->
maxVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
nRow
++
;
if
(
TSDBROW_TS
(
pRow
)
==
lastTS
)
pBlock
->
hasDup
=
1
;
lastTS
=
TSDBROW_TS
(
pRow
);
// next
tsdbTbDataIterNext
(
pIter
);
pRow
=
tsdbTbDataIterGet
(
pIter
);
// check
if
(
pBlockData
->
nRow
>=
pCommitter
->
maxRow
*
4
/
5
)
goto
_write_block
;
continue
;
_write_block:
row
=
tBlockDataFirstRow
(
pBlockData
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
minKey
,
&
TSDBROW_KEY
(
&
row
))
>
0
)
pBlock
->
minKey
=
TSDBROW_KEY
(
&
row
);
row
=
tBlockDataLastRow
(
pBlockData
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
maxKey
,
&
TSDBROW_KEY
(
&
row
))
<
0
)
pBlock
->
maxKey
=
TSDBROW_KEY
(
&
row
);
pBlock
->
last
=
pBlockData
->
nRow
<
pCommitter
->
minRow
?
1
:
0
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockData
,
NULL
,
NULL
,
pBlockIdx
,
pBlock
,
pCommitter
->
cmprAlg
);
if
(
code
)
goto
_err
;
// Design SMA and write SMA to file
// SBlockIdx
code
=
tMapDataPutItem
(
mBlock
,
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
pBlockIdx
->
minKey
=
TMIN
(
pBlockIdx
->
minKey
,
pBlock
->
minKey
.
ts
);
pBlockIdx
->
maxKey
=
TMAX
(
pBlockIdx
->
maxKey
,
pBlock
->
maxKey
.
ts
);
pBlockIdx
->
minVersion
=
TMIN
(
pBlockIdx
->
minVersion
,
pBlock
->
minVersion
);
pBlockIdx
->
maxVersion
=
TMAX
(
pBlockIdx
->
maxVersion
,
pBlock
->
maxVersion
);
tBlockReset
(
pBlock
);
tBlockDataReset
(
pBlockData
);
lastTS
=
TSKEY_MIN
;
}
// write block
code
=
tsdbWriteBlock
(
pCommitter
->
pWriter
,
mBlock
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockIdxMap
,
pBlockIdx
,
tPutBlockIdx
);
if
(
code
)
goto
_err
;
_exit:
if
(
pRow
)
pCommitter
->
nextKey
=
TMIN
(
pCommitter
->
nextKey
,
TSDBROW_TS
(
pRow
));
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb commit memory data failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitDiskData
(
SCommitter
*
pCommitter
,
SBlockIdx
*
oBlockIdx
)
{
int32_t
code
=
0
;
SMapData
*
mBlockO
=
&
pCommitter
->
oBlockMap
;
SBlock
*
pBlockO
=
&
pCommitter
->
oBlock
;
SMapData
*
mBlockN
=
&
pCommitter
->
nBlockMap
;
SBlock
*
pBlockN
=
&
pCommitter
->
nBlock
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){
0
};
SBlockData
*
pBlockDataO
=
&
pCommitter
->
oBlockData
;
// read
code
=
tsdbReadBlock
(
pCommitter
->
pReader
,
oBlockIdx
,
mBlockO
,
NULL
);
if
(
code
)
goto
_err
;
// loop to add to new
tMapDataReset
(
mBlockN
);
for
(
int32_t
iBlock
=
0
;
iBlock
<
mBlockO
->
nItem
;
iBlock
++
)
{
tMapDataGetItemByIdx
(
mBlockO
,
iBlock
,
pBlockO
,
tGetBlock
);
if
(
pBlockO
->
last
)
{
ASSERT
(
iBlock
==
mBlockO
->
nItem
-
1
);
code
=
tsdbReadBlockData
(
pCommitter
->
pReader
,
oBlockIdx
,
pBlockO
,
pBlockDataO
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
tBlockReset
(
pBlockN
);
pBlockN
->
minKey
=
pBlockO
->
minKey
;
pBlockN
->
maxKey
=
pBlockO
->
maxKey
;
pBlockN
->
minVersion
=
pBlockO
->
minVersion
;
pBlockN
->
maxVersion
=
pBlockO
->
maxVersion
;
pBlockN
->
nRow
=
pBlockO
->
nRow
;
pBlockN
->
last
=
pBlockO
->
last
;
pBlockN
->
hasDup
=
pBlockO
->
hasDup
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockDataO
,
NULL
,
NULL
,
pBlockIdx
,
pBlockN
,
pCommitter
->
cmprAlg
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
mBlockN
,
pBlockN
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
else
{
code
=
tMapDataPutItem
(
mBlockN
,
pBlockO
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
}
// SBlock
*
pBlockIdx
=
*
oBlockIdx
;
code
=
tsdbWriteBlock
(
pCommitter
->
pWriter
,
mBlockN
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
// SBlockIdx
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockIdxMap
,
pBlockIdx
,
tPutBlockIdx
);
if
(
code
)
goto
_err
;
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb commit disk data failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbMergeTableData
(
SCommitter
*
pCommitter
,
STbDataIter
*
pIter
,
SBlock
*
pBlockMerge
,
TSDBKEY
toKey
,
int8_t
toDataOnly
)
{
int32_t
code
=
0
;
...
...
@@ -820,123 +664,6 @@ _err:
return
code
;
}
static
int32_t
tsdbMergeMemDisk
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SBlockIdx
*
oBlockIdx
)
{
int32_t
code
=
0
;
// STbDataIter *pIter = &(STbDataIter){0};
// TSDBROW *pRow;
// // create iter
// tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
// pRow == tsdbTbDataIterGet(pIter);
// if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) {
// code = tsdbCommitDiskData(pCommitter, oBlockIdx);
// if (code) {
// goto _err;
// } else {
// goto _exit;
// }
// }
// // start ==================
// // read
// code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL);
// if (code) goto _err;
// // loop to merge
// // SBlockData *pBlockData = &pCommitter->nBlockData;
// int32_t iBlock = 0;
// int32_t nBlock = pCommitter->oBlockMap.nItem;
// // SBlock *pBlockO = &pCommitter->oBlock;
// SBlock *pBlock;
// int32_t c;
// // merge ===================
// while (true) {
// if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
// if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) {
// if (pBlock->last) {
// // merge memory data and disk data to write to .data/.last (todo)
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
// (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// } else {
// c = tBlockCmprFn(&(SBlock){}, pBlock);
// if (c < 0) {
// // commit memory data until pBlock->minKey (not included) only to .data file (todo)
// code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// } else if (c > 0) {
// // just move the block (todo)
// // code = tsdbCommitTableDiskData(pCommitter, pBlock);
// if (code) goto _err;
// iBlock++;
// // TODO
// } else {
// int64_t nOvlp = 0; // = tsdbOvlpRows();
// if (nOvlp + pBlock->nRow <= pCommitter->maxRow) {
// // add as a subblock
// } else {
// if (iBlock == nBlock - 1) {
// // merge memory data and disk data to .data/.last file
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
// (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
// if (code) goto _err;
// } else {
// // merge memory data and disk data to .data file only until pBlock[1].
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
// }
// }
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// }
// }
// } else if (pBlock) {
// // code = tsdbCommitTableDiskData(pCommitter, pBlock);
// if (code) goto _err;
// iBlock++;
// // next block
// } else {
// // commit only memory data until (pCommitter->maxKey, VERSION_MAX)
// code =
// tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version =
// VERSION_MIN}, 0);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// }
// }
// // end =====================
// // SBlock
// // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
// // if (code) goto _err;
// // // SBlockIdx
// // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
// // if (code) goto _err;
// _exit:
// pRow = tsdbTbDataIterGet(pIter);
// if (pRow) {
// pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
// }
return
code
;
// _err:
// tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode),
// tstrerror(code)); return code;
}
static
int32_t
tsdbCommitTableDataEnd
(
SCommitter
*
pCommitter
,
int64_t
suid
,
int64_t
uid
)
{
int32_t
code
=
0
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
suid
,
.
uid
=
uid
};
...
...
@@ -1190,7 +917,6 @@ _err:
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
c
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
int32_t
iTbData
=
0
;
...
...
@@ -1209,30 +935,59 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
pBlockIdx
=
NULL
;
}
// merge
while
(
pTbData
&&
pBlockIdx
)
{
c
=
tTABLEIDCmprFn
(
pTbData
,
pBlockIdx
);
while
(
pTbData
||
pBlockIdx
)
{
if
(
pTbData
&&
pBlockIdx
)
{
int32_t
c
=
tTABLEIDCmprFn
(
pTbData
,
pBlockIdx
);
if
(
c
==
0
)
{
// merge commit
code
=
tsdbMergeMemDisk
(
pCommitter
,
pTbData
,
pBlockIdx
);
if
(
code
)
goto
_err
;
if
(
c
==
0
)
{
code
=
tsdbCommitTableData
(
pCommitter
,
pTbData
,
pBlockIdx
);
if
(
code
)
goto
_err
;
iTbData
++
;
iBlockIdx
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
}
else
if
(
c
<
0
)
{
code
=
tsdbCommitTableData
(
pCommitter
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
}
else
{
pTbData
=
NULL
;
code
=
tsdbCommitTableData
(
pCommitter
,
pTbData
,
NULL
);
if
(
code
)
goto
_err
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
}
}
else
if
(
pBlockIdx
)
{
code
=
tsdbCommitTableData
(
pCommitter
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
}
else
if
(
c
<
0
)
{
// commit memory data
code
=
tsdbCommitMemoryData
(
pCommitter
,
pTbData
);
}
else
{
code
=
tsdbCommitTableData
(
pCommitter
,
pTbData
,
NULL
);
if
(
code
)
goto
_err
;
iTbData
++
;
...
...
@@ -1241,45 +996,6 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
}
else
{
pTbData
=
NULL
;
}
}
else
{
// commit disk data
code
=
tsdbCommitDiskData
(
pCommitter
,
pBlockIdx
);
if
(
code
)
goto
_err
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
}
}
// disk
while
(
pBlockIdx
)
{
// commit disk data
code
=
tsdbCommitDiskData
(
pCommitter
,
pBlockIdx
);
if
(
code
)
goto
_err
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
}
// memory
while
(
pTbData
)
{
// commit memory data
code
=
tsdbCommitMemoryData
(
pCommitter
,
pTbData
);
if
(
code
)
goto
_err
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录