From 3f24ff44e09ac692dd949a0b3fdb20f7e8274587 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 21 Apr 2020 11:07:54 +0800 Subject: [PATCH] [td-169] fix limit offset error for table projection/diff query --- src/query/src/queryExecutor.c | 1464 +++++++++++++++++++-------------- 1 file changed, 835 insertions(+), 629 deletions(-) diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 6ded52d11d..e958a53e87 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -16,17 +16,17 @@ #include "hash.h" #include "hashfunc.h" -#include "taosmsg.h" -#include "tlosertree.h" -#include "tscompression.h" -#include "ttime.h" #include "qast.h" #include "qresultBuf.h" -#include "queryExecutor.h" -#include "queryUtil.h" #include "query.h" -#include "tsdbMain.h" //todo use TableId instead of STable object +#include "queryExecutor.h" #include "queryLog.h" +#include "queryUtil.h" +#include "taosmsg.h" +#include "tlosertree.h" +#include "tscompression.h" +#include "tsdbMain.h" //todo use TableId instead of STable object +#include "ttime.h" #define DEFAULT_INTERN_BUF_SIZE 16384L @@ -45,16 +45,15 @@ #define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) -#define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv))) +#define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv))) #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) -#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC)) +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.bytes) -#define GET_COLUMN_TYPE(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.type) +#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.type) typedef struct SPointInterpoSupporter { int32_t numOfCols; @@ -76,11 +75,11 @@ typedef enum { * 2. when all data within queried time window, it is also denoted as query_completed */ QUERY_COMPLETED = 0x4u, - + /* when the result is not completed return to client, this status will be * usually used in case of interval query with interpolation option */ - QUERY_OVER = 0x8u, + QUERY_OVER = 0x8u, } vnodeQueryStatus; enum { @@ -90,19 +89,19 @@ enum { }; typedef struct { - int32_t status; // query status - TSKEY lastKey; // the lastKey value before query executed - STimeWindow w; // whole query time window - STimeWindow current; // current query window - int32_t windowIndex; // index of active time window result for interval query - STSCursor cur; + int32_t status; // query status + TSKEY lastKey; // the lastKey value before query executed + STimeWindow w; // whole query time window + STimeWindow curWindow; // current query window + int32_t windowIndex; // index of active time window result for interval query + STSCursor cur; } SQueryStatusInfo; static void setQueryStatus(SQuery *pQuery, int8_t status); bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } -static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group); -static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); +static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group); +static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); @@ -114,9 +113,9 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); -static void createTableDataInfo(SQInfo* pQInfo); +static void createTableDataInfo(SQInfo *pQInfo); -static int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo); +static int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo); static int32_t flushFromResultBuf(SQInfo *pQInfo); bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) { @@ -374,7 +373,7 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) { if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total; assert(pQuery->rec.rows > 0); - + setQueryStatus(pQuery, QUERY_COMPLETED); return true; } @@ -427,21 +426,21 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo */ static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SDataStatis **pColStatis) { - SColIndex* pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo; + SColIndex *pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo; if (TSDB_COL_IS_TAG(pColIndex->flag)) { return false; } - + // query on primary timestamp column, not null value at all if (pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { return false; } - + *pColStatis = NULL; if (pStatis != NULL) { *pColStatis = getStatisInfo(pQuery, pStatis, pDataBlockInfo, col); } - + if ((*pColStatis) != NULL && (*pColStatis)->numOfNull == 0) { return false; } @@ -865,9 +864,9 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 dataBlock = NULL; } else { /* - * the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare stage, - * the remain meter may not have the required column in cache actually. - * So, the validation of required column in cache with the corresponding meter schema is reinforced. + * the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare + * stage, the remain meter may not have the required column in cache actually. So, the validation of required + * column in cache with the corresponding meter schema is reinforced. */ if (pDataBlock == NULL) { return NULL; @@ -898,32 +897,31 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 * such as count/min/max etc. */ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, - SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, - __block_search_fn_t searchFn, SArray *pDataBlock) { + SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, + __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; SColumnInfoData *pColInfo = NULL; - TSKEY * primaryKeyCol = NULL; + TSKEY * primaryKeyCol = NULL; if (pDataBlock != NULL) { pColInfo = taosArrayGet(pDataBlock, 0); primaryKeyCol = (TSKEY *)(pColInfo->pData); } - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->rows - 1; SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport)); for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; SDataStatis *tpField = NULL; - - bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField); + + bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField, - hasNull, &sasArray[k], pRuntimeEnv->scanFlag); + setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField, hasNull, + &sasArray[k], pRuntimeEnv->scanFlag); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -979,7 +977,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati } } } - + tfree(sasArray); } @@ -1131,13 +1129,13 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &pColStatis); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis, - hasNull, &sasArray[k], pRuntimeEnv->scanFlag); + setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis, hasNull, + &sasArray[k], pRuntimeEnv->scanFlag); } // set the input column data for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { -// SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; + // SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; assert(0); /* * NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column, @@ -1308,17 +1306,17 @@ static UNUSED_FUNC int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int } static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, - SDataStatis *pStatis, __block_search_fn_t searchFn, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { - + SDataStatis *pStatis, __block_search_fn_t searchFn, + SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - /*numOfRes = */rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); + /*numOfRes = */ rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } - - TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; + + TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); @@ -1336,7 +1334,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl if (numOfRes > 0 && pQuery->checkBuffer == 1) { assert(numOfRes >= pQuery->rec.rows); pQuery->rec.rows = numOfRes; - + if (numOfRes >= pQuery->rec.threshold) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); } @@ -1359,16 +1357,20 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY } else { pCtx->preAggVals.isSet = false; } - - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 && (tsCol != NULL)) { - pCtx->ptsList = tsCol; + + pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery)? pQuery->pos : 0; + pCtx->size = QUERY_IS_ASC_QUERY(pQuery)? size - pQuery->pos : pQuery->pos + 1; + + uint32_t status = aAggs[functionId].nStatus; + if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { + pCtx->ptsList = &tsCol[pCtx->startOffset]; } if (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) { // last_dist or first_dist function // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null - pCtx->ptsList = tsCol; +// pCtx->ptsList = tsCol; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* @@ -1384,15 +1386,12 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY pTWAInfo->EKey = pQuery->window.ekey; } - pCtx->ptsList = tsCol; +// pCtx->ptsList = tsCol; } else if (functionId == TSDB_FUNC_ARITHM) { pCtx->param[1].pz = param; } - - pCtx->startOffset = 0; - pCtx->size = size; - + #if defined(_DEBUG_VIEW) // int64_t *tsList = (int64_t *)primaryColumnData; // int64_t s = tsList[0]; @@ -1446,7 +1445,7 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i } } -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order) { +static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) { qTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -1562,7 +1561,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { destroyResultBuf(pRuntimeEnv->pResultBuf); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - + pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -1583,9 +1582,7 @@ static bool isQueryKilled(SQInfo *pQInfo) { #endif } -static void setQueryKilled(SQInfo* pQInfo) { - pQInfo->code = TSDB_CODE_QUERY_CANCELLED; -} +static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_QUERY_CANCELLED; } bool isFixedOutputQuery(SQuery *pQuery) { if (pQuery->intervalTime != 0) { @@ -1679,8 +1676,8 @@ static bool needReverseScan(SQuery *pQuery) { } ///////////////////////////////////////////////////////////////////////////////////////////// -void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, - int64_t *realSkey, int64_t *realEkey, STimeWindow *win) { +void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, int64_t *realSkey, + int64_t *realEkey, STimeWindow *win) { assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime); win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision); @@ -1739,7 +1736,7 @@ static UNUSED_FUNC bool doGetQueryPos(TSKEY key, SQInfo *pQInfo, SPointInterpoSu } static UNUSED_FUNC bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter, void *pMeterObj, - TSKEY nextKey) { + TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -2100,7 +2097,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI continue; } - int32_t colInBuf = 0;//pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf; + int32_t colInBuf = 0; // pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf; SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf; pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail)); @@ -2198,8 +2195,8 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table size_t s = pQInfo->groupInfo.numOfTables; num = MAX(s, INITIAL_RESULT_ROWS_VALUE); - } else { // for super table query, one page for each subset - num = 1;//pQInfo->pSidSet->numOfSubSet; + } else { // for super table query, one page for each subset + num = 1; // pQInfo->pSidSet->numOfSubSet; } assert(num > 0); @@ -2374,8 +2371,8 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl } if (r == BLK_DATA_NO_NEEDED) { - qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d", - GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); } else if (r == BLK_DATA_FILEDS_NEEDED) { if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { // return DISK_DATA_LOAD_FAILED; @@ -2418,10 +2415,10 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { } assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - - TSKEY* keyList = (TSKEY *)pValue; + + TSKEY * keyList = (TSKEY *)pValue; int32_t firstPos = 0; - int32_t lastPos = num - 1; + int32_t lastPos = num - 1; if (order == TSDB_ORDER_DESC) { // find the first position which is smaller than the key @@ -2477,9 +2474,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); - TsdbQueryHandleT pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSecQueryHandle; + TsdbQueryHandleT pQueryHandle = + pRuntimeEnv->scanFlag == MASTER_SCAN ? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { - if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return 0; } @@ -2488,13 +2485,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { // todo extract methods if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == 0) { - TSKEY skey1, ekey1; - STimeWindow w = {0}; + TSKEY skey1, ekey1; + STimeWindow w = {0}; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (QUERY_IS_ASC_QUERY(pQuery)) { - getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, - &skey1, &ekey1, &w); + getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, + &ekey1, &w); pWindowResInfo->startTime = w.skey; pWindowResInfo->prevSKey = w.skey; } else { @@ -2506,39 +2503,41 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pWindowResInfo->prevSKey = w.skey; } } - + // in case of prj/diff query, ensure the output buffer is sufficient to accomodate the results of current block if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) { - SResultRec* pRec = &pQuery->rec; - + SResultRec *pRec = &pQuery->rec; + if (pQuery->rec.capacity - pQuery->rec.rows < blockInfo.rows) { int32_t remain = pRec->capacity - pRec->rows; int32_t newSize = pRec->capacity + (blockInfo.rows - remain); - - for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t bytes = pQuery->pSelectExpr[i].resBytes; - - char* tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData)); - if (tmp == NULL) { // todo handle the oom + + char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData)); + if (tmp == NULL) { // todo handle the oom } else { - pQuery->sdata[i] = (SData*) tmp; + pQuery->sdata[i] = (SData *)tmp; } - + // set the pCtx output buffer position - pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows*bytes; + pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes; } - + pRec->capacity = newSize; } } SDataStatis *pStatis = NULL; - SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); + SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); + + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, - &pRuntimeEnv->windowResInfo, pDataBlock); + &pRuntimeEnv->windowResInfo, pDataBlock); - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", - GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), + blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); // save last access position if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { @@ -2572,19 +2571,19 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColId, tVariant *param) { +static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVariant *param) { tVariantDestroy(param); - char* val = NULL; + char * val = NULL; int16_t bytes = 0; - int16_t type = 0; + int16_t type = 0; tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val); tVariantCreateFromBinary(param, val, bytes, type); } -void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) { - SQuery * pQuery = pRuntimeEnv->pQuery; +void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) { + SQuery *pQuery = pRuntimeEnv->pQuery; SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { @@ -2599,8 +2598,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) { if (!TSDB_COL_IS_TAG(pCol->flag)) { continue; } - - + // todo use tag column index to optimize performance doSetTagValueInParam(tsdb, id, pCol->colId, &pRuntimeEnv->pCtx[idx].tag); } @@ -2609,8 +2607,8 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) { if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pRuntimeEnv->pTSBuf != NULL) { assert(pFuncMsg->numOfParams == 1); - assert(0); // to do fix me -// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); + assert(0); // to do fix me + // doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); } } } @@ -2816,9 +2814,9 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int32_t ret = TSDB_CODE_SUCCESS; int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - + while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); + SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); ret = mergeIntoGroupResultImpl(pQInfo, group); if (ret < 0) { // not enough disk space to save the data into disk return -1; @@ -2835,8 +2833,8 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1); } - qTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", - pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); + qTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", pQInfo, + pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -2916,23 +2914,23 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW return maxOutput; } -int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) { +int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; + SQuery * pQuery = pRuntimeEnv->pQuery; size_t size = taosArrayGetSize(pGroup); - + tFilePage **buffer = (tFilePage **)pQuery->sdata; - int32_t *posList = calloc(size, sizeof(int32_t)); - + int32_t * posList = calloc(size, sizeof(int32_t)); + STableDataInfo **pTableList = malloc(POINTER_BYTES * size); // todo opt for the case of one table per group int32_t numOfTables = 0; for (int32_t i = 0; i < size; ++i) { - SPair* p = taosArrayGet(pGroup, i); - STableDataInfo* pInfo = p->sec; - + SPair * p = taosArrayGet(pGroup, i); + STableDataInfo *pInfo = p->sec; + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->pTableQInfo->tid); if (list.size > 0 && pInfo->pTableQInfo->windowResInfo.size > 0) { pTableList[numOfTables] = pInfo; @@ -2956,7 +2954,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) { SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); - + int64_t lastTimestamp = -1; int64_t startt = taosGetTimestampMs(); @@ -3017,7 +3015,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) { if (buffer[0]->numOfElems != 0) { // there are data in buffer if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo); - + tfree(pTree); tfree(pTableList); tfree(posList); @@ -3128,7 +3126,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; - + // group by normal columns and interval query on normal table SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { @@ -3136,7 +3134,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; - + SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || @@ -3158,12 +3156,12 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { } if (isIntervalQuery(pQuery)) { -// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { -// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; -// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; -// -// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); -// } + // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { + // STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + // SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; + // + // doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); + // } } else { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); @@ -3175,7 +3173,8 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + SWITCH_ORDER(pRuntimeEnv->pCtx[i] + .order); // = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } } @@ -3209,7 +3208,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - memset(pQuery->sdata[i]->data, 0, (size_t) pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity); + memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity); } initCtxOutputBuf(pRuntimeEnv); @@ -3248,7 +3247,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; - + pRuntimeEnv->pCtx[j].currentStage = 0; aAggs[functionId].init(&pRuntimeEnv->pCtx[j]); } @@ -3302,7 +3301,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - + bool toContinue = false; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { // for each group result, call the finalize function for each column @@ -3345,74 +3344,72 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { return toContinue; } -static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) { - SQuery* pQuery = pRuntimeEnv->pQuery; - +static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SQueryStatusInfo info = { .status = pQuery->status, .windowIndex = pRuntimeEnv->windowResInfo.curIndex, .lastKey = pQuery->lastKey, .w = pQuery->window, + .curWindow = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey}, }; - + return info; } -static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) { - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); - SQuery* pQuery = pRuntimeEnv->pQuery; - - // the step should be placed before order changed - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - +static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { + SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQuery *pQuery = pRuntimeEnv->pQuery; + pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor if (pRuntimeEnv->pTSBuf) { SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); tsBufNextPos(pRuntimeEnv->pTSBuf); } - + // reverse order time range - pQuery->window.skey = pQuery->lastKey - step; - pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query - + pQuery->window = pStatus->curWindow; + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); + SWITCH_ORDER(pQuery->order.order); SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - + STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - + // clean unused handle if (pRuntimeEnv->pSecQueryHandle != NULL) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); - + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pRuntimeEnv); } -static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) { - SQuery* pQuery = pRuntimeEnv->pQuery; - +static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SWITCH_ORDER(pQuery->order.order); switchCtxOrder(pRuntimeEnv); - + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); if (pRuntimeEnv->pTSBuf) { pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; } - + SET_MASTER_SCAN_FLAG(pRuntimeEnv); - + // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query // during reverse scan - pQuery->lastKey = lastKey; + pQuery->lastKey = pStatus->lastKey; pQuery->status = pStatus->status; pQuery->window = pStatus->w; } @@ -3422,7 +3419,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { setQueryStatus(pQuery, QUERY_NOT_COMPLETED); // store the start query position - SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); + SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv); SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); @@ -3430,9 +3427,10 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (1) { doScanAllDataBlocks(pRuntimeEnv); - + if (pRuntimeEnv->scanFlag == MASTER_SCAN) { qstatus.status = pQuery->status; + qstatus.curWindow.ekey = pQuery->lastKey - step; } if (!needScanDataBlocksAgain(pRuntimeEnv)) { @@ -3445,16 +3443,16 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } STsdbQueryCond cond = { - .twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step}, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = qstatus.curWindow, + .order = pQuery->order.order, + .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - + if (pRuntimeEnv->pSecQueryHandle != NULL) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; @@ -3466,19 +3464,18 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { return; } } - + if (!needReverseScan(pQuery)) { return; } - - TSKEY lastKey = pQuery->lastKey; + setEnvBeforeReverseScan(pRuntimeEnv, &qstatus); // reverse scan from current position - qTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); + qTrace("QInfo:%p start to reverse scan", pQInfo); doScanAllDataBlocks(pRuntimeEnv); - - clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus); + + clearEnvAfterReverseScan(pRuntimeEnv, &qstatus); } void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -3588,7 +3585,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable* pTable, int32_t groupIdx, +void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable *pTable, int32_t groupIdx, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; @@ -3642,7 +3639,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * } } -int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo) { +int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; assert(pTableQueryInfo->lastKey > 0); @@ -3841,7 +3838,7 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { int32_t numOfResult = doCopyToSData(pQInfo, result, orderType); pQuery->rec.rows += numOfResult; - + assert(pQuery->rec.rows <= pQuery->rec.capacity); } @@ -3860,11 +3857,12 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf } } -void stableApplyFunctionsOnBlock(SQueryRuntimeEnv* pRuntimeEnv, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo, - SDataStatis *pStatis, SArray *pDataBlock, __block_search_fn_t searchFn) { - SQuery * pQuery = pRuntimeEnv->pQuery; - STableQueryInfo * pTableQueryInfo = pTableDataInfo->pTableQInfo; - SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; +void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo, + SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock, + __block_search_fn_t searchFn) { + SQuery * pQuery = pRuntimeEnv->pQuery; + STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo; + SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { // numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo); @@ -3901,8 +3899,8 @@ bool vnodeHasRemainResults(void *handle) { // query has completed if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - /*TSKEY ekey =*/ taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime, - pQuery->slidingTimeUnit, pQuery->precision); + /*TSKEY ekey =*/taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime, + pQuery->slidingTimeUnit, pQuery->precision); // int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY // *)pRuntimeEnv->pInterpoBuf[0]->data, // remain, pQuery->intervalTime, ekey, @@ -3917,7 +3915,7 @@ bool vnodeHasRemainResults(void *handle) { } static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **pDataSrc, int32_t numOfRows, - int32_t outputRows) { + int32_t outputRows) { #if 0 SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = &pRuntimeEnv->pQuery; @@ -3956,11 +3954,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SQuery *pQuery = pQInfo->runtimeEnv.pQuery; for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { int32_t bytes = pQuery->pSelectExpr[col].resBytes; - + memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); data += bytes * numOfRows; } - + // all data returned, set query over if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { setQueryStatus(pQuery, QUERY_OVER); @@ -4058,115 +4056,312 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { #endif } -int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { + SQuery *pQuery = pRuntimeEnv->pQuery; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - int32_t code = TSDB_CODE_SUCCESS; + if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed + pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step; + pQuery->limit.offset = 0; + return; + } + + if (QUERY_IS_ASC_QUERY(pQuery)) { + pQuery->pos = pQuery->limit.offset; + } else { + pQuery->pos = pBlockInfo->rows - pQuery->limit.offset - 1; + } - setScanLimitationByResultBuffer(pQuery); - changeExecuteScanOrder(pQuery, false); + assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->rows - 1); - // dataInCache requires lastKey value - pQuery->lastKey = pQuery->window.skey; + SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); + SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); - STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); - pQInfo->tsdb = tsdb; + // update the pQuery->limit.offset value, and pQuery->pos value + TSKEY *keys = (TSKEY *)pColInfoData->pData; + + // update the offset value + pQuery->lastKey = keys[pQuery->pos]; + pQuery->limit.offset = 0; - pRuntimeEnv->pQuery = pQuery; - pRuntimeEnv->pTSBuf = param; - pRuntimeEnv->cur.vnodeIndex = -1; - pRuntimeEnv->stableQuery = isSTableQuery; + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, + &pRuntimeEnv->windowResInfo, pDataBlock); - if (param != NULL) { - int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); - } + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes); +} - // create runtime environment - code = setupQueryRuntimeEnv(pRuntimeEnv, NULL, pQuery->order.order); - if (code != TSDB_CODE_SUCCESS) { - return code; +void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0) { + return; } - pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, isSTableQuery); - - if (isSTableQuery) { - int32_t rows = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (pQuery->intervalTime == 0) { - int16_t type = TSDB_DATA_TYPE_NULL; - - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - } else { - type = TSDB_DATA_TYPE_INT; // group id - } - - initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); - } + pQuery->pos = 0; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; - } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { - int32_t rows = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); - if (code != TSDB_CODE_SUCCESS) { - return code; + while (tsdbNextDataBlock(pQueryHandle)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { + return; } - int16_t type = TSDB_DATA_TYPE_NULL; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - } else { - type = TSDB_DATA_TYPE_TIMESTAMP; - } + SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); - initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type); + if (pQuery->limit.offset > blockInfo.rows) { + pQuery->limit.offset -= blockInfo.rows; + pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; + pQuery->lastKey += step; + + qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, pQuery->limit.offset); + } else { // find the appropriated start position in current block + updateOffsetVal(pRuntimeEnv, &blockInfo); + break; + } } +} - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - SPointInterpoSupporter interpInfo = {0}; - pointInterpSupporterInit(pQuery, &interpInfo); +static UNUSED_FUNC bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; - /* - * in case of last_row query without query range, we set the query timestamp to - * pMeterObj->lastKey. Otherwise, keep the initial query time range unchanged. - */ - if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { - if (!normalizeUnBoundLastRowQuery(pQInfo, &interpInfo)) { - sem_post(&pQInfo->dataReady); - pointInterpSupporterDestroy(&interpInfo); - return TSDB_CODE_SUCCESS; - } + // if queried with value filter, do NOT forward query start position + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { + return true; } - /* - * here we set the value for before and after the specified time into the - * parameter for interpolation query - */ - pointInterpSupporterSetData(pQInfo, &interpInfo); - pointInterpSupporterDestroy(&interpInfo); + if (pQuery->limit.offset > 0 && (!isTopBottomQuery(pQuery)) && pQuery->interpoType == TSDB_INTERPO_NONE) { + /* + * 1. for top/bottom query, the offset applies to the final result, not here + * 2. for interval without interpolation query we forward pQuery->intervalTime at a time for + * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is + * not valid. otherwise, we only forward pQuery->limit.offset number of points + */ + if (isIntervalQuery(pQuery)) { + assert(pRuntimeEnv->windowResInfo.prevSKey == 0); - // todo move to other location - // if (!forwardQueryStartPosIfNeeded(pQInfo, pQInfo, dataInDisk, dataInCache)) { - // return TSDB_CODE_SUCCESS; - // } + TSKEY skey1, ekey1; + STimeWindow w = {0}; + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - int64_t rs = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, - pQuery->precision); - taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); - // allocMemForInterpo(pQInfo, pQuery, pMeterObj); + if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { + // todo handle no data situation + } + + SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); + + if (QUERY_IS_ASC_QUERY(pQuery)) { + getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, + &ekey1, &w); + pWindowResInfo->startTime = w.skey; + pWindowResInfo->prevSKey = w.skey; + } else { + // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp + TSKEY start = blockInfo.window.ekey - pQuery->intervalTime; + getAlignQueryTimeWindow(pQuery, start, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); + + pWindowResInfo->startTime = pQuery->window.skey; + pWindowResInfo->prevSKey = w.skey; + } + + // the first time window + STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery); + + while (pQuery->limit.offset > 0) { + STimeWindow tw = win; + getNextTimeWindow(pQuery, &tw); + + // next time window starts from current data block + if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { + // query completed + if ((tw.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (tw.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } + + tw = win; + SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); + SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); + + int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &tw, pWindowResInfo, &blockInfo, pColInfoData->pData, + binarySearchForKey); + assert(startPos >= 0); + pQuery->limit.offset -= 1; + + // set the abort info + pQuery->pos = startPos; + pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; + pWindowResInfo->prevSKey = tw.skey; + win = tw; + continue; + } else { + if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } + + blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); + if ((blockInfo.window.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (blockInfo.window.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } + + // set the window that start from the next data block + TSKEY key = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.skey : blockInfo.window.ekey; + STimeWindow n = getActiveTimeWindow(pWindowResInfo, key, pQuery); + + // next data block are still covered by current time window + if (n.skey == win.skey && n.ekey == win.ekey) { + // do nothing + } else { + pQuery->limit.offset -= 1; + + // query completed + if ((n.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (n.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } + + // set the abort info + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; + pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.skey : blockInfo.window.ekey; + pWindowResInfo->prevSKey = n.skey; + + win = n; + } + } + } + + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) || pQuery->limit.offset > 0) { + setQueryStatus(pQuery, QUERY_COMPLETED); + return false; + } else { + assert(0); + // if (IS_DISK_DATA_BLOCK(pQuery)) { + // getTimestampInDiskBlock(pRuntimeEnv, 0); + } + } + } else { // forward the start position for projection query + skipBlocks(&pQInfo->runtimeEnv); + if (pQuery->limit.offset > 0) { + setQueryStatus(pQuery, QUERY_COMPLETED); + return false; + } + } + + return true; +} + +int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + int32_t code = TSDB_CODE_SUCCESS; + + setScanLimitationByResultBuffer(pQuery); + changeExecuteScanOrder(pQuery, false); + + // dataInCache requires lastKey value + pQuery->lastKey = pQuery->window.skey; + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); + pQInfo->tsdb = tsdb; + + pRuntimeEnv->pQuery = pQuery; + pRuntimeEnv->pTSBuf = param; + pRuntimeEnv->cur.vnodeIndex = -1; + pRuntimeEnv->stableQuery = isSTableQuery; + + if (param != NULL) { + int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; + tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); + } + + // create runtime environment + code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, isSTableQuery); + + if (isSTableQuery) { + int32_t rows = getInitialPageNum(pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pQuery->intervalTime == 0) { + int16_t type = TSDB_DATA_TYPE_NULL; + + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; + type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); + } else { + type = TSDB_DATA_TYPE_INT; // group id + } + + initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); + } + + } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { + int32_t rows = getInitialPageNum(pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int16_t type = TSDB_DATA_TYPE_NULL; + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); + } else { + type = TSDB_DATA_TYPE_TIMESTAMP; + } + + initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type); + } + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + + SPointInterpoSupporter interpInfo = {0}; + pointInterpSupporterInit(pQuery, &interpInfo); + + /* + * in case of last_row query without query range, we set the query timestamp to + * pMeterObj->lastKey. Otherwise, keep the initial query time range unchanged. + */ + if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { + if (!normalizeUnBoundLastRowQuery(pQInfo, &interpInfo)) { + sem_post(&pQInfo->dataReady); + pointInterpSupporterDestroy(&interpInfo); + return TSDB_CODE_SUCCESS; + } + } + + /* + * here we set the value for before and after the specified time into the + * parameter for interpolation query + */ + pointInterpSupporterSetData(pQInfo, &interpInfo); + pointInterpSupporterDestroy(&interpInfo); + + int64_t rs = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, + pQuery->precision); + taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); + // allocMemForInterpo(pQInfo, pQuery, pMeterObj); if (!isPointInterpoQuery(pQuery)) { // assert(pQuery->pos >= 0 && pQuery->slot >= 0); @@ -4216,29 +4411,29 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - + int64_t st = taosGetTimestampMs(); - + TsdbQueryHandleT *pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { if (isQueryKilled(pQInfo)) { break; } - SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); - STableDataInfo* pTableDataInfo = NULL; - STable* pTable = NULL; - + SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); + STableDataInfo *pTableDataInfo = NULL; + STable * pTable = NULL; + // todo opt performance using hash table size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - for(int32_t i = 0; i < numOfGroup; ++i) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); - + for (int32_t i = 0; i < numOfGroup; ++i) { + SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + size_t num = taosArrayGetSize(group); - for(int32_t j = 0; j < num; ++j) { - SPair* p = taosArrayGet(group, j); - STableDataInfo* pInfo = p->sec; - + for (int32_t j = 0; j < num; ++j) { + SPair * p = taosArrayGet(group, j); + STableDataInfo *pInfo = p->sec; + if (pInfo->pTableQInfo->tid == blockInfo.sid) { pTableDataInfo = p->sec; pTable = p->first; @@ -4246,7 +4441,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { } } } - + assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL); STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo; @@ -4261,7 +4456,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { } else { // interval query setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey); int32_t ret = setAdditionalInfo(pQInfo, pTable, pTableQueryInfo); - + if (ret != TSDB_CODE_SUCCESS) { pQInfo->code = ret; return taosGetTimestampMs() - st; @@ -4270,42 +4465,41 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { stableApplyFunctionsOnBlock(pRuntimeEnv, pTableDataInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey); } - + int64_t et = taosGetTimestampMs(); return et - st; } static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; + SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - SPair* p = taosArrayGet(group, index); - - STable* pTable = p->first; - STableDataInfo* pInfo = p->sec; - + SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + SPair * p = taosArrayGet(group, index); + + STable * pTable = p->first; + STableDataInfo *pInfo = p->sec; + setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb); - + qTrace("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, pTable->tableId.uid, pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey); - + STsdbQueryCond cond = { .twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey}, - .order = pQuery->order.order, + .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - - - SArray* g1 = taosArrayInit(1, POINTER_BYTES); - SArray* tx = taosArrayInit(1, sizeof(SPair)); - + + SArray *g1 = taosArrayInit(1, POINTER_BYTES); + SArray *tx = taosArrayInit(1, sizeof(SPair)); + taosArrayPush(tx, p); taosArrayPush(g1, &tx); STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; - + // include only current table pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); @@ -4313,7 +4507,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { if (pRuntimeEnv->cur.vnodeIndex == -1) { int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag); - + // failed to find data with the specified tag value if (elem.vnode < 0) { return false; @@ -4377,11 +4571,11 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i */ static void sequentialTableProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; + SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_COMPLETED); - + size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - + if (isPointInterpoQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); @@ -4441,10 +4635,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } #endif - + } else { createTableDataInfo(pQInfo); - + /* * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. @@ -4453,67 +4647,68 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (pQInfo->groupIndex > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.total += pQuery->rec.rows; - + if (pQuery->rec.rows > 0) { return; } } - + // all data have returned already if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { return; } - + resetCtxOutputBuf(pRuntimeEnv); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); - - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList)); - + + SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && + 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList)); + while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) { if (isQueryKilled(pQInfo)) { return; } - - SPair *p = taosArrayGet(group, pQInfo->tableIndex); - STableDataInfo* pInfo = p->sec; - + + SPair * p = taosArrayGet(group, pQInfo->tableIndex); + STableDataInfo *pInfo = p->sec; + TSKEY skey = pInfo->pTableQInfo->lastKey; if (skey > 0) { pQuery->window.skey = skey; } - + if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; } - -// SPointInterpoSupporter pointInterpSupporter = {0}; - + + // SPointInterpoSupporter pointInterpSupporter = {0}; + // TODO handle the limit problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { -// forwardQueryStartPosition(pRuntimeEnv); - + // skipBlocks(pRuntimeEnv); + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { pQInfo->tableIndex++; continue; } } - + scanAllDataBlocks(pRuntimeEnv); - + pQuery->rec.rows = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); - + // the limitation of output result is reached, set the query completed if (doRevisedResultsByLimit(pQInfo)) { pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; break; } - + // enable execution for next table, when handling the projection query enableExecutionForNextTable(pRuntimeEnv); - + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { /* * query range is identical in terms of all meters involved in query, @@ -4523,30 +4718,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) { */ pQInfo->tableIndex++; pInfo->pTableQInfo->lastKey = pQuery->lastKey; - + // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*|| isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) { break; } - + } else { // forward query range pQuery->window.skey = pQuery->lastKey; - + // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter if (pQuery->rec.rows == 0) { assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); continue; } else { -// pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey; -// // buffer is full, wait for the next round to retrieve data from current meter -// assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); -// break; + // pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey; + // // buffer is full, wait for the next round to retrieve data from current meter + // assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); + // break; } } } } - + /* * 1. super table projection query, group-by on normal columns query, ts-comp query * 2. point interpolation query, last row query @@ -4561,123 +4756,126 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (isTSCompQuery(pQuery)) { finalizeQueryResult(pRuntimeEnv); } - + if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; } - + // todo refactor if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SWindowStatus *pStatus = &pWindowResInfo->pResult[i].status; pStatus->closed = true; // enable return all results for group by normal columns - + SWindowResult *pResult = &pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes); } } - + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } - + pQuery->rec.total += pQuery->rec.rows; - - qTrace( "QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%d totalReturn:%d," - " offset:%" PRId64, pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, - pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset); + + qTrace( + "QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%d totalReturn:%d," + " offset:%" PRId64, + pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, + pQuery->limit.offset); } -static void createTableDataInfo(SQInfo* pQInfo) { - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - +static void createTableDataInfo(SQInfo *pQInfo) { + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + // todo make sure the table are added the reference count to gauranteed that all involved tables are valid size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - + int32_t index = 0; for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info - SArray *group = *(SArray**) taosArrayGet(pQInfo->groupInfo.pGroupList, i); - + SArray *group = *(SArray **)taosArrayGet(pQInfo->groupInfo.pGroupList, i); + size_t s = taosArrayGetSize(group); - for(int32_t j = 0; j < s; ++j) { - SPair* p = (SPair*) taosArrayGet(group, j); - + for (int32_t j = 0; j < s; ++j) { + SPair *p = (SPair *)taosArrayGet(group, j); + // STableDataInfo has been created for each table if (p->sec != NULL) { // todo refactor return; } - - STableDataInfo* pInfo = calloc(1, sizeof(STableDataInfo)); - + + STableDataInfo *pInfo = calloc(1, sizeof(STableDataInfo)); + setTableDataInfo(pInfo, index, i); - pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, ((STable*)(p->first))->tableId.tid, pQuery->window); - + pInfo->pTableQInfo = + createTableQueryInfo(&pQInfo->runtimeEnv, ((STable *)(p->first))->tableId.tid, pQuery->window); + p->sec = pInfo; - + index += 1; } } } static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) { -// SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - -// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { -// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; -// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo); -// } + // SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + + // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { + // STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + // changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo); + // } } -static void doSaveContext(SQInfo* pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; - +static void doSaveContext(SQInfo *pQInfo) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); disableFuncForReverseScan(pQInfo, pQuery->order.order); - + if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u; } - + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); prepareQueryInfoForReverseScan(pQInfo); } -static void doRestoreContext(SQInfo* pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; - +static void doRestoreContext(SQInfo *pQInfo) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - + if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } - + switchCtxOrder(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } -static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) { - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - +static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + if (isIntervalQuery(pQuery)) { -// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { -// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; -// closeAllTimeWindow(&pTableQueryInfo->windowResInfo); -// } + // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { + // STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + // closeAllTimeWindow(&pTableQueryInfo->windowResInfo); + // } size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - for(int32_t i = 0; i < numOfGroup; ++i) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); - + for (int32_t i = 0; i < numOfGroup; ++i) { + SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + size_t num = taosArrayGetSize(group); - for(int32_t j = 0; j < num; ++j) { - SPair* p = taosArrayGet(group, j); - STableDataInfo* pInfo = p->sec; - + for (int32_t j = 0; j < num; ++j) { + SPair * p = taosArrayGet(group, j); + STableDataInfo *pInfo = p->sec; + closeAllTimeWindow(&pInfo->pTableQInfo->windowResInfo); } } @@ -4714,48 +4912,48 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { qTrace("QInfo:%p current:%lld, total:%lld", pQInfo, pQuery->rec.rows, pQuery->rec.total); return; } - - qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, pQuery->window.skey, - pQuery->window.ekey, pQuery->order.order); - + + qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, + pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); + // create the query support structures createTableDataInfo(pQInfo); - + // do check all qualified data blocks int64_t el = queryOnDataBlocks(pQInfo); qTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start, order:%d", pQInfo, el, pQuery->order.order ^ 1u); - + // query error occurred or query is killed, abort current execution if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { qTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); return; } - + // close all time window results doCloseAllTimeWindowAfterScan(pQInfo); - + if (needReverseScan(pQuery)) { doSaveContext(pQInfo); - + el = queryOnDataBlocks(pQInfo); qTrace("QInfo:%p reversed scan completed, elapsed time: %lldms", pQInfo, el); - + doRestoreContext(pQInfo); } else { qTrace("QInfo:%p no need to do reversed scan, query completed", pQInfo); } - + setQueryStatus(pQuery, QUERY_COMPLETED); - + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { qTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); return; } - + if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { -// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0); - + // assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0); + if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); @@ -4766,7 +4964,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } else { // not a interval query copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } - + // handle the limitation of output buffer qTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } @@ -4802,12 +5000,19 @@ static void tableFixedOutputProcess(SQInfo *pQInfo) { static void tableMultiOutputProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; + SQuery * pQuery = pRuntimeEnv->pQuery; // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); } + + // skip blocks without load the actual data block from file if no filter condition present + skipBlocks(&pQInfo->runtimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { + setQueryStatus(pQuery, QUERY_COMPLETED); + return; + } while (1) { scanAllDataBlocks(pRuntimeEnv); @@ -4838,13 +5043,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) { doRevisedResultsByLimit(pQInfo); if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, - pQInfo, pQuery->lastKey, pQuery->window.ekey); + qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->lastKey, + pQuery->window.ekey); } -// qTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, -// pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned); - if (!isTSCompQuery(pQuery)) { assert(pQuery->rec.rows <= pQuery->rec.capacity); } @@ -4875,7 +5077,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= c; } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_RESBUF_FULL)) { break; } } @@ -4912,8 +5114,8 @@ static void tableIntervalProcess(SQInfo *pQInfo) { } numOfInterpo = 0; - pQuery->rec.rows = vnodeQueryResultInterpolate( - pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.rows, &numOfInterpo); + pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, + pQuery->rec.rows, &numOfInterpo); qTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { @@ -4937,10 +5139,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) { pQInfo->pointsInterpo += numOfInterpo; } -static void tableQueryImpl(SQInfo* pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; - +static void tableQueryImpl(SQInfo *pQInfo) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + if (vnodeHasRemainResults(pQInfo)) { /* * There are remain results that are not returned due to result interpolation @@ -4950,31 +5152,30 @@ static void tableQueryImpl(SQInfo* pQInfo) { int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); - + doRevisedResultsByLimit(pQInfo); - + pQInfo->pointsInterpo += numOfInterpo; qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); sem_post(&pQInfo->dataReady); return; } - + // here we have scan all qualified data in both data file and cache if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || - ((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) { - + ((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) { // todo limit the output for interval query? pQuery->rec.rows = 0; pQInfo->groupIndex = 0; // always start from 0 - + if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.rows += pQuery->rec.rows; - + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); - + if (pQuery->rec.rows > 0) { qTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); sem_post(&pQInfo->dataReady); @@ -4982,17 +5183,17 @@ static void tableQueryImpl(SQInfo* pQInfo) { } } } - + qTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); return; } - + // number of points returned during this query pQuery->rec.rows = 0; int64_t st = taosGetTimestampUs(); - + // group by normal column, sliding window query, interval query are handled by interval query processor if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) tableIntervalProcess(pQInfo); @@ -5002,48 +5203,50 @@ static void tableQueryImpl(SQInfo* pQInfo) { assert(pQuery->checkBuffer == 1); tableMultiOutputProcess(pQInfo); } - + // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); assert(pQInfo->groupInfo.numOfTables == 1); - + /* check if query is killed or not */ if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p query is killed", pQInfo); } else { -// STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0); -// qTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", -// pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + // STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0); + // qTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " + // rows", + // pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } - + sem_post(&pQInfo->dataReady); } -static void stableQueryImpl(SQInfo* pQInfo) { - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; +static void stableQueryImpl(SQInfo *pQInfo) { + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; pQuery->rec.rows = 0; - + int64_t st = taosGetTimestampUs(); - + if (isIntervalQuery(pQuery) || (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) { multiTableQueryProcess(pQInfo); } else { assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || - isGroupbyNormalCol(pQuery->pGroupbyExpr)); - + isGroupbyNormalCol(pQuery->pGroupbyExpr)); + sequentialTableProcess(pQInfo); } - + // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); -// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); - + // taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); + if (pQuery->rec.rows == 0) { - qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total); -// vnodePrintQueryStatistics(pSupporter); + qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, + pQuery->rec.total); + // vnodePrintQueryStatistics(pSupporter); } - + sem_post(&pQInfo->dataReady); } @@ -5095,32 +5298,32 @@ static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) { return 0; } -static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** pTableIdList) { +static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) { assert(pQueryMsg->numOfTables > 0); - + *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId)); - + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); - + STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->sid}; taosArrayPush(*pTableIdList, &id); - + pMsg += sizeof(STableIdInfo); - + for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) { pTableIdInfo = (STableIdInfo *)pMsg; - + pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); - + taosArrayPush(*pTableIdList, pTableIdInfo); pMsg += sizeof(STableIdInfo); } - + return pMsg; } @@ -5133,28 +5336,28 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p * @return */ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, - char** tagCond, SColIndex** groupbyCols) { - pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); - - pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); - pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); - pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime); - pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime); - pQueryMsg->limit = htobe64(pQueryMsg->limit); - pQueryMsg->offset = htobe64(pQueryMsg->offset); - - pQueryMsg->order = htons(pQueryMsg->order); - pQueryMsg->orderColId = htons(pQueryMsg->orderColId); - pQueryMsg->queryType = htons(pQueryMsg->queryType); + char **tagCond, SColIndex **groupbyCols) { + pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); + + pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); + pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); + pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime); + pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime); + pQueryMsg->limit = htobe64(pQueryMsg->limit); + pQueryMsg->offset = htobe64(pQueryMsg->offset); - pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); + pQueryMsg->order = htons(pQueryMsg->order); + pQueryMsg->orderColId = htons(pQueryMsg->orderColId); + pQueryMsg->queryType = htons(pQueryMsg->queryType); + + pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols); pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols); - pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen); - pQueryMsg->tsOffset = htonl(pQueryMsg->tsOffset); - pQueryMsg->tsLen = htonl(pQueryMsg->tsLen); + pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen); + pQueryMsg->tsOffset = htonl(pQueryMsg->tsOffset); + pQueryMsg->tsLen = htonl(pQueryMsg->tsLen); pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); - pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); + pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); // query msg safety check if (validateQueryMsg(pQueryMsg) != 0) { @@ -5164,10 +5367,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) { - SColumnInfo* pColInfo = &pQueryMsg->colList[col]; - + SColumnInfo *pColInfo = &pQueryMsg->colList[col]; + pColInfo->colId = htons(pColInfo->colId); - pColInfo->type = htons(pColInfo->type); + pColInfo->type = htons(pColInfo->type); pColInfo->bytes = htons(pColInfo->bytes); pColInfo->numOfFilters = htons(pColInfo->numOfFilters); @@ -5211,15 +5414,15 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, (*pExpr)[i] = pExprMsg; pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); - pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); - pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - pExprMsg->functionId = htons(pExprMsg->functionId); - pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->functionId = htons(pExprMsg->functionId); + pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pMsg += sizeof(SSqlFuncExprMsg); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); + pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes); if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { @@ -5252,26 +5455,26 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->colNameList = (int64_t)pMsg; pMsg += pQueryMsg->colNameLen; } - + pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList); if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns - *groupbyCols = malloc(pQueryMsg->numOfGroupCols*sizeof(SColIndex)); - - for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { - (*groupbyCols)[i].colId = *(int16_t*) pMsg; + *groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex)); + + for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { + (*groupbyCols)[i].colId = *(int16_t *)pMsg; pMsg += sizeof((*groupbyCols)[i].colId); - - (*groupbyCols)[i].colIndex = *(int16_t*) pMsg; + + (*groupbyCols)[i].colIndex = *(int16_t *)pMsg; pMsg += sizeof((*groupbyCols)[i].colIndex); - (*groupbyCols)[i].flag = *(int16_t*) pMsg; + (*groupbyCols)[i].flag = *(int16_t *)pMsg; pMsg += sizeof((*groupbyCols)[i].flag); memcpy((*groupbyCols)[i].name, pMsg, tListLen(groupbyCols[i]->name)); pMsg += tListLen((*groupbyCols)[i].name); } - + pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); pQueryMsg->orderType = htons(pQueryMsg->orderType); } @@ -5284,22 +5487,23 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { v[i] = htobe64(v[i]); } - + pMsg += sizeof(int64_t) * pQueryMsg->numOfOutputCols; } - + // the tag query condition expression string is located at the end of query msg if (pQueryMsg->tagCondLen > 0) { *tagCond = calloc(1, pQueryMsg->tagCondLen); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen); } - - qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, " - "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, - pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, - pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols, - pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, - pQueryMsg->limit, pQueryMsg->offset); + + qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 + ", numOfGroupbyTagCols:%d, ts order:%d, " + "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 + ", offset:%" PRId64, + pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, + pQueryMsg->order, pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime, + pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); return 0; } @@ -5355,7 +5559,8 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs return TSDB_CODE_SUCCESS; } -static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr, SSqlFuncExprMsg** pExprMsg) { +static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr, + SSqlFuncExprMsg **pExprMsg) { *pSqlFuncExpr = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -5434,7 +5639,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct return TSDB_CODE_SUCCESS; } -static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex* pColIndex, int32_t *code) { +static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) { if (pQueryMsg->numOfGroupCols == 0) { return NULL; } @@ -5538,31 +5743,31 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_SUCCESS; } -static void doUpdateExprColumnIndex(SQuery* pQuery) { +static void doUpdateExprColumnIndex(SQuery *pQuery) { assert(pQuery->pSelectExpr != NULL && pQuery != NULL); -// int32_t i = 0, j = 0; -// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) { -// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) { -// pQuery->colList[i++].colIndex = (int16_t)j++; -// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) { -// pQuery->colList[i++].colIndex = -1; -// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) { -// j++; -// } -// } + // int32_t i = 0, j = 0; + // while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) { + // if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) { + // pQuery->colList[i++].colIndex = (int16_t)j++; + // } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) { + // pQuery->colList[i++].colIndex = -1; + // } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) { + // j++; + // } + // } -// while (i < pQuery->numOfCols) { -// pQuery->colList[i++].colIndex = -1; // not such column in current meter -// } - - for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; + // while (i < pQuery->numOfCols) { + // pQuery->colList[i++].colIndex = -1; // not such column in current meter + // } + + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + SSqlFuncExprMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { continue; } - - SColIndex* pColIndexEx = &pSqlExprMsg->colInfo; - for(int32_t f = 0; f < pQuery->numOfCols; ++f) { + + SColIndex *pColIndexEx = &pSqlExprMsg->colInfo; + for (int32_t f = 0; f < pQuery->numOfCols; ++f) { if (pColIndexEx->colId == pQuery->colList[f].info.colId) { pColIndexEx->colIndex = f; break; @@ -5610,7 +5815,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou for (int16_t i = 0; i < numOfCols; ++i) { pQuery->colList[i].info = pQueryMsg->colList[i]; - + SColumnInfo *pColInfo = &pQuery->colList[i].info; pColInfo->filters = NULL; // if (colList[i].numOfFilters > 0) { @@ -5629,7 +5834,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou assert(pExprs[col].resBytes > 0); pQuery->rowSize += pExprs[col].resBytes; } - + doUpdateExprColumnIndex(pQuery); int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery); @@ -5646,7 +5851,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // set the output buffer capacity pQuery->rec.capacity = 4096; pQuery->rec.threshold = 4000; - + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { assert(pExprs[col].interResBytes >= pExprs[col].resBytes); @@ -5673,18 +5878,18 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->groupInfo = *groupInfo; pQuery->pos = -1; - + pQuery->window.skey = pQueryMsg->window.skey; pQuery->window.ekey = pQueryMsg->window.ekey; - pQuery->lastKey = pQuery->window.skey; - + pQuery->lastKey = pQuery->window.skey; + if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); goto _clean_memory; } - + vnodeParametersSafetyCheck(pQuery); - + qTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; @@ -5719,30 +5924,30 @@ static bool isValidQInfo(void *param) { * pQInfo->signature may be changed by another thread, so we assign value of signature * into local variable, then compare by using local variable */ - uint64_t sig = (uint64_t) pQInfo->signature; + uint64_t sig = (uint64_t)pQInfo->signature; return (sig == (uint64_t)pQInfo); } -static void freeQInfo(SQInfo *pQInfo); -static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void* tsdb, SQInfo *pQInfo, bool isSTable) { +static void freeQInfo(SQInfo *pQInfo); +static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, bool isSTable) { int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - + STSBuf *pTSBuf = NULL; if (pQueryMsg->tsLen > 0) { // open new file to save the result char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset; pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); - + tsBufResetPos(pTSBuf); tsBufNextPos(pTSBuf); } - + // only the successful complete requries the sem_post/over = 1 operations. if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { qTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); - + sem_post(&pQInfo->dataReady); setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; @@ -5767,67 +5972,67 @@ static void freeQInfo(SQInfo *pQInfo) { if (!isValidQInfo(pQInfo)) { return; } - - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; setQueryKilled(pQInfo); - + qTrace("QInfo:%p start to free QInfo", pQInfo); for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } - + sem_destroy(&(pQInfo->dataReady)); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - + for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; if (pColFilter->numOfFilters > 0) { tfree(pColFilter->pFilters); } } - + tfree(pQuery->pFilterInfo); tfree(pQuery->colList); tfree(pQuery->sdata); - + if (pQuery->pSelectExpr != NULL) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; - + if (pBinExprInfo->numOfCols > 0) { tfree(pBinExprInfo->pReqColumns); tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL); } } - + tfree(pQuery->pSelectExpr); } - + if (pQuery->defaultVal != NULL) { tfree(pQuery->defaultVal); } - + tfree(pQuery->pGroupbyExpr); tfree(pQuery); - + int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + for (int32_t i = 0; i < numOfGroups; ++i) { + SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); taosArrayDestroy(p); } - + taosArrayDestroy(pQInfo->groupInfo.pGroupList); - + qTrace("QInfo:%p QInfo is freed", pQInfo); - + // destroy signature, in order to avoid the query process pass the object safety check memset(pQInfo, 0, sizeof(SQInfo)); tfree(pQInfo); } static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + /* * get the file size and set the numOfRows to be the file size, since for tsComp query, * the returned row size is equalled to 1 @@ -5850,20 +6055,20 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // the remained number of retrieved rows, not the interpolated result SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - + // load data from file to msg buffer if (isTSCompQuery(pQuery)) { int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); - + // make sure file exist if (FD_VALID(fd)) { size_t s = lseek(fd, 0, SEEK_END); qTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); - + lseek(fd, 0, SEEK_SET); read(fd, data, s); close(fd); - + unlink(pQuery->sdata[0]->data); } else { qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, @@ -5872,25 +6077,25 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { } else { doCopyQueryResultToMsg(pQInfo, pQuery->rec.rows, data); } - + pQuery->rec.total += pQuery->rec.rows; qTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); - + return TSDB_CODE_SUCCESS; - + // todo if interpolation exists, the result may be dump to client by several rounds } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { +int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { assert(pQueryMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; - - char* tagCond = NULL; - SArray *pTableIdList = NULL; - SSqlFuncExprMsg** pExprMsg = NULL; - SColIndex* pGroupColIndex = NULL; - + + char * tagCond = NULL; + SArray * pTableIdList = NULL; + SSqlFuncExprMsg **pExprMsg = NULL; + SColIndex * pGroupColIndex = NULL; + if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) { return code; } @@ -5917,37 +6122,38 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { goto _query_over; } - - bool isSTableQuery = false; + + bool isSTableQuery = false; STableGroupInfo groupInfo = {0}; - + if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { isSTableQuery = true; - - STableId* id = taosArrayGet(pTableIdList, 0); - id->uid = -1; //todo fix me - - /*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); - if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query + + STableId *id = taosArrayGet(pTableIdList, 0); + id->uid = -1; // todo fix me + + /*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, + pQueryMsg->numOfGroupCols); + if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; } } else { assert(taosArrayGetSize(pTableIdList) == 1); - - STableId* id = taosArrayGet(pTableIdList, 0); + + STableId *id = taosArrayGet(pTableIdList, 0); if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { goto _query_over; } } - + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; } - + code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery); - + _query_over: taosArrayDestroy(pTableIdList); @@ -5962,36 +6168,36 @@ void qDestroyQueryInfo(qinfo_t pQInfo) { } void qTableQuery(qinfo_t qinfo) { - SQInfo* pQInfo = (SQInfo*) qinfo; - + SQInfo *pQInfo = (SQInfo *)qinfo; + if (pQInfo == NULL || pQInfo->signature != pQInfo) { qTrace("%p freed abort query", pQInfo); return; } - + if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p it is already killed, abort", pQInfo); return; } - + qTrace("QInfo:%p query task is launched", pQInfo); - + if (pQInfo->runtimeEnv.stableQuery) { stableQueryImpl(pQInfo); } else { tableQueryImpl(pQInfo); } - + // vnodeDecRefCount(pQInfo); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { - SQInfo* pQInfo = (SQInfo*) qinfo; - + SQInfo *pQInfo = (SQInfo *)qinfo; + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } - + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); @@ -6000,19 +6206,19 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { sem_wait(&pQInfo->dataReady); qTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, - pQInfo->code); - + pQInfo->code); + return pQInfo->code; } bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { - SQInfo* pQInfo = (SQInfo*) qinfo; - + SQInfo *pQInfo = (SQInfo *)qinfo; + if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { return false; } - - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { return false; } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { @@ -6024,21 +6230,21 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { } } -int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen) { - SQInfo* pQInfo = (SQInfo*) qinfo; - +int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) { + SQInfo *pQInfo = (SQInfo *)qinfo; + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } - - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - size_t size = getResultSize(pQInfo, &pQuery->rec.rows); + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + size_t size = getResultSize(pQInfo, &pQuery->rec.rows); *contLen = size + sizeof(SRetrieveTableRsp); - + // todo handle failed to allocate memory *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen); (*pRsp)->numOfRows = htonl(pQuery->rec.rows); - + int32_t code = pQInfo->code; if (code == TSDB_CODE_SUCCESS) { (*pRsp)->offset = htobe64(pQuery->limit.offset); @@ -6047,23 +6253,23 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co (*pRsp)->offset = 0; (*pRsp)->useconds = 0; } - + if (pQuery->rec.rows > 0 && code == TSDB_CODE_SUCCESS) { code = doDumpQueryResult(pQInfo, (*pRsp)->data); } else { setQueryStatus(pQuery, QUERY_OVER); code = pQInfo->code; } - + if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { - (*pRsp)->completed = 1; // notify no more result to client + (*pRsp)->completed = 1; // notify no more result to client } - + return code; - -// if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { -// qTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); -// vnodeDecRefCount(pObj->qhandle); -// pObj->qhandle = NULL; -// } + + // if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { + // qTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); + // vnodeDecRefCount(pObj->qhandle); + // pObj->qhandle = NULL; + // } } -- GitLab