From 96da5e233e63bba4297853a3767b2a6ec2998da7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Jul 2020 10:56:58 +0800 Subject: [PATCH] [td-225] fix bugs in sliding query. --- src/common/src/tname.c | 2 +- src/query/src/qExecutor.c | 149 +++++++++++++++++++++++--------------- src/query/src/qUtil.c | 3 +- 3 files changed, 92 insertions(+), 62 deletions(-) diff --git a/src/common/src/tname.c b/src/common/src/tname.c index b3ff15f936..c8910a7867 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -81,7 +81,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in return startTime; } - int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime; + int64_t start = ((startTime - slidingTime) / slidingTime + 1) * slidingTime; if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) { /* * here we revised the start time of day according to the local time zone, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 37c21e069b..110cd23370 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -396,34 +396,38 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataSt } static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, - int16_t bytes) { + int16_t bytes, bool masterscan) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes); if (p1 != NULL) { pWindowResInfo->curIndex = *p1; - } else { // more than the capacity, reallocate the resources - if (pWindowResInfo->size >= pWindowResInfo->capacity) { - int64_t newCap = pWindowResInfo->capacity * 2; - - char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult)); - if (t != NULL) { - pWindowResInfo->pResult = (SWindowResult *)t; - memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * pWindowResInfo->capacity); - } else { - // todo - } + } else { + if (masterscan) { // more than the capacity, reallocate the resources + if (pWindowResInfo->size >= pWindowResInfo->capacity) { + int64_t newCap = pWindowResInfo->capacity * 2; + + char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult)); + if (t != NULL) { + pWindowResInfo->pResult = (SWindowResult *)t; + memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * pWindowResInfo->capacity); + } else { + // todo + } - for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { - SPosInfo pos = {-1, -1}; - createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos); + for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { + SPosInfo pos = {-1, -1}; + createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos); + } + pWindowResInfo->capacity = newCap; } - pWindowResInfo->capacity = newCap; - } - // add a new result set for a new group - pWindowResInfo->curIndex = pWindowResInfo->size++; - taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + // add a new result set for a new group + pWindowResInfo->curIndex = pWindowResInfo->size++; + taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + } else { + return NULL; + } } return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); @@ -510,15 +514,19 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult } static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid, - STimeWindow *win) { + STimeWindow *win, bool masterscan, bool* newWind) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, + TSDB_KEYSIZE, masterscan); if (pWindowRes == NULL) { - return -1; + *newWind = false; + + return masterscan? -1:0; } + *newWind = true; // not assign result buffer yet, add new result buffer if (pWindowRes->pos.pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage); @@ -685,24 +693,26 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + int32_t functionId = pQuery->pSelectExpr[k].base.functionId; - pCtx[k].nStartQueryTimestamp = pWin->skey; - pCtx[k].size = forwardStep; - pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); + pCtx[k].nStartQueryTimestamp = pWin->skey; + pCtx[k].size = forwardStep; + pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx[k].ptsList = &tsBuf[offset]; - } + if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { + pCtx[k].ptsList = &tsBuf[offset]; + } - // not a whole block involved in query processing, statistics data can not be used - if (forwardStep != numOfTotal) { - pCtx[k].preAggVals.isSet = false; - } + // not a whole block involved in query processing, statistics data can not be used + if (forwardStep != numOfTotal) { + pCtx[k].preAggVals.isSet = false; + } - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); + } } } } @@ -712,12 +722,14 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - pCtx[k].nStartQueryTimestamp = pWin->skey; + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + pCtx[k].nStartQueryTimestamp = pWin->skey; - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); + int32_t functionId = pQuery->pSelectExpr[k].base.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } } } } @@ -879,7 +891,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + SQuery *pQuery = pRuntimeEnv->pQuery; TSKEY *tsCols = NULL; if (pDataBlock != NULL) { @@ -902,18 +915,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = tsCols[offset]; + bool hasTimeWindow = false; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { tfree(sasArray); return; } - TSKEY ekey = reviseWindowEkey(pQuery, &win); - int32_t forwardStep = - getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); + if (hasTimeWindow) { + TSKEY ekey = reviseWindowEkey(pQuery, &win); + int32_t forwardStep = + getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); - SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows); + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows); + } int32_t index = pWindowResInfo->curIndex; STimeWindow nextWin = win; @@ -925,14 +941,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) { + bool hasTimeWindow = false; + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { break; } - ekey = reviseWindowEkey(pQuery, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); + if (!hasTimeWindow) { + continue; + } + + TSKEY ekey = reviseWindowEkey(pQuery, &nextWin); + int32_t forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); - pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + SWindowStatus* pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } @@ -982,7 +1003,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } // assert(pRuntimeEnv->windowResInfo.hashList->size <= 2); - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true); if (pWindowRes == NULL) { return -1; } @@ -1112,6 +1133,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; @@ -1183,11 +1205,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS int64_t ts = tsCols[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win); + bool hasTimeWindow = false; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } + if (!hasTimeWindow) { + continue; + } + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset); @@ -1207,12 +1234,15 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) { + bool hasTimeWindow = false; + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { break; } - pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); - doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, offset); + if (hasTimeWindow) { + pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, offset); + } } pWindowResInfo->curIndex = index; @@ -3388,7 +3418,8 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { } int32_t GROUPRESULTID = 1; - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex)); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, + sizeof(groupIndex), true); if (pWindowRes == NULL) { return; } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 3b2b2bfff0..03574388b7 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -190,8 +190,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_ } // get the result order - int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? - TSDB_ORDER_ASC:TSDB_ORDER_DESC; + int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? 1:-1; if (order != resultOrder) { return; -- GitLab