From 866b6fb6eb713b6a0cc69da9e10cc82e14213e64 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 6 Jul 2021 17:24:58 +0800 Subject: [PATCH] [TD-4335] group by multi column --- src/query/src/qExecutor.c | 73 +++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6cf31b9a30..7ff70153cc 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -204,7 +204,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); -static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); +static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo); static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex); @@ -1329,16 +1329,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr *pGroupbyExpr, SGroupbyOperatorInfo *pInfo) { - // check inited or not if (pInfo->prevData != NULL) { + // no need build group-by info return true; } pInfo->pGroupbyDataInfo = taosArrayInit(pGroupbyExpr->numOfGroupCols, sizeof(SGroupbyDataInfo)); for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); + for (int32_t i = 0; i < pSDataBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pSDataBlock->pDataBlock, i); if (pColInfo->info.colId == pColIndex->colId) { int32_t type = pColInfo->info.type; if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { @@ -1350,7 +1350,7 @@ static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr taosArrayInsert(pInfo->pGroupbyDataInfo, k, &info); break; } - if (i == pDataBlock->info.numOfCols - 1) { + if (i == pSDataBlock->info.numOfCols - 1) { // not found groupby col in dataBlock, error return false; } @@ -1358,7 +1358,7 @@ static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr } return true; } -static void createGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) { +static void buildGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) { char *p = calloc(1, pInfo->totalBytes); if (p == NULL) { *buf = NULL; return; } @@ -1367,12 +1367,12 @@ static void createGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperator SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i); SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, pDataInfo->index); + //TODO(yihaoDeng): handle float & double char *val = ((char *)pColData->pData) + pDataInfo->bytes * rowId; if (isNull(val, pDataInfo->type)) { p += pDataInfo->bytes; continue; } - memcpy(p, val, pDataInfo->bytes); p += pDataInfo->bytes; } @@ -1381,7 +1381,7 @@ static bool isGroupbyKeyEqual(void *a, void *b, void *ext) { SGroupbyOperatorInfo *pInfo = (SGroupbyOperatorInfo *)ext; int32_t offset = 0; - for (i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) { SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i); char *k1 = (char *)a + offset; char *k2 = (char *)b + offset; @@ -1398,7 +1398,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - if (buildGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) { + if (!buildGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) { qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); return; } @@ -1410,11 +1410,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn STimeWindow w = TSWINDOW_INITIALIZER; char *key = NULL; - int32_t num = 0; + int16_t num = 0; + int32_t type = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { - createGroupbyKeyBuf(pSDataBlock, pInfo, j, &key); - if (key == NULL) {} + buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key); + if (key == NULL) { /* handle malloc failure*/} if (pInfo->prevData == NULL) { // first row of pInfo->prevData = key; @@ -1425,12 +1426,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn tfree(key); continue; } - + if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes); + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo); } - - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex); + + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } @@ -1438,25 +1439,22 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); num = 1; + tfree(pInfo->prevData); pInfo->prevData = key; } if (num > 0) { - char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num); - memcpy(pInfo->prevData, val, bytes); - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo); } - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex); + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); - tfree(pInfo->prevData); } } @@ -1556,10 +1554,10 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic // not assign result buffer yet, add new result buffer, TODO remove it char* d = pData; int16_t len = bytes; - if (IS_VAR_DATA_TYPE(type)) { - d = varDataVal(pData); - len = varDataLen(pData); - } + //if (IS_VAR_DATA_TYPE(type)) { + // d = varDataVal(pData); + // len = varDataLen(pData); + //} SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); assert (pResultRow != NULL); @@ -1826,7 +1824,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t)); + pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + 64); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize); pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen); @@ -3651,7 +3649,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } -void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) { +void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfExprs = pQueryAttr->numOfOutput; @@ -3663,6 +3661,23 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction pCtx[i].param[0].arr = NULL; pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + + + // find colid in dataBlock + int32_t bytes, offset = 0; + char* val = NULL; + for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pGroupbyDataInfo); idx++) { + SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, idx); + if (pDataInfo->index == pExpr1->colInfo.colId) { + bytes = pDataInfo->bytes; + val = pInfo->prevData + offset; + break; + } + offset += pDataInfo->bytes; + if (idx == taosArrayGetSize(pInfo->pGroupbyDataInfo) - 1) { + continue; + } + } // TODO use hash to speedup this loop int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); @@ -6146,8 +6161,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); - pInfo->colIndex = -1; // group by column index - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); -- GitLab