diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 2244b91c28136b7d4f076259888ef8906fab011b..ecd8db8ea9a3457c49b267a7ab51c42ee6cad40e 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -113,7 +113,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { if (!pDataBlocks) { terrno = TSDB_CODE_TSMA_INVALID_PTR; smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); - return terrno; + return TSDB_CODE_FAILED; } if (taosArrayGetSize(pDataBlocks) <= 0) { @@ -127,9 +127,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { return TSDB_CODE_FAILED; } - SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); - SSmaStat *pStat = NULL; - STSmaStat *pItem = NULL; + SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); + SSmaStat *pStat = NULL; + STSmaStat *pTsmaStat = NULL; if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { terrno = TSDB_CODE_TSMA_INVALID_STAT; @@ -137,32 +137,43 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } tdRefSmaStat(pSma, pStat); - pItem = &pStat->tsmaStat; - - ASSERT(pItem); + pTsmaStat = SMA_TSMA_STAT(pStat); - if (!pItem->pTSma) { + if (!pTsmaStat->pTSma) { STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); if (!pTSma) { - terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META; - smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); - return TSDB_CODE_FAILED; + smaError("vgId:%d, failed to get STSma while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), + indexUid, tstrerror(terrno)); + goto _err; + } + pTsmaStat->pTSma = pTSma; + pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1); + if (!pTsmaStat->pTSchema) { + smaError("vgId:%d, failed to get STSchema while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), + indexUid, tstrerror(terrno)); + goto _err; } - pItem->pTSma = pTSma; - pItem->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1); - ASSERT(pItem->pTSchema); // TODO } - ASSERT(pItem->pTSma->indexUid == indexUid); - - SSubmitReq *pSubmitReq = NULL; + if (pTsmaStat->pTSma->indexUid != indexUid) { + terrno = TSDB_CODE_VND_APP_ERROR; + smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 "(!=%" PRIi64 ") failed since %s", SMA_VID(pSma), indexUid, + pTsmaStat->pTSma->indexUid, tstrerror(terrno)); + goto _err; + } - pSubmitReq = tdBlockToSubmit((const SArray *)msg, pItem->pTSchema, true, pItem->pTSma->dstTbUid, - pItem->pTSma->dstTbName, pItem->pTSma->dstVgId); + SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, + pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId); - ASSERT(pSubmitReq); // TODO + if (!pSubmitReq) { + smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), + indexUid, tstrerror(terrno)); + goto _err; + } - ASSERT(!strncasecmp("td.tsma.rst.tb", pItem->pTSma->dstTbName, 14)); +#if 0 + ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)); +#endif SRpcMsg submitReqMsg = { .msgType = TDMT_VND_SUBMIT, @@ -170,9 +181,15 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { .contLen = ntohl(pSubmitReq->length), }; - ASSERT(tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) == 0); + if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) { + smaError("vgId:%d, failed to put SubmitReq msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), + indexUid, tstrerror(terrno)); + goto _err; + } tdUnRefSmaStat(pSma, pStat); - return TSDB_CODE_SUCCESS; +_err: + tdUnRefSmaStat(pSma, pStat); + return TSDB_CODE_FAILED; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index dbbe3d375042ea6c152f39cdfb0137366740a7f9..02a07839272a3c3fce1d64c7847c0b2688655b6f 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -266,6 +266,8 @@ int vnodeCommit(SVnode *pVnode) { ASSERT(0); return -1; } + + pVnode->state.committed = info.state.committed; // apply the commit (TODO) vnodeBufPoolReset(pVnode->onCommit);