Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4bc8e086
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
4bc8e086
编写于
8月 21, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
8月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16262 from taosdata/feature/3.0_interval_hash_optimize
enh: rsma batch process and code optimization
上级
99b49877
e0317964
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
324 addition
and
392 deletion
+324
-392
include/common/tmsg.h
include/common/tmsg.h
+0
-25
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-1
include/util/tqueue.h
include/util/tqueue.h
+1
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+2
-0
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+14
-11
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-1
source/dnode/vnode/src/sma/smaCommit.c
source/dnode/vnode/src/sma/smaCommit.c
+33
-38
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+5
-12
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+14
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+219
-292
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+3
-0
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+18
-9
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+3
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/util/src/tqueue.c
source/util/src/tqueue.c
+2
-1
tests/script/tsim/sma/rsmaPersistenceRecovery.sim
tests/script/tsim/sma/rsmaPersistenceRecovery.sim
+6
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
4bc8e086
...
...
@@ -2667,31 +2667,6 @@ typedef struct {
int32_t
padding
;
}
SRSmaExecMsg
;
typedef
struct
{
int64_t
suid
;
int8_t
level
;
}
SRSmaFetchMsg
;
static
FORCE_INLINE
int32_t
tEncodeSRSmaFetchMsg
(
SEncoder
*
pCoder
,
const
SRSmaFetchMsg
*
pReq
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pReq
->
level
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
static
FORCE_INLINE
int32_t
tDecodeSRSmaFetchMsg
(
SDecoder
*
pCoder
,
SRSmaFetchMsg
*
pReq
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pReq
->
level
)
<
0
)
return
-
1
;
tEndDecode
(
pCoder
);
return
0
;
}
typedef
struct
{
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
...
...
include/common/tmsgdef.h
浏览文件 @
4bc8e086
...
...
@@ -201,7 +201,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBMIT_RSMA
,
"vnode-submit-rsma"
,
SSubmitReq
,
SSubmitRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_FETCH_RSMA
,
"vnode-fetch-rsma"
,
SRSmaFetchMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_FETCH_RSMA
,
"vnode-fetch-rsma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_EXEC_RSMA
,
"vnode-exec-rsma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DELETE
,
"delete-data"
,
SVDeleteReq
,
SVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_BATCH_DEL
,
"batch-delete"
,
SBatchDeleteReq
,
NULL
)
...
...
include/util/tqueue.h
浏览文件 @
4bc8e086
...
...
@@ -76,6 +76,7 @@ void taosFreeQall(STaosQall *qall);
int32_t
taosReadAllQitems
(
STaosQueue
*
queue
,
STaosQall
*
qall
);
int32_t
taosGetQitem
(
STaosQall
*
qall
,
void
**
ppItem
);
void
taosResetQitems
(
STaosQall
*
qall
);
int32_t
taosQallItemSize
(
STaosQall
*
qall
);
STaosQset
*
taosOpenQset
();
void
taosCloseQset
(
STaosQset
*
qset
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
4bc8e086
...
...
@@ -442,6 +442,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
if
(
req
.
rollup
)
{
req
.
rsmaParam
.
maxdelay
[
0
]
=
pStb
->
maxdelay
[
0
];
req
.
rsmaParam
.
maxdelay
[
1
]
=
pStb
->
maxdelay
[
1
];
req
.
rsmaParam
.
watermark
[
0
]
=
pStb
->
watermark
[
0
];
req
.
rsmaParam
.
watermark
[
1
]
=
pStb
->
watermark
[
1
];
if
(
pStb
->
ast1Len
>
0
)
{
if
(
mndConvertRsmaTask
(
&
req
.
rsmaParam
.
qmsg
[
0
],
&
req
.
rsmaParam
.
qmsgLen
[
0
],
pStb
->
pAst1
,
pStb
->
uid
,
STREAM_TRIGGER_WINDOW_CLOSE
,
req
.
rsmaParam
.
watermark
[
0
])
<
0
)
{
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
4bc8e086
...
...
@@ -32,7 +32,8 @@ extern "C" {
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define RSMA_TASK_INFO_HASH_SLOT 8
#define RSMA_TASK_INFO_HASH_SLOT (8)
#define RSMA_EXECUTOR_MAX (1)
typedef
struct
SSmaEnv
SSmaEnv
;
typedef
struct
SSmaStat
SSmaStat
;
...
...
@@ -90,14 +91,14 @@ struct SRSmaStat {
SSma
*
pSma
;
int64_t
commitAppliedVer
;
// vnode applied version for async commit
int64_t
refId
;
// shared by fetch tasks
volatile
int64_t
qBufSize
;
// queue buffer size
volatile
int64_t
nBufItems
;
// number of items in queue buffer
SRWLatch
lock
;
// r/w lock for rsma fs(e.g. qtaskinfo)
volatile
int8_t
nExecutor
;
// [1, max(half of query threads, 4)]
int8_t
triggerStat
;
// shared by fetch tasks
int8_t
commitStat
;
// 0 not in committing, 1 in committing
int8_t
execStat
;
// 0 not in exec , 1 in exec
SArray
*
aTaskFile
;
// qTaskFiles committed recently(for recovery/snapshot r/w)
SHashObj
*
infoHash
;
// key: suid, value: SRSmaInfo
SHashObj
*
fetchHash
;
// key: suid, value: L1 or L2 or L1|L2
tsem_t
notEmpty
;
// has items in queue buffer
};
struct
SSmaStat
{
...
...
@@ -111,26 +112,28 @@ struct SSmaStat {
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->infoHash)
#define RSMA_FETCH_HASH(r) ((r)->fetchHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct
SRSmaInfoItem
{
int8_t
level
;
int8_t
level
:
4
;
int8_t
fetchLevel
:
4
;
int8_t
triggerStat
;
uint16_t
interval
;
// second
int32_t
maxDelay
;
uint16_t
nSkipped
;
int32_t
maxDelay
;
// ms
tmr_h
tmrId
;
};
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
int64_t
suid
;
int64_t
refId
;
// refId of SRSmaStat
uint64_t
delFlag
:
1
;
uint64_t
lastReceived
:
63
;
// second
int64_t
refId
;
// refId of SRSmaStat
int64_t
lastRecv
;
// ms
int8_t
assigned
;
// 0 idle, 1 assgined for exec
int8_t
delFlag
;
int16_t
padding
;
T_REF_DECLARE
()
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
void
*
taskInfo
[
TSDB_RETENTION_L2
];
// qTaskInfo_t
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
4bc8e086
...
...
@@ -189,6 +189,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
int32_t
smaInit
();
void
smaCleanUp
();
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaPreClose
(
SSma
*
pSma
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
smaBegin
(
SSma
*
pSma
);
int32_t
smaSyncPreCommit
(
SSma
*
pSma
);
...
...
@@ -198,7 +199,6 @@ int32_t smaAsyncPreCommit(SSma* pSma);
int32_t
smaAsyncCommit
(
SSma
*
pSma
);
int32_t
smaAsyncPostCommit
(
SSma
*
pSma
);
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
);
int32_t
smaProcessFetch
(
SSma
*
pSma
,
void
*
pMsg
);
int32_t
smaProcessExec
(
SSma
*
pSma
,
void
*
pMsg
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
...
...
@@ -323,6 +323,7 @@ struct SVnode {
TdThreadMutex
lock
;
bool
blocked
;
bool
restored
;
bool
inClose
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
};
...
...
source/dnode/vnode/src/sma/smaCommit.c
浏览文件 @
4bc8e086
...
...
@@ -109,7 +109,7 @@ int32_t smaBegin(SSma *pSma) {
/**
* @brief pre-commit for rollup sma(sync commit).
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) wait
all triggered fetch tasks finished
* 2) wait
for all triggered fetch tasks to finish
* 3) perform persist task for qTaskInfo
*
* @param pSma
...
...
@@ -127,14 +127,14 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
// step 1: set rsma stat paused
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_PAUSED
);
// step 2: wait
all triggered fetch tasks finished
// step 2: wait
for all triggered fetch tasks to finish
int32_t
nLoops
=
0
;
while
(
1
)
{
if
(
T_REF_VAL_GET
(
pStat
)
==
0
)
{
smaDebug
(
"vgId:%d, rsma fetch tasks all finished"
,
SMA_VID
(
pSma
));
smaDebug
(
"vgId:%d, rsma fetch tasks a
re a
ll finished"
,
SMA_VID
(
pSma
));
break
;
}
else
{
smaDebug
(
"vgId:%d, rsma fetch tasks not all finished yet"
,
SMA_VID
(
pSma
));
smaDebug
(
"vgId:%d, rsma fetch tasks
are
not all finished yet"
,
SMA_VID
(
pSma
));
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
...
...
@@ -316,15 +316,17 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// step 1: set rsma stat
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_PAUSED
);
atomic_store_8
(
RSMA_COMMIT_STAT
(
pRSmaStat
),
1
);
pRSmaStat
->
commitAppliedVer
=
pSma
->
pVnode
->
state
.
applied
;
ASSERT
(
pRSmaStat
->
commitAppliedVer
>
0
);
// step 2: wait
all triggered fetch tasks finished
// step 2: wait
for all triggered fetch tasks to finish
int32_t
nLoops
=
0
;
while
(
1
)
{
if
(
T_REF_VAL_GET
(
pStat
)
==
0
)
{
smaDebug
(
"vgId:%d, rsma
fetch tasks
all finished"
,
SMA_VID
(
pSma
));
smaDebug
(
"vgId:%d, rsma
commit, fetch tasks are
all finished"
,
SMA_VID
(
pSma
));
break
;
}
else
{
smaDebug
(
"vgId:%d, rsma
fetch tasks
not all finished yet"
,
SMA_VID
(
pSma
));
smaDebug
(
"vgId:%d, rsma
commit, fetch tasks are
not all finished yet"
,
SMA_VID
(
pSma
));
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
...
...
@@ -338,30 +340,29 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
* 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/
nLoops
=
0
;
smaInfo
(
"vgId:%d, start to wait for rsma qtask free, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
if
(
tdRSmaProcessExecImpl
(
pSma
,
RSMA_EXEC_COMMIT
)
<
0
)
{
return
TSDB_CODE_FAILED
;
}
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
&
pRSmaStat
->
execStat
,
0
,
1
);
if
(
old
==
0
)
break
;
if
(
++
nLoops
>
1000
)
{
smaInfo
(
"vgId:%d, rsma commit, wait for all items to be consumed, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
())
;
nLoops
=
0
;
while
(
atomic_load_64
(
&
pRSmaStat
->
nBufItems
)
>
0
)
{
++
nLoops
;
if
(
nLoops
>
1000
)
{
sched_yield
();
nLoops
=
0
;
smaDebug
(
"vgId:%d, wait for rsma qtask free, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
}
}
smaInfo
(
"vgId:%d, end to wait for rsma qtask free, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
if
(
tdRSmaProcessExecImpl
(
pSma
,
RSMA_EXEC_COMMIT
)
<
0
)
{
atomic_store_8
(
&
pRSmaStat
->
execStat
,
0
);
smaInfo
(
"vgId:%d, rsma commit, all items are consumed, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
if
(
tdRSmaPersistExecImpl
(
pRSmaStat
,
RSMA_INFO_HASH
(
pRSmaStat
))
<
0
)
{
return
TSDB_CODE_FAILED
;
}
smaInfo
(
"vgId:%d, rsma commit, operator state commited, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
...
...
@@ -376,13 +377,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
atomic_store_64
(
&
pRSmaStat
->
qBufSize
,
0
);
atomic_store_8
(
&
pRSmaStat
->
execStat
,
0
);
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
// step 5: others
pRSmaStat
->
commitAppliedVer
=
pSma
->
pVnode
->
state
.
applied
;
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -398,13 +395,14 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
if
(
!
pSmaEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
#endif
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -426,10 +424,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// step 1: merge qTaskInfo and iQTaskInfo
// lock
taosWLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosWLockLatch(SMA_ENV_LOCK(pEnv));
void
*
pIter
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
NULL
)
;
while
(
pIter
)
{
void
*
pIter
=
NULL
;
while
(
(
pIter
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
pIter
))
)
{
tb_uid_t
*
pSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pIter
;
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
...
...
@@ -447,14 +445,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SMA_VID
(
pSma
),
refVal
,
*
pSuid
);
}
pIter
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
pIter
);
continue
;
}
#if 0
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo
(
pSma
,
pRSmaInfo
,
tru
e
);
tdFreeRSmaInfo(pSma, pRSmaInfo,
fals
e);
pRSmaInfo->iTaskInfo[0] = NULL;
}
} else {
...
...
@@ -463,8 +460,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
pIter
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
pIter
);
#endif
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
rsmaDeleted
);
++
i
)
{
...
...
@@ -480,10 +476,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashRemove
(
RSMA_INFO_HASH
(
pRSmaStat
),
pSuid
,
sizeof
(
tb_uid_t
));
}
taosArrayDestroy
(
rsmaDeleted
);
// TODO: remove suid in files?
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles
(
pSma
,
pRSmaStat
);
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
4bc8e086
...
...
@@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)(
*
pSmaStat
);
pRSmaStat
->
pSma
=
(
SSma
*
)
pSma
;
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INIT
);
tsem_init
(
&
pRSmaStat
->
notEmpty
,
0
,
0
);
// init smaMgmt
smaInit
();
...
...
@@ -230,12 +231,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if
(
!
RSMA_INFO_HASH
(
pRSmaStat
))
{
return
TSDB_CODE_FAILED
;
}
RSMA_FETCH_HASH
(
pRSmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
RSMA_FETCH_HASH
(
pRSmaStat
))
{
return
TSDB_CODE_FAILED
;
}
}
else
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
// TODO
}
else
{
...
...
@@ -267,6 +262,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
smaDebug
(
"vgId:%d, destroy rsma stat %p"
,
SMA_VID
(
pSma
),
pRSmaStat
);
// step 1: set rsma trigger stat cancelled
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_CANCELLED
);
tsem_destroy
(
&
(
pStat
->
notEmpty
));
// step 2: destroy the rsma info and associated fetch tasks
if
(
taosHashGetSize
(
RSMA_INFO_HASH
(
pStat
))
>
0
)
{
...
...
@@ -279,17 +275,14 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
}
taosHashCleanup
(
RSMA_INFO_HASH
(
pStat
));
// step 3: destroy the rsma fetch hash
taosHashCleanup
(
RSMA_FETCH_HASH
(
pStat
));
// step 4: wait all triggered fetch tasks finished
// step 3: wait for all triggered fetch tasks to finish
int32_t
nLoops
=
0
;
while
(
1
)
{
if
(
T_REF_VAL_GET
((
SSmaStat
*
)
pStat
)
==
0
)
{
smaDebug
(
"vgId:%d, rsma fetch tasks all finished"
,
SMA_VID
(
pSma
));
smaDebug
(
"vgId:%d, rsma fetch tasks a
re a
ll finished"
,
SMA_VID
(
pSma
));
break
;
}
else
{
smaDebug
(
"vgId:%d, rsma fetch tasks not all finished yet"
,
SMA_VID
(
pSma
));
smaDebug
(
"vgId:%d, rsma fetch tasks
are
not all finished yet"
,
SMA_VID
(
pSma
));
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
...
...
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
4bc8e086
...
...
@@ -146,6 +146,20 @@ int32_t smaClose(SSma *pSma) {
return
0
;
}
int32_t
smaPreClose
(
SSma
*
pSma
)
{
if
(
pSma
&&
VND_IS_RSMA
(
pSma
->
pVnode
))
{
SSmaEnv
*
pEnv
=
NULL
;
SRSmaStat
*
pStat
=
NULL
;
if
(
!
(
pEnv
=
SMA_RSMA_ENV
(
pSma
))
||
!
(
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
)))
{
return
0
;
}
for
(
int32_t
i
=
0
;
i
<
RSMA_EXECUTOR_MAX
;
++
i
)
{
tsem_post
(
&
(
pStat
->
notEmpty
));
}
}
return
0
;
}
/**
* @brief rsma env restore
*
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
4bc8e086
...
...
@@ -15,10 +15,12 @@
#include "sma.h"
#define RSMA_QTASKINFO_BUFSIZE (32768)
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_BUFSIZE (1048576)
#define RSMA_SUBMIT_BATCH_SIZE (1024)
#define RSMA_QTASKINFO_BUFSIZE (32768) // size
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (900000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
SSmaMgmt
smaMgmt
=
{
.
inited
=
0
,
...
...
@@ -40,11 +42,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz
static
SRSmaInfo
*
tdAcquireRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
);
static
void
tdReleaseRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
);
static
void
tdFreeRSmaSubmitItems
(
SArray
*
pItems
);
static
int32_t
tdRSma
ConsumeAndFetch
(
SSma
*
pSma
,
int64_t
suid
,
int8_t
level
,
SArray
*
pSubmitArr
);
static
int32_t
tdRSma
Fetch
AndSubmitResult
(
SSma
*
pSma
,
qTaskInfo_t
taskInfo
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
);
static
int32_t
tdRSma
FetchAllResult
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
SArray
*
pSubmitArr
);
static
int32_t
tdRSma
Exec
AndSubmitResult
(
SSma
*
pSma
,
qTaskInfo_t
taskInfo
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
int32_t
tdRSmaFetchSend
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
int8_t
level
);
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTaskInfoIter
*
pIter
,
STFile
*
pTFile
);
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTaskInfoIter
*
pIter
,
bool
*
isFinish
);
static
int32_t
tdRSmaQTaskInfoRestore
(
SSma
*
pSma
,
int8_t
type
,
SRSmaQTaskInfoIter
*
pIter
);
...
...
@@ -635,8 +636,8 @@ _end:
return
code
;
}
static
int32_t
tdRSma
Fetch
AndSubmitResult
(
SSma
*
pSma
,
qTaskInfo_t
taskInfo
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
)
{
static
int32_t
tdRSma
Exec
AndSubmitResult
(
SSma
*
pSma
,
qTaskInfo_t
taskInfo
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
)
{
SArray
*
pResList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
if
(
pResList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -729,22 +730,24 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
taosWriteQitem
(
pInfo
->
queue
,
qItem
);
pInfo
->
lastRecv
=
taosGetTimestampMs
();
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pSma
);
int64_t
bufSize
=
atomic_add_fetch_64
(
&
pRSmaStat
->
qBufSize
,
pReq
->
header
.
contLen
);
tsem_post
(
&
(
pRSmaStat
->
notEmpty
));
int64_t
nItems
=
atomic_fetch_add_64
(
&
pRSmaStat
->
nBufItems
,
1
);
// smoothing consume
int32_t
n
=
bufSize
/
RSMA_QTASKEXEC_BUF
SIZE
;
int32_t
n
=
nItems
/
RSMA_QTASKEXEC_SMOOTH_
SIZE
;
if
(
n
>
1
)
{
if
(
n
>
10
)
{
n
=
10
;
}
taosMsleep
(
n
<<
4
);
if
(
n
>
2
)
{
taosMsleep
(
n
<<
3
);
if
(
n
>
5
)
{
smaWarn
(
"vgId:%d, pInfo->queue itemSize:%d, memSize:%"
PRIi64
", sleep %d ms"
,
SMA_VID
(
pSma
),
taosQueueItemSize
(
pInfo
->
queue
),
taosQueueMemorySize
(
pInfo
->
queue
),
n
<<
4
);
}
else
{
smaDebug
(
"vgId:%d, pInfo->queue itemSize:%d, memSize:%"
PRIi64
", sleep %d ms"
,
SMA_VID
(
pSma
),
taosQueueItemSize
(
pInfo
->
queue
),
taosQueueMemorySize
(
pInfo
->
queue
),
n
<<
4
);
taosQueueItemSize
(
pInfo
->
queue
),
taosQueueMemorySize
(
pInfo
->
queue
),
n
<<
3
);
}
}
...
...
@@ -812,7 +815,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
}
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pInfo
,
idx
);
tdRSma
Fetch
AndSubmitResult
(
pSma
,
qTaskInfo
,
pItem
,
pInfo
->
pTSchema
,
pInfo
->
suid
);
tdRSma
Exec
AndSubmitResult
(
pSma
,
qTaskInfo
,
pItem
,
pInfo
->
pTSchema
,
pInfo
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -840,25 +843,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return
NULL
;
}
taosRLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosRLockLatch(SMA_ENV_LOCK(pEnv));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
&&
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return
NULL
;
}
if
(
!
pRSmaInfo
->
taskInfo
[
0
])
{
if
(
tdCloneRSmaInfo
(
pSma
,
pRSmaInfo
)
<
0
)
{
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return
NULL
;
}
}
tdRefRSmaInfo
(
pSma
,
pRSmaInfo
);
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT
(
pRSmaInfo
->
suid
==
suid
);
return
pRSmaInfo
;
}
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
//
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return
NULL
;
}
...
...
@@ -910,22 +913,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
static
int32_t
tdRSmaExecCheck
(
SSma
*
pSma
)
{
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pSma
);
int64_t
bufSize
=
atomic_load_64
(
&
pRSmaStat
->
qBufSize
);
if
(
bufSize
<
RSMA_QTASKEXEC_BUFSIZE
)
{
smaDebug
(
"vgId:%d, bufSize is %d but has no chance to exec as less than %d"
,
SMA_VID
(
pSma
),
bufSize
,
RSMA_QTASKEXEC_BUFSIZE
);
return
TSDB_CODE_SUCCESS
;
}
if
(
atomic_val_compare_exchange_8
(
&
pRSmaStat
->
execStat
,
0
,
1
)
==
1
)
{
smaDebug
(
"vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task"
,
SMA_VID
(
pSma
),
bufSize
);
if
(
atomic_load_8
(
&
pRSmaStat
->
nExecutor
)
>=
TMIN
(
RSMA_EXECUTOR_MAX
,
tsNumOfVnodeQueryThreads
/
2
))
{
return
TSDB_CODE_SUCCESS
;
}
smaDebug
(
"vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now"
,
SMA_VID
(
pSma
),
bufSize
);
SRSmaExecMsg
fetchMsg
;
int32_t
contLen
=
sizeof
(
SMsgHead
);
void
*
pBuf
=
rpcMallocCont
(
0
+
contLen
);
...
...
@@ -949,7 +941,6 @@ static int32_t tdRSmaExecCheck(SSma *pSma) {
return
TSDB_CODE_SUCCESS
;
_err:
atomic_store_8
(
&
pRSmaStat
->
execStat
,
0
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -959,7 +950,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
// only applicable when rsma env exists
return
TSDB_CODE_SUCCESS
;
}
STbUidStore
uidStore
=
{
0
};
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
if
(
!
RETENTION_VALID
(
pRetention
+
1
))
{
// return directly if retention level 1 is invalid
...
...
@@ -967,25 +958,34 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
}
if
(
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
STbUidStore
uidStore
=
{
0
};
tdFetchSubmitReqSuids
(
pMsg
,
&
uidStore
);
if
(
tdFetchSubmitReqSuids
(
pMsg
,
&
uidStore
)
<
0
)
{
goto
_err
;
}
if
(
uidStore
.
suid
!=
0
)
{
tdExecuteRSmaAsync
(
pSma
,
pMsg
,
inputType
,
uidStore
.
suid
);
if
(
tdExecuteRSmaAsync
(
pSma
,
pMsg
,
inputType
,
uidStore
.
suid
)
<
0
)
{
goto
_err
;
}
void
*
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
NULL
)
;
while
(
pIter
)
{
void
*
pIter
=
NULL
;
while
(
(
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
pIter
))
)
{
tb_uid_t
*
pTbSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
tdExecuteRSmaAsync
(
pSma
,
pMsg
,
inputType
,
*
pTbSuid
);
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
pIter
);
if
(
tdExecuteRSmaAsync
(
pSma
,
pMsg
,
inputType
,
*
pTbSuid
)
<
0
)
{
goto
_err
;
}
}
tdUidStoreDestory
(
&
uidStore
);
tdRSmaExecCheck
(
pSma
);
if
(
tdRSmaExecCheck
(
pSma
)
<
0
)
{
goto
_err
;
}
}
}
tdUidStoreDestory
(
&
uidStore
);
return
TSDB_CODE_SUCCESS
;
_err:
tdUidStoreDestory
(
&
uidStore
);
smaError
(
"vgId:%d, failed to process rsma submit since: %s"
,
SMA_VID
(
pSma
),
terrstr
());
return
TSDB_CODE_FAILED
;
}
/**
...
...
@@ -1416,7 +1416,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
}
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
#if 0
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
#endif
qTaskInfo_t
taskInfo
=
RSMA_INFO_QTASK
(
pRSmaInfo
,
i
);
if
(
!
taskInfo
)
{
smaDebug
(
"vgId:%d, rsma, table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
continue
;
...
...
@@ -1553,7 +1556,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
smaDebug
(
"vgId:%d, rsma fetch task started for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is active"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
// async process
tdRSmaFetchSend
(
pSma
,
pRSmaInfo
,
pItem
->
level
);
pItem
->
fetchLevel
=
pItem
->
level
;
#if 0
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
ASSERT(qItem->level == pItem->level);
ASSERT(qItem->fetchLevel == pItem->fetchLevel);
#endif
tsem_post
(
&
(
pStat
->
notEmpty
));
smaInfo
(
"vgId:%d, rsma fetch task planned for level:%"
PRIi8
" suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is paused"
,
...
...
@@ -1568,8 +1580,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
default:
{
sma
Warn
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
sma
Debug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
}
...
...
@@ -1578,183 +1590,62 @@ _end:
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
}
/**
* @brief put rsma fetch msg to fetch queue
*
* @param pSma
* @param pInfo
* @param level
* @return int32_t
*/
static
int32_t
tdRSmaFetchSend
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
int8_t
level
)
{
SRSmaFetchMsg
fetchMsg
=
{.
suid
=
pInfo
->
suid
,
.
level
=
level
};
int32_t
ret
=
0
;
int32_t
contLen
=
0
;
SEncoder
encoder
=
{
0
};
tEncodeSize
(
tEncodeSRSmaFetchMsg
,
&
fetchMsg
,
contLen
,
ret
);
if
(
ret
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tEncoderClear
(
&
encoder
);
goto
_err
;
}
void
*
pBuf
=
rpcMallocCont
(
contLen
+
sizeof
(
SMsgHead
));
tEncoderInit
(
&
encoder
,
POINTER_SHIFT
(
pBuf
,
sizeof
(
SMsgHead
)),
contLen
);
if
(
tEncodeSRSmaFetchMsg
(
&
encoder
,
&
fetchMsg
)
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tEncoderClear
(
&
encoder
);
}
tEncoderClear
(
&
encoder
);
((
SMsgHead
*
)
pBuf
)
->
vgId
=
SMA_VID
(
pSma
);
((
SMsgHead
*
)
pBuf
)
->
contLen
=
contLen
+
sizeof
(
SMsgHead
);
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
msgType
=
TDMT_VND_FETCH_RSMA
,
.
pCont
=
pBuf
,
.
contLen
=
contLen
+
sizeof
(
SMsgHead
),
};
if
((
terrno
=
tmsgPutToQueue
(
&
pSma
->
pVnode
->
msgCb
,
QUERY_QUEUE
,
&
rpcMsg
))
!=
0
)
{
smaError
(
"vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%"
PRIi64
" level:%"
PRIi8
" since %s"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
level
,
terrstr
());
goto
_err
;
static
void
tdFreeRSmaSubmitItems
(
SArray
*
pItems
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pItems
);
++
i
)
{
taosFreeQitem
(
*
(
void
**
)
taosArrayGet
(
pItems
,
i
));
}
smaDebug
(
"vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%"
PRIi64
" level:%"
PRIi8
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
level
);
return
TSDB_CODE_SUCCESS
;
_err:
return
TSDB_CODE_FAILED
;
}
/**
* @brief fetch rsma
data of level 2/3 and submit
* @brief fetch rsma
result(consider the efficiency and functionality)
*
* @param pSma
* @param pMsg
* @param pInfo
* @param pSubmitArr
* @return int32_t
*/
int32_t
smaProcessFetch
(
SSma
*
pSma
,
void
*
pMsg
)
{
SRpcMsg
*
pRpcMsg
=
(
SRpcMsg
*
)
pMsg
;
SRSmaFetchMsg
req
=
{
0
};
SDecoder
decoder
=
{
0
};
void
*
pBuf
=
NULL
;
SRSmaStat
*
pRSmaStat
=
NULL
;
if
(
!
pRpcMsg
||
pRpcMsg
->
contLen
<
sizeof
(
SMsgHead
))
{
terrno
=
TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP
;
goto
_err
;
}
pBuf
=
POINTER_SHIFT
(
pRpcMsg
->
pCont
,
sizeof
(
SMsgHead
));
tDecoderInit
(
&
decoder
,
pBuf
,
pRpcMsg
->
contLen
);
if
(
tDecodeSRSmaFetchMsg
(
&
decoder
,
&
req
)
<
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_err
;
}
pRSmaStat
=
SMA_RSMA_STAT
(
pSma
);
if
(
atomic_val_compare_exchange_8
(
&
pRSmaStat
->
execStat
,
0
,
1
)
==
0
)
{
SArray
*
pSubmitArr
=
NULL
;
if
(
!
(
pSubmitArr
=
taosArrayInit
(
RSMA_SUBMIT_BATCH_SIZE
,
POINTER_BYTES
)))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
atomic_store_8
(
&
pRSmaStat
->
execStat
,
0
);
goto
_err
;
}
tdRSmaConsumeAndFetch
(
pSma
,
req
.
suid
,
req
.
level
,
pSubmitArr
);
atomic_store_8
(
&
pRSmaStat
->
execStat
,
0
);
taosArrayDestroy
(
pSubmitArr
);
}
else
{
int8_t
level
=
req
.
level
;
int8_t
*
val
=
taosHashGet
(
RSMA_FETCH_HASH
(
pRSmaStat
),
&
req
.
suid
,
sizeof
(
req
.
suid
));
if
(
val
)
{
level
|=
(
*
val
);
}
ASSERT
(
level
>=
1
&&
level
<=
3
);
taosHashPut
(
RSMA_FETCH_HASH
(
pRSmaStat
),
&
req
.
suid
,
sizeof
(
req
.
suid
),
&
level
,
sizeof
(
level
));
}
tDecoderClear
(
&
decoder
);
smaDebug
(
"vgId:%d, success to process rsma fetch msg for suid:%"
PRIi64
" level:%"
PRIi8
,
SMA_VID
(
pSma
),
req
.
suid
,
req
.
level
);
return
TSDB_CODE_SUCCESS
;
_err:
tDecoderClear
(
&
decoder
);
smaError
(
"vgId:%d, failed to process rsma fetch msg since %s"
,
SMA_VID
(
pSma
),
terrstr
());
return
TSDB_CODE_FAILED
;
}
static
void
tdFreeRSmaSubmitItems
(
SArray
*
pItems
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pItems
);
++
i
)
{
taosFreeQitem
(
*
(
void
**
)
taosArrayGet
(
pItems
,
i
));
}
}
static
int32_t
tdRSmaConsumeAndFetch
(
SSma
*
pSma
,
int64_t
suid
,
int8_t
level
,
SArray
*
pSubmitArr
)
{
SRSmaInfo
*
pInfo
=
tdAcquireRSmaInfoBySuid
(
pSma
,
suid
);
if
(
!
pInfo
)
{
return
TSDB_CODE_SUCCESS
;
}
// step 1: consume submit req
int64_t
qMemSize
=
0
;
if
((
qMemSize
=
taosQueueMemorySize
(
pInfo
->
queue
)
>
0
))
{
taosReadAllQitems
(
pInfo
->
queue
,
pInfo
->
qall
);
// queue has mutex lock
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pSma
);
atomic_fetch_sub_64
(
&
pRSmaStat
->
qBufSize
,
qMemSize
);
taosArrayClear
(
pSubmitArr
);
while
(
1
)
{
void
*
msg
=
NULL
;
taosGetQitem
(
pInfo
->
qall
,
(
void
**
)
&
msg
);
if
(
msg
)
{
if
(
taosArrayPush
(
pSubmitArr
,
&
msg
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
}
}
else
{
break
;
}
}
int32_t
size
=
taosArrayGetSize
(
pSubmitArr
);
if
(
size
>
0
)
{
for
(
int32_t
i
=
1
;
i
<=
TSDB_RETENTION_L2
;
++
i
)
{
if
(
tdExecuteRSmaImpl
(
pSma
,
pSubmitArr
->
pData
,
size
,
STREAM_INPUT__MERGED_SUBMIT
,
pInfo
,
RSMA_EXEC_TIMEOUT
,
i
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
}
}
tdFreeRSmaSubmitItems
(
pSubmitArr
);
}
}
// step 2: fetch rsma result
static
int32_t
tdRSmaFetchAllResult
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
SArray
*
pSubmitArr
)
{
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
for
(
int8_t
i
=
1
;
i
<=
TSDB_RETENTION_L2
;
++
i
)
{
if
(
level
&
i
)
{
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pInfo
,
i
-
1
);
if
(
pItem
->
fetchLevel
)
{
pItem
->
fetchLevel
=
0
;
qTaskInfo_t
taskInfo
=
RSMA_INFO_QTASK
(
pInfo
,
i
-
1
);
if
(
!
taskInfo
)
{
continue
;
}
int64_t
curMs
=
taosGetTimestampMs
();
if
((
pItem
->
nSkipped
*
pItem
->
maxDelay
)
>
RSMA_FETCH_DELAY_MAX
)
{
smaInfo
(
"vgId:%d, suid:%"
PRIi64
" level:%"
PRIi8
" nSkipped:%"
PRIi8
" maxDelay:%d, fetch executed"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
,
pItem
->
nSkipped
,
pItem
->
maxDelay
);
}
else
if
(((
curMs
-
pInfo
->
lastRecv
)
<
RSMA_FETCH_ACTIVE_MAX
))
{
++
pItem
->
nSkipped
;
smaDebug
(
"vgId:%d, suid:%"
PRIi64
" level:%"
PRIi8
" curMs:%"
PRIi64
" lastRecv:%"
PRIi64
", fetch skipped "
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
,
curMs
,
pInfo
->
lastRecv
);
continue
;
}
else
{
smaInfo
(
"vgId:%d, suid:%"
PRIi64
" level:%"
PRIi8
" curMs:%"
PRIi64
" lastRecv:%"
PRIi64
", fetch executed "
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
,
curMs
,
pInfo
->
lastRecv
);
}
pItem
->
nSkipped
=
0
;
if
((
terrno
=
qSetMultiStreamInput
(
taskInfo
,
&
dataBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
))
<
0
)
{
goto
_err
;
}
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pInfo
,
i
-
1
);
if
(
tdRSmaFetchAndSubmitResult
(
pSma
,
taskInfo
,
pItem
,
pInfo
->
pTSchema
,
suid
)
<
0
)
{
if
(
tdRSmaExecAndSubmitResult
(
pSma
,
taskInfo
,
pItem
,
pInfo
->
pTSchema
,
pInfo
->
suid
)
<
0
)
{
tdCleanupStreamInputDataBlock
(
taskInfo
);
goto
_err
;
}
tdCleanupStreamInputDataBlock
(
taskInfo
);
smaInfo
(
"vgId:%d, suid:%"
PRIi64
" level:%"
PRIi8
" nSkipped:%"
PRIi8
" maxDelay:%d, fetch finished"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
,
pItem
->
nSkipped
,
pItem
->
maxDelay
);
}
else
{
smaDebug
(
"vgId:%d, suid:%"
PRIi64
" level:%"
PRIi8
" nSkipped:%"
PRIi8
" maxDelay:%d, fetch not executed as fetch level is %"
PRIi8
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
,
pItem
->
nSkipped
,
pItem
->
maxDelay
,
pItem
->
fetchLevel
);
}
}
...
...
@@ -1766,6 +1657,45 @@ _err:
return
TSDB_CODE_FAILED
;
}
static
int32_t
tdRSmaBatchExec
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
STaosQall
*
qall
,
SArray
*
pSubmitArr
,
ERsmaExecType
type
)
{
taosArrayClear
(
pSubmitArr
);
while
(
1
)
{
void
*
msg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
msg
);
if
(
msg
)
{
if
(
taosArrayPush
(
pSubmitArr
,
&
msg
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
}
}
else
{
break
;
}
}
int32_t
size
=
taosArrayGetSize
(
pSubmitArr
);
if
(
size
>
0
)
{
for
(
int32_t
i
=
1
;
i
<=
TSDB_RETENTION_L2
;
++
i
)
{
if
(
tdExecuteRSmaImpl
(
pSma
,
pSubmitArr
->
pData
,
size
,
STREAM_INPUT__MERGED_SUBMIT
,
pInfo
,
type
,
i
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
}
}
tdFreeRSmaSubmitItems
(
pSubmitArr
);
}
return
TSDB_CODE_SUCCESS
;
_err:
while
(
1
)
{
void
*
msg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
msg
);
if
(
msg
)
{
taosFreeQitem
(
msg
);
}
else
{
break
;
}
}
return
TSDB_CODE_FAILED
;
}
/**
* @brief
*
...
...
@@ -1774,10 +1704,10 @@ _err:
* @return int32_t
*/
int32_t
tdRSmaProcessExecImpl
(
SSma
*
pSma
,
ERsmaExecType
type
)
{
SVnode
*
pVnode
=
pSma
->
pVnode
;
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
infoHash
=
NULL
;
SArray
*
pSubmitQArr
=
NULL
;
SArray
*
pSubmitArr
=
NULL
;
bool
isFetchAll
=
false
;
...
...
@@ -1786,106 +1716,97 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
goto
_err
;
}
if
(
type
==
RSMA_EXEC_OVERFLOW
)
{
taosRLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
if
(
atomic_load_64
(
&
pRSmaStat
->
qBufSize
)
<
RSMA_QTASKEXEC_BUFSIZE
)
{
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
return
TSDB_CODE_SUCCESS
;
}
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
}
if
(
!
(
pSubmitQArr
=
taosArrayInit
(
taosHashGetSize
(
infoHash
),
sizeof
(
SRSmaExecQItem
))))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
if
(
!
(
pSubmitArr
=
taosArrayInit
(
RSMA_SUBMIT_BATCH_SIZE
,
POINTER_BYTES
)))
{
if
(
!
(
pSubmitArr
=
taosArrayInit
(
TMIN
(
RSMA_SUBMIT_BATCH_SIZE
,
atomic_load_64
(
&
pRSmaStat
->
nBufItems
)),
POINTER_BYTES
)))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
// step 1: rsma exec - consume data in buffer queue for all suids
SRSmaExecQItem
qItem
=
{
0
};
void
*
pIter
=
taosHashIterate
(
infoHash
,
NULL
);
// infoHash has r/w lock
if
(
type
==
RSMA_EXEC_OVERFLOW
)
{
while
(
pIter
)
{
SRSmaInfo
*
pInfo
=
*
(
SRSmaInfo
**
)
pIter
;
if
(
taosQueueItemSize
(
pInfo
->
queue
))
{
taosReadAllQitems
(
pInfo
->
queue
,
pInfo
->
qall
);
// queue has mutex lock
qItem
.
qall
=
&
pInfo
->
qall
;
qItem
.
pRSmaInfo
=
pIter
;
taosArrayPush
(
pSubmitQArr
,
&
qItem
);
}
ASSERT
(
taosQueueItemSize
(
pInfo
->
queue
)
==
0
);
pIter
=
taosHashIterate
(
infoHash
,
pIter
);
}
}
else
if
(
type
==
RSMA_EXEC_COMMIT
)
{
while
(
pIter
)
{
SRSmaInfo
*
pInfo
=
*
(
SRSmaInfo
**
)
pIter
;
if
(
taosQueueItemSize
(
pInfo
->
iQueue
))
{
taosReadAllQitems
(
pInfo
->
iQueue
,
pInfo
->
iQall
);
qItem
.
qall
=
&
pInfo
->
iQall
;
qItem
.
pRSmaInfo
=
pIter
;
taosArrayPush
(
pSubmitQArr
,
&
qItem
);
}
ASSERT
(
taosQueueItemSize
(
pInfo
->
iQueue
)
==
0
);
pIter
=
taosHashIterate
(
infoHash
,
pIter
);
}
}
else
{
ASSERT
(
0
);
}
atomic_store_64
(
&
pRSmaStat
->
qBufSize
,
0
);
int32_t
qSize
=
taosArrayGetSize
(
pSubmitQArr
);
for
(
int32_t
i
=
0
;
i
<
qSize
;
++
i
)
{
SRSmaExecQItem
*
pItem
=
taosArrayGet
(
pSubmitQArr
,
i
);
while
(
1
)
{
void
*
msg
=
NULL
;
taosGetQitem
(
*
(
STaosQall
**
)
pItem
->
qall
,
(
void
**
)
&
msg
);
if
(
msg
)
{
if
(
taosArrayPush
(
pSubmitArr
,
&
msg
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
bool
isBusy
=
false
;
while
(
true
)
{
isBusy
=
false
;
// step 1: rsma exec - consume data in buffer queue for all suids
if
(
type
==
RSMA_EXEC_OVERFLOW
||
type
==
RSMA_EXEC_COMMIT
)
{
void
*
pIter
=
taosHashIterate
(
infoHash
,
NULL
);
// infoHash has r/w lock
while
(
pIter
)
{
SRSmaInfo
*
pInfo
=
*
(
SRSmaInfo
**
)
pIter
;
int64_t
itemSize
=
0
;
if
((
itemSize
=
taosQueueItemSize
(
pInfo
->
queue
))
||
RSMA_INFO_ITEM
(
pInfo
,
0
)
->
fetchLevel
||
RSMA_INFO_ITEM
(
pInfo
,
1
)
->
fetchLevel
)
{
smaDebug
(
"vgId:%d, queueItemSize is %"
PRIi64
" execType:%"
PRIi8
,
SMA_VID
(
pSma
),
itemSize
,
type
);
if
(
atomic_val_compare_exchange_8
(
&
pInfo
->
assigned
,
0
,
1
)
==
0
)
{
taosReadAllQitems
(
pInfo
->
queue
,
pInfo
->
qall
);
// queue has mutex lock
int32_t
qallItemSize
=
taosQallItemSize
(
pInfo
->
qall
);
if
(
qallItemSize
>
0
)
{
tdRSmaBatchExec
(
pSma
,
pInfo
,
pInfo
->
qall
,
pSubmitArr
,
type
);
}
if
(
type
==
RSMA_EXEC_OVERFLOW
)
{
tdRSmaFetchAllResult
(
pSma
,
pInfo
,
pSubmitArr
);
}
if
(
qallItemSize
>
0
)
{
// subtract the item size after the task finished, commit should wait for all items be consumed
atomic_fetch_sub_64
(
&
pRSmaStat
->
nBufItems
,
qallItemSize
);
isBusy
=
true
;
}
ASSERT
(
1
==
atomic_val_compare_exchange_8
(
&
pInfo
->
assigned
,
1
,
0
));
}
}
}
else
{
pIter
=
taosHashIterate
(
infoHash
,
pIter
);
}
if
(
type
==
RSMA_EXEC_COMMIT
)
{
break
;
}
}
int32_t
size
=
taosArrayGetSize
(
pSubmitArr
);
if
(
size
>
0
)
{
SRSmaInfo
*
pInfo
=
*
(
SRSmaInfo
**
)
pItem
->
pRSmaInfo
;
for
(
int32_t
i
=
1
;
i
<=
TSDB_RETENTION_L2
;
++
i
)
{
if
(
tdExecuteRSmaImpl
(
pSma
,
pSubmitArr
->
pData
,
size
,
STREAM_INPUT__MERGED_SUBMIT
,
pInfo
,
type
,
i
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
#if 0
else if (type == RSMA_EXEC_COMMIT) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->iQueue)) {
if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
taosReadAllQitems(pInfo->iQueue, pInfo->iQall); // queue has mutex lock
int32_t qallItemSize = taosQallItemSize(pInfo->iQall);
if (qallItemSize > 0) {
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
nIdle = 0;
// batch exec
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
}
// tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
}
}
ASSERT(taosQueueItemSize(pInfo->iQueue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
tdFreeRSmaSubmitItems
(
pSubmitArr
);
taosArrayClear
(
pSubmitArr
);
break;
}
#endif
else
{
ASSERT
(
0
);
}
}
// step 2: rsma fetch - consume data in buffer queue for suids triggered by timer
if
(
taosHashGetSize
(
RSMA_FETCH_HASH
(
pRSmaStat
))
<=
0
)
{
goto
_end
;
}
pIter
=
taosHashIterate
(
RSMA_FETCH_HASH
(
pRSmaStat
),
NULL
);
if
(
pIter
)
{
tdRSmaConsumeAndFetch
(
pSma
,
*
(
int64_t
*
)
taosHashGetKey
(
pIter
,
NULL
),
*
(
int8_t
*
)
pIter
,
pSubmitArr
);
while
((
pIter
=
taosHashIterate
(
RSMA_FETCH_HASH
(
pRSmaStat
),
pIter
)))
{
tdRSmaConsumeAndFetch
(
pSma
,
*
(
int64_t
*
)
taosHashGetKey
(
pIter
,
NULL
),
*
(
int8_t
*
)
pIter
,
pSubmitArr
);
if
(
atomic_load_64
(
&
pRSmaStat
->
nBufItems
)
<=
0
)
{
if
(
pVnode
->
inClose
)
{
break
;
}
tsem_wait
(
&
pRSmaStat
->
notEmpty
);
if
(
pVnode
->
inClose
&&
(
atomic_load_64
(
&
pRSmaStat
->
nBufItems
)
<=
0
))
{
smaInfo
(
"vgId:%d, exec task end, inClose:%d, nBufItems:%"
PRIi64
,
SMA_VID
(
pSma
),
pVnode
->
inClose
,
atomic_load_64
(
&
pRSmaStat
->
nBufItems
));
break
;
}
}
}
}
// end of while(true)
_end:
taosArrayDestroy
(
pSubmitArr
);
taosArrayDestroy
(
pSubmitQArr
);
return
TSDB_CODE_SUCCESS
;
_err:
taosArrayDestroy
(
pSubmitArr
);
taosArrayDestroy
(
pSubmitQArr
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -1905,15 +1826,21 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) {
goto
_err
;
}
smaDebug
(
"vgId:%d, begin to process rsma exec msg by TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
if
(
tdRSmaProcessExecImpl
(
pSma
,
RSMA_EXEC_OVERFLOW
)
<
0
)
{
goto
_err
;
int8_t
nOld
=
atomic_fetch_add_8
(
&
pRSmaStat
->
nExecutor
,
1
);
if
(
nOld
<
TMIN
(
RSMA_EXECUTOR_MAX
,
tsNumOfVnodeQueryThreads
/
2
))
{
if
(
tdRSmaProcessExecImpl
(
pSma
,
RSMA_EXEC_OVERFLOW
)
<
0
)
{
goto
_err
;
}
}
else
{
atomic_fetch_sub_8
(
&
pRSmaStat
->
nExecutor
,
1
);
}
atomic_store_8
(
&
pRSmaStat
->
execStat
,
0
);
smaDebug
(
"vgId:%d, success to process rsma exec msg by TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
return
TSDB_CODE_SUCCESS
;
_err:
atomic_
store_8
(
&
pRSmaStat
->
execStat
,
0
);
atomic_
fetch_sub_8
(
&
pRSmaStat
->
nExecutor
,
1
);
smaError
(
"vgId:%d, failed to process rsma exec msg by TID:%p since %s"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
(),
terrstr
());
return
TSDB_CODE_FAILED
;
...
...
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
4bc8e086
...
...
@@ -375,6 +375,9 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if
(
TABLE_IS_ROLLUP
(
mr
.
me
.
flags
))
{
param
=
&
mr
.
me
.
stbEntry
.
rsmaParam
;
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
!
pInfo
->
iTaskInfo
[
i
])
{
continue
;
}
if
(
tdCloneQTaskInfo
(
pSma
,
pInfo
->
taskInfo
[
i
],
pInfo
->
iTaskInfo
[
i
],
param
,
pInfo
->
suid
,
i
)
<
0
)
{
goto
_err
;
}
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
4bc8e086
...
...
@@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) {
vInfo
(
"vgId:%d, start to commit, commit ID:%"
PRId64
" version:%"
PRId64
,
TD_VID
(
pVnode
),
pVnode
->
state
.
commitID
,
pVnode
->
state
.
applied
);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit
(
pVnode
->
pSma
);
vnodeBufPoolUnRef
(
pVnode
->
inUse
);
pVnode
->
inUse
=
NULL
;
pVnode
->
state
.
commitTerm
=
pVnode
->
state
.
applyTerm
;
// save info
...
...
@@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) {
}
walBeginSnapshot
(
pVnode
->
pWal
,
pVnode
->
state
.
applied
);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
if
(
smaAsyncPreCommit
(
pVnode
->
pSma
)
<
0
){
ASSERT
(
0
);
return
-
1
;
}
vnodeBufPoolUnRef
(
pVnode
->
inUse
);
pVnode
->
inUse
=
NULL
;
// commit each sub-system
if
(
metaCommit
(
pVnode
->
pMeta
)
<
0
)
{
ASSERT
(
0
);
...
...
@@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) {
}
if
(
VND_IS_RSMA
(
pVnode
))
{
smaAsyncCommit
(
pVnode
->
pSma
);
if
(
smaAsyncCommit
(
pVnode
->
pSma
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tsdbCommit
(
VND_RSMA0
(
pVnode
))
<
0
)
{
ASSERT
(
0
);
...
...
@@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) {
// postCommit
// smaSyncPostCommit(pVnode->pSma);
smaAsyncPostCommit
(
pVnode
->
pSma
);
if
(
smaAsyncPostCommit
(
pVnode
->
pSma
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// apply the commit (TODO)
walEndSnapshot
(
pVnode
->
pWal
);
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
4bc8e086
...
...
@@ -87,6 +87,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode
->
msgCb
=
msgCb
;
taosThreadMutexInit
(
&
pVnode
->
lock
,
NULL
);
pVnode
->
blocked
=
false
;
pVnode
->
inClose
=
false
;
tsem_init
(
&
pVnode
->
syncSem
,
0
,
0
);
tsem_init
(
&
(
pVnode
->
canCommit
),
0
,
1
);
...
...
@@ -181,6 +182,8 @@ _err:
void
vnodePreClose
(
SVnode
*
pVnode
)
{
if
(
pVnode
)
{
syncLeaderTransfer
(
pVnode
->
sync
);
pVnode
->
inClose
=
true
;
smaPreClose
(
pVnode
->
pSma
);
}
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
4bc8e086
...
...
@@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return
qWorkerProcessQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_SCH_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_VND_FETCH_RSMA
:
return
smaProcessFetch
(
pVnode
->
pSma
,
pMsg
);
case
TDMT_VND_EXEC_RSMA
:
return
smaProcessExec
(
pVnode
->
pSma
,
pMsg
);
default:
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4bc8e086
...
...
@@ -3137,6 +3137,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
initResultRow
(
resultRow
);
pInfo
->
resultRowInfo
.
cur
=
(
SResultRowPosition
){.
pageId
=
resultRow
->
pageId
,
.
offset
=
resultRow
->
offset
};
// releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId));
}
if
(
offset
!=
length
)
{
...
...
source/util/src/tqueue.c
浏览文件 @
4bc8e086
...
...
@@ -298,7 +298,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
return
num
;
}
void
taosResetQitems
(
STaosQall
*
qall
)
{
qall
->
current
=
qall
->
start
;
}
void
taosResetQitems
(
STaosQall
*
qall
)
{
qall
->
current
=
qall
->
start
;
}
int32_t
taosQallItemSize
(
STaosQall
*
qall
)
{
return
qall
->
numOfItems
;
}
STaosQset
*
taosOpenQset
()
{
STaosQset
*
qset
=
taosMemoryCalloc
(
sizeof
(
STaosQset
),
1
);
...
...
tests/script/tsim/sma/rsmaPersistenceRecovery.sim
浏览文件 @
4bc8e086
...
...
@@ -35,6 +35,7 @@ sleep 7000
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
...
...
@@ -51,6 +52,7 @@ endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
...
...
@@ -89,6 +91,7 @@ system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
...
...
@@ -104,6 +107,7 @@ endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
...
...
@@ -141,6 +145,7 @@ sleep 7000
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file/mem rows $rows > 2
return -1
...
...
@@ -163,6 +168,7 @@ endi
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file/mem rows $rows > 2
return -1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录