From 1d14c057897d840703ab60940467482a278949ce Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Mar 2020 22:14:13 +0800 Subject: [PATCH] [TD-28] refactor some codes --- src/system/detail/src/vnodeQueryImpl.c | 131 ++++++++++++++++--------- 1 file changed, 87 insertions(+), 44 deletions(-) diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 4bdae30df3..7d64abf052 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -592,7 +592,7 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) { static void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, - void *param, SBlockInfo *pBlockInfo, int32_t index); + void *param); void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); @@ -1407,7 +1407,7 @@ static char *doGetDataBlocks(SQuery *pQuery, SData **data, int32_t colIdx) { return pData; } -static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, int32_t* index) { +static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -1446,7 +1446,6 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa * So, the validation of required column in cache with the corresponding meter schema is reinforced. */ dataBlock = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pCol->colIdxInBuf); - *index = pCol->colIdxInBuf; } } @@ -1994,8 +1993,8 @@ static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* * 2. time window start exactly from a timestamp with data */ if (skey == win->skey || win->skey < pWindowResInfo->startTime || - (win->skey < pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || - (win->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + (win->skey <= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || + (win->skey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { pCtx->prev.key = -1; return; } @@ -2089,6 +2088,40 @@ static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, S } } +static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo, SBlockInfo* pBlockInfo, + TSKEY ts, int32_t offset, STimeWindow* win) { + // get current not closed time window + SQuery* pQuery = pRuntimeEnv->pQuery; + + int32_t slot = pWindowResInfo->curIndex; + if (slot == -1) { + return; + } + + while (slot < pWindowResInfo->size) { + STimeWindow w = getWindowResult(pWindowResInfo, slot)->window; + if (w.skey == win->skey) { + assert(w.ekey == win->ekey); + break; + } + + // if current active window locates before current data block, do interpolate the result and close it + assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || + (w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); + + int32_t forwardStep = 0; + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); + + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); + + closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo)); + + // try next time window + slot += 1; + } +} + /** * * @param pRuntimeEnv @@ -2116,9 +2149,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t SField dummyField = {0}; bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); - - int32_t index = 0; - char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep, &index); + char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep); SField *tpField = NULL; @@ -2135,9 +2166,10 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, - hasNull, &sasArray[k], pBlockInfo, index); + hasNull, &sasArray[k]); } + // save the last row in current data block for(int32_t i = 0; i < pQuery->numOfCols; ++i) { SColumnInfo* pColInfo = &pQuery->colList[i].data; int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0; @@ -2151,33 +2183,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t TSKEY ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - - // get current not closed time window - int32_t slot = pWindowResInfo->curIndex; - if (slot != -1) { - while (slot < pWindowResInfo->size) { - STimeWindow w = getWindowResult(pWindowResInfo, slot)->window; - - // if current active window locates before current data block, do interpolate the result and close it - if (w.skey != win.skey || w.ekey != win.ekey) { - assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || - (w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); - - forwardStep = 0; - doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); - - SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); - - closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo)); - } else { - break; - } - - // try next time window - slot += 1; - } - } + doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win); if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; @@ -2614,14 +2620,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - int32_t index = -1; bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); - char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep, &index); + char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); TSKEY ts = pQuery->skey; - // pRuntimeEnv->intervalWindow.ekey; setExecParams(pRuntimeEnv, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, - &sasArray[k], pBlockInfo, 0); + &sasArray[k]); } // set the input column data @@ -2647,7 +2651,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * int32_t j = 0; TSKEY lastKey = -1; - + int32_t lastIndex = -1; + bool firstAccessedPoint = true; + for (j = 0; j < (*forwardStep); ++j) { int32_t offset = GET_COL_DATA_POS(pQuery, j, step); @@ -2669,8 +2675,31 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // interval window query if (isIntervalQuery(pQuery)) { // decide the time window according to the primary timestamp - int64_t ts = primaryKeyCol[offset]; + TSKEY ts = primaryKeyCol[offset]; + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); +// if (firstAccessedPoint) { +// doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win); +// firstAccessedPoint = false; +// } else { +// int32_t index = pWindowResInfo->curIndex; +// STimeWindow w = getWindowResult(pWindowResInfo, index)->window; +// +// if (w.skey == win.skey) { // do nothing +// assert(w.ekey == win.ekey); +// } else { +// assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || +// (w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); +// +// // set the endkey interpolation for the previous +// for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { +// SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; +// +// interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); +// } +// +// } +// } int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -2684,6 +2713,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset); lastKey = ts; + lastIndex = j; + STimeWindow nextWin = win; int32_t index = pWindowResInfo->curIndex; int32_t sid = pRuntimeEnv->pMeterObj->sid; @@ -2728,6 +2759,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].next.key = -1; + pCtx[k].prev.key = -1; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pCtx[k], offset); } @@ -2741,7 +2775,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * break; } } - + /* * pointsOffset is the maximum available space in result buffer update the actual forward step for query that * requires checking buffer during loop @@ -2752,6 +2786,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * break; } } + + // save the last accessed row of current data block for interpolation + int32_t index = GET_COL_DATA_POS(pQuery, lastIndex, step); + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + SColumnInfo* pColInfo = &pQuery->colList[i].data; + int32_t s = pColInfo->bytes * index; + + memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes); + } free(sasArray); @@ -2975,7 +3018,7 @@ static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, in void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, - void *param, SBlockInfo *pBlockInfo, int32_t index) { + void *param) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1); -- GitLab