diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fbfdfc1e39f49f9bd0bb09c141f2cdcfa0a11a32..e0c99e6054732ce7eb477be176ef0446ae403d07 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -227,8 +227,16 @@ typedef struct { } SSubmitBlkIter; typedef struct { - int32_t totalLen; - int32_t len; + int32_t totalLen; + int32_t len; + // head of SSubmitBlk + int64_t uid; // table unique id + int64_t suid; // stable id + int32_t sversion; // data schema version + int32_t dataLen; // data part length, not including the SSubmitBlk head + int32_t schemaLen; // schema length, if length is 0, no schema exists + int16_t numOfRows; // total number of rows in current submit block + // head of SSubmitBlk const void* pMsg; } SSubmitMsgIter; @@ -237,6 +245,15 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); +// TODO: KEEP one suite of iterator API finally. +// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts +// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later +// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter +int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); +int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); + typedef struct { int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 04f67b584faca5e50fbd0d7e2fc835b497e8fddb..4d289147d03840c7d8edf2868d90682d32b2db75 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -57,7 +57,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); * @param type * @return */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); /** * Set multiple input data blocks for the stream scan. @@ -67,7 +67,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool * @param type * @return */ -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted); +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); /** * Update the table id list, add or remove. diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index eb7c9a763f9e72b22e05da7357566755defca0b4..14195471b62536d6961cdc61add51384cd1f4e48 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -94,6 +94,86 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { } } +// TODO: KEEP one suite of iterator API finally. +// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts +// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later +// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter + +int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { + if (pMsg == NULL) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + pIter->totalLen = htonl(pMsg->length); + ASSERT(pIter->totalLen > 0); + pIter->len = 0; + pIter->pMsg = pMsg; + if (pIter->totalLen <= sizeof(SSubmitReq)) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + return 0; +} + +int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + ASSERT(pIter->len >= 0); + + if (pIter->len == 0) { + pIter->len += sizeof(SSubmitReq); + } else { + if (pIter->len >= pIter->totalLen) { + ASSERT(0); + } + + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen); + ASSERT(pIter->len > 0); + } + + if (pIter->len > pIter->totalLen) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + *pPBlock = NULL; + return -1; + } + + if (pIter->len == pIter->totalLen) { + *pPBlock = NULL; + } else { + *pPBlock = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->uid = htobe64((*pPBlock)->uid); + pIter->suid = htobe64((*pPBlock)->suid); + pIter->sversion = htonl((*pPBlock)->sversion); + pIter->dataLen = htonl((*pPBlock)->dataLen); + pIter->schemaLen = htonl((*pPBlock)->schemaLen); + pIter->numOfRows = htons((*pPBlock)->numOfRows); + } + return 0; +} + +int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { + if (pMsgIter->dataLen <= 0) return -1; + pIter->totalLen = pMsgIter->dataLen; + pIter->len = 0; + pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen); + return 0; +} + +STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { + STSRow *row = pIter->row; + + if (pIter->len >= pIter->totalLen) { + return NULL; + } else { + pIter->len += TD_ROW_LEN(row); + if (pIter->len < pIter->totalLen) { + pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row)); + } + return row; + } +} + int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; @@ -126,7 +206,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) { return 0; } - int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pEp->inUse); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 76e56bcfa28d4a2aa25535de2afa3973aba795ed..f4975c183e26da5f7a198451e1df134fea6e657b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -112,7 +112,6 @@ void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); -int32_t tqReadHandleSetMsgEx(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows, int16_t *pNumOfCols); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a84776abfd76df66348c2a9d1885a403b2352076..8cfbffc962256c77bc18a19b8ba683852c023376 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -56,7 +56,6 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); typedef enum { TSDB_FILE_HEAD = 0, // .head diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 28d1a6e9a012c00d5643e74d1bde8f06ef66c397..3a865ddc5194595d1212b22d4275fef19208bb6a 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -48,7 +48,8 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { return TSDB_CODE_SUCCESS; } -int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid); +int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); +void tsdbUidStoreDestory(STbUidStore *pStore); void *tsdbUidStoreFree(STbUidStore *pStore); int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f933c6e8de44496a1318bab2f23ee202012115e1..0b3feb5010f70bb2066090f2a8884d599610eb85 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -83,7 +83,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); - qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); + qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -454,7 +454,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -648,7 +648,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; qTaskInfo_t task = pTopic->buffer.output[workerId].task; ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { SSDataBlock* pDataBlock = NULL; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index dfc3fba1b4238f278416c2dbeb48e9ae25c37d90..ada2460d97a67ef68ef69e0c8a9ddb9c1a18e168 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -33,40 +33,15 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - // iterate and convert - if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; - if (pReadHandle->pBlock == NULL) break; - - pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); - pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid); - pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion); - pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen); - pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen); - pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); - } - - if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; - pReadHandle->ver = ver; - memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); - return 0; -} - -int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { - pReadHandle->pMsg = pMsg; // iterate - if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; while (true) { - if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; + if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (pReadHandle->pBlock == NULL) break; } - if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; pReadHandle->ver = ver; memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); return 0; @@ -74,7 +49,7 @@ int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64 bool tqNextDataBlock(STqReadHandle* pHandle) { while (1) { - if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; } if (pHandle->pBlock == NULL) return false; @@ -82,7 +57,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { /*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/ /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ ASSERT(pHandle->tbIdHash); - void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); + // void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); + void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t)); if (ret != NULL) { /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ /*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/ @@ -104,14 +80,14 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p // TODO set to real sversion int32_t sversion = 0; if (pHandle->sver != sversion) { - pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); + pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); tb_uid_t quid; - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid); if (pTbCfg->type == META_CHILD_TABLE) { quid = pTbCfg->ctbCfg.suid; } else { - quid = pHandle->pBlock->uid; + quid = pHandle->msgIter.uid; } pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); pHandle->sver = sversion; @@ -120,7 +96,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p STSchema* pTschema = pHandle->pSchema; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; - *pNumOfRows = pHandle->pBlock->numOfRows; + *pNumOfRows = pHandle->msgIter.numOfRows; int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); if (colNumNeed > pSchemaWrapper->nCols) { @@ -167,8 +143,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p tdSTSRowIterInit(&iter, pTschema); STSRow* row; int32_t curRow = 0; - tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); - while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { + tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); + while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); // get all wanted col of that block for (int32_t i = 0; i < colActual; i++) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index bea2b66af8ad94a9129489c073940077d270c37b..3082ee6f2b71533210d23a7919db000ae15b5b3b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -227,34 +227,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) { - ASSERT(pMsg != NULL); - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; - - terrno = TSDB_CODE_SUCCESS; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - - pBlock->uid = htobe64(pBlock->uid); - pBlock->suid = htobe64(pBlock->suid); - pBlock->sversion = htonl(pBlock->sversion); - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); - } - - if (terrno != TSDB_CODE_SUCCESS) return -1; - return 0; -} - static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index ddef1a649d4dc7c6becb75fb1e8a505419a4c418..9c2daaa6d7de3e78be78b0ee6424f8e6f1d0a559 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -95,6 +95,7 @@ typedef struct { * - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open, * without information about its previous state. * - TSDB_SMA_STAT_DROPPED: 1)sma dropped + * N.B. only applicable to tsma */ int8_t state; // ETsdbSmaStat SHashObj *expiredWindows; // key: skey of time window, value: N/A @@ -112,8 +113,6 @@ struct SSmaStat { SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; - SRSmaInfo rsmaInfo; - T_REF_DECLARE() }; #define SMA_STAT_ITEMS(s) ((s)->smaStatItems) @@ -173,6 +172,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); +static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -263,7 +263,7 @@ static void *poolMalloc(void *arg, size_t size) { SPoolMem *pMem; pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size); - if (pMem == NULL) { + if (!pMem) { assert(0); } @@ -355,7 +355,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *pa SSmaEnv *pEnv = NULL; pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv)); - if (pEnv == NULL) { + if (!pEnv) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -371,7 +371,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *pa ASSERT(path && (strlen(path) > 0)); SMA_ENV_PATH(pEnv) = strdup(path); - if (SMA_ENV_PATH(pEnv) == NULL) { + if (!SMA_ENV_PATH(pEnv)) { tsdbFreeSmaEnv(pEnv); return NULL; } @@ -390,7 +390,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *pa return NULL; } - if ((pEnv->pPool = openPool()) == NULL) { + if (!(pEnv->pPool = openPool())) { tsdbFreeSmaEnv(pEnv); return NULL; } @@ -404,8 +404,8 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SD return TSDB_CODE_FAILED; } - if (*pEnv == NULL) { - if ((*pEnv = tsdbNewSmaEnv(pTsdb, smaType, path, did)) == NULL) { + if (!(*pEnv)) { + if (!(*pEnv = tsdbNewSmaEnv(pTsdb, smaType, path, did))) { return TSDB_CODE_FAILED; } } @@ -437,7 +437,7 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { } static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { - if (pStat == NULL) return 0; + if (!pStat) return 0; int ref = T_REF_INC(pStat); tsdbDebug("vgId:%d ref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref); @@ -445,7 +445,7 @@ static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { } static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { - if (pStat == NULL) return 0; + if (!pStat) return 0; int ref = T_REF_DEC(pStat); tsdbDebug("vgId:%d unref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref); @@ -455,7 +455,7 @@ static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { ASSERT(pSmaStat != NULL); - if (*pSmaStat != NULL) { // no lock + if (*pSmaStat) { // no lock return TSDB_CODE_SUCCESS; } @@ -464,9 +464,9 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { * 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if * tsdbInitSmaStat invoked in other multithread environment later. */ - if (*pSmaStat == NULL) { + if (!(*pSmaStat)) { *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat)); - if (*pSmaStat == NULL) { + if (!(*pSmaStat)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } @@ -475,7 +475,7 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit( RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - if (SMA_STAT_INFO_HASH(*pSmaStat) == NULL) { + if (!SMA_STAT_INFO_HASH(*pSmaStat)) { taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED; } @@ -483,7 +483,7 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { SMA_STAT_ITEMS(*pSmaStat) = taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (SMA_STAT_ITEMS(*pSmaStat) == NULL) { + if (!SMA_STAT_ITEMS(*pSmaStat)) { taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED; } @@ -510,7 +510,7 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { } static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { - if (pSmaStatItem != NULL) { + if (pSmaStatItem) { tdDestroyTSma(pSmaStatItem->pSma); taosMemoryFreeClear(pSmaStatItem->pSma); taosHashCleanup(pSmaStatItem->expiredWindows); @@ -530,7 +530,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL); - while (item != NULL) { + while (item) { SSmaStatItem *pItem = *(SSmaStatItem **)item; tsdbFreeSmaStatItem(pItem); item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item); @@ -538,7 +538,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { taosHashCleanup(SMA_STAT_ITEMS(pSmaStat)); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL); - while (infoHash != NULL) { + while (infoHash) { SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; tsdbFreeRSmaInfo(pInfoHash); infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash); @@ -557,12 +557,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { // return if already init switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: - if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb))) != NULL) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)))) { return TSDB_CODE_SUCCESS; } break; case TSDB_SMA_TYPE_ROLLUP: - if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb))) != NULL) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)))) { return TSDB_CODE_SUCCESS; } break; @@ -575,7 +575,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { tsdbLockRepo(pTsdb); pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)) : atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)); - if (pEnv == NULL) { + if (!pEnv) { char rname[TSDB_FILENAME_LEN] = {0}; SDiskID did = {0}; @@ -607,10 +607,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); - if (pItem == NULL) { + if (!pItem) { // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK); // TODO use the real state - if (pItem == NULL) { + if (!pItem) { // Response to stream computing: OOM // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. return TSDB_CODE_FAILED; @@ -618,7 +618,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t // cache smaMeta STSma *pSma = metaGetSmaInfoByIndex(REPO_META(pTsdb), indexUid, true); - if (pSma == NULL) { + if (!pSma) { terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); taosMemoryFree(pItem); @@ -634,7 +634,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t taosMemoryFree(pItem); return TSDB_CODE_FAILED; } - } else if ((pItem = *(SSmaStatItem **)pItem) == NULL) { + } else if (!(pItem = *(SSmaStatItem **)pItem)) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } @@ -678,9 +678,9 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers return TSDB_CODE_FAILED; } - if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } + // if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) { + // return TSDB_CODE_FAILED; + // } if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; @@ -694,7 +694,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); - TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL); + TASSERT(pEnv && pStat && pItemsHash); // basic procedure // TODO: optimization @@ -705,26 +705,26 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers SInterval interval = {0}; TSKEY lastWinSKey = INT64_MIN; - if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { + if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } while (true) { - tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; + tGetSubmitMsgNextEx(&msgIter, &pBlock); + if (!pBlock) break; STSmaWrapper *pSW = NULL; STSma *pTSma = NULL; SSubmitBlkIter blkIter = {0}; - if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { + if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) { pSW = tdFreeTSmaWrapper(pSW); break; } while (true) { - STSRow *row = tGetSubmitBlkNext(&blkIter); - if (row == NULL) { + STSRow *row = tGetSubmitBlkNextEx(&blkIter); + if (!row) { tdFreeTSmaWrapper(pSW); break; } @@ -732,10 +732,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers if (pSW) { pSW = tdFreeTSmaWrapper(pSW); } - if ((pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) { + if (!(pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid))) { break; } - if ((pSW->number) <= 0 || (pSW->tSma == NULL)) { + if ((pSW->number) <= 0 || !pSW->tSma) { pSW = tdFreeTSmaWrapper(pSW); break; } @@ -781,10 +781,10 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind tsdbRefSmaStat(pTsdb, pStat); - if (pStat && pStat->smaStatItems) { - pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + if (pStat && SMA_STAT_ITEMS(pStat)) { + pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); } - if ((pItem != NULL) && ((pItem = *(SSmaStatItem **)pItem) != NULL)) { + if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) { // pItem resides in hash buffer all the time unless drop sma index // TODO: multithread protect if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { @@ -994,7 +994,7 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; - ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); + ASSERT(!pSmaH->dFile.path && !pSmaH->dFile.pDB); pSmaH->dFile.fid = fid; char tSmaFile[TSDB_FILENAME_LEN] = {0}; @@ -1064,6 +1064,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char STsdbCfg *pCfg = REPO_CFG(pTsdb); const SArray *pDataBlocks = (const SArray *)msg; + // TODO: destroy SSDataBlocks(msg) + // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus // the sma data would arrive ahead of the update-expired-window msg. if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { @@ -1071,7 +1073,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char return TSDB_CODE_FAILED; } - if (pDataBlocks == NULL) { + if (!pDataBlocks) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb)); return terrno; @@ -1089,11 +1091,11 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tsdbRefSmaStat(pTsdb, pStat); - if (pStat && pStat->smaStatItems) { - pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + if (pStat && SMA_STAT_ITEMS(pStat)) { + pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); } - if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) { + if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) { terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; @@ -1287,7 +1289,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid); SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); - if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) { + if ((pItem) || ((pItem = *(SSmaStatItem **)pItem))) { if (tsdbSmaStatIsDropped(pItem)) { tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid); return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode @@ -1343,13 +1345,13 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { SSmaEnv *pEnv = atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)); int64_t indexUid = SMA_TEST_INDEX_UID; - if (pEnv == NULL) { + if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return terrno; } - if (pDataBlocks == NULL) { + if (!pDataBlocks) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb)); return terrno; @@ -1366,11 +1368,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { tsdbRefSmaStat(pTsdb, pStat); - if (pStat && pStat->smaStatItems) { - pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + if (pStat && SMA_STAT_ITEMS(pStat)) { + pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); } - if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) { + if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) { terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; @@ -1491,7 +1493,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { ++pReadH->smaFsIter.iter; } - if (pReadH->pDFile != NULL) { + if (pReadH->pDFile) { tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); return true; } @@ -1524,7 +1526,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, tsdbRefSmaStat(pTsdb, pStat); SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); - if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) { + if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. tsdbUnRefSmaStat(pTsdb, pStat); @@ -1537,7 +1539,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, int32_t nQueryWin = taosArrayGetSize(pQuerySKey); for (int32_t n = 0; n < nQueryWin; ++n) { TSKEY skey = taosArrayGet(pQuerySKey, n); - if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) { + if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) { // TODO: mark this window as expired. } } @@ -1553,7 +1555,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, return TSDB_CODE_FAILED; } - if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) { + if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) { // TODO: mark this window as expired. tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey, indexUid); @@ -1588,7 +1590,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, void *result = NULL; int32_t valueSize = 0; - if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) { + if (!(result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) { tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s", REPO_ID(pTsdb), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno)); tsdbCloseDBF(&tReadH.dFile); @@ -1632,7 +1634,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { SSmaCfg vCreateSmaReq = {0}; - if (tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq) == NULL) { + if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); return -1; @@ -1658,7 +1660,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { SVDropTSmaReq vDropSmaReq = {0}; - if (tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq) == NULL) { + if (!tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1697,7 +1699,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; - if (param == NULL) { + if (!param) { tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); return TSDB_CODE_SUCCESS; @@ -1716,23 +1718,21 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - SRSmaInfo *tRSmaInfo = &pStat->rsmaInfo; - TASSERT(pEnv != NULL && pStat != NULL); - - pRSmaInfo = taosHashGet(pStat->rsmaInfoHash, &pReq->stbCfg.suid, sizeof(tb_uid_t)); - if (pRSmaInfo != NULL) { - pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo); + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t)); + if (pRSmaInfo) { + tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); + return TSDB_CODE_SUCCESS; } pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); - if (pRSmaInfo == NULL) { + if (!pRSmaInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); - if (pReadHandle == NULL) { + if (!pReadHandle) { taosMemoryFree(pRSmaInfo); terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; @@ -1745,65 +1745,83 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { if (param->qmsg1) { pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle); - if (pRSmaInfo->taskInfo[0] == NULL) { + if (!pRSmaInfo->taskInfo[0]) { taosMemoryFree(pRSmaInfo); taosMemoryFree(pReadHandle); return TSDB_CODE_FAILED; } - tRSmaInfo->taskInfo[0] = pRSmaInfo->taskInfo[0]; } if (param->qmsg2) { pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle); - if (pRSmaInfo->taskInfo[1] == NULL) { + if (!pRSmaInfo->taskInfo[1]) { taosMemoryFree(pRSmaInfo); taosMemoryFree(pReadHandle); return TSDB_CODE_FAILED; } - tRSmaInfo->taskInfo[1] = pRSmaInfo->taskInfo[1]; } if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != - 0) { + TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } return TSDB_CODE_SUCCESS; } -int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid) { +/** + * @brief store suid/[uids], prefer to use array and then hash + * + * @param pStore + * @param suid + * @param uid + * @return int32_t + */ +int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { + // prefer to store suid/uids in array if ((suid == pStore->suid) || (pStore->suid == 0)) { - if (pStore->tbUids == NULL) { - if ((pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t))) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } + if (pStore->suid == 0) { pStore->suid = suid; } - if (taosArrayPush(pStore->tbUids, &uid) == NULL) { - return TSDB_CODE_FAILED; + if (uid) { + if (!pStore->tbUids) { + if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } + if (!taosArrayPush(pStore->tbUids, &uid)) { + return TSDB_CODE_FAILED; + } } } else { - if (pStore->uidHash == NULL) { + // store other suid/uids in hash when multiple stable/table included in 1 batch of request + if (!pStore->uidHash) { pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (pStore->uidHash == NULL) { + if (!pStore->uidHash) { return TSDB_CODE_FAILED; } } - SArray *hashVal = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); - if (hashVal && ((hashVal = *(SArray **)hashVal) != NULL)) { - taosArrayPush(hashVal, &uid); - } else { - SArray *pItem = taosArrayInit(1, sizeof(tb_uid_t)); - if (pItem == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - if (taosArrayPush(pItem, &uid) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + if (uid) { + SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); + if (uidArray && ((uidArray = *(SArray **)uidArray))) { + taosArrayPush(uidArray, &uid); + } else { + SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t)); + if (!pUidArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + if (!taosArrayPush(pUidArray, &uid)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) { + return TSDB_CODE_FAILED; + } } - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pItem, sizeof(pItem)) != 0) { + } else { + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) { return TSDB_CODE_FAILED; } } @@ -1811,8 +1829,42 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid) { return TSDB_CODE_SUCCESS; } +void tsdbUidStoreDestory(STbUidStore *pStore) { + if (pStore) { + if (pStore->uidHash) { + if (pStore->tbUids) { + void *pIter = taosHashIterate(pStore->uidHash, NULL); + while (pIter) { + SArray *arr = *(SArray **)pIter; + taosArrayDestroy(arr); + pIter = taosHashIterate(pStore->uidHash, pIter); + } + } + taosHashCleanup(pStore->uidHash); + } + taosArrayDestroy(pStore->tbUids); + } +} + +void *tsdbUidStoreFree(STbUidStore *pStore) { + tsdbUidStoreDestory(pStore); + taosMemoryFree(pStore); + return NULL; +} + +/** + * @brief fetch suid/uids when create child tables of rollup SMA + * + * @param pTsdb + * @param ppStore + * @param suid + * @param uid + * @return int32_t + */ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb); + + // only applicable to rollup SMA ctables if (!pEnv) { return TSDB_CODE_SUCCESS; } @@ -1824,97 +1876,194 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { return TSDB_CODE_FAILED; } - // info cached when create rsma stable - if (!taosHashGet(infoHash, suid, sizeof(suid))) { + // info cached when create rsma stable and return directly for non-rsma ctables + if (!taosHashGet(infoHash, suid, sizeof(tb_uid_t))) { return TSDB_CODE_SUCCESS; } - if (*ppStore == NULL) { + if (!(*ppStore)) { if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) { return TSDB_CODE_FAILED; } } - if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, *(tb_uid_t *)uid) != 0) { + + if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, (tb_uid_t *)uid) != 0) { *ppStore = tsdbUidStoreFree(*ppStore); return TSDB_CODE_FAILED; } + return TSDB_CODE_SUCCESS; } -void *tsdbUidStoreFree(STbUidStore *pStore) { - if (pStore) { - taosArrayDestroy(pStore->tbUids); - if (pStore->uidHash) { - void *pIter = taosHashIterate(pStore->uidHash, NULL); - while (pIter != NULL) { - SArray *arr = *(SArray **)pIter; - taosArrayDestroy(arr); - pIter = taosHashIterate(pStore->uidHash, pIter); - } - } - taosMemoryFree(pStore); +static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids) { + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SRSmaInfo *pRSmaInfo = NULL; + + if (!suid || !tbUids) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno)); + return TSDB_CODE_FAILED; } - return NULL; + + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64, REPO_ID(pTsdb), *suid); + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + return TSDB_CODE_FAILED; + } + + if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) { + tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno)); + return TSDB_CODE_FAILED; + } + if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) { + tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno)); + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; } -int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore) { - if (!pUidStore || (taosArrayGetSize(pUidStore->tbUids) == 0)) { - tsdbDebug("vgId:%d empty uidStore and no need to update", REPO_ID(pTsdb)); +int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { + if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { + tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb)); + tsdbUidStoreFree(pStore); return TSDB_CODE_SUCCESS; } - SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); - SSmaStat *pStat = SMA_ENV_STAT(pEnv); - SRSmaInfo *pRSmaInfo = &pStat->rsmaInfo; - if (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], pUidStore->tbUids, true) != 0) { - tsdbUidStoreFree(pUidStore); - tsdbError("vgId:%d update tbUidList failed since %s", REPO_ID(pTsdb), terrstr(terrno)); + if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) { + tsdbUidStoreFree(pStore); return TSDB_CODE_FAILED; } - tsdbUidStoreFree(pUidStore); + void *pIter = taosHashIterate(pStore->uidHash, NULL); + while (pIter) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + SArray *pTbUids = *(SArray **)pIter; + + if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) { + taosHashCancelIterate(pStore->uidHash, pIter); + tsdbUidStoreFree(pStore); + return TSDB_CODE_FAILED; + } + + pIter = taosHashIterate(pStore->uidHash, pIter); + } + + tsdbUidStoreFree(pStore); + return TSDB_CODE_SUCCESS; } -int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType) { +static int32_t tsdbFetchSubmitReqSuids(const SSubmitReq *pMsg, STbUidStore *pStore) { + ASSERT(pMsg != NULL); + SSubmitMsgIter msgIter = {0}; + SSubmitBlk *pBlock = NULL; + SSubmitBlkIter blkIter = {0}; + STSRow *row = NULL; + + terrno = TSDB_CODE_SUCCESS; + // pMsg->length = htonl(pMsg->length); + // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1; + if (!pBlock) break; + tsdbUidStorePut(pStore, msgIter.suid, NULL); + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} + +int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType, tb_uid_t *suid) { SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); if (!pEnv) { + // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + + if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), *suid); + return TSDB_CODE_SUCCESS; + } + SArray *pResult = NULL; pResult = taosArrayInit(0, sizeof(SSDataBlock)); - if (pResult == NULL) { + if (!pResult) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - // pRSmaInfo = taosHashGet(pStat->rsmaInfoHash, &pReq->stbCfg.suid, sizeof(tb_uid_t)); - // if (pRSmaInfo != NULL) { - // pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo); + if (pRSmaInfo->taskInfo[0]) { + qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType); + while (1) { + SSDataBlock *output; + uint64_t ts; + if (qExecTask(pRSmaInfo->taskInfo[0], &output, &ts) < 0) { + ASSERT(false); + } + if (!output) { + break; + } + taosArrayPush(pResult, output); + } + blockDebugShowData(pResult); + } + + // if (pRSmaInfo->taskInfo[1]) { + // qSetStreamInput(pRSmaInfo->taskInfo[1], pMsg, inputType); + // while (1) { + // SSDataBlock *output; + // uint64_t ts; + // if (qExecTask(pRSmaInfo->taskInfo[1], &output, &ts) < 0) { + // ASSERT(false); + // } + // if (!output) { + // break; + // } + // taosArrayPush(pResult, output); + // } + // blockDebugShowData(pResult); // } + } - SRSmaInfo *tRSmaInfo = &pStat->rsmaInfo; + return TSDB_CODE_SUCCESS; +} - qSetStreamInput(tRSmaInfo->taskInfo[0], pMsg, inputType, true); - while (1) { - SSDataBlock *output; - uint64_t ts; - if (qExecTask(tRSmaInfo->taskInfo[0], &output, &ts) < 0) { - ASSERT(false); - } - if (output == NULL) { - break; +int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType) { + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + if (!pEnv) { + // only applicable when rsma env exists + return TSDB_CODE_SUCCESS; + } + + if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + STbUidStore uidStore = {0}; + tsdbFetchSubmitReqSuids(pMsg, &uidStore); + + if (uidStore.suid != 0) { + tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, &uidStore.suid); + + void *pIter = taosHashIterate(uidStore.uidHash, NULL); + while (pIter) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, pTbSuid); + pIter = taosHashIterate(uidStore.uidHash, pIter); } - taosArrayPush(pResult, output); + + tsdbUidStoreDestory(&uidStore); } } - blockDebugShowData(pResult); return TSDB_CODE_SUCCESS; } @@ -1960,6 +2109,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) { if ((code = tsdbInsertTSmaDataImpl(pTsdb, indexUid, msg)) < 0) { tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } + // TODO: destroy SSDataBlocks(msg) return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0417acf290a19ed771a741b57b78930de9f23947..6c82edbadcc4ba6d4c99dd488e41439e10763929 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -83,7 +83,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_SUBMIT: pRsp->msgType = TDMT_VND_SUBMIT_RSP; vnodeProcessSubmitReq(pVnode, ptr, pRsp); - tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); + tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pMsg->pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); break; case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), @@ -348,6 +348,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR } } + tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result); + vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { @@ -360,8 +362,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR pRsp->contLen = contLen; } - tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result); - return 0; } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index b0217a046280abb37065005d90d4cb5d70806fba..ab617cb18660bc6663b500d7ef9da60a5c2d9fa5 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -407,7 +407,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { } } - EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); + // EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg, 0), 0); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8f7f55eb33d5c6fe731e164353d5ab1bc743d2cc..4863b03fb9f69fcba57bea23b359081fd857568f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -19,7 +19,7 @@ #include "tdatablock.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id, bool converted) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -32,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id, converted); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { pOperator->status = OP_NOT_OPENED; @@ -46,13 +46,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - if (converted) { - if (tqReadHandleSetMsgEx(pInfo->readerHandle, input, 0) < 0) { - qError("converted: submit msg messed up when initing stream block, %s" PRIx64, id); - return TSDB_CODE_QRY_APP_ERROR; - } - } else if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { - qError("raw msg: submit msg messed up when initing stream block, %s" PRIx64, id); + if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } } else { @@ -72,11 +67,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted) { - return qSetMultiStreamInput(tinfo, input, 1, type, converted); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { + return qSetMultiStreamInput(tinfo, input, 1, type); } -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted) { +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } @@ -87,7 +82,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo), converted); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 1f0c269ca14bd41364e63de41b1d2e6cbd3cf962..8aaaa414ca265f31f09f26f877b2ec3dea7ce5b3 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -110,7 +110,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in return -1; } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - qSetStreamInput(exec, input, inputType, true); + qSetStreamInput(exec, input, inputType); while (1) { SSDataBlock* output; uint64_t ts; @@ -128,7 +128,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in /*for (int32_t i = 0; i < sz; i++) {*/ /*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/ /*qSetStreamInput(exec, pBlock, inputType);*/ - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, true); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); while (1) { SSDataBlock* output; uint64_t ts;