From 5d1a1e2b2d06de4a2b059d3e61f5fcb5f00b630e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Jun 2020 11:00:08 +0800 Subject: [PATCH] [td-225] refactor code for fill --- src/client/src/tscSecondaryMerge.c | 41 +++++-------------- src/query/inc/qfill.h | 14 +++---- src/query/src/qExecutor.c | 20 ++++----- src/query/src/qfill.c | 66 ++++++++++++++++-------------- 4 files changed, 62 insertions(+), 79 deletions(-) diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 95d559b4fa..b159ffc5a1 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -324,7 +324,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd tfree(pReducer->discardData); tfree(pReducer->pResultBuf); tfree(pReducer->pFinalRes); -// tfree(pReducer->pBufForInterpo); tfree(pReducer->prevRowOfInput); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -363,7 +362,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pQueryInfo->fillType != TSDB_FILL_NONE) { SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - 4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->fillType, pFillCol); + 4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, + tinfo.precision, pQueryInfo->fillType, pFillCol); } int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; @@ -494,7 +494,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { tscTrace("%p waiting for delete procedure, status: %d", pSql, status); } - taosDestoryFillInfo(pLocalReducer->pFillInfo); + pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { @@ -980,8 +980,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO } /* all output for current group are completed */ - int32_t totalRemainRows = - taosGetNumOfResultWithFill(pFillInfo, rpoints, pFillInfo->slidingTime, actualETime); + int32_t totalRemainRows = getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); if (totalRemainRows <= 0) { break; } @@ -1267,13 +1266,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no SFillInfo* pFillInfo = pLocalReducer->pFillInfo; if (pFillInfo != NULL) { - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - TSKEY ekey = taosGetRevisedEndKey(pQueryInfo->window.ekey, pFillInfo->order, pFillInfo->slidingTime, - pQueryInfo->slidingTimeUnit, tinfo.precision); - - taosFillSetStartInfo(pFillInfo, pResBuf->num, ekey); + taosFillSetStartInfo(pFillInfo, pResBuf->num, pQueryInfo->window.ekey); taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); } @@ -1327,23 +1320,15 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) { SLocalReducer *pLocalReducer = pRes->pLocalReducer; SFillInfo *pFillInfo = pLocalReducer->pFillInfo; - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - int8_t p = tinfo.precision; - if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) { assert(pQueryInfo->fillType != TSDB_FILL_NONE); tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf; - int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); + int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); - int32_t remain = taosNumOfRemainRows(pFillInfo); - TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, p); - // the first column must be the timestamp column - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity); - if (rows > 0) { // do interpo + int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); + if (rows > 0) { // do fill gap doFillResult(pSql, pLocalReducer, false); } @@ -1362,10 +1347,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || prevGroupCompleted) { @@ -1373,9 +1355,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { if (pQueryInfo->fillType != TSDB_FILL_NONE) { int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; - etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, - pQueryInfo->slidingTimeUnit, tinfo.precision); - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity); + assert(pFillInfo->numOfRows == 0); + int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo doFillResult(pSql, pLocalReducer, true); } diff --git a/src/query/inc/qfill.h b/src/query/inc/qfill.h index 9ea9c8f7cf..da1cd8e5de 100644 --- a/src/query/inc/qfill.h +++ b/src/query/inc/qfill.h @@ -50,7 +50,8 @@ typedef struct SFillInfo { char * nextValues; // next row of data char** pData; // original result data block involved in filling data int32_t capacityInRows; // data buffer size in rows - + int8_t slidingUnit; // sliding time unit + int8_t precision; // time resoluation SFillColInfo* pFillCol; // column info for fill operations } SFillInfo; @@ -61,12 +62,13 @@ typedef struct SPoint { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision); -SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, - int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol); +SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, + int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, + SFillColInfo* pFillCol); void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp); -void taosDestoryFillInfo(SFillInfo *pFillInfo); +void* taosDestoryFillInfo(SFillInfo *pFillInfo); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); @@ -74,9 +76,7 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput); -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision); - -int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows); +int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 26502a7408..18f6ea0f7c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1466,7 +1466,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->pCtx); } - taosDestoryFillInfo(pRuntimeEnv->pFillInfo); + pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo); destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); @@ -3557,9 +3557,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { * first result row in the actual result set will fill nothing. */ if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, - pQuery->slidingTimeUnit, pQuery->precision); - int32_t numOfTotal = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pQuery->rec.capacity); + int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity); return numOfTotal > 0; } @@ -3601,7 +3599,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } -int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) { +int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; @@ -4013,7 +4011,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery); pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput, - pQuery->slidingTime, pQuery->fillType, pColInfo); + pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision, + pQuery->fillType, pColInfo); } // todo refactor @@ -4666,13 +4665,11 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { limitResults(pRuntimeEnv); break; } else { - TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, - pQuery->slidingTimeUnit, pQuery->precision); - taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey); + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey); taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata); numOfInterpo = 0; - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo); + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { limitResults(pRuntimeEnv); break; @@ -4704,8 +4701,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ int32_t numOfInterpo = 0; - int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo); - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo); + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo); if (pQuery->rec.rows > 0) { limitResults(pRuntimeEnv); diff --git a/src/query/src/qfill.c b/src/query/src/qfill.c index 7b3ea5c1f0..59bf7b423c 100644 --- a/src/query/src/qfill.c +++ b/src/query/src/qfill.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "qfill.h" #include "os.h" +#include "qfill.h" #include "qextbuffer.h" #include "taosdef.h" #include "taosmsg.h" @@ -58,7 +58,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch } SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, - int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) { + int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) { if (fillType == TSDB_FILL_NONE) { return NULL; } @@ -72,8 +72,10 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ pFillInfo->pFillCol = pFillCol; pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfCols; + pFillInfo->precision = precision; pFillInfo->slidingTime = slidingTime; - + pFillInfo->slidingUnit = slidingUnit; + pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); int32_t rowsize = 0; @@ -102,9 +104,9 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { pFillInfo->numOfTotal = 0; } -void taosDestoryFillInfo(SFillInfo* pFillInfo) { +void* taosDestoryFillInfo(SFillInfo* pFillInfo) { if (pFillInfo == NULL) { - return; + return NULL; } tfree(pFillInfo->prevValues); @@ -119,6 +121,15 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) { tfree(pFillInfo->pFillCol); tfree(pFillInfo); + return NULL; +} + +static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { + if (order == TSDB_ORDER_ASC) { + return ekey; + } else { + return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); + } } void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { @@ -126,8 +137,10 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) return; } + pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit, + pFillInfo->precision); + pFillInfo->rowIdx = 0; - pFillInfo->endKey = endKey; pFillInfo->numOfRows = numOfRows; // ensure the space @@ -165,36 +178,29 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu } } -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { - if (order == TSDB_ORDER_ASC) { - return ekey; - } else { - return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); - } -} +int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { + int64_t* tsList = (int64_t*) pFillInfo->pData[0]; -static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain, - int64_t nInterval, int64_t ekey) { - - if (remain > 0) { // still fill gap within current data block, not generating data after the result set. - TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1]; - int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1; + int32_t numOfRows = taosNumOfRemainRows(pFillInfo); + + TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit, + pFillInfo->precision); - assert(total >= remain); - return total; + int64_t numOfRes = -1; + if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. + TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; + + numOfRes = (int64_t)(labs(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1; + assert(numOfRes >= numOfRows); } else { // reach the end of data - if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || - (ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { + if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || + (ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { return 0; - } else { - return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1; + } else { // the numOfRes rows are all filled with specified policy + numOfRes = (labs(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1; } } -} -int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) { - int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows, - pFillInfo->slidingTime, ekey); return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; } @@ -496,8 +502,8 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); + int32_t rows = getFilledNumOfRes(pFillInfo, pFillInfo->endKey, capacity); int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData); assert(numOfRes == rows); -- GitLab