Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
089285ce
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
089285ce
编写于
7月 08, 2022
作者:
M
Minglei Jin
提交者:
GitHub
7月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14682 from taosdata/fix/last-ts
tsdbCache/last: save ts info of each column
上级
d94680c3
23fc0f00
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
106 addition
and
24 deletion
+106
-24
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+2
-0
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+97
-24
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+7
-0
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
089285ce
...
...
@@ -245,6 +245,8 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t
tsdbCacheDelete
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
TSKEY
eKey
);
int32_t
tsdbCacheLastArray2Row
(
SArray
*
pLastArray
,
STSRow
**
ppRow
,
STSchema
*
pSchema
);
// structs =======================
typedef
struct
{
int
minFid
;
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
089285ce
...
...
@@ -59,6 +59,8 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
static
void
deleteTableCacheLastrow
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
taosMemoryFree
(
value
);
}
static
void
deleteTableCacheLast
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
taosArrayDestroy
(
value
);
}
static
int32_t
tsdbCacheDeleteLastrow
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
char
key
[
32
]
=
{
0
};
...
...
@@ -761,7 +763,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
for
(
int
i
=
0
;
i
<
nMax
;
++
i
)
{
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
i
]);
// bool deleted = false;
bool
deleted
=
tsdbKeyDeleted
(
&
maxKey
,
pSkyline
,
&
iSkyline
);
if
(
!
deleted
)
{
// iMerge[nMerge] = i;
...
...
@@ -818,12 +819,22 @@ _err:
return
code
;
}
static
int32_t
mergeLast
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSRow
**
ppRow
)
{
int32_t
code
=
0
;
typedef
struct
{
TSKEY
ts
;
SColVal
colVal
;
}
SLastCol
;
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
static
int32_t
mergeLast
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
SArray
**
ppLastArray
)
{
int32_t
code
=
0
;
SArray
*
pSkyline
=
NULL
;
STSRow
*
pRow
=
NULL
;
STSRow
**
ppRow
=
&
pRow
;
STSchema
*
pTSchema
=
metaGetTbTSchema
(
pTsdb
->
pVnode
->
pMeta
,
uid
,
-
1
);
int16_t
nCol
=
pTSchema
->
numOfCols
;
SArray
*
pColArray
=
taosArrayInit
(
nCol
,
sizeof
(
SColVal
));
// SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
SArray
*
pColArray
=
taosArrayInit
(
nCol
,
sizeof
(
SLastCol
));
tb_uid_t
suid
=
getTableSuidByUid
(
uid
,
pTsdb
);
...
...
@@ -837,9 +848,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tsdbGetTbDataFromMemTable
(
pTsdb
->
imem
,
suid
,
uid
,
&
pIMem
);
}
*
pp
Row
=
NULL
;
*
pp
LastArray
=
NULL
;
SArray
*
pSkyline
=
taosArrayInit
(
32
,
sizeof
(
TSDBKEY
));
pSkyline
=
taosArrayInit
(
32
,
sizeof
(
TSDBKEY
));
SDelIdx
delIdx
;
...
...
@@ -943,7 +954,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
for
(
int
i
=
0
;
i
<
nMax
;
++
i
)
{
TSDBKEY
maxKey
=
TSDBROW_KEY
(
max
[
i
]);
// bool deleted = false;
bool
deleted
=
tsdbKeyDeleted
(
&
maxKey
,
pSkyline
,
&
iSkyline
);
if
(
!
deleted
)
{
iMerge
[
nMerge
]
=
iMax
[
i
];
...
...
@@ -970,8 +980,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tRowMergerClear
(
&
merger
);
}
}
else
{
*
ppRow
=
NULL
;
return
code
;
/* *ppRow = NULL; */
/* return code; */
continue
;
}
if
(
iCol
==
0
)
{
...
...
@@ -980,7 +991,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
*
pColVal
=
COL_VAL_VALUE
(
pTColumn
->
colId
,
pTColumn
->
type
,
(
SValue
){.
ts
=
maxKey
});
if
(
taosArrayPush
(
pColArray
,
pColVal
)
==
NULL
)
{
// if (taosArrayPush(pColArray, pColVal) == NULL) {
if
(
taosArrayPush
(
pColArray
,
&
(
SLastCol
){.
ts
=
TSKEY_MAX
,
.
colVal
=
*
pColVal
})
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
...
...
@@ -991,7 +1003,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
for
(
int16_t
i
=
iCol
;
i
<
nCol
;
++
i
)
{
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
tTSRowGetVal
(
*
ppRow
,
pTSchema
,
i
,
pColVal
);
if
(
taosArrayPush
(
pColArray
,
pColVal
)
==
NULL
)
{
// if (taosArrayPush(pColArray, pColVal) == NULL) {
if
(
taosArrayPush
(
pColArray
,
&
(
SLastCol
){.
ts
=
maxKey
,
.
colVal
=
*
pColVal
})
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
...
...
@@ -1012,11 +1025,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
--
nilColCount
;
}
}
/*
if
(
*
ppRow
)
{
taosMemoryFreeClear
(
*
ppRow
);
}
*/
continue
;
}
...
...
@@ -1024,12 +1037,16 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
for
(
int16_t
i
=
iCol
;
i
<
nCol
;
++
i
)
{
SColVal
colVal
=
{
0
};
tTSRowGetVal
(
*
ppRow
,
pTSchema
,
i
,
&
colVal
);
TSKEY
rowTs
=
(
*
ppRow
)
->
ts
;
SColVal
*
tColVal
=
(
SColVal
*
)
taosArrayGet
(
pColArray
,
i
);
// SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
SLastCol
*
tTsVal
=
(
SLastCol
*
)
taosArrayGet
(
pColArray
,
i
);
SColVal
*
tColVal
=
&
tTsVal
->
colVal
;
if
(
!
colVal
.
isNone
&&
!
colVal
.
isNull
)
{
if
(
tColVal
->
isNull
||
tColVal
->
isNone
)
{
taosArraySet
(
pColArray
,
i
,
&
colVal
);
// taosArraySet(pColArray, i, &colVal);
taosArraySet
(
pColArray
,
i
,
&
(
SLastCol
){.
ts
=
rowTs
,
.
colVal
=
colVal
});
--
nilColCount
;
}
}
else
{
...
...
@@ -1054,16 +1071,45 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
}
while
(
nilColCount
>
0
);
// if () new ts row from pColArray if non empty
if
(
taosArrayGetSize
(
pColArray
)
==
nCol
)
{
code
=
tdSTSRowNew
(
pColArray
,
pTSchema
,
ppRow
);
if
(
code
)
goto
_err
;
/* if (taosArrayGetSize(pColArray) == nCol) { */
/* code = tdSTSRowNew(pColArray, pTSchema, ppRow); */
/* if (code) goto _err; */
/* } */
/* taosArrayDestroy(pColArray); */
if
(
taosArrayGetSize
(
pColArray
)
<=
0
)
{
*
ppLastArray
=
NULL
;
taosArrayDestroy
(
pColArray
);
}
else
{
*
ppLastArray
=
pColArray
;
}
if
(
*
ppRow
)
{
taosMemoryFreeClear
(
*
ppRow
);
}
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
input
[
i
].
nextRowClearFn
)
{
input
[
i
].
nextRowClearFn
(
input
[
i
].
iter
);
}
}
if
(
pSkyline
)
{
taosArrayDestroy
(
pSkyline
);
}
taosArrayDestroy
(
pColArray
);
taosMemoryFreeClear
(
pTSchema
);
return
code
;
_err:
taosArrayDestroy
(
pColArray
);
if
(
*
ppRow
)
{
taosMemoryFreeClear
(
*
ppRow
);
}
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
if
(
input
[
i
].
nextRowClearFn
)
{
input
[
i
].
nextRowClearFn
(
input
[
i
].
iter
);
}
}
if
(
pSkyline
)
{
taosArrayDestroy
(
pSkyline
);
}
taosMemoryFreeClear
(
pTSchema
);
tsdbError
(
"vgId:%d merge last_row failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
...
...
@@ -1103,6 +1149,30 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return
code
;
}
int32_t
tsdbCacheLastArray2Row
(
SArray
*
pLastArray
,
STSRow
**
ppRow
,
STSchema
*
pTSchema
)
{
int32_t
code
=
0
;
int16_t
nCol
=
taosArrayGetSize
(
pLastArray
);
SArray
*
pColArray
=
taosArrayInit
(
nCol
,
sizeof
(
SColVal
));
for
(
int16_t
iCol
=
0
;
iCol
<
nCol
;
++
iCol
)
{
SLastCol
*
tTsVal
=
(
SLastCol
*
)
taosArrayGet
(
pLastArray
,
iCol
);
SColVal
*
tColVal
=
&
tTsVal
->
colVal
;
taosArrayPush
(
pColArray
,
tColVal
);
}
code
=
tdSTSRowNew
(
pColArray
,
pTSchema
,
ppRow
);
if
(
code
)
goto
_err
;
taosArrayDestroy
(
pColArray
);
return
code
;
_err:
taosArrayDestroy
(
pColArray
);
return
code
;
}
int32_t
tsdbCacheGetLastH
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
STsdb
*
pTsdb
,
LRUHandle
**
handle
)
{
int32_t
code
=
0
;
char
key
[
32
]
=
{
0
};
...
...
@@ -1115,17 +1185,20 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
else
{
STSRow
*
pRow
=
NULL
;
code
=
mergeLast
(
uid
,
pTsdb
,
&
pRow
);
// STSRow *pRow = NULL;
// code = mergeLast(uid, pTsdb, &pRow);
SArray
*
pLastArray
=
NULL
;
code
=
mergeLast
(
uid
,
pTsdb
,
&
pLastArray
);
// if table's empty or error, return code of -1
if
(
code
<
0
||
pRow
==
NULL
)
{
// if (code < 0 || pRow == NULL) {
if
(
code
<
0
||
pLastArray
==
NULL
)
{
*
handle
=
NULL
;
return
0
;
}
_taos_lru_deleter_t
deleter
=
deleteTableCacheLast
row
;
_taos_lru_deleter_t
deleter
=
deleteTableCacheLast
;
LRUStatus
status
=
taosLRUCacheInsert
(
pCache
,
key
,
keyLen
,
p
Row
,
TD_ROW_LEN
(
pRow
)
,
deleter
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
taosLRUCacheInsert
(
pCache
,
key
,
keyLen
,
p
LastArray
,
pLastArray
->
capacity
,
deleter
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
code
=
-
1
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
089285ce
...
...
@@ -128,6 +128,8 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
}
pRow
=
(
STSRow
*
)
taosLRUCacheValue
(
lruCache
,
h
);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
if
(
pRow
->
ts
>
lastKey
)
{
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
...
...
@@ -140,6 +142,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
lastKey
=
pRow
->
ts
;
}
// taosMemoryFree(pRow);
tsdbCacheRelease
(
lruCache
,
h
);
}
}
else
if
(
pr
->
type
==
LASTROW_RETRIEVE_TYPE_ALL
)
{
...
...
@@ -158,8 +161,12 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
}
pRow
=
(
STSRow
*
)
taosLRUCacheValue
(
lruCache
,
h
);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
saveOneRow
(
pRow
,
pResBlock
,
pr
,
slotIds
);
// taosMemoryFree(pRow);
tsdbCacheRelease
(
lruCache
,
h
);
pr
->
tableIndex
+=
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录