diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 88d472fdc1ca39c62a55037903613953e09ef89f..c9495f8b3c0f892054c92f7314f3fa6b1894bbcb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -411,6 +411,7 @@ typedef struct SHashGroupbyOperatorInfo { SQLFunctionCtx *pCtx; SResultRowInfo resultRowInfo; SSDataBlock *pRes; + int32_t colIndex; } SHashGroupbyOperatorInfo; void freeParam(SQueryParam *param); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 3698de044f5c3e5680d270000908a5e88306c5b9..ae0464062ad357cf131094318b507af09e553530 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -91,6 +91,6 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo); bool incNextGroup(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo); +int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv); #endif // TDENGINE_QUERYUTIL_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 47609cbd870ed66a70d3571936685f753f8993da..48a9795bae29e262066a7f3f93633273be7b3e9c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -154,6 +154,8 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { } static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); +static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols); + static void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); @@ -188,11 +190,16 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock); +static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); +static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock); +static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); +static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, + SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static void destroyOperatorInfo(SOperatorInfo* pOperator); void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); @@ -300,6 +307,33 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { return maxOutput; } +int64_t getNumOfResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput) { + SQuery *pQuery = pRuntimeEnv->pQuery; + bool hasMainFunction = hasMainOutput(pQuery); + + int64_t maxOutput = 0; + for (int32_t j = 0; j < numOfOutput; ++j) { + int32_t functionId = pCtx[j].functionId; + + /* + * ts, tag, tagprj function can not decide the output number of current query + * the number of output result is decided by main output + */ + if (hasMainFunction && + (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) { + continue; + } + + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]); + if (pResInfo != NULL && maxOutput < pResInfo->numOfRes) { + maxOutput = pResInfo->numOfRes; + } + } + + assert(maxOutput >= 0); + return maxOutput; +} + /* * the value of number of result needs to be update due to offset value upated. */ @@ -1302,11 +1336,9 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC } } -static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { +static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - SResultRowInfo*pResultRowInfo = &pRuntimeEnv->resultRowInfo; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); @@ -1389,32 +1421,32 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera } } -static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { +static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SResultRowInfo *pResultRowInfo, SQLFunctionCtx *pCtx, + SSDataBlock *pSDataBlock, int32_t colIndex) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; - SDataBlockInfo* pBlockInfo = &pSDataBlock->info; - - int16_t type = 0; - int16_t bytes = 0; - char* groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pSDataBlock->pDataBlock); + SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, colIndex); + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; - for (int32_t j = 0; j < pBlockInfo->rows; ++j) { + for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { int32_t offset = GET_COL_DATA_POS(pQuery, j, 1); - char *val = groupbyColumnData + bytes * offset; + char *val = pColInfoData->pData + bytes * offset; if (isNull(val, type)) { // ignore the null value continue; } - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex); + // TODO compare with the previous value to speedup the query processing + int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, pResultRowInfo, pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pExpr1[k].base.functionId; + for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pCtx[k], offset); } @@ -1545,7 +1577,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } } -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // not assign result buffer yet, add new result buffer, TODO remove it @@ -1560,7 +1592,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat return -1; } - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, d, len, true, groupIndex); + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); assert (pResultRow != NULL); int64_t v = -1; @@ -1588,6 +1620,50 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); return TSDB_CODE_SUCCESS; } +static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, + SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { + SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; + + // not assign result buffer yet, add new result buffer, TODO remove it + char* d = pData; + int16_t len = bytes; + if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) { + d = varDataVal(pData); + len = varDataLen(pData); + } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo); + return -1; + } + + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); + assert (pResultRow != NULL); + + int64_t v = -1; + GET_TYPED_DATA(v, int64_t, type, pData); + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + if (pResultRow->key == NULL) { + pResultRow->key = malloc(varDataTLen(pData)); + varDataCopy(pResultRow->key, pData); + } else { + assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); + } + } else { + pResultRow->win.skey = v; + pResultRow->win.ekey = v; + } + + if (pResultRow->pageId == -1) { + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage); + if (ret != 0) { + return -1; + } + } + + setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfCols); + initCtxOutputBuf_rv(pCtx, numOfCols); + return TSDB_CODE_SUCCESS; +} static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock) { SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; @@ -1612,6 +1688,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, *type = pQuery->colList[colIndex].type; *bytes = pQuery->colList[colIndex].bytes; + /* * the colIndex is acquired from the first tables of all qualified tables in this vnode during query prepare * stage, the remain tables may not have the required column in cache actually. So, the validation of required @@ -1630,6 +1707,26 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, return NULL; } +static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock) { + for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { + SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); + if (TSDB_COL_IS_TAG(pColIndex->flag)) { + continue; + } + + int32_t colId = pColIndex->colId; + + for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); + if (pColInfo->info.colId == colId) { + return i; + } + } + } + + assert(0); +} + static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -1931,7 +2028,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS continue; } - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex); + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, val, type, bytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } @@ -2356,25 +2453,38 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); // group by normal column, sliding window query, interval query are handled by interval query processor - if (!pQuery->stableQuery) { // interval (down sampling operation) - if (isFixedOutputQuery(pQuery)) { - pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); +// if (!pQuery->stableQuery) { // interval (down sampling operation) + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + if (pQuery->stableQuery) { + pRuntimeEnv->proot = createStableIntervalOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + } else { + pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + + if (pQuery->pExpr2 != NULL) { + pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } + + if (pQuery->fillType != TSDB_FILL_NONE) { + pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } + } + + } else if (pQuery->groupbyColumn) { + pRuntimeEnv->proot = createHashGroupbyAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); } - } else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + } else if (isFixedOutputQuery(pQuery)) { + pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); } - - if (pQuery->fillType != TSDB_FILL_NONE) { - pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); - } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); @@ -2387,7 +2497,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf if (pQuery->limit.limit > 0) { pRuntimeEnv->proot = createLimitOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); } - } +// } return TSDB_CODE_SUCCESS; @@ -3053,11 +3163,97 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW return TSDB_CODE_SUCCESS; } -int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo * pWindowResInfo, - void* pQueryHandle, SSDataBlock* pBlock, uint32_t* status) { +void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) { + int32_t numOfRows = pBlock->info.rows; + + int8_t *p = calloc(numOfRows, sizeof(int8_t)); + bool all = true; + + for (int32_t i = 0; i < numOfRows; ++i) { + bool qualified = false; + + for (int32_t k = 0; k < numOfFilterCols; ++k) { + char *pElem = (char *)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i; + + qualified = false; + for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { + SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j]; + + bool isnull = isNull(pElem, pFilterInfo[k].info.type); + if (isnull) { + if (pFilterElem->fp == isNullOperator) { + qualified = true; + break; + } else { + continue; + } + } else { + if (pFilterElem->fp == notNullOperator) { + qualified = true; + break; + } else if (pFilterElem->fp == isNullOperator) { + continue; + } + } + if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) { + qualified = true; + break; + } + } + + if (!qualified) { + break; + } + } + + p[i] = qualified ? 1 : 0; + if (!qualified) { + all = false; + } + } + + if (!all) { + int32_t start = 0; + int32_t len = 0; + for (int32_t j = 0; j < numOfRows; ++j) { + if (p[j] == 1) { + len++; + } else { + if (len > 0) { + int32_t cstart = j - len; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColumnInfoData->info.bytes; + memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes); + } + + start += len; + } + } + } + + pBlock->info.rows = start; + pBlock->pBlockStatis = NULL; // clean the block statistics info + + if (start > 0) { + SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0); + assert(pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && + pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX); + + pBlock->info.window.skey = *(int64_t *)pColumnInfoData->pData; + pBlock->info.window.ekey = *(int64_t *)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1)); + } + } + + tfree(p); +} + +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRowInfo *pWindowResInfo, + void *pQueryHandle, SSDataBlock *pBlock, uint32_t *status) { *status = BLK_DATA_NO_NEEDED; - pBlock->pDataBlock = NULL; + pBlock->pDataBlock = NULL; pBlock->pBlockStatis = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -3158,6 +3354,23 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* if (pBlock->pDataBlock == NULL) { return terrno; } + + if (pQuery->numOfFilterCols > 0) { + if (pQuery->pFilterInfo[0].pData == NULL) { + for(int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { + for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j); + + if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) { + pQuery->pFilterInfo[i].pData = pColInfo->pData; + break; + } + } + } + } + + filterDataBlock_rv(pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock); + } } return TSDB_CODE_SUCCESS; @@ -3478,6 +3691,64 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable) { } } } +void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr, SQLFunctionCtx* pCtx, int32_t numOfOutput) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + + SExprInfo *pExprInfo = &pQuery->pExpr1[0]; + if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) { + assert(pExprInfo->base.numOfParams == 1); + + int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); + + doSetTagValueInParam(pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes); + } else { + // set tag value, by which the results are aggregated. + int32_t offset = 0; + memset(pRuntimeEnv->tagVal, 0, pQuery->tagLen); + for (int32_t idx = 0; idx < numOfOutput; ++idx) { + SExprInfo* pLocalExprInfo = &pExpr[idx]; + + // ts_comp column required the tag value for join filter + if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) { + continue; + } + + // todo use tag column index to optimize performance + doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type, pLocalExprInfo->bytes); + + if (IS_NUMERIC_TYPE(pLocalExprInfo->type) || pLocalExprInfo->type == TSDB_DATA_TYPE_BOOL) { + memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->bytes); + } else { + memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pRuntimeEnv->pCtx[idx].tag.nLen); + } + + offset += pLocalExprInfo->bytes; + } + + // set the join tag for first column + SSqlFuncMsg *pFuncMsg = &pExprInfo->base; + if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTsBuf != NULL && + pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + assert(pFuncMsg->numOfParams == 1); + + int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + SColumnInfo *pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); + + doSetTagValueInParam(pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes); + + int16_t tagType = pRuntimeEnv->pCtx[0].tag.nType; + if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pQInfo, + pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.pz); + } else { + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pQInfo, + pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.i64); + } + } + } +} static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t srcDataType) { if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) { @@ -3592,7 +3863,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { // all results in current group have been returned to client, try next group if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { assert(pGroupResInfo->index == 0); - if ((pQInfo->code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pQInfo)) != TSDB_CODE_SUCCESS) { + if ((pQInfo->code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv)) != TSDB_CODE_SUCCESS) { return; } } @@ -3614,6 +3885,36 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } } +void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock) { + SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo; + + int32_t code = TSDB_CODE_SUCCESS; + while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { + // all results in current group have been returned to client, try next group + if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { + assert(pGroupResInfo->index == 0); + if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv)) != TSDB_CODE_SUCCESS) { + return; + } + } + + doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, TSDB_ORDER_ASC, pBlock); + + // current data are all dumped to result buffer, clear it + if (!hasRemainData(pGroupResInfo)) { + cleanupGroupResInfo(pGroupResInfo); + if (!incNextGroup(pGroupResInfo)) { + SET_STABLE_QUERY_OVER(pRuntimeEnv); + } + } + + // enough results in data buffer, return + if (pBlock->info.rows >= threshold) { + break; + } + } +} + static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; @@ -4216,7 +4517,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { setResultOutputBuf(pRuntimeEnv, buf); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - aAggs[pQuery->pExpr1[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); + aAggs[pRuntimeEnv->pCtx[j].functionId].xFinalize(&pRuntimeEnv->pCtx[j]); } /* @@ -4233,6 +4534,40 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { } } +void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo) { + SQuery *pQuery = pRuntimeEnv->pQuery; + if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { + // for each group result, call the finalize function for each column + if (pQuery->groupbyColumn) { + closeAllResultRows(pResultRowInfo); + } + + for (int32_t i = 0; i < pResultRowInfo->size; ++i) { + SResultRow *buf = pResultRowInfo->pResult[i]; + if (!isResultRowClosed(pResultRowInfo, i)) { + continue; + } + + setResultOutputBuf_rv(pRuntimeEnv, buf, pCtx, numOfOutput); + + for (int32_t j = 0; j < numOfOutput; ++j) { + aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); + } + + /* + * set the number of output results for group by normal columns, the number of output rows usually is 1 except + * the top and bottom query + */ + buf->numOfRows = (uint16_t)getNumOfResult(pRuntimeEnv); + } + + } else { + for (int32_t j = 0; j < numOfOutput; ++j) { + aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); + } + } +} + static bool hasMainOutput(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pExpr1[i].base.functionId; @@ -4281,8 +4616,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +void setExecutionContext(SQueryRuntimeEnv *pRuntimeEnv, int32_t groupIndex, TSKEY nextKey) { STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; @@ -4337,6 +4671,26 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { } } +void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols) { + // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); + + for (int32_t i = 0; i < numOfCols; ++i) { + pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, page); + + int32_t functionId = pCtx[i].functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + pCtx[i].ptsOutputBuf = pCtx[0].pOutput; + } + + /* + * set the output buffer information and intermediate buffer, + * not all queries require the interResultBuf, such as COUNT + */ + pCtx[i].resultInfo = getResultCell(pRuntimeEnv, pResult, i); + } +} + void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -4458,8 +4812,7 @@ int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) { * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there * is a previous result generated or not. */ -void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo *pTableQueryInfo = pQuery->current; SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo; @@ -5210,9 +5563,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) return TSDB_CODE_SUCCESS; } - if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) { - return TSDB_CODE_SUCCESS; - } +// if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) { +// return TSDB_CODE_SUCCESS; +// } STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); @@ -5408,9 +5761,8 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { } } -static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; +static FORCE_INLINE void setEnvForEachBlock(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) { + SQuery* pQuery = pRuntimeEnv->pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { @@ -5422,9 +5774,9 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa } if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - setIntervalQueryRange(pQInfo, pBlockInfo->window.skey); + setIntervalQueryRange(pRuntimeEnv, pBlockInfo->window.skey); } else { // non-interval query - setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step); + setExecutionContext(pRuntimeEnv, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step); } } @@ -5471,7 +5823,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); if (!pQuery->groupbyColumn) { - setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); + setEnvForEachBlock(pRuntimeEnv, *pTableQueryInfo, &blockInfo); } if (pQuery->stabledev) { @@ -5670,7 +6022,7 @@ static void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) { * * @param pQInfo */ -static void sequentialTableProcess(SQInfo *pQInfo) { +static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_COMPLETED); @@ -6133,7 +6485,7 @@ static void doCloseAllTimeWindow(SQInfo *pQInfo) { } } -static void multiTableQueryProcess(SQInfo *pQInfo) { +static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -6266,8 +6618,9 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { tfree(arithSup.data); } -static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { +static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { SSDataBlock *pBlock = &pTableScanInfo->block; + SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { pTableScanInfo->numOfBlocks += 1; @@ -6275,6 +6628,17 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { // todo check for query cancel tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1) { + STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( + pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.map, &pBlock->info.tid, sizeof(pBlock->info.tid)); + if (pTableQueryInfo == NULL) { + break; + } + + pQuery->current = *pTableQueryInfo; + doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); + } + // this function never returns error? uint32_t status; int32_t code = @@ -6303,7 +6667,7 @@ static SSDataBlock* doTableScan(void* param) { SQuery* pQuery = pRuntimeEnv->pQuery; while (pTableScanInfo->current < pTableScanInfo->times) { - SSDataBlock* p = doScanTableImpl(pTableScanInfo); + SSDataBlock* p = doTableScanImpl(pTableScanInfo); if (p != NULL) { return p; } @@ -6347,7 +6711,7 @@ static SSDataBlock* doTableScan(void* param) { pTableScanInfo->reverseTimes = 0; pTableScanInfo->order = cond.order; - SSDataBlock* p = doScanTableImpl(pTableScanInfo); + SSDataBlock* p = doTableScanImpl(pTableScanInfo); if (p != NULL) { return p; } @@ -6388,11 +6752,21 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf pTableScanInfo->pCtx = pAggInfo->pCtx; pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; - } else if (strcasecmp(name, "HashIntervalAggOp") == 0){ - SHashIntervalOperatorInfo* pIntervalInfo = pDownstream->optInfo; + } else if (strcasecmp(name, "HashIntervalAggOp") == 0) { + SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->optInfo; pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; + } else if (strcasecmp(name, "HashGroupbyAggOp") == 0) { + SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->optInfo; + + pTableScanInfo->pCtx = pGroupbyInfo->pCtx; + pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo; + } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { + SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo; + + pTableScanInfo->pCtx = pInfo->pCtx; + pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; } else { assert(0); } @@ -6467,9 +6841,12 @@ static SSDataBlock* doAggregation(void* param) { pOperator->completed = true; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult(pRuntimeEnv); - pRes->info.rows = getNumOfResult(pRuntimeEnv); + if (!pQuery->stableQuery) { + finalizeQueryResult(pRuntimeEnv); + } + + pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pCtx, pOperator->numOfOutput); destroySQLFunctionCtx(pCtx, pRes->info.numOfCols); return pRes; @@ -6501,6 +6878,8 @@ static SSDataBlock* doArithmeticOperation(void* param) { break; } + setTagVal_rv(pRuntimeEnv, pRuntimeEnv->pQuery->current->pTable, pOperator->pExpr, pArithInfo->pCtx, pOperator->numOfOutput); + // the pDataBlock are always the same one, no need to call this again for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pArithInfo->pCtx[i].size = pBlock->info.rows; @@ -6628,14 +7007,14 @@ static SSDataBlock* doHashIntervalAgg(void* param) { // the pDataBlock are always the same one, no need to call this again setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, order); - hashIntervalAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock); + hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, pIntervalInfo->pCtx, pBlock); } closeAllResultRows(&pRuntimeEnv->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult(pRuntimeEnv); + finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { @@ -6645,7 +7024,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { return pIntervalInfo->pRes; } -static SSDataBlock* doHashGroupbyAgg(void* param) { +static SSDataBlock* doSTableIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->completed) { return NULL; @@ -6656,6 +7035,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; if (hasRemainData(&pRuntimeEnv->groupResInfo)) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->completed = true; } @@ -6663,8 +7043,11 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { return pIntervalInfo->pRes; } + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t order = pQuery->order.order; + SOperatorInfo* upstream = pOperator->upstream; - pRuntimeEnv->pQuery->pos = 0; + pQuery->pos = 0; while(1) { SSDataBlock* pBlock = upstream->exec(upstream); @@ -6672,17 +7055,25 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { break; } + if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { + STableScanInfo* pScanInfo = upstream->optInfo; + order = getTableScanOrder(pScanInfo); + } + // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order); - hashGroupbyAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock); + setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, order); + setEnvForEachBlock(pRuntimeEnv, pRuntimeEnv->pQuery->current, &pBlock->info); + + hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo->pCtx, pBlock); } closeAllResultRows(&pRuntimeEnv->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult(pRuntimeEnv); + finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes); +// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); +// toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->completed = true; @@ -6691,6 +7082,55 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { return pIntervalInfo->pRes; } +static SSDataBlock* doHashGroupbyAgg(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed) { + return NULL; + } + + SHashGroupbyOperatorInfo *pInfo = pOperator->optInfo; + + SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; + if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); + if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = true; + } + return pInfo->pRes; + } + + SOperatorInfo* upstream = pOperator->upstream; + pRuntimeEnv->pQuery->pos = 0; + + while(1) { + SSDataBlock* pBlock = upstream->exec(upstream); + if (pBlock == NULL) { + break; + } + + // the pDataBlock are always the same one, no need to call this again + setInputSDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order); + if (pInfo->colIndex == -1) { + pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); + } + + hashGroupbyAgg(pRuntimeEnv, pOperator, &pInfo->resultRowInfo, pInfo->pCtx, pBlock, pInfo->colIndex); + } + + closeAllResultRows(&pInfo->resultRowInfo); + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo); + + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); + + if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = true; + } + + return pInfo->pRes; +} + static SSDataBlock* doFill(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->completed) { @@ -6858,16 +7298,42 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ return pOperator; } -static UNUSED_FUNC SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { + SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); + + pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pTableQueryInfo = pTableQueryInfo; + + SQuery* pQuery = pRuntimeEnv->pQuery; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "STableIntervalAggOp"; + pOperator->blockingOptr = true; + pOperator->completed = false; + pOperator->upstream = upstream; + pOperator->exec = doSTableIntervalAgg; + pOperator->pExpr = pQuery->pExpr1; + pOperator->numOfOutput = pQuery->numOfOutput; + pOperator->optInfo = pInfo; + + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + return pOperator; +} + +SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo)); pInfo->pRuntimeEnv = pRuntimeEnv; pInfo->pTableQueryInfo = pTableQueryInfo; + pInfo->colIndex = -1; SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "HashGroupbyOp"; + pOperator->name = "HashGroupbyAggOp"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; @@ -7161,13 +7627,15 @@ void stableQueryImpl(SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); - if (QUERY_IS_INTERVAL_QUERY(pQuery) || - (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { - multiTableQueryProcess(pQInfo); - } else { - assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn); - sequentialTableProcess(pQInfo); - } +// if (QUERY_IS_INTERVAL_QUERY(pQuery) || +// (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { + //multiTableQueryProcess(pQInfo); + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = pRuntimeEnv->outputBuf != NULL? pRuntimeEnv->outputBuf->info.rows:0; +// } else { +// assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn); +// sequentialTableProcess(pQInfo); +// } // record the total elapsed time pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index e8f6f9133bdc25be69ac283c65a73059474b3cf6..534c62f00ed251e93b281f63988b5d08cc16e1b3 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -438,7 +438,7 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void * } } -static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, void* qinfo) { +static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList) { bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery); int32_t code = TSDB_CODE_SUCCESS; @@ -456,7 +456,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes pTableQueryInfoList = malloc(POINTER_BYTES * size); if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) { - qError("QInfo:%p failed alloc memory", qinfo); + qError("QInfo:%p failed alloc memory", pRuntimeEnv->qinfo); code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _end; } @@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes int64_t endt = taosGetTimestampMs(); - qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", qinfo, + qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup, endt - startt); _end: @@ -539,14 +539,13 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes return code; } -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) { +int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv) { int64_t st = taosGetTimestampUs(); - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, pQInfo); + int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -556,7 +555,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) { break; } - qDebug("QInfo:%p no result in group %d, continue", pQInfo, pGroupResInfo->currentGroup); + qDebug("QInfo:%p no result in group %d, continue", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup); cleanupGroupResInfo(pGroupResInfo); incNextGroup(pGroupResInfo); } @@ -566,9 +565,9 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) { } int64_t elapsedTime = taosGetTimestampUs() - st; - qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo, + qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); - pQInfo->summary.firstStageMergeTime += elapsedTime; +// pQInfo->summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; }