diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index ad743eeea3d6a895728e98a9a3af1f23d46cf223..fb38326b8cbafb008619cbde1432b2f8160597a4 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -60,7 +60,7 @@ typedef struct SLocalReducer { char * prevRowOfInput; tFilePage * pResultBuf; int32_t nResultBufSize; - char * pBufForInterpo; // intermediate buffer for interpolation +// char * pBufForInterpo; // intermediate buffer for interpolation tFilePage * pTempBuffer; struct SQLFunctionCtx *pCtx; int32_t rowSize; // size of each intermediate result. @@ -68,9 +68,9 @@ typedef struct SLocalReducer { bool hasPrevRow; // cannot be released bool hasUnprocessedRow; tOrderDescriptor * pDesc; - SColumnModel * resColModel; + SColumnModel * resColModel; tExtMemBuffer ** pExtMemBuffer; // disk-based buffer - SInterpolationInfo interpolationInfo; // interpolation support structure + SFillInfo* pFillInfo; // interpolation support structure char * pFinalRes; // result data after interpo tFilePage * discardData; SResultInfo * pResInfo; diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a7eec31388d3a4fa389b164bb721316c9453b1ee..3625900cd265db29eddc1ed8a8d27fad859415f3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -210,7 +210,7 @@ typedef struct SQueryInfo { SLimitVal slimit; STagCond tagCond; SOrderVal order; - int16_t interpoType; // interpolate type + int16_t fillType; // interpolate type int16_t numOfTables; STableMetaInfo **pTableMetaInfo; struct STSBuf * tsBuf; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 5675416e6b999c4be8efaf75586fda9a712892e3..f019c9b1e16992ad251ab0150e3db71321958660 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -4159,16 +4159,16 @@ static void interp_function(SQLFunctionCtx *pCtx) { SInterpInfoDetail *pInfoDetail = interpInfo.pInterpDetail; /* set no output result */ - if (pInfoDetail->type == TSDB_INTERPO_NONE) { + if (pInfoDetail->type == TSDB_FILL_NONE) { pCtx->param[3].i64Key = 0; } else if (pInfoDetail->primaryCol == 1) { *(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts; } else { - if (pInfoDetail->type == TSDB_INTERPO_NULL) { + if (pInfoDetail->type == TSDB_FILL_NULL) { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); - } else if (pInfoDetail->type == TSDB_INTERPO_SET_VALUE) { + } else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) { tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType); - } else if (pInfoDetail->type == TSDB_INTERPO_PREV) { + } else if (pInfoDetail->type == TSDB_FILL_PREV) { char *data = pCtx->param[1].pz; char *pVal = data + TSDB_KEYSIZE; @@ -4179,7 +4179,7 @@ static void interp_function(SQLFunctionCtx *pCtx) { assignVal(pCtx->aOutputBuf, pVal, pCtx->outputBytes, pCtx->outputType); } - } else if (pInfoDetail->type == TSDB_INTERPO_LINEAR) { + } else if (pInfoDetail->type == TSDB_FILL_LINEAR) { char *data1 = pCtx->param[1].pz; char *data2 = pCtx->param[2].pz; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ebb081bb3ae168c71a1d214af303567e0fdded3a..5d48ff5e5cecac162de8cb549836fbfc999b2a5f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4020,19 +4020,19 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { } if (strncasecmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) { - pQueryInfo->interpoType = TSDB_INTERPO_NONE; + pQueryInfo->fillType = TSDB_FILL_NONE; } else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) { - pQueryInfo->interpoType = TSDB_INTERPO_NULL; + pQueryInfo->fillType = TSDB_FILL_NULL; for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) { TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes); } } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { - pQueryInfo->interpoType = TSDB_INTERPO_PREV; + pQueryInfo->fillType = TSDB_FILL_PREV; } else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) { - pQueryInfo->interpoType = TSDB_INTERPO_LINEAR; + pQueryInfo->fillType = TSDB_FILL_LINEAR; } else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) { - pQueryInfo->interpoType = TSDB_INTERPO_SET_VALUE; + pQueryInfo->fillType = TSDB_FILL_SET_VALUE; if (pFillToken->nExpr == 1) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index a3a0441ff899a4c4cf317b1e9d7d39909069bddf..38fce0630672590012271ac6474fd14e5832ebbc 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -25,7 +25,7 @@ typedef struct SCompareParam { SLocalDataSource **pLocalData; tOrderDescriptor * pDesc; - int32_t numOfElems; + int32_t num; int32_t groupOrderType; } SCompareParam; @@ -47,11 +47,11 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { } if (pParam->groupOrderType == TSDB_ORDER_DESC) { // desc - return compare_d(pDesc, pParam->numOfElems, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data, - pParam->numOfElems, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data); + return compare_d(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data, + pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data); } else { - return compare_a(pDesc, pParam->numOfElems, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data, - pParam->numOfElems, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data); + return compare_a(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data, + pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data); } } @@ -132,6 +132,26 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc } } +static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { + int32_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); + int32_t offset = 0; + + SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); + for(int32_t i = 0; i < numOfCols; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + + pFillCol[i].col.bytes = pExpr->resBytes; + pFillCol[i].col.type = pExpr->resType; + pFillCol[i].flag = pExpr->colInfo.flag; + pFillCol[i].col.offset = offset; + pFillCol[i].functionId = pExpr->functionId; + pFillCol[i].defaultVal.i = pQueryInfo->defaultVal[i]; + offset += pExpr->resBytes; + } + + return pFillCol; +} + void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, SColumnModel *finalmodel, SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; @@ -217,24 +237,24 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ds->pMemBuffer = pMemBuffer[i]; ds->flushoutIdx = j; - ds->filePage.numOfElems = 0; + ds->filePage.num = 0; ds->pageId = 0; ds->rowIdx = 0; tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1); tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0); #ifdef _DEBUG_VIEW - printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.numOfElems); + printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num); SSrcColumnInfo colInfo[256] = {0}; SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.numOfElems, + tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.num, pMemBuffer[0]->numOfElemsPerPage, colInfo); #endif - if (ds->filePage.numOfElems == 0) { // no data in this flush, the index does not increase + if (ds->filePage.num == 0) { // no data in this flush, the index does not increase tscTrace("%p flush data is empty, ignore %d flush record", pSql, idx); tfree(ds); continue; @@ -254,7 +274,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd SCompareParam *param = malloc(sizeof(SCompareParam)); param->pLocalData = pReducer->pLocalDataSrc; param->pDesc = pReducer->pDesc; - param->numOfElems = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; + param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); param->groupOrderType = pQueryInfo->groupbyExpr.orderType; @@ -295,25 +315,25 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd assert(finalRowLength <= pReducer->rowSize); pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); - pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize); +// pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize); if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || - pReducer->pBufForInterpo == NULL || pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { + /*pReducer->pBufForInterpo == NULL || */pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { tfree(pReducer->pTempBuffer); tfree(pReducer->discardData); tfree(pReducer->pResultBuf); tfree(pReducer->pFinalRes); - tfree(pReducer->pBufForInterpo); +// tfree(pReducer->pBufForInterpo); tfree(pReducer->prevRowOfInput); pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; return; } - - size = tscSqlExprNumOfExprs(pQueryInfo); - pReducer->pTempBuffer->numOfElems = 0; - pReducer->pResInfo = calloc(size, sizeof(SResultInfo)); + size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); + + pReducer->pTempBuffer->num = 0; + pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo)); tscCreateResPointerInfo(pRes, pQueryInfo); tscInitSqlContext(pCmd, pReducer, pDesc); @@ -333,55 +353,58 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pRes->numOfGroups = 0; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);; - int16_t prec = tinfo.precision; - int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; + TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); - - SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; - taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - pReducer->rowSize); + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision); + + if (pQueryInfo->fillType != TSDB_FILL_NONE) { + SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); + pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, + 4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->fillType, pFillCol); + } int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { - pInterpoInfo->pTags[0] = (char *)pInterpoInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols; + pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols; for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1); - pInterpoInfo->pTags[i] = pSchema->bytes + pInterpoInfo->pTags[i - 1]; + pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1]; } } else { - assert(pInterpoInfo->pTags == NULL); + if (pReducer->pFillInfo != NULL) { + assert(pReducer->pFillInfo->pTags == NULL); + } } } static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, int32_t orderType) { - if (pPage->numOfElems == 0) { + if (pPage->num == 0) { return 0; } - assert(pPage->numOfElems <= pDesc->pColumnModel->capacity); + assert(pPage->num <= pDesc->pColumnModel->capacity); // sort before flush to disk, the data must be consecutively put on tFilePage. if (pDesc->orderIdx.numOfCols > 0) { - tColDataQSort(pDesc, pPage->numOfElems, 0, pPage->numOfElems - 1, pPage->data, orderType); + tColDataQSort(pDesc, pPage->num, 0, pPage->num - 1, pPage->data, orderType); } #ifdef _DEBUG_VIEW - printf("%" PRIu64 " rows data flushed to disk after been sorted:\n", pPage->numOfElems); - tColModelDisplay(pDesc->pColumnModel, pPage->data, pPage->numOfElems, pPage->numOfElems); + printf("%" PRIu64 " rows data flushed to disk after been sorted:\n", pPage->num); + tColModelDisplay(pDesc->pColumnModel, pPage->data, pPage->num, pPage->num); #endif // write to cache after being sorted - if (tExtMemBufferPut(pMemoryBuf, pPage->data, pPage->numOfElems) < 0) { + if (tExtMemBufferPut(pMemoryBuf, pPage->data, pPage->num) < 0) { tscError("failed to save data in temporary buffer"); return -1; } - pPage->numOfElems = 0; + pPage->num = 0; return 0; } @@ -402,17 +425,17 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa int32_t numOfRows, int32_t orderType) { SColumnModel *pModel = pDesc->pColumnModel; - if (pPage->numOfElems + numOfRows <= pModel->capacity) { + if (pPage->num + numOfRows <= pModel->capacity) { tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows); return 0; } // current buffer is overflow, flush data to extensive buffer - int32_t numOfRemainEntries = pModel->capacity - pPage->numOfElems; + int32_t numOfRemainEntries = pModel->capacity - pPage->num; tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows); // current buffer is full, need to flushed to disk - assert(pPage->numOfElems == pModel->capacity); + assert(pPage->num == pModel->capacity); int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType); if (ret != 0) { return -1; @@ -430,12 +453,12 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows); - if (pPage->numOfElems == pModel->capacity) { + if (pPage->num == pModel->capacity) { if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) { return -1; } } else { - pPage->numOfElems = numOfWriteElems; + pPage->num = numOfWriteElems; } remain -= numOfWriteElems; @@ -470,7 +493,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { tscTrace("%p waiting for delete procedure, status: %d", pSql, status); } - taosDestoryInterpoInfo(&pLocalReducer->interpolationInfo); + taosDestoryFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { @@ -503,8 +526,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { tfree(pLocalReducer->pLoserTree); } - tfree(pLocalReducer->pBufForInterpo); - tfree(pLocalReducer->pFinalRes); tfree(pLocalReducer->discardData); @@ -740,7 +761,7 @@ int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSource *p #if defined(_DEBUG_VIEW) printf("new page load to buffer\n"); tColModelDisplay(pOneInterDataSrc->pMemBuffer->pColumnModel, pOneInterDataSrc->filePage.data, - pOneInterDataSrc->filePage.numOfElems, pOneInterDataSrc->pMemBuffer->pColumnModel->capacity); + pOneInterDataSrc->filePage.num, pOneInterDataSrc->pMemBuffer->pColumnModel->capacity); #endif *needAdjustLoserTree = true; } else { @@ -761,7 +782,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree */ bool needToAdjust = true; - if (pOneInterDataSrc->filePage.numOfElems <= pOneInterDataSrc->rowIdx) { + if (pOneInterDataSrc->filePage.num <= pOneInterDataSrc->rowIdx) { loadNewDataFromDiskFor(pLocalReducer, pOneInterDataSrc, &needToAdjust); } @@ -788,22 +809,20 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * } void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, - SInterpolationInfo *pInterpoInfo) { + SFillInfo *pFillInfo) { // discard following dataset in the same group and reset the interpolation information STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - + int16_t prec = tinfo.precision; int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); - - taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - pLocalReducer->rowSize); + taosResetFillInfo(pFillInfo, revisedSTime); pLocalReducer->discard = true; - pLocalReducer->discardData->numOfElems = 0; + pLocalReducer->discardData->num = 0; SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel; tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1); @@ -856,6 +875,7 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { SSqlCmd * pCmd = &pSql->cmd; SSqlRes * pRes = &pSql->res; + tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -868,15 +888,15 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo assert(pRes->pLocalReducer == NULL); } - if (pQueryInfo->intervalTime == 0 || pQueryInfo->interpoType == TSDB_INTERPO_NONE) { - // no interval query, no interpolation + if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { + // no interval query, no fill operation pRes->data = pLocalReducer->pFinalRes; - pRes->numOfRows = pFinalDataPage->numOfElems; + pRes->numOfRows = pFinalDataPage->num; pRes->numOfTotalInCurrentClause += pRes->numOfRows; if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset < pRes->numOfRows) { - int32_t prevSize = pFinalDataPage->numOfElems; + int32_t prevSize = pFinalDataPage->num; tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, pQueryInfo->limit.offset - 1); /* remove the hole in column model */ @@ -894,65 +914,40 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotalInCurrentClause > pQueryInfo->limit.limit) { /* impose the limitation of output rows on the final result */ - int32_t prevSize = pFinalDataPage->numOfElems; + int32_t prevSize = pFinalDataPage->num; int32_t overFlow = pRes->numOfTotalInCurrentClause - pQueryInfo->limit.limit; assert(overFlow < pRes->numOfRows); pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit; pRes->numOfRows -= overFlow; - pFinalDataPage->numOfElems -= overFlow; + pFinalDataPage->num -= overFlow; tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, &pLocalReducer->interpolationInfo); + savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); } int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize); - pFinalDataPage->numOfElems = 0; + pFinalDataPage->num = 0; return; } - int64_t *pPrimaryKeys = (int64_t *)pLocalReducer->pBufForInterpo; - - SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; - - int64_t actualETime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; + SFillInfo *pFillInfo = pLocalReducer->pFillInfo; + int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey); tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity); } - - char ** srcData = (char **)malloc((POINTER_BYTES + sizeof(int32_t)) * pQueryInfo->fieldsInfo.numOfOutput); - int32_t *functions = (int32_t *)((char *)srcData + pQueryInfo->fieldsInfo.numOfOutput * sizeof(void *)); - - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - srcData[i] = - pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows; - functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId; - } - - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - int8_t precision = tinfo.precision; - while (1) { - int32_t remains = taosNumOfRemainPoints(pInterpoInfo); - TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, - pQueryInfo->slidingTimeUnit, precision); - int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime, - pLocalReducer->resColModel->capacity); - - int32_t newRows = taosDoInterpoResult(pInterpoInfo, pQueryInfo->interpoType, pResPages, remains, nrows, - pQueryInfo->intervalTime, pPrimaryKeys, pLocalReducer->resColModel, srcData, - pQueryInfo->defaultVal, functions, pLocalReducer->resColModel->capacity); - assert(newRows <= nrows); + int64_t newRows = -1; + taosGenerateDataBlock(pFillInfo, pResPages, &newRows, pLocalReducer->resColModel->capacity); if (pQueryInfo->limit.offset < newRows) { newRows -= pQueryInfo->limit.offset; @@ -975,16 +970,15 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo pQueryInfo->limit.offset -= newRows; pRes->numOfRows = 0; - int32_t rpoints = taosNumOfRemainPoints(pInterpoInfo); + int32_t rpoints = taosNumOfRemainRows(pFillInfo); if (rpoints <= 0) { - if (!doneOutput) { - /* reduce procedure is not completed, but current results for interpolation are exhausted */ + if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted break; } /* all output for current group are completed */ int32_t totalRemainRows = - taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeys, rpoints, pQueryInfo->intervalTime, actualETime); + taosGetNumOfResultWithFill(pFillInfo, rpoints, pFillInfo->slidingTime, actualETime); if (totalRemainRows <= 0) { break; } @@ -1000,10 +994,10 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo assert(pRes->numOfRows >= 0); pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit; - pFinalDataPage->numOfElems -= overFlow; + pFinalDataPage->num -= overFlow; /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pInterpoInfo); + savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); } if (pQueryInfo->order.order == TSDB_ORDER_ASC) { @@ -1017,18 +1011,17 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo } } - pFinalDataPage->numOfElems = 0; + pFinalDataPage->num = 0; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { tfree(pResPages[i]); } + tfree(pResPages); - - free(srcData); } static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { SColumnModel *pColumnModel = pLocalReducer->pDesc->pColumnModel; - assert(pColumnModel->capacity == 1 && tmpBuffer->numOfElems == 1); + assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1); // copy to previous temp buffer for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) { @@ -1038,7 +1031,7 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes); } - tmpBuffer->numOfElems = 0; + tmpBuffer->num = 0; pLocalReducer->hasPrevRow = true; } @@ -1168,7 +1161,7 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { pLocalReducer->hasPrevRow = false; int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalReducer->pCtx); - pLocalReducer->pResultBuf->numOfElems += numOfRes; + pLocalReducer->pResultBuf->num += numOfRes; fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalReducer); return numOfRes; @@ -1244,37 +1237,38 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no pRes->code = TSDB_CODE_SUCCESS; /* - * ignore the output of the current group since this group is skipped by user + * Ignore the output of the current group since this group is skipped by user * We set the numOfRows to be 0 and discard the possible remain results. */ if (pQueryInfo->slimit.offset > 0) { pRes->numOfRows = 0; pQueryInfo->slimit.offset -= 1; pLocalReducer->discard = !noMoreCurrentGroupRes; + return false; } tColModelCompact(pModel, pResBuf, pModel->capacity); - memcpy(pLocalReducer->pBufForInterpo, pResBuf->data, pLocalReducer->nResultBufSize); +// memcpy(pLocalReducer->pBufForInterpo, pResBuf->data, pLocalReducer->nResultBufSize); #ifdef _DEBUG_VIEW printf("final result before interpo:\n"); - tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->numOfElems, pResBuf->numOfElems); + assert(0); +// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); #endif - - SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; - int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; - - for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - int16_t offset = getColumnModelOffset(pModel, startIndex + i); - SSchema *pSchema = getColumnModelSchema(pModel, startIndex + i); - - memcpy(pInterpoInfo->pTags[i], pLocalReducer->pBufForInterpo + offset * pResBuf->numOfElems, pSchema->bytes); - } - - taosInterpoSetStartInfo(&pLocalReducer->interpolationInfo, pResBuf->numOfElems, pQueryInfo->interpoType); + + SFillInfo* pFillInfo = pLocalReducer->pFillInfo; + + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + + TSKEY ekey = taosGetRevisedEndKey(pQueryInfo->window.ekey, pFillInfo->order, pFillInfo->slidingTime, + pQueryInfo->slidingTimeUnit, tinfo.precision); + + taosFillSetStartInfo(pFillInfo, pResBuf->num, ekey); + taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); + doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes); - return true; } @@ -1302,13 +1296,13 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer int8_t precision = tinfo.precision; // for group result interpolation, do not return if not data is generated - if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { - int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; + if (pQueryInfo->fillType != TSDB_FILL_NONE) { + TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); int64_t newTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); - - taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, - pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); + taosGetIntervalStartTimestamp(skey, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); +// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime, +// pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize); + taosResetFillInfo(pLocalReducer->pFillInfo, newTime); } } @@ -1320,26 +1314,26 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - SLocalReducer * pLocalReducer = pRes->pLocalReducer; - SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SLocalReducer *pLocalReducer = pRes->pLocalReducer; + SFillInfo *pFillInfo = pLocalReducer->pFillInfo; STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); int8_t p = tinfo.precision; - if (taosHasRemainsDataForInterpolation(pInterpoInfo)) { - assert(pQueryInfo->interpoType != TSDB_INTERPO_NONE); + if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) { + assert(pQueryInfo->fillType != TSDB_FILL_NONE); tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf; - int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1)); + int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); - int32_t remain = taosNumOfRemainPoints(pInterpoInfo); - TSKEY ekey = - taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p); - int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, - pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity); + int32_t remain = taosNumOfRemainRows(pFillInfo); + TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, p); + + // the first column must be the timestamp column + int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo doInterpolateResult(pSql, pLocalReducer, false); } @@ -1355,7 +1349,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SLocalReducer * pLocalReducer = pRes->pLocalReducer; - SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; + SFillInfo *pFillInfo = pLocalReducer->pFillInfo; bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; @@ -1363,18 +1357,16 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - int8_t precision = tinfo.precision; if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || prevGroupCompleted) { - // if interpoType == TSDB_INTERPO_NONE, return directly - if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { + // if fillType == TSDB_FILL_NONE, return directly + if (pQueryInfo->fillType != TSDB_FILL_NONE) { int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, - pQueryInfo->slidingTimeUnit, precision); - int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime, - pLocalReducer->resColModel->capacity); + pQueryInfo->slidingTimeUnit, tinfo.precision); + int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo doInterpolateResult(pSql, pLocalReducer, true); } @@ -1474,7 +1466,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); #endif assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && - tmpBuffer->numOfElems == 0); + tmpBuffer->num == 0); // chosen from loser tree SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index]; @@ -1487,7 +1479,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { SSrcColumnInfo colInfo[256] = {0}; tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->numOfElems, pModel->capacity, colInfo); + tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo); #endif if (pLocalReducer->discard) { @@ -1495,7 +1487,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { /* current record belongs to the same group of previous record, need to discard it */ if (isSameGroup(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpBuffer)) { - tmpBuffer->numOfElems = 0; + tmpBuffer->num = 0; pOneDataSrc->rowIdx += 1; adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree); @@ -1509,7 +1501,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { continue; } else { pLocalReducer->discard = false; - pLocalReducer->discardData->numOfElems = 0; + pLocalReducer->discardData->num = 0; if (saveGroupResultInfo(pSql)) { pLocalReducer->status = TSC_LOCALREDUCE_READY; @@ -1538,17 +1530,17 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { tFilePage *pResBuf = pLocalReducer->pResultBuf; /* - * if the previous group does NOT generate any result (pResBuf->numOfElems == 0), + * if the previous group does NOT generate any result (pResBuf->num == 0), * continue to process results instead of return results. */ - if ((!sameGroup && pResBuf->numOfElems > 0) || (pResBuf->numOfElems == pLocalReducer->resColModel->capacity)) { + if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) { // does not belong to the same group bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup); // this row needs to discard, since it belongs to the group of previous if (pLocalReducer->discard && sameGroup) { pLocalReducer->hasUnprocessedRow = false; - tmpBuffer->numOfElems = 0; + tmpBuffer->num = 0; } else { // current row does not belongs to the previous group, so it is not be handled yet. pLocalReducer->hasUnprocessedRow = true; @@ -1611,7 +1603,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { finalizeRes(pQueryInfo, pLocalReducer); } - if (pLocalReducer->pResultBuf->numOfElems) { + if (pLocalReducer->pResultBuf->num) { doGenerateFinalResults(pSql, pLocalReducer, true); } @@ -1641,6 +1633,6 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1; pRes->pLocalReducer->pResultBuf = (tFilePage *)calloc(1, allocSize); - pRes->pLocalReducer->pResultBuf->numOfElems = numOfRes; + pRes->pLocalReducer->pResultBuf->num = numOfRes; pRes->data = pRes->pLocalReducer->pResultBuf->data; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bc717ed88cda2a06ce8ce055e8aeb59b5b335f62..e8af57816bfc1ed5ef27f89d7c9087c5b5b16d54 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -672,7 +672,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); - pQueryMsg->interpoType = htons(pQueryInfo->interpoType); + pQueryMsg->fillType = htons(pQueryInfo->fillType); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList)); @@ -800,7 +800,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { + if (pQueryInfo->fillType != TSDB_FILL_NONE) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]); pMsg += sizeof(pQueryInfo->defaultVal[0]); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index a00f856b2a9917a261368ba4f3e29fd09e111894..f42bf819caab22d28e99e7043b2b7260d54280b8 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -208,7 +208,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); if (pStream->numOfRes == 0) { - if (pQueryInfo->interpoType == TSDB_INTERPO_SET_VALUE || pQueryInfo->interpoType == TSDB_INTERPO_NULL) { + if (pQueryInfo->fillType == TSDB_FILL_SET_VALUE || pQueryInfo->fillType == TSDB_FILL_NULL) { SSqlRes *pRes = &pSql->res; /* failed to retrieve any result in this retrieve */ diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 4bcfd6e8225e67adce509d8f61ce426dabffe26d..6f9ad6d8507f11656795f3b5f9359338449ec86c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1390,7 +1390,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); // clear local saved number of results - trsupport->localBuffer->numOfElems = 0; + trsupport->localBuffer->num = 0; pthread_mutex_unlock(&trsupport->queryMutex); tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, @@ -1457,7 +1457,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; // data in from current vnode is stored in cache and disk - uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; + uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, numOfRowsFromSubquery, idx); @@ -1465,11 +1465,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); #ifdef _DEBUG_VIEW - printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); + printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num); SSrcColumnInfo colInfo[256] = {0}; tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, - trsupport->localBuffer->numOfElems, colInfo); + tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num, + trsupport->localBuffer->num, colInfo); #endif if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1d1e06d3a9a5d0bac0e4eee20f35345d65365de6..9163e0f11fa2cf68ea1818c7b7ee586e8881b323 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -280,7 +280,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) { return; } - pQueryInfo->interpoType = TSDB_INTERPO_NONE; + pQueryInfo->fillType = TSDB_FILL_NONE; tfree(pQueryInfo->defaultVal); } @@ -1779,7 +1779,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); - if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { + if (pQueryInfo->fillType != TSDB_FILL_NONE) { pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e58bcf52371084ad254ce9d69c1ab87d99382bfa..1dc353b7a7d5a8ee26543c2cc58b7af0536bea5b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -140,19 +140,19 @@ enum _mgmt_table { TSDB_MGMT_TABLE_MAX, }; -#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 -#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 +#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 +#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 #define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 -#define TSDB_INTERPO_NONE 0 -#define TSDB_INTERPO_NULL 1 -#define TSDB_INTERPO_SET_VALUE 2 -#define TSDB_INTERPO_LINEAR 3 -#define TSDB_INTERPO_PREV 4 +#define TSDB_FILL_NONE 0 +#define TSDB_FILL_NULL 1 +#define TSDB_FILL_SET_VALUE 2 +#define TSDB_FILL_LINEAR 3 +#define TSDB_FILL_PREV 4 #define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_PRIVILEGES 0x2 @@ -164,8 +164,8 @@ enum _mgmt_table { #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_COL_NORMAL 0x0u -#define TSDB_COL_TAG 0x1u -#define TSDB_COL_JOIN 0x2u +#define TSDB_COL_TAG 0x1u +#define TSDB_COL_JOIN 0x2u extern char *taosMsg[]; @@ -439,7 +439,7 @@ typedef struct { uint16_t queryType; // denote another query process int16_t numOfOutput; // final output columns numbers int16_t tagNameRelType; // relation of tag criteria and tbname criteria - int16_t interpoType; // interpolate type + int16_t fillType; // interpolate type uint64_t defaultVal; // default value array list int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qextbuffer.h index 9389e5a5ab9d9eb603866ff3bb97c183423666c8..0d608f1f1bbc3515db461f5ee5c98bfe324a073f 100644 --- a/src/query/inc/qextbuffer.h +++ b/src/query/inc/qextbuffer.h @@ -68,7 +68,7 @@ typedef struct SExtFileInfo { } SExtFileInfo; typedef struct tFilePage { - uint64_t numOfElems; + uint64_t num; char data[]; } tFilePage; diff --git a/src/query/inc/qinterpolation.h b/src/query/inc/qinterpolation.h index c8ebd850b61cdc69e5816837d5a439a10ba99e43..fccae6172986de78ec4fb95f30e380cd1f34424e 100644 --- a/src/query/inc/qinterpolation.h +++ b/src/query/inc/qinterpolation.h @@ -24,18 +24,34 @@ extern "C" { #include "taosdef.h" #include "qextbuffer.h" -typedef struct SInterpolationInfo { - int64_t startTimestamp; - int32_t order; // order [asc/desc] - int32_t numOfRawDataInRows; // number of points in pQuery->sdata - int32_t rowIdx; // rowIdx in pQuery->sdata - int32_t numOfTotalInterpo; // number of interpolated rows in one round - int32_t numOfCurrentInterpo; // number of interpolated rows in current results - char * prevValues; // previous row of data +typedef struct { + STColumn col; // column info + int16_t functionId; // sql function id + int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN + union {int64_t i; double d;} defaultVal; +} SFillColInfo; + +typedef struct SFillInfo { + TSKEY start; // start timestamp + TSKEY endKey; // endKey for fill + int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC] + int32_t fillType; // fill type + int32_t numOfRows; // number of rows in the input data block + int32_t rowIdx; // rowIdx + int32_t numOfTotal; // number of filled rows in one round + int32_t numOfCurrent; // number of filled rows in current results + + int32_t numOfTags; // number of tags + int32_t numOfCols; // number of columns, including the tags columns + int32_t rowSize; // size of each row + char ** pTags; // tags value for current interpolation + + int64_t slidingTime; // sliding value to determine the number of result for a given time window + char * prevValues; // previous row of data, to generate the interpolation results char * nextValues; // next row of data - int32_t numOfTags; - char ** pTags; // tags value for current interoplation -} SInterpolationInfo; + SFillColInfo* pFillCol; // column info for fill operations + char** pData; // original result data block involved in filling data +} SFillInfo; typedef struct SPoint { int64_t key; @@ -44,49 +60,31 @@ typedef struct SPoint { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision); -void taosInitInterpoInfo(SInterpolationInfo *pInterpoInfo, int32_t order, int64_t startTimeStamp, int32_t numOfTags, - int32_t rowSize); +SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, + int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol); -void taosDestoryInterpoInfo(SInterpolationInfo *pInterpoInfo); +void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp); -void taosInterpoSetStartInfo(SInterpolationInfo *pInterpoInfo, int32_t numOfRawDataInRows, int32_t type); +void taosDestoryFillInfo(SFillInfo *pFillInfo); -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision); +void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); -/** - * - * @param pInterpoInfo - * @param pPrimaryKeyArray - * @param numOfRows - * @param nInterval - * @param ekey - * @param maxNumOfRows - * @return - */ -int32_t taosGetNumOfResultWithInterpo(SInterpolationInfo *pInterpoInfo, int64_t *pPrimaryKeyArray, int32_t numOfRows, - int64_t nInterval, int64_t ekey, int32_t maxNumOfRows); +void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput); -int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo *pInterpoInfo, int64_t *pPrimaryKeyArray, - int32_t numOfRawDataInRows, int64_t nInterval, int64_t ekey); -/** - * - * @param pInterpoInfo - * @return - */ -bool taosHasRemainsDataForInterpolation(SInterpolationInfo *pInterpoInfo); +void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput); -int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo); +TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision); -/** - * - */ -int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoType, tFilePage **data, - int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval, - const int64_t *pPrimaryKeyArray, SColumnModel *pModel, char **srcData, int64_t *defaultVal, - const int32_t *functionIDs, int32_t bufSize); +int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows); +int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); + +int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData); + int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); +void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity); + #ifdef __cplusplus } #endif diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 991b3b73f7d7820dc5d292f95a815e734cfdb64b..c5442d17c7e01de4a94ba7c453ef2851d0f3435d 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -28,10 +28,10 @@ #include "tsqlfunction.h" #include "tarray.h" -typedef struct SData { - int32_t num; - char data[]; -} SData; +//typedef struct tFilePage { +// int64_t num; +// char data[]; +//} tFilePage; struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); @@ -129,7 +129,7 @@ typedef struct SQuery { char slidingTimeUnit; // interval data type, used for daytime revise int8_t precision; int16_t numOfOutput; - int16_t interpoType; + int16_t fillType; int16_t checkBuffer; // check if the buffer is full during scan each block SLimitVal limit; int32_t rowSize; @@ -139,11 +139,10 @@ typedef struct SQuery { SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* defaultVal; -// TSKEY lastKey; uint32_t status; // query status SResultRec rec; int32_t pos; - SData** sdata; + tFilePage** sdata; STableQueryInfo* current; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -151,12 +150,11 @@ typedef struct SQuery { typedef struct SQueryRuntimeEnv { SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SQuery* pQuery; - SData** pInterpoBuf; SQLFunctionCtx* pCtx; int16_t numOfRowsPerPage; int16_t offset[TSDB_MAX_COLUMNS]; uint16_t scanFlag; // denotes reversed scan of data or not - SInterpolationInfo interpoInfo; + SFillInfo* pFillInfo; SWindowResInfo windowResInfo; STSBuf* pTSBuf; STSCursor cur; diff --git a/src/query/src/qextbuffer.c b/src/query/src/qextbuffer.c index adf15d1de0d90e287131bf7b076fa089e4156020..98d830c26fc0507e4ef4391d3b51f29cebb49ae9 100644 --- a/src/query/src/qextbuffer.c +++ b/src/query/src/qextbuffer.c @@ -136,7 +136,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) { } item->pNext = NULL; - item->item.numOfElems = 0; + item->item.num = 0; if (pMemBuffer->pTail != NULL) { pMemBuffer->pTail->pNext = item; @@ -167,13 +167,13 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow pLast = pMemBuffer->pTail; } - if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records + if (pLast->item.num + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRows, numOfRows); pMemBuffer->numOfElemsInBuffer += numOfRows; pMemBuffer->numOfTotalElems += numOfRows; } else { - int32_t numOfRemainEntries = pMemBuffer->numOfElemsPerPage - pLast->item.numOfElems; + int32_t numOfRemainEntries = pMemBuffer->numOfElemsPerPage - pLast->item.num; tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRemainEntries, numOfRows); pMemBuffer->numOfElemsInBuffer += numOfRemainEntries; @@ -271,7 +271,7 @@ bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) { ret = false; } - pMemBuffer->fileMeta.numOfElemsInFile += first->item.numOfElems; + pMemBuffer->fileMeta.numOfElemsInFile += first->item.num; pMemBuffer->fileMeta.nFileSize += 1; tFilePagesItem *ptmp = first; @@ -985,16 +985,16 @@ void tColModelDisplayEx(SColumnModel *pModel, void *pData, int32_t numOfRows, in //////////////////////////////////////////////////////////////////////////////////////////// void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) { - if (inputBuffer->numOfElems == 0 || maxElemsCapacity == inputBuffer->numOfElems) { + if (inputBuffer->num == 0 || maxElemsCapacity == inputBuffer->num) { return; } /* start from the second column */ for (int32_t i = 1; i < pModel->numOfCols; ++i) { SSchemaEx* pSchemaEx = &pModel->pFields[i]; - memmove(inputBuffer->data + pSchemaEx->offset * inputBuffer->numOfElems, + memmove(inputBuffer->data + pSchemaEx->offset * inputBuffer->num, inputBuffer->data + pSchemaEx->offset * maxElemsCapacity, - pSchemaEx->field.bytes * inputBuffer->numOfElems); + pSchemaEx->field.bytes * inputBuffer->num); } } @@ -1009,13 +1009,13 @@ int16_t getColumnModelOffset(SColumnModel *pColumnModel, int32_t index) { } void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockCapacity, int32_t s, int32_t e) { - if (inputBuffer->numOfElems == 0 || (e - s + 1) <= 0) { + if (inputBuffer->num == 0 || (e - s + 1) <= 0) { return; } int32_t removed = e - s + 1; - int32_t remain = inputBuffer->numOfElems - removed; - int32_t secPart = inputBuffer->numOfElems - e - 1; + int32_t remain = inputBuffer->num - removed; + int32_t secPart = inputBuffer->num - e - 1; /* start from the second column */ for (int32_t i = 0; i < pModel->numOfCols; ++i) { @@ -1028,7 +1028,7 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC memmove(startPos, endPos, pSchema->bytes * secPart); } - inputBuffer->numOfElems = remain; + inputBuffer->num = remain; } /* @@ -1040,16 +1040,16 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC */ void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t start, int32_t numOfRows, int32_t srcCapacity) { - assert(dstPage->numOfElems + numOfRows <= dstModel->capacity); + assert(dstPage->num + numOfRows <= dstModel->capacity); for (int32_t col = 0; col < dstModel->numOfCols; ++col) { - char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->numOfElems, col); + char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->num, col); char *src = COLMODEL_GET_VAL((char *)srcData, dstModel, srcCapacity, start, col); memmove(dst, src, dstModel->pFields[col].field.bytes * numOfRows); } - dstPage->numOfElems += numOfRows; + dstPage->num += numOfRows; } tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrderCols, SColumnModel *pModel, diff --git a/src/query/src/qinterpolation.c b/src/query/src/qinterpolation.c index 40fcf63c363912ff9f5eefaaf98ecdf91eaebf26..c1939badcc697056f39fe8490967607006db4db6 100644 --- a/src/query/src/qinterpolation.c +++ b/src/query/src/qinterpolation.c @@ -20,7 +20,7 @@ #include "taosmsg.h" #include "tsqlfunction.h" -#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSDB_ORDER_ASC) +#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) { if (timeRange == 0) { @@ -37,6 +37,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char * TODO dynamically decide the start time of a day */ + // todo refactor to extract function that is available for Linux/Windows/Mac platform #if defined(WINDOWS) && _MSC_VER >= 1900 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 int64_t timezone = _timezone; @@ -56,130 +57,169 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char } } -void taosInitInterpoInfo(SInterpolationInfo* pInterpoInfo, int32_t order, int64_t startTimestamp, - int32_t numOfGroupbyTags, int32_t rowSize) { - pInterpoInfo->startTimestamp = startTimestamp; - pInterpoInfo->rowIdx = -1; - pInterpoInfo->numOfRawDataInRows = 0; - pInterpoInfo->numOfCurrentInterpo = 0; - pInterpoInfo->numOfTotalInterpo = 0; - pInterpoInfo->order = order; - - pInterpoInfo->numOfTags = numOfGroupbyTags; - if (pInterpoInfo->pTags == NULL && numOfGroupbyTags > 0) { - pInterpoInfo->pTags = calloc(1, numOfGroupbyTags * POINTER_BYTES + rowSize); +SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, + int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) { + if (fillType == TSDB_FILL_NONE) { + return NULL; } + + SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); + + taosResetFillInfo(pFillInfo, skey); + + pFillInfo->order = order; + pFillInfo->fillType = fillType; + pFillInfo->pFillCol = pFillCol; + pFillInfo->numOfTags = numOfTags; + pFillInfo->numOfCols = numOfCols; + pFillInfo->slidingTime = slidingTime; + + pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); + + int32_t rowsize = 0; + for (int32_t i = 0; i < numOfCols; ++i) { + int32_t bytes = pFillInfo->pFillCol[i].col.bytes; + pFillInfo->pData[i] = calloc(1, sizeof(tFilePage) + bytes * capacity); + + rowsize += bytes; + } + + if (numOfTags > 0) { + pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize); + } + + pFillInfo->rowSize = rowsize; + return pFillInfo; +} - // set the previous value to be null - tfree(pInterpoInfo->prevValues); +void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { + pFillInfo->start = startTimestamp; + pFillInfo->rowIdx = -1; + pFillInfo->numOfRows = 0; + pFillInfo->numOfCurrent = 0; + pFillInfo->numOfTotal = 0; } -// the SInterpolationInfo itself will not be released -void taosDestoryInterpoInfo(SInterpolationInfo* pInterpoInfo) { - if (pInterpoInfo == NULL) { +void taosDestoryFillInfo(SFillInfo* pFillInfo) { + if (pFillInfo == NULL) { return; } - tfree(pInterpoInfo->prevValues); - tfree(pInterpoInfo->nextValues); - - tfree(pInterpoInfo->pTags); + tfree(pFillInfo->prevValues); + tfree(pFillInfo->nextValues); + tfree(pFillInfo->pTags); + tfree(pFillInfo); } -void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawDataInRows, int32_t type) { - if (type == TSDB_INTERPO_NONE) { +void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { + if (pFillInfo->fillType == TSDB_FILL_NONE) { return; } - pInterpoInfo->rowIdx = 0; - pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; + pFillInfo->rowIdx = 0; + pFillInfo->numOfRows = numOfRows; + + pFillInfo->endKey = endKey; } -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) { - if (order == TSDB_ORDER_ASC) { - return ekey; - } else { - return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision); +void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) { + // copy the data into source data buffer + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); } } -int32_t taosGetNumOfResultWithInterpo(SInterpolationInfo* pInterpoInfo, TSKEY* pPrimaryKeyArray, - int32_t numOfRawDataInRows, int64_t nInterval, int64_t ekey, - int32_t maxNumOfRows) { - int32_t numOfRes = taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeyArray, numOfRawDataInRows, nInterval, ekey); - return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; +void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) { + assert(pFillInfo->numOfRows == pInput->num); + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + char* s = pInput->data + pCol->col.offset * pInput->num; + memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes); + + if (pCol->flag == TSDB_COL_TAG) { // copy the tag value + memcpy(pFillInfo->pTags[i], pFillInfo->pData[i], pCol->col.bytes); + } + } } -int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* pPrimaryKeyArray, - int32_t numOfAvailRawData, int64_t nInterval, int64_t ekey) { - if (numOfAvailRawData > 0) { - int32_t finalNumOfResult = 0; - - // get last timestamp, calculate the result size - int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1]; - finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1; - - assert(finalNumOfResult >= numOfAvailRawData); - return finalNumOfResult; +TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { + if (order == TSDB_ORDER_ASC) { + return ekey; } else { - /* reach the end of data */ - if ((ekey < pInterpoInfo->startTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (ekey > pInterpoInfo->startTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { + return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); + } +} + +static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain, + int64_t nInterval, int64_t ekey) { + + if (remain > 0) { // still fill gap within current data block, not generating data after the result set. + TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1]; + int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1; + + assert(total >= remain); + return total; + } else { // reach the end of data + if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || + (ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { return 0; } else { - return (int32_t)(labs(ekey - pInterpoInfo->startTimestamp) / nInterval) + 1; + return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1; } } } -bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) { - return taosNumOfRemainPoints(pInterpoInfo) > 0; +int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) { + int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows, + pFillInfo->slidingTime, ekey); + return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; } -int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) { - if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) { +int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { + if (pFillInfo->rowIdx == -1 || pFillInfo->numOfRows == 0) { return 0; } - return INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? (pInterpoInfo->numOfRawDataInRows - pInterpoInfo->rowIdx) - : pInterpoInfo->rowIdx + 1; + return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx) + : pFillInfo->rowIdx + 1; } -static double doLinearInterpolationImpl(double v1, double v2, double k1, double k2, double k) { +static double linearInterpolationImpl(double v1, double v2, double k1, double k2, double k) { return v1 + (v2 - v1) * (k - k1) / (k2 - k1); } int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { switch (type) { case TSDB_DATA_TYPE_INT: { - *(int32_t*)point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, + *(int32_t*)point->val = linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, point2->key, point->key); break; } case TSDB_DATA_TYPE_FLOAT: { *(float*)point->val = - doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key); + linearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key); break; }; case TSDB_DATA_TYPE_DOUBLE: { *(double*)point->val = - doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key); + linearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key); break; }; case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: { - *(int64_t*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, + *(int64_t*)point->val = linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, point2->key, point->key); break; }; case TSDB_DATA_TYPE_SMALLINT: { - *(int16_t*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, + *(int16_t*)point->val = linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, point2->key, point->key); break; }; case TSDB_DATA_TYPE_TINYINT: { *(int8_t*)point->val = - doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key); + linearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key); break; }; default: { @@ -191,239 +231,247 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } -static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; } - -static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, - int32_t start, int32_t capacity, int32_t num) { - for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) { - SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, num); - assignVal(val1, pTags[j], pSchema->bytes, pSchema->type); +static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) { + for (int32_t j = 0, i = start; i < pColInfo->numOfCols + pColInfo->numOfTags; ++i, ++j) { + SFillColInfo* pCol = &pColInfo->pFillCol[i]; + + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); + assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type); } } -static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, - SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval, - int64_t* defaultVal, int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, - char** pTags, bool outOfBound) { - char** prevValues = &pInterpoInfo->prevValues; - char** nextValues = &pInterpoInfo->nextValues; +static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, + int64_t ts, char** pTags, bool outOfBound) { + char** prevValues = &pFillInfo->prevValues; + char** nextValues = &pFillInfo->nextValues; SPoint point1, point2, point; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); - char* val = getPos(data[0]->data, TSDB_KEYSIZE, *num); - *(TSKEY*)val = pInterpoInfo->startTimestamp; + char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num); + *(TSKEY*) val = pFillInfo->start; - int32_t numOfValCols = pModel->numOfCols - numOfTags; + int32_t numOfValCols = pFillInfo->numOfCols - pFillInfo->numOfTags; // set the other values - if (interpoType == TSDB_INTERPO_PREV) { - char* pInterpolationData = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? *prevValues : *nextValues; + if (pFillInfo->fillType == TSDB_FILL_PREV) { + char* pInterpolationData = FILL_IS_ASC_FILL(pFillInfo) ? *prevValues : *nextValues; if (pInterpolationData != NULL) { for (int32_t i = 1; i < numOfValCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pModel, i); - int16_t offset = getColumnModelOffset(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, *num); - - if (isNull(pInterpolationData + offset, pSchema->type)) { - setNull(val1, pSchema->type, pSchema->bytes); + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); + if (isNull(pInterpolationData + pCol->col.offset, pCol->col.type)) { + setNull(val1, pCol->col.type, pCol->col.bytes); } else { - assignVal(val1, pInterpolationData + offset, pSchema->bytes, pSchema->type); + assignVal(val1, pInterpolationData + pCol->col.offset, pCol->col.bytes, pCol->col.type); } } - } else { /* no prev value yet, set the value for null */ + } else { // no prev value yet, set the value for NULL for (int32_t i = 1; i < numOfValCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pModel, i); + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - char* val1 = getPos(data[i]->data, pSchema->bytes, *num); - setNull(val1, pSchema->type, pSchema->bytes); + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); + setNull(val1, pCol->col.type, pCol->col.bytes); } } - setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); - } else if (interpoType == TSDB_INTERPO_LINEAR) { + setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) { // TODO : linear interpolation supports NULL value if (*prevValues != NULL && !outOfBound) { for (int32_t i = 1; i < numOfValCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pModel, i); - int16_t offset = getColumnModelOffset(pModel, i); - - int16_t type = pSchema->type; - char* val1 = getPos(data[i]->data, pSchema->bytes, *num); - + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + int16_t type = pCol->col.type; + int16_t bytes = pCol->col.bytes; + + char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { - setNull(val1, type, pSchema->bytes); + setNull(val1, pCol->col.type, bytes); continue; } - point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + offset}; - point2 = (SPoint){.key = currentTimestamp, .val = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes}; - point = (SPoint){.key = pInterpoInfo->startTimestamp, .val = val1}; + point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + pCol->col.offset}; + point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes}; + point = (SPoint){.key = pFillInfo->start, .val = val1}; taosDoLinearInterpolation(type, &point1, &point2, &point); } - setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); + setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); } else { for (int32_t i = 1; i < numOfValCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, *num); - setNull(val1, pSchema->type, pSchema->bytes); + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); + setNull(val1, pCol->col.type, pCol->col.bytes); } - setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); + setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + } } else { /* default value interpolation */ for (int32_t i = 1; i < numOfValCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, *num); - assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type); + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); + assignVal(val1, (char*)&pCol->defaultVal.i, pCol->col.bytes, pCol->col.type); } - setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); + setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); } - pInterpoInfo->startTimestamp += (nInterval * step); - pInterpoInfo->numOfCurrentInterpo++; + pFillInfo->start += (pFillInfo->slidingTime * step); + pFillInfo->numOfCurrent++; (*num) += 1; } -static void initBeforeAfterDataBuf(SColumnModel* pModel, char** nextValues) { +static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** nextValues) { if (*nextValues != NULL) { return; } - *nextValues = calloc(1, pModel->rowSize); - for (int i = 1; i < pModel->numOfCols; i++) { - int16_t offset = getColumnModelOffset(pModel, i); - SSchema* pSchema = getColumnModelSchema(pModel, i); - - setNull(*nextValues + offset, pSchema->type, pSchema->bytes); + *nextValues = calloc(1, pFillInfo->rowSize); + for (int i = 1; i < pFillInfo->numOfCols; i++) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes); } } -int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, - int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval, - const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal, - const int32_t* functionIDs, int32_t bufSize) { +int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) { int32_t num = 0; - pInterpoInfo->numOfCurrentInterpo = 0; + pFillInfo->numOfCurrent = 0; - char** prevValues = &pInterpoInfo->prevValues; - char** nextValues = &pInterpoInfo->nextValues; + char** prevValues = &pFillInfo->prevValues; + char** nextValues = &pFillInfo->nextValues; - int32_t numOfTags = pInterpoInfo->numOfTags; - char** pTags = pInterpoInfo->pTags; + int32_t numOfTags = pFillInfo->numOfTags; + char** pTags = pFillInfo->pTags; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); - if (numOfRawDataInRows == 0) { + if (numOfRows == 0) { /* - * we need to rebuild whole data - * NOTE:we need to keep the last saved data, to satisfy the interpolation + * we need to rebuild whole result set + * NOTE:we need to keep the last saved data, to generated the filled data */ while (num < outputRows) { - doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal, - pInterpoInfo->startTimestamp, bufSize, numOfTags, pTags, true); + doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); } - pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; + + pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return outputRows; } else { while (1) { - int64_t currentTimestamp = pPrimaryKeyArray[pInterpoInfo->rowIdx]; + int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->rowIdx]; - if ((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { + if ((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) { /* set the next value for interpolation */ - initBeforeAfterDataBuf(pModel, nextValues); + initBeforeAfterDataBuf(pFillInfo, nextValues); - int32_t offset = pInterpoInfo->rowIdx; - for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) { - SSchema* pSchema = getColumnModelSchema(pModel, i); - - memcpy(*nextValues + tlen, srcData[i] + offset * pSchema->bytes, pSchema->bytes); - tlen += pSchema->bytes; + int32_t offset = pFillInfo->rowIdx; + for (int32_t i = 0; i < pFillInfo->numOfCols - numOfTags; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + memcpy(*nextValues + pCol->col.offset, srcData[i] + offset * pCol->col.bytes, pCol->col.bytes); } } - if (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && - num < outputRows) { - while (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && - num < outputRows) { - doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal, - currentTimestamp, bufSize, numOfTags, pTags, false); + if (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { + + while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { + doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, false); } /* output buffer is full, abort */ - if ((num == outputRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (num < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { - pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; + if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || + (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) { + pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return outputRows; } } else { - assert(pInterpoInfo->startTimestamp == currentTimestamp); - - initBeforeAfterDataBuf(pModel, prevValues); + assert(pFillInfo->start == ts); + initBeforeAfterDataBuf(pFillInfo, prevValues); // assign rows to dst buffer int32_t i = 0; - for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) { - int16_t offset = getColumnModelOffset(pModel, i); - SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, num); - char* src = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes; + for (; i < pFillInfo->numOfCols - numOfTags; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); + char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx); if (i == 0 || - (functionIDs[i] != TSDB_FUNC_COUNT && !isNull(src, pSchema->type)) || - (functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(src) != 0)) { - assignVal(val1, src, pSchema->bytes, pSchema->type); - memcpy(*prevValues + tlen, src, pSchema->bytes); + (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) || + (pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) { + assignVal(val1, src, pCol->col.bytes, pCol->col.type); + memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes); } else { // i > 0 and data is null , do interpolation - if (interpoType == TSDB_INTERPO_PREV) { - assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type); - } else if (interpoType == TSDB_INTERPO_LINEAR) { - assignVal(val1, src, pSchema->bytes, pSchema->type); - memcpy(*prevValues + tlen, src, pSchema->bytes); + if (pFillInfo->fillType == TSDB_FILL_PREV) { + assignVal(val1, *prevValues + pCol->col.offset, pCol->col.bytes, pCol->col.type); + } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) { + assignVal(val1, src, pCol->col.bytes, pCol->col.type); + memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes); } else { - assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type); + assignVal(val1, (char*) &pCol->defaultVal.i, pCol->col.bytes, pCol->col.type); } } - tlen += pSchema->bytes; } - /* set the tag value for final result */ - setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, pModel->numOfCols - numOfTags, bufSize, - num); + // set the tag value for final result + setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num); - pInterpoInfo->startTimestamp += (nInterval * step); - pInterpoInfo->rowIdx += 1; + pFillInfo->start += (pFillInfo->slidingTime * step); + pFillInfo->rowIdx += 1; num += 1; } - if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (pInterpoInfo->rowIdx < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || num >= outputRows) { - if (pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows || pInterpoInfo->rowIdx < 0) { - pInterpoInfo->rowIdx = -1; - pInterpoInfo->numOfRawDataInRows = 0; + if ((pFillInfo->rowIdx >= pFillInfo->numOfRows && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->rowIdx < 0 && !FILL_IS_ASC_FILL(pFillInfo)) || num >= outputRows) { + if (pFillInfo->rowIdx >= pFillInfo->numOfRows || pFillInfo->rowIdx < 0) { + pFillInfo->rowIdx = -1; + pFillInfo->numOfRows = 0; /* the raw data block is exhausted, next value does not exists */ tfree(*nextValues); } - pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; + pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return num; } } } } + +void taosFillInfoSetSource(SFillInfo* pFillInfo, tFilePage **data, TSKEY endKey) { + pFillInfo->endKey = endKey; + + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + memcpy(pFillInfo->pData[i], data[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); + } +} + +void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) { + int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? + +// TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, +// pQuery->slidingTimeUnit, pQuery->precision); +// if (QUERY_IS_ASC_QUERY(pQuery)) { +// assert(ekey >= pQuery->window.ekey); +// } else { +// assert(ekey <= pQuery->window.ekey); +// } + + int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); + + int32_t numOfRes = taosDoInterpoResult(pFillInfo, output, remain, rows, pFillInfo->pData); + *outputRows = rows; + + assert(numOfRes == rows); +} diff --git a/src/query/src/qpercentile.c b/src/query/src/qpercentile.c index 2561bdf284cf8bafa02cd2312b4298c7e43171c1..286171bdab794dde6fdf07d65fbcfa1d6875215b 100644 --- a/src/query/src/qpercentile.c +++ b/src/query/src/qpercentile.c @@ -64,26 +64,26 @@ static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx, for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) { ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); UNUSED(ret); - assert(pPage->numOfElems > 0); + assert(pPage->num > 0); - tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, pPage->numOfElems, pPage->numOfElems); - printf("id: %d count: %" PRIu64 "\n", j, buffer->numOfElems); + tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, pPage->num, pPage->num); + printf("id: %d count: %" PRIu64 "\n", j, buffer->num); } } tfree(pPage); - assert(buffer->numOfElems == pMemBuffer->fileMeta.numOfElemsInFile); + assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile); } // load data in pMemBuffer to buffer tFilePagesItem *pListItem = pMemBuffer->pHead; while (pListItem != NULL) { - tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, pListItem->item.numOfElems, - pListItem->item.numOfElems); + tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, pListItem->item.num, + pListItem->item.num); pListItem = pListItem->pNext; } - tColDataQSort(pDesc, buffer->numOfElems, 0, buffer->numOfElems - 1, buffer->data, TSDB_ORDER_ASC); + tColDataQSort(pDesc, buffer->num, 0, buffer->num - 1, buffer->data, TSDB_ORDER_ASC); pDesc->pColumnModel->capacity = oldCapacity; // restore value return buffer; @@ -881,7 +881,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); UNUSED(ret); - tMemBucketPut(pMemBucket, pPage->data, pPage->numOfElems); + tMemBucketPut(pMemBucket, pPage->data, pPage->num); } fclose(pMemBuffer->file); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index ef6ff8a70bf3cb9ba08fa63e34b29bdfaa7693f4..913cd4280c4661e8c658ea60d9d5409c0ca7ebf5 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include #include "os.h" #include "hash.h" @@ -529,10 +530,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult pageId = getLastPageId(&list); pData = getResultBufferPageById(pResultBuf, pageId); - if (pData->numOfElems >= numOfRowsPerPage) { + if (pData->num >= numOfRowsPerPage) { pData = getNewDataBuf(pResultBuf, sid, &pageId); if (pData != NULL) { - assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer + assert(pData->num == 0); // number of elements must be 0 for new allocated buffer } } } @@ -544,7 +545,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult // set the number of rows in current disk page if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer pWindowRes->pos.pageId = pageId; - pWindowRes->pos.rowId = pData->numOfElems++; + pWindowRes->pos.rowId = pData->num++; } return 0; @@ -1202,6 +1203,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS while (1) { getNextTimeWindow(pQuery, &nextWin); + assert(pWindowResInfo->startTime <= nextWin.skey); + if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.skey > pQuery->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { @@ -1415,7 +1418,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) { - goto _error_clean; + goto _clean; } pRuntimeEnv->offset[0] = 0; @@ -1427,7 +1430,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order int32_t index = pSqlFuncMsg->colInfo.colIndex; if (TSDB_COL_IS_TAG(pIndex->flag)) { - if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { + if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor pCtx->inputBytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pCtx->inputType = TSDB_DATA_TYPE_BINARY; } else { @@ -1489,7 +1492,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx); return TSDB_CODE_SUCCESS; -_error_clean: +_clean: tfree(pRuntimeEnv->resultInfo); tfree(pRuntimeEnv->pCtx); @@ -1524,15 +1527,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->pCtx); } - taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo); - - if (pRuntimeEnv->pInterpoBuf != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - tfree(pRuntimeEnv->pInterpoBuf[i]); - } - - tfree(pRuntimeEnv->pInterpoBuf); - } + taosDestoryFillInfo(pRuntimeEnv->pFillInfo); destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); @@ -1975,7 +1970,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI // set the direct previous(next) point for process count = 2; - if (pQuery->interpoType == TSDB_INTERPO_SET_VALUE) { + if (pQuery->fillType == TSDB_FILL_SET_VALUE) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -2005,7 +2000,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI } pInterpDetail->ts = pQuery->window.skey; - pInterpDetail->type = pQuery->interpoType; + pInterpDetail->type = pQuery->fillType; } } else { TSKEY prevKey = *(TSKEY *)pPointInterpSupport->pPrevPoint[0]; @@ -2040,7 +2035,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT); pInterpDetail->ts = pQInfo->runtimeEnv.pQuery->window.skey; - pInterpDetail->type = pQuery->interpoType; + pInterpDetail->type = pQuery->fillType; } } } @@ -2094,23 +2089,6 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter *pPointInterpSupport) { #endif } -static UNUSED_FUNC void allocMemForInterpo(SQInfo *pQInfo, SQuery *pQuery, void *pMeterObj) { -#if 0 - if (pQuery->interpoType != TSDB_INTERPO_NONE) { - assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery))); - - if (isIntervalQuery(pQuery)) { - pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput); - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - pQInfo->runtimeEnv.pInterpoBuf[i] = - calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock); - } - } - } -#endif -} - static int32_t getInitialPageNum(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t INITIAL_RESULT_ROWS_VALUE = 16; @@ -2414,6 +2392,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->prevSKey = w.skey; } + + if (pRuntimeEnv->pFillInfo != NULL) { + pRuntimeEnv->pFillInfo->start = w.skey; + } } // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block @@ -2427,10 +2409,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pQuery->pSelectExpr[i].bytes; - char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData)); + char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); if (tmp == NULL) { // todo handle the oom } else { - pQuery->sdata[i] = (SData *)tmp; + pQuery->sdata[i] = (tFilePage *)tmp; } // set the pCtx output buffer position @@ -2648,7 +2630,7 @@ static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t } } -void UNUSED_FUNC displayInterResult(SData **pdata, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) { +void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfCols = pQuery->numOfOutput; printf("super table query intermediate result, total:%d\n", numOfRows); @@ -2787,7 +2769,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { int32_t total = 0; for (int32_t i = 0; i < list.size; ++i) { tFilePage *pData = getResultBufferPageById(pResultBuf, list.pData[i]); - total += pData->numOfElems; + total += pData->num; } int32_t rows = total; @@ -2800,11 +2782,11 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; char * pDest = pQuery->sdata[i]->data; - memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, - bytes * pData->numOfElems); + memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num, + bytes * pData->num); } - offset += pData->numOfElems; + offset += pData->num; } assert(pQuery->rec.rows == 0); @@ -2907,7 +2889,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { if (ts == lastTimestamp) { // merge with the last one doMerge(pRuntimeEnv, ts, pWindowRes, true); } else { // copy data to disk buffer - if (buffer[0]->numOfElems == pQuery->rec.capacity) { + if (buffer[0]->num == pQuery->rec.capacity) { if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { return -1; } @@ -2916,7 +2898,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { } doMerge(pRuntimeEnv, ts, pWindowRes, false); - buffer[0]->numOfElems += 1; + buffer[0]->num += 1; } lastTimestamp = ts; @@ -2935,7 +2917,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { tLoserTreeAdjust(pTree, pos + pTree->numOfEntries); } - if (buffer[0]->numOfElems != 0) { // there are data in buffer + if (buffer[0]->num != 0) { // there are data in buffer if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo); @@ -2993,10 +2975,10 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { // pagewise copy to dest buffer for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - buf->numOfElems = r; + buf->num = r; - memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char *)pQuery->sdata[i]->data) + offset * bytes, - buf->numOfElems * bytes); + memcpy(buf->data + pRuntimeEnv->offset[i] * buf->num, ((char *)pQuery->sdata[i]->data) + offset * bytes, + buf->num * bytes); } offset += r; @@ -3534,7 +3516,7 @@ void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIdx, T static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; - // Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group + // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); @@ -3788,80 +3770,43 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo); } -bool vnodeHasRemainResults(void *handle) { - SQInfo *pQInfo = (SQInfo *)handle; - - if (pQInfo == NULL || pQInfo->runtimeEnv.pQuery->interpoType == TSDB_INTERPO_NONE) { +bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; + + if (pQuery->fillType == TSDB_FILL_NONE) { + assert(pFillInfo == NULL); return false; } - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo; if (pQuery->limit.limit > 0 && pQuery->rec.rows >= pQuery->limit.limit) { return false; } - int32_t remain = taosNumOfRemainPoints(pInterpoInfo); + // There are results not returned to client, fill operation applied to the remain result set in the + // first place is required. + int32_t remain = taosNumOfRemainRows(pFillInfo); if (remain > 0) { return true; - } else { - if (pRuntimeEnv->pInterpoBuf == NULL) { - return false; - } - - // query has completed - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - /*TSKEY ekey =*/taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime, - pQuery->slidingTimeUnit, pQuery->precision); - // int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY - // *)pRuntimeEnv->pInterpoBuf[0]->data, - // remain, pQuery->intervalTime, ekey, - // pQuery->pointsToRead); - // return numOfTotal > 0; - assert(0); - return false; - } - - return false; - } -} - -static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **pDataSrc, int32_t numOfRows, - int32_t outputRows) { -#if 0 - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = &pRuntimeEnv->pQuery; - - assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE); - - // build support structure for performing interpolation - SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput); - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes; - pSchema[i].type = pQuery->pSelectExpr[i].type; } -// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead); - - char * srcData[TSDB_MAX_COLUMNS] = {0}; - int32_t functions[TSDB_MAX_COLUMNS] = {0}; - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - srcData[i] = pDataSrc[i]->data; - functions[i] = pQuery->pSelectExpr[i].base.functionId; + /* + * There are no results returned to client now. + * If query is not completed yet, the gaps between two results blocks need to be handled after next data block + * is retrieved from TSDB. + * + * NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result + * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the + * first result row in the actual result set will fill nothing. + */ + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, + pQuery->slidingTimeUnit, pQuery->precision); + int32_t numOfTotal = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pQuery->rec.capacity); + return numOfTotal > 0; } - - assert(0); -// int32_t numOfRes = taosDoInterpoResult(&pRuntimeEnv->interpoInfo, pQuery->interpoType, data, numOfRows, outputRows, -// pQuery->intervalTime, (int64_t *)pDataSrc[0]->data, pModel, srcData, -// pQuery->defaultVal, functions, pRuntimeEnv->pTabObj->pointsPerFileBlock); - - destroyColumnModel(pModel); - free(pSchema); -#endif - return 0; + + return false; } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { @@ -3887,37 +3832,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data // all data returned, set query over if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) { + if (pQInfo->runtimeEnv.stableQuery) { if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_OVER); } } else { - setQueryStatus(pQuery, QUERY_OVER); + if (!queryHasRemainResults(&pQInfo->runtimeEnv)) { + setQueryStatus(pQuery, QUERY_OVER); + } } } } -int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows, - int32_t *numOfInterpo) { -// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; -// SQuery * pQuery = pRuntimeEnv->pQuery; -#if 0 +int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) { + SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { - numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - - TSKEY ekey = taosGetRevisedEndKey(pQuery->window.skey, pQuery->order.order, pQuery->intervalTime, - pQuery->slidingTimeUnit, pQuery->precision); - int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, - numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead); - - int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows); - assert(ret == numOfFinalRows); + taosGenerateDataBlock(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata, &pQuery->rec.rows, pQuery->rec.capacity); + int32_t ret = pQuery->rec.rows; + // todo apply limit output function /* reached the start position of according to offset value, return immediately */ if (pQuery->limit.offset == 0) { return ret; } - + if (pQuery->limit.offset < ret) { ret -= pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. @@ -3932,13 +3870,12 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage pQuery->limit.offset -= ret; ret = 0; } - - if (!vnodeHasRemainResults(pQInfo)) { + + if (!queryHasRemainResults(pRuntimeEnv)) { return ret; } } -#endif - + return 0; } @@ -4062,7 +3999,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; // if queried with value filter, do NOT forward query start position - if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { + if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->pFillInfo != NULL) { return true; } @@ -4199,6 +4136,27 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { } +static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { + int32_t numOfCols = pQuery->numOfOutput; + int32_t offset = 0; + + SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); + for(int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; + + pFillCol[i].col.bytes = pExprInfo->bytes; + pFillCol[i].col.type = pExprInfo->type; + pFillCol[i].col.offset = offset; + pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query + pFillCol[i].functionId = pExprInfo->base.functionId; + pFillCol[i].defaultVal.i = pQuery->defaultVal[i]; + + offset += pExprInfo->bytes; + } + + return pFillCol; +} + int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -4290,16 +4248,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool // pointInterpSupporterSetData(pQInfo, &interpInfo); // pointInterpSupporterDestroy(&interpInfo); -// int64_t rs = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, -// pQuery->precision); -// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); - // allocMemForInterpo(pQInfo, pQuery, pMeterObj); - -// if (!isPointInterpoQuery(pQuery)) { - // assert(pQuery->pos >= 0 && pQuery->slot >= 0); -// } - - // the pQuery->window.skey is changed during normalizedFirstQueryRange, so set the newest lastkey value + if (pQuery->fillType != TSDB_FILL_NONE) { + SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery); + pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput, + pQuery->slidingTime, pQuery->fillType, pColInfo); + } + return TSDB_CODE_SUCCESS; } @@ -4952,7 +4906,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { // here we can ignore the records in case of no interpolation // todo handle offset, in case of top/bottom interval query if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && - pQuery->interpoType == TSDB_INTERPO_NONE) { + pQuery->fillType == TSDB_FILL_NONE) { // maxOutput <= 0, means current query does not generate any results int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); @@ -4977,7 +4931,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { // skip blocks without load the actual data block from file if no filter condition present skipTimeInterval(pRuntimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0 && pRuntimeEnv->pFillInfo == NULL) { setQueryStatus(pQuery, QUERY_COMPLETED); return; } @@ -4994,22 +4948,18 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } // the offset is handled at prepare stage if no interpolation involved - if (pQuery->interpoType == TSDB_INTERPO_NONE) { + if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) { limitResults(pQInfo); break; } else { - taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.rows, pQuery->interpoType); - SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.rows * pQuery->pSelectExpr[i].bytes); - } - + TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, + pQuery->slidingTimeUnit, pQuery->precision); + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey); + taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata); numOfInterpo = 0; - pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, - pQuery->rec.rows, &numOfInterpo); + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo); - qTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.rows); + qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { limitResults(pQInfo); break; @@ -5035,19 +4985,20 @@ static void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (vnodeHasRemainResults(pQInfo)) { + if (queryHasRemainResults(pRuntimeEnv)) { /* * There are remain results that are not returned due to result interpolation * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ int32_t numOfInterpo = 0; - int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, - (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); - - limitResults(pQInfo); - - pQInfo->pointsInterpo += numOfInterpo; + int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo); + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo); + + qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); + if (pQuery->rec.rows > 0) { + limitResults(pQInfo); + } + qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); return; } @@ -5105,8 +5056,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p query is killed", pQInfo); } else {// todo set the table uid and tid in log -// SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); -// SPair* pair = taosArrayGet(p, 0); qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } @@ -5130,7 +5079,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); - // taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); + // taosFillSetStartInfo(&pQInfo->runtimeEnv.pFillInfo, pQuery->size, pQInfo->query.fillType); if (pQuery->rec.rows == 0) { qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, @@ -5377,8 +5326,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->orderType = htons(pQueryMsg->orderType); } - pQueryMsg->interpoType = htons(pQueryMsg->interpoType); - if (pQueryMsg->interpoType != TSDB_INTERPO_NONE) { + pQueryMsg->fillType = htons(pQueryMsg->fillType); + if (pQueryMsg->fillType != TSDB_FILL_NONE) { pQueryMsg->defaultVal = (uint64_t)(pMsg); int64_t *v = (int64_t *)pMsg; @@ -5422,7 +5371,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, - pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); + pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); return 0; } @@ -5699,7 +5648,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; - pQuery->interpoType = pQueryMsg->interpoType; + pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; // todo do not allocate ?? @@ -5729,7 +5678,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, } // prepare the result buffer - pQuery->sdata = (SData **)calloc(pQuery->numOfOutput, POINTER_BYTES); + pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES); if (pQuery->sdata == NULL) { goto _cleanup; } @@ -5742,14 +5691,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, assert(pExprs[col].interBytes >= pExprs[col].bytes); // allocate additional memory for interResults that are usually larger then final results - size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(SData); - pQuery->sdata[col] = (SData *)calloc(1, size); + size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage); + pQuery->sdata[col] = (tFilePage *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _cleanup; } } - if (pQuery->interpoType != TSDB_INTERPO_NONE) { + if (pQuery->fillType != TSDB_FILL_NONE) { pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); if (pQuery->defaultVal == NULL) { goto _cleanup; diff --git a/src/util/inc/talgo.h b/src/util/inc/talgo.h index d5e089b687807cf8e028ca721c64959afc92914d..e71e340a213bc4809a4124d4fbab963b900d104c 100644 --- a/src/util/inc/talgo.h +++ b/src/util/inc/talgo.h @@ -26,6 +26,8 @@ extern "C" { #define TD_GE (TD_EQ | TD_GT) #define TD_LE (TD_EQ | TD_LT) +#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx)) + typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param); /** diff --git a/src/util/src/talgo.c b/src/util/src/talgo.c index f343912cde267855bc083fbde47f380e9609742e..3a594faeb9bbc20608737c0160cfd0f4373bb9a8 100644 --- a/src/util/src/talgo.c +++ b/src/util/src/talgo.c @@ -23,8 +23,6 @@ memcpy((__right), (__buf), (__size));\ } while (0); -#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx)) - static void median(void *src, size_t size, size_t s, size_t e, const void *param, __ext_compar_fn_t comparFn, void* buf) { int32_t mid = ((e - s) >> 1u) + s;