diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b395b5e1b8170299fb4635b5f3ea36c53a0c95d0..e5c2eca4f0ead8d4e7a9e0c84f9f8c511dc319a2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -39,8 +39,6 @@ #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) -#define GET_QINFO_ADDR(x) ((SQInfo *)((char *)(x)-offsetof(SQInfo, runtimeEnv))) - #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -163,15 +161,12 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, SDataStatis *pStatis, SExprInfo* pExprInfo); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); -//static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); -//static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); //static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); -//static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STableIdInfo createTableIdInfo(SQuery* pQuery); @@ -409,7 +404,6 @@ static bool isProjQuery(SQuery *pQuery) { static bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; } - static bool isTopBottomQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pExpr1[i].base.functionId; @@ -1438,7 +1432,7 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult d = varDataVal(pData); len = varDataLen(pData); } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQInfo* pQInfo = pRuntimeEnv->qinfo; qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo); return -1; } @@ -1798,7 +1792,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) { - qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); + qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo); SQuery *pQuery = pRuntimeEnv->pQuery; pRuntimeEnv->prevGroupId = INT32_MIN; @@ -1817,9 +1811,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); - if (/*pRuntimeEnv->rowCellInfoOffset == NULL || */pRuntimeEnv->sasArray == NULL || - pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL || - pRuntimeEnv->tagVal == NULL) { + if (pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || + pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) { goto _clean; } @@ -1833,7 +1826,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; } - qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); + qDebug("QInfo:%p init runtime completed", pRuntimeEnv->qinfo); // group by normal column, sliding window query, interval query are handled by interval query processor // interval (down sampling operation) @@ -1915,7 +1908,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); + SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo; qDebug("QInfo:%p teardown runtime env", pQInfo); @@ -2135,11 +2128,11 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { /* * todo add more parameters to check soon.. */ -bool colIdCheck(SQuery *pQuery) { +bool colIdCheck(SQuery *pQuery, void* qinfo) { // load data column information is incorrect for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) { if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) { - qError("QInfo:%p invalid data load column for query", GET_QINFO_ADDR(pQuery)); + qError("QInfo:%p invalid data load column for query", qinfo); return false; } } @@ -2220,8 +2213,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo if (isPointInterpoQuery(pQuery) && pQuery->interval.interval == 0) { if (!QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, - pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); + qDebug(msg, pQInfo, "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); } @@ -2232,7 +2224,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo if (pQuery->interval.interval == 0) { if (onlyFirstQuery(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, GET_QINFO_ADDR(pQuery), "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, + qDebug(msg, pQInfo, "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2242,7 +2234,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo pQuery->order.order = TSDB_ORDER_ASC; } else if (onlyLastQuery(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, GET_QINFO_ADDR(pQuery), "only-last", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, + qDebug(msg, pQInfo, "only-last", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2256,7 +2248,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo if (stableQuery) { if (onlyFirstQuery(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, GET_QINFO_ADDR(pQuery), "only-first stable", pQuery->order.order, TSDB_ORDER_ASC, + qDebug(msg, pQInfo, "only-first stable", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2266,7 +2258,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo pQuery->order.order = TSDB_ORDER_ASC; } else if (onlyLastQuery(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, GET_QINFO_ADDR(pQuery), "only-last stable", pQuery->order.order, TSDB_ORDER_DESC, + qDebug(msg, pQInfo, "only-last stable", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2576,7 +2568,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* SQuery* pQuery = pRuntimeEnv->pQuery; int64_t groupId = pQuery->current->groupIndex; - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQInfo* pQInfo = pRuntimeEnv->qinfo; SQueryCostInfo* pCost = &pQInfo->summary; if (pRuntimeEnv->pTsBuf != NULL && pQuery->stableQuery) { @@ -3081,34 +3073,6 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1; } -#if 0 -static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order, int32_t numOfOutput) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - bool closed = getResultRowStatus(pWindowResInfo, i); - if (!closed) { - continue; - } - - SResultRow *pRow = getResultRow(pWindowResInfo, i); - - // open/close the specified query for each group result - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functId = pQuery->pExpr1[j].base.functionId; - SResultRowCellInfo* pInfo = getResultCell(pRow, numOfOutput, j); - - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { - pInfo->complete = false; - } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { - pInfo->complete = true; - } - } - } -} - -#endif static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); @@ -3565,7 +3529,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { } //static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType) { -// void* qinfo = GET_QINFO_ADDR(pRuntimeEnv); +// void* qinfo = pRuntimeEnv->qinfo; // SQuery *pQuery = pRuntimeEnv->pQuery; // // int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); @@ -3595,7 +3559,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { // //current output space is not enough to accommodate all data of this page, prepare more space // if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { // int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); -// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); +// expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo); // } // // pGroupResInfo->index += 1; @@ -3659,7 +3623,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG //current output space is not enough to accommodate all data of this page, prepare more space // if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { // int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); -// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); +// expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo); // } pGroupResInfo->index += 1; @@ -3927,7 +3891,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); // -// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), +// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, pRuntimeEnv->qinfo, // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); //} @@ -4000,7 +3964,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // while (tsdbNextDataBlock(pQueryHandle)) { -// if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { +// if (isQueryKilled(pRuntimeEnv->qinfo)) { // longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); // } // @@ -4011,7 +3975,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { // pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; // pTableQueryInfo->lastKey += step; // -// qDebug("QInfo:%p skip rows:%d, offset:%" PRId64, GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, +// qDebug("QInfo:%p skip rows:%d, offset:%" PRId64, pRuntimeEnv->qinfo, blockInfo.rows, // pQuery->limit.offset); // } else { // find the appropriated start position in current block // updateOffsetVal(pRuntimeEnv, &blockInfo); @@ -4060,7 +4024,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { // pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index // // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, -// GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, +// pRuntimeEnv->qinfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, // pQuery->current->lastKey); // // return key; @@ -4234,6 +4198,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); + cond.loadExternalRows = isPointInterpoQuery(pQuery); if (!isSTableQuery && (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1) @@ -4338,9 +4303,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->cur.vgroupIndex = -1; if (onlyQueryTags(pQuery)) { + // TODO refactor. pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); - } else if (isTsCompQuery(pQuery)) { + } else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) { pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); } else if (needReverseScan(pQuery)) { pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); @@ -4411,195 +4377,6 @@ static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTa } } -#if 0 -static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; - SQueryCostInfo* summary = &pQInfo->summary; - - int64_t st = taosGetTimestampMs(); - - TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; - SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - while (tsdbNextDataBlock(pQueryHandle)) { - summary->totalBlocks += 1; - - if (isQueryKilled(pQInfo)) { - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); - STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pRuntimeEnv->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); - if(pTableQueryInfo == NULL) { - break; - } - - pQuery->current = *pTableQueryInfo; - doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); - - if (!pQuery->groupbyColumn) { - setEnvForEachBlock(pRuntimeEnv, *pTableQueryInfo, &blockInfo); - } - - if (pQuery->stabledev) { - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) { - setParamValue(pRuntimeEnv); - break; - } - } - } - - uint32_t status = 0; - SDataStatis *pStatis = NULL; - SArray *pDataBlock = NULL; - - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->resInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); - if (ret != TSDB_CODE_SUCCESS) { - break; - } - - if (status == BLK_DATA_DISCARD) { - pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step; - continue; - } - - summary->totalRows += blockInfo.rows; - stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey); - - qDebug("QInfo:%p check data block completed, uid:%"PRId64", tid:%d, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, " - "lastKey:%" PRId64, - pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, - pQuery->current->lastKey); - } - - if (terrno != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, terrno); - } - - updateWindowResNumOfRes(pRuntimeEnv); - - int64_t et = taosGetTimestampMs(); - return et - st; -} - - - -static UNUSED_FUNC bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); - STableQueryInfo* pCheckInfo = taosArrayGetP(group, index); - - if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { - setTagVal(pRuntimeEnv, pCheckInfo->pTable); - } - - STableId* id = TSDB_TABLEID(pCheckInfo->pTable); - qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, - id->uid, id->tid, pCheckInfo->lastKey, pCheckInfo->win.ekey); - - STsdbQueryCond cond = { - .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - .loadExternalRows = false, - }; - - // todo refactor - SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo)); - - STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey}; - taosArrayPush(tx, &info); - - taosArrayPush(g1, &tx); - STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; - - // include only current table - if (pRuntimeEnv->pQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; - } - - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef); - taosArrayDestroy(tx); - taosArrayDestroy(g1); - if (pRuntimeEnv->pQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } - - if (pRuntimeEnv->pTsBuf != NULL) { - tVariant* pTag = &pRuntimeEnv->pCtx[0].tag; - - if (pRuntimeEnv->cur.vgroupIndex == -1) { - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag); - // failed to find data with the specified tag value and vnodeId - if (!tsBufIsValidElem(&elem)) { - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); - } else { - qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64); - } - - return false; - } else { - STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, - cur.blockIndex, cur.tsIndex); - } else { - qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, - cur.blockIndex, cur.tsIndex); - } - } - } else { - STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); - if (tVariantCompare(elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) { - - STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag); - // failed to find data with the specified tag value and vnodeId - if (!tsBufIsValidElem(&elem1)) { - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); - } else { - qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64); - } - - return false; - } else { - STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex); - } else { - qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, cur.blockIndex, cur.tsIndex); - } - } - - } else { - tsBufSetCursor(pRuntimeEnv->pTsBuf, &pRuntimeEnv->cur); - STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex); - } else { - qDebug("QInfo:%p continue scan ts_comp file, tag:%"PRId64" blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, cur.blockIndex, cur.tsIndex); - } - } - } - } - - initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); - return true; -} -#endif - STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) { STsdbQueryCond cond = { .colList = pQuery->colList, @@ -4636,478 +4413,19 @@ static UNUSED_FUNC void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo } } -#if 0 -/** - * super table query handler - * 1. super table projection query, group-by on normal columns query, ts-comp query - * 2. point interpolation query, last row query - * - * @param pQInfo - */ -static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - setQueryStatus(pQuery, QUERY_COMPLETED); - - size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); - - if (isPointInterpoQuery(pQuery)) { - resetDefaultResInfoOutputBuf(pRuntimeEnv); - assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); - - while (pRuntimeEnv->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); - - qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, - pRuntimeEnv->groupIndex, numOfGroups, group); - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - - SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayDup(group); - taosArrayPush(g1, &tx); - - STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; - - // include only current table - if (pRuntimeEnv->pQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; - } - - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef); - - taosArrayDestroy(tx); - taosArrayDestroy(g1); - if (pRuntimeEnv->pQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } - - initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); - - SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); - assert(taosArrayGetSize(s) >= 1); - - setTagVal(pRuntimeEnv, taosArrayGetP(s, 0)); - taosArrayDestroy(s); - - // here we simply set the first table as current table - SArray *first = GET_TABLEGROUP(pRuntimeEnv, pRuntimeEnv->groupIndex); - pQuery->current = taosArrayGetP(first, 0); - - scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); - - int64_t numOfRes = getNumOfResult(pRuntimeEnv); - if (numOfRes > 0) { - pRuntimeEnv->resultInfo.rows += numOfRes; - forwardCtxOutputBuf(pRuntimeEnv, numOfRes); - } - - skipResults(pRuntimeEnv); - pRuntimeEnv->groupIndex += 1; - - // enable execution for next table, when handling the projection query - enableExecutionForNextTable(pRuntimeEnv); - - if (pRuntimeEnv->resultInfo.rows >= pRuntimeEnv->resultInfo.capacity) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - break; - } - } - } else if (pQuery->groupbyColumn) { // group-by on normal columns query - while (pRuntimeEnv->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); - - qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pRuntimeEnv->groupIndex, - numOfGroups); - - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - - SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayDup(group); - taosArrayPush(g1, &tx); - - STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; - - // include only current table - if (pRuntimeEnv->pQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; - } - - // no need to update the lastkey for each table - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef); - - taosArrayDestroy(g1); - taosArrayDestroy(tx); - if (pRuntimeEnv->pQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } - - SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); - assert(taosArrayGetSize(s) >= 1); - - setTagVal(pRuntimeEnv, taosArrayGetP(s, 0)); - - // here we simply set the first table as current table - scanMultiTableDataBlocks(pQInfo); - pRuntimeEnv->groupIndex += 1; - - taosArrayDestroy(s); - - // no results generated for current group, continue to try the next group - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - if (pWindowResInfo->size <= 0) { - continue; - } - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns - - SResultRow *pResult = pWindowResInfo->pResult[i]; - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SResultRowCellInfo *pCell = getResultCell(pRuntimeEnv, pResult, j); - pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); - } - } - - qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size, - pRuntimeEnv->groupIndex); - - pRuntimeEnv->resultInfo.rows = 0; - if (pWindowResInfo->size > pRuntimeEnv->resultInfo.capacity) { - expandBuffer(pRuntimeEnv, pWindowResInfo->size, pQInfo); - } - - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); - copyToOutputBuf(pRuntimeEnv, pWindowResInfo); - assert(pRuntimeEnv->resultInfo.rows == pWindowResInfo->size); - - resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - cleanupGroupResInfo(&pRuntimeEnv->groupResInfo); - break; - } - } else if (pQuery->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) { - //super table projection query with identical query time range for all tables. - SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - resetDefaultResInfoOutputBuf(pRuntimeEnv); - - SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); - assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables && - 1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList)); - - void *pQueryHandle = pRuntimeEnv->pQueryHandle; - if (pQueryHandle == NULL) { - STsdbQueryCond con = createTsdbQueryCond(pQuery, &pQuery->window); - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &con, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); - pQueryHandle = pRuntimeEnv->pQueryHandle; - } - - // skip blocks without load the actual data block from file if no filter condition present - // skipBlocks(&pQInfo->runtimeEnv); - // if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { - // setQueryStatus(pQuery, QUERY_COMPLETED); - // return; - // } - - if (pQuery->prjInfo.vgroupLimit != -1) { - assert(pQuery->limit.limit == -1 && pQuery->limit.offset == 0); - } else if (pQuery->limit.limit != -1) { - assert(pQuery->prjInfo.vgroupLimit == -1); - } - - bool hasMoreBlock = true; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - SQueryCostInfo *summary = &pQInfo->summary; - while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { - summary->totalBlocks += 1; - - if (isQueryKilled(pQInfo)) { - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); - STableQueryInfo **pTableQueryInfo = - (STableQueryInfo **) taosHashGet(pRuntimeEnv->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); - if (pTableQueryInfo == NULL) { - break; - } - - pQuery->current = *pTableQueryInfo; - doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); - - if (pQuery->hasTagResults) { - setTagVal(pRuntimeEnv, pQuery->current->pTable); - } - - if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->current->resInfo.size > pQuery->prjInfo.vgroupLimit) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; - continue; - } - - // it is a super table ordered projection query, check for the number of output for each vgroup - if (pQuery->prjInfo.vgroupLimit > 0 && pRuntimeEnv->resultInfo.rows >= pQuery->prjInfo.vgroupLimit) { - if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; - continue; - } else if (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.ekey <= pQuery->prjInfo.ts) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; - continue; - } - } - - uint32_t status = 0; - SDataStatis *pStatis = NULL; - SArray *pDataBlock = NULL; - - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->resInfo, pQueryHandle, &blockInfo, - &pStatis, &pDataBlock, &status); - if (ret != TSDB_CODE_SUCCESS) { - break; - } - - if(status == BLK_DATA_DISCARD) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; - continue; - } - - ensureOutputBuffer(pRuntimeEnv, blockInfo.rows); - int64_t prev = getNumOfResult(pRuntimeEnv); - - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); - - summary->totalRows += blockInfo.rows; - qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, - GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, - pQuery->current->lastKey); - - pRuntimeEnv->resultInfo.rows = getNumOfResult(pRuntimeEnv); - - int64_t inc = pRuntimeEnv->resultInfo.rows - prev; - pQuery->current->resInfo.size += (int32_t) inc; - - // the flag may be set by tableApplyFunctionsOnBlock, clear it here - CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED); - - updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); - - if (pQuery->prjInfo.vgroupLimit >= 0) { - if (((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) < pQuery->prjInfo.vgroupLimit) || ((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) { - if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) { - pQuery->prjInfo.ts = blockInfo.window.ekey; - } else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) { - pQuery->prjInfo.ts = blockInfo.window.skey; - } - } - } else { - // the limitation of output result is reached, set the query completed - skipResults(pRuntimeEnv); - if (limitOperator(pQuery, pQInfo)) { - SET_STABLE_QUERY_OVER(pRuntimeEnv); - break; - } - } - - // while the output buffer is full or limit/offset is applied, query may be paused here - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL|QUERY_COMPLETED)) { - break; - } - } - - if (!hasMoreBlock) { - setQueryStatus(pQuery, QUERY_COMPLETED); - SET_STABLE_QUERY_OVER(pRuntimeEnv); - } - } else { - /* - * the following two cases handled here. - * 1. ts-comp query, and 2. the super table projection query with different query time range for each table. - * If the subgroup index is larger than 0, results generated by group by tbname,k is existed. - * we need to return it to client in the first place. - */ - if (hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - pRuntimeEnv->resultInfo.total += pRuntimeEnv->resultInfo.rows; - - if (pRuntimeEnv->resultInfo.rows > 0) { - return; - } - } - - // all data have returned already - if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { - return; - } - - resetDefaultResInfoOutputBuf(pRuntimeEnv); - resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - - SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); - assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables && - 1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList)); - - while (pRuntimeEnv->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { - if (isQueryKilled(pQInfo)) { - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - pQuery->current = taosArrayGetP(group, pRuntimeEnv->tableIndex); - if (!multiTableMultioutputHelper(pQInfo, pRuntimeEnv->tableIndex)) { - pRuntimeEnv->tableIndex++; - continue; - } - - // TODO handle the limit offset problem - if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - pRuntimeEnv->tableIndex++; - continue; - } - } - - scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); - skipResults(pRuntimeEnv); - - // the limitation of output result is reached, set the query completed - if (limitOperator(pQuery, pQInfo)) { - SET_STABLE_QUERY_OVER(pRuntimeEnv); - break; - } - - // enable execution for next table, when handling the projection query - enableExecutionForNextTable(pRuntimeEnv); - - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - /* - * query range is identical in terms of all meters involved in query, - * so we need to restore them at the *beginning* of query on each meter, - * not the consecutive query on meter on which is aborted due to buffer limitation - * to ensure that, we can reset the query range once query on a meter is completed. - */ - pRuntimeEnv->tableIndex++; - updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); - - // if the buffer is full or group by each table, we need to jump out of the loop - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - break; - } - - if (pRuntimeEnv->pTsBuf != NULL) { - pRuntimeEnv->cur = pRuntimeEnv->pTsBuf->cur; - } - - } else { - // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter - if (pRuntimeEnv->resultInfo.rows == 0) { - assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); - continue; - } else { - // buffer is full, wait for the next round to retrieve data from current meter - assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); - break; - } - } - } - - if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { - setQueryStatus(pQuery, QUERY_COMPLETED); - } - - /* - * 1. super table projection query, group-by on normal columns query, ts-comp query - * 2. point interpolation query, last row query - * - * group-by on normal columns query and last_row query do NOT invoke the finalizer here, - * since the finalize stage will be done at the client side. - * - * projection query, point interpolation query do not need the finalizer. - * - * Only the ts-comp query requires the finalizer function to be executed here. - */ - if (isTsCompQuery(pQuery)) { - finalizeQueryResult(pRuntimeEnv); - } +static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { + size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); + for (int32_t i = 0; i < numOfGroup; ++i) { + SArray* group = GET_TABLEGROUP(pRuntimeEnv, i); - if (pRuntimeEnv->pTsBuf != NULL) { - pRuntimeEnv->cur = pRuntimeEnv->pTsBuf->cur; + size_t num = taosArrayGetSize(group); + for (int32_t j = 0; j < num; ++j) { + STableQueryInfo* item = taosArrayGetP(group, j); + closeAllResultRows(&item->resInfo); } - - qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 - " points returned, total:%" PRId64 ", offset:%" PRId64, - pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pRuntimeEnv->resultInfo.rows, - pRuntimeEnv->resultInfo.total, pQuery->limit.offset); } } - -static int32_t doSaveContext(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - SWITCH_ORDER(pQuery->order.order); - - if (pRuntimeEnv->pTsBuf != NULL) { - SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); - } - - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - - // clean unused handle - if (pRuntimeEnv->pSecQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - } - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - switchCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pRuntimeEnv); - setupQueryRangeForReverseScan(pRuntimeEnv); - - pRuntimeEnv->prevGroupId = INT32_MIN; - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); - return (pRuntimeEnv->pSecQueryHandle == NULL)? -1:0; -} - -static UNUSED_FUNC void doRestoreContext(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - SWITCH_ORDER(pQuery->order.order); - - if (pRuntimeEnv->pTsBuf != NULL) { - SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); - } - - switchCtxOrder(pRuntimeEnv); - SET_MASTER_SCAN_FLAG(pRuntimeEnv); -} - -#endif - -static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { -// if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { - size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); - for (int32_t i = 0; i < numOfGroup; ++i) { - SArray* group = GET_TABLEGROUP(pRuntimeEnv, i); - - size_t num = taosArrayGetSize(group); - for (int32_t j = 0; j < num; ++j) { - STableQueryInfo* item = taosArrayGetP(group, j); - closeAllResultRows(&item->resInfo); - } - } - // } else { // close results for group result - // closeAllResultRows(&pQInfo->runtimeEnv.resultRowInfo); - // } -// } -} - static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { SSDataBlock *pBlock = &pTableScanInfo->block; SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; @@ -5817,7 +5135,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; @@ -6216,7 +5534,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { int64_t startTime = taosGetTimestampUs(); while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) { - if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(pRuntimeEnv->qinfo)) { freeTableBlockDist(pTableBlockDist); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -7192,7 +6510,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr } } - colIdCheck(pQuery); + colIdCheck(pQuery, pQInfo); // todo refactor pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);