From b8883c59be4b82b4c7bbaa729d0e2c17beef11ae Mon Sep 17 00:00:00 2001 From: wpan Date: Tue, 3 Aug 2021 09:16:30 +0800 Subject: [PATCH] fix merge issue --- src/query/src/qExecutor.c | 111 ++++++++++++++++++++++++++++++-------- 1 file changed, 88 insertions(+), 23 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index c26e623c6e..82789394c1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -457,17 +457,24 @@ static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *p (SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); // in case of repeat scan/reverse scan, no new time window added. - if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { + if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) { if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists. return p1 != NULL; } if (p1 != NULL) { - for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) { - if (pResultRowInfo->pResult[i] == (*p1)) { - pResultRowInfo->curIndex = i; + if (pResultRowInfo->size == 0) { + existed = false; + assert(pResultRowInfo->curPos == -1); + } else if (pResultRowInfo->size == 1) { + existed = (pResultRowInfo->pResult[0] == (*p1)); + } else { // check if current pResultRowInfo contains the existed pResultRow + SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo); + int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); + if (index != NULL) { existed = true; - break; + } else { + existed = false; } } } @@ -479,8 +486,8 @@ static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *p } -static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, - int16_t bytes, bool masterscan, uint64_t uid) { +static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid, + char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) { bool existed = false; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId); @@ -624,8 +631,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t } // get the correct time window according to the handled timestamp -static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQuery *pQuery) { +static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQuery) { STimeWindow w = {0}; +#if 0 if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { @@ -653,7 +661,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i if (w.ekey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) { w.ekey = pQuery->window.ekey; } - +#endif return w; } @@ -712,8 +720,8 @@ static bool chkWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInf return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId); } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win, - bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx, +static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win, + bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; @@ -840,7 +848,7 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } } - pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey; + //pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey; } static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) { @@ -1051,7 +1059,7 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext } /* interp query with fill should not skip time window */ - if (pQuery->interpQuery && pQuery->fillType != TSDB_FILL_NONE) { + if (pQueryAttr->interpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) { return startPos; } @@ -1569,11 +1577,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) { + (void)getCurrentActiveTimeWindow; + +#if 0 STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; - SQuery* pQuery = pRuntimeEnv->pQuery; + SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); @@ -1638,6 +1649,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey); +#endif } @@ -3086,18 +3098,12 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - - STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery); - if (pQuery->interpQuery) { + if (pQueryAttr->interpQuery) { needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset); } else { - if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -5818,6 +5824,64 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } +static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + + if (pOperator->status == OP_RES_TO_RETURN) { + copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + + return pIntervalInfo->pRes; + } + + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + int32_t order = pQueryAttr->order.order; + + SOperatorInfo* upstream = pOperator->upstream[0]; + + while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pBlock == NULL) { + break; + } + + // the pDataBlock are always the same one, no need to call this again + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; + + setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); + setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); + setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); + + hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); + } + + pOperator->status = OP_RES_TO_RETURN; + pQueryAttr->order.order = order; // TODO : restore the order + doCloseAllTimeWindow(pRuntimeEnv); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + + copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + + return pIntervalInfo->pRes; +} + + + + static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; @@ -6524,7 +6588,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; - pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -6533,6 +6596,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu pOperator->exec = doAllSTableIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; + appendUpstream(pOperator, upstream); + return pOperator; } -- GitLab