diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 0469b556874ce59d6c87e8237fcedf197766feeb..d01e1fcae3b4824959dced85f31b3cc252cda6c5 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -440,6 +440,15 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu rlen += pExpr->base.resBytes; } + int32_t pg = DEFAULT_PAGE_SIZE; + int32_t overhead = sizeof(tFilePage); + while((pg - overhead) < rlen * 2) { + pg *= 2; + } + + if (*nBufferSizes < pg){ + *nBufferSizes = 2 * pg; + } int32_t capacity = 0; if (rlen != 0) { if ((*nBufferSizes) < rlen) { @@ -454,12 +463,6 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t pg = DEFAULT_PAGE_SIZE; - int32_t overhead = sizeof(tFilePage); - while((pg - overhead) < rlen * 2) { - pg *= 2; - } - assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups); for (int32_t i = 0; i < numOfSub; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(*nBufferSizes, rlen, pg, pModel); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e67dd7e14b6ae7c2032ba459242e2ce3db9b1cbb..01cee60f209853af709b5469b4a0f68b4e31225c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -4938,7 +4938,11 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu pse->colInfo.colIndex = i; pse->colType = pExpr->base.resType; - pse->colBytes = pExpr->base.resBytes; + if(pExpr->base.resBytes > INT16_MAX && pExpr->base.functionId == TSDB_FUNC_UNIQUE){ + pQueryAttr->interBytesForGlobal = pExpr->base.resBytes; + }else{ + pse->colBytes = pExpr->base.resBytes; + } } { diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index 7a401d8a7f71c094654d06a2ed37ae3fd7fc9c94..e6b7dd1463a754bfaa78bb1081e3b6b0b753d752 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -54,7 +54,7 @@ typedef struct SSqlExpr { int32_t resBytes; // length of return value int32_t interBytes; // inter result buffer size - int16_t colType; // table column type + int16_t colType; // table column type, this should be int32_t, because it is too small for globale merge stage, pQueryAttr->interBytesForGlobal int16_t colBytes; // table column bytes int16_t numOfParams; // argument value of each function diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 23e52bfaf97a9bf4923e54e5cdde87136a1c8ef9..aa5e2abd803d611be005115ba387a45e1138ed56 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -175,7 +175,7 @@ typedef struct SQLFunctionCtx { void * pInput; // input data buffer uint32_t order; // asc|desc int16_t inputType; - int16_t inputBytes; + int32_t inputBytes; int16_t outputType; int32_t outputBytes; // size of results, determined by function and input column data type @@ -202,7 +202,7 @@ typedef struct SQLFunctionCtx { SPoint1 start; SPoint1 end; - SHashObj *pUniqueSet; // for unique function + SHashObj **pUniqueSet; // for unique function } SQLFunctionCtx; typedef struct SAggFunctionInfo { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ffe8e48aa011aeda1be3303093ea88ee0ef878e7..c4aebc07b15749da343b4d0175812ca6e4211021 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -90,6 +90,7 @@ typedef struct SResultRow { SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo STimeWindow win; char *key; // start key of current result row + SHashObj *uniqueHash; // for unique function } SResultRow; typedef struct SResultRowCell { @@ -282,7 +283,7 @@ typedef struct SQueryAttr { STableGroupInfo tableGroupInfo; // table list SArray int32_t vgId; SArray *pUdfInfo; // no need to free - int32_t maxUniqueResult; + int32_t interBytesForGlobal; } SQueryAttr; typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 0882df77c2a8bc38560269ce093568fd96467dae..ce0a0648f554e007e46d441d9607d1d8edb971e3 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -70,8 +70,12 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage* int32_t offset) { assert(rowOffset >= 0 && pQueryAttr != NULL); - int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); - return ((char *)page->data) + rowOffset + offset * numOfRows; + int64_t numOfRows = (int64_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); + numOfRows *= offset; + if(numOfRows >= INT32_MAX){ + assert(0); + } + return ((char *)page->data) + rowOffset + numOfRows; } bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index b2a54f9c60b3910abcbb5b7726ede3a1b4eba78c..18ef2221b0db81ada8438524367baca81006e91f 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -5190,11 +5190,13 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes if (!function_setup(pCtx, pResInfo)) { return false; } + *pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + return true; } static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag, int32_t bytes){ - UniqueUnit *unique = taosHashGet(pCtx->pUniqueSet, pData, bytes); + UniqueUnit *unique = taosHashGet(*pCtx->pUniqueSet, pData, bytes); if (unique == NULL) { size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; char *tmp = pInfo->res + pInfo->num * size; @@ -5220,7 +5222,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK } } - taosHashPut(pCtx->pUniqueSet, pData, bytes, &tmp, sizeof(UniqueUnit*)); + taosHashPut(*pCtx->pUniqueSet, pData, bytes, &tmp, sizeof(UniqueUnit*)); pInfo->num++; }else if(unique->timestamp > timestamp){ unique->timestamp = timestamp; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index c4cfdd1d4d80beb6f896bef69201840d995de9a5..8d2bb10295a9d30933cbc6360ae1acb913626c1b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -281,7 +281,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) { tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); - int16_t offset = supporter->dataOffset; + int32_t offset = supporter->dataOffset; char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); char *in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); @@ -1964,8 +1964,12 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->requireNull = false; } - pCtx->inputBytes = pSqlExpr->colBytes; pCtx->inputType = pSqlExpr->colType; + if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && pSqlExpr->functionId == TSDB_FUNC_UNIQUE){ + pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal; + }else{ + pCtx->inputBytes = pSqlExpr->colBytes; + } pCtx->ptsOutputBuf = NULL; @@ -2030,8 +2034,6 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; } else if (functionId == TSDB_FUNC_SCALAR_EXPR) { pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i]; - } else if (functionId == TSDB_FUNC_UNIQUE){ - pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } } @@ -2056,9 +2058,6 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { tVariantDestroy(&pCtx[i].tag); tfree(pCtx[i].tagInfo.pTagCtxList); - if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){ - taosHashClear(pCtx[i].pUniqueSet); - } } tfree(pCtx); @@ -3688,6 +3687,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i RESET_RESULT_INFO(pCellInfo); pCtx[i].resultInfo = pCellInfo; + pCtx[i].pUniqueSet = &pRow->uniqueHash; pCtx[i].pOutput = pData->pData; pCtx[i].currentStage = stage; assert(pCtx[i].pOutput != NULL); @@ -4022,6 +4022,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe int32_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); + pCtx[i].pUniqueSet = &pResult->uniqueHash; SResultRowCellInfo* pResInfo = pCtx[i].resultInfo; if (pResInfo->initialized && pResInfo->complete) { @@ -4097,7 +4098,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); - int16_t offset = 0; + int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset); offset += pCtx[i].outputBytes; @@ -4115,6 +4116,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF * not all queries require the interResultBuf, such as COUNT */ pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); + pCtx[i].pUniqueSet = &pResult->uniqueHash; } } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index c585f44050c2db603952c1e53ceb21098571f772..1490564970692d258e150722b2407d74f8a4e628 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -88,6 +88,9 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { for(int32_t i = 0; i < pResultRowInfo->size; ++i) { if (pResultRowInfo->pResult[i]) { tfree(pResultRowInfo->pResult[i]->key); + if (pResultRowInfo->pResult[i]->uniqueHash){ + taosHashCleanup(pResultRowInfo->pResult[i]->uniqueHash); + } } } @@ -153,11 +156,11 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 if (pResultRow->pageId >= 0) { tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); - int16_t offset = 0; + int32_t offset = 0; for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; - int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resBytes; + int32_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resBytes; char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset); memset(s, 0, size);