From 08bd021a6ff3b9209a1b6ac5318a310cc19ffea6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Apr 2023 14:49:14 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 9 +- source/libs/executor/src/aggregateoperator.c | 142 ++++++++- source/libs/executor/src/executor.c | 3 +- source/libs/executor/src/executorimpl.c | 295 +----------------- source/libs/executor/src/scanoperator.c | 8 +- source/libs/executor/src/timewindowoperator.c | 1 + 6 files changed, 155 insertions(+), 303 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 67c9db13ed..a6353f722a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -485,10 +485,11 @@ typedef struct SStreamScanInfo { } SStreamScanInfo; typedef struct { - SVnode* vnode; - SSDataBlock pRes; // result SSDataBlock - STsdbReader* dataReader; - SSnapContext* sContext; + SVnode* vnode; + SSDataBlock pRes; // result SSDataBlock + STsdbReader* dataReader; + SSnapContext* sContext; + STableListInfo* pTableListInfo; } SStreamRawScanInfo; typedef struct STableCountScanSupp { diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 565a1ccf25..a26e3ace7b 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -34,6 +34,12 @@ #include "ttypes.h" #include "vnode.h" +typedef struct { + bool hasAgg; + int32_t numOfRows; + int32_t startOffset; +} SFunctionCtxStatus; + static void destroyAggOperatorInfo(void* param); static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); @@ -44,10 +50,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator); static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator); -static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, + const char* pKey); + +static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); +static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); + SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); @@ -366,7 +378,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); + int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); } @@ -376,7 +388,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin } // a new buffer page for each table. Needs to opt this design -int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { +int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { if (pWindowRes->pageId != -1) { return 0; } @@ -425,3 +437,127 @@ int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, return 0; } + +int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, + const char* pKey) { + int32_t code = 0; + // _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + + pAggSup->currentPageId = -1; + pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); + pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); + pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash); + + if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + uint32_t defaultPgsz = 0; + uint32_t defaultBufsz = 0; + getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); + + if (!osTempSpaceAvailable()) { + code = TSDB_CODE_NO_AVAIL_DISK; + qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey); + return code; + } + + code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); + if (code != TSDB_CODE_SUCCESS) { + qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); + return code; + } + + return code; +} + +void cleanupAggSup(SAggSupporter* pAggSup) { + taosMemoryFreeClear(pAggSup->keyBuf); + tSimpleHashCleanup(pAggSup->pResultRowHashTable); + destroyDiskbasedBuf(pAggSup->pResultBuf); +} + +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, + const char* pkey, void* pState) { + int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + if (pState) { + pSup->pCtx[i].saveHandle.pBuf = NULL; + pSup->pCtx[i].saveHandle.pState = pState; + pSup->pCtx[i].exprIdx = i; + } else { + pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; + } + } + + return TSDB_CODE_SUCCESS; +} + +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { + for (int32_t k = 0; k < numOfOutput; ++k) { + // keep it temporarily + SFunctionCtxStatus status = {0}; + functionCtxSave(&pCtx[k], &status); + + pCtx[k].input.startRowIndex = offset; + pCtx[k].input.numOfRows = forwardStep; + + // not a whole block involved in query processing, statistics data can not be used + // NOTE: the original value of isSet have been changed here + if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) { + pCtx[k].input.colDataSMAIsSet = false; + } + + if (pCtx[k].isPseudoFunc) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); + + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + + SColumnInfoData idata = {0}; + idata.info.type = TSDB_DATA_TYPE_BIGINT; + idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + idata.pData = p; + + SScalarParam out = {.columnData = &idata}; + SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; + pCtx[k].sfp.process(&tw, 1, &out); + pEntryInfo->numOfRes = 1; + } else { + int32_t code = TSDB_CODE_SUCCESS; + if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { + code = pCtx[k].fpSet.process(&pCtx[k]); + + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + T_LONG_JMP(taskInfo->env, code); + } + } + + // restore it + functionCtxRestore(&pCtx[k], &status); + } + } +} + +void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pStatus->hasAgg = pCtx->input.colDataSMAIsSet; + pStatus->numOfRows = pCtx->input.numOfRows; + pStatus->startOffset = pCtx->input.startRowIndex; +} + +void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pCtx->input.colDataSMAIsSet = pStatus->hasAgg; + pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.startRowIndex = pStatus->startOffset; +} \ No newline at end of file diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b05d01f596..e1cd509cba 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1192,7 +1192,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SSnapContext* sContext = pInfo->sContext; SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id); - STableListInfo* pTableListInfo = ((STableScanInfo*)(p->info))->base.pTableListInfo; + STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo; + if (setForSnapShot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id); return -1; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 265f948bcd..c120c6055d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -34,9 +34,7 @@ #include "ttypes.h" #include "vnode.h" -#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) - #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #if 0 @@ -81,14 +79,14 @@ static void releaseQueryBuf(size_t numOfTables); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, - const char* pKey); + static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo); +static SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -268,72 +266,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataSetInt64(pColData, 4, &pQueryWindow->ekey); } -typedef struct { - bool hasAgg; - int32_t numOfRows; - int32_t startOffset; -} SFunctionCtxStatus; - -static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { - pStatus->hasAgg = pCtx->input.colDataSMAIsSet; - pStatus->numOfRows = pCtx->input.numOfRows; - pStatus->startOffset = pCtx->input.startRowIndex; -} - -static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { - pCtx->input.colDataSMAIsSet = pStatus->hasAgg; - pCtx->input.numOfRows = pStatus->numOfRows; - pCtx->input.startRowIndex = pStatus->startOffset; -} - -void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, - int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { - for (int32_t k = 0; k < numOfOutput; ++k) { - // keep it temporarily - SFunctionCtxStatus status = {0}; - functionCtxSave(&pCtx[k], &status); - - pCtx[k].input.startRowIndex = offset; - pCtx[k].input.numOfRows = forwardStep; - - // not a whole block involved in query processing, statistics data can not be used - // NOTE: the original value of isSet have been changed here - if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) { - pCtx[k].input.colDataSMAIsSet = false; - } - - if (pCtx[k].isPseudoFunc) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); - - char* p = GET_ROWCELL_INTERBUF(pEntryInfo); - - SColumnInfoData idata = {0}; - idata.info.type = TSDB_DATA_TYPE_BIGINT; - idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - idata.pData = p; - - SScalarParam out = {.columnData = &idata}; - SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; - pCtx[k].sfp.process(&tw, 1, &out); - pEntryInfo->numOfRes = 1; - } else { - int32_t code = TSDB_CODE_SUCCESS; - if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { - code = pCtx[k].fpSet.process(&pCtx[k]); - - if (code != TSDB_CODE_SUCCESS) { - qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); - taskInfo->code = code; - T_LONG_JMP(taskInfo->env, code); - } - } - - // restore it - functionCtxRestore(&pCtx[k], &status); - } - } -} - static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -1018,161 +950,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } } -// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, -// STableQueryInfo* pTableQueryInfo) { -// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; -// -// STimeWindow tw = *win; -// getNextTimeWindow(pQueryAttr, &tw); -// -// if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) || -// (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) { -// -// // load the data block and check data remaining in current data block -// // TODO optimize performance -// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); -// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); -// -// tw = *win; -// int32_t startPos = -// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1); -// -// // set the abort info -// pQueryAttr->pos = startPos; -// -// // reset the query start timestamp -// pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos]; -// pQueryAttr->window.skey = pTableQueryInfo->win.skey; -// TSKEY key = pTableQueryInfo->win.skey; -// -// pWindowResInfo->prevSKey = tw.skey; -// int32_t index = pRuntimeEnv->resultRowInfo.curIndex; -// -// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); -// pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index -// -// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, -// lastKey:%" PRId64, -// GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, -// pQueryAttr->current->lastKey); -// -// return key; -// } else { // do nothing -// pQueryAttr->window.skey = tw.skey; -// pWindowResInfo->prevSKey = tw.skey; -// pTableQueryInfo->lastKey = tw.skey; -// -// return tw.skey; -// } -// -// return true; -// } - -// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) { -// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// -// // if queried with value filter, do NOT forward query start position -// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || -// pRuntimeEnv->pFillInfo != NULL) { -// return true; -// } -// -// /* -// * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for -// * pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset -// value is -// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points -// */ -// STimeWindow w = TSWINDOW_INITIALIZER; -// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); -// -// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; -// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; -// -// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) { -// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo); -// -// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { -// if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { -// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey, -// &w); pWindowResInfo->prevSKey = w.skey; -// } -// } else { -// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w); -// pWindowResInfo->prevSKey = w.skey; -// } -// -// // the first time window -// STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr); -// -// while (pQueryAttr->limit.offset > 0) { -// STimeWindow tw = win; -// -// if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) { -// pQueryAttr->limit.offset -= 1; -// pWindowResInfo->prevSKey = win.skey; -// -// // current time window is aligned with blockInfo.window.ekey -// // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL; -// if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) { -// pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; -// } -// } -// -// if (pQueryAttr->limit.offset == 0) { -// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo); -// return true; -// } -// -// // current window does not ended in current data block, try next data block -// getNextTimeWindow(pQueryAttr, &tw); -// -// /* -// * If the next time window still starts from current data block, -// * load the primary timestamp column first, and then find the start position for the next queried time window. -// * Note that only the primary timestamp column is required. -// * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually -// required -// * time window resides in current data block. -// */ -// if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) { -// -// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); -// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); -// -// if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) { -// pQueryAttr->limit.offset -= 1; -// } -// -// if (pQueryAttr->limit.offset == 0) { -// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo); -// return true; -// } else { -// tw = win; -// int32_t startPos = -// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); -// // set the abort info -// pQueryAttr->pos = startPos; -// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; -// pWindowResInfo->prevSKey = tw.skey; -// win = tw; -// } -// } else { -// break; // offset is not 0, and next time window begins or ends in the next block. -// } -// } -// } -// -// // check for error -// if (terrno != TSDB_CODE_SUCCESS) { -// T_LONG_JMP(pRuntimeEnv->env, terrno); -// } -// -// return true; -// } - int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) { p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES); if (p->pDownstream == NULL) { @@ -1301,70 +1078,6 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul return 0; } -int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, - const char* pKey) { - int32_t code = 0; - // _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - - pAggSup->currentPageId = -1; - pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); - pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); - pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash); - - if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - uint32_t defaultPgsz = 0; - uint32_t defaultBufsz = 0; - getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); - - if (!osTempSpaceAvailable()) { - code = TSDB_CODE_NO_AVAIL_DISK; - qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey); - return code; - } - - code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); - if (code != TSDB_CODE_SUCCESS) { - qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); - return code; - } - - return code; -} - -void cleanupAggSup(SAggSupporter* pAggSup) { - taosMemoryFreeClear(pAggSup->keyBuf); - tSimpleHashCleanup(pAggSup->pResultRowHashTable); - destroyDiskbasedBuf(pAggSup->pResultBuf); -} - -int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey, void* pState) { - int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - for (int32_t i = 0; i < numOfCols; ++i) { - if (pState) { - pSup->pCtx[i].saveHandle.pBuf = NULL; - pSup->pCtx[i].saveHandle.pState = pState; - pSup->pCtx[i].exprIdx = i; - } else { - pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; - } - } - - return TSDB_CODE_SUCCESS; -} - void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) { if (numOfRows == 0) { numOfRows = 4096; @@ -1474,8 +1187,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v return pTaskInfo; } -SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); - int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); @@ -2286,7 +1997,7 @@ void qStreamCloseTsdbReader(void* task) { } } -void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { +static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = pOperator->info; taosArrayPush(pList, &pScanInfo->base.pTableListInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 14edf9f13c..1bfa8ccfc4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2172,19 +2172,19 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { code = tsdbNextDataBlock(pInfo->dataReader, &hasNext); if (code) { tsdbReleaseDataBlock(pInfo->dataReader); - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } if (pInfo->dataReader && hasNext) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(pInfo->dataReader); - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); if (pBlock == NULL) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); @@ -2283,6 +2283,7 @@ static void destroyRawScanOperatorInfo(void* param) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; tsdbReaderClose(pRawScan->dataReader); destroySnapContext(pRawScan->sContext); + tableListDestroy(pRawScan->pTableListInfo); taosMemoryFree(pRawScan); } @@ -2304,6 +2305,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT goto _end; } + pInfo->pTableListInfo = tableListCreate(); pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4a52683c3a..29acc1fea2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -938,6 +938,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } + TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); -- GitLab