提交 c1393fb2 编写于 作者: K kailixu

chore: invoke streamStateCommit for rsma

上级 e8cdf132
...@@ -105,17 +105,16 @@ struct SRSmaFS { ...@@ -105,17 +105,16 @@ struct SRSmaFS {
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit int64_t refId; // shared by fetch tasks
int64_t refId; // shared by fetch tasks volatile int64_t nBufItems; // number of items in queue buffer
volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) volatile int32_t nFetchAll; // active number of fetch all
volatile int32_t nFetchAll; // active number of fetch all volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t triggerStat; // shared by fetch tasks volatile int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t commitStat; // 0 not in committing, 1 in committing volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo SRSmaFS fs; // for recovery/snapshot r/w
SRSmaFS fs; // for recovery/snapshot r/w SHashObj *infoHash; // key: suid, value: SRSmaInfo
SHashObj *infoHash; // key: suid, value: SRSmaInfo tsem_t notEmpty; // has items in queue buffer
tsem_t notEmpty; // has items in queue buffer
}; };
struct SSmaStat { struct SSmaStat {
...@@ -156,9 +155,9 @@ struct SRSmaInfo { ...@@ -156,9 +155,9 @@ struct SRSmaInfo {
int16_t padding; int16_t padding;
T_REF_DECLARE() T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2]; SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq STaosQall *qall; // buffer qall of SubmitReq
}; };
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
...@@ -221,6 +220,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); ...@@ -221,6 +220,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
...@@ -234,7 +234,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { ...@@ -234,7 +234,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
} }
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -149,13 +149,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { ...@@ -149,13 +149,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
tdSmaLoopsCheck(&nLoops, 1000); tdSmaLoopsCheck(&nLoops, 1000);
} }
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1",
pRSmaStat->commitAppliedVer)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
// step 2: wait for all triggered fetch tasks to finish // step 2: wait for all triggered fetch tasks to finish
nLoops = 0; nLoops = 0;
...@@ -183,6 +176,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { ...@@ -183,6 +176,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit; if (!isCommit) goto _exit;
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
// all rsma results are written completely // all rsma results are written completely
......
...@@ -1052,6 +1052,47 @@ _err: ...@@ -1052,6 +1052,47 @@ _err:
return code; return code;
} }
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0;
int32_t lino = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
terrno = code;
return code;
}
/** /**
* @brief trigger to get rsma result in async mode * @brief trigger to get rsma result in async mode
* *
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册