diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d0926948dcbc03898755dfe5f3e562101621ce3c..335d20d0960a2f29ca9736c33b994297c7d77a1f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -230,12 +230,12 @@ typedef struct { int32_t totalLen; int32_t len; // head of SSubmitBlk - // int64_t uid; // table unique id - // int64_t suid; // stable id - // int32_t sversion; // data schema version - // int32_t dataLen; // data part length, not including the SSubmitBlk head - // int32_t schemaLen; // schema length, if length is 0, no schema exists - // int16_t numOfRows; // total number of rows in current submit block + int64_t uid; // table unique id + int64_t suid; // stable id + int32_t sversion; // data schema version + int32_t dataLen; // data part length, not including the SSubmitBlk head + int32_t schemaLen; // schema length, if length is 0, no schema exists + int16_t numOfRows; // total number of rows in current submit block // head of SSubmitBlk const void* pMsg; } SSubmitMsgIter; @@ -249,10 +249,10 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts // 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later // 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter -// int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); -// int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -// int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); -// STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); +int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); +int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); typedef struct { int32_t index; // index of failed block in submit blocks diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 05fd9e301bf535739e642ca77022978e6220dbc8..e03850ef00452497b5d2ec77d06c6798c8286df9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -93,7 +93,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { return row; } } -#if 0 + // TODO: KEEP one suite of iterator API finally. // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts // 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later @@ -173,7 +173,7 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { return row; } } -#endif + int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index dddc17081398a961b5a9122b975da46a632614f8..8af3068b8d5846de4158af32001f70b2d8734381 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -56,7 +56,6 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); typedef enum { TSDB_FILE_HEAD = 0, // .head TSDB_FILE_DATA, // .data diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 434c5bc5bb997973f544c7d74eb29b35b6594873..d5eb932b81e6650b41e13cf0fb0cef6e14f8f50d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -197,10 +197,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) } memcpy(data, msg, msgLen); - if (msgType == TDMT_VND_SUBMIT) { - if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { - return -1; - } + // make sure msgType == TDMT_VND_SUBMIT + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { + return -1; } SRpcMsg req = { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index eb45577e0a491ab4ebeb02752f5c07edc3d3bcde..967d56edfef3bdb343a650dacc424d418f4983be 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -33,24 +33,24 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + // pMsg->length = htonl(pMsg->length); + // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); // iterate and convert - if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; while (true) { - if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; + if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (pReadHandle->pBlock == NULL) break; - pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); - pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid); - pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion); - pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen); - pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen); - pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); + // pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); + // pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid); + // pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion); + // pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen); + // pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen); + // pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); } - if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; pReadHandle->ver = ver; memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); return 0; @@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t bool tqNextDataBlock(STqReadHandle* pHandle) { while (1) { - if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; } if (pHandle->pBlock == NULL) return false; @@ -66,7 +66,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { /*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/ /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ ASSERT(pHandle->tbIdHash); - void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); + void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t)); if (ret != NULL) { /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ /*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/ @@ -88,16 +88,18 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p // TODO set to real sversion int32_t sversion = 0; if (pHandle->sver != sversion) { - pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); - + pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); +#if 0 tb_uid_t quid; - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid); if (pTbCfg->type == META_CHILD_TABLE) { quid = pTbCfg->ctbCfg.suid; } else { - quid = pHandle->pBlock->uid; + quid = pHandle->msgIter.uid; } pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); +#endif + pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->sver = sversion; } @@ -151,8 +153,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p tdSTSRowIterInit(&iter, pTschema); STSRow* row; int32_t curRow = 0; - tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); - while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { + tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); + while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); // get all wanted col of that block for (int32_t i = 0; i < colActual; i++) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 9143e8ed1247e06ee81a0578908f81480e47cc1d..806512f07d458a0f9cfd1e603678d671d277d2fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -678,9 +678,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers return TSDB_CODE_FAILED; } - if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; @@ -705,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers SInterval interval = {0}; TSKEY lastWinSKey = INT64_MIN; - if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { + if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } while (true) { - tGetSubmitMsgNext(&msgIter, &pBlock); + tGetSubmitMsgNextEx(&msgIter, &pBlock); if (!pBlock) break; STSmaWrapper *pSW = NULL; STSma *pTSma = NULL; SSubmitBlkIter blkIter = {0}; - if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { + if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) { pSW = tdFreeTSmaWrapper(pSW); break; } while (true) { - STSRow *row = tGetSubmitBlkNext(&blkIter); + STSRow *row = tGetSubmitBlkNextEx(&blkIter); if (!row) { tdFreeTSmaWrapper(pSW); break; @@ -1764,7 +1761,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } else { - tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid); + tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid); } return TSDB_CODE_SUCCESS; @@ -1791,7 +1788,7 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { return TSDB_CODE_FAILED; } } - if (!taosArrayPush(pStore->tbUids, &uid)) { + if (!taosArrayPush(pStore->tbUids, uid)) { return TSDB_CODE_FAILED; } } @@ -1806,14 +1803,14 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { if (uid) { SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); if (uidArray && ((uidArray = *(SArray **)uidArray))) { - taosArrayPush(uidArray, &uid); + taosArrayPush(uidArray, uid); } else { SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t)); if (!pUidArray) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } - if (!taosArrayPush(pUidArray, &uid)) { + if (!taosArrayPush(pUidArray, uid)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } @@ -1975,12 +1972,12 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { // pMsg->length = htonl(pMsg->length); // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1; while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1; if (!pBlock) break; - tsdbUidStorePut(pStore, pBlock->suid, NULL); + tsdbUidStorePut(pStore, msgIter.suid, NULL); } if (terrno != TSDB_CODE_SUCCESS) return -1; @@ -2014,6 +2011,8 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (pRSmaInfo->taskInfo[0]) { + tsdbDebug("vgId:%d execute rsma task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), pRSmaInfo->taskInfo[0], + *suid); qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType); while (1) { SSDataBlock *output; @@ -2026,7 +2025,11 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in } taosArrayPush(pResult, output); } - blockDebugShowData(pResult); + if (taosArrayGetSize(pResult) > 0) { + blockDebugShowData(pResult); + } else { + tsdbWarn("vgId:%d no sma data generated since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } } // if (pRSmaInfo->taskInfo[1]) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 66cc928c129f7329a42b93a9d80268b3a1c39d8f..2ab69e9eb661e5a1f4f1ad061dcb4f1cbcc55715 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -81,9 +81,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_DROP_TABLE: break; case TDMT_VND_SUBMIT: + tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); pRsp->msgType = TDMT_VND_SUBMIT_RSP; vnodeProcessSubmitReq(pVnode, ptr, pRsp); - tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); break; case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),