Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b2d6ec14
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b2d6ec14
编写于
5月 01, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: rollup sma retention unit
上级
b6de3801
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
72 addition
and
152 deletion
+72
-152
include/common/tmsg.h
include/common/tmsg.h
+3
-9
source/common/src/tmsg.c
source/common/src/tmsg.c
+12
-80
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+4
-4
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+0
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+50
-52
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+2
-2
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+0
-3
未找到文件。
include/common/tmsg.h
浏览文件 @
b2d6ec14
...
...
@@ -181,8 +181,8 @@ typedef struct SField {
}
SField
;
typedef
struct
SRetention
{
int
32
_t
freq
;
int
32
_t
keep
;
int
64
_t
freq
;
int
64
_t
keep
;
int8_t
freqUnit
;
int8_t
keepUnit
;
}
SRetention
;
...
...
@@ -240,13 +240,7 @@ typedef struct {
// head of SSubmitBlk
const
void
*
pMsg
;
}
SSubmitMsgIter
;
#if 0
int32_t tInitSubmitMsgIterOrigin(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIterOrigin(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNextOrigin(SSubmitBlkIter* pIter);
#endif
// TODO: KEEP one suite of iterator API finally.
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tInitSubmitBlkIter
(
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
...
...
source/common/src/tmsg.c
浏览文件 @
b2d6ec14
...
...
@@ -27,75 +27,7 @@
#define TD_MSG_DICT_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
#if 0
int32_t tInitSubmitMsgIterOrigin(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
ASSERT(pIter->totalLen > 0);
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= sizeof(SSubmitReq)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(pIter->len >= 0);
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitReq);
} else {
if (pIter->len >= pIter->totalLen) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
ASSERT(pIter->len > 0);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
int32_t tInitSubmitBlkIterOrigin(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (STSRow *)(pBlock->data + pBlock->schemaLen);
return 0;
}
STSRow *tGetSubmitBlkNextOrigin(SSubmitBlkIter *pIter) {
STSRow *row = pIter->row;
if (pIter->len >= pIter->totalLen) {
return NULL;
} else {
pIter->len += TD_ROW_LEN(row);
if (pIter->len < pIter->totalLen) {
pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row));
}
return row;
}
}
#endif
// TODO: KEEP one suite of iterator API finally.
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
...
...
@@ -1679,8 +1611,8 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfRetensions
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
pReq
->
pRetensions
,
i
);
if
(
tEncodeI
32
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
freqUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
...
...
@@ -1725,8 +1657,8 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfRetensions
;
++
i
)
{
SRetention
rentension
=
{
0
};
if
(
tDecodeI
32
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
freqUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
keepUnit
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pRetensions
,
&
rentension
)
==
NULL
)
{
...
...
@@ -2155,8 +2087,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfRetensions
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
pRsp
->
pRetensions
,
i
);
if
(
tEncodeI
32
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
freqUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
...
...
@@ -2199,8 +2131,8 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfRetensions
;
++
i
)
{
SRetention
rentension
=
{
0
};
if
(
tDecodeI
32
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
freqUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
keepUnit
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pRsp
->
pRetensions
,
&
rentension
)
==
NULL
)
{
...
...
@@ -2817,8 +2749,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfRetensions
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
pReq
->
pRetensions
,
i
);
if
(
tEncodeI
32
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
freqUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
...
...
@@ -2874,8 +2806,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfRetensions
;
++
i
)
{
SRetention
rentension
=
{
0
};
if
(
tDecodeI
32
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
freqUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
keepUnit
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pRetensions
,
&
rentension
)
==
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
b2d6ec14
...
...
@@ -107,8 +107,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
for
(
int32_t
i
=
0
;
i
<
pDb
->
cfg
.
numOfRetensions
;
++
i
)
{
TASSERT
(
taosArrayGetSize
(
pDb
->
cfg
.
pRetensions
)
==
pDb
->
cfg
.
numOfRetensions
);
SRetention
*
pRetension
=
taosArrayGet
(
pDb
->
cfg
.
pRetensions
,
i
);
SDB_SET_INT
32
(
pRaw
,
dataPos
,
pRetension
->
freq
,
_OVER
)
SDB_SET_INT
32
(
pRaw
,
dataPos
,
pRetension
->
keep
,
_OVER
)
SDB_SET_INT
64
(
pRaw
,
dataPos
,
pRetension
->
freq
,
_OVER
)
SDB_SET_INT
64
(
pRaw
,
dataPos
,
pRetension
->
keep
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pRetension
->
freqUnit
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pRetension
->
keepUnit
,
_OVER
)
}
...
...
@@ -180,8 +180,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
if
(
pDb
->
cfg
.
pRetensions
==
NULL
)
goto
_OVER
;
for
(
int32_t
i
=
0
;
i
<
pDb
->
cfg
.
numOfRetensions
;
++
i
)
{
SRetention
retension
=
{
0
};
SDB_GET_INT
32
(
pRaw
,
dataPos
,
&
retension
.
freq
,
_OVER
)
SDB_GET_INT
32
(
pRaw
,
dataPos
,
&
retension
.
keep
,
_OVER
)
SDB_GET_INT
64
(
pRaw
,
dataPos
,
&
retension
.
freq
,
_OVER
)
SDB_GET_INT
64
(
pRaw
,
dataPos
,
&
retension
.
keep
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
retension
.
freqUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
retension
.
keepUnit
,
_OVER
)
if
(
taosArrayPush
(
pDb
->
cfg
.
pRetensions
,
&
retension
)
==
NULL
)
{
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
b2d6ec14
...
...
@@ -40,7 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
return
TSDB_CODE_SUCCESS
;
}
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
b2d6ec14
...
...
@@ -123,7 +123,7 @@ int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, t
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pUidStore
);
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tsdbUidStoreFree
(
STbUidStore
*
pStore
);
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
void
*
pMsg
,
int32_t
inputType
);
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
void
*
pMsg
,
int32_t
inputType
);
typedef
struct
{
int8_t
streamType
;
// sma or other
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
b2d6ec14
...
...
@@ -174,6 +174,8 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
static
FORCE_INLINE
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
FORCE_INLINE
int32_t
tsdbUpdateTbUidListImpl
(
STsdb
*
pTsdb
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
FORCE_INLINE
int32_t
tsdbExecuteRSmaImpl
(
STsdb
*
pTsdb
,
const
void
*
pMsg
,
int32_t
inputType
,
qTaskInfo_t
*
taskInfo
,
tb_uid_t
suid
,
int8_t
retention
);
// mgmt interface
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
...
...
@@ -1881,8 +1883,10 @@ int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, t
return
TSDB_CODE_SUCCESS
;
}
ASSERT
(
ppStore
!=
NULL
);
if
(
!
(
*
ppStore
))
{
if
(
tsdbUidStoreInit
(
(
STbUidStore
**
)
ppStore
)
!=
0
)
{
if
(
tsdbUidStoreInit
(
ppStore
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
}
...
...
@@ -1978,7 +1982,44 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
return
0
;
}
int32_t
tsdbExecuteRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
*
suid
)
{
static
FORCE_INLINE
int32_t
tsdbExecuteRSmaImpl
(
STsdb
*
pTsdb
,
const
void
*
pMsg
,
int32_t
inputType
,
qTaskInfo_t
*
taskInfo
,
tb_uid_t
suid
,
int8_t
retention
)
{
SArray
*
pResult
=
NULL
;
tsdbDebug
(
"vgId:%d execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
REPO_ID
(
pTsdb
),
retention
,
taskInfo
,
suid
);
qSetStreamInput
(
taskInfo
,
pMsg
,
inputType
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
taskInfo
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
!
output
)
{
break
;
}
if
(
!
pResult
)
{
pResult
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
!
pResult
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
}
taosArrayPush
(
pResult
,
output
);
}
if
(
taosArrayGetSize
(
pResult
)
>
0
)
{
blockDebugShowData
(
pResult
);
}
else
{
tsdbWarn
(
"vgId:%d no rsma_1 data generated since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbExecuteRSma
(
STsdb
*
pTsdb
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
(
pTsdb
);
if
(
!
pEnv
)
{
// only applicable when rsma env exists
...
...
@@ -1988,65 +2029,22 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
tsdbDebug
(
"vgId:%d no rsma info for suid:%"
PRIu64
,
REPO_ID
(
pTsdb
),
*
suid
);
tsdbDebug
(
"vgId:%d no rsma info for suid:%"
PRIu64
,
REPO_ID
(
pTsdb
),
suid
);
return
TSDB_CODE_SUCCESS
;
}
SArray
*
pResult
=
NULL
;
pResult
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
!
pResult
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
if
(
pRSmaInfo
->
taskInfo
[
0
])
{
tsdbDebug
(
"vgId:%d execute rsma task for qTaskInfo:%p suid:%"
PRIu64
,
REPO_ID
(
pTsdb
),
pRSmaInfo
->
taskInfo
[
0
],
*
suid
);
qSetStreamInput
(
pRSmaInfo
->
taskInfo
[
0
],
pMsg
,
inputType
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
pRSmaInfo
->
taskInfo
[
0
],
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
!
output
)
{
break
;
}
taosArrayPush
(
pResult
,
output
);
}
if
(
taosArrayGetSize
(
pResult
)
>
0
)
{
blockDebugShowData
(
pResult
);
}
else
{
tsdbWarn
(
"vgId:%d no sma data generated since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
}
// if (pRSmaInfo->taskInfo[1]) {
// qSetStreamInput(pRSmaInfo->taskInfo[1], pMsg, inputType);
// while (1) {
// SSDataBlock *output;
// uint64_t ts;
// if (qExecTask(pRSmaInfo->taskInfo[1], &output, &ts) < 0) {
// ASSERT(false);
// }
// if (!output) {
// break;
// }
// taosArrayPush(pResult, output);
// }
// blockDebugShowData(pResult);
// }
tsdbExecuteRSmaImpl
(
pTsdb
,
pMsg
,
inputType
,
pRSmaInfo
->
taskInfo
[
0
],
suid
,
TSDB_RSMA_RETENTION_1
);
tsdbExecuteRSmaImpl
(
pTsdb
,
pMsg
,
inputType
,
pRSmaInfo
->
taskInfo
[
1
],
suid
,
TSDB_RSMA_RETENTION_2
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
void
*
pMsg
,
int32_t
inputType
)
{
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
void
*
pMsg
,
int32_t
inputType
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
(
pTsdb
);
if
(
!
pEnv
)
{
// only applicable when rsma env exists
...
...
@@ -2058,12 +2056,12 @@ int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputTyp
tsdbFetchSubmitReqSuids
(
pMsg
,
&
uidStore
);
if
(
uidStore
.
suid
!=
0
)
{
tsdbExecuteRSma
(
pTsdb
,
pM
eta
,
pMsg
,
inputType
,
&
uidStore
.
suid
);
tsdbExecuteRSma
(
pTsdb
,
pM
sg
,
inputType
,
uidStore
.
suid
);
void
*
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
NULL
);
while
(
pIter
)
{
tb_uid_t
*
pTbSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
tsdbExecuteRSma
(
pTsdb
,
pM
eta
,
pMsg
,
inputType
,
pTbSuid
);
tsdbExecuteRSma
(
pTsdb
,
pM
sg
,
inputType
,
*
pTbSuid
);
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
pIter
);
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
b2d6ec14
...
...
@@ -461,7 +461,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
SSubmitRsp
rsp
=
{
0
};
pRsp
->
code
=
0
;
tsdbTriggerRSma
(
pVnode
->
pTsdb
,
p
Vnode
->
pMeta
,
p
Req
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
tsdbTriggerRSma
(
pVnode
->
pTsdb
,
pReq
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
// handle the request
if
(
tsdbInsertData
(
pVnode
->
pTsdb
,
version
,
pSubmitReq
,
&
rsp
)
<
0
)
{
pRsp
->
code
=
terrno
;
...
...
@@ -470,7 +470,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
// pRsp->msgType = TDMT_VND_SUBMIT_RSP;
// vnodeProcessSubmitReq(pVnode, ptr, pRsp);
// tsdbTriggerRSma(pVnode->pTsdb, p
Vnode->pMeta, p
Req, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// encode the response (TODO)
pRsp
->
pCont
=
rpcMallocCont
(
sizeof
(
SSubmitRsp
));
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
b2d6ec14
...
...
@@ -1947,9 +1947,6 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
if
(
DEAL_RES_ERROR
==
translateValue
(
pCxt
,
pVal
))
{
return
pCxt
->
errCode
;
}
if
(
!
TIME_IS_VAR_DURATION
(
pVal
->
unit
))
{
pVal
->
datum
.
i
=
convertTimeFromPrecisionToUnit
(
pVal
->
datum
.
i
,
pVal
->
node
.
resType
.
precision
,
pVal
->
unit
);
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录