提交 eedaeda8 编写于 作者: C Cary Xu

enh: rsma level 1/2 utilize separated version

上级 a12f04b2
......@@ -62,6 +62,7 @@ struct STSmaStat {
struct SRSmaStat {
SSma *pSma;
int64_t submitVer;
int64_t refId; // shared by fetch tasks
void *tmrHandle; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks
......@@ -84,6 +85,7 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
enum {
TASK_TRIGGER_STAT_INIT = 0,
......@@ -208,11 +210,15 @@ struct STFInfo {
// specific fields
union {
struct {
int64_t applyVer[2];
int64_t submitVer;
} qTaskInfo;
};
};
enum {
TD_FTYPE_RSMA_QTASKINFO = 0,
};
struct STFile {
uint8_t state;
STFInfo info;
......
......@@ -172,6 +172,7 @@ int32_t smaPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
......
......@@ -523,6 +523,17 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray);
}
int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
if (level == TSDB_RETENTION_L0) {
return pSma->pVnode->state.applied;
}
SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv));
return atomic_load_64(&pRSmaStat->submitVer);
}
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
SArray *pResult = NULL;
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
......@@ -562,7 +573,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, 1, pReq) < 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pRSmaInfo->pStat->submitVer, 1), pReq) < 0) {
taosMemoryFreeClear(pReq);
goto _err;
}
......@@ -814,6 +825,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
}
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
*committed = 0;
if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
......@@ -828,6 +840,18 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err;
}
STFInfo tFileInfo = {0};
if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
goto _err;
}
ASSERT(tFileInfo.qTaskInfo.submitVer > 0);
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer);
smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer);
SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);
......@@ -1094,6 +1118,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
}
STFile tFile = {0};
if (RSMA_SUBMIT_VER(pRSmaStat) > 0) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true;
}
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
......@@ -1129,12 +1169,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, -1) < 0) {
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
i + 1, TD_TFILE_FULL_NAME(&tFile));
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true;
}
......@@ -1161,6 +1201,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
}
if (isFileCreated) {
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->submitVer);
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno));
......
......@@ -32,6 +32,9 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer);
}
return tlen;
}
......@@ -41,6 +44,11 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
// specific
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer));
}
return buf;
}
......
......@@ -154,7 +154,8 @@ static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArr
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t *pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
......@@ -366,6 +367,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) {
int32_t code = 0;
int8_t level = 0;
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -374,13 +376,13 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = 4096;
pReader->idStr = (idstr != NULL)? strdup(idstr):NULL;
pReader->verRange = (SVersionRange) {.minVer = pCond->startVersion, .maxVer = 10000};
pReader->type = pCond->type;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows);
// todo remove this
......@@ -2379,12 +2381,13 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
}
STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr) {
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
int8_t* pLevel) {
if (VND_IS_RSMA(pVnode)) {
int level = 0;
int8_t level = 0;
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + level;
if (pRetention->keep <= 0) {
if (level > 0) {
......@@ -2398,16 +2401,19 @@ STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions
++level;
}
int32_t vgId = TD_VID(pVnode);
const char* str = (idStr != NULL)? idStr:"";
int32_t vgId = TD_VID(pVnode);
const char* str = (idStr != NULL) ? idStr : "";
if (level == TSDB_RETENTION_L0) {
*pLevel = TSDB_RETENTION_L0;
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
*pLevel = TSDB_RETENTION_L1;
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
return VND_RSMA1(pVnode);
} else {
*pLevel = TSDB_RETENTION_L2;
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
return VND_RSMA2(pVnode);
}
......@@ -2416,6 +2422,14 @@ STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions
return VND_TSDB(pVnode);
}
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
if (VND_IS_RSMA(pVnode)) {
return (SVersionRange){.minVer = pCond->startVersion, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
}
return (SVersionRange){.minVer = pCond->startVersion, .maxVer = pVnode->state.applied};
}
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
// // filter the queried time stamp in the first place
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册