diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index bd5e61c321acd90a14a74053d101859c22cf285b..f3484509f8113f22050b959bb5c8226375d2bd3c 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -30,8 +30,6 @@ extern "C" { struct tExprNode; struct SSchema; -struct tSkipList; -struct tSkipListNode; enum { TSQL_NODE_EXPR = 0x1, diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 6ca8b3ab4acc7dafe79012edf4fc19d0881d8021..8ee337b8edd240d101601ceac38c5f33f2852411 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -50,6 +50,7 @@ #define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv))) #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC)) /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ @@ -84,15 +85,24 @@ typedef enum { QUERY_OVER = 0x8u, } vnodeQueryStatus; -static void setQueryStatus(SQuery *pQuery, int8_t status); -bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } - enum { TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TAG_NOT_EQUALS = 2, }; +typedef struct { + int32_t status; // query status + TSKEY lastKey; // the lastKey value before query executed + STimeWindow w; // whole query time window + STimeWindow current; // current query window + int32_t windowIndex; // index of active time window result for interval query + STSCursor cur; +} SQueryStatusInfo; + +static void setQueryStatus(SQuery *pQuery, int8_t status); +bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } + static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group); static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); @@ -2226,102 +2236,11 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -//int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { -// if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || -// (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { -// dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, -// pQuery->window.ekey, pQuery->order.order); -// -// sem_post(&pQInfo->dataReady); -// return TSDB_CODE_SUCCESS; -// } -// -// pQuery->status = 0; -// pQuery->rec = (SResultRec){0}; -// -// changeExecuteScanOrder(pQuery, true); -// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; -// -// /* -// * since we employ the output control mechanism in main loop. -// * so, disable it during data block scan procedure. -// */ -// setScanLimitationByResultBuffer(pQuery); -// -// // save raw query range for applying to each subgroup -// pQuery->lastKey = pQuery->window.skey; -// -// // create runtime environment -// // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; -// -// // get one queried meter -// assert(0); -// // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); -// -// pRuntimeEnv->pTSBuf = param; -// pRuntimeEnv->cur.vnodeIndex = -1; -// -// // set the ts-comp file traverse order -// if (param != NULL) { -// int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; -// tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); -// } -// -// assert(0); -// // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); -// // if (ret != TSDB_CODE_SUCCESS) { -// // return ret; -// // } -// -// // createTableGroup(pQInfo->pSidSet); -// -// int32_t size = getInitialPageNum(pQInfo); -// int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); -// if (ret != TSDB_CODE_SUCCESS) { -// return ret; -// } -// -// if (pQuery->intervalTime == 0) { -// int16_t type = TSDB_DATA_TYPE_NULL; -// -// if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; -// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); -// } else { -// type = TSDB_DATA_TYPE_INT; // group id -// } -// -// initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); -// } -// -// pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); -// -// STsdbQueryCond cond = { -// .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, -// .order = pQuery->order.order, -// .colList = pQuery->colList, -// -// }; -// -// pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo); -// -// // metric query do not invoke interpolation, it will be done at the second-stage merge -// if (!isPointInterpoQuery(pQuery)) { -// pQuery->interpoType = TSDB_INTERPO_NONE; -// } -// -// TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, -// pQuery->precision); -// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); -// pRuntimeEnv->stableQuery = true; -// -// return TSDB_CODE_SUCCESS; -//} - /** * decrease the refcount for each table involved in this query * @param pQInfo */ -void vnodeDecMeterRefcnt(SQInfo *pQInfo) { +UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { // assert(taosHashGetSize(pQInfo->groupInfo) >= 1); } @@ -2355,7 +2274,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { #endif } -void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { +UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { @@ -3255,10 +3174,10 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { pQuery->order.order = (pQuery->order.order) ^ 1u; } -void setCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { +void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } } @@ -3373,66 +3292,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { } } -typedef struct SQueryStatus { - int8_t status; -// TSKEY lastKey; - STSCursor cur; -} SQueryStatus; - -// todo refactor -static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - pStatus->status = pQuery->status; -// pStatus->lastKey = pQuery->lastKey; - - pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor - - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order ^= 1u; - tsBufNextPos(pRuntimeEnv->pTSBuf); - } - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - -// SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); -// pQuery->lastKey = pQuery->window.skey; -} - -static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - -// pQuery->lastKey = pStatus->lastKey; - pQuery->status = pStatus->status; - - tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); -} - -static UNUSED_FUNC void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQueryStatus qStatus = {0}; - - if (!needReverseScan(pQuery)) { - return; - } - - dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); - SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - - // close necessary function execution during supplementary scan - disableFuncInReverseScan(pRuntimeEnv); - queryStatusSave(pRuntimeEnv, &qStatus); - - // reverse scan from current position - doScanAllDataBlocks(pRuntimeEnv); - - queryStatusRestore(pRuntimeEnv, &qStatus); - setCtxOrder(pRuntimeEnv); - - SET_MASTER_SCAN_FLAG(pRuntimeEnv); -} - void setQueryStatus(SQuery *pQuery, int8_t status) { if (status == QUERY_NOT_COMPLETED) { pQuery->status = status; @@ -3488,45 +3347,118 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { return toContinue; } +static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + SQueryStatusInfo info = { + .status = pQuery->status, + .windowIndex = pRuntimeEnv->windowResInfo.curIndex, + .lastKey = pQuery->lastKey, + .w = pQuery->window, + }; + + return info; +} + +static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQuery* pQuery = pRuntimeEnv->pQuery; + + // the step should be placed before order changed + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor + if (pRuntimeEnv->pTSBuf) { + SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); + tsBufNextPos(pRuntimeEnv->pTSBuf); + } + + // reverse order time range + pQuery->window.skey = pQuery->lastKey - step; + pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query + + SWITCH_ORDER(pQuery->order.order); + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + // clean unused handle + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + } + + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + switchCtxOrder(pRuntimeEnv); + disableFuncInReverseScan(pRuntimeEnv); +} + +static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + SWITCH_ORDER(pQuery->order.order); + switchCtxOrder(pRuntimeEnv); + + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + } + + SET_MASTER_SCAN_FLAG(pRuntimeEnv); + + // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query + // during reverse scan + pQuery->lastKey = lastKey; + pQuery->status = pStatus->status; + pQuery->window = pStatus->w; +} + void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); // store the start query position SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); - - int64_t skey = pQuery->lastKey; - int32_t status = pQuery->status; - int32_t prevSlot = pRuntimeEnv->windowResInfo.curIndex; + SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); while (1) { doScanAllDataBlocks(pRuntimeEnv); + + if (pRuntimeEnv->scanFlag == MASTER_SCAN) { + qstatus.status = pQuery->status; + } if (!needScanDataBlocksAgain(pRuntimeEnv)) { - // restore the status + // restore the status code and jump out of loop if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { - pQuery->status = status; // restore the status code when abort from repeat scan + pQuery->status = qstatus.status; } break; } STsdbQueryCond cond = { - .twindow = {.skey = skey, .ekey = pQuery->lastKey - step}, + .twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step}, .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - if (pRuntimeEnv->pSecQueryHandle == NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - status = pQuery->status; // backup the status - pRuntimeEnv->windowResInfo.curIndex = prevSlot; + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; @@ -3541,63 +3473,14 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { return; } - // save the query time window - STimeWindow prev = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey}; - - // reverse order time range - pQuery->window.skey = pQuery->lastKey - step; - pQuery->window.ekey = skey; - - pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - - STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - // clean unused handle - if (pRuntimeEnv->pSecQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - } + TSKEY lastKey = pQuery->lastKey; + setEnvBeforeReverseScan(pRuntimeEnv, &qstatus); - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); - - dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); - SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - - int32_t status1 = pQuery->status; - - STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; - tsBufNextPos(pRuntimeEnv->pTSBuf); - } - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - setCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pRuntimeEnv); - // reverse scan from current position + dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); doScanAllDataBlocks(pRuntimeEnv); - pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - tsBufSetCursor(pRuntimeEnv->pTSBuf, &cur); - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; - } - - setCtxOrder(pRuntimeEnv); - - SET_MASTER_SCAN_FLAG(pRuntimeEnv); - - // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query - // during reverse scan - pQuery->lastKey = prev.skey; - pQuery->status = status1; - pQuery->window.ekey = prev.ekey; + clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus); } void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -4776,7 +4659,7 @@ static void doRestoreContext(SQInfo* pQInfo) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } - setCtxOrder(pRuntimeEnv); + switchCtxOrder(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b2177a4cbc296b215ab005c4c14d9c02456a7e0b..c9b3a655c747dfd474055a733e31de519f2e2ce0 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1049,7 +1049,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet assert(pCheckInfo->iter != NULL); - rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle); + rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 4000, &skey, &ekey, pHandle); // update the last key value pCheckInfo->lastKey = ekey + step;