提交 7ade2b3b 编写于 作者: L Liu Jicong

fix double free in sma

上级 ae294b98
......@@ -479,6 +479,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
#if 1
#endif
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
......
......@@ -49,14 +49,19 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SStreamScanInfo* pInfo = pOperator->info;
#if 0
// TODO: if a block was set but not consumed,
// prevent setting a different type of block
pInfo->validBlockIndex = 0;
/*if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {*/
/*taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);*/
/*} else {*/
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);
} else {
taosArrayClear(pInfo->pBlockLists);
/*}*/
}
#endif
ASSERT(pInfo->validBlockIndex == 0);
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1);
......@@ -79,17 +84,15 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
#if 0
// TODO optimize
SSDataBlock* p = createOneDataBlock(pDataBlock, false);
/*qError("alloc p i, %d %p", i, p);*/
p->info = pDataBlock->info;
taosArrayClear(p->pDataBlock);
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p);
#endif
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else {
......@@ -103,6 +106,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
#if 0
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
return;
......@@ -119,6 +123,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
} else {
ASSERT(0);
}
#endif
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
......
......@@ -920,6 +920,17 @@ _error:
}
static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
for (int32_t i = 0; i < total; i++) {
SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i);
taosArrayDestroy(p->pDataBlock);
taosMemoryFree(p);
}
}
taosArrayClear(pInfo->pBlockLists);
pInfo->validBlockIndex = 0;
#if 0
size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->validBlockIndex = 0;
......@@ -928,6 +939,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
blockDataDestroy(p);
}
taosArrayClear(pInfo->pBlockLists);
#endif
}
static bool isSessionWindow(SStreamScanInfo* pInfo) {
......@@ -1576,9 +1588,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
FETCH_NEXT_BLOCK:
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
/*doClearBufferedBlocks(pInfo);*/
doClearBufferedBlocks(pInfo);
/*pOperator->status = OP_EXEC_DONE;*/
return NULL;
}
......@@ -1613,7 +1626,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pDelBlock, "stream scan delete result");
if (pInfo->pDeleteDataRes->info.rows > 0) {
return pInfo->pDeleteDataRes;
} else {
goto FETCH_NEXT_BLOCK;
}
} else {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
......@@ -1626,7 +1643,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
if (pInfo->pDeleteDataRes->info.rows > 0) {
return pInfo->pDeleteDataRes;
} else {
goto FETCH_NEXT_BLOCK;
}
}
} break;
default:
......@@ -1691,6 +1712,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
while (1) {
if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
doClearBufferedBlocks(pInfo);
return NULL;
}
......
......@@ -5655,7 +5655,6 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
int32_t aa = 4;
ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册