From f9997a122ba4bc0b591366668969d916a5fc4a7c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 15 Mar 2022 16:51:50 +0800 Subject: [PATCH] [td-13039] refactor and fix bug. --- contrib/CMakeLists.txt | 2 +- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/dataDispatcher.c | 3 +- source/libs/executor/src/executorimpl.c | 60 +++++++++++------------ source/libs/function/src/builtinsimpl.c | 2 +- source/libs/transport/CMakeLists.txt | 2 +- 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 5e2bfc52e1..3b7b1f3c10 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -128,7 +128,7 @@ target_include_directories( set(CMAKE_PROJECT_INCLUDE_BEFORE "${CMAKE_SUPPORT_DIR}/EnableCMP0048.txt.in") add_subdirectory(zlib) target_include_directories( - zlib + zlibstatic PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/zlib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/zlib ) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 97a51a46ce..833ac13226 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -463,13 +463,12 @@ typedef struct SAggSupporter { SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer -// SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. + SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; typedef struct STableIntervalOperatorInfo { SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file SGroupResInfo groupResInfo; SInterval interval; STimeWindow win; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index a2e526c2bd..a0ee048d82 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -115,7 +115,8 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows; + // struct size + data payload + length for each column + pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t); pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ef2dbea4fd..947fb08ff9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -858,9 +858,9 @@ static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pR static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_t id, STimeWindow *win, bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, - int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) { + int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) { assert(win->skey <= win->ekey); - SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, + SResultRow *pResultRow = doSetResultOutBufByKey_rv(pAggSup->pResultBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup); if (pResultRow == NULL) { @@ -871,7 +871,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_ // set time window for current result pResultRow->win = (*win); *pResult = pResultRow; - setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowOutputBufInitCtx_rv(pAggSup->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); return TSDB_CODE_SUCCESS; } @@ -1039,7 +1039,7 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of pCtx[k].input.numOfRows = forwardStep; if (tsCol != NULL) { - pCtx[k].ptsList = &tsCol[pos]; + pCtx[k].ptsList = tsCol; } // not a whole block involved in query processing, statistics data can not be used @@ -1490,7 +1490,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul SResultRow* pResult = NULL; int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1512,7 +1512,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow w = pRes->win; ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, - tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); + tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1530,7 +1530,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // restore current time window ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1550,7 +1550,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // null data, failed to allocate more memory buffer int32_t code = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId, - pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -2897,6 +2897,8 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; + *status = BLK_DATA_ALL_NEEDED; + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); if (pBlock->pDataBlock == NULL) { return terrno; @@ -3371,7 +3373,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t int64_t tid = 0; int64_t groupId = 0; - SResultRow* pRow = doSetResultOutBufByKey_rv(NULL, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); + SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); @@ -3531,7 +3533,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD for (int32_t j = 0; j < numOfOutput; ++j) { pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); - struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; + struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { continue; } @@ -4730,7 +4732,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo *pOperator, bool* newgroup) { // } // this function never returns error? - uint32_t status; + uint32_t status = BLK_DATA_ALL_NEEDED; int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { @@ -5724,7 +5726,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { return pOrderColumns; } -static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput); +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput, const char* pKey); static void cleanupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { @@ -6102,7 +6104,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num); + int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -6556,7 +6558,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); @@ -6577,7 +6579,7 @@ static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { } blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -7137,14 +7139,13 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } -int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput) { +int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput, const char* pKey) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); -// pAggSup->pool = initResultRowPool(pAggSup->resultRowSize); pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL || @@ -7152,6 +7153,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n return TSDB_CODE_OUT_OF_MEMORY; } + int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/"); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + return TSDB_CODE_SUCCESS; } @@ -7160,16 +7166,16 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { taosHashCleanup(pAggSup->pResultRowHashTable); taosHashCleanup(pAggSup->pResultRowListSet); taosArrayDestroy(pAggSup->pResultRowArrayList); -// destroyResultRowPool(pAggSup->pool); + destroyDiskbasedBuf(pAggSup->pResultBuf); } static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock) { + int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey) { pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; pBasicInfo->capacity = numOfRows; - doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols); + doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey); } static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { @@ -7206,7 +7212,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = 1; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -7271,7 +7277,6 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupAggSup(&pInfo->aggSup); - destroyDiskbasedBuf(pInfo->pResultBuf); } void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { @@ -7347,7 +7352,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); int32_t numOfRows = 1; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -7499,17 +7504,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) { goto _error; } - code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); pOperator->name = "TimeIntervalAggOperator"; @@ -7595,7 +7595,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo goto _error; } - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); + int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 78e0e9f8e7..f0f00434f0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -394,7 +394,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { int32_t *pData = (int32_t*)pCol->pData; int32_t *val = (int32_t*) buf; - for (int32_t i = 0; i < pCtx->size; ++i) { + for (int32_t i = start; i < start + numOfRows; ++i) { if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index 5cc436cf32..20dd3f7ad2 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -12,7 +12,7 @@ target_link_libraries( PUBLIC os PUBLIC util PUBLIC common - PUBLIC zlib + PUBLIC zlibstatic ) if (${BUILD_WITH_UV_TRANS}) if (${BUILD_WITH_UV}) -- GitLab