From cc0110d32b6b38909c4b58a23630e2f07ff06a1d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Mar 2021 14:50:43 +0800 Subject: [PATCH] [td-2895] refactor. --- src/inc/ttype.h | 8 ++-- src/query/inc/qExecutor.h | 4 +- src/query/src/qAggMain.c | 7 +-- src/query/src/qExecutor.c | 91 ++++++++++++++++++++++----------------- 4 files changed, 58 insertions(+), 52 deletions(-) diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 2f6b80f89d..662a23bfdb 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -171,10 +171,10 @@ extern tDataTypeDescriptor tDataTypes[15]; bool isValidDataType(int32_t type); -void setVardataNull(char* val, int32_t type); -void setNull(char *val, int32_t type, int32_t bytes); -void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); -void* getNullValue(int32_t type); +void setVardataNull(char* val, int32_t type); +void setNull(char *val, int32_t type, int32_t bytes); +void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); +void *getNullValue(int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 04ff27fa33..6baf7549ea 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -86,7 +86,7 @@ typedef struct SResultRow { bool closed; // this result status: closed or opened uint32_t numOfRows; // number of rows of current time window SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo - union {STimeWindow win; char* key;}; // start key of current time window + union {STimeWindow win; char* key;}; // start key of current result row } SResultRow; typedef struct SGroupResInfo { @@ -131,7 +131,6 @@ typedef struct SSingleColumnFilterInfo { typedef struct STableQueryInfo { TSKEY lastKey; int32_t groupIndex; // group id in table list - int16_t queryRangeSet; // denote if the query range is set, only available for interval query tVariant tag; STimeWindow win; STSCursor cur; @@ -409,6 +408,7 @@ typedef struct SFillOperatorInfo { typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; int32_t colIndex; + char *prevData; // previous group by value } SGroupbyOperatorInfo; void freeParam(SQueryParam *param); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 2572feb2d7..c82b36e1f0 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -2167,12 +2167,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) { // do nothing at the first stage SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->hasResult != DATA_SET_FLAG) { - if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pCtx->pOutput, pCtx->outputType); - } else { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); - } - + setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); return; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index eef1515fcd..2a8f55a81d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -190,9 +190,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static bool isPointInterpoQuery(SQuery *pQuery); @@ -1283,26 +1282,37 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } } -static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, - SSDataBlock *pSDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - STableQueryInfo* item = pQuery->current; +static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STableQueryInfo* item = pRuntimeEnv->pQuery->current; SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; + + if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + qError("QInfo:%p group by not supported on double/float columns, abort", pRuntimeEnv->qinfo); + return; + } for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { - char *val = pColInfoData->pData + bytes * j; - if (isNull(val, type)) { // TODO: ignore the null value + char* val = pColInfoData->pData + bytes * j; + if (isNull(val, type)) { continue; } - // TODO compare with the previous value to speedup the query processing - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &pInfo->binfo.resultRowInfo, pInfo->binfo.pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->binfo.rowCellInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) { + if (pInfo->prevData == NULL) { + pInfo->prevData = malloc(bytes); + } + + memcpy(pInfo->prevData, val, bytes); + + int32_t ret = + setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } } for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { @@ -1315,25 +1325,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat } } -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* rowCellInfoOffset) { - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - - // not assign result buffer yet, add new result buffer, TODO remove it - char* d = pData; - int16_t len = bytes; - if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) { - d = varDataVal(pData); - len = varDataLen(pData); - } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { - SQInfo* pQInfo = pRuntimeEnv->qinfo; - qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo); - return -1; - } - - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); - assert (pResultRow != NULL); - +static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { int64_t v = -1; GET_TYPED_DATA(v, int64_t, type, pData); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { @@ -1347,7 +1339,27 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow pResultRow->win.skey = v; pResultRow->win.ekey = v; } +} + +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { + SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; + + int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; + SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo; + SQLFunctionCtx *pCtx = pInfo->binfo.pCtx; + + // not assign result buffer yet, add new result buffer, TODO remove it + char* d = pData; + int16_t len = bytes; + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + d = varDataVal(pData); + len = varDataLen(pData); + } + + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); + assert (pResultRow != NULL); + setResultRowKey(pResultRow, pData, type); if (pResultRow->pageId == -1) { int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize); if (ret != 0) { @@ -4654,7 +4666,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { return pIntervalInfo->pRes; } -static SSDataBlock* doHashGroupbyAgg(void* param) { +static SSDataBlock* hashGroupbyAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4688,16 +4700,14 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); } - hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); + doHashGroupbyAgg(pOperator, pInfo, pBlock); } pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); - // todo need finalize the result - if (!pRuntimeEnv->pQuery->stableQuery) { - // finalize include the update of result rows + if (!pRuntimeEnv->pQuery->stableQuery) { // finalize include the update of result rows finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); } else { updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); @@ -4830,6 +4840,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + tfree(pInfo->prevData); } static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { @@ -5000,7 +5011,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doHashGroupbyAgg; + pOperator->exec = hashGroupbyAggregate; pOperator->cleanup = destroyGroupbyOperatorInfo; return pOperator; -- GitLab