提交 387cfe8d 编写于 作者: H Haojun Liao

[td-13039] fix bug in stream interval query.

上级 8fe7c183
......@@ -167,7 +167,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
......
......@@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId);
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId);
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:
......
......@@ -422,6 +422,7 @@ typedef struct SStreamBlockScanInfo {
uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times
void* readerHandle; // stream block reader handle
SArray* pColMatchInfo; //
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......
......@@ -227,7 +227,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
taosArrayDestroy(pGroupResInfo->pRows);
}
pGroupResInfo->pRows = pArrayList->pData;
pGroupResInfo->pRows = pArrayList;
pGroupResInfo->index = 0;
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
}
......
......@@ -352,7 +352,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
continue;
}
idata.info.type = pDescNode->dataType.type;
idata.info.type = pDescNode->dataType.type;
idata.info.bytes = pDescNode->dataType.bytes;
idata.info.scale = pDescNode->dataType.scale;
idata.info.slotId = pDescNode->slotId;
......@@ -1551,8 +1551,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*)pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
// assert(tsCols[0] == pSDataBlock->info.window.skey &&
// tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1);
......@@ -3660,6 +3660,7 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
pCtx[j].fpSet.finalize(&pCtx[j]);
pResInfo->initialized = true;
}
if (pRow->numOfRows < pResInfo->numOfRes) {
......@@ -3778,10 +3779,6 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S
continue;
}
// int32_t functionId = pCtx[i].functionId;
// if (functionId < 0) {
// continue;
// }
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
// }
......@@ -4971,7 +4968,20 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
return NULL;
}
pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle);
SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle);
int32_t numOfCols = pInfo->pRes->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
ASSERT(pColMatchInfo->colId == p->info.colId);
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p);
}
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pTaskInfo->code = terrno;
......@@ -5625,8 +5635,18 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
return NULL;
}
int32_t numOfOutput = taosArrayGetSize(pColList);
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
for(int32_t i = 0; i < numOfOutput; ++i) {
int16_t* id = taosArrayGet(pColList, i);
taosArrayPush(pColIds, id);
}
pInfo->pColMatchInfo = pColList;
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColList);
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
if (code != 0) {
taosMemoryFreeClear(pInfo);
......@@ -6972,7 +6992,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->binfo.pRes;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
// STimeWindow win = {0};
......@@ -8801,8 +8821,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo);
int32_t numOfCols = 0;
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo);
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
......
......@@ -719,6 +719,9 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc
if (NULL == pScan->pScanCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
code = sortScanCols(pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
}
......
......@@ -152,6 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
//
blockDebugShowData(pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册