Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1d42f790
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1d42f790
编写于
4月 25, 2022
作者:
C
Cary Xu
提交者:
GitHub
4月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11726 from taosdata/feature/TD-14481-3.0
feat: register rsma info
上级
0468b12d
e3e5f8d1
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
722 addition
and
104 deletion
+722
-104
include/common/tmsg.h
include/common/tmsg.h
+21
-4
source/common/src/tmsg.c
source/common/src/tmsg.c
+80
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+0
-1
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+2
-2
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+0
-1
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+64
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+4
-0
source/dnode/vnode/src/meta/metaTDBImpl.c
source/dnode/vnode/src/meta/metaTDBImpl.c
+7
-1
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-0
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+524
-79
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+13
-11
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
1d42f790
...
@@ -227,8 +227,16 @@ typedef struct {
...
@@ -227,8 +227,16 @@ typedef struct {
}
SSubmitBlkIter
;
}
SSubmitBlkIter
;
typedef
struct
{
typedef
struct
{
int32_t
totalLen
;
int32_t
totalLen
;
int32_t
len
;
int32_t
len
;
// head of SSubmitBlk
// int64_t uid; // table unique id
// int64_t suid; // stable id
// int32_t sversion; // data schema version
// int32_t dataLen; // data part length, not including the SSubmitBlk head
// int32_t schemaLen; // schema length, if length is 0, no schema exists
// int16_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
const
void
*
pMsg
;
const
void
*
pMsg
;
}
SSubmitMsgIter
;
}
SSubmitMsgIter
;
...
@@ -237,6 +245,15 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
...
@@ -237,6 +245,15 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t
tInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
int32_t
tInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
// int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
// int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
// int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
// STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
typedef
struct
{
typedef
struct
{
int32_t
index
;
// index of failed block in submit blocks
int32_t
index
;
// index of failed block in submit blocks
int32_t
vnode
;
// vnode index of failed block
int32_t
vnode
;
// vnode index of failed block
...
@@ -1496,8 +1513,8 @@ typedef struct {
...
@@ -1496,8 +1513,8 @@ typedef struct {
int32_t
qmsg1Len
;
int32_t
qmsg1Len
;
int32_t
qmsg2Len
;
int32_t
qmsg2Len
;
func_id_t
*
pFuncIds
;
func_id_t
*
pFuncIds
;
char
*
qmsg1
;
//
not null:
pAst1:qmsg1:SRetention1 => trigger aggr task1
char
*
qmsg1
;
// pAst1:qmsg1:SRetention1 => trigger aggr task1
char
*
qmsg2
;
//
not null:
pAst2:qmsg2:SRetention2 => trigger aggr task2
char
*
qmsg2
;
// pAst2:qmsg2:SRetention2 => trigger aggr task2
int8_t
nFuncIds
;
int8_t
nFuncIds
;
}
SRSmaParam
;
}
SRSmaParam
;
...
...
source/common/src/tmsg.c
浏览文件 @
1d42f790
...
@@ -93,7 +93,87 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
...
@@ -93,7 +93,87 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
return
row
;
return
row
;
}
}
}
}
#if 0
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = htonl(pMsg->length);
ASSERT(pIter->totalLen > 0);
pIter->len = 0;
pIter->pMsg = pMsg;
if (pIter->totalLen <= sizeof(SSubmitReq)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(pIter->len >= 0);
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitReq);
} else {
if (pIter->len >= pIter->totalLen) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
ASSERT(pIter->len > 0);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
if (pIter->len == pIter->totalLen) {
*pPBlock = NULL;
} else {
*pPBlock = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->uid = htobe64((*pPBlock)->uid);
pIter->suid = htobe64((*pPBlock)->suid);
pIter->sversion = htonl((*pPBlock)->sversion);
pIter->dataLen = htonl((*pPBlock)->dataLen);
pIter->schemaLen = htonl((*pPBlock)->schemaLen);
pIter->numOfRows = htons((*pPBlock)->numOfRows);
}
return 0;
}
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pMsgIter->dataLen <= 0) return -1;
pIter->totalLen = pMsgIter->dataLen;
pIter->len = 0;
pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen);
return 0;
}
STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
STSRow *row = pIter->row;
if (pIter->len >= pIter->totalLen) {
return NULL;
} else {
pIter->len += TD_ROW_LEN(row);
if (pIter->len < pIter->totalLen) {
pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row));
}
return row;
}
}
#endif
int32_t
tEncodeSEpSet
(
SCoder
*
pEncoder
,
const
SEpSet
*
pEp
)
{
int32_t
tEncodeSEpSet
(
SCoder
*
pEncoder
,
const
SEpSet
*
pEp
)
{
if
(
tEncodeI8
(
pEncoder
,
pEp
->
inUse
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pEp
->
inUse
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pEp
->
numOfEps
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pEp
->
numOfEps
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
1d42f790
...
@@ -453,7 +453,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
...
@@ -453,7 +453,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
}
}
}
}
if
(
pStb
->
ast2Len
>
0
)
{
if
(
pStb
->
ast2Len
>
0
)
{
int32_t
qmsgLen2
=
0
;
if
(
mndConvertRSmaTask
(
pStb
->
pAst2
,
0
,
0
,
&
pRSmaParam
->
qmsg2
,
&
pRSmaParam
->
qmsg2Len
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
mndConvertRSmaTask
(
pStb
->
pAst2
,
0
,
0
,
&
pRSmaParam
->
qmsg2
,
&
pRSmaParam
->
qmsg2Len
)
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFreeClear
(
pRSmaParam
->
pFuncIds
);
taosMemoryFreeClear
(
pRSmaParam
->
pFuncIds
);
taosMemoryFreeClear
(
pRSmaParam
->
qmsg1
);
taosMemoryFreeClear
(
pRSmaParam
->
qmsg1
);
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
1d42f790
...
@@ -51,7 +51,7 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
...
@@ -51,7 +51,7 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
,
STbDdlH
*
pHandle
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
...
@@ -74,7 +74,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
...
@@ -74,7 +74,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
// SMetaDB
// SMetaDB
int
metaOpenDB
(
SMeta
*
pMeta
);
int
metaOpenDB
(
SMeta
*
pMeta
);
void
metaCloseDB
(
SMeta
*
pMeta
);
void
metaCloseDB
(
SMeta
*
pMeta
);
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
,
STbDdlH
*
pHandle
);
int
metaRemoveTableFromDb
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaRemoveTableFromDb
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaSaveSmaToDB
(
SMeta
*
pMeta
,
STSma
*
pTbCfg
);
int
metaSaveSmaToDB
(
SMeta
*
pMeta
,
STSma
*
pTbCfg
);
int
metaRemoveSmaFromDb
(
SMeta
*
pMeta
,
int64_t
indexUid
);
int
metaRemoveSmaFromDb
(
SMeta
*
pMeta
,
int64_t
indexUid
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
1d42f790
...
@@ -57,7 +57,6 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
...
@@ -57,7 +57,6 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
typedef
enum
{
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
// .head
TSDB_FILE_HEAD
=
0
,
// .head
TSDB_FILE_DATA
,
// .data
TSDB_FILE_DATA
,
// .data
...
...
source/dnode/vnode/src/inc/tsdbSma.h
0 → 100644
浏览文件 @
1d42f790
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_TSDB_SMA_H_
#define _TD_VNODE_TSDB_SMA_H_
#include "os.h"
#include "thash.h"
#include "tmsg.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
int32_t
(
*
__tb_ddl_fn_t
)(
void
*
ahandle
,
void
**
result
,
void
*
p1
,
void
*
p2
);
struct
STbDdlH
{
void
*
ahandle
;
void
*
result
;
__tb_ddl_fn_t
fp
;
};
typedef
struct
{
tb_uid_t
suid
;
SArray
*
tbUids
;
SHashObj
*
uidHash
;
}
STbUidStore
;
static
FORCE_INLINE
int32_t
tsdbUidStoreInit
(
STbUidStore
**
pStore
)
{
ASSERT
(
*
pStore
==
NULL
);
*
pStore
=
taosMemoryCalloc
(
1
,
sizeof
(
STbUidStore
));
if
(
*
pStore
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tsdbUidStoreFree
(
STbUidStore
*
pStore
);
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateTbReq
*
pReq
);
int32_t
tsdbFetchTbUidList
(
void
*
pTsdb
,
void
**
result
,
void
*
suid
,
void
*
uid
);
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pUidStore
);
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
void
*
pMsg
,
int32_t
inputType
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VNODE_TSDB_SMA_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
1d42f790
...
@@ -103,6 +103,8 @@ struct SVnode {
...
@@ -103,6 +103,8 @@ struct SVnode {
#define TD_VID(PVNODE) (PVNODE)->config.vgId
#define TD_VID(PVNODE) (PVNODE)->config.vgId
typedef
struct
STbDdlH
STbDdlH
;
// sma
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
...
@@ -116,6 +118,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data);
...
@@ -116,6 +118,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data);
#include "vnodeSync.h"
#include "vnodeSync.h"
#include "tsdbSma.h"
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/vnode/src/meta/metaTDBImpl.c
浏览文件 @
1d42f790
...
@@ -250,7 +250,7 @@ void metaCloseDB(SMeta *pMeta) {
...
@@ -250,7 +250,7 @@ void metaCloseDB(SMeta *pMeta) {
}
}
}
}
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
,
STbDdlH
*
pHandle
)
{
tb_uid_t
uid
;
tb_uid_t
uid
;
SMetaDB
*
pMetaDb
;
SMetaDB
*
pMetaDb
;
void
*
pKey
;
void
*
pKey
;
...
@@ -349,6 +349,12 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
...
@@ -349,6 +349,12 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
// child table handle for rsma
if
(
pHandle
&&
pHandle
->
fp
)
{
if
(((
*
pHandle
->
fp
)(
pHandle
->
ahandle
,
&
pHandle
->
result
,
&
ctbIdxKey
.
suid
,
&
uid
))
<
0
)
{
return
-
1
;
};
}
}
else
if
(
pTbCfg
->
type
==
META_NORMAL_TABLE
)
{
}
else
if
(
pTbCfg
->
type
==
META_NORMAL_TABLE
)
{
pKey
=
&
uid
;
pKey
=
&
uid
;
kLen
=
sizeof
(
uid
);
kLen
=
sizeof
(
uid
);
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
1d42f790
...
@@ -15,7 +15,7 @@
...
@@ -15,7 +15,7 @@
#include "vnodeInt.h"
#include "vnodeInt.h"
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
,
STbDdlH
*
pHandle
)
{
// Validate the tbOptions
// Validate the tbOptions
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
// // TODO: handle error
// // TODO: handle error
...
@@ -24,7 +24,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
...
@@ -24,7 +24,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
// TODO: add atomicity
// TODO: add atomicity
if
(
metaSaveTableToDB
(
pMeta
,
pTbCfg
)
<
0
)
{
if
(
metaSaveTableToDB
(
pMeta
,
pTbCfg
,
pHandle
)
<
0
)
{
// TODO: handle error
// TODO: handle error
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
1d42f790
...
@@ -81,8 +81,8 @@ static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg
...
@@ -81,8 +81,8 @@ static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg
static
void
tsdbFree
(
STsdb
*
pTsdb
)
{
static
void
tsdbFree
(
STsdb
*
pTsdb
)
{
if
(
pTsdb
)
{
if
(
pTsdb
)
{
//
tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb));
tsdbFreeSmaEnv
(
REPO_TSMA_ENV
(
pTsdb
));
//
tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb));
tsdbFreeSmaEnv
(
REPO_RSMA_ENV
(
pTsdb
));
tsdbFreeFS
(
pTsdb
->
fs
);
tsdbFreeFS
(
pTsdb
->
fs
);
taosMemoryFreeClear
(
pTsdb
->
path
);
taosMemoryFreeClear
(
pTsdb
->
path
);
taosMemoryFree
(
pTsdb
);
taosMemoryFree
(
pTsdb
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
1d42f790
...
@@ -3636,6 +3636,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
...
@@ -3636,6 +3636,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
tsdbError
(
"%p failed to get stable, uid:%"
PRIu64
", TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
pMeta
,
uid
,
taskId
,
reqId
);
tsdbError
(
"%p failed to get stable, uid:%"
PRIu64
", TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
pMeta
,
uid
,
taskId
,
reqId
);
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
goto
_error
;
goto
_error
;
}
else
{
tsdbDebug
(
"%p succeed to get stable, uid:%"
PRIu64
", TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
pMeta
,
uid
,
taskId
,
reqId
);
}
}
if
(
pTbCfg
->
type
!=
META_SUPER_TABLE
)
{
if
(
pTbCfg
->
type
!=
META_SUPER_TABLE
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
1d42f790
...
@@ -33,6 +33,8 @@ static const char *TSDB_SMA_DNAME[] = {
...
@@ -33,6 +33,8 @@ static const char *TSDB_SMA_DNAME[] = {
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
typedef
struct
SRSmaInfo
SRSmaInfo
;
typedef
enum
{
typedef
enum
{
SMA_STORAGE_LEVEL_TSDB
=
0
,
// use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
SMA_STORAGE_LEVEL_TSDB
=
0
,
// use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
SMA_STORAGE_LEVEL_DFILESET
=
1
// use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
SMA_STORAGE_LEVEL_DFILESET
=
1
// use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
...
@@ -46,6 +48,7 @@ typedef struct SPoolMem {
...
@@ -46,6 +48,7 @@ typedef struct SPoolMem {
struct
SSmaEnv
{
struct
SSmaEnv
{
TdThreadRwlock
lock
;
TdThreadRwlock
lock
;
int8_t
type
;
TXN
txn
;
TXN
txn
;
SPoolMem
*
pPool
;
SPoolMem
*
pPool
;
SDiskID
did
;
SDiskID
did
;
...
@@ -55,6 +58,7 @@ struct SSmaEnv {
...
@@ -55,6 +58,7 @@ struct SSmaEnv {
};
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_PATH(env) ((env)->path)
...
@@ -91,16 +95,45 @@ typedef struct {
...
@@ -91,16 +95,45 @@ typedef struct {
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* without information about its previous state.
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
* N.B. only applicable to tsma
*/
*/
int8_t
state
;
// ETsdbSmaStat
int8_t
state
;
// ETsdbSmaStat
SHashObj
*
expiredWindows
;
// key: skey of time window, value: N/A
SHashObj
*
expiredWindows
;
// key: skey of time window, value: N/A
STSma
*
pSma
;
// cache schema
STSma
*
pSma
;
// cache schema
}
SSmaStatItem
;
}
SSmaStatItem
;
#define RSMA_MAX_LEVEL 2
#define RSMA_TASK_INFO_HASH_SLOT 8
struct
SRSmaInfo
{
void
*
taskInfo
[
RSMA_MAX_LEVEL
];
// qTaskInfo_t
};
struct
SSmaStat
{
struct
SSmaStat
{
SHashObj
*
smaStatItems
;
// key: indexUid, value: SSmaStatItem
union
{
SHashObj
*
smaStatItems
;
// key: indexUid, value: SSmaStatItem for tsma
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
};
T_REF_DECLARE
()
T_REF_DECLARE
()
};
};
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
static
FORCE_INLINE
void
tsdbFreeTaskHandle
(
qTaskInfo_t
*
taskHandle
)
{
// Note: free/kill may in RC
qTaskInfo_t
otaskHandle
=
atomic_load_ptr
(
taskHandle
);
if
(
otaskHandle
&&
atomic_val_compare_exchange_ptr
(
taskHandle
,
otaskHandle
,
NULL
))
{
qDestroyTask
(
otaskHandle
);
}
}
static
FORCE_INLINE
void
*
tsdbFreeRSmaInfo
(
SRSmaInfo
*
pInfo
)
{
for
(
int32_t
i
=
0
;
i
<
RSMA_MAX_LEVEL
;
++
i
)
{
if
(
pInfo
->
taskInfo
[
i
])
{
tsdbFreeTaskHandle
(
pInfo
->
taskInfo
[
i
]);
}
}
return
NULL
;
}
// declaration of static functions
// declaration of static functions
...
@@ -108,11 +141,11 @@ struct SSmaStat {
...
@@ -108,11 +141,11 @@ struct SSmaStat {
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
int64_t
version
);
int64_t
version
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
const
char
*
path
,
SDiskID
did
);
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
);
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
const
char
*
path
,
SDiskID
did
,
SSmaEnv
**
pEnv
);
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
,
SSmaEnv
**
pEnv
);
static
int32_t
tsdbResetExpiredWindow
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
,
int64_t
indexUid
,
TSKEY
skey
);
static
int32_t
tsdbResetExpiredWindow
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
,
int64_t
indexUid
,
TSKEY
skey
);
static
int32_t
tsdbRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
);
static
int32_t
tsdbRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
);
static
int32_t
tsdbUnRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
);
static
int32_t
tsdbUnRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
);
...
@@ -139,6 +172,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
...
@@ -139,6 +172,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
const
char
*
msg
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
const
char
*
msg
);
static
FORCE_INLINE
int32_t
tsdbUpdateTbUidListImpl
(
STsdb
*
pTsdb
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
// mgmt interface
// mgmt interface
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
...
@@ -229,7 +263,7 @@ static void *poolMalloc(void *arg, size_t size) {
...
@@ -229,7 +263,7 @@ static void *poolMalloc(void *arg, size_t size) {
SPoolMem
*
pMem
;
SPoolMem
*
pMem
;
pMem
=
(
SPoolMem
*
)
tdbOsMalloc
(
sizeof
(
*
pMem
)
+
size
);
pMem
=
(
SPoolMem
*
)
tdbOsMalloc
(
sizeof
(
*
pMem
)
+
size
);
if
(
pMem
==
NULL
)
{
if
(
!
pMem
)
{
assert
(
0
);
assert
(
0
);
}
}
...
@@ -317,15 +351,17 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
...
@@ -317,15 +351,17 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s"
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
TSDB_SMA_DNAME
[
smaType
]);
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s"
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
TSDB_SMA_DNAME
[
smaType
]);
}
}
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
const
char
*
path
,
SDiskID
did
)
{
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
)
{
SSmaEnv
*
pEnv
=
NULL
;
SSmaEnv
*
pEnv
=
NULL
;
pEnv
=
(
SSmaEnv
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSmaEnv
));
pEnv
=
(
SSmaEnv
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSmaEnv
));
if
(
pEnv
==
NULL
)
{
if
(
!
pEnv
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
SMA_ENV_TYPE
(
pEnv
)
=
smaType
;
int
code
=
taosThreadRwlockInit
(
&
(
pEnv
->
lock
),
NULL
);
int
code
=
taosThreadRwlockInit
(
&
(
pEnv
->
lock
),
NULL
);
if
(
code
)
{
if
(
code
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
...
@@ -334,15 +370,15 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
...
@@ -334,15 +370,15 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
}
}
ASSERT
(
path
&&
(
strlen
(
path
)
>
0
));
ASSERT
(
path
&&
(
strlen
(
path
)
>
0
));
pEnv
->
path
=
strdup
(
path
);
SMA_ENV_PATH
(
pEnv
)
=
strdup
(
path
);
if
(
pEnv
->
path
==
NULL
)
{
if
(
!
SMA_ENV_PATH
(
pEnv
)
)
{
tsdbFreeSmaEnv
(
pEnv
);
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
return
NULL
;
}
}
pEnv
->
did
=
did
;
SMA_ENV_DID
(
pEnv
)
=
did
;
if
(
tsdbInitSmaStat
(
&
pEnv
->
pStat
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbInitSmaStat
(
&
SMA_ENV_STAT
(
pEnv
),
smaType
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbFreeSmaEnv
(
pEnv
);
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
return
NULL
;
}
}
...
@@ -354,7 +390,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
...
@@ -354,7 +390,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
return
NULL
;
return
NULL
;
}
}
if
(
(
pEnv
->
pPool
=
openPool
())
==
NULL
)
{
if
(
!
(
pEnv
->
pPool
=
openPool
())
)
{
tsdbFreeSmaEnv
(
pEnv
);
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
return
NULL
;
}
}
...
@@ -362,14 +398,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
...
@@ -362,14 +398,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
return
pEnv
;
return
pEnv
;
}
}
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
const
char
*
path
,
SDiskID
did
,
SSmaEnv
**
pEnv
)
{
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
,
SSmaEnv
**
pEnv
)
{
if
(
!
pEnv
)
{
if
(
!
pEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
*
pEnv
==
NULL
)
{
if
(
!
(
*
pEnv
)
)
{
if
(
(
*
pEnv
=
tsdbNewSmaEnv
(
pTsdb
,
path
,
did
))
==
NULL
)
{
if
(
!
(
*
pEnv
=
tsdbNewSmaEnv
(
pTsdb
,
smaType
,
path
,
did
))
)
{
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
}
}
...
@@ -385,7 +421,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaE
...
@@ -385,7 +421,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaE
*/
*/
void
tsdbDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
)
{
void
tsdbDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
)
{
if
(
pSmaEnv
)
{
if
(
pSmaEnv
)
{
tsdbDestroySmaState
(
pSmaEnv
->
pStat
);
tsdbDestroySmaState
(
pSmaEnv
->
pStat
,
SMA_ENV_TYPE
(
pSmaEnv
)
);
taosMemoryFreeClear
(
pSmaEnv
->
pStat
);
taosMemoryFreeClear
(
pSmaEnv
->
pStat
);
taosMemoryFreeClear
(
pSmaEnv
->
path
);
taosMemoryFreeClear
(
pSmaEnv
->
path
);
taosThreadRwlockDestroy
(
&
(
pSmaEnv
->
lock
));
taosThreadRwlockDestroy
(
&
(
pSmaEnv
->
lock
));
...
@@ -401,7 +437,7 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
...
@@ -401,7 +437,7 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
}
}
static
int32_t
tsdbRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
)
{
static
int32_t
tsdbRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
)
{
if
(
pStat
==
NULL
)
return
0
;
if
(
!
pStat
)
return
0
;
int
ref
=
T_REF_INC
(
pStat
);
int
ref
=
T_REF_INC
(
pStat
);
tsdbDebug
(
"vgId:%d ref sma stat:%p, val:%d"
,
REPO_ID
(
pTsdb
),
pStat
,
ref
);
tsdbDebug
(
"vgId:%d ref sma stat:%p, val:%d"
,
REPO_ID
(
pTsdb
),
pStat
,
ref
);
...
@@ -409,17 +445,17 @@ static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
...
@@ -409,17 +445,17 @@ static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
}
}
static
int32_t
tsdbUnRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
)
{
static
int32_t
tsdbUnRefSmaStat
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
)
{
if
(
pStat
==
NULL
)
return
0
;
if
(
!
pStat
)
return
0
;
int
ref
=
T_REF_DEC
(
pStat
);
int
ref
=
T_REF_DEC
(
pStat
);
tsdbDebug
(
"vgId:%d unref sma stat:%p, val:%d"
,
REPO_ID
(
pTsdb
),
pStat
,
ref
);
tsdbDebug
(
"vgId:%d unref sma stat:%p, val:%d"
,
REPO_ID
(
pTsdb
),
pStat
,
ref
);
return
0
;
return
0
;
}
}
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
)
{
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
)
{
ASSERT
(
pSmaStat
!=
NULL
);
ASSERT
(
pSmaStat
!=
NULL
);
if
(
*
pSmaStat
!=
NULL
)
{
// no lock
if
(
*
pSmaStat
)
{
// no lock
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -428,19 +464,31 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
...
@@ -428,19 +464,31 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
* tsdbInitSmaStat invoked in other multithread environment later.
* tsdbInitSmaStat invoked in other multithread environment later.
*/
*/
if
(
*
pSmaStat
==
NULL
)
{
if
(
!
(
*
pSmaStat
)
)
{
*
pSmaStat
=
(
SSmaStat
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSmaStat
));
*
pSmaStat
=
(
SSmaStat
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSmaStat
));
if
(
*
pSmaStat
==
NULL
)
{
if
(
!
(
*
pSmaStat
)
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
(
*
pSmaStat
)
->
smaStatItems
=
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
taosHashInit
(
SMA_STATE_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
SMA_STAT_INFO_HASH
(
*
pSmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
((
*
pSmaStat
)
->
smaStatItems
==
NULL
)
{
if
(
!
SMA_STAT_INFO_HASH
(
*
pSmaStat
))
{
taosMemoryFreeClear
(
*
pSmaStat
);
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
else
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
SMA_STAT_ITEMS
(
*
pSmaStat
)
=
taosHashInit
(
SMA_STATE_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
SMA_STAT_ITEMS
(
*
pSmaStat
))
{
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
}
}
else
{
ASSERT
(
0
);
}
}
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -462,7 +510,7 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
...
@@ -462,7 +510,7 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
}
}
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
)
{
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
)
{
if
(
pSmaStatItem
!=
NULL
)
{
if
(
pSmaStatItem
)
{
tdDestroyTSma
(
pSmaStatItem
->
pSma
);
tdDestroyTSma
(
pSmaStatItem
->
pSma
);
taosMemoryFreeClear
(
pSmaStatItem
->
pSma
);
taosMemoryFreeClear
(
pSmaStatItem
->
pSma
);
taosHashCleanup
(
pSmaStatItem
->
expiredWindows
);
taosHashCleanup
(
pSmaStatItem
->
expiredWindows
);
...
@@ -477,16 +525,28 @@ static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
...
@@ -477,16 +525,28 @@ static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
* @param pSmaStat
* @param pSmaStat
* @return int32_t
* @return int32_t
*/
*/
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
)
{
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
if
(
pSmaStat
)
{
if
(
pSmaStat
)
{
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void
*
item
=
taosHashIterate
(
pSmaStat
->
smaStatItems
,
NULL
);
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
while
(
item
!=
NULL
)
{
void
*
item
=
taosHashIterate
(
SMA_STAT_ITEMS
(
pSmaStat
),
NULL
);
SSmaStatItem
*
pItem
=
*
(
SSmaStatItem
**
)
item
;
while
(
item
)
{
tsdbFreeSmaStatItem
(
pItem
);
SSmaStatItem
*
pItem
=
*
(
SSmaStatItem
**
)
item
;
item
=
taosHashIterate
(
pSmaStat
->
smaStatItems
,
item
);
tsdbFreeSmaStatItem
(
pItem
);
item
=
taosHashIterate
(
SMA_STAT_ITEMS
(
pSmaStat
),
item
);
}
taosHashCleanup
(
SMA_STAT_ITEMS
(
pSmaStat
));
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
void
*
infoHash
=
taosHashIterate
(
SMA_STAT_INFO_HASH
(
pSmaStat
),
NULL
);
while
(
infoHash
)
{
SRSmaInfo
*
pInfoHash
=
*
(
SRSmaInfo
**
)
infoHash
;
tsdbFreeRSmaInfo
(
pInfoHash
);
infoHash
=
taosHashIterate
(
SMA_STAT_INFO_HASH
(
pSmaStat
),
infoHash
);
}
taosHashCleanup
(
SMA_STAT_INFO_HASH
(
pSmaStat
));
}
else
{
ASSERT
(
0
);
}
}
taosHashCleanup
(
pSmaStat
->
smaStatItems
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -497,12 +557,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
...
@@ -497,12 +557,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
// return if already init
// return if already init
switch
(
smaType
)
{
switch
(
smaType
)
{
case
TSDB_SMA_TYPE_TIME_RANGE
:
case
TSDB_SMA_TYPE_TIME_RANGE
:
if
((
pEnv
=
(
SSmaEnv
*
)
atomic_load_ptr
(
&
REPO_TSMA_ENV
(
pTsdb
)))
!=
NULL
)
{
if
((
pEnv
=
(
SSmaEnv
*
)
atomic_load_ptr
(
&
REPO_TSMA_ENV
(
pTsdb
))))
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
break
;
break
;
case
TSDB_SMA_TYPE_ROLLUP
:
case
TSDB_SMA_TYPE_ROLLUP
:
if
((
pEnv
=
(
SSmaEnv
*
)
atomic_load_ptr
(
&
REPO_RSMA_ENV
(
pTsdb
)))
!=
NULL
)
{
if
((
pEnv
=
(
SSmaEnv
*
)
atomic_load_ptr
(
&
REPO_RSMA_ENV
(
pTsdb
))))
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
break
;
break
;
...
@@ -515,7 +575,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
...
@@ -515,7 +575,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
tsdbLockRepo
(
pTsdb
);
tsdbLockRepo
(
pTsdb
);
pEnv
=
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
?
atomic_load_ptr
(
&
REPO_TSMA_ENV
(
pTsdb
))
pEnv
=
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
?
atomic_load_ptr
(
&
REPO_TSMA_ENV
(
pTsdb
))
:
atomic_load_ptr
(
&
REPO_RSMA_ENV
(
pTsdb
));
:
atomic_load_ptr
(
&
REPO_RSMA_ENV
(
pTsdb
));
if
(
pEnv
==
NULL
)
{
if
(
!
pEnv
)
{
char
rname
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
rname
[
TSDB_FILENAME_LEN
]
=
{
0
};
SDiskID
did
=
{
0
};
SDiskID
did
=
{
0
};
...
@@ -531,7 +591,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
...
@@ -531,7 +591,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
tsdbInitSmaEnv
(
pTsdb
,
rname
,
did
,
&
pEnv
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbInitSmaEnv
(
pTsdb
,
smaType
,
rname
,
did
,
&
pEnv
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbUnlockRepo
(
pTsdb
);
tsdbUnlockRepo
(
pTsdb
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -547,10 +607,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
...
@@ -547,10 +607,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
int64_t
version
)
{
int64_t
version
)
{
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
if
(
!
pItem
)
{
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_OK
);
// TODO use the real state
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_OK
);
// TODO use the real state
if
(
pItem
==
NULL
)
{
if
(
!
pItem
)
{
// Response to stream computing: OOM
// Response to stream computing: OOM
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
...
@@ -558,7 +618,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
...
@@ -558,7 +618,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
// cache smaMeta
// cache smaMeta
STSma
*
pSma
=
metaGetSmaInfoByIndex
(
REPO_META
(
pTsdb
),
indexUid
,
true
);
STSma
*
pSma
=
metaGetSmaInfoByIndex
(
REPO_META
(
pTsdb
),
indexUid
,
true
);
if
(
pSma
==
NULL
)
{
if
(
!
pSma
)
{
terrno
=
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
;
terrno
=
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
;
taosHashCleanup
(
pItem
->
expiredWindows
);
taosHashCleanup
(
pItem
->
expiredWindows
);
taosMemoryFree
(
pItem
);
taosMemoryFree
(
pItem
);
...
@@ -574,7 +634,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
...
@@ -574,7 +634,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
taosMemoryFree
(
pItem
);
taosMemoryFree
(
pItem
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
}
else
if
(
(
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
==
NULL
)
{
}
else
if
(
!
(
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -634,7 +694,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
...
@@ -634,7 +694,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
pItemsHash
=
SMA_ENV_STAT_ITEMS
(
pEnv
);
SHashObj
*
pItemsHash
=
SMA_ENV_STAT_ITEMS
(
pEnv
);
TASSERT
(
pEnv
!=
NULL
&&
pStat
!=
NULL
&&
pItemsHash
!=
NULL
);
TASSERT
(
pEnv
&&
pStat
&&
pItemsHash
);
// basic procedure
// basic procedure
// TODO: optimization
// TODO: optimization
...
@@ -651,7 +711,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
...
@@ -651,7 +711,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
while
(
true
)
{
while
(
true
)
{
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
if
(
!
pBlock
)
break
;
STSmaWrapper
*
pSW
=
NULL
;
STSmaWrapper
*
pSW
=
NULL
;
STSma
*
pTSma
=
NULL
;
STSma
*
pTSma
=
NULL
;
...
@@ -664,7 +724,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
...
@@ -664,7 +724,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
while
(
true
)
{
while
(
true
)
{
STSRow
*
row
=
tGetSubmitBlkNext
(
&
blkIter
);
STSRow
*
row
=
tGetSubmitBlkNext
(
&
blkIter
);
if
(
row
==
NULL
)
{
if
(
!
row
)
{
tdFreeTSmaWrapper
(
pSW
);
tdFreeTSmaWrapper
(
pSW
);
break
;
break
;
}
}
...
@@ -672,10 +732,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
...
@@ -672,10 +732,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
if
(
pSW
)
{
if
(
pSW
)
{
pSW
=
tdFreeTSmaWrapper
(
pSW
);
pSW
=
tdFreeTSmaWrapper
(
pSW
);
}
}
if
(
(
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
pBlock
->
suid
))
==
NULL
)
{
if
(
!
(
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
pBlock
->
suid
))
)
{
break
;
break
;
}
}
if
((
pSW
->
number
)
<=
0
||
(
pSW
->
tSma
==
NULL
)
)
{
if
((
pSW
->
number
)
<=
0
||
!
pSW
->
tSma
)
{
pSW
=
tdFreeTSmaWrapper
(
pSW
);
pSW
=
tdFreeTSmaWrapper
(
pSW
);
break
;
break
;
}
}
...
@@ -721,10 +781,10 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind
...
@@ -721,10 +781,10 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind
tsdbRefSmaStat
(
pTsdb
,
pStat
);
tsdbRefSmaStat
(
pTsdb
,
pStat
);
if
(
pStat
&&
pStat
->
smaStatItems
)
{
if
(
pStat
&&
SMA_STAT_ITEMS
(
pStat
)
)
{
pItem
=
taosHashGet
(
pStat
->
smaStatItems
,
&
indexUid
,
sizeof
(
indexUid
));
pItem
=
taosHashGet
(
SMA_STAT_ITEMS
(
pStat
)
,
&
indexUid
,
sizeof
(
indexUid
));
}
}
if
((
pItem
!=
NULL
)
&&
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
!=
NULL
))
{
if
((
pItem
)
&&
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
))
{
// pItem resides in hash buffer all the time unless drop sma index
// pItem resides in hash buffer all the time unless drop sma index
// TODO: multithread protect
// TODO: multithread protect
if
(
taosHashRemove
(
pItem
->
expiredWindows
,
&
skey
,
sizeof
(
TSKEY
))
!=
0
)
{
if
(
taosHashRemove
(
pItem
->
expiredWindows
,
&
skey
,
sizeof
(
TSKEY
))
!=
0
)
{
...
@@ -934,7 +994,7 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
...
@@ -934,7 +994,7 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int64_t
indexUid
,
int32_t
fid
)
{
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int64_t
indexUid
,
int32_t
fid
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
ASSERT
(
pSmaH
->
dFile
.
path
==
NULL
&&
pSmaH
->
dFile
.
pDB
==
NULL
);
ASSERT
(
!
pSmaH
->
dFile
.
path
&&
!
pSmaH
->
dFile
.
pDB
);
pSmaH
->
dFile
.
fid
=
fid
;
pSmaH
->
dFile
.
fid
=
fid
;
char
tSmaFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tSmaFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
...
@@ -1004,6 +1064,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
...
@@ -1004,6 +1064,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
// TODO: destroy SSDataBlocks(msg)
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// the sma data would arrive ahead of the update-expired-window msg.
// the sma data would arrive ahead of the update-expired-window msg.
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -1011,7 +1073,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
...
@@ -1011,7 +1073,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
pDataBlocks
==
NULL
)
{
if
(
!
pDataBlocks
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert tSma data failed since pDataBlocks is NULL"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert tSma data failed since pDataBlocks is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
return
terrno
;
...
@@ -1029,11 +1091,11 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
...
@@ -1029,11 +1091,11 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
tsdbRefSmaStat
(
pTsdb
,
pStat
);
tsdbRefSmaStat
(
pTsdb
,
pStat
);
if
(
pStat
&&
pStat
->
smaStatItems
)
{
if
(
pStat
&&
SMA_STAT_ITEMS
(
pStat
)
)
{
pItem
=
taosHashGet
(
pStat
->
smaStatItems
,
&
indexUid
,
sizeof
(
indexUid
));
pItem
=
taosHashGet
(
SMA_STAT_ITEMS
(
pStat
)
,
&
indexUid
,
sizeof
(
indexUid
));
}
}
if
(
(
pItem
==
NULL
)
||
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
==
NULL
)
||
tsdbSmaStatIsDropped
(
pItem
))
{
if
(
!
pItem
||
!
(
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
||
tsdbSmaStatIsDropped
(
pItem
))
{
terrno
=
TSDB_CODE_TDB_INVALID_SMA_STAT
;
terrno
=
TSDB_CODE_TDB_INVALID_SMA_STAT
;
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
...
@@ -1061,9 +1123,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
...
@@ -1061,9 +1123,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
int32_t
storageLevel
=
tsdbGetSmaStorageLevel
(
pSma
->
interval
,
pSma
->
intervalUnit
);
int32_t
storageLevel
=
tsdbGetSmaStorageLevel
(
pSma
->
interval
,
pSma
->
intervalUnit
);
int32_t
daysPerFile
=
tsdbGetTSmaDays
(
pTsdb
,
tSmaH
.
interval
,
storageLevel
);
int32_t
daysPerFile
=
tsdbGetTSmaDays
(
pTsdb
,
tSmaH
.
interval
,
storageLevel
);
// key: skey + groupId
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
// key: skey + groupId
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
char
dataBuf
[
512
]
=
{
0
};
// val: aggr data // TODO: handle 512 buffer?
char
dataBuf
[
512
]
=
{
0
};
void
*
pDataBuf
=
NULL
;
void
*
pDataBuf
=
NULL
;
int32_t
sz
=
taosArrayGetSize
(
pDataBlocks
);
int32_t
sz
=
taosArrayGetSize
(
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
...
@@ -1228,7 +1289,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
...
@@ -1228,7 +1289,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
tsdbDebug
(
"vgId:%d drop tSma local cache for %"
PRIi64
,
REPO_ID
(
pTsdb
),
indexUid
);
tsdbDebug
(
"vgId:%d drop tSma local cache for %"
PRIi64
,
REPO_ID
(
pTsdb
),
indexUid
);
SSmaStatItem
*
pItem
=
taosHashGet
(
SMA_ENV_STAT_ITEMS
(
pEnv
),
&
indexUid
,
sizeof
(
indexUid
));
SSmaStatItem
*
pItem
=
taosHashGet
(
SMA_ENV_STAT_ITEMS
(
pEnv
),
&
indexUid
,
sizeof
(
indexUid
));
if
((
pItem
!=
NULL
)
||
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
!=
NULL
))
{
if
((
pItem
)
||
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
))
{
if
(
tsdbSmaStatIsDropped
(
pItem
))
{
if
(
tsdbSmaStatIsDropped
(
pItem
))
{
tsdbDebug
(
"vgId:%d tSma stat is already dropped for %"
PRIi64
,
REPO_ID
(
pTsdb
),
indexUid
);
tsdbDebug
(
"vgId:%d tSma stat is already dropped for %"
PRIi64
,
REPO_ID
(
pTsdb
),
indexUid
);
return
TSDB_CODE_TDB_INVALID_ACTION
;
// TODO: duplicate drop msg would be intercepted by mnode
return
TSDB_CODE_TDB_INVALID_ACTION
;
// TODO: duplicate drop msg would be intercepted by mnode
...
@@ -1284,19 +1345,13 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
...
@@ -1284,19 +1345,13 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
REPO_RSMA_ENV
(
pTsdb
));
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
REPO_RSMA_ENV
(
pTsdb
));
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
if
(
pEnv
==
NULL
)
{
if
(
!
pEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert rSma data failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
}
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert rSma data failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert rSma data failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
return
terrno
;
}
}
if
(
pDataBlocks
==
NULL
)
{
if
(
!
pDataBlocks
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert rSma data failed since pDataBlocks is NULL"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert rSma data failed since pDataBlocks is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
return
terrno
;
...
@@ -1313,11 +1368,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
...
@@ -1313,11 +1368,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
tsdbRefSmaStat
(
pTsdb
,
pStat
);
tsdbRefSmaStat
(
pTsdb
,
pStat
);
if
(
pStat
&&
pStat
->
smaStatItems
)
{
if
(
pStat
&&
SMA_STAT_ITEMS
(
pStat
)
)
{
pItem
=
taosHashGet
(
pStat
->
smaStatItems
,
&
indexUid
,
sizeof
(
indexUid
));
pItem
=
taosHashGet
(
SMA_STAT_ITEMS
(
pStat
)
,
&
indexUid
,
sizeof
(
indexUid
));
}
}
if
(
(
pItem
==
NULL
)
||
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
==
NULL
)
||
tsdbSmaStatIsDropped
(
pItem
))
{
if
(
!
pItem
||
!
(
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
||
tsdbSmaStatIsDropped
(
pItem
))
{
terrno
=
TSDB_CODE_TDB_INVALID_SMA_STAT
;
terrno
=
TSDB_CODE_TDB_INVALID_SMA_STAT
;
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
...
@@ -1438,7 +1493,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
...
@@ -1438,7 +1493,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
++pReadH->smaFsIter.iter;
++pReadH->smaFsIter.iter;
}
}
if (pReadH->pDFile
!= NULL
) {
if (pReadH->pDFile) {
tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
return true;
return true;
}
}
...
@@ -1471,7 +1526,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
...
@@ -1471,7 +1526,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
tsdbRefSmaStat
(
pTsdb
,
pStat
);
tsdbRefSmaStat
(
pTsdb
,
pStat
);
SSmaStatItem
*
pItem
=
taosHashGet
(
SMA_ENV_STAT_ITEMS
(
pEnv
),
&
indexUid
,
sizeof
(
indexUid
));
SSmaStatItem
*
pItem
=
taosHashGet
(
SMA_ENV_STAT_ITEMS
(
pEnv
),
&
indexUid
,
sizeof
(
indexUid
));
if
(
(
pItem
==
NULL
)
||
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
==
NULL
))
{
if
(
!
pItem
||
!
(
pItem
=
*
(
SSmaStatItem
**
)
pItem
))
{
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
// it's NULL.
// it's NULL.
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
...
@@ -1484,7 +1539,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
...
@@ -1484,7 +1539,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
for (int32_t n = 0; n < nQueryWin; ++n) {
for (int32_t n = 0; n < nQueryWin; ++n) {
TSKEY skey = taosArrayGet(pQuerySKey, n);
TSKEY skey = taosArrayGet(pQuerySKey, n);
if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))
!= NULL
) {
if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) {
// TODO: mark this window as expired.
// TODO: mark this window as expired.
}
}
}
}
...
@@ -1500,7 +1555,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
...
@@ -1500,7 +1555,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
taosHashGet
(
pItem
->
expiredWindows
,
&
querySKey
,
sizeof
(
TSKEY
))
!=
NULL
)
{
if
(
taosHashGet
(
pItem
->
expiredWindows
,
&
querySKey
,
sizeof
(
TSKEY
)))
{
// TODO: mark this window as expired.
// TODO: mark this window as expired.
tsdbDebug
(
"vgId:%d skey %"
PRIi64
" of window exists in expired window for index %"
PRIi64
,
REPO_ID
(
pTsdb
),
tsdbDebug
(
"vgId:%d skey %"
PRIi64
" of window exists in expired window for index %"
PRIi64
,
REPO_ID
(
pTsdb
),
querySKey
,
indexUid
);
querySKey
,
indexUid
);
...
@@ -1534,7 +1589,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
...
@@ -1534,7 +1589,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
void
*
result
=
NULL
;
void
*
result
=
NULL
;
int32_t
valueSize
=
0
;
int32_t
valueSize
=
0
;
if
(
(
result
=
tsdbGetSmaDataByKey
(
&
tReadH
.
dFile
,
smaKey
,
SMA_KEY_LEN
,
&
valueSize
))
==
NULL
)
{
if
(
!
(
result
=
tsdbGetSmaDataByKey
(
&
tReadH
.
dFile
,
smaKey
,
SMA_KEY_LEN
,
&
valueSize
))
)
{
tsdbWarn
(
"vgId:%d get sma data failed from smaIndex %"
PRIi64
", smaKey %"
PRIx64
"-%"
PRIx64
" since %s"
,
tsdbWarn
(
"vgId:%d get sma data failed from smaIndex %"
PRIi64
", smaKey %"
PRIx64
"-%"
PRIx64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
*
(
int64_t
*
)
smaKey
,
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
tstrerror
(
terrno
));
REPO_ID
(
pTsdb
),
indexUid
,
*
(
int64_t
*
)
smaKey
,
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
tstrerror
(
terrno
));
tsdbCloseDBF
(
&
tReadH
.
dFile
);
tsdbCloseDBF
(
&
tReadH
.
dFile
);
...
@@ -1578,7 +1633,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
...
@@ -1578,7 +1633,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
)
{
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
)
{
SSmaCfg
vCreateSmaReq
=
{
0
};
SSmaCfg
vCreateSmaReq
=
{
0
};
if
(
tDeserializeSVCreateTSmaReq
(
pMsg
,
&
vCreateSmaReq
)
==
NULL
)
{
if
(
!
tDeserializeSVCreateTSmaReq
(
pMsg
,
&
vCreateSmaReq
)
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tsdbWarn
(
"vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s"
,
REPO_ID
(
pTsdb
),
terrstr
(
terrno
));
tsdbWarn
(
"vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s"
,
REPO_ID
(
pTsdb
),
terrstr
(
terrno
));
return
-
1
;
return
-
1
;
...
@@ -1604,7 +1659,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
...
@@ -1604,7 +1659,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
)
{
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
)
{
SVDropTSmaReq
vDropSmaReq
=
{
0
};
SVDropTSmaReq
vDropSmaReq
=
{
0
};
if
(
tDeserializeSVDropTSmaReq
(
pMsg
,
&
vDropSmaReq
)
==
NULL
)
{
if
(
!
tDeserializeSVDropTSmaReq
(
pMsg
,
&
vDropSmaReq
)
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
...
@@ -1632,6 +1687,395 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
...
@@ -1632,6 +1687,395 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
*
* @param pTsdb
* @param pMeta
* @param pReq
* @return int32_t
*/
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateTbReq
*
pReq
)
{
SRSmaParam
*
param
=
pReq
->
stbCfg
.
pRSmaParam
;
if
(
!
param
)
{
tsdbDebug
(
"vgId:%d return directly since no rollup for stable %s %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
stbCfg
.
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
((
param
->
qmsg1Len
==
0
)
&&
(
param
->
qmsg2Len
==
0
))
{
tsdbWarn
(
"vgId:%d no qmsg1/qmsg2 for rollup stable %s %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
stbCfg
.
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_ROLLUP
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
(
pTsdb
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
&
pReq
->
stbCfg
.
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
tsdbWarn
(
"vgId:%d rsma info already exists for stb: %s, %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
stbCfg
.
suid
);
return
TSDB_CODE_SUCCESS
;
}
pRSmaInfo
=
(
SRSmaInfo
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SRSmaInfo
));
if
(
!
pRSmaInfo
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pMeta
);
if
(
!
pReadHandle
)
{
taosMemoryFree
(
pRSmaInfo
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pMeta
,
};
if
(
param
->
qmsg1
)
{
pRSmaInfo
->
taskInfo
[
0
]
=
qCreateStreamExecTaskInfo
(
param
->
qmsg1
,
&
handle
);
if
(
!
pRSmaInfo
->
taskInfo
[
0
])
{
taosMemoryFree
(
pRSmaInfo
);
taosMemoryFree
(
pReadHandle
);
return
TSDB_CODE_FAILED
;
}
}
if
(
param
->
qmsg2
)
{
pRSmaInfo
->
taskInfo
[
1
]
=
qCreateStreamExecTaskInfo
(
param
->
qmsg2
,
&
handle
);
if
(
!
pRSmaInfo
->
taskInfo
[
1
])
{
taosMemoryFree
(
pRSmaInfo
);
taosMemoryFree
(
pReadHandle
);
return
TSDB_CODE_FAILED
;
}
}
if
(
taosHashPut
(
SMA_STAT_INFO_HASH
(
pStat
),
&
pReq
->
stbCfg
.
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_FAILED
;
}
else
{
tsdbDebug
(
"vgId:%d register rsma info succeed for suid:%"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
stbCfg
.
suid
);
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief store suid/[uids], prefer to use array and then hash
*
* @param pStore
* @param suid
* @param uid
* @return int32_t
*/
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
)
{
// prefer to store suid/uids in array
if
((
suid
==
pStore
->
suid
)
||
(
pStore
->
suid
==
0
))
{
if
(
pStore
->
suid
==
0
)
{
pStore
->
suid
=
suid
;
}
if
(
uid
)
{
if
(
!
pStore
->
tbUids
)
{
if
(
!
(
pStore
->
tbUids
=
taosArrayInit
(
1
,
sizeof
(
tb_uid_t
))))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
}
if
(
!
taosArrayPush
(
pStore
->
tbUids
,
&
uid
))
{
return
TSDB_CODE_FAILED
;
}
}
}
else
{
// store other suid/uids in hash when multiple stable/table included in 1 batch of request
if
(
!
pStore
->
uidHash
)
{
pStore
->
uidHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
!
pStore
->
uidHash
)
{
return
TSDB_CODE_FAILED
;
}
}
if
(
uid
)
{
SArray
*
uidArray
=
taosHashGet
(
pStore
->
uidHash
,
&
suid
,
sizeof
(
tb_uid_t
));
if
(
uidArray
&&
((
uidArray
=
*
(
SArray
**
)
uidArray
)))
{
taosArrayPush
(
uidArray
,
&
uid
);
}
else
{
SArray
*
pUidArray
=
taosArrayInit
(
1
,
sizeof
(
tb_uid_t
));
if
(
!
pUidArray
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
if
(
!
taosArrayPush
(
pUidArray
,
&
uid
))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
if
(
taosHashPut
(
pStore
->
uidHash
,
&
suid
,
sizeof
(
suid
),
&
pUidArray
,
sizeof
(
pUidArray
))
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
}
}
else
{
if
(
taosHashPut
(
pStore
->
uidHash
,
&
suid
,
sizeof
(
suid
),
NULL
,
0
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
}
}
return
TSDB_CODE_SUCCESS
;
}
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
)
{
if
(
pStore
)
{
if
(
pStore
->
uidHash
)
{
if
(
pStore
->
tbUids
)
{
void
*
pIter
=
taosHashIterate
(
pStore
->
uidHash
,
NULL
);
while
(
pIter
)
{
SArray
*
arr
=
*
(
SArray
**
)
pIter
;
taosArrayDestroy
(
arr
);
pIter
=
taosHashIterate
(
pStore
->
uidHash
,
pIter
);
}
}
taosHashCleanup
(
pStore
->
uidHash
);
}
taosArrayDestroy
(
pStore
->
tbUids
);
}
}
void
*
tsdbUidStoreFree
(
STbUidStore
*
pStore
)
{
tsdbUidStoreDestory
(
pStore
);
taosMemoryFree
(
pStore
);
return
NULL
;
}
/**
* @brief fetch suid/uids when create child tables of rollup SMA
*
* @param pTsdb
* @param ppStore
* @param suid
* @param uid
* @return int32_t
*/
int32_t
tsdbFetchTbUidList
(
void
*
pTsdb
,
void
**
ppStore
,
void
*
suid
,
void
*
uid
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
((
STsdb
*
)
pTsdb
);
// only applicable to rollup SMA ctables
if
(
!
pEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
infoHash
=
NULL
;
if
(
!
pStat
||
!
(
infoHash
=
SMA_STAT_INFO_HASH
(
pStat
)))
{
terrno
=
TSDB_CODE_TDB_INVALID_SMA_STAT
;
return
TSDB_CODE_FAILED
;
}
// info cached when create rsma stable and return directly for non-rsma ctables
if
(
!
taosHashGet
(
infoHash
,
suid
,
sizeof
(
tb_uid_t
)))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
!
(
*
ppStore
))
{
if
(
tsdbUidStoreInit
((
STbUidStore
**
)
ppStore
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
}
if
(
tsdbUidStorePut
(
*
ppStore
,
*
(
tb_uid_t
*
)
suid
,
(
tb_uid_t
*
)
uid
)
!=
0
)
{
*
ppStore
=
tsdbUidStoreFree
(
*
ppStore
);
return
TSDB_CODE_FAILED
;
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int32_t
tsdbUpdateTbUidListImpl
(
STsdb
*
pTsdb
,
tb_uid_t
*
suid
,
SArray
*
tbUids
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
(
pTsdb
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
if
(
!
suid
||
!
tbUids
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbError
(
"vgId:%d failed to get rsma info for uid:%"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
*
suid
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
}
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
tsdbError
(
"vgId:%d failed to get rsma info for uid:%"
PRIi64
,
REPO_ID
(
pTsdb
),
*
suid
);
terrno
=
TSDB_CODE_TDB_INVALID_SMA_STAT
;
return
TSDB_CODE_FAILED
;
}
if
(
pRSmaInfo
->
taskInfo
[
0
]
&&
(
qUpdateQualifiedTableId
(
pRSmaInfo
->
taskInfo
[
0
],
tbUids
,
true
)
!=
0
))
{
tsdbError
(
"vgId:%d update tbUidList failed for uid:%"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
*
suid
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
}
else
{
tsdbDebug
(
"vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
REPO_ID
(
pTsdb
),
pRSmaInfo
->
taskInfo
[
0
],
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
}
if
(
pRSmaInfo
->
taskInfo
[
1
]
&&
(
qUpdateQualifiedTableId
(
pRSmaInfo
->
taskInfo
[
1
],
tbUids
,
true
)
!=
0
))
{
tsdbError
(
"vgId:%d update tbUidList failed for uid:%"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
*
suid
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
}
else
{
tsdbDebug
(
"vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
REPO_ID
(
pTsdb
),
pRSmaInfo
->
taskInfo
[
1
],
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pStore
)
{
if
(
!
pStore
||
(
taosArrayGetSize
(
pStore
->
tbUids
)
==
0
))
{
tsdbDebug
(
"vgId:%d no need to update tbUids since empty uidStore"
,
REPO_ID
(
pTsdb
));
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_SUCCESS
;
}
if
(
tsdbUpdateTbUidListImpl
(
pTsdb
,
&
pStore
->
suid
,
pStore
->
tbUids
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_FAILED
;
}
void
*
pIter
=
taosHashIterate
(
pStore
->
uidHash
,
NULL
);
while
(
pIter
)
{
tb_uid_t
*
pTbSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
SArray
*
pTbUids
=
*
(
SArray
**
)
pIter
;
if
(
tsdbUpdateTbUidListImpl
(
pTsdb
,
pTbSuid
,
pTbUids
)
!=
TSDB_CODE_SUCCESS
)
{
taosHashCancelIterate
(
pStore
->
uidHash
,
pIter
);
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_FAILED
;
}
pIter
=
taosHashIterate
(
pStore
->
uidHash
,
pIter
);
}
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbFetchSubmitReqSuids
(
SSubmitReq
*
pMsg
,
STbUidStore
*
pStore
)
{
ASSERT
(
pMsg
!=
NULL
);
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
SSubmitBlkIter
blkIter
=
{
0
};
STSRow
*
row
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if
(
tInitSubmitMsgIter
(
pMsg
,
&
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
)
<
0
)
return
-
1
;
if
(
!
pBlock
)
break
;
tsdbUidStorePut
(
pStore
,
pBlock
->
suid
,
NULL
);
}
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
return
-
1
;
return
0
;
}
int32_t
tsdbExecuteRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
*
suid
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
(
pTsdb
);
if
(
!
pEnv
)
{
// only applicable when rsma env exists
return
TSDB_CODE_SUCCESS
;
}
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
tsdbDebug
(
"vgId:%d no rsma info for suid:%"
PRIu64
,
REPO_ID
(
pTsdb
),
*
suid
);
return
TSDB_CODE_SUCCESS
;
}
SArray
*
pResult
=
NULL
;
pResult
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
!
pResult
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
if
(
pRSmaInfo
->
taskInfo
[
0
])
{
qSetStreamInput
(
pRSmaInfo
->
taskInfo
[
0
],
pMsg
,
inputType
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
pRSmaInfo
->
taskInfo
[
0
],
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
!
output
)
{
break
;
}
taosArrayPush
(
pResult
,
output
);
}
blockDebugShowData
(
pResult
);
}
// if (pRSmaInfo->taskInfo[1]) {
// qSetStreamInput(pRSmaInfo->taskInfo[1], pMsg, inputType);
// while (1) {
// SSDataBlock *output;
// uint64_t ts;
// if (qExecTask(pRSmaInfo->taskInfo[1], &output, &ts) < 0) {
// ASSERT(false);
// }
// if (!output) {
// break;
// }
// taosArrayPush(pResult, output);
// }
// blockDebugShowData(pResult);
// }
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
void
*
pMsg
,
int32_t
inputType
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
(
pTsdb
);
if
(
!
pEnv
)
{
// only applicable when rsma env exists
return
TSDB_CODE_SUCCESS
;
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
STbUidStore
uidStore
=
{
0
};
tsdbFetchSubmitReqSuids
(
pMsg
,
&
uidStore
);
if
(
uidStore
.
suid
!=
0
)
{
tsdbExecuteRSma
(
pTsdb
,
pMeta
,
pMsg
,
inputType
,
&
uidStore
.
suid
);
void
*
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
NULL
);
while
(
pIter
)
{
tb_uid_t
*
pTbSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
tsdbExecuteRSma
(
pTsdb
,
pMeta
,
pMsg
,
inputType
,
pTbSuid
);
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
pIter
);
}
tsdbUidStoreDestory
(
&
uidStore
);
}
}
return
TSDB_CODE_SUCCESS
;
}
#if 0
#if 0
/**
/**
* @brief Get the start TS key of the last data block of one interval/sliding.
* @brief Get the start TS key of the last data block of one interval/sliding.
...
@@ -1674,6 +2118,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
...
@@ -1674,6 +2118,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
indexUid
,
msg
))
<
0
)
{
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
indexUid
,
msg
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
tsdbWarn
(
"vgId:%d insert tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
}
// TODO: destroy SSDataBlocks(msg)
return
code
;
return
code
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
1d42f790
...
@@ -83,6 +83,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
...
@@ -83,6 +83,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
case
TDMT_VND_SUBMIT
:
case
TDMT_VND_SUBMIT
:
pRsp
->
msgType
=
TDMT_VND_SUBMIT_RSP
;
pRsp
->
msgType
=
TDMT_VND_SUBMIT_RSP
;
vnodeProcessSubmitReq
(
pVnode
,
ptr
,
pRsp
);
vnodeProcessSubmitReq
(
pVnode
,
ptr
,
pRsp
);
tsdbTriggerRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
ptr
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
break
;
break
;
case
TDMT_VND_MQ_VG_CHANGE
:
case
TDMT_VND_MQ_VG_CHANGE
:
if
(
tqProcessVgChangeReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
if
(
tqProcessVgChangeReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
...
@@ -101,7 +102,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
...
@@ -101,7 +102,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
}
}
}
break
;
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
// TODO
}
}
...
@@ -277,19 +277,12 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -277,19 +277,12 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
static
int
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
void
*
pReq
)
{
static
int
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
void
*
pReq
)
{
SVCreateTbReq
vCreateTbReq
=
{
0
};
SVCreateTbReq
vCreateTbReq
=
{
0
};
tDeserializeSVCreateTbReq
(
pReq
,
&
vCreateTbReq
);
tDeserializeSVCreateTbReq
(
pReq
,
&
vCreateTbReq
);
if
(
metaCreateTable
(
pVnode
->
pMeta
,
&
(
vCreateTbReq
))
<
0
)
{
if
(
metaCreateTable
(
pVnode
->
pMeta
,
&
(
vCreateTbReq
)
,
NULL
)
<
0
)
{
// TODO
// TODO
return
-
1
;
return
-
1
;
}
}
// TODO: remove the debug log
tsdbRegisterRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
&
vCreateTbReq
);
SRSmaParam
*
param
=
vCreateTbReq
.
stbCfg
.
pRSmaParam
;
if
(
param
)
{
printf
(
"qmsg1 len = %d, body = %s
\n
"
,
param
->
qmsg1
?
(
int32_t
)
strlen
(
param
->
qmsg1
)
:
0
,
param
->
qmsg1
?
param
->
qmsg1
:
""
);
printf
(
"qmsg1 len = %d, body = %s
\n
"
,
param
->
qmsg2
?
(
int32_t
)
strlen
(
param
->
qmsg2
)
:
0
,
param
->
qmsg2
?
param
->
qmsg2
:
""
);
}
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pTagSchema
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pTagSchema
);
...
@@ -309,6 +302,13 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
...
@@ -309,6 +302,13 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
SVCreateTbBatchRsp
vCreateTbBatchRsp
=
{
0
};
SVCreateTbBatchRsp
vCreateTbBatchRsp
=
{
0
};
tDeserializeSVCreateTbBatchReq
(
pReq
,
&
vCreateTbBatchReq
);
tDeserializeSVCreateTbBatchReq
(
pReq
,
&
vCreateTbBatchReq
);
int
reqNum
=
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
);
int
reqNum
=
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
);
STbDdlH
ddlHandle
=
{
.
ahandle
=
pVnode
->
pTsdb
,
.
result
=
NULL
,
.
fp
=
tsdbFetchTbUidList
,
};
for
(
int
i
=
0
;
i
<
reqNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
reqNum
;
i
++
)
{
SVCreateTbReq
*
pCreateTbReq
=
taosArrayGet
(
vCreateTbBatchReq
.
pArray
,
i
);
SVCreateTbReq
*
pCreateTbReq
=
taosArrayGet
(
vCreateTbBatchReq
.
pArray
,
i
);
...
@@ -324,7 +324,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
...
@@ -324,7 +324,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
taosArrayPush
(
vCreateTbBatchRsp
.
rspList
,
&
rsp
);
taosArrayPush
(
vCreateTbBatchRsp
.
rspList
,
&
rsp
);
}
}
if
(
metaCreateTable
(
pVnode
->
pMeta
,
pCreateTbReq
)
<
0
)
{
if
(
metaCreateTable
(
pVnode
->
pMeta
,
pCreateTbReq
,
&
ddlHandle
)
<
0
)
{
// TODO: handle error
// TODO: handle error
vError
(
"vgId:%d, failed to create table: %s"
,
TD_VID
(
pVnode
),
pCreateTbReq
->
name
);
vError
(
"vgId:%d, failed to create table: %s"
,
TD_VID
(
pVnode
),
pCreateTbReq
->
name
);
}
}
...
@@ -348,6 +348,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
...
@@ -348,6 +348,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
}
}
}
}
tsdbUpdateTbUidList
(
pVnode
->
pTsdb
,
ddlHandle
.
result
);
vTrace
(
"vgId:%d process create %"
PRIzu
" tables"
,
TD_VID
(
pVnode
),
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
));
vTrace
(
"vgId:%d process create %"
PRIzu
" tables"
,
TD_VID
(
pVnode
),
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
));
taosArrayDestroy
(
vCreateTbBatchReq
.
pArray
);
taosArrayDestroy
(
vCreateTbBatchReq
.
pArray
);
if
(
vCreateTbBatchRsp
.
rspList
)
{
if
(
vCreateTbBatchRsp
.
rspList
)
{
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
1d42f790
...
@@ -407,7 +407,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
...
@@ -407,7 +407,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
}
}
}
}
EXPECT_EQ
(
tdScanAndConvertSubmitMsg
(
pMsg
),
TSDB_CODE_SUCCESS
);
//
EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
pMsg
,
0
),
0
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
pMsg
,
0
),
0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录