diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5fd9a8b12b10d024cae7f3e3189928093a102c3d..0d021e2fa223b29308fb6894f5c743e7de7bad36 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -135,7 +135,6 @@ typedef struct { typedef struct { int8_t type; int64_t ver; - int32_t* dataRef; SSDataBlock* pBlock; } SStreamRefDataBlock; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81d64fb98aba9b8652e0b335df6524025c809e51..81bb5034f198c0102c8344492fff1d9630feea9e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -947,6 +947,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; + *pRefBlock = NULL; + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); if (pRes->uidList == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -984,21 +986,13 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream } taosArrayDestroy(pRes->uidList); - - int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); - *pRef = 1; - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); if (pRefBlock == NULL) { - taosMemoryFree(pRef); return TSDB_CODE_OUT_OF_MEMORY; } (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; (*pRefBlock)->pBlock = pDelBlock; - (*pRefBlock)->dataRef = pRef; - atomic_add_fetch_32((*pRefBlock)->dataRef, 1); - return TSDB_CODE_SUCCESS; } @@ -1069,8 +1063,6 @@ int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->pBlock = pDelBlock; - pRefBlock->dataRef = pRef; - atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { atomic_sub_fetch_32(pRef, 1); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 27e817c50fcecec0e34b3aa27120e407da9c9390..c74a25eba117abcd9fdd8ee19ccd113c10968dfe 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -29,7 +29,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next ver:%" PRId64, pTask->id.idStr, /*pPackedData->ver*/ 0L); + tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); return -1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7cb3c00c1ac540edde0656a53f654aba1054f142..5abab30e7c30bd4e2b37d9f1be3a8f9d3c46b8cf 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1912,7 +1912,6 @@ FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); - /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3b38038f103b661b4a05aabace9ac31..96022850a3511b20b2b6003d1f9409b27400011a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; - - int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - blockDataDestroy(pRefBlock->pBlock); - taosMemoryFree(pRefBlock->dataRef); - } + blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); } }