diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 23fb0ab67cded77ff737fac4246343486e80eb95..3328b4b9ed3c200a7434804eaf9a436a02c801e4 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -958,7 +958,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp } while (1) { - int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity); + int64_t newRows = taosFillResultDataBlock(pFillInfo, (void**)pResPages, pLocalMerge->resColModel->capacity); if (pQueryInfo->limit.offset < newRows) { newRows -= pQueryInfo->limit.offset; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b2841a29c135a70e97f911850efa71afba7730da..88d472fdc1ca39c62a55037903613953e09ef89f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -185,6 +185,7 @@ typedef struct SSDataBlock { SDataBlockInfo info; } SSDataBlock; + typedef struct SQuery { SLimitVal limit; @@ -395,19 +396,22 @@ typedef struct SHashIntervalOperatorInfo { SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; SResultRowInfo resultRowInfo; + SSDataBlock *pRes; } SHashIntervalOperatorInfo; typedef struct SFillOperatorInfo { - SResultRowInfo *pResultRowInfo; - STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; + SSDataBlock *pRes; } SFillOperatorInfo; -typedef struct SFilterOperatorInfo { +typedef struct SHashGroupbyOperatorInfo { SResultRowInfo *pResultRowInfo; STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; -} SFilterOperatorInfo; + SQLFunctionCtx *pCtx; + SResultRowInfo resultRowInfo; + SSDataBlock *pRes; +} SHashGroupbyOperatorInfo; void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index aa6df9279acb9e3cecf20d71f726974ee89a4030..dc08dcce4e3311da14919e3aced58940fab9e462 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -24,6 +24,8 @@ extern "C" { #include "qExtbuffer.h" #include "taosdef.h" +struct SSDataBlock; + typedef struct { STColumn col; // column info int16_t functionId; // sql function id @@ -80,6 +82,8 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput); +void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput); + void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput); bool taosFillHasMoreResults(SFillInfo* pFillInfo); @@ -88,7 +92,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); -int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity); +int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity); #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ee9348963c1af674d57f97af5d4f2c5a12eeef56..47609cbd870ed66a70d3571936685f753f8993da 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -186,8 +186,12 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -//static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); + +static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock); + +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static void destroyOperatorInfo(SOperatorInfo* pOperator); void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); @@ -1310,6 +1314,7 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); tsCols = pColDataInfo->pData; + assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows-1] == pSDataBlock->info.window.ekey); } TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step); @@ -1384,6 +1389,39 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera } } +static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + STableQueryInfo* item = pQuery->current; + + SDataBlockInfo* pBlockInfo = &pSDataBlock->info; + + int16_t type = 0; + int16_t bytes = 0; + char* groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pSDataBlock->pDataBlock); + + for (int32_t j = 0; j < pBlockInfo->rows; ++j) { + int32_t offset = GET_COL_DATA_POS(pQuery, j, 1); + + char *val = groupbyColumnData + bytes * offset; + if (isNull(val, type)) { // ignore the null value + continue; + } + + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + int32_t functionId = pQuery->pExpr1[k].base.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } + } + } +} + /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -2333,6 +2371,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); } + + if (pQuery->fillType != TSDB_FILL_NONE) { + pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); @@ -3013,7 +3055,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SSDataBlock* pBlock, uint32_t* status) { + *status = BLK_DATA_NO_NEEDED; + pBlock->pDataBlock = NULL; + pBlock->pBlockStatis = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; int64_t groupId = pQuery->current->groupIndex; @@ -4613,6 +4658,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); + pBlock->info.rows = 0; if (!hasRemainData(pGroupResInfo)) { return; } @@ -4620,6 +4666,12 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti SQuery* pQuery = pRuntimeEnv->pQuery; int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC; doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, orderType, pBlock); + + SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); + + STimeWindow* w = &pBlock->info.window; + w->skey = *(int64_t*)pInfoData->pData; + w->ekey = *(int64_t*)(pInfoData->pData + TSDB_KEYSIZE * (pBlock->info.rows - 1)); } static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { @@ -4763,8 +4815,8 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } +#if 0 int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { - SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; @@ -4774,13 +4826,13 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { // todo apply limit output function /* reached the start position of according to offset value, return immediately */ if (pQuery->limit.offset == 0) { - qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pQInfo, pFillInfo->numOfRows, ret); + qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret); return ret; } if (pQuery->limit.offset < ret) { qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, remain:%" PRId64 ", new offset:%d", - pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0); + pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0); ret -= (int32_t)pQuery->limit.offset; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { //???pExpr1 or pExpr2 @@ -4792,7 +4844,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { return ret; } else { qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, " - "remain:%d, new offset:%" PRId64, pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0, + "remain:%d, new offset:%" PRId64, pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0, pQuery->limit.offset - ret); pQuery->limit.offset -= ret; @@ -4806,6 +4858,28 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { } } } +#endif + +int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; + + void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); + p[i] = pColInfoData->pData; + } + + pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pQuery->rec.capacity); + + // no data in current data after fill + int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pQuery->rec.capacity); + if (numOfTotal == 0) { + return 0; + } + + return pOutput->info.rows; +} void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -6525,12 +6599,19 @@ static SSDataBlock* doHashIntervalAgg(void* param) { SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo; SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; + if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = true; + } + + return pIntervalInfo->pRes; + } SQuery* pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; - SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; @@ -6550,59 +6631,96 @@ static SSDataBlock* doHashIntervalAgg(void* param) { hashIntervalAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock); } - pOperator->completed = true; - closeAllResultRows(&pRuntimeEnv->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRes); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - return pRes; + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = true; + } + + return pIntervalInfo->pRes; } -static SSDataBlock* doFill(void* param) { +static SSDataBlock* doHashGroupbyAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->completed) { return NULL; } - SFillOperatorInfo *pInfo = pOperator->optInfo; - SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; + SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo; + + SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; + if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = true; + } + + return pIntervalInfo->pRes; + } + + SOperatorInfo* upstream = pOperator->upstream; + pRuntimeEnv->pQuery->pos = 0; while(1) { - SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); + SSDataBlock* pBlock = upstream->exec(upstream); if (pBlock == NULL) { - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - return NULL; + break; } - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - memcpy(pQuery->sdata[i]->data, pColInfoData->pData, pColInfoData->info.bytes*pBlock->info.rows); - } + // the pDataBlock are always the same one, no need to call this again + setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order); + hashGroupbyAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock); + } - taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); - taosFillSetDataBlockFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata); + closeAllResultRows(&pRuntimeEnv->resultRowInfo); + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + finalizeQueryResult(pRuntimeEnv); - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - // here the pQuery->rec.rows == 0 - if (!hasRemainData(&pRuntimeEnv->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { - break; - } + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = true; + } + return pIntervalInfo->pRes; +} + +static SSDataBlock* doFill(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed) { return NULL; } - return NULL; -} + SFillOperatorInfo *pInfo = pOperator->optInfo; + SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; -//SSDataBlock* doFilter(void* param) { -// -//} + if (taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { + doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); + return pInfo->pRes; + } + + while(1) { + SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); + if (pBlock == NULL) { + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey); + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + } else { + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); + taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock); + } + + doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); + return pInfo->pRes; + } + + return pInfo->pRes; +} // todo set the attribute of query scan count static int32_t getNumOfScanTimes(SQuery* pQuery) { @@ -6734,36 +6852,41 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ pOperator->optInfo = pInfo; pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); return pOperator; } -UNUSED_FUNC SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { - SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); +static UNUSED_FUNC SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { + SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo)); pInfo->pRuntimeEnv = pRuntimeEnv; pInfo->pTableQueryInfo = pTableQueryInfo; + SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "FilterOp"; - pOperator->blockingOptr = false; + pOperator->name = "HashGroupbyOp"; + pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; - pOperator->exec = NULL; - pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; - pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; + pOperator->exec = doHashGroupbyAgg; + pOperator->pExpr = pQuery->pExpr1; + pOperator->numOfOutput = pQuery->numOfOutput; pOperator->optInfo = pInfo; + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + return pOperator; } -static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { - SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); +static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { + SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); pInfo->pRuntimeEnv = pRuntimeEnv; - pInfo->pTableQueryInfo = pTableQueryInfo; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6776,6 +6899,7 @@ static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTable pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; pOperator->optInfo = pInfo; + pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); return pOperator; } @@ -6796,7 +6920,7 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; + pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0; } static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { @@ -6857,7 +6981,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) #endif } -static void copyAndFillResult(SQInfo* pQInfo) { +static UNUSED_FUNC void copyAndFillResult(SQInfo* pQInfo) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -6877,7 +7001,7 @@ static void copyAndFillResult(SQInfo* pQInfo) { taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, lastKey); taosFillSetDataBlockFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata); - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); +// pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); if (pQuery->rec.rows > 0) { limitOperator(pQuery, pQInfo); @@ -6908,46 +7032,21 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; - -#if 0 -// scanOneTableDataBlocks(pRuntimeEnv, newStartKey); -// finalizeQueryResult(pRuntimeEnv); - - // skip offset result rows -// pQuery->rec.rows = 0; - - // not fill or no result generated during this query - if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->resultRowInfo.size == 0 || isPointInterpoQuery(pQuery)) { - // all data scanned, the group by normal column can return - int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->resultRowInfo); - if (pQuery->limit.offset > numOfClosed || numOfClosed == 0) { - return; - } - - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); - copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); - doSecondaryArithmeticProcess(pQuery); - - limitOperator(pQuery, pQInfo); - } else { - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); - return copyAndFillResult(pQInfo); - } -#endif + pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0; } void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; +#if 0 if (hasNotReturnedResults(pRuntimeEnv, &pRuntimeEnv->groupResInfo)) { if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { /* * There are remain results that are not returned due to result interpolation * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); +// pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); if (pQuery->rec.rows > 0) { limitOperator(pQuery, pQInfo); qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -6975,6 +7074,7 @@ void tableQueryImpl(SQInfo *pQInfo) { return; } +#endif // number of points returned during this query pQuery->rec.rows = 0; diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index bc6376b80743bcad3e1b07f903c970eeb9e5dee2..540b4e9dcce00d0ed97c919f0e5f2c92c245f924 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -23,18 +23,19 @@ #include "qFill.h" #include "qExtbuffer.h" #include "queryLog.h" +#include "qExecutor.h" #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) #define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) -static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows) { +static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) { SFillColInfo* pCol = &pFillInfo->pFillCol[j]; if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { continue; } - char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, genRows); + char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows); assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; @@ -44,17 +45,17 @@ static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows } } -static void setNullValueForRow(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfCol, int32_t rowIndex) { +static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) { // the first are always the timestamp column, so start from the second column. for (int32_t i = 1; i < numOfCol; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - char* output = elePtrAt(data[i]->data, pCol->col.bytes, rowIndex); + char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex); setNull(output, pCol->col.type, pCol->col.bytes); } } -static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** srcData, int64_t ts, bool outOfBound) { +static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) { char* prev = pFillInfo->prevValues; char* next = pFillInfo->nextValues; @@ -63,7 +64,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr // set the primary timestamp column value int32_t index = pFillInfo->numOfCurrent; - char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, index); + char* val = elePtrAt(data[0], TSDB_KEYSIZE, index); *(TSKEY*) val = pFillInfo->currentKey; // set the other values @@ -77,7 +78,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr continue; } - char* output = elePtrAt(data[i]->data, pCol->col.bytes, index); + char* output = elePtrAt(data[i], pCol->col.bytes, index); assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); } } else { // no prev value yet, set the value for NULL @@ -93,7 +94,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr continue; } - char* output = elePtrAt(data[i]->data, pCol->col.bytes, index); + char* output = elePtrAt(data[i], pCol->col.bytes, index); assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); } } else { // no prev value yet, set the value for NULL @@ -111,7 +112,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr int16_t type = pCol->col.type; int16_t bytes = pCol->col.bytes; - char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, index); + char *val1 = elePtrAt(data[i], pCol->col.bytes, index); if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { setNull(val1, pCol->col.type, bytes); continue; @@ -132,7 +133,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr continue; } - char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, index); + char* val1 = elePtrAt(data[i], pCol->col.bytes, index); assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); } } @@ -162,7 +163,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu } } -static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t outputRows) { +static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) { pFillInfo->numOfCurrent = 0; char** srcData = pFillInfo->pData; @@ -213,7 +214,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou continue; } - char* output = elePtrAt(data[i]->data, pCol->col.bytes, pFillInfo->numOfCurrent); + char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent); char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index); if (i == 0 || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) || @@ -255,7 +256,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou return pFillInfo->numOfCurrent; } -static int64_t appendFilledResult(SFillInfo* pFillInfo, tFilePage** output, int64_t resultCapacity) { +static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) { /* * These data are generated according to fill strategy, since the current timestamp is out of the time window of * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. @@ -420,6 +421,15 @@ void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pI } } +void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) { + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); + pFillInfo->pData[i] = pColData->pData; + +// memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); + } +} + void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput) { assert(pFillInfo->numOfRows == pInput->num); @@ -490,7 +500,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* return TSDB_CODE_SUCCESS; } -int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { +int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);