Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bbf92eeb
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bbf92eeb
编写于
6月 24, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more work
上级
39e48a7e
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
111 addition
and
129 deletion
+111
-129
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+94
-112
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+11
-14
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+6
-3
未找到文件。
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
bbf92eeb
...
@@ -340,88 +340,6 @@ _err:
...
@@ -340,88 +340,6 @@ _err:
return
code
;
return
code
;
}
}
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))
// static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey,
// bool toDataOnly) {
// int32_t code = 0;
// TSDBROW *pRow;
// STSchema *pTSchema = NULL; // TODO
// TSDBKEY key;
// SBlock *pBlock = &pCommitter->nBlock;
// if (pIter == NULL) goto _exit;
// tBlockReset(pBlock);
// tBlockDataReset(&pCommitter->nBlockData);
// while (true) {
// pRow = tsdbTbDataIterGet(pIter);
// if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
// if (pCommitter->nBlockData.nRow == 0) {
// break;
// } else {
// goto _write_block_data;
// }
// }
// // update schema
// if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) {
// // TODO
// // pTSchema = NULL;
// }
// // append row
// code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema);
// if (code) goto _err;
// // update info
// key = tsdbRowKey(pRow);
// if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key;
// if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key;
// if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version;
// if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version;
// // iter next
// tsdbTbDataIterNext(pIter);
// // check write
// if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) {
// continue;
// }
// _write_block_data:
// if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) {
// pCommitter->nBlock.last = 1;
// } else {
// pCommitter->nBlock.last = 0;
// }
// code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock);
// if (code) goto _err;
// code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
// if (code) goto _err;
// // update info
// if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey =
// pBlockIdx->info.minKey; if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey
// = pBlockIdx->info.maxKey; if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion =
// pBlock->info.minVerion; if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion =
// pBlock->info.maxVersion;
// tBlockReset(pBlock);
// tBlockDataReset(&pCommitter->nBlockData);
// }
// _exit:
// return code;
// _err:
// tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
// return code;
// }
// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
// int32_t nRow = 0;
// int32_t nRow = 0;
// TSDBROW *pRow;
// TSDBROW *pRow;
...
@@ -755,60 +673,61 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
...
@@ -755,60 +673,61 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
goto
_exit
;
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
goto
_exit
;
// main loop
// main loop
SMapData
*
mBlock
=
&
pCommitter
->
nBlockMap
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
pTbData
->
suid
,
.
uid
=
pTbData
->
uid
};
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
pTbData
->
suid
,
.
uid
=
pTbData
->
uid
};
SMapData
*
mBlock
=
&
pCommitter
->
nBlockMap
;
SBlock
*
pBlock
=
&
pCommitter
->
nBlock
;
SBlock
*
pBlock
=
&
pCommitter
->
nBlock
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlockData
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlockData
;
TSKEY
lastTS
;
tMapDataReset
(
mBlock
);
tBlockIdxReset
(
pBlockIdx
);
tBlockIdxReset
(
pBlockIdx
);
tMapDataReset
(
mBlock
);
tBlockReset
(
pBlock
);
tBlockReset
(
pBlock
);
tBlockDataReset
(
pBlockData
);
tBlockDataReset
(
pBlockData
);
while
(
pRow
!=
NULL
&&
TSDBROW_TS
(
pRow
)
<=
pCommitter
->
maxKey
)
{
lastTS
=
TSKEY_MIN
;
while
(
1
)
{
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
{
if
(
pBlockData
->
nRow
>
0
)
{
goto
_write_block
;
}
else
{
break
;
}
}
// update schema
code
=
tsdbCommitterUpdateSchema
(
pCommitter
,
pTbData
->
suid
,
pTbData
->
uid
,
TSDBROW_SVERSION
(
pRow
));
code
=
tsdbCommitterUpdateSchema
(
pCommitter
,
pTbData
->
suid
,
pTbData
->
uid
,
TSDBROW_SVERSION
(
pRow
));
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// append
code
=
tBlockDataAppendRow
(
pBlockData
,
pRow
,
pCommitter
->
pTSchema
);
code
=
tBlockDataAppendRow
(
pBlockData
,
pRow
,
pCommitter
->
pTSchema
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// update
pBlock
->
minVersion
=
TMIN
(
pBlock
->
minVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
minVersion
=
TMIN
(
pBlock
->
minVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
maxVersion
=
TMAX
(
pBlock
->
maxVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
maxVersion
=
TMAX
(
pBlock
->
maxVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
nRow
++
;
pBlock
->
nRow
++
;
if
(
TSDBROW_TS
(
pRow
)
==
lastTS
)
pBlock
->
hasDup
=
1
;
lastTS
=
TSDBROW_TS
(
pRow
);
// next
// next
tsdbTbDataIterNext
(
pIter
);
tsdbTbDataIterNext
(
pIter
);
pRow
=
tsdbTbDataIterGet
(
pIter
);
pRow
=
tsdbTbDataIterGet
(
pIter
);
if
(
pBlockData
->
nRow
>=
pCommitter
->
maxRow
*
4
/
5
)
{
// check
ASSERT
(
0
);
if
(
pBlockData
->
nRow
>=
pCommitter
->
maxRow
*
4
/
5
)
goto
_write_block
;
// // SBlock
continue
;
// pBlock->last = 0;
// pBlock->cmprAlg = pCommitter->cmprAlg;
// code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock);
// if (code) goto _err;
// // SBlockIdx
// pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts);
// pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts);
// pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion);
// pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion);
// tBlockReset(pBlock);
// tBlockDataReset(pBlockData);
}
}
if
(
pBlockData
->
nRow
>
0
)
{
_write_block:
// SBlock
row
=
tBlockDataFirstRow
(
pBlockData
);
row
=
tBlockDataFirstRow
(
pBlockData
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
minKey
,
&
TSDBROW_KEY
(
&
row
))
>
0
)
pBlock
->
minKey
=
TSDBROW_KEY
(
&
row
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
minKey
,
&
TSDBROW_KEY
(
&
row
))
>
0
)
pBlock
->
minKey
=
TSDBROW_KEY
(
&
row
);
row
=
tBlockDataLastRow
(
pBlockData
);
row
=
tBlockDataLastRow
(
pBlockData
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
maxKey
,
&
TSDBROW_KEY
(
&
row
))
<
0
)
pBlock
->
maxKey
=
TSDBROW_KEY
(
&
row
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
maxKey
,
&
TSDBROW_KEY
(
&
row
))
<
0
)
pBlock
->
maxKey
=
TSDBROW_KEY
(
&
row
);
pBlock
->
last
=
1
;
pBlock
->
last
=
pBlockData
->
nRow
<
pCommitter
->
minRow
?
1
:
0
;
pBlock
->
cmprAlg
=
pCommitter
->
cmprAlg
;
pBlock
->
cmprAlg
=
pCommitter
->
cmprAlg
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockData
,
NULL
,
NULL
,
pBlockIdx
,
pBlock
);
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockData
,
NULL
,
NULL
,
pBlockIdx
,
pBlock
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// Design SMA and write SMA to file
// SBlockIdx
// SBlockIdx
code
=
tMapDataPutItem
(
mBlock
,
pBlock
,
tPutBlock
);
code
=
tMapDataPutItem
(
mBlock
,
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
...
@@ -816,6 +735,10 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
...
@@ -816,6 +735,10 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
pBlockIdx
->
maxKey
=
TMAX
(
pBlockIdx
->
maxKey
,
pBlock
->
maxKey
.
ts
);
pBlockIdx
->
maxKey
=
TMAX
(
pBlockIdx
->
maxKey
,
pBlock
->
maxKey
.
ts
);
pBlockIdx
->
minVersion
=
TMIN
(
pBlockIdx
->
minVersion
,
pBlock
->
minVersion
);
pBlockIdx
->
minVersion
=
TMIN
(
pBlockIdx
->
minVersion
,
pBlock
->
minVersion
);
pBlockIdx
->
maxVersion
=
TMAX
(
pBlockIdx
->
maxVersion
,
pBlock
->
maxVersion
);
pBlockIdx
->
maxVersion
=
TMAX
(
pBlockIdx
->
maxVersion
,
pBlock
->
maxVersion
);
tBlockReset
(
pBlock
);
tBlockDataReset
(
pBlockData
);
lastTS
=
TSKEY_MIN
;
}
}
// write block
// write block
...
@@ -834,6 +757,65 @@ _err:
...
@@ -834,6 +757,65 @@ _err:
return
code
;
return
code
;
}
}
static
int32_t
tsdbCommitDiskData
(
SCommitter
*
pCommitter
,
SBlockIdx
*
oBlockIdx
)
{
int32_t
code
=
0
;
SMapData
*
mBlockO
=
&
pCommitter
->
oBlockMap
;
SMapData
*
mBlockN
=
&
pCommitter
->
nBlockMap
;
SBlock
*
pBlockO
=
&
pCommitter
->
oBlock
;
SBlock
*
pBlockN
=
&
pCommitter
->
nBlock
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
oBlockIdx
->
suid
,
.
uid
=
oBlockIdx
->
uid
,
.
maxKey
=
oBlockIdx
->
maxKey
,
.
minKey
=
oBlockIdx
->
minKey
,
.
minVersion
=
oBlockIdx
->
minVersion
,
.
maxVersion
=
oBlockIdx
->
maxVersion
,
.
offset
=
-
1
,
.
size
=
-
1
};
SBlockData
*
pBlockDataO
=
&
pCommitter
->
oBlockData
;
// read
code
=
tsdbReadBlock
(
pCommitter
->
pReader
,
oBlockIdx
,
mBlockO
,
NULL
);
if
(
code
)
goto
_err
;
// loop to add to new
tMapDataReset
(
mBlockN
);
for
(
int32_t
iBlock
=
0
;
iBlock
<
mBlockO
->
nItem
;
iBlock
++
)
{
tMapDataGetItemByIdx
(
mBlockO
,
iBlock
,
pBlockO
,
tGetBlock
);
if
(
pBlockO
->
last
)
{
ASSERT
(
iBlock
==
mBlockO
->
nItem
-
1
);
code
=
tsdbReadBlockData
(
pCommitter
->
pReader
,
oBlockIdx
,
pBlockO
,
pBlockDataO
,
NULL
,
-
1
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
tBlockReset
(
pBlockN
);
pBlockN
->
last
=
1
;
pBlockN
->
cmprAlg
=
pBlockO
->
cmprAlg
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockDataO
,
NULL
,
NULL
,
pBlockIdx
,
pBlockN
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
mBlockN
,
pBlockN
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
else
{
code
=
tMapDataPutItem
(
mBlockN
,
pBlockO
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
}
// SBlock
code
=
tsdbWriteBlock
(
pCommitter
->
pWriter
,
mBlockN
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
// SBlockIdx
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockIdxMap
,
pBlockIdx
,
tPutBlockIdx
);
if
(
code
)
goto
_err
;
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb Commit disk data failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
c
;
int32_t
c
;
...
@@ -878,8 +860,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
...
@@ -878,8 +860,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
}
}
}
else
if
(
c
<
0
)
{
}
else
if
(
c
<
0
)
{
// commit memory data
// commit memory data
//
code = tsdbCommitMemoryData(pCommitter, pTbData);
code
=
tsdbCommitMemoryData
(
pCommitter
,
pTbData
);
//
if (code) goto _err;
if
(
code
)
goto
_err
;
iTbData
++
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
if
(
iTbData
<
nTbData
)
{
...
@@ -889,8 +871,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
...
@@ -889,8 +871,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
}
}
}
else
{
}
else
{
// commit disk data
// commit disk data
//
code = tsdbCommitDiskData(pCommitter, pBlockIdx);
code
=
tsdbCommitDiskData
(
pCommitter
,
pBlockIdx
);
//
if (code) goto _err;
if
(
code
)
goto
_err
;
iBlockIdx
++
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
if
(
iBlockIdx
<
nBlockIdx
)
{
...
@@ -904,8 +886,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
...
@@ -904,8 +886,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
// disk
// disk
while
(
pBlockIdx
)
{
while
(
pBlockIdx
)
{
// commit disk data
// commit disk data
//
code = tsdbCommitDiskData(pCommitter, pBlockIdx);
code
=
tsdbCommitDiskData
(
pCommitter
,
pBlockIdx
);
//
if (code) goto _err;
if
(
code
)
goto
_err
;
iBlockIdx
++
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
if
(
iBlockIdx
<
nBlockIdx
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
bbf92eeb
...
@@ -545,9 +545,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
...
@@ -545,9 +545,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
int64_t
offset
=
pBlockIdx
->
offset
;
int64_t
offset
=
pBlockIdx
->
offset
;
int64_t
size
=
pBlockIdx
->
size
;
int64_t
size
=
pBlockIdx
->
size
;
int64_t
n
;
int64_t
n
;
uint32_t
delimiter
;
SBlockDataHdr
hdr
;
int64_t
suid
;
int64_t
uid
;
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
mBlock
->
pBuf
;
if
(
!
ppBuf
)
ppBuf
=
&
mBlock
->
pBuf
;
...
@@ -577,13 +575,12 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
...
@@ -577,13 +575,12 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
}
}
// decode
// decode
n
=
0
;
hdr
=
*
(
SBlockDataHdr
*
)(
*
ppBuf
);
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
ASSERT
(
hdr
.
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
hdr
.
suid
==
pBlockIdx
->
suid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
suid
);
ASSERT
(
hdr
.
uid
==
pBlockIdx
->
uid
);
ASSERT
(
suid
==
pBlockIdx
->
suid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
uid
);
n
=
sizeof
(
hdr
);
ASSERT
(
uid
==
pBlockIdx
->
uid
);
n
+=
tGetMapData
(
*
ppBuf
+
n
,
mBlock
);
n
+=
tGetMapData
(
*
ppBuf
+
n
,
mBlock
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
bbf92eeb
...
@@ -27,9 +27,12 @@ void tMapDataReset(SMapData *pMapData) {
...
@@ -27,9 +27,12 @@ void tMapDataReset(SMapData *pMapData) {
}
}
void
tMapDataClear
(
SMapData
*
pMapData
)
{
void
tMapDataClear
(
SMapData
*
pMapData
)
{
if
(
pMapData
->
pBuf
)
{
tsdbFree
(
pMapData
->
pBuf
);
}
else
{
tsdbFree
(
pMapData
->
pOfst
);
tsdbFree
(
pMapData
->
pOfst
);
tsdbFree
(
pMapData
->
pData
);
tsdbFree
(
pMapData
->
pData
);
tsdbFree
(
pMapData
->
pBuf
);
}
}
}
int32_t
tMapDataPutItem
(
SMapData
*
pMapData
,
void
*
pItem
,
int32_t
(
*
tPutItemFn
)(
uint8_t
*
,
void
*
))
{
int32_t
tMapDataPutItem
(
SMapData
*
pMapData
,
void
*
pItem
,
int32_t
(
*
tPutItemFn
)(
uint8_t
*
,
void
*
))
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录