From c8b180b129cf0155884c78d3e88762bd987aaf21 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Oct 2022 19:14:06 +0800 Subject: [PATCH] fix(query): update the cached last query. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 97 +++++++------------ source/libs/executor/src/cachescanoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 28 +++--- source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 93 ------------------ 5 files changed, 54 insertions(+), 167 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 2d4a1b54cd..be36848fec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -29,48 +29,37 @@ typedef struct SCacheRowsReader { SArray* pTableList; // table id list } SCacheRowsReader; -static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { +static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, + SFirstLastRes** pRes) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; - SColVal colVal = {0}; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE); - SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i); - - if (slotIds[i] == -1) { - pRes->ts = pColVal->ts; - pRes->bytes = TSDB_KEYSIZE; - pRes->isNull = false; - pRes->hasResult = true; - - colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); + if (slotIds[i] == -1) { // the primary timestamp + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + pRes[i]->ts = pColVal->ts; + memcpy(pRes[i]->buf, &pColVal->ts, TSDB_KEYSIZE); } else { - int32_t slotId = slotIds[i]; + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - int32_t bytes = pReader->pSchema->columns[slotId].bytes; - pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes); - pRes->bytes = bytes; - pRes->hasResult = true; + pRes[i]->ts = pColVal->ts; + pRes[i]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); - if (IS_VAR_DATA_TYPE(colVal.type)) { - if (!COL_VAL_IS_VALUE(&colVal)) { - pRes->isNull = true; - pRes->ts = pColVal->ts; - - colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); + if (!pRes[i]->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(pRes[i]->buf, pColVal->colVal.value.nData); + memcpy(varDataVal(pRes[i]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); } else { - varDataSetLen(pRes->buf, colVal.value.nData); - memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData); - pRes->bytes = colVal.value.nData; - colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); + memcpy(pRes[i]->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); } - } else { - colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal)); } } + + pRes[i]->hasResult = true; + colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); } pBlock->info.rows += 1; @@ -142,7 +131,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint } // no data in the table of Uid - if (*h != NULL) { // todo convert to SArray + if (*h != NULL) { *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } } else { @@ -172,15 +161,16 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 LRUHandle* h = NULL; SArray* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); + bool hasRes = false; - int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols); - for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { - lastTs[i] = INT64_MIN; + SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + for (int32_t j = 0; j < pr->numOfCols; ++j) { + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); + pRes[j]->ts = INT64_MIN; } // retrieve the only one last row of all tables in the uid list. if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { - bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -194,18 +184,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } { - SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); - for(int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); - pRes[j]->ts = INT64_MIN; - } - for (int32_t k = 0; k < pr->numOfCols; ++k) { SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k); - if (slotIds[k] == -1) { // the primary timestamp - SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k); + if (slotIds[k] == -1) { // the primary timestamp + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + hasRes = true; pRes[k]->hasResult = true; pRes[k]->ts = pColVal->ts; memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE); @@ -217,6 +202,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + hasRes = true; pRes[k]->hasResult = true; pRes[k]->ts = pColVal->ts; @@ -236,25 +222,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } -/* - if (pRow->ts > lastKey) { - printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey); - - // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already - // appended or not. - if (internalResult) { - pResBlock->info.rows -= 1; - taosArrayClear(pTableUidList); - } - - saveOneRow(pRow, pResBlock, pr, slotIds); - taosArrayPush(pTableUidList, &pKeyInfo->uid); - internalResult = true; - lastKey = pRow->ts; - } -*/ tsdbCacheRelease(lruCache, h); } + + if (hasRes) { + pResBlock->info.rows = 1; + } + } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -267,9 +241,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 continue; } - saveOneRow(pRow, pResBlock, pr, slotIds); - taosArrayPush(pTableUidList, &pKeyInfo->uid); + saveOneRow(pRow, pResBlock, pr, slotIds, pRes); + // TODO reset the pRes + taosArrayPush(pTableUidList, &pKeyInfo->uid); tsdbCacheRelease(lruCache, h); pr->tableIndex += 1; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6106544a35..5d63892b7b 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -193,6 +193,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->currentGroupIndex += 1; // check for tag values + // TODO NOTE: The uid of pInfo->pRes is required. if (pInfo->pRes->info.rows > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c1478bc935..9c219d2765 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1420,8 +1420,9 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { - SResultRowInfo dumyInfo; + SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; + STimeWindow win = {0}; if (IS_FINAL_OP(pInfo)) { win.skey = startTsCols[i]; @@ -5828,27 +5829,30 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->isFinal = false; - if (pIntervalPhyNode->window.pExprs != NULL) { - int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - } + pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); SExprSupp* pSup = &pOperator->exprSupp; + + initBasicInfo(&pInfo->binfo, pResBlock); + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&pInfo->binfo, pResBlock); - initStreamFunciton(pSup->pCtx, pSup->numOfExprs); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + if (pIntervalPhyNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } pInfo->invertible = allInvertible(pSup->pCtx, numOfCols); pInfo->invertible = false; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 2308aaf214..54e8c27d5b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2402,7 +2402,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = cachedLastRowFunction, + .processFunc = lastFunctionMerge, .finalizeFunc = firstLastFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 493561ca1e..3255ef7d26 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6148,99 +6148,6 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } -int32_t interpFunction(SqlFunctionCtx* pCtx) { -#if 0 - int32_t fillType = (int32_t) pCtx->param[2].i64; - //bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); - - if (pCtx->start.key == pCtx->startTs) { - assert(pCtx->start.key != INT64_MIN); - - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); - - goto interp_success_exit; - } else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); - - goto interp_success_exit; - } - - switch (fillType) { - case TSDB_FILL_NULL: - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); - break; - - case TSDB_FILL_SET_VALUE: - tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); - break; - - case TSDB_FILL_LINEAR: - if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs - || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { - goto interp_exit; - } - - double v1 = -1, v2 = -1; - GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); - GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); - - SPoint point1 = {.key = pCtx->start.key, .val = &v1}; - SPoint point2 = {.key = pCtx->end.key, .val = &v2}; - SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; - - int32_t srcType = pCtx->inputType; - if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { - setNull(pCtx->pOutput, srcType, pCtx->inputBytes); - } else { - bool exceedMax = false, exceedMin = false; - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); - if (exceedMax || exceedMin) { - __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); - if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); - } else { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); - } - } - } - break; - - case TSDB_FILL_PREV: - if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) { - goto interp_exit; - } - - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); - break; - - case TSDB_FILL_NEXT: - if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { - goto interp_exit; - } - - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); - break; - - case TSDB_FILL_NONE: - // do nothing - default: - goto interp_exit; - } - - - interp_success_exit: - *(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs; - INC_INIT_VAL(pCtx, 1); - - interp_exit: - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; - pCtx->endTs = pCtx->startTs; -#endif - - return TSDB_CODE_SUCCESS; -} - int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; -- GitLab