未验证 提交 f516e3a4 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #15317 from taosdata/feature/TD-11274-3.0

enh: rsma level 2/3 submitReq msg use wal version
...@@ -67,8 +67,6 @@ struct STSmaStat { ...@@ -67,8 +67,6 @@ struct STSmaStat {
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit int64_t commitAppliedVer; // vnode applied version for async commit
int64_t commitSubmitVer; // rsma submit version for async commit
int64_t submitVer; // latest submit version
int64_t refId; // shared by fetch tasks int64_t refId; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing int8_t commitStat; // 0 not in committing, 1 in committing
...@@ -91,7 +89,6 @@ struct SSmaStat { ...@@ -91,7 +89,6 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId) #define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
struct SRSmaInfoItem { struct SRSmaInfoItem {
void *taskInfo; // qTaskInfo_t void *taskInfo; // qTaskInfo_t
...@@ -223,13 +220,6 @@ struct STFInfo { ...@@ -223,13 +220,6 @@ struct STFInfo {
uint32_t ftype; uint32_t ftype;
uint32_t fver; uint32_t fver;
int64_t fsize; int64_t fsize;
// specific fields
union {
struct {
int64_t submitVer;
} qTaskInfo;
};
}; };
enum { enum {
......
...@@ -178,7 +178,6 @@ int32_t smaAsyncPostCommit(SSma* pSma); ...@@ -178,7 +178,6 @@ int32_t smaAsyncPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
......
...@@ -146,7 +146,6 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { ...@@ -146,7 +146,6 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
// step 3: perform persist task for qTaskInfo // step 3: perform persist task for qTaskInfo
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma)); smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
...@@ -317,7 +316,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -317,7 +316,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// step 4: others // step 4: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -560,17 +560,6 @@ static void tdDestroySDataBlockArray(SArray *pArray) { ...@@ -560,17 +560,6 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
if (level == TSDB_RETENTION_L0) {
return pSma->pVnode->state.applied;
}
SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv));
return atomic_load_64(&pRSmaStat->submitVer);
}
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType) { int8_t blkType) {
SArray *pResult = NULL; SArray *pResult = NULL;
...@@ -615,13 +604,16 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche ...@@ -615,13 +604,16 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
goto _err; goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr()); SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err; goto _err;
} }
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%"PRIi64, SMA_VID(pSma),
suid, pItem->level, output->info.version);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
taosArrayClear(pResult); taosArrayClear(pResult);
} else if (terrno == 0) { } else if (terrno == 0) {
...@@ -908,12 +900,8 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { ...@@ -908,12 +900,8 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err; goto _err;
} }
ASSERT(tFileInfo.qTaskInfo.submitVer > 0);
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv; SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer);
smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer);
SRSmaQTaskInfoIter fIter = {0}; SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
...@@ -1266,7 +1254,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1266,7 +1254,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
} }
if (isFileCreated) { if (isFileCreated) {
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->commitSubmitVer);
if (tdUpdateTFileHeader(&tFile) < 0) { if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno)); tstrerror(terrno));
...@@ -1346,6 +1333,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1346,6 +1333,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
tdUnRefRSmaInfo(pSma, pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} break; } break;
case TASK_TRIGGER_STAT_PAUSED: { case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
......
...@@ -32,9 +32,6 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { ...@@ -32,9 +32,6 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen += taosEncodeFixedU32(buf, pInfo->ftype); tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize); tlen += taosEncodeFixedI64(buf, pInfo->fsize);
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer);
}
return tlen; return tlen;
} }
...@@ -44,10 +41,6 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { ...@@ -44,10 +41,6 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver)); buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize)); buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
// specific
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer));
}
return buf; return buf;
} }
......
...@@ -2265,10 +2265,6 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret ...@@ -2265,10 +2265,6 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) { SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion; int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
if (VND_IS_RSMA(pVnode)) {
return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
}
int64_t endVer = 0; int64_t endVer = 0;
if (pCond->endVersion == if (pCond->endVersion ==
-1) { // user not specified end version, set current maximum version of vnode as the endVersion -1) { // user not specified end version, set current maximum version of vnode as the endVersion
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册