diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 3e1ebbd1ff756fb29306e465f29d41dd74051f26..61f0ac78779a1269f95296cb93c975b0db63a544 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -5504,20 +5504,35 @@ static bool tail_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn static void tail_function(SQLFunctionCtx *pCtx) { STailInfo *pRes = getOutputInfo(pCtx); - for (int32_t i = 0; i < pCtx->size; ++i) { - if (pRes->offset++ < (int32_t)pCtx->param[1].i64){ - continue; - } - if (pRes->num >= (int32_t)pCtx->param[0].i64){ - break; + if (pCtx->stableQuery){ + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_DATA(pCtx, i); + + TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; + do_tail_function_add(pRes, (int32_t)(pCtx->param[0].i64 + pCtx->param[1].i64), data, ts, + pCtx->inputBytes, &pCtx->tagInfo, NULL, pCtx->currentStage); } - char *data = GET_INPUT_DATA(pCtx, i); + }else{ + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + if (pRes->offset++ < (int32_t)pCtx->param[1].i64){ + continue; + } + if (pRes->num >= (int32_t)pCtx->param[0].i64){ // query complete + pCtx->resultInfo->complete = true; + for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { + SQLFunctionCtx *ctx = pCtx->tagInfo.pTagCtxList[j]; + ctx->resultInfo->complete = true; + } + break; + } + char *data = GET_INPUT_DATA(pCtx, i); - TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; + TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; - valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage); + valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage); - pRes->num++; + pRes->num++; + } } // treat the result as only one result @@ -5534,7 +5549,7 @@ static void tail_func_merge(SQLFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { - do_tail_function_add(pOutput, (int32_t)pCtx->param[0].i64, pInput->res[i]->data, pInput->res[i]->timestamp, + do_tail_function_add(pOutput, (int32_t)(pCtx->param[0].i64 + pCtx->param[1].i64), pInput->res[i]->data, pInput->res[i]->timestamp, pCtx->outputBytes, &pCtx->tagInfo, pInput->res[i]->data + pCtx->outputBytes, pCtx->currentStage); } @@ -5555,9 +5570,11 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) { int32_t bytes = 0; int32_t type = 0; + int32_t start = 0; if (pCtx->currentStage == MERGE_STAGE) { bytes = pCtx->outputBytes; type = pCtx->outputType; + start = pCtx->param[1].i64; assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); } else { bytes = pCtx->inputBytes; @@ -5580,7 +5597,7 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) { qError("calloc error in tail_func_finalizer: size:%d, num:%d", (int32_t)size, GET_RES_INFO(pCtx)->numOfRes); return; } - for(int32_t start = pCtx->param[1].i64, i = 0; start < pRes->num; start++, i++){ + for(int32_t i = 0; start < pRes->num; start++, i++){ memcpy(data + i * size, pRes->res[start], size); } taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 6aebb32191813ee7bdc79628246e8c2dddba79ee..cc85c421a9a9ee41eae368d0db823a61ee1c9f67 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -38,10 +38,12 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE || - pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM || - pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) { + pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM) { return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; } + if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) { + return (int32_t)(pQueryAttr->pExpr1[i].base.param[0].i64 + pQueryAttr->pExpr1[i].base.param[1].i64); + } } if (pQueryAttr->uniqueQuery){ return MAX_UNIQUE_RESULT_ROWS; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index abf14520e8c9fc5140200753b5272e1aa0c3b4e1..60c7311d4c0f3d784231fceb8a7e2628a5bd4eda 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1855,12 +1855,11 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; -// if (ASCENDING_TRAVERSE(pQueryHandle->order)) { -// pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; -// } else { -// pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; -// } + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } int32_t colIdOfRow1; if(j >= numOfColsOfRow1) { @@ -1991,12 +1990,11 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, if(forceSetNull) { while (i < numOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; -// if (ASCENDING_TRAVERSE(pQueryHandle->order)) { -// pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; -// } else { -// pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; -// } + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { setVardataNull(pData, pColInfo->info.type); @@ -2342,7 +2340,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* SWAP(cur->win.skey, cur->win.ekey, TSKEY); } - //moveDataToFront(pQueryHandle, numOfRows, numOfCols); + moveDataToFront(pQueryHandle, numOfRows, numOfCols); updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pQueryHandle); @@ -2980,14 +2978,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int assert(numOfRows <= maxRowsToRead); // if the buffer is not full in case of descending order query, move the data in the front of the buffer -// if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { -// int32_t emptySize = maxRowsToRead - numOfRows; -// -// for(int32_t i = 0; i < numOfCols; ++i) { -// SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); -// memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); -// } -// } + if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { + int32_t emptySize = maxRowsToRead - numOfRows; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + } + } int64_t elapsedTime = taosGetTimestampUs() - st; tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, 0x%"PRIx64, pQueryHandle,