From d7579e02a11dc654c8f587c2236276bfea8dfbc5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Aug 2020 15:08:15 +0800 Subject: [PATCH] [td-225] fix the bugs in fill for interval query --- src/client/src/tscLocalMerge.c | 139 ++++++++++++++++++--------------- src/query/src/qFill.c | 43 +++++----- 2 files changed, 97 insertions(+), 85 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index cbc4caad1b..186c2871a1 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -822,7 +822,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * } } -void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { +void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { // discard following dataset in the same group and reset the interpolation information STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -865,64 +865,66 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe } } -/* - * Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called - * by "interuptHandler" function in shell - */ -static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { - SSqlCmd * pCmd = &pSql->cmd; - SSqlRes * pRes = &pSql->res; - - tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); +static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) { + assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE); - // no interval query, no fill operation - if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { - pRes->data = pLocalReducer->pFinalRes; - pRes->numOfRows = pFinalDataPage->num; + tFilePage * pBeforeFillData = pLocalReducer->pResultBuf; - if (pQueryInfo->limit.offset > 0) { - if (pQueryInfo->limit.offset < pRes->numOfRows) { - int32_t prevSize = (int32_t)pFinalDataPage->num; - tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); + pRes->data = pLocalReducer->pFinalRes; + pRes->numOfRows = pBeforeFillData->num; - /* remove the hole in column model */ - tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); + if (pQueryInfo->limit.offset > 0) { + if (pQueryInfo->limit.offset < pRes->numOfRows) { + int32_t prevSize = (int32_t)pBeforeFillData->num; + tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); - pRes->numOfRows -= pQueryInfo->limit.offset; - pQueryInfo->limit.offset = 0; - } else { - pQueryInfo->limit.offset -= pRes->numOfRows; - pRes->numOfRows = 0; - } + /* remove the hole in column model */ + tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); + + pRes->numOfRows -= pQueryInfo->limit.offset; + pQueryInfo->limit.offset = 0; + } else { + pQueryInfo->limit.offset -= pRes->numOfRows; + pRes->numOfRows = 0; } + } - pRes->numOfRowsGroup += pRes->numOfRows; + pRes->numOfRowsGroup += pRes->numOfRows; - if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) { - /* impose the limitation of output rows on the final result */ - int32_t prevSize = (int32_t)pFinalDataPage->num; - int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit); - assert(overflow < pRes->numOfRows); + // impose the limitation of output rows on the final result + if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) { + int32_t prevSize = (int32_t)pBeforeFillData->num; + int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit); + assert(overflow < pRes->numOfRows); - pRes->numOfRowsGroup = pQueryInfo->limit.limit; - pRes->numOfRows -= overflow; - pFinalDataPage->num -= overflow; + pRes->numOfRowsGroup = pQueryInfo->limit.limit; + pRes->numOfRows -= overflow; + pBeforeFillData->num -= overflow; - tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); + tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); - /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); - } + // set remain data to be discarded, and reset the interpolation information + savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); + } - memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize); + memcpy(pRes->data, pBeforeFillData->data, pRes->numOfRows * pLocalReducer->finalRowSize); - pRes->numOfClauseTotal += pRes->numOfRows; - pFinalDataPage->num = 0; - return; - } + pRes->numOfClauseTotal += pRes->numOfRows; + pBeforeFillData->num = 0; +} + +/* + * Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called + * by "interuptHandler" function in shell + */ +static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + tFilePage *pBeforeFillData = pLocalReducer->pResultBuf; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SFillInfo *pFillInfo = pLocalReducer->pFillInfo; - SFillInfo *pFillInfo = pLocalReducer->pFillInfo; int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey); tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); @@ -960,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO break; } - /* all output for current group are completed */ + // all output in current group are completed int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); if (totalRemainRows <= 0) { break; @@ -970,15 +972,16 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO } if (pRes->numOfRows > 0) { - if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) { - int32_t overflow = (int32_t)(pRes->numOfRows - pQueryInfo->limit.limit); - pRes->numOfRows -= overflow; - pFinalDataPage->num -= overflow; + int32_t currentTotal = pRes->numOfRowsGroup + pRes->numOfRows; - assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0); + if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) { + int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit); + + pRes->numOfRows -= overflow; + assert(pRes->numOfRows >= 0); /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); + savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo); } if (pQueryInfo->order.order == TSDB_ORDER_ASC) { @@ -995,7 +998,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO pRes->numOfClauseTotal += pRes->numOfRows; } - pFinalDataPage->num = 0; + pBeforeFillData->num = 0; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { taosTFree(pResPages[i]); } @@ -1212,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { * @param noMoreCurrentGroupRes * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups */ -bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) { +bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -1246,13 +1249,21 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); #endif - SFillInfo* pFillInfo = pLocalReducer->pFillInfo; - if (pFillInfo != NULL) { - taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey); - taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); - } - doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes); + + // no interval query, no fill operation + if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { + genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo); + } else { + SFillInfo* pFillInfo = pLocalReducer->pFillInfo; + if (pFillInfo != NULL) { + taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey); + taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); + } + + doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes); + } + return true; } @@ -1337,7 +1348,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); - if (rows > 0) { // do interpo + if (rows > 0) { doFillResult(pSql, pLocalReducer, true); } } @@ -1502,7 +1513,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { */ if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) { // does not belong to the same group - bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup); + bool notSkipped = genFinalResults(pSql, pLocalReducer, !sameGroup); // this row needs to discard, since it belongs to the group of previous if (pLocalReducer->discard && sameGroup) { @@ -1571,7 +1582,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { } if (pLocalReducer->pResultBuf->num) { - doGenerateFinalResults(pSql, pLocalReducer, true); + genFinalResults(pSql, pLocalReducer, true); } assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0); diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 998049fad6..d9fe67e1b7 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -248,17 +248,17 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } -static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, SFillTagColInfo *pTags, int32_t start, int32_t num) { - for(int32_t j = 0; j < pColInfo->numOfCols; ++j) { - SFillColInfo* pCol = &pColInfo->pFillCol[j]; +static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t num) { + for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) { + SFillColInfo* pCol = &pFillInfo->pFillCol[j]; if (pCol->flag == TSDB_COL_NORMAL) { continue; } char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num); - for(int32_t i = 0; i < pColInfo->numOfTags; ++i) { - SFillTagColInfo* pTag = &pColInfo->pTags[i]; + for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) { + SFillTagColInfo* pTag = &pFillInfo->pTags[i]; if (pTag->col.colId == pCol->col.colId) { assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); break; @@ -267,8 +267,8 @@ static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, SFillTagColInfo } } -static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, - int64_t ts, SFillTagColInfo* pTags, bool outOfBound) { +static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts, + bool outOfBound) { char* prevValues = pFillInfo->prevValues; char* nextValues = pFillInfo->nextValues; @@ -312,7 +312,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu } } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) { // TODO : linear interpolation supports NULL value if (prevValues != NULL && !outOfBound) { @@ -337,7 +337,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu taosDoLinearInterpolation(type, &point1, &point2, &point); } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } else { for (int32_t i = 1; i < numOfValCols; ++i) { @@ -352,7 +352,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu } } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } } else { /* fill the default value */ @@ -363,7 +363,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } pFillInfo->start += (pFillInfo->slidingTime * step); @@ -397,8 +397,6 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu char** nextValues = &pFillInfo->nextValues; int32_t numOfTags = pFillInfo->numOfTags; - SFillTagColInfo* pTags = pFillInfo->pTags; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); if (numOfRows == 0) { /* @@ -406,7 +404,7 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. */ while (num < outputRows) { - doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); + doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, true); } pFillInfo->numOfTotal += pFillInfo->numOfCurrent; @@ -433,12 +431,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { - doFillResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false); + doFillResultImpl(pFillInfo, data, &num, srcData, ts, false); } /* output buffer is full, abort */ - if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || - (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) { + if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) { pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return outputRows; } @@ -447,10 +444,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu initBeforeAfterDataBuf(pFillInfo, prevValues); // assign rows to dst buffer - int32_t i = 0; - for (; i < pFillInfo->numOfCols - numOfTags; ++i) { + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - + if (pCol->flag == TSDB_COL_TAG) { + continue; + } + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx); @@ -472,10 +471,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu } // set the tag value for final result - setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num); + setTagsValue(pFillInfo, data, num); pFillInfo->start += (pFillInfo->slidingTime * step); pFillInfo->rowIdx += 1; + + pFillInfo->numOfCurrent +=1; num += 1; } -- GitLab