Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
65b54f9d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
65b54f9d
编写于
6月 25, 2022
作者:
M
Minglei Jin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
tsdbCache: framework for last row merging
上级
7d1574fe
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
373 addition
and
38 deletion
+373
-38
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+2
-1
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+351
-36
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+19
-0
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
65b54f9d
...
...
@@ -126,6 +126,7 @@ int32_t tBlockCmprFn(const void *p1, const void *p2);
void
tBlockIdxReset
(
SBlockIdx
*
pBlockIdx
);
int32_t
tPutBlockIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetBlockIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tCmprBlockIdx
(
void
const
*
lhs
,
void
const
*
rhs
);
// SColdata
#define tColDataInit() ((SColData){0})
void
tColDataReset
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
);
...
...
@@ -141,9 +142,9 @@ void tBlockDataReset(SBlockData *pBlockData);
void
tBlockDataClear
(
SBlockData
*
pBlockData
);
int32_t
tBlockDataAppendRow
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
);
// SDelIdx
int32_t
tCmprDelIdx
(
void
const
*
lhs
,
void
const
*
rhs
);
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tCmprDelIdx
(
void
const
*
lhs
,
void
const
*
rhs
);
// SDelData
int32_t
tPutDelData
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelData
(
uint8_t
*
p
,
void
*
ph
);
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
65b54f9d
...
...
@@ -102,8 +102,8 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
return
suid
;
}
/*
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow
**ppRow) {
int32_t code = 0;
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow
**ppRow) {
int32_t code = 0;
if (mem) {
STbData *pMem = NULL;
...
...
@@ -218,20 +218,28 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
_err:
return
code
;
}
#if 0
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline,
STsdb
*
pTsdb
,
STSRow
**
pLastRow
)
{
STsdb *pTsdb, STSRow **p
p
LastRow) {
int32_t code = 0;
TSDBROW *pMemRow = NULL;
TSDBROW *pIMemRow = NULL;
TSDBKEY memKey = TSDBKEY_MIN;
TSDBKEY imemKey = TSDBKEY_MIN;
if (iter != NULL) {
pMemRow = tsdbTbDataIterGet(iter);
if (pMemRow) {
memKey = tsdbRowKey(pMemRow);
}
}
if (iter != NULL) {
pIMemRow = tsdbTbDataIterGet(iiter);
if (pIMemRow) {
imemKey = tsdbRowKey(pIMemRow);
}
}
SDataFReader *pDataFReader;
...
...
@@ -243,42 +251,266 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile
code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL);
if (code) goto _err;
SBlockData
*
pBlockData
;
SBlockIdx blockIdx = {0};
tBlockIdxReset(&blockIdx);
code = tMapDataSearch(&blockIdxMap, pBlockIdx, tGetBlockIdx, tCmprBlockIdx, &blockIdx);
if (code) goto _err;
SMapData blockMap = {0};
tMapDataReset(&blockMap);
code = tsdbReadBlock(pDataFReader, &blockIdx, &blockMap, NULL);
if (code) goto _err;
int32_t nBlock = blockMap.nItem;
for (int32_t iBlock = nBlock - 1; iBlock >= 0; --iBlock) {
SBlock block = {0};
SBlockData blockData = {0};
tBlockReset(&block);
tBlockDataReset(&blockData);
tMapDataGetItemByIdx(&blockMap, iBlock, &block, tGetBlock);
code = tsdbReadBlockData(pDataFReader, &blockIdx, &block, &blockData, NULL, 0, NULL, NULL);
if (code) goto _err;
int32_t nRow = blockData.nRow;
for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
TSDBROW row = tsdbRowFromBlockData(&blockData, iRow);
TSDBKEY key = tsdbRowKey(&row);
if (pMemRow != NULL && pIMemRow != NULL) {
int32_t c = tsdbKeyCmprFn(memKey, imemKey);
if (c < 0) {
} else if (c > 0) {
} else {
}
} else if (pMemRow != NULL) {
pMemRow = tsdbTbDataIterGet(iter);
} else if (pIMemRow != NULL) {
} else {
if (!tsdbKeyDeleted(key, pSkyline)) {
*ppLastRow = buildTsrowFromTsdbrow(&row);
goto _done;
} else {
continue;
}
}
// select current row if outside delete area
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
}
}
_done:
tsdbDataFReaderClose(&pDataFReader);
return code;
_err:
return code;
}
#endif
typedef
enum
SFSNEXTROWSTATES
{
SFSNEXTROW_FS
,
SFSNEXTROW_FILESET
,
SFSNEXTROW_BLOCKDATA
,
SFSNEXTROW_BLOCKROW
}
SFSNEXTROWSTATES
;
typedef
struct
SFSNextRowIter
{
SFSNEXTROWSTATES
state
;
// [input]
STsdb
*
pTsdb
;
// [input]
SBlockIdx
*
pBlockIdxExp
;
// [input]
int32_t
nFileSet
;
int32_t
iFileSet
;
SArray
*
aDFileSet
;
SDataFReader
*
pDataFReader
;
SMapData
blockIdxMap
;
SBlockIdx
blockIdx
;
SMapData
blockMap
;
int32_t
nBlock
;
int32_t
iBlock
;
SBlock
block
;
SBlockData
blockData
;
int32_t
nRow
;
int32_t
iRow
;
TSDBROW
row
;
}
SFSNextRowIter
;
static
int32_t
getNextRowFromFS
(
void
*
iter
,
TSDBROW
**
ppRow
)
{
SFSNextRowIter
*
state
=
(
SFSNextRowIter
*
)
iter
;
int32_t
code
=
0
;
switch
(
state
->
state
)
{
case
SFSNEXTROW_FS
:
state
->
aDFileSet
=
state
->
pTsdb
->
fs
->
cState
->
aDFileSet
;
state
->
nFileSet
=
taosArrayGetSize
(
state
->
aDFileSet
);
state
->
iFileSet
=
state
->
nFileSet
;
case
SFSNEXTROW_FILESET
:
{
SDFileSet
*
pFileSet
=
NULL
;
if
(
--
state
->
iFileSet
>=
0
)
{
pFileSet
=
(
SDFileSet
*
)
taosArrayGet
(
state
->
aDFileSet
,
state
->
iFileSet
);
}
else
{
*
ppRow
=
NULL
;
return
code
;
}
code
=
tsdbDataFReaderOpen
(
&
state
->
pDataFReader
,
state
->
pTsdb
,
pFileSet
);
if
(
code
)
goto
_err
;
tMapDataReset
(
&
state
->
blockIdxMap
);
code
=
tsdbReadBlockIdx
(
state
->
pDataFReader
,
&
state
->
blockIdxMap
,
NULL
);
if
(
code
)
goto
_err
;
tBlockIdxReset
(
&
state
->
blockIdx
);
code
=
tMapDataSearch
(
&
state
->
blockIdxMap
,
state
->
pBlockIdxExp
,
tGetBlockIdx
,
tCmprBlockIdx
,
&
state
->
blockIdx
);
if
(
code
)
goto
_err
;
tMapDataReset
(
&
state
->
blockMap
);
code
=
tsdbReadBlock
(
state
->
pDataFReader
,
&
state
->
blockIdx
,
&
state
->
blockMap
,
NULL
);
if
(
code
)
goto
_err
;
state
->
nBlock
=
state
->
blockMap
.
nItem
;
state
->
iBlock
=
state
->
nBlock
-
1
;
}
case
SFSNEXTROW_BLOCKDATA
:
if
(
state
->
iBlock
>=
0
)
{
SBlock
block
=
{
0
};
tBlockReset
(
&
block
);
tBlockDataReset
(
&
state
->
blockData
);
tMapDataGetItemByIdx
(
&
state
->
blockMap
,
state
->
iBlock
,
&
block
,
tGetBlock
);
code
=
tsdbReadBlockData
(
state
->
pDataFReader
,
&
state
->
blockIdx
,
&
block
,
&
state
->
blockData
,
NULL
,
0
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
state
->
nRow
=
state
->
blockData
.
nRow
;
state
->
iRow
=
state
->
nRow
-
1
;
state
->
state
=
SFSNEXTROW_BLOCKROW
;
}
case
SFSNEXTROW_BLOCKROW
:
if
(
state
->
iRow
>=
0
)
{
state
->
row
=
tsdbRowFromBlockData
(
&
state
->
blockData
,
state
->
iRow
);
*
ppRow
=
&
state
->
row
;
if
(
--
state
->
iRow
<
0
)
{
state
->
state
=
SFSNEXTROW_BLOCKDATA
;
if
(
--
state
->
iBlock
<
0
)
{
tsdbDataFReaderClose
(
&
state
->
pDataFReader
);
state
->
state
=
SFSNEXTROW_FILESET
;
}
}
}
return
code
;
default:
ASSERT
(
0
);
break
;
}
_err:
*
ppRow
=
NULL
;
return
code
;
}
typedef
enum
SMEMNEXTROWSTATES
{
SMEMNEXTROW_ENTER
,
SMEMNEXTROW_NEXT
,
}
SMEMNEXTROWSTATES
;
typedef
struct
SMemNextRowIter
{
SMEMNEXTROWSTATES
state
;
STbData
*
pMem
;
// [input]
STbDataIter
iter
;
// mem buffer skip list iterator
}
SMemNextRowIter
;
static
int32_t
getNextRowFromMem
(
void
*
iter
,
TSDBROW
**
ppRow
)
{
SMemNextRowIter
*
state
=
(
SMemNextRowIter
*
)
iter
;
int32_t
code
=
0
;
switch
(
state
->
state
)
{
case
SMEMNEXTROW_ENTER
:
{
if
(
state
->
pMem
!=
NULL
)
{
tsdbTbDataIterOpen
(
state
->
pMem
,
NULL
,
1
,
&
state
->
iter
);
TSDBROW
*
pMemRow
=
tsdbTbDataIterGet
(
&
state
->
iter
);
if
(
pMemRow
)
{
*
ppRow
=
pMemRow
;
state
->
state
=
SMEMNEXTROW_NEXT
;
return
code
;
}
}
*
ppRow
=
NULL
;
return
code
;
}
case
SMEMNEXTROW_NEXT
:
if
(
tsdbTbDataIterNext
(
&
state
->
iter
))
{
*
ppRow
=
tsdbTbDataIterGet
(
&
state
->
iter
);
return
code
;
}
else
{
*
ppRow
=
NULL
;
return
code
;
}
default:
ASSERT
(
0
);
break
;
}
_err:
*
ppRow
=
NULL
;
return
code
;
}
static
STSRow
*
tsRowFromTsdbRow
(
TSDBROW
*
pRow
)
{
// TODO: new tsrow from tsdbrow
STSRow
*
ret
=
NULL
;
if
(
pRow
->
type
==
0
)
{
return
pRow
->
pTSRow
;
}
else
{
}
return
ret
;
}
typedef
int32_t
(
*
_next_row_fn_t
)(
void
*
iter
,
TSDBROW
**
ppRow
);
typedef
struct
TsdbNextRowState
{
TSDBROW
*
pRow
;
bool
stop
;
bool
next
;
void
*
iter
;
_next_row_fn_t
nextRowFn
;
}
TsdbNextRowState
;
static
int32_t
mergeLastRow
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSRow
**
ppRow
)
{
int32_t
code
=
0
;
tb_uid_t
suid
=
getTableSuidByUid
(
uid
,
pTsdb
);
STbData
*
pMem
=
NULL
;
STbData
*
pIMem
=
NULL
;
STbDataIter
iter
;
// mem buffer skip list iterator
STbDataIter
iiter
;
// imem buffer skip list iterator
STbData
*
pMem
=
NULL
;
if
(
pTsdb
->
mem
)
{
tsdbGetTbDataFromMemTable
(
pTsdb
->
mem
,
suid
,
uid
,
&
pMem
);
if
(
pMem
!=
NULL
)
{
tsdbTbDataIterOpen
(
pMem
,
NULL
,
1
,
&
iter
);
}
}
STbData
*
pIMem
=
NULL
;
if
(
pTsdb
->
imem
)
{
tsdbGetTbDataFromMemTable
(
pTsdb
->
imem
,
suid
,
uid
,
&
pIMem
);
if
(
pIMem
!=
NULL
)
{
tsdbTbDataIterOpen
(
pIMem
,
NULL
,
1
,
&
iiter
);
}
}
*
ppRow
=
NULL
;
SDelFReader
*
pDelFReader
;
// code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL);
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
code
=
tsdbDelFReaderOpen
(
&
pDelFReader
,
pDelFile
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
SDelIdx
delIdx
;
...
...
@@ -288,29 +520,111 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
SArray
*
pSkyline
=
taosArrayInit
(
32
,
sizeof
(
TSDBKEY
));
code
=
getTableDelSkyline
(
pMem
,
pIMem
,
pDelFReader
,
&
delIdx
,
pSkyline
);
if
(
code
)
goto
_err
;
/*
SFSIter fsiter;
bool fsHasNext = false;
tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter);
do {
*/
SDFileSet
*
pFileSet
=
NULL
;
// pFileSet = tsdbFSIterGet(fsiter);
int
iSkyline
=
taosArrayGetSize
(
pSkyline
)
-
1
;
code
=
mergeLastRowFileSet
(
&
iter
,
&
iiter
,
pFileSet
,
pSkyline
,
pTsdb
,
ppRow
);
if
(
code
<
0
)
{
goto
_err
;
}
tsdbDelFReaderClose
(
pDelFReader
);
SBlockIdx
idx
=
{.
suid
=
suid
,
.
uid
=
uid
};
SFSNextRowIter
fsState
=
{
0
};
fsState
.
state
=
SFSNEXTROW_FS
;
fsState
.
pTsdb
=
pTsdb
;
fsState
.
pBlockIdxExp
=
&
idx
;
if
(
*
ppRow
!=
NULL
)
{
// break;
SMemNextRowIter
memState
=
{
0
};
SMemNextRowIter
imemState
=
{
0
};
TSDBROW
memRow
,
imemRow
,
fsRow
;
TsdbNextRowState
input
[
3
]
=
{{
&
memRow
,
true
,
false
,
&
memState
,
getNextRowFromMem
},
{
&
imemRow
,
true
,
false
,
&
imemState
,
getNextRowFromMem
},
{
&
fsRow
,
false
,
true
,
&
fsState
,
getNextRowFromFS
}};
if
(
pMem
)
{
memState
.
pMem
=
pMem
;
memState
.
state
=
SMEMNEXTROW_ENTER
;
input
[
0
].
stop
=
false
;
input
[
0
].
next
=
true
;
}
if
(
pIMem
)
{
imemState
.
pMem
=
pIMem
;
imemState
.
state
=
SMEMNEXTROW_ENTER
;
input
[
1
].
stop
=
false
;
input
[
1
].
next
=
true
;
}
/*
} while (fsHasNext = tsdbFSIterNext(fsiter))
*/
tsdbDelFReaderClose
(
pDelFReader
);
do
{
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
input
[
i
].
next
&&
!
input
[
i
].
stop
)
{
code
=
input
[
i
].
nextRowFn
(
input
[
i
].
iter
,
&
input
[
i
].
pRow
);
if
(
code
)
goto
_err
;
if
(
input
[
i
].
pRow
==
NULL
)
{
input
[
i
].
stop
=
true
;
input
[
i
].
next
=
false
;
}
}
}
if
(
input
[
0
].
stop
&&
input
[
1
].
stop
&&
input
[
2
].
stop
)
{
break
;
}
// select maxpoint(s) from mem, imem, fs
TSDBROW
*
max
[
3
]
=
{
0
};
int
iMax
[
3
]
=
{
-
1
,
-
1
,
-
1
};
int
nMax
=
0
;
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
input
[
i
].
pRow
!=
NULL
)
{
TSDBKEY
key
=
TSDBROW_KEY
(
input
[
i
].
pRow
);
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
nMax
]);
// merging & deduplicating on client side
if
(
maxKey
.
ts
<=
key
.
ts
)
{
if
(
maxKey
.
ts
<
key
.
ts
)
{
nMax
=
0
;
}
iMax
[
nMax
]
=
i
;
max
[
nMax
++
]
=
input
[
i
].
pRow
;
}
}
}
// delete detection
TSDBROW
*
merge
[
3
]
=
{
0
};
int
nMerge
=
0
;
for
(
int
i
=
0
;
i
<
nMax
;
++
i
)
{
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
i
]);
bool
deleted
=
false
;
// bool deleted = tsdbKeyDeleted(maxKey, pSkyline, &iSkyline);
if
(
!
deleted
)
{
merge
[
nMerge
++
]
=
max
[
i
];
}
else
{
input
[
iMax
[
i
]].
next
=
true
;
}
}
// merge if nMerge > 1
if
(
nMerge
>
0
)
{
if
(
nMerge
==
1
)
{
*
ppRow
=
tsRowFromTsdbRow
(
merge
[
nMerge
]);
}
else
{
// merge 2 or 3 rows
SRowMerger
merger
=
{
0
};
STSchema
*
pTSchema
=
metaGetTbTSchema
(
pTsdb
->
pVnode
->
pMeta
,
uid
,
-
1
);
tRowMergerInit
(
&
merger
,
merge
[
0
],
pTSchema
);
for
(
int
i
=
1
;
i
<
nMerge
;
++
i
)
{
tRowMerge
(
&
merger
,
merge
[
i
]);
}
tRowMergerGetRow
(
&
merger
,
ppRow
);
tRowMergerClear
(
&
merger
);
}
}
}
while
(
*
ppRow
==
NULL
);
return
code
;
...
...
@@ -349,7 +663,8 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
key
,
keyLen
);
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
true
);
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
// keyLen);
}
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
65b54f9d
...
...
@@ -627,4 +627,4 @@ SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile;
SDFileSet
*
tsdbFSStateGetDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
)
{
return
(
SDFileSet
*
)
taosArraySearch
(
pState
->
aDFileSet
,
&
(
SDFileSet
){.
fid
=
fid
},
tDFileSetCmprFn
,
TD_EQ
);
}
\ No newline at end of file
}
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
65b54f9d
...
...
@@ -321,6 +321,25 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
return
n
;
}
int32_t
tCmprBlockIdx
(
void
const
*
lhs
,
void
const
*
rhs
)
{
SBlockIdx
*
lBlockIdx
=
*
(
SBlockIdx
**
)
lhs
;
SBlockIdx
*
rBlockIdx
=
*
(
SBlockIdx
**
)
rhs
;
if
(
lBlockIdx
->
suid
<
lBlockIdx
->
suid
)
{
return
-
1
;
}
else
if
(
lBlockIdx
->
suid
>
lBlockIdx
->
suid
)
{
return
1
;
}
if
(
lBlockIdx
->
uid
<
lBlockIdx
->
uid
)
{
return
-
1
;
}
else
if
(
lBlockIdx
->
uid
>
lBlockIdx
->
uid
)
{
return
1
;
}
return
0
;
}
// SBlock ======================================================
void
tBlockReset
(
SBlock
*
pBlock
)
{
pBlock
->
minKey
=
TSDBKEY_MAX
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录