提交 5200a481 编写于 作者: J jiacy-jcy

fix: update

上级 bc1af6de
......@@ -1431,9 +1431,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
if (pBlockInfo->rows > 0) {
break;
} else {
pInfo->tqReader->pMsg = NULL;
return NULL;
}
/*blockDataCleanup(pInfo->pRes);*/
pInfo->tqReader->pMsg = NULL;
}
// record the scan action.
......
......@@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus
......
......@@ -83,6 +83,7 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(int32_t*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged;
FAIL:
if (pMerged->reqs) taosArrayDestroy(pMerged->reqs);
......@@ -121,7 +122,7 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
}
}
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem);
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
......@@ -129,13 +130,13 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(elem);
return 0;
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem);
return 0;
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
ASSERT(pMerged);
......@@ -143,9 +144,9 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem);
taosFreeQitem(dst);
taosFreeQitem(elem);
return 0;
return (SStreamQueueItem*)pMerged;
} else {
return -1;
return NULL;
}
}
......
......@@ -162,11 +162,13 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
/*}*/
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
void* newRet;
if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
data = newRet;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册