提交 e6dd04f5 编写于 作者: C Cary Xu

refactor: rsma restore

上级 0597f3b9
......@@ -78,7 +78,6 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
void debugCheckTags(STag *pTag); // TODO: remove
// STRUCT =================
struct STColumn {
......
......@@ -1886,7 +1886,7 @@ typedef struct SVCreateStbReq {
int8_t rollup;
SSchemaWrapper schemaRow;
SSchemaWrapper schemaTag;
SRSmaParam pRSmaParam;
SRSmaParam rsmaParam;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
......
......@@ -862,21 +862,6 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
printf("\n");
}
void debugCheckTags(STag *pTag) {
switch (pTag->flags) {
case 0x0:
case 0x20:
case 0x40:
case 0x60:
break;
default:
ASSERT(0);
}
ASSERT(pTag->nTag <= 128 && pTag->nTag >= 0);
ASSERT(pTag->ver <= 512 && pTag->ver >= 0); // temp condition for pTag->ver
}
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
int32_t n = 0;
......@@ -999,7 +984,6 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
debugPrintSTag(*ppTag, __func__, __LINE__);
#endif
debugCheckTags(*ppTag); // TODO: remove this line after debug
return code;
_err:
......
......@@ -4763,7 +4763,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
if (pReq->rollup) {
if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
if (tEncodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1;
}
tEndEncode(pCoder);
......@@ -4779,7 +4779,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
if (pReq->rollup) {
if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
if (tDecodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1;
}
tEndDecode(pCoder);
......
......@@ -427,17 +427,17 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.schemaTag.pSchema = pStb->pTags;
if (req.rollup) {
req.pRSmaParam.maxdelay[0] = pStb->maxdelay[0];
req.pRSmaParam.maxdelay[1] = pStb->maxdelay[1];
req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
if (pStb->ast1Len > 0) {
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[0], &req.pRSmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[0]) < 0) {
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) {
goto _err;
}
}
if (pStb->ast2Len > 0) {
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[1], &req.pRSmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[1]) < 0) {
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1]) < 0) {
goto _err;
}
}
......@@ -470,12 +470,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
tEncoderClear(&encoder);
*pContLen = contLen;
taosMemoryFreeClear(req.pRSmaParam.qmsg[0]);
taosMemoryFreeClear(req.pRSmaParam.qmsg[1]);
taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
return pHead;
_err:
taosMemoryFreeClear(req.pRSmaParam.qmsg[0]);
taosMemoryFreeClear(req.pRSmaParam.qmsg[1]);
taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
return NULL;
}
......
......@@ -199,15 +199,20 @@ typedef struct {
uint64_t groupId;
} STableKeyInfo;
#define TABLE_ROLLUP_ON ((int8_t)0x1)
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
struct SMetaEntry {
int64_t version;
int8_t type;
int8_t flags; // TODO: need refactor?
tb_uid_t uid;
char *name;
union {
struct {
SSchemaWrapper schemaRow;
SSchemaWrapper schemaTag;
SRSmaParam rsmaParam;
} stbEntry;
struct {
int64_t ctime;
......
......@@ -24,22 +24,25 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) {
if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor?
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
if (TABLE_IS_ROLLUP(pME->flags)) {
if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
}
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
if (pME->ctbEntry.commentLen > 0){
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
}
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
if (pME->ntbEntry.commentLen > 0){
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
}
......@@ -64,23 +67,26 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeI8(pCoder, &pME->flags) < 0) return -1; // TODO: need refactor?
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
if (TABLE_IS_ROLLUP(pME->flags)) {
if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
}
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
if (pME->ctbEntry.commentLen > 0){
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0)
return -1;
}
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
if (pME->ntbEntry.commentLen > 0){
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
}
......
......@@ -342,6 +342,7 @@ SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
if (pStbCur == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -351,6 +352,7 @@ SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL);
if (ret < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaULock(pMeta);
taosMemoryFree(pStbCur);
return NULL;
......
......@@ -139,6 +139,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
me.name = pReq->name;
me.stbEntry.schemaRow = pReq->schemaRow;
me.stbEntry.schemaTag = pReq->schemaTag;
if (pReq->rollup) {
TABLE_SET_ROLLUP(me.flags);
me.stbEntry.rsmaParam = pReq->rsmaParam;
}
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
......
......@@ -174,11 +174,8 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
// step 1: set persistence task cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
// step 2: clean timer
// step 2: stop the persistence timer
taosTmrStopA(&RSMA_TMR_ID(pStat));
if (RSMA_TMR_HANDLE(pStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
}
// step 3: wait the persistence thread to finish
int32_t nLoops = 0;
......@@ -194,7 +191,6 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
sched_yield();
nLoops = 0;
}
taosMsleep(1000); // TODO: remove this line when release
}
}
......@@ -219,7 +215,11 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
sched_yield();
nLoops = 0;
}
taosMsleep(1000); // TODO: remove this line when release
}
// step 6: free the timer handle
if (RSMA_TMR_HANDLE(pStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
}
}
}
......
......@@ -18,6 +18,7 @@
static int32_t smaEvalDays(SRetention *r, int8_t precision);
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
static int32_t smaRestore(SSma *pSma);
#define SMA_SET_KEEP_CFG(l) \
do { \
......@@ -120,6 +121,12 @@ int32_t smaOpen(SVnode *pVnode) {
}
pVnode->pSma = pSma;
// restore the sma
if (smaRestore(pSma) < 0) {
goto _err;
}
return 0;
_err:
taosMemoryFreeClear(pSma);
......@@ -127,7 +134,7 @@ _err:
}
int32_t smaCloseEnv(SSma *pSma) {
if(pSma) {
if (pSma) {
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
}
......@@ -153,13 +160,41 @@ int32_t smaClose(SSma *pSma) {
/**
* @brief rsma env restore
*
* @param pSma
* @return int32_t
*
* @param pSma
* @return int32_t
*/
int32_t smaRestore(SSma *pSma) {
if (!pSma) return 0;
static int32_t smaRestore(SSma *pSma) {
// iterate all stables to restore the rsma env
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
smaError("failed to restore rsma since get stb id list error: %s", terrstr());
return TSDB_CODE_FAILED;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) {
metaReaderClear(&mr);
taosArrayDestroy(suidList);
smaError("failed to get table meta for %" PRIi64 " since %s", suid, terrstr());
return TSDB_CODE_FAILED;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < 2; ++i) {
smaDebug("%s:%d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, __func__, __LINE__, suid, i,
param->maxdelay[i], i, param->watermark[i]);
}
}
}
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -246,7 +246,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SMeta *pMeta = pVnode->pMeta;
SMsgCb *pMsgCb = &pVnode->msgCb;
SRSmaParam *param = &pReq->pRSmaParam;
SRSmaParam *param = &pReq->rsmaParam;
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
......@@ -502,8 +502,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
}
taosMemoryFreeClear(pReq);
} else if (terrno == 0) {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
}
tdDestroySDataBlockArray(pResult);
......@@ -661,18 +663,6 @@ static void *tdRSmaPersistExec(void *param) {
goto _end;
}
#if 0
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
ASSERT(0);
} else {
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
}
}
#endif
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
if (!infoHash) {
goto _end;
......@@ -852,6 +842,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
} break;
default: {
smaWarn("%s:%d rsma persistence not start since unknown stat %" PRIi8, __func__, __LINE__, tmrStat);
ASSERT(0);
} break;
}
}
\ No newline at end of file
......@@ -2886,6 +2886,9 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
*/
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
if(!pCur) {
return TSDB_CODE_FAILED;
}
while (1) {
tb_uid_t id = metaStbCursorNext(pCur);
......
......@@ -811,7 +811,7 @@ int32_t getMaximumIdleDurationSec();
* nOptrWithVal: *nOptrWithVal save the number of optr with value
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length);
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length, int32_t *nOptrWithVal);
/*
* ops: root operator, created by caller
......
......@@ -222,7 +222,13 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
return TSDB_CODE_INVALID_PARA;
}
return encodeOperator(pTaskInfo->pRoot, pOutput, len);
int32_t nOptrWithVal = 0;
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
taosMemoryFreeClear(*pOutput);
*len = 0;
}
return code;
}
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
......
......@@ -4472,12 +4472,12 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa
return 0;
}
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
int32_t code = TDB_CODE_SUCCESS;
char* pCurrent = NULL;
int32_t currLength = 0;
if (ops->fpSet.encodeResultRow) {
if (result == NULL || length == NULL) {
if (result == NULL || length == NULL || nOptrWithVal == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
......@@ -4488,8 +4488,13 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
*result = NULL;
}
return code;
} else if (currLength == 0) {
ASSERT(!pCurrent);
goto _downstream;
}
++(*nOptrWithVal);
ASSERT(currLength >= 0);
if (*result == NULL) {
......@@ -4516,8 +4521,10 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
taosMemoryFree(pCurrent);
*length = *(int32_t*)(*result);
}
_downstream:
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length);
code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
if (code != TDB_CODE_SUCCESS) {
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册