Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
208c9756
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
208c9756
编写于
5月 19, 2023
作者:
W
wade zhang
提交者:
GitHub
5月 19, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21331 from taosdata/fix/TD-24097
enh(cache/batchread): load columns in one trip
上级
edc9175c
35f74ce2
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
641 addition
and
183 deletion
+641
-183
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+15
-13
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+5
-1
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+581
-145
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+27
-18
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+9
-4
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+4
-2
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
208c9756
...
...
@@ -305,10 +305,6 @@ void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proa
// tsdbMerge.c ==============================================================================================
int32_t
tsdbMerge
(
STsdb
*
pTsdb
);
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbDiskData ==============================================================================================
int32_t
tDiskDataBuilderCreate
(
SDiskDataBuilder
**
ppBuilder
);
void
*
tDiskDataBuilderDestroy
(
SDiskDataBuilder
*
pBuilder
);
...
...
@@ -346,13 +342,17 @@ struct STsdbFS {
};
typedef
struct
{
rocksdb_t
*
db
;
rocksdb_options_t
*
options
;
rocksdb_flushoptions_t
*
flushoptions
;
rocksdb_writeoptions_t
*
writeoptions
;
rocksdb_readoptions_t
*
readoptions
;
rocksdb_writebatch_t
*
writebatch
;
TdThreadMutex
rMutex
;
rocksdb_t
*
db
;
rocksdb_comparator_t
*
my_comparator
;
rocksdb_cache_t
*
blockcache
;
rocksdb_block_based_table_options_t
*
tableoptions
;
rocksdb_options_t
*
options
;
rocksdb_flushoptions_t
*
flushoptions
;
rocksdb_writeoptions_t
*
writeoptions
;
rocksdb_readoptions_t
*
readoptions
;
rocksdb_writebatch_t
*
writebatch
;
TdThreadMutex
rMutex
;
STSchema
*
pTSchema
;
}
SRocksCache
;
struct
STsdb
{
...
...
@@ -782,7 +782,7 @@ typedef struct SLDataIter {
#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
SSttBlockLoadInfo
*
pBlockLoadInfo
,
bool
destroyLoadInfo
,
const
char
*
idStr
,
bool
strictTimeRange
,
SLDataIter
*
pLDataIter
);
bool
destroyLoadInfo
,
const
char
*
idStr
,
bool
strictTimeRange
,
SLDataIter
*
pLDataIter
);
void
tMergeTreeAddIter
(
SMergeTree
*
pMTree
,
SLDataIter
*
pIter
);
bool
tMergeTreeNext
(
SMergeTree
*
pMTree
);
bool
tMergeTreeIgnoreEarlierTs
(
SMergeTree
*
pMTree
);
...
...
@@ -822,13 +822,15 @@ typedef struct SCacheRowsReader {
typedef
struct
{
TSKEY
ts
;
int8_t
dirty
;
SColVal
colVal
;
}
SLastCol
;
int32_t
tsdbOpenCache
(
STsdb
*
pTsdb
);
void
tsdbCloseCache
(
STsdb
*
pTsdb
);
int32_t
tsdbCacheUpdate
(
STsdb
*
pTsdb
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSDBROW
*
row
);
int32_t
tsdbCacheGet
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int32_t
ltype
);
int32_t
tsdbCacheGetBatch
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int8_t
ltype
);
int32_t
tsdbCacheGet
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int8_t
ltype
);
int32_t
tsdbCacheDel
(
STsdb
*
pTsdb
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
);
int32_t
tsdbCacheInsertLast
(
SLRUCache
*
pCache
,
tb_uid_t
uid
,
TSDBROW
*
row
,
STsdb
*
pTsdb
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
208c9756
...
...
@@ -197,7 +197,7 @@ void tqClose(STQ*);
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
);
int
tqRegisterPushHandle
(
STQ
*
pTq
,
void
*
handle
,
SRpcMsg
*
pMsg
);
int
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
pHandle
);
int
tqStartStreamTasks
(
STQ
*
pTq
);
// restore all stream tasks after vnode launching completed.
int
tqStartStreamTasks
(
STQ
*
pTq
);
// restore all stream tasks after vnode launching completed.
int
tqCommit
(
STQ
*
);
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
);
...
...
@@ -403,6 +403,10 @@ struct SVnode {
#define VND_IS_RSMA(v) ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v) ((v)->config.isTsma == 1)
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
struct
STbUidStore
{
tb_uid_t
suid
;
SArray
*
tbUids
;
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
208c9756
...
...
@@ -46,7 +46,13 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
}
}
#define ROCKS_KEY_LEN 64
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
typedef
struct
{
tb_uid_t
uid
;
int16_t
cid
;
int8_t
ltype
;
}
SLastKey
;
static
void
tsdbGetRocksPath
(
STsdb
*
pTsdb
,
char
*
path
)
{
SVnode
*
pVnode
=
pTsdb
->
pVnode
;
...
...
@@ -62,9 +68,56 @@ static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
}
}
static
const
char
*
myCmpName
(
void
*
state
)
{
(
void
)
state
;
return
"myCmp"
;
}
static
void
myCmpDestroy
(
void
*
state
)
{
(
void
)
state
;
}
static
int
myCmp
(
void
*
state
,
const
char
*
a
,
size_t
alen
,
const
char
*
b
,
size_t
blen
)
{
(
void
)
state
;
(
void
)
alen
;
(
void
)
blen
;
SLastKey
*
lhs
=
(
SLastKey
*
)
a
;
SLastKey
*
rhs
=
(
SLastKey
*
)
b
;
if
(
lhs
->
uid
<
rhs
->
uid
)
{
return
-
1
;
}
else
if
(
lhs
->
uid
>
rhs
->
uid
)
{
return
1
;
}
if
(
lhs
->
cid
<
rhs
->
cid
)
{
return
-
1
;
}
else
if
(
lhs
->
cid
>
rhs
->
cid
)
{
return
1
;
}
if
(
lhs
->
ltype
<
rhs
->
ltype
)
{
return
-
1
;
}
else
if
(
lhs
->
ltype
>
rhs
->
ltype
)
{
return
1
;
}
return
0
;
}
static
int32_t
tsdbOpenRocksCache
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
rocksdb_comparator_t
*
cmp
=
rocksdb_comparator_create
(
NULL
,
myCmpDestroy
,
myCmp
,
myCmpName
);
if
(
NULL
==
cmp
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
5
*
1024
*
1024
);
pTsdb
->
rCache
.
blockcache
=
cache
;
rocksdb_block_based_table_options_t
*
tableoptions
=
rocksdb_block_based_options_create
();
pTsdb
->
rCache
.
tableoptions
=
tableoptions
;
rocksdb_options_t
*
options
=
rocksdb_options_create
();
if
(
NULL
==
options
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -72,6 +125,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
}
rocksdb_options_set_create_if_missing
(
options
,
1
);
rocksdb_options_set_comparator
(
options
,
cmp
);
rocksdb_block_based_options_set_block_cache
(
tableoptions
,
cache
);
rocksdb_options_set_block_based_table_factory
(
options
,
tableoptions
);
// rocksdb_options_set_inplace_update_support(options, 1);
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
...
...
@@ -80,12 +136,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err2
;
}
//
rocksdb_writeoptions_disable_WAL(writeoptions, 1);
rocksdb_writeoptions_disable_WAL
(
writeoptions
,
1
);
rocksdb_readoptions_t
*
readoptions
=
rocksdb_readoptions_create
();
if
(
NULL
==
readoptions
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
2
;
goto
_err
3
;
}
char
*
err
=
NULL
;
...
...
@@ -94,19 +150,23 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_t
*
db
=
rocksdb_open
(
options
,
cachePath
,
&
err
);
if
(
NULL
==
db
)
{
code
=
-
1
;
goto
_err3
;
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
rocksdb_free
(
err
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err4
;
}
rocksdb_flushoptions_t
*
flushoptions
=
rocksdb_flushoptions_create
();
if
(
NULL
==
flushoptions
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
4
;
goto
_err
5
;
}
rocksdb_writebatch_t
*
writebatch
=
rocksdb_writebatch_create
();
pTsdb
->
rCache
.
writebatch
=
writebatch
;
pTsdb
->
rCache
.
my_comparator
=
cmp
;
pTsdb
->
rCache
.
options
=
options
;
pTsdb
->
rCache
.
writeoptions
=
writeoptions
;
pTsdb
->
rCache
.
readoptions
=
readoptions
;
...
...
@@ -115,15 +175,22 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
taosThreadMutexInit
(
&
pTsdb
->
rCache
.
rMutex
,
NULL
);
pTsdb
->
rCache
.
pTSchema
=
NULL
;
return
code
;
_err5:
rocksdb_close
(
pTsdb
->
rCache
.
db
);
_err4:
rocksdb_readoptions_destroy
(
readoptions
);
_err3:
rocksdb_writeoptions_destroy
(
writeoptions
);
_err2:
rocksdb_options_destroy
(
options
);
rocksdb_block_based_options_destroy
(
tableoptions
);
rocksdb_cache_destroy
(
cache
);
_err:
rocksdb_comparator_destroy
(
cmp
);
return
code
;
}
...
...
@@ -134,13 +201,33 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_readoptions_destroy
(
pTsdb
->
rCache
.
readoptions
);
rocksdb_writeoptions_destroy
(
pTsdb
->
rCache
.
writeoptions
);
rocksdb_options_destroy
(
pTsdb
->
rCache
.
options
);
rocksdb_block_based_options_destroy
(
pTsdb
->
rCache
.
tableoptions
);
rocksdb_cache_destroy
(
pTsdb
->
rCache
.
blockcache
);
rocksdb_comparator_destroy
(
pTsdb
->
rCache
.
my_comparator
);
taosThreadMutexDestroy
(
&
pTsdb
->
rCache
.
rMutex
);
taosMemoryFree
(
pTsdb
->
rCache
.
pTSchema
);
}
static
void
rocksMayWrite
(
STsdb
*
pTsdb
,
bool
force
)
{
rocksdb_writebatch_t
*
wb
=
pTsdb
->
rCache
.
writebatch
;
if
(
force
||
rocksdb_writebatch_count
(
wb
)
>=
1024
)
{
char
*
err
=
NULL
;
rocksdb_write
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
writeoptions
,
wb
,
&
err
);
if
(
NULL
!=
err
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
rocksdb_free
(
err
);
}
rocksdb_writebatch_clear
(
wb
);
}
}
int32_t
tsdbCacheCommit
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
char
*
err
=
NULL
;
rocksMayWrite
(
pTsdb
,
true
);
rocksdb_flush
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
flushoptions
,
&
err
);
if
(
NULL
!=
err
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
...
...
@@ -191,15 +278,15 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*
size
=
length
;
}
static
SLastCol
*
tsdbCacheLookup
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
int16_t
cid
,
char
const
*
lstring
)
{
static
SLastCol
*
tsdbCacheLookup
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
int16_t
cid
,
int8_t
ltype
)
{
SLastCol
*
pLastCol
=
NULL
;
char
*
err
=
NULL
;
size_t
vlen
=
0
;
char
key
[
ROCKS_KEY_LEN
]
;
size_t
klen
=
snprintf
(
key
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":%s"
,
uid
,
cid
,
lstring
)
;
char
*
value
=
NULL
;
value
=
rocksdb_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
key
,
klen
,
&
vlen
,
&
err
);
char
*
err
=
NULL
;
size_t
vlen
=
0
;
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
ltype
,
.
uid
=
uid
,
.
cid
=
cid
}
;
size_t
klen
=
ROCKS_KEY_LEN
;
char
*
value
=
NULL
;
value
=
rocksdb_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
(
char
*
)
key
,
klen
,
&
vlen
,
&
err
);
if
(
NULL
!=
err
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
rocksdb_free
(
err
);
...
...
@@ -210,12 +297,38 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char c
return
pLastCol
;
}
static
void
reallocVarData
(
SColVal
*
pColVal
)
{
if
(
IS_VAR_DATA_TYPE
(
pColVal
->
type
))
{
uint8_t
*
pVal
=
pColVal
->
value
.
pData
;
pColVal
->
value
.
pData
=
taosMemoryMalloc
(
pColVal
->
value
.
nData
);
if
(
pColVal
->
value
.
nData
)
{
memcpy
(
pColVal
->
value
.
pData
,
pVal
,
pColVal
->
value
.
nData
);
}
}
}
static
void
tsdbCacheDeleter
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
SLastCol
*
pLastCol
=
(
SLastCol
*
)
value
;
// TODO: add dirty flag to SLastCol
if
(
pLastCol
->
dirty
)
{
// TODO: queue into dirty list, free it after save to backstore
}
else
{
if
(
IS_VAR_DATA_TYPE
(
pLastCol
->
colVal
.
type
)
/* && pLastCol->colVal.value.nData > 0*/
)
{
taosMemoryFree
(
pLastCol
->
colVal
.
value
.
pData
);
}
taosMemoryFree
(
value
);
}
}
int32_t
tsdbCacheUpdate
(
STsdb
*
pTsdb
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSDBROW
*
pRow
)
{
int32_t
code
=
0
;
// 1, fetch schema
STSchema
*
pTSchema
=
NULL
;
int32_t
sver
=
TSDBROW_SVERSION
(
pRow
);
code
=
metaGetTbTSchemaEx
(
pTsdb
->
pVnode
->
pMeta
,
suid
,
uid
,
sver
,
&
pTSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
...
...
@@ -229,22 +342,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
tsdbRowIterOpen
(
&
iter
,
pRow
,
pTSchema
);
for
(
SColVal
*
pColVal
=
tsdbRowIterNext
(
&
iter
);
pColVal
;
pColVal
=
tsdbRowIterNext
(
&
iter
))
{
/*
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
*/
taosArrayPush
(
aColVal
,
pColVal
);
}
...
...
@@ -254,23 +351,18 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
int
num_keys
=
TARRAY_SIZE
(
aColVal
);
char
**
keys_list
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
size_t
*
keys_list_sizes
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
size_t
));
char
*
key_list
=
taosMemoryMalloc
(
num_keys
*
ROCKS_KEY_LEN
*
2
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
SColVal
*
pColVal
=
(
SColVal
*
)
taosArrayGet
(
aColVal
,
i
);
int16_t
cid
=
pColVal
->
cid
;
char
*
keys
=
taosMemoryCalloc
(
2
,
ROCKS_KEY_LEN
);
int
last_key_len
=
snprintf
(
keys
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last"
,
uid
,
cid
);
if
(
last_key_len
>=
ROCKS_KEY_LEN
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
tstrerror
(
code
));
}
int
lr_key_len
=
snprintf
(
keys
+
ROCKS_KEY_LEN
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last_row"
,
uid
,
cid
);
if
(
lr_key_len
>=
ROCKS_KEY_LEN
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
tstrerror
(
code
));
}
keys_list
[
i
]
=
keys
;
keys_list
[
num_keys
+
i
]
=
keys
+
ROCKS_KEY_LEN
;
keys_list_sizes
[
i
]
=
last_key_len
;
keys_list_sizes
[
num_keys
+
i
]
=
lr_key_len
;
memcpy
(
key_list
+
i
*
ROCKS_KEY_LEN
,
&
(
SLastKey
){.
ltype
=
1
,
.
uid
=
uid
,
.
cid
=
cid
},
ROCKS_KEY_LEN
);
memcpy
(
key_list
+
i
*
ROCKS_KEY_LEN
+
num_keys
*
ROCKS_KEY_LEN
,
&
(
SLastKey
){.
ltype
=
0
,
.
uid
=
uid
,
.
cid
=
cid
},
ROCKS_KEY_LEN
);
keys_list
[
i
]
=
key_list
+
i
*
ROCKS_KEY_LEN
;
keys_list
[
num_keys
+
i
]
=
key_list
+
i
*
ROCKS_KEY_LEN
+
num_keys
*
ROCKS_KEY_LEN
;
keys_list_sizes
[
i
]
=
ROCKS_KEY_LEN
;
keys_list_sizes
[
num_keys
+
i
]
=
ROCKS_KEY_LEN
;
}
char
**
values_list
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
size_t
*
values_list_sizes
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
size_t
));
...
...
@@ -278,12 +370,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosThreadMutexLock
(
&
pTsdb
->
rCache
.
rMutex
);
rocksdb_multi_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
num_keys
*
2
,
(
const
char
*
const
*
)
keys_list
,
keys_list_sizes
,
values_list
,
values_list_sizes
,
errs
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
taosMemoryFree
(
keys_list
[
i
]);
}
for
(
int
i
=
0
;
i
<
num_keys
*
2
;
++
i
)
{
rocksdb_free
(
errs
[
i
]);
}
taosMemoryFree
(
key_list
);
taosMemoryFree
(
keys_list
);
taosMemoryFree
(
keys_list_sizes
);
taosMemoryFree
(
errs
);
...
...
@@ -292,19 +382,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_writebatch_t
*
wb
=
pTsdb
->
rCache
.
writebatch
;
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
SColVal
*
pColVal
=
(
SColVal
*
)
taosArrayGet
(
aColVal
,
i
);
if
(
COL_VAL_IS_VALUE
(
pColVal
))
{
SLastCol
*
pLastCol
=
tsdbCacheDeserialize
(
values_list
[
i
]);
if
(
NULL
==
pLastCol
||
pLastCol
->
ts
<=
keyTs
)
{
char
*
value
=
NULL
;
size_t
vlen
=
0
;
tsdbCacheSerialize
(
&
(
SLastCol
){.
ts
=
keyTs
,
.
colVal
=
*
pColVal
},
&
value
,
&
vlen
);
char
key
[
ROCKS_KEY_LEN
];
size_t
klen
=
snprintf
(
key
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last"
,
uid
,
pColVal
->
cid
);
rocksdb_writebatch_put
(
wb
,
key
,
klen
,
value
,
vlen
);
taosMemoryFree
(
value
);
}
}
if
(
!
COL_VAL_IS_NONE
(
pColVal
))
{
SLastCol
*
pLastCol
=
tsdbCacheDeserialize
(
values_list
[
i
+
num_keys
]);
...
...
@@ -313,11 +390,61 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
char
*
value
=
NULL
;
size_t
vlen
=
0
;
tsdbCacheSerialize
(
&
(
SLastCol
){.
ts
=
keyTs
,
.
colVal
=
*
pColVal
},
&
value
,
&
vlen
);
char
key
[
ROCKS_KEY_LEN
];
size_t
klen
=
snprintf
(
key
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last_row"
,
uid
,
pColVal
->
cid
);
rocksdb_writebatch_put
(
wb
,
key
,
klen
,
value
,
vlen
);
SLastKey
key
=
(
SLastKey
){.
ltype
=
0
,
.
uid
=
uid
,
.
cid
=
pColVal
->
cid
};
size_t
klen
=
ROCKS_KEY_LEN
;
rocksdb_writebatch_put
(
wb
,
(
char
*
)
&
key
,
klen
,
value
,
vlen
);
pLastCol
=
(
SLastCol
*
)
value
;
SLastCol
*
pTmpLastCol
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastCol
));
*
pTmpLastCol
=
*
pLastCol
;
pLastCol
=
pTmpLastCol
;
reallocVarData
(
&
pLastCol
->
colVal
);
size_t
charge
=
sizeof
(
*
pLastCol
);
if
(
IS_VAR_DATA_TYPE
(
pLastCol
->
colVal
.
type
))
{
charge
+=
pLastCol
->
colVal
.
value
.
nData
;
}
LRUStatus
status
=
taosLRUCacheInsert
(
pTsdb
->
lruCache
,
&
key
,
ROCKS_KEY_LEN
,
pLastCol
,
charge
,
tsdbCacheDeleter
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
code
=
-
1
;
}
taosMemoryFree
(
value
);
}
if
(
COL_VAL_IS_VALUE
(
pColVal
))
{
SLastCol
*
pLastCol
=
tsdbCacheDeserialize
(
values_list
[
i
]);
if
(
NULL
==
pLastCol
||
pLastCol
->
ts
<=
keyTs
)
{
char
*
value
=
NULL
;
size_t
vlen
=
0
;
tsdbCacheSerialize
(
&
(
SLastCol
){.
ts
=
keyTs
,
.
colVal
=
*
pColVal
},
&
value
,
&
vlen
);
SLastKey
key
=
(
SLastKey
){.
ltype
=
1
,
.
uid
=
uid
,
.
cid
=
pColVal
->
cid
};
rocksdb_writebatch_put
(
wb
,
(
char
*
)
&
key
,
ROCKS_KEY_LEN
,
value
,
vlen
);
pLastCol
=
(
SLastCol
*
)
value
;
SLastCol
*
pTmpLastCol
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastCol
));
*
pTmpLastCol
=
*
pLastCol
;
pLastCol
=
pTmpLastCol
;
reallocVarData
(
&
pLastCol
->
colVal
);
size_t
charge
=
sizeof
(
*
pLastCol
);
if
(
IS_VAR_DATA_TYPE
(
pLastCol
->
colVal
.
type
))
{
charge
+=
pLastCol
->
colVal
.
value
.
nData
;
}
LRUStatus
status
=
taosLRUCacheInsert
(
pTsdb
->
lruCache
,
&
key
,
ROCKS_KEY_LEN
,
pLastCol
,
charge
,
tsdbCacheDeleter
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
code
=
-
1
;
}
taosMemoryFree
(
value
);
}
}
}
rocksdb_free
(
values_list
[
i
]);
...
...
@@ -326,14 +453,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree
(
values_list
);
taosMemoryFree
(
values_list_sizes
);
char
*
err
=
NULL
;
rocksdb_write
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
writeoptions
,
wb
,
&
err
);
if
(
NULL
!=
err
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
rocksdb_free
(
err
);
}
rocksMayWrite
(
pTsdb
,
false
);
taosThreadMutexUnlock
(
&
pTsdb
->
rCache
.
rMutex
);
rocksdb_writebatch_clear
(
wb
);
_exit:
taosArrayDestroy
(
aColVal
);
...
...
@@ -341,53 +462,41 @@ _exit:
return
code
;
}
static
void
reallocVarData
(
SColVal
*
pColVal
)
{
if
(
IS_VAR_DATA_TYPE
(
pColVal
->
type
))
{
uint8_t
*
pVal
=
pColVal
->
value
.
pData
;
pColVal
->
value
.
pData
=
taosMemoryMalloc
(
pColVal
->
value
.
nData
);
if
(
pColVal
->
value
.
nData
)
{
memcpy
(
pColVal
->
value
.
pData
,
pVal
,
pColVal
->
value
.
nData
);
}
}
}
static
int32_t
mergeLastCid
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
SArray
**
ppLastArray
,
SCacheRowsReader
*
pr
,
int16_t
*
aCols
,
int
nCols
,
int16_t
*
slotIds
);
static
int32_t
mergeLastRowCid
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
SArray
**
ppLastArray
,
SCacheRowsReader
*
pr
,
int16_t
*
aCols
,
int
nCols
,
int16_t
*
slotIds
);
int32_t
tsdbCacheGet
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int32_t
ltype
)
{
static
char
const
*
alstring
[
2
]
=
{
"last_row"
,
"last"
};
char
const
*
lstring
=
alstring
[
ltype
];
#if 1
int32_t
tsdbCacheGetSlow
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int8_t
ltype
)
{
rocksdb_writebatch_t
*
wb
=
NULL
;
int32_t
code
=
0
;
SArray
*
pCidList
=
pr
->
pCidList
;
int
num_keys
=
TARRAY_SIZE
(
pCidList
);
char
**
keys_list
=
taosMemoryCalloc
(
num_keys
,
sizeof
(
char
*
));
size_t
*
keys_list_sizes
=
taosMemoryCalloc
(
num_keys
,
sizeof
(
size_t
));
char
**
keys_list
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
char
*
));
size_t
*
keys_list_sizes
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
size_t
));
char
*
key_list
=
taosMemoryMalloc
(
num_keys
*
ROCKS_KEY_LEN
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
int16_t
cid
=
*
(
int16_t
*
)
taosArrayGet
(
pCidList
,
i
);
char
*
keys
=
taosMemoryCalloc
(
2
,
ROCKS_KEY_LEN
);
int
last_key_len
=
snprintf
(
keys
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":%s"
,
uid
,
cid
,
lstring
);
if
(
last_key_len
>=
ROCKS_KEY_LEN
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
tstrerror
(
code
));
}
keys_list
[
i
]
=
keys
;
keys_list_sizes
[
i
]
=
last_key_len
;
memcpy
(
key_list
+
i
*
ROCKS_KEY_LEN
,
&
(
SLastKey
){.
ltype
=
ltype
,
.
uid
=
uid
,
.
cid
=
cid
},
ROCKS_KEY_LEN
);
keys_list
[
i
]
=
key_list
+
i
*
ROCKS_KEY_LEN
;
keys_list_sizes
[
i
]
=
ROCKS_KEY_LEN
;
}
char
**
values_list
=
taosMemoryCalloc
(
num_keys
,
sizeof
(
char
*
));
size_t
*
values_list_sizes
=
taosMemoryCalloc
(
num_keys
,
sizeof
(
size_t
));
char
**
errs
=
taosMemory
Calloc
(
num_keys
,
sizeof
(
char
*
));
char
**
errs
=
taosMemory
Malloc
(
num_keys
*
sizeof
(
char
*
));
rocksdb_multi_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
num_keys
,
(
const
char
*
const
*
)
keys_list
,
keys_list_sizes
,
values_list
,
values_list_sizes
,
errs
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
taosMemoryFree
(
keys_list
[
i
]);
rocksdb_free
(
errs
[
i
]);
if
(
errs
[
i
])
{
rocksdb_free
(
errs
[
i
]);
}
}
taosMemoryFree
(
key_list
);
taosMemoryFree
(
keys_list
);
taosMemoryFree
(
keys_list_sizes
);
taosMemoryFree
(
errs
);
...
...
@@ -403,7 +512,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
}
else
{
taosThreadMutexLock
(
&
pTsdb
->
rCache
.
rMutex
);
pLastCol
=
tsdbCacheLookup
(
pTsdb
,
uid
,
cid
,
l
string
);
pLastCol
=
tsdbCacheLookup
(
pTsdb
,
uid
,
cid
,
l
type
);
if
(
!
pLastCol
)
{
// recalc: load from tsdb
int16_t
aCols
[
1
]
=
{
cid
};
...
...
@@ -432,9 +541,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
char
*
value
=
NULL
;
size_t
vlen
=
0
;
tsdbCacheSerialize
(
pLastCol
,
&
value
,
&
vlen
);
char
key
[
ROCKS_KEY_LEN
];
size_t
klen
=
snprintf
(
key
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":%s"
,
uid
,
pLastCol
->
colVal
.
cid
,
lstring
);
rocksdb_writebatch_put
(
wb
,
key
,
klen
,
value
,
vlen
);
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
ltype
,
.
uid
=
uid
,
.
cid
=
pLastCol
->
colVal
.
cid
};
size_t
klen
=
ROCKS_KEY_LEN
;
rocksdb_writebatch_put
(
wb
,
(
char
*
)
key
,
klen
,
value
,
vlen
);
taosMemoryFree
(
value
);
}
else
{
...
...
@@ -442,21 +552,13 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
}
if
(
wb
)
{
char
*
err
=
NULL
;
rocksdb_write
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
writeoptions
,
wb
,
&
err
);
if
(
NULL
!=
err
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
rocksdb_free
(
err
);
}
rocksdb_writebatch_clear
(
wb
);
rocksMayWrite
(
pTsdb
,
false
);
}
taosThreadMutexUnlock
(
&
pTsdb
->
rCache
.
rMutex
);
}
taosArrayPush
(
pLastArray
,
pLastCol
);
taosArrayDestroy
(
pTmpColArray
);
if
(
freeCol
)
{
taosMemoryFree
(
pLastCol
);
...
...
@@ -467,43 +569,370 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
return
code
;
}
#endif
static
SLastCol
*
tsdbCacheLoadCol
(
STsdb
*
pTsdb
,
SCacheRowsReader
*
pr
,
int16_t
slotid
,
tb_uid_t
uid
,
int16_t
cid
,
int8_t
ltype
)
{
SLastCol
*
pLastCol
=
tsdbCacheLookup
(
pTsdb
,
uid
,
cid
,
ltype
);
if
(
!
pLastCol
)
{
rocksdb_writebatch_t
*
wb
=
NULL
;
taosThreadMutexLock
(
&
pTsdb
->
rCache
.
rMutex
);
pLastCol
=
tsdbCacheLookup
(
pTsdb
,
uid
,
cid
,
ltype
);
if
(
!
pLastCol
)
{
// recalc: load from tsdb
int16_t
aCols
[
1
]
=
{
cid
};
int16_t
slotIds
[
1
]
=
{
slotid
};
SArray
*
pTmpColArray
=
NULL
;
if
(
ltype
)
{
mergeLastCid
(
uid
,
pTsdb
,
&
pTmpColArray
,
pr
,
aCols
,
1
,
slotIds
);
}
else
{
mergeLastRowCid
(
uid
,
pTsdb
,
&
pTmpColArray
,
pr
,
aCols
,
1
,
slotIds
);
}
if
(
pTmpColArray
&&
TARRAY_SIZE
(
pTmpColArray
)
>=
1
)
{
pLastCol
=
taosArrayGet
(
pTmpColArray
,
0
);
}
// still null, then make up a none col value
SLastCol
noneCol
=
{.
ts
=
TSKEY_MIN
,
.
colVal
=
COL_VAL_NONE
(
cid
,
pr
->
pSchema
->
columns
[
slotid
].
type
)};
if
(
!
pLastCol
)
{
pLastCol
=
&
noneCol
;
}
// store result back to rocks cache
wb
=
pTsdb
->
rCache
.
writebatch
;
char
*
value
=
NULL
;
size_t
vlen
=
0
;
tsdbCacheSerialize
(
pLastCol
,
&
value
,
&
vlen
);
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
ltype
,
.
uid
=
uid
,
.
cid
=
pLastCol
->
colVal
.
cid
};
size_t
klen
=
ROCKS_KEY_LEN
;
rocksdb_writebatch_put
(
wb
,
(
char
*
)
key
,
klen
,
value
,
vlen
);
taosMemoryFree
(
value
);
SLastCol
*
pTmpLastCol
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastCol
));
*
pTmpLastCol
=
*
pLastCol
;
pLastCol
=
pTmpLastCol
;
taosArrayDestroy
(
pTmpColArray
);
}
if
(
wb
)
{
rocksMayWrite
(
pTsdb
,
false
);
}
taosThreadMutexUnlock
(
&
pTsdb
->
rCache
.
rMutex
);
}
return
pLastCol
;
}
typedef
struct
{
int
idx
;
SLastKey
key
;
}
SIdxKey
;
static
int32_t
tsdbCacheLoadFromRaw
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SArray
*
remainCols
,
SCacheRowsReader
*
pr
,
int8_t
ltype
)
{
int32_t
code
=
0
;
rocksdb_writebatch_t
*
wb
=
NULL
;
SArray
*
pTmpColArray
=
NULL
;
int
num_keys
=
TARRAY_SIZE
(
remainCols
);
int16_t
*
aCols
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
int16_t
));
int16_t
*
slotIds
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
int16_t
));
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
SIdxKey
*
idxKey
=
taosArrayGet
(
remainCols
,
i
);
aCols
[
i
]
=
idxKey
->
key
.
cid
;
slotIds
[
i
]
=
pr
->
pSlotIds
[
idxKey
->
idx
];
}
if
(
ltype
)
{
mergeLastCid
(
uid
,
pTsdb
,
&
pTmpColArray
,
pr
,
aCols
,
num_keys
,
slotIds
);
}
else
{
mergeLastRowCid
(
uid
,
pTsdb
,
&
pTmpColArray
,
pr
,
aCols
,
num_keys
,
slotIds
);
}
SLRUCache
*
pCache
=
pTsdb
->
lruCache
;
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
SIdxKey
*
idxKey
=
taosArrayGet
(
remainCols
,
i
);
SLastCol
*
pLastCol
=
NULL
;
if
(
pTmpColArray
&&
TARRAY_SIZE
(
pTmpColArray
)
>=
i
+
1
)
{
pLastCol
=
taosArrayGet
(
pTmpColArray
,
i
);
}
// still null, then make up a none col value
SLastCol
noneCol
=
{.
ts
=
TSKEY_MIN
,
.
colVal
=
COL_VAL_NONE
(
idxKey
->
key
.
cid
,
pr
->
pSchema
->
columns
[
slotIds
[
i
]].
type
)};
if
(
!
pLastCol
)
{
pLastCol
=
&
noneCol
;
}
SLastCol
*
pTmpLastCol
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastCol
));
*
pTmpLastCol
=
*
pLastCol
;
pLastCol
=
pTmpLastCol
;
size_t
charge
=
sizeof
(
*
pLastCol
);
if
(
IS_VAR_DATA_TYPE
(
pLastCol
->
colVal
.
type
))
{
charge
+=
pLastCol
->
colVal
.
value
.
nData
;
}
LRUStatus
status
=
taosLRUCacheInsert
(
pCache
,
&
idxKey
->
key
,
ROCKS_KEY_LEN
,
pLastCol
,
charge
,
tsdbCacheDeleter
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
code
=
-
1
;
}
// store result back to rocks cache
wb
=
pTsdb
->
rCache
.
writebatch
;
char
*
value
=
NULL
;
size_t
vlen
=
0
;
tsdbCacheSerialize
(
pLastCol
,
&
value
,
&
vlen
);
SLastKey
*
key
=
&
idxKey
->
key
;
size_t
klen
=
ROCKS_KEY_LEN
;
rocksdb_writebatch_put
(
wb
,
(
char
*
)
key
,
klen
,
value
,
vlen
);
taosMemoryFree
(
value
);
taosArraySet
(
pLastArray
,
idxKey
->
idx
,
pLastCol
);
// taosArrayRemove(remainCols, i);
}
if
(
wb
)
{
rocksMayWrite
(
pTsdb
,
false
);
}
taosArrayDestroy
(
pTmpColArray
);
taosMemoryFree
(
aCols
);
taosMemoryFree
(
slotIds
);
return
code
;
}
static
int32_t
tsdbCacheLoadFromRocks
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SArray
*
remainCols
,
SCacheRowsReader
*
pr
,
int8_t
ltype
)
{
int32_t
code
=
0
;
int
num_keys
=
TARRAY_SIZE
(
remainCols
);
char
**
keys_list
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
char
*
));
size_t
*
keys_list_sizes
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
size_t
));
char
*
key_list
=
taosMemoryMalloc
(
num_keys
*
ROCKS_KEY_LEN
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
int16_t
cid
=
*
(
int16_t
*
)
taosArrayGet
(
remainCols
,
i
);
memcpy
(
key_list
+
i
*
ROCKS_KEY_LEN
,
&
((
SIdxKey
*
)
taosArrayGet
(
remainCols
,
i
))
->
key
,
ROCKS_KEY_LEN
);
keys_list
[
i
]
=
key_list
+
i
*
ROCKS_KEY_LEN
;
keys_list_sizes
[
i
]
=
ROCKS_KEY_LEN
;
}
char
**
values_list
=
taosMemoryCalloc
(
num_keys
,
sizeof
(
char
*
));
size_t
*
values_list_sizes
=
taosMemoryCalloc
(
num_keys
,
sizeof
(
size_t
));
char
**
errs
=
taosMemoryMalloc
(
num_keys
*
sizeof
(
char
*
));
rocksdb_multi_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
num_keys
,
(
const
char
*
const
*
)
keys_list
,
keys_list_sizes
,
values_list
,
values_list_sizes
,
errs
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
if
(
errs
[
i
])
{
rocksdb_free
(
errs
[
i
]);
}
}
taosMemoryFree
(
key_list
);
taosMemoryFree
(
keys_list
);
taosMemoryFree
(
keys_list_sizes
);
taosMemoryFree
(
errs
);
SLRUCache
*
pCache
=
pTsdb
->
lruCache
;
for
(
int
i
=
0
,
j
=
0
;
i
<
num_keys
&&
j
<
TARRAY_SIZE
(
remainCols
);
++
i
)
{
SLastCol
*
pLastCol
=
tsdbCacheDeserialize
(
values_list
[
i
]);
SIdxKey
*
idxKey
=
&
((
SIdxKey
*
)
TARRAY_DATA
(
remainCols
))[
j
];
if
(
pLastCol
)
{
SLastCol
*
pTmpLastCol
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastCol
));
*
pTmpLastCol
=
*
pLastCol
;
pLastCol
=
pTmpLastCol
;
reallocVarData
(
&
pLastCol
->
colVal
);
size_t
charge
=
sizeof
(
*
pLastCol
);
if
(
IS_VAR_DATA_TYPE
(
pLastCol
->
colVal
.
type
))
{
charge
+=
pLastCol
->
colVal
.
value
.
nData
;
}
LRUStatus
status
=
taosLRUCacheInsert
(
pCache
,
&
idxKey
->
key
,
ROCKS_KEY_LEN
,
pLastCol
,
charge
,
tsdbCacheDeleter
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
code
=
-
1
;
}
taosArraySet
(
pLastArray
,
idxKey
->
idx
,
pLastCol
);
taosArrayRemove
(
remainCols
,
j
);
taosMemoryFree
(
values_list
[
i
]);
}
else
{
++
j
;
}
}
taosMemoryFree
(
values_list
);
taosMemoryFree
(
values_list_sizes
);
if
(
TARRAY_SIZE
(
remainCols
)
>
0
)
{
code
=
tsdbCacheLoadFromRaw
(
pTsdb
,
uid
,
pLastArray
,
remainCols
,
pr
,
ltype
);
}
return
code
;
}
int32_t
tsdbCacheGetBatch
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int8_t
ltype
)
{
int32_t
code
=
0
;
SArray
*
remainCols
=
NULL
;
SLRUCache
*
pCache
=
pTsdb
->
lruCache
;
SArray
*
pCidList
=
pr
->
pCidList
;
int
num_keys
=
TARRAY_SIZE
(
pCidList
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
int16_t
cid
=
((
int16_t
*
)
TARRAY_DATA
(
pCidList
))[
i
];
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
ltype
,
.
uid
=
uid
,
.
cid
=
cid
};
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
key
,
ROCKS_KEY_LEN
);
if
(
h
)
{
SLastCol
*
pLastCol
=
(
SLastCol
*
)
taosLRUCacheValue
(
pCache
,
h
);
SLastCol
lastCol
=
*
pLastCol
;
// reallocVarData(&lastCol.colVal);
taosArrayPush
(
pLastArray
,
&
lastCol
);
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
}
else
{
SLastCol
noneCol
=
{.
ts
=
TSKEY_MIN
,
.
colVal
=
COL_VAL_NONE
(
cid
,
pr
->
pSchema
->
columns
[
pr
->
pSlotIds
[
i
]].
type
)};
taosArrayPush
(
pLastArray
,
&
noneCol
);
if
(
!
remainCols
)
{
remainCols
=
taosArrayInit
(
num_keys
,
sizeof
(
SIdxKey
));
}
taosArrayPush
(
remainCols
,
&
(
SIdxKey
){
i
,
*
key
});
}
}
if
(
remainCols
&&
TARRAY_SIZE
(
remainCols
)
>
0
)
{
taosThreadMutexLock
(
&
pTsdb
->
lruMutex
);
for
(
int
i
=
0
;
i
<
TARRAY_SIZE
(
remainCols
);)
{
SIdxKey
*
idxKey
=
&
((
SIdxKey
*
)
TARRAY_DATA
(
remainCols
))[
i
];
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
&
idxKey
->
key
,
ROCKS_KEY_LEN
);
if
(
h
)
{
SLastCol
*
pLastCol
=
(
SLastCol
*
)
taosLRUCacheValue
(
pCache
,
h
);
SLastCol
lastCol
=
*
pLastCol
;
reallocVarData
(
&
lastCol
.
colVal
);
taosArraySet
(
pLastArray
,
idxKey
->
idx
,
&
lastCol
);
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
taosArrayRemove
(
remainCols
,
i
);
}
else
{
++
i
;
}
}
code
=
tsdbCacheLoadFromRocks
(
pTsdb
,
uid
,
pLastArray
,
remainCols
,
pr
,
ltype
);
taosThreadMutexUnlock
(
&
pTsdb
->
lruMutex
);
}
if
(
remainCols
)
{
taosArrayDestroy
(
remainCols
);
}
return
code
;
}
int32_t
tsdbCacheGet
(
STsdb
*
pTsdb
,
tb_uid_t
uid
,
SArray
*
pLastArray
,
SCacheRowsReader
*
pr
,
int8_t
ltype
)
{
int32_t
code
=
0
;
SLRUCache
*
pCache
=
pTsdb
->
lruCache
;
SArray
*
pCidList
=
pr
->
pCidList
;
int
num_keys
=
TARRAY_SIZE
(
pCidList
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
SLastCol
*
pLastCol
=
NULL
;
int16_t
cid
=
*
(
int16_t
*
)
taosArrayGet
(
pCidList
,
i
);
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
ltype
,
.
uid
=
uid
,
.
cid
=
cid
};
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
key
,
ROCKS_KEY_LEN
);
if
(
!
h
)
{
taosThreadMutexLock
(
&
pTsdb
->
lruMutex
);
h
=
taosLRUCacheLookup
(
pCache
,
key
,
ROCKS_KEY_LEN
);
if
(
!
h
)
{
pLastCol
=
tsdbCacheLoadCol
(
pTsdb
,
pr
,
pr
->
pSlotIds
[
i
],
uid
,
cid
,
ltype
);
size_t
charge
=
sizeof
(
*
pLastCol
);
if
(
IS_VAR_DATA_TYPE
(
pLastCol
->
colVal
.
type
))
{
charge
+=
pLastCol
->
colVal
.
value
.
nData
;
}
LRUStatus
status
=
taosLRUCacheInsert
(
pCache
,
key
,
ROCKS_KEY_LEN
,
pLastCol
,
charge
,
tsdbCacheDeleter
,
&
h
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
code
=
-
1
;
}
}
taosThreadMutexUnlock
(
&
pTsdb
->
lruMutex
);
}
pLastCol
=
(
SLastCol
*
)
taosLRUCacheValue
(
pCache
,
h
);
SLastCol
lastCol
=
*
pLastCol
;
reallocVarData
(
&
lastCol
.
colVal
);
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
taosArrayPush
(
pLastArray
,
&
lastCol
);
}
return
code
;
}
int32_t
tsdbCacheDel
(
STsdb
*
pTsdb
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
//
1,
fetch schema
// fetch schema
STSchema
*
pTSchema
=
NULL
;
int
32_t
sver
=
-
1
;
int
sver
=
-
1
;
code
=
metaGetTbTSchemaEx
(
pTsdb
->
pVnode
->
pMeta
,
suid
,
uid
,
sver
,
&
pTSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
-
1
;
}
//
3,
build keys & multi get from rocks
// build keys & multi get from rocks
int
num_keys
=
pTSchema
->
numOfCols
;
char
**
keys_list
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
size_t
*
keys_list_sizes
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
size_t
));
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
int16_t
cid
=
pTSchema
->
columns
[
i
].
colId
;
char
*
keys
=
taosMemoryCalloc
(
2
,
ROCKS_KEY_LEN
);
int
last_key_len
=
snprintf
(
keys
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last"
,
uid
,
cid
);
if
(
last_key_len
>=
ROCKS_KEY_LEN
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
tstrerror
(
code
));
}
int
lr_key_len
=
snprintf
(
keys
+
ROCKS_KEY_LEN
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last_row"
,
uid
,
cid
);
if
(
lr_key_len
>=
ROCKS_KEY_LEN
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
tstrerror
(
code
));
}
size_t
klen
=
ROCKS_KEY_LEN
;
char
*
keys
=
taosMemoryCalloc
(
2
,
sizeof
(
SLastKey
));
((
SLastKey
*
)
keys
)[
0
]
=
(
SLastKey
){.
ltype
=
1
,
.
uid
=
uid
,
.
cid
=
cid
};
((
SLastKey
*
)
keys
)[
1
]
=
(
SLastKey
){.
ltype
=
0
,
.
uid
=
uid
,
.
cid
=
cid
};
keys_list
[
i
]
=
keys
;
keys_list
[
num_keys
+
i
]
=
keys
+
ROCKS_KEY_LEN
;
keys_list_sizes
[
i
]
=
last_key_
len
;
keys_list_sizes
[
num_keys
+
i
]
=
lr_key_
len
;
keys_list
[
num_keys
+
i
]
=
keys
+
sizeof
(
SLastKey
)
;
keys_list_sizes
[
i
]
=
k
len
;
keys_list_sizes
[
num_keys
+
i
]
=
k
len
;
}
char
**
values_list
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
size_t
*
values_list_sizes
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
size_t
));
char
**
errs
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
taosThreadMutexLock
(
&
pTsdb
->
rCache
.
rMutex
);
rocksMayWrite
(
pTsdb
,
true
);
rocksdb_multi_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
num_keys
*
2
,
(
const
char
*
const
*
)
keys_list
,
keys_list_sizes
,
values_list
,
values_list_sizes
,
errs
);
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
...
...
@@ -520,16 +949,20 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
SLastCol
*
pLastCol
=
tsdbCacheDeserialize
(
values_list
[
i
]);
if
(
NULL
!=
pLastCol
&&
(
pLastCol
->
ts
<=
eKey
&&
pLastCol
->
ts
>=
sKey
))
{
char
key
[
ROCKS_KEY_LEN
];
size_t
klen
=
snprintf
(
key
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last"
,
uid
,
pLastCol
->
colVal
.
cid
);
rocksdb_writebatch_delete
(
wb
,
key
,
klen
);
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
1
,
.
uid
=
uid
,
.
cid
=
pLastCol
->
colVal
.
cid
};
size_t
klen
=
ROCKS_KEY_LEN
;
rocksdb_writebatch_delete
(
wb
,
(
char
*
)
key
,
klen
);
taosLRUCacheErase
(
pTsdb
->
lruCache
,
key
,
klen
);
}
pLastCol
=
tsdbCacheDeserialize
(
values_list
[
i
+
num_keys
]);
if
(
NULL
!=
pLastCol
&&
(
pLastCol
->
ts
<=
eKey
&&
pLastCol
->
ts
>=
sKey
))
{
char
key
[
ROCKS_KEY_LEN
];
size_t
klen
=
snprintf
(
key
,
ROCKS_KEY_LEN
,
"%"
PRIi64
":%"
PRIi16
":last_row"
,
uid
,
pLastCol
->
colVal
.
cid
);
rocksdb_writebatch_delete
(
wb
,
key
,
klen
);
SLastKey
*
key
=
&
(
SLastKey
){.
ltype
=
0
,
.
uid
=
uid
,
.
cid
=
pLastCol
->
colVal
.
cid
};
size_t
klen
=
ROCKS_KEY_LEN
;
rocksdb_writebatch_delete
(
wb
,
(
char
*
)
key
,
klen
);
taosLRUCacheErase
(
pTsdb
->
lruCache
,
key
,
klen
);
}
rocksdb_free
(
values_list
[
i
]);
...
...
@@ -538,14 +971,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree
(
values_list
);
taosMemoryFree
(
values_list_sizes
);
char
*
err
=
NULL
;
rocksdb_write
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
writeoptions
,
wb
,
&
err
);
if
(
NULL
!=
err
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
__LINE__
,
err
);
rocksdb_free
(
err
);
}
rocksMayWrite
(
pTsdb
,
true
);
taosThreadMutexUnlock
(
&
pTsdb
->
rCache
.
rMutex
);
rocksdb_writebatch_clear
(
wb
);
_exit:
taosMemoryFree
(
pTSchema
);
...
...
@@ -1111,7 +1538,7 @@ typedef struct {
SMergeTree
mergeTree
;
SMergeTree
*
pMergeTree
;
SSttBlockLoadInfo
*
pLoadInfo
;
SLDataIter
*
pDataIter
;
SLDataIter
*
pDataIter
;
int64_t
lastTs
;
}
SFSLastNextRowIter
;
...
...
@@ -1152,14 +1579,21 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
if
(
code
)
goto
_err
;
}
int
nTmpCols
=
nCols
;
bool
hasTs
=
false
;
if
(
aCols
[
0
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
--
nTmpCols
;
hasTs
=
true
;
}
for
(
int
i
=
0
;
i
<
state
->
pLoadInfo
->
numOfStt
;
++
i
)
{
state
->
pLoadInfo
[
i
].
colIds
=
aCols
;
state
->
pLoadInfo
[
i
].
numOfCols
=
nCols
;
state
->
pLoadInfo
[
i
].
colIds
=
hasTs
?
aCols
+
1
:
aCols
;
state
->
pLoadInfo
[
i
].
numOfCols
=
n
Tmp
Cols
;
state
->
pLoadInfo
[
i
].
isLast
=
isLast
;
}
tMergeTreeOpen
(
&
state
->
mergeTree
,
1
,
*
state
->
pDataFReader
,
state
->
suid
,
state
->
uid
,
&
(
STimeWindow
){.
skey
=
state
->
lastTs
,
.
ekey
=
TSKEY_MAX
},
&
(
SVersionRange
){.
minVer
=
0
,
.
maxVer
=
UINT64_MAX
},
state
->
pLoadInfo
,
false
,
NULL
,
true
,
state
->
pDataIter
);
&
(
SVersionRange
){.
minVer
=
0
,
.
maxVer
=
UINT64_MAX
},
state
->
pLoadInfo
,
false
,
NULL
,
true
,
state
->
pDataIter
);
state
->
pMergeTree
=
&
state
->
mergeTree
;
state
->
state
=
SFSLASTNEXTROW_BLOCKROW
;
}
...
...
@@ -1394,11 +1828,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
tBlockDataReset
(
state
->
pBlockData
);
TABLEID
tid
=
{.
suid
=
state
->
suid
,
.
uid
=
state
->
uid
};
int
nTmpCols
=
nCols
;
if
(
aCols
[
0
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
&&
nCols
==
1
)
{
nTmpCols
=
0
;
bool
hasTs
=
false
;
if
(
aCols
[
0
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
--
nTmpCols
;
skipBlock
=
false
;
hasTs
=
true
;
}
code
=
tBlockDataInit
(
state
->
pBlockData
,
&
tid
,
state
->
pTSchema
,
aCols
,
nTmpCols
);
code
=
tBlockDataInit
(
state
->
pBlockData
,
&
tid
,
state
->
pTSchema
,
hasTs
?
aCols
+
1
:
aCols
,
nTmpCols
);
if
(
code
)
goto
_err
;
code
=
tsdbReadDataBlock
(
*
state
->
pDataFReader
,
&
block
,
state
->
pBlockData
);
...
...
@@ -1730,8 +2166,8 @@ typedef struct {
}
CacheNextRowIter
;
static
int32_t
nextRowIterOpen
(
CacheNextRowIter
*
pIter
,
tb_uid_t
uid
,
STsdb
*
pTsdb
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
SSttBlockLoadInfo
*
pLoadInfo
,
SLDataIter
*
pLDataIter
,
STsdbReadSnap
*
pReadSnap
,
SDataFReader
**
pDataFReader
,
SDataFReader
**
pDataFReaderLast
,
int64_t
lastTs
)
{
SSttBlockLoadInfo
*
pLoadInfo
,
SLDataIter
*
pLDataIter
,
STsdbReadSnap
*
pReadSnap
,
SDataFReader
**
pDataFReader
,
SDataFReader
**
pDataFReader
Last
,
int64_t
lastTs
)
{
int
code
=
0
;
STbData
*
pMem
=
NULL
;
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
208c9756
...
...
@@ -180,8 +180,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
SVnodeCfg
*
pCfg
=
&
((
SVnode
*
)
pVnode
)
->
config
;
int32_t
numOfStt
=
pCfg
->
sttTrigger
;
int32_t
numOfStt
=
pCfg
->
sttTrigger
;
p
->
pLoadInfo
=
tCreateLastBlockLoadInfo
(
p
->
pSchema
,
NULL
,
0
,
numOfStt
);
if
(
p
->
pLoadInfo
==
NULL
)
{
tsdbCacherowsReaderClose
(
p
);
...
...
@@ -215,7 +214,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree
(
p
->
pSchema
);
}
taosMemoryFree
Clear
(
p
->
pDataIter
);
taosMemoryFree
(
p
->
pDataIter
);
taosMemoryFree
(
p
->
pCurrSchema
);
destroyLastBlockLoadInfo
(
p
->
pLoadInfo
);
...
...
@@ -306,23 +305,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
pr
->
pDataFReader
=
NULL
;
pr
->
pDataFReaderLast
=
NULL
;
int
32
_t
ltype
=
(
pr
->
type
&
CACHESCAN_RETRIEVE_LAST
)
>>
3
;
int
8
_t
ltype
=
(
pr
->
type
&
CACHESCAN_RETRIEVE_LAST
)
>>
3
;
// retrieve the only one last row of all tables in the uid list.
if
(
HASTYPE
(
pr
->
type
,
CACHESCAN_RETRIEVE_TYPE_SINGLE
))
{
int64_t
st
=
taosGetTimestampUs
();
int64_t
totalLastTs
=
INT64_MAX
;
for
(
int32_t
i
=
0
;
i
<
pr
->
numOfTables
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
&
pr
->
pTableList
[
i
];
tsdbCacheGet
(
pr
->
pTsdb
,
pKeyInfo
->
uid
,
pRow
,
pr
,
ltype
);
tsdbCacheGetBatch
(
pr
->
pTsdb
,
pKeyInfo
->
uid
,
pRow
,
pr
,
ltype
);
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
if
(
TARRAY_SIZE
(
pRow
)
<=
0
)
{
taosArrayClearEx
(
pRow
,
freeItem
);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear
(
pRow
);
continue
;
}
SLastCol
*
pColVal
=
(
SLastCol
*
)
taosArrayGet
(
pRow
,
0
);
SLastCol
*
pColVal
=
taosArrayGet
(
pRow
,
0
);
if
(
COL_VAL_IS_NONE
(
&
pColVal
->
colVal
))
{
taosArrayClearEx
(
pRow
,
freeItem
);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear
(
pRow
);
continue
;
}
...
...
@@ -373,13 +376,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
if
(
taosArrayGetSize
(
pTableUidList
)
==
0
)
{
if
(
TARRAY_SIZE
(
pTableUidList
)
==
0
)
{
taosArrayPush
(
pTableUidList
,
&
pKeyInfo
->
uid
);
}
else
{
taosArraySet
(
pTableUidList
,
0
,
&
pKeyInfo
->
uid
);
}
taosArrayClearEx
(
pRow
,
freeItem
);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear
(
pRow
);
}
if
(
hasRes
)
{
...
...
@@ -387,25 +391,28 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
else
if
(
HASTYPE
(
pr
->
type
,
CACHESCAN_RETRIEVE_TYPE_ALL
))
{
for
(
int32_t
i
=
pr
->
tableIndex
;
i
<
pr
->
numOfTables
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
&
pr
->
pTableList
[
i
]
;
tb_uid_t
uid
=
pr
->
pTableList
[
i
].
uid
;
tsdbCacheGet
(
pr
->
pTsdb
,
pKeyInfo
->
uid
,
pRow
,
pr
,
ltype
);
tsdbCacheGet
Batch
(
pr
->
pTsdb
,
uid
,
pRow
,
pr
,
ltype
);
if
(
TARRAY_SIZE
(
pRow
)
<=
0
)
{
taosArrayClearEx
(
pRow
,
freeItem
);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear
(
pRow
);
continue
;
}
SLastCol
*
pColVal
=
(
SLastCol
*
)
taosArrayGet
(
pRow
,
0
);
if
(
COL_VAL_IS_NONE
(
&
pColVal
->
colVal
))
{
taosArrayClearEx
(
pRow
,
freeItem
);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear
(
pRow
);
continue
;
}
saveOneRow
(
pRow
,
pResBlock
,
pr
,
slotIds
,
dstSlotIds
,
pRes
,
pr
->
idstr
);
taosArrayClearEx
(
pRow
,
freeItem
);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear
(
pRow
);
taosArrayPush
(
pTableUidList
,
&
pKeyInfo
->
uid
);
taosArrayPush
(
pTableUidList
,
&
uid
);
pr
->
tableIndex
+=
1
;
++
pr
->
tableIndex
;
if
(
pResBlock
->
info
.
rows
>=
pResBlock
->
info
.
capacity
)
{
goto
_end
;
}
...
...
@@ -429,7 +436,9 @@ _end:
}
taosMemoryFree
(
pRes
);
taosArrayDestroyEx
(
pRow
,
freeItem
);
// taosArrayDestroyEx(pRow, freeItem);
taosArrayDestroy
(
pRow
);
taosArrayDestroyEx
(
pLastCols
,
freeItem
);
return
code
;
}
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
208c9756
...
...
@@ -302,12 +302,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return
rowsNum
;
}
void
tsdbMemTableCountRows
(
SMemTable
*
pMemTable
,
SSHashObj
*
pTableMap
,
int64_t
*
rowsNum
)
{
void
tsdbMemTableCountRows
(
SMemTable
*
pMemTable
,
SSHashObj
*
pTableMap
,
int64_t
*
rowsNum
)
{
taosRLockLatch
(
&
pMemTable
->
latch
);
for
(
int32_t
i
=
0
;
i
<
pMemTable
->
nBucket
;
++
i
)
{
STbData
*
pTbData
=
pMemTable
->
aBucket
[
i
];
while
(
pTbData
)
{
void
*
p
=
tSimpleHashGet
(
pTableMap
,
&
pTbData
->
uid
,
sizeof
(
pTbData
->
uid
));
void
*
p
=
tSimpleHashGet
(
pTableMap
,
&
pTbData
->
uid
,
sizeof
(
pTbData
->
uid
));
if
(
p
==
NULL
)
{
pTbData
=
pTbData
->
next
;
continue
;
...
...
@@ -673,7 +673,10 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
if
(
key
.
ts
>=
pTbData
->
maxKey
)
{
pTbData
->
maxKey
=
key
.
ts
;
}
tsdbCacheUpdate
(
pMemTable
->
pTsdb
,
pTbData
->
suid
,
pTbData
->
uid
,
&
lRow
);
if
(
!
TSDB_CACHE_NO
(
pMemTable
->
pTsdb
->
pVnode
->
config
))
{
tsdbCacheUpdate
(
pMemTable
->
pTsdb
,
pTbData
->
suid
,
pTbData
->
uid
,
&
lRow
);
}
// SMemTable
pMemTable
->
minKey
=
TMIN
(
pMemTable
->
minKey
,
pTbData
->
minKey
);
...
...
@@ -734,7 +737,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
if
(
key
.
ts
>=
pTbData
->
maxKey
)
{
pTbData
->
maxKey
=
key
.
ts
;
}
tsdbCacheUpdate
(
pMemTable
->
pTsdb
,
pTbData
->
suid
,
pTbData
->
uid
,
&
lRow
);
if
(
!
TSDB_CACHE_NO
(
pMemTable
->
pTsdb
->
pVnode
->
config
))
{
tsdbCacheUpdate
(
pMemTable
->
pTsdb
,
pTbData
->
suid
,
pTbData
->
uid
,
&
lRow
);
}
// SMemTable
pMemTable
->
minKey
=
TMIN
(
pMemTable
->
minKey
,
pTbData
->
minKey
);
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
208c9756
...
...
@@ -439,8 +439,10 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
code
=
tsdbCommit
(
pVnode
->
pTsdb
,
pInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbCacheCommit
(
pVnode
->
pTsdb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
!
TSDB_CACHE_NO
(
pVnode
->
config
))
{
code
=
tsdbCacheCommit
(
pVnode
->
pTsdb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
VND_IS_RSMA
(
pVnode
))
{
code
=
smaCommit
(
pVnode
->
pSma
,
pInfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录