Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
90788972
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
90788972
编写于
6月 29, 2022
作者:
M
Minglei Jin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
tsdbCache: new release interface
上级
dde1b94d
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
295 addition
and
14 deletion
+295
-14
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+2
-1
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+292
-12
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+1
-1
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
90788972
...
...
@@ -251,7 +251,8 @@ int32_t tsdbOpenCache(STsdb *pTsdb);
void
tsdbCloseCache
(
SLRUCache
*
pCache
);
int32_t
tsdbCacheInsertLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
STSRow
*
row
);
int32_t
tsdbCacheGetLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSRow
**
ppRow
);
int32_t
tsdbCacheDeleteLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
);
int32_t
tsdbCacheDeleteLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
TSKEY
eKey
);
int32_t
tsdbCacheRelease
(
SLRUCache
*
pCache
,
LRUHandle
*
h
);
// structs =======================
typedef
struct
{
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
90788972
...
...
@@ -65,7 +65,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
if
(
TD_ROW_LEN
(
row
)
<=
TD_ROW_LEN
(
cacheRow
))
{
tdRowCpy
(
cacheRow
,
row
);
}
else
{
tsdbCacheDeleteLastrow
(
pCache
,
uid
);
tsdbCacheDeleteLastrow
(
pCache
,
uid
,
TSKEY_MAX
);
tsdbCacheInsertLastrow
(
pCache
,
uid
,
row
);
}
}
...
...
@@ -97,7 +97,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
if
(
TD_ROW_LEN
(
row
)
<=
TD_ROW_LEN
(
cacheRow
))
{
tdRowCpy
(
cacheRow
,
row
);
}
else
{
tsdbCacheDeleteLastrow
(
pCache
,
uid
);
tsdbCacheDeleteLastrow
(
pCache
,
uid
,
TSKEY_MAX
);
tsdbCacheInsertLastrow
(
pCache
,
uid
,
row
);
}
}
...
...
@@ -581,6 +581,10 @@ typedef struct TsdbNextRowState {
static
int32_t
mergeLastRow
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSRow
**
ppRow
)
{
int32_t
code
=
0
;
STSchema
*
pTSchema
=
metaGetTbTSchema
(
pTsdb
->
pVnode
->
pMeta
,
uid
,
-
1
);
int16_t
nCol
=
pTSchema
->
numOfCols
;
SArray
*
pColArray
=
taosArrayInit
(
nCol
,
sizeof
(
SColVal
));
tb_uid_t
suid
=
getTableSuidByUid
(
uid
,
pTsdb
);
STbData
*
pMem
=
NULL
;
...
...
@@ -597,6 +601,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
SArray
*
pSkyline
=
taosArrayInit
(
32
,
sizeof
(
TSDBKEY
));
SDelIdx
delIdx
;
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
;
...
...
@@ -604,7 +610,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
code
=
tsdbDelFReaderOpen
(
&
pDelFReader
,
pDelFile
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
SDelIdx
delIdx
;
code
=
getTableDelIdx
(
pDelFReader
,
suid
,
uid
,
&
delIdx
);
if
(
code
)
goto
_err
;
...
...
@@ -612,6 +617,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
if
(
code
)
goto
_err
;
tsdbDelFReaderClose
(
pDelFReader
);
}
else
{
code
=
getTableDelSkyline
(
pMem
,
pIMem
,
NULL
,
NULL
,
pSkyline
);
if
(
code
)
goto
_err
;
}
int
iSkyline
=
taosArrayGetSize
(
pSkyline
)
-
1
;
...
...
@@ -644,7 +652,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
input
[
1
].
next
=
true
;
}
STSchema
*
pTSchema
=
metaGetTbTSchema
(
pTsdb
->
pVnode
->
pMeta
,
uid
,
-
1
);
int16_t
nilColCount
=
nCol
-
1
;
// count of null & none cols
int
iCol
=
0
;
// index of first nil col index from left to right
bool
setICol
=
false
;
do
{
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
...
...
@@ -667,15 +677,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
TSDBROW
*
max
[
3
]
=
{
0
};
int
iMax
[
3
]
=
{
-
1
,
-
1
,
-
1
};
int
nMax
=
0
;
TSKEY
maxKey
=
TSKEY_MIN
;
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
input
[
i
].
pRow
!=
NULL
)
{
if
(
!
input
[
i
].
stop
&&
input
[
i
].
pRow
!=
NULL
)
{
TSDBKEY
key
=
TSDBROW_KEY
(
input
[
i
].
pRow
);
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
nMax
]);
// merging & deduplicating on client side
if
(
maxKey
.
ts
<=
key
.
ts
)
{
if
(
maxKey
.
ts
<
key
.
ts
)
{
if
(
maxKey
<=
key
.
ts
)
{
if
(
maxKey
<
key
.
ts
)
{
nMax
=
0
;
maxKey
=
key
.
ts
;
}
iMax
[
nMax
]
=
i
;
...
...
@@ -686,6 +698,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
// delete detection
TSDBROW
*
merge
[
3
]
=
{
0
};
int
iMerge
[
3
]
=
{
-
1
,
-
1
,
-
1
};
int
nMerge
=
0
;
for
(
int
i
=
0
;
i
<
nMax
;
++
i
)
{
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
i
]);
...
...
@@ -693,6 +706,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
// bool deleted = false;
bool
deleted
=
tsdbKeyDeleted
(
&
maxKey
,
pSkyline
,
&
iSkyline
);
if
(
!
deleted
)
{
iMerge
[
nMerge
]
=
i
;
merge
[
nMerge
++
]
=
max
[
i
];
}
...
...
@@ -716,6 +730,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tRowMergerClear
(
&
merger
);
}
}
}
while
(
*
ppRow
==
NULL
);
taosMemoryFreeClear
(
pTSchema
);
...
...
@@ -727,6 +742,245 @@ _err:
return
code
;
}
static
int32_t
mergeLast
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSRow
**
ppRow
)
{
int32_t
code
=
0
;
STSchema
*
pTSchema
=
metaGetTbTSchema
(
pTsdb
->
pVnode
->
pMeta
,
uid
,
-
1
);
int16_t
nCol
=
pTSchema
->
numOfCols
;
SArray
*
pColArray
=
taosArrayInit
(
nCol
,
sizeof
(
SColVal
));
tb_uid_t
suid
=
getTableSuidByUid
(
uid
,
pTsdb
);
STbData
*
pMem
=
NULL
;
if
(
pTsdb
->
mem
)
{
tsdbGetTbDataFromMemTable
(
pTsdb
->
mem
,
suid
,
uid
,
&
pMem
);
}
STbData
*
pIMem
=
NULL
;
if
(
pTsdb
->
imem
)
{
tsdbGetTbDataFromMemTable
(
pTsdb
->
imem
,
suid
,
uid
,
&
pIMem
);
}
*
ppRow
=
NULL
;
SArray
*
pSkyline
=
taosArrayInit
(
32
,
sizeof
(
TSDBKEY
));
SDelIdx
delIdx
;
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
;
code
=
tsdbDelFReaderOpen
(
&
pDelFReader
,
pDelFile
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
code
=
getTableDelIdx
(
pDelFReader
,
suid
,
uid
,
&
delIdx
);
if
(
code
)
goto
_err
;
code
=
getTableDelSkyline
(
pMem
,
pIMem
,
pDelFReader
,
&
delIdx
,
pSkyline
);
if
(
code
)
goto
_err
;
tsdbDelFReaderClose
(
pDelFReader
);
}
else
{
code
=
getTableDelSkyline
(
pMem
,
pIMem
,
NULL
,
NULL
,
pSkyline
);
if
(
code
)
goto
_err
;
}
int
iSkyline
=
taosArrayGetSize
(
pSkyline
)
-
1
;
SBlockIdx
idx
=
{.
suid
=
suid
,
.
uid
=
uid
};
SFSNextRowIter
fsState
=
{
0
};
fsState
.
state
=
SFSNEXTROW_FS
;
fsState
.
pTsdb
=
pTsdb
;
fsState
.
pBlockIdxExp
=
&
idx
;
SMemNextRowIter
memState
=
{
0
};
SMemNextRowIter
imemState
=
{
0
};
TSDBROW
memRow
,
imemRow
,
fsRow
;
TsdbNextRowState
input
[
3
]
=
{{
&
memRow
,
true
,
false
,
&
memState
,
getNextRowFromMem
},
{
&
imemRow
,
true
,
false
,
&
imemState
,
getNextRowFromMem
},
{
&
fsRow
,
false
,
true
,
&
fsState
,
getNextRowFromFS
}};
if
(
pMem
)
{
memState
.
pMem
=
pMem
;
memState
.
state
=
SMEMNEXTROW_ENTER
;
input
[
0
].
stop
=
false
;
input
[
0
].
next
=
true
;
}
if
(
pIMem
)
{
imemState
.
pMem
=
pIMem
;
imemState
.
state
=
SMEMNEXTROW_ENTER
;
input
[
1
].
stop
=
false
;
input
[
1
].
next
=
true
;
}
int16_t
nilColCount
=
nCol
-
1
;
// count of null & none cols
int
iCol
=
0
;
// index of first nil col index from left to right
bool
setICol
=
false
;
do
{
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
input
[
i
].
next
&&
!
input
[
i
].
stop
)
{
code
=
input
[
i
].
nextRowFn
(
input
[
i
].
iter
,
&
input
[
i
].
pRow
);
if
(
code
)
goto
_err
;
if
(
input
[
i
].
pRow
==
NULL
)
{
input
[
i
].
stop
=
true
;
input
[
i
].
next
=
false
;
}
}
}
if
(
input
[
0
].
stop
&&
input
[
1
].
stop
&&
input
[
2
].
stop
)
{
break
;
}
// select maxpoint(s) from mem, imem, fs
TSDBROW
*
max
[
3
]
=
{
0
};
int
iMax
[
3
]
=
{
-
1
,
-
1
,
-
1
};
int
nMax
=
0
;
TSKEY
maxKey
=
TSKEY_MIN
;
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
!
input
[
i
].
stop
&&
input
[
i
].
pRow
!=
NULL
)
{
TSDBKEY
key
=
TSDBROW_KEY
(
input
[
i
].
pRow
);
// merging & deduplicating on client side
if
(
maxKey
<=
key
.
ts
)
{
if
(
maxKey
<
key
.
ts
)
{
nMax
=
0
;
maxKey
=
key
.
ts
;
}
iMax
[
nMax
]
=
i
;
max
[
nMax
++
]
=
input
[
i
].
pRow
;
}
}
}
// delete detection
TSDBROW
*
merge
[
3
]
=
{
0
};
int
iMerge
[
3
]
=
{
-
1
,
-
1
,
-
1
};
int
nMerge
=
0
;
for
(
int
i
=
0
;
i
<
nMax
;
++
i
)
{
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
i
]);
// bool deleted = false;
bool
deleted
=
tsdbKeyDeleted
(
&
maxKey
,
pSkyline
,
&
iSkyline
);
if
(
!
deleted
)
{
iMerge
[
nMerge
]
=
i
;
merge
[
nMerge
++
]
=
max
[
i
];
}
input
[
iMax
[
i
]].
next
=
deleted
;
}
// merge if nMerge > 1
if
(
nMerge
>
0
)
{
if
(
nMerge
==
1
)
{
code
=
tsRowFromTsdbRow
(
pTSchema
,
merge
[
nMerge
-
1
],
ppRow
);
if
(
code
)
goto
_err
;
}
else
{
// merge 2 or 3 rows
SRowMerger
merger
=
{
0
};
tRowMergerInit
(
&
merger
,
merge
[
0
],
pTSchema
);
for
(
int
i
=
1
;
i
<
nMerge
;
++
i
)
{
tRowMerge
(
&
merger
,
merge
[
i
]);
}
tRowMergerGetRow
(
&
merger
,
ppRow
);
tRowMergerClear
(
&
merger
);
}
}
if
(
iCol
==
0
)
{
STColumn
*
pTColumn
=
&
pTSchema
->
columns
[
0
];
SColVal
*
pColVal
=
&
(
SColVal
){
0
};
*
pColVal
=
COL_VAL_VALUE
(
pTColumn
->
colId
,
pTColumn
->
type
,
(
SValue
){.
ts
=
maxKey
});
if
(
taosArrayPush
(
pColArray
,
pColVal
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
++
iCol
;
setICol
=
false
;
for
(
int16_t
i
=
iCol
;
iCol
<
nCol
;
++
i
)
{
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
if
(
taosArrayPush
(
pColArray
,
pColVal
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
if
(
pColVal
->
isNull
||
pColVal
->
isNone
)
{
for
(
int
j
=
0
;
j
<
nMerge
;
++
j
)
{
SColVal
jColVal
=
{
0
};
tsdbRowGetColVal
(
merge
[
j
],
pTSchema
,
i
,
&
jColVal
);
if
(
jColVal
.
isNull
||
jColVal
.
isNone
)
{
input
[
iMerge
[
j
]].
next
=
true
;
}
}
if
(
!
setICol
)
{
iCol
=
i
;
setICol
=
true
;
}
}
else
{
--
nilColCount
;
}
}
continue
;
}
setICol
=
false
;
for
(
int16_t
i
=
iCol
;
i
<
nCol
;
++
i
)
{
SColVal
colVal
=
{
0
};
tTSRowGetVal
(
*
ppRow
,
pTSchema
,
i
,
&
colVal
);
SColVal
*
tColVal
=
(
SColVal
*
)
taosArrayGet
(
pColArray
,
i
);
if
(
!
colVal
.
isNone
&&
!
colVal
.
isNull
)
{
if
(
tColVal
->
isNull
||
tColVal
->
isNone
)
{
taosArraySet
(
pColArray
,
i
,
&
colVal
);
--
nilColCount
;
}
}
else
{
if
(
tColVal
->
isNull
||
tColVal
->
isNone
&&
!
setICol
)
{
iCol
=
i
;
setICol
=
true
;
for
(
int
j
=
0
;
j
<
nMerge
;
++
j
)
{
SColVal
jColVal
=
{
0
};
tsdbRowGetColVal
(
merge
[
j
],
pTSchema
,
i
,
&
jColVal
);
if
(
jColVal
.
isNull
||
jColVal
.
isNone
)
{
input
[
iMerge
[
j
]].
next
=
true
;
}
}
}
}
}
}
while
(
nilColCount
>
0
);
// if () new ts row from pColArray if non empty
if
(
taosArrayGetSize
(
pColArray
)
==
nCol
)
{
code
=
tdSTSRowNew
(
pColArray
,
pTSchema
,
ppRow
);
if
(
code
)
goto
_err
;
}
taosArrayDestroy
(
pColArray
);
taosMemoryFreeClear
(
pTSchema
);
return
code
;
_err:
taosArrayDestroy
(
pColArray
);
taosMemoryFreeClear
(
pTSchema
);
tsdbError
(
"vgId:%d merge last_row failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbCacheGetLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSRow
**
ppRow
)
{
int32_t
code
=
0
;
char
key
[
32
]
=
{
0
};
...
...
@@ -749,6 +1003,8 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo
*
ppRow
=
(
STSRow
*
)
taosLRUCacheValue
(
pCache
,
h
);
}
// taosLRUCacheRelease(pCache, h, true);
return
code
;
}
...
...
@@ -763,7 +1019,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
*
ppRow
=
(
STSRow
*
)
taosLRUCacheValue
(
pCache
,
h
);
}
else
{
STSRow
*
pRow
=
NULL
;
//
code = mergeLast(uid, pTsdb, &pRow);
code
=
mergeLast
(
uid
,
pTsdb
,
&
pRow
);
// if table's empty or error, return code of -1
if
(
code
<
0
||
pRow
==
NULL
)
{
return
-
1
;
...
...
@@ -774,10 +1030,12 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
*
ppRow
=
(
STSRow
*
)
taosLRUCacheValue
(
pCache
,
h
);
}
// taosLRUCacheRelease(pCache, h, true);
return
code
;
}
int32_t
tsdbCacheDeleteLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
)
{
int32_t
tsdbCacheDeleteLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
char
key
[
32
]
=
{
0
};
int
keyLen
=
0
;
...
...
@@ -785,10 +1043,32 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
getTableCacheKey
(
uid
,
"lr"
,
key
,
&
keyLen
);
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
key
,
keyLen
);
if
(
h
)
{
STSRow
*
pRow
=
(
STSRow
*
)
taosLRUCacheValue
(
pCache
,
h
);
if
(
pRow
->
ts
<=
eKey
)
{
taosLRUCacheRelease
(
pCache
,
h
,
true
);
}
else
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
getTableCacheKey
(
uid
,
"l"
,
key
,
&
keyLen
);
h
=
taosLRUCacheLookup
(
pCache
,
key
,
keyLen
);
if
(
h
)
{
// clear last cache anyway, no matter where eKey ends.
taosLRUCacheRelease
(
pCache
,
h
,
true
);
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
// keyLen);
//
void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
keyLen);
}
return
code
;
}
int32_t
tsdbCacheRelease
(
SLRUCache
*
pCache
,
LRUHandle
*
h
)
{
int32_t
code
=
0
;
taosLRUCacheRelease
(
pCache
,
h
,
false
);
return
code
;
}
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
90788972
...
...
@@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable
->
nDel
++
;
if
(
tsdbKeyCmprFn
(
&
lastKey
,
&
pTbData
->
maxKey
)
>=
0
)
{
tsdbCacheDeleteLastrow
(
pTsdb
->
lruCache
,
pTbData
->
uid
);
tsdbCacheDeleteLastrow
(
pTsdb
->
lruCache
,
pTbData
->
uid
,
eKey
);
}
tsdbError
(
"vgId:%d, delete data from table suid:%"
PRId64
" uid:%"
PRId64
" skey:%"
PRId64
" eKey:%"
PRId64
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录