diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 54ea17657a263a4fb66e6c944f2e7224fdca3f39..d3f441e72f3e1ac5675bdce958cdf5d4cf58171c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -471,6 +471,7 @@ typedef struct { bool stableQuery; // super table query or not bool topBotQuery; // TODO used bitwise flag + bool interpQuery; // interp query or not bool groupbyColumn; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index bd4d076e8ded06de83cfc4022478624816ae5202..5fe68e456fa11c7dd1fe0a2cdb5bb615dbc1bf8b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -206,7 +206,6 @@ typedef struct SQueryAttr { bool stableQuery; // super table query or not bool topBotQuery; // TODO used bitwise flag - bool interpQuery; // denote if this is an interp query bool groupbyColumn; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1f7de64210de6b72c5bcec7c26d98a150572102a..52e5dbb7f345f3015a82db94256e15810045e2b3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -631,37 +631,29 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t } // get the correct time window according to the handled timestamp -static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQuery) { +static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) { STimeWindow w = {0}; -#if 0 - if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value - if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { - getInitialStartTimeWindow(pQuery, ts, &w); - pResultRowInfo->prevSKey = w.skey; - } else { - w.skey = pResultRowInfo->prevSKey; - } + if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value + getInitialStartTimeWindow(pQueryAttr, ts, &w); - if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { - w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1; + if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { + w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; } else { - w.ekey = w.skey + pQuery->interval.interval - 1; + w.ekey = w.skey + pQueryAttr->interval.interval - 1; } } else { - int32_t slot = curTimeWindowIndex(pResultRowInfo); - SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot); - w = pWindowRes->win; + w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win; } /* * query border check, skey should not be bounded by the query time range, since the value skey will * be used as the time window index value. So we only change ekey of time window accordingly. */ - if (w.ekey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) { - w.ekey = pQuery->window.ekey; + if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) { + w.ekey = pQueryAttr->window.ekey; } -#endif + return w; } @@ -1059,7 +1051,7 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext } /* interp query with fill should not skip time window */ - if (pQueryAttr->interpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) { + if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) { return startPos; } @@ -1576,18 +1568,15 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } -static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) { - (void)getCurrentActiveTimeWindow; - -#if 0 +static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; - SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr; + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); TSKEY* tsCols = NULL; if (pSDataBlock->pDataBlock != NULL) { @@ -1598,34 +1587,35 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); + TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); - STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQuery); + STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SResultRow* pResult = NULL; int32_t forwardStep = 0; + int32_t ret = 0; while (1) { // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, - pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, + tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - TSKEY ekey = reviseWindowEkey(pQuery, &win); - forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); + TSKEY ekey = reviseWindowEkey(pQueryAttr, &win); + forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(pQuery, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); + startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); if (startPos < 0) { - if (win.skey <= pQuery->window.ekey) { - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, + if (win.skey <= pQueryAttr->window.ekey) { + int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1643,13 +1633,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } - if (pQuery->timeWindowInterpo) { + if (pQueryAttr->timeWindowInterpo) { int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); } - updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey); -#endif + updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } @@ -2150,6 +2139,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); break; } + case OP_AllMultiTableTimeInterval: { + pRuntimeEnv->proot = + createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); + break; + } case OP_TimeWindow: { pRuntimeEnv->proot = createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); @@ -2159,6 +2154,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } break; } + case OP_AllTimeWindow: { + pRuntimeEnv->proot = + createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; + if (opType != OP_DummyInput && opType != OP_Join) { + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); + } + break; + } case OP_Groupby: { pRuntimeEnv->proot = createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); @@ -3098,7 +3102,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (pQueryAttr->interpQuery) { + if (pQueryAttr->pointInterpQuery) { needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset); @@ -5769,6 +5773,66 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; } +static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; + + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + if (pOperator->status == OP_RES_TO_RETURN) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + + return pIntervalInfo->pRes; + } + + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + int32_t order = pQueryAttr->order.order; + STimeWindow win = pQueryAttr->window; + + SOperatorInfo* upstream = pOperator->upstream[0]; + + while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pBlock == NULL) { + break; + } + + setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); + hashAllIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0); + } + + // restore the value + pQueryAttr->order.order = order; + pQueryAttr->window = win; + + pOperator->status = OP_RES_TO_RETURN; + closeAllResultRows(&pIntervalInfo->resultRowInfo); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); + + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + + return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; +} + static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -6502,6 +6566,32 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp appendUpstream(pOperator, upstream); return pOperator; } + + +SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { + STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "AllTimeIntervalAggOperator"; + pOperator->operatorType = OP_AllTimeWindow; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doAllIntervalAgg; + pOperator->cleanup = destroyBasicOperatorInfo; + + appendUpstream(pOperator, upstream); + return pOperator; +} + SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; @@ -6525,7 +6615,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe appendUpstream(pOperator, upstream); return pOperator; } - SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index e01f41276fb73e9293ba5e3b379c85600053ed93..b8a5ee7699b34fed82ad67a592a6ca9148cc92cb 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -567,10 +567,18 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { } } else if (pQueryAttr->interval.interval > 0) { if (pQueryAttr->stableQuery) { - op = OP_MultiTableTimeInterval; + if (pQueryAttr->pointInterpQuery) { + op = OP_AllMultiTableTimeInterval; + } else { + op = OP_MultiTableTimeInterval; + } taosArrayPush(plan, &op); - } else { - op = OP_TimeWindow; + } else { + if (pQueryAttr->pointInterpQuery) { + op = OP_AllTimeWindow; + } else { + op = OP_TimeWindow; + } taosArrayPush(plan, &op); if (pQueryAttr->pExpr2 != NULL) { @@ -578,7 +586,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } - if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) { + if (pQueryAttr->fillType != TSDB_FILL_NONE) { op = OP_Fill; taosArrayPush(plan, &op); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index cec6c2b555fa275e53882cc7856c9b4a584018d0..6ad6a74eed76765a11e3cc875b7ff1ae51be648b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -74,7 +74,7 @@ cd ../../../debug; make ./test.sh -f general/parser/where.sim ./test.sh -f general/parser/slimit.sim ./test.sh -f general/parser/select_with_tags.sim -#./test.sh -f general/parser/interp.sim +./test.sh -f general/parser/interp.sim ./test.sh -f general/parser/tags_dynamically_specifiy.sim ./test.sh -f general/parser/groupby.sim ./test.sh -f general/parser/set_tag_vals.sim