未验证 提交 b4162641 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17595 from taosdata/fix/liao_cov

fix(query): remove redundant ts cols in cache scan.
......@@ -37,6 +37,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
int32_t numOfRows = pBlock->info.rows;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
bool allNullRow = true;
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
......@@ -46,12 +48,15 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
p->ts = pColVal->ts;
p->bytes = TSDB_KEYSIZE;
*(int64_t*)p->buf = pColVal->ts;
allNullRow = false;
} else {
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
p->ts = pColVal->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
allNullRow = p->isNull & allNullRow;
if (!p->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(p->buf, pColVal->colVal.value.nData);
......@@ -69,6 +74,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
}
pBlock->info.rows += allNullRow? 0:1;
} else {
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
......@@ -96,9 +103,9 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
}
}
}
}
pBlock->info.rows += 1;
}
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
......
......@@ -896,8 +896,7 @@ typedef struct SJoinOperatorInfo {
void doDestroyExchangeOperatorInfo(void* param);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
......
......@@ -27,7 +27,8 @@
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
......@@ -40,12 +41,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
}
pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
pInfo->pRes = createResDataBlock(pDescNode);
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc,
&numOfCols, COL_MATCH_FROM_COL_ID);
code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
SArray* pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
pInfo->pColMatchInfo = removeRedundantTsCol(pScanNode, pColMatchInfo);
code = extractCacheScanSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -72,11 +76,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
}
if (pScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
pPseudoExpr->pCtx =
createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
SExprSupp* p = &pInfo->pseudoExprSup;
p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
}
pOperator->name = "LastrowScanOperator";
......@@ -88,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL);
pOperator->cost.openCost = 0;
return pOperator;
......@@ -130,7 +132,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
// check for tag values
int32_t resultRows = pInfo->pBufferredRes->info.rows;
ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList));
// the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
pInfo->indexOfBufferedRes = 0;
}
......@@ -240,7 +244,7 @@ void destroyLastrowScanOperator(void* param) {
taosMemoryFreeClear(param);
}
int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
......@@ -248,16 +252,18 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i);
for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) {
if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId &&
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
if (pColMatch->colId == pWrapper->pSchema[j].colId &&
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->targetSlotId] = -1;
break;
}
if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) {
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
(*pSlotIds)[pColMatch->targetSlotId] = j;
break;
}
......@@ -266,3 +272,26 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
return TSDB_CODE_SUCCESS;
}
SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo) {
if (!pScanNode->ignoreNull) { // retrieve cached last value
return pColMatchInfo;
}
SArray* pMatchInfo = taosArrayInit(taosArrayGetSize(pColMatchInfo), sizeof(SColMatchInfo));
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
SColMatchInfo* pColInfo = taosArrayGet(pColMatchInfo, i);
int32_t slotId = pColInfo->targetSlotId;
SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
taosArrayPush(pMatchInfo, pColInfo);
}
}
taosArrayDestroy(pColMatchInfo);
return pMatchInfo;
}
\ No newline at end of file
......@@ -110,16 +110,13 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
}
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain) {
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain) {
SOperatorFpSet fpSet = {
._openFn = openFn,
.getNextFn = nextFn,
.getStreamResFn = streamFn,
.cleanupFn = cleanup,
.closeFn = closeFn,
.encodeResultRow = encode,
.decodeResultRow = decode,
.getExplainFn = explain,
};
......@@ -2305,7 +2302,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
destroyExchangeOperatorInfo, NULL);
return pOperator;
_error:
......@@ -3081,7 +3078,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;
......@@ -3306,7 +3303,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......
......@@ -443,7 +443,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
destroyGroupOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -820,7 +820,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
NULL, NULL, NULL);
NULL);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -1095,7 +1095,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
destroyStreamPartitionOperatorInfo, NULL, NULL, NULL);
destroyStreamPartitionOperatorInfo, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);
......
......@@ -116,7 +116,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......
......@@ -105,7 +105,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL);
destroyProjectOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -405,7 +405,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
destroyIndefinitOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -839,7 +839,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
NULL, NULL, getTableScannerExecInfo);
getTableScannerExecInfo);
// for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0;
......@@ -867,7 +867,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL);
return pOperator;
}
......@@ -988,7 +988,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
destroyBlockDistScanOperatorInfo, NULL);
return pOperator;
_error:
......@@ -2173,7 +2173,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL);
return pOperator;
_end:
......@@ -2353,7 +2353,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL);
return pOperator;
......@@ -3926,7 +3926,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
return pOperator;
......@@ -4063,7 +4063,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL);
return pOperator;
......@@ -4603,9 +4603,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 1024);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
NULL, getTableMergeScanExplainExecInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0;
return pOperator;
......
......@@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16;
// TODO dynamic set the available sort buffer
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL,
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo,
getExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -515,7 +515,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
NULL, NULL, getGroupSortExplainExecInfo);
getGroupSortExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -752,7 +752,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo);
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -1624,7 +1624,6 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
goto _error;
}
SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
......@@ -1694,7 +1693,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo,
NULL, NULL, NULL);
NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -1831,8 +1831,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2638,8 +2637,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
......@@ -2710,7 +2708,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
destroyStateWindowOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2784,7 +2782,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
destroySWindowOperatorInfo, NULL);
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -3465,8 +3463,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = pInfo;
pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
}
......@@ -4261,7 +4258,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator->info = pInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
NULL);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex);
......@@ -4408,7 +4405,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
destroyStreamSessionAggOperatorInfo, NULL);
}
pInfo->pGroupIdTbNameMap =
......@@ -4777,7 +4774,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
destroyStreamStateOperatorInfo, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1);
......@@ -5054,7 +5051,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
destroyMAIOperatorInfo, NULL, NULL, NULL);
destroyMAIOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5366,7 +5363,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
pOperator->info = pMergeIntervalInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
destroyMergeIntervalOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5599,7 +5596,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->info = pInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册