diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c05c24db74287eb5f132dfa0c50b9c5c8837692a..747206d71eda1f365bcc93f036643313bc71409c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -252,7 +252,7 @@ END: } STqReader* tqOpenReader(SVnode* pVnode) { - STqReader* pReader = taosMemoryMalloc(sizeof(STqReader)); + STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader)); if (pReader == NULL) { return NULL; } @@ -437,12 +437,15 @@ bool tqNextDataBlock2(STqReader* pReader) { int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); + ASSERT(pSubmitTbData->uid); + if (pReader->tbIdHash == NULL) return true; void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); if (ret != NULL) { return true; } + pReader->nextBlk++; } tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fe7c388ffc20eec6310b2f047a2ff257e42109ee..64bb07bc6de079bcd294df6bcba25559d2ff2408 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -115,13 +115,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); for (int32_t i = 0; i < numOfBlocks; i++) { - SPackedSubmit* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(SPackedSubmit)); - taosArrayPush(pInfo->pBlockLists, &pReq); + SPackedSubmit* pReq = POINTER_SHIFT(input, i * sizeof(SPackedSubmit)); + taosArrayPush(pInfo->pBlockLists, pReq); } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(numOfBlocks == 1); - taosArrayPush(pInfo->pBlockLists, &input); + taosArrayPush(pInfo->pBlockLists, input); pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eb8d7d2e78b6e16fe25adb9aab964be9d4f69059..e04fe0c30ab93d26138b3029f64e93a6cbf6c64c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1929,7 +1929,7 @@ FETCH_NEXT_BLOCK: } int32_t current = pInfo->validBlockIndex++; - SPackedSubmit* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); + SPackedSubmit* pSubmit = taosArrayGet(pInfo->pBlockLists, current); /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,