diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d5d052c80a689e560499d288480433090d3b1461..a8053d885403a94fa3486e39525209866a59a117 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -181,8 +181,8 @@ typedef struct SField { } SField; typedef struct SRetention { - int32_t freq; - int32_t keep; + int64_t freq; + int64_t keep; int8_t freqUnit; int8_t keepUnit; } SRetention; @@ -240,13 +240,7 @@ typedef struct { // head of SSubmitBlk const void* pMsg; } SSubmitMsgIter; -#if 0 -int32_t tInitSubmitMsgIterOrigin(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); -int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -int32_t tInitSubmitBlkIterOrigin(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); -STSRow* tGetSubmitBlkNextOrigin(SSubmitBlkIter* pIter); -#endif -// TODO: KEEP one suite of iterator API finally. + int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 469d336cddad63cb843ad69b903a4b40cbd3b239..6235dcf895c94554a826fcf431b8db9c5580eba6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,75 +27,7 @@ #define TD_MSG_DICT_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" -#if 0 -int32_t tInitSubmitMsgIterOrigin(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { - if (pMsg == NULL) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; - } - - pIter->totalLen = pMsg->length; - ASSERT(pIter->totalLen > 0); - pIter->len = 0; - pIter->pMsg = pMsg; - if (pMsg->length <= sizeof(SSubmitReq)) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; - } - - return 0; -} - -int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { - ASSERT(pIter->len >= 0); - - if (pIter->len == 0) { - pIter->len += sizeof(SSubmitReq); - } else { - if (pIter->len >= pIter->totalLen) { - ASSERT(0); - } - - SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); - pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); - ASSERT(pIter->len > 0); - } - - if (pIter->len > pIter->totalLen) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - *pPBlock = NULL; - return -1; - } - - *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); - - return 0; -} - -int32_t tInitSubmitBlkIterOrigin(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { - if (pBlock->dataLen <= 0) return -1; - pIter->totalLen = pBlock->dataLen; - pIter->len = 0; - pIter->row = (STSRow *)(pBlock->data + pBlock->schemaLen); - return 0; -} - -STSRow *tGetSubmitBlkNextOrigin(SSubmitBlkIter *pIter) { - STSRow *row = pIter->row; - - if (pIter->len >= pIter->totalLen) { - return NULL; - } else { - pIter->len += TD_ROW_LEN(row); - if (pIter->len < pIter->totalLen) { - pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row)); - } - return row; - } -} -#endif -// TODO: KEEP one suite of iterator API finally. int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; @@ -1679,8 +1611,8 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention *pRetension = taosArrayGet(pReq->pRetensions, i); - if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1; - if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1; + if (tEncodeI64(&encoder, pRetension->freq) < 0) return -1; + if (tEncodeI64(&encoder, pRetension->keep) < 0) return -1; if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1; if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; } @@ -1725,8 +1657,8 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention rentension = {0}; - if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1; - if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1; + if (tDecodeI64(&decoder, &rentension.freq) < 0) return -1; + if (tDecodeI64(&decoder, &rentension.keep) < 0) return -1; if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1; if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1; if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) { @@ -2155,8 +2087,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1; for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) { SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i); - if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1; - if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1; + if (tEncodeI64(&encoder, pRetension->freq) < 0) return -1; + if (tEncodeI64(&encoder, pRetension->keep) < 0) return -1; if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1; if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; } @@ -2199,8 +2131,8 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) { for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) { SRetention rentension = {0}; - if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1; - if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1; + if (tDecodeI64(&decoder, &rentension.freq) < 0) return -1; + if (tDecodeI64(&decoder, &rentension.keep) < 0) return -1; if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1; if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1; if (taosArrayPush(pRsp->pRetensions, &rentension) == NULL) { @@ -2817,8 +2749,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention *pRetension = taosArrayGet(pReq->pRetensions, i); - if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1; - if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1; + if (tEncodeI64(&encoder, pRetension->freq) < 0) return -1; + if (tEncodeI64(&encoder, pRetension->keep) < 0) return -1; if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1; if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; } @@ -2874,8 +2806,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention rentension = {0}; - if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1; - if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1; + if (tDecodeI64(&decoder, &rentension.freq) < 0) return -1; + if (tDecodeI64(&decoder, &rentension.keep) < 0) return -1; if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1; if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1; if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 442bfb5c8adfe5fb666ce21593590a7d578f3e65..5861f8d245f369d4f029e41baf6c5027665b7b24 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -107,8 +107,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { TASSERT(taosArrayGetSize(pDb->cfg.pRetensions) == pDb->cfg.numOfRetensions); SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i); - SDB_SET_INT32(pRaw, dataPos, pRetension->freq, _OVER) - SDB_SET_INT32(pRaw, dataPos, pRetension->keep, _OVER) + SDB_SET_INT64(pRaw, dataPos, pRetension->freq, _OVER) + SDB_SET_INT64(pRaw, dataPos, pRetension->keep, _OVER) SDB_SET_INT8(pRaw, dataPos, pRetension->freqUnit, _OVER) SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER) } @@ -180,8 +180,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { if (pDb->cfg.pRetensions == NULL) goto _OVER; for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { SRetention retension = {0}; - SDB_GET_INT32(pRaw, dataPos, &retension.freq, _OVER) - SDB_GET_INT32(pRaw, dataPos, &retension.keep, _OVER) + SDB_GET_INT64(pRaw, dataPos, &retension.freq, _OVER) + SDB_GET_INT64(pRaw, dataPos, &retension.keep, _OVER) SDB_GET_INT8(pRaw, dataPos, &retension.freqUnit, _OVER) SDB_GET_INT8(pRaw, dataPos, &retension.keepUnit, _OVER) if (taosArrayPush(pDb->cfg.pRetensions, &retension) == NULL) { diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 162d733cc3654bb2d743334ff6758508c7c98683..5215812ac577223b45450a4183c50a7911e4e917 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -40,7 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { return TSDB_CODE_SUCCESS; } - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 92923d487604a5ef258e5c5b3d0f21cb768d9e6a..a9ba2e1de32e67aa0f1f6f036f58aa71d704e4a1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -123,7 +123,7 @@ int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, t int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); void tsdbUidStoreDestory(STbUidStore* pStore); void* tsdbUidStoreFree(STbUidStore* pStore); -int32_t tsdbTriggerRSma(STsdb* pTsdb, SMeta* pMeta, void* pMsg, int32_t inputType); +int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType); typedef struct { int8_t streamType; // sma or other diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 380e376e2495445fbfda5606b645b2bd8370627e..d4532cc3ac03f5ba83677032ba9936b865c2ebb0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -174,6 +174,8 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); +static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType, + qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -1881,8 +1883,10 @@ int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, t return TSDB_CODE_SUCCESS; } + ASSERT(ppStore != NULL); + if (!(*ppStore)) { - if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) { + if (tsdbUidStoreInit(ppStore) != 0) { return TSDB_CODE_FAILED; } } @@ -1978,7 +1982,44 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { return 0; } -int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType, tb_uid_t *suid) { +static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType, + qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention) { + SArray *pResult = NULL; + tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), retention, taskInfo, + suid); + qSetStreamInput(taskInfo, pMsg, inputType); + while (1) { + SSDataBlock *output; + uint64_t ts; + if (qExecTask(taskInfo, &output, &ts) < 0) { + ASSERT(false); + } + if (!output) { + break; + } + if (!pResult) { + pResult = taosArrayInit(0, sizeof(SSDataBlock)); + if (!pResult) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } + + taosArrayPush(pResult, output); + } + + if (taosArrayGetSize(pResult) > 0) { + blockDebugShowData(pResult); + } else { + tsdbWarn("vgId:%d no rsma_1 data generated since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + + taosArrayDestroy(pResult); + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid) { SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); if (!pEnv) { // only applicable when rsma env exists @@ -1988,65 +2029,22 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), *suid); + tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid); return TSDB_CODE_SUCCESS; } - SArray *pResult = NULL; - - pResult = taosArrayInit(0, sizeof(SSDataBlock)); - if (!pResult) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - if (pRSmaInfo->taskInfo[0]) { - tsdbDebug("vgId:%d execute rsma task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), pRSmaInfo->taskInfo[0], - *suid); - qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType); - while (1) { - SSDataBlock *output; - uint64_t ts; - if (qExecTask(pRSmaInfo->taskInfo[0], &output, &ts) < 0) { - ASSERT(false); - } - if (!output) { - break; - } - taosArrayPush(pResult, output); - } - if (taosArrayGetSize(pResult) > 0) { - blockDebugShowData(pResult); - } else { - tsdbWarn("vgId:%d no sma data generated since %s", REPO_ID(pTsdb), tstrerror(terrno)); - } - } - - // if (pRSmaInfo->taskInfo[1]) { - // qSetStreamInput(pRSmaInfo->taskInfo[1], pMsg, inputType); - // while (1) { - // SSDataBlock *output; - // uint64_t ts; - // if (qExecTask(pRSmaInfo->taskInfo[1], &output, &ts) < 0) { - // ASSERT(false); - // } - // if (!output) { - // break; - // } - // taosArrayPush(pResult, output); - // } - // blockDebugShowData(pResult); - // } + tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], suid, TSDB_RSMA_RETENTION_1); + tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], suid, TSDB_RSMA_RETENTION_2); } return TSDB_CODE_SUCCESS; } -int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType) { +int32_t tsdbTriggerRSma(STsdb *pTsdb, void *pMsg, int32_t inputType) { SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); if (!pEnv) { // only applicable when rsma env exists @@ -2058,12 +2056,12 @@ int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputTyp tsdbFetchSubmitReqSuids(pMsg, &uidStore); if (uidStore.suid != 0) { - tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, &uidStore.suid); + tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid); void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, pTbSuid); + tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid); pIter = taosHashIterate(uidStore.uidHash, pIter); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4eeba06027f18e19b1bec9c79a9a26ac406d590c..eddd295e2ebc1df21e70fbac07281f2baad9c412 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -461,7 +461,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in SSubmitRsp rsp = {0}; pRsp->code = 0; - tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); // handle the request if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) { pRsp->code = terrno; @@ -470,7 +470,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in // pRsp->msgType = TDMT_VND_SUBMIT_RSP; // vnodeProcessSubmitReq(pVnode, ptr, pRsp); - // tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + // tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); // encode the response (TODO) pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4e2d75a79f0dd730939e8e15379eb1ebb506a4f9..30e55420d478af25032139cd2a8fcaa1c9492449 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1947,9 +1947,6 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) { return pCxt->errCode; } - if (!TIME_IS_VAR_DURATION(pVal->unit)) { - pVal->datum.i = convertTimeFromPrecisionToUnit(pVal->datum.i, pVal->node.resType.precision, pVal->unit); - } } }