From a9f0a30dc5d70ab1cc4dda6af51c12e6bc3e23f6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 2 Nov 2020 23:44:15 +0800 Subject: [PATCH] [TD-1870]: optimize the memory consumption. --- src/client/inc/tscLocalMerge.h | 5 +- src/client/src/tscFunctionImpl.c | 22 ++-- src/client/src/tscLocalMerge.c | 19 +--- src/query/inc/qExecutor.h | 4 +- src/query/inc/qUtil.h | 4 +- src/query/src/qExecutor.c | 167 +++++++++++-------------------- src/query/src/qUtil.c | 37 ++++--- src/util/src/hash.c | 10 +- 8 files changed, 107 insertions(+), 161 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index edd83ad071..0af8c8b576 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -64,9 +64,8 @@ typedef struct SLocalReducer { SColumnModel * resColModel; tExtMemBuffer ** pExtMemBuffer; // disk-based buffer SFillInfo* pFillInfo; // interpolation support structure - char * pFinalRes; // result data after interpo - tFilePage * discardData; - SResultRowCellInfo * pResInfo; + char* pFinalRes; // result data after interpo + tFilePage* discardData; bool discard; int32_t offset; // limit offset value bool orderPrjOnSTable; // projection query on stable diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index b22c178466..e0bd762333 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -99,7 +99,7 @@ typedef struct SSumInfo { // the attribute of hasResult is not needed since the num attribute would server as this purpose typedef struct SAvgInfo { double sum; - int64_t num; // num servers as the hasResult attribute in other struct + int64_t num; } SAvgInfo; typedef struct SStddevInfo { @@ -167,7 +167,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *interBytes = 0;//*bytes; + + if (functionId == TSDB_FUNC_INTERP) { + *interBytes = sizeof(SInterpInfoDetail); + } else { + *interBytes = 0; + } + return TSDB_CODE_SUCCESS; } @@ -175,21 +181,21 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct *type = TSDB_DATA_TYPE_BINARY; *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); - *interBytes = 0;//*bytes; + *interBytes = 0; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_COUNT) { *type = TSDB_DATA_TYPE_BIGINT; *bytes = sizeof(int64_t); - *interBytes = 0;//*bytes; + *interBytes = 0; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_ARITHM) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *interBytes = 0;//*bytes; + *interBytes = 0; return TSDB_CODE_SUCCESS; } @@ -334,11 +340,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } -//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf) { -// assert(GET_ROWCELL_INTERBUF(pResInfo) == NULL); -// GET_ROWCELL_INTERBUF(pResInfo) = buf; -//} - // set the query flag to denote that query is completed static void no_next_step(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -3998,7 +3999,6 @@ static void interp_function(SQLFunctionCtx *pCtx) { } } } - } SET_VAL(pCtx, pCtx->size, 1); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 58c2b76b72..c67edf5b5a 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -99,11 +99,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc pCtx->param[1].i64Key = pQueryInfo->order.orderColId; } -// SResultRowCellInfo *pResInfo = &pReducer->pResInfo[i]; - pCtx->interBufBytes = pExpr->interBytes; -// pResInfo->interResultBuf = calloc(1, (size_t) pCtx->interBufBytes); - - pCtx->resultInfo = &pReducer->pResInfo[i]; + pCtx->interBufBytes = pExpr->interBytes; + pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo)); pCtx->stableQuery = true; } @@ -345,7 +342,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); pReducer->pTempBuffer->num = 0; - pReducer->pResInfo = calloc(numOfCols, sizeof(SResultRowCellInfo)); tscCreateResPointerInfo(pRes, pQueryInfo); tscInitSqlContext(pCmd, pReducer, pDesc); @@ -496,6 +492,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; tVariantDestroy(&pCtx->tag); + taosTFree(pCtx->resultInfo); + if (pCtx->tagInfo.pTagCtxList != NULL) { taosTFree(pCtx->tagInfo.pTagCtxList); } @@ -509,15 +507,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { taosTFree(pLocalReducer->pTempBuffer); taosTFree(pLocalReducer->pResultBuf); - if (pLocalReducer->pResInfo != NULL) { - size_t num = tscSqlExprNumOfExprs(pQueryInfo); - for (int32_t i = 0; i < num; ++i) { -// taosTFree(pLocalReducer->pResInfo[i].interResultBuf); - } - - taosTFree(pLocalReducer->pResInfo); - } - if (pLocalReducer->pLoserTree) { taosTFree(pLocalReducer->pLoserTree->param); taosTFree(pLocalReducer->pLoserTree); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 30fc49e74a..d7db897cdf 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -168,7 +168,7 @@ typedef struct SQuery { typedef struct SQueryRuntimeEnv { jmp_buf env; - SResultRowCellInfo* resultInfo; // todo refactor to merge with SWindowResInfo + SResultRow* pResultRow; // todo refactor to merge with SWindowResInfo SQuery* pQuery; SQLFunctionCtx* pCtx; int32_t numOfRowsPerPage; @@ -190,7 +190,7 @@ typedef struct SQueryRuntimeEnv { SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer - SResultRowPool* pool; // window result object pool + SResultRowPool* pool; // window result object pool int32_t* rowCellInfoOffset;// offset value for each row result cell info } SQueryRuntimeEnv; diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 77ea0dd013..5d649261a6 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -42,7 +42,7 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); -static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { +static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]; } @@ -52,7 +52,7 @@ static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo, bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); -int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow); +int32_t initResultRow(SResultRow *pResultRow); static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, tFilePage* page) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 462966459e..c0390b51e4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -178,7 +178,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { // todo move to utility static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group); -static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); +static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); @@ -449,10 +449,9 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); - int32_t *p1 = (int32_t *) taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + int32_t *p1 = + (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (p1 != NULL) { pWindowResInfo->curIndex = *p1; } else { @@ -470,9 +469,6 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow } char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); - // pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SResultRow); - // pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity); - if (t == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -484,17 +480,18 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow pWindowResInfo->capacity = (int32_t)newCapacity; } -// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultRowCellInfo) + pRuntimeEnv->interBufSize) * inc; - SResultRow* pResult = getNewResultRow(pRuntimeEnv->pool); - pWindowResInfo->pResult[pWindowResInfo->size] = pResult; - int32_t ret = createQueryResultInfo(pQuery, pResult); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } + + SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool); + pWindowResInfo->pResult[pWindowResInfo->size] = pResult; + int32_t ret = initResultRow(pResult); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } // add a new result set for a new group pWindowResInfo->curIndex = pWindowResInfo->size++; - taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), + (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); } // too many time window in query @@ -502,7 +499,7 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } - return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); + return getResultRow(pWindowResInfo, pWindowResInfo->curIndex); } // get the correct time window according to the handled timestamp @@ -518,7 +515,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t } } else { int32_t slot = curTimeWindowIndex(pWindowResInfo); - SResultRow* pWindowRes = getWindowResult(pWindowResInfo, slot); + SResultRow* pWindowRes = getResultRow(pWindowResInfo, slot); w = pWindowRes->win; } @@ -1091,7 +1088,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * taosTFree(sasArray); } -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { if (isNull(pData, type)) { // ignore the null value return -1; } @@ -1113,7 +1110,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - uint64_t uid = 0; // uid is always set to be 0. + uint64_t uid = groupIndex; // uid is always set to be 0. SResultRow *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); if (pWindowRes == NULL) { return -1; @@ -1143,7 +1140,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } } - setWindowResOutputBuf(pRuntimeEnv, pWindowRes); + setResultOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); return TSDB_CODE_SUCCESS; } @@ -1374,7 +1371,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (groupbyColumnValue) { char *val = groupbyColumnData + bytes * offset; - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes); + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1607,29 +1604,15 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void setResultRowCellInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pRow, char* buf) { -// SQuery* pQuery = pRuntimeEnv->pQuery; -// -// char* p = buf; -// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// int32_t size = pQuery->pSelectExpr[i].interBytes; -// SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, i); -// setResultInfoBuf(pInfo, p); -// p += size; -// } -} - static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) { qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; - size_t size = pRuntimeEnv->interBufSize + pQuery->numOfOutput * sizeof(SResultRowCellInfo); - - pRuntimeEnv->resultInfo = calloc(1, size); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); + pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool);//calloc(1, sizeof(SResultRow)); - if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { + if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { goto _clean; } @@ -1666,7 +1649,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->inputType = pQuery->colList[index].type; } - assert(isValidDataType(pCtx->inputType)); pCtx->ptsOutputBuf = NULL; @@ -1711,13 +1693,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order } -// char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultRowCellInfo) * pQuery->numOfOutput; - - // set the intermediate result output buffer -// setResultRowCellInfo(pRuntimeEnv, pRuntimeEnv->resultInfo, NULL); - // if it is group by normal column, do not set output buffer, the output buffer is pResult - if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery) { + // fixed output query/multi-output query for normal table + if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { resetCtxOutputBuf(pRuntimeEnv); } @@ -1729,7 +1707,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order return TSDB_CODE_SUCCESS; _clean: - taosTFree(pRuntimeEnv->resultInfo); taosTFree(pRuntimeEnv->pCtx); return TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -1758,7 +1735,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosTFree(pCtx->tagInfo.pTagCtxList); } - taosTFree(pRuntimeEnv->resultInfo); taosTFree(pRuntimeEnv->pCtx); } @@ -2833,14 +2809,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) } SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; - SResultRow * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); + SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); TSKEY leftTimestamp = GET_INT64_VAL(b1); SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; - SResultRow * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); + SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2); @@ -2961,7 +2937,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { pQuery->rec.rows += offset; } -int64_t getNumOfResultWindowRes(SQuery *pQuery, SResultRow *pResultRow) { +int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) { + SQuery* pQuery = pRuntimeEnv->pQuery; + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].base.functionId; @@ -2973,7 +2951,7 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SResultRow *pResultRow) { continue; } - SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[j]; + SResultRowCellInfo *pResultInfo = getResultCell(pRuntimeEnv, pResultRow, j); assert(pResultInfo != NULL); if (pResultInfo->numOfRes > 0) { @@ -3042,18 +3020,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SLoserTreeInfo *pTree = NULL; tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); - SResultRow* pRow = calloc(1, getWindowResultSize(pRuntimeEnv)); - if (pRow == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - pRow->pCellInfo = (SResultRowCellInfo*) ((char*) pRow + sizeof(SResultRow)); -// char* buf = (char*) pRow + sizeof(SResultRowCellInfo)*; -// if (buf == NULL) { -// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); -// } - - setResultRowCellInfo(pRuntimeEnv, pRow, NULL); + SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool); resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex); @@ -3069,23 +3036,20 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { taosTFree(pTableList); taosTFree(posList); taosTFree(pTree); -// taosTFree(pResultInfo); -// taosTFree(buf); - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } int32_t pos = pTree->pNode[0].index; SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; - SResultRow *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); + SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); TSKEY ts = GET_INT64_VAL(b); assert(ts == pWindowRes->win.skey); - int64_t num = getNumOfResultWindowRes(pQuery, pWindowRes); + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); if (num <= 0) { cs.position[pos] += 1; @@ -3128,7 +3092,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { } } else { // current page is not needed anymore - SResultRow *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); + SResultRow *pNextWindowRes = getResultRow(pWindowResInfo, cs.position[pos]); if (pNextWindowRes->pageId != currentPageId) { releaseResBufPage(pRuntimeEnv->pResultBuf, page); } @@ -3145,8 +3109,6 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { taosTFree(pTree); taosTFree(pTableList); taosTFree(posList); -// taosTFree(pResultInfo); - return -1; } } @@ -3251,8 +3213,8 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1; } -static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindowResInfo, int32_t order) { - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; +static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t order) { + SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { bool closed = getTimeWindowResStatus(pWindowResInfo, i); @@ -3260,17 +3222,18 @@ static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindow continue; } - SResultRow *buf = getWindowResult(pWindowResInfo, i); + SResultRow *pRow = getResultRow(pWindowResInfo, i); // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functId = pQuery->pSelectExpr[j].base.functionId; + SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, j); if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { - buf->pCellInfo[j].complete = false; + pInfo->complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { - buf->pCellInfo[j].complete = true; + pInfo->complete = true; } } } @@ -3284,7 +3247,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { // group by normal columns and interval query on normal table SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { - disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order); + disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor int32_t functId = pQuery->pSelectExpr[j].base.functionId; @@ -3334,22 +3297,16 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { } } -int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow) { -// int32_t numOfCols = pQuery->numOfOutput; - +int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); pResultRow->pageId = -1; pResultRow->rowId = -1; - -// char* buf = (char*) pResultRow->pCellInfo + numOfCols * sizeof(SResultRowCellInfo); - - // set the intermediate result output buffer -// setResultRowCellInfo(pRunimeEnv, pResultRow, buf); return TSDB_CODE_SUCCESS; } void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; + SResultRow* pRow = pRuntimeEnv->pResultRow; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -3359,8 +3316,9 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { * set the output buffer information and intermediate buffer * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. */ - RESET_RESULT_INFO(&pRuntimeEnv->resultInfo[i]); - pCtx->resultInfo = &pRuntimeEnv->resultInfo[i]; + SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i); + RESET_RESULT_INFO(pCellInfo); + pCtx->resultInfo = pCellInfo; // set the timestamp output buffer for top/bottom/diff query int32_t functionId = pQuery->pSelectExpr[i].base.functionId; @@ -3478,12 +3436,12 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pResult = getWindowResult(pWindowResInfo, i); + SResultRow *pResult = getResultRow(pWindowResInfo, i); if (!pResult->closed) { continue; } - setWindowResOutputBuf(pRuntimeEnv, pResult); + setResultOutputBuf(pRuntimeEnv, pResult); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int16_t functId = pQuery->pSelectExpr[j].base.functionId; @@ -3709,7 +3667,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { continue; } - setWindowResOutputBuf(pRuntimeEnv, buf); + setResultOutputBuf(pRuntimeEnv, buf); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); @@ -3816,11 +3774,11 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { // record the current active group id pRuntimeEnv->prevGroupId = groupIndex; - setWindowResOutputBuf(pRuntimeEnv, pWindowRes); + setResultOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); } -void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { +void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group @@ -3836,10 +3794,10 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { } /* - * set the output buffer information and intermediate buffer + * set the output buffer information and intermediate buffer, * not all queries require the interResultBuf, such as COUNT */ - pCtx->resultInfo = &pResult->pCellInfo[i]; + pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i); } } @@ -3852,7 +3810,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->resultInfo = &pResult->pCellInfo[i]; + pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i); if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) { continue; } @@ -4100,7 +4058,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { continue; } - pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->pCellInfo[j].numOfRes)); + SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); + pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); } } } @@ -4891,7 +4850,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); - if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) { + if (isPointInterpoQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); @@ -4921,11 +4880,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pRuntimeEnv->pQueryHandle = NULL; } - if (isFirstLastRowQuery(pQuery)) { - assert(0); // last_row query switch to other routine to handle - } else { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo); - } + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo); taosArrayDestroy(tx); taosArrayDestroy(g1); @@ -4939,10 +4894,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(taosArrayGetSize(s) >= 1); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); - if (isFirstLastRowQuery(pQuery)) { - assert(taosArrayGetSize(s) == 1); - } - taosArrayDestroy(s); // here we simply set the first table as current table @@ -5025,7 +4976,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { SResultRow *pResult = pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->pCellInfo[j].numOfRes)); + SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); + pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); } } @@ -5296,7 +5248,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead -// finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -6287,8 +6238,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou calResultBufSize(pQuery); for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { - assert(pExprs[col].interBytes >= pExprs[col].bytes || pExprs[col].interBytes == 0); - // allocate additional memory for interResults that are usually larger then final results size_t size = (size_t)((pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage)); pQuery->sdata[col] = (tFilePage *)calloc(1, size); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 600f48928a..824cae3932 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -87,7 +87,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { assert(num >= 0 && num <= numOfClosed); int16_t type = pWindowResInfo->type; - uint64_t uid = 0; // uid is always set to be 0. + STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable); // uid is always set to be 0. char *key = NULL; int16_t bytes = -1; @@ -104,7 +104,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { bytes = tDataTypeDesc[pWindowResInfo->type].nSize; } - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, id->uid); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); } else { break; @@ -137,14 +137,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { bytes = tDataTypeDesc[pWindowResInfo->type].nSize; } - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, id->uid); int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); assert(p != NULL); int32_t v = (*p - num); assert(v >= 0 && v <= pWindowResInfo->size); - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, id->uid); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); } @@ -217,11 +217,11 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_ } bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { - return (getWindowResult(pWindowResInfo, slot)->closed == true); + return (getResultRow(pWindowResInfo, slot)->closed == true); } void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { - getWindowResult(pWindowResInfo, slot)->closed = true; + getResultRow(pWindowResInfo, slot)->closed = true; } void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) { @@ -229,18 +229,21 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) { return; } - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); + // the result does not put into the SDiskbasedResultBuf, ignore it. + if (pWindowRes->pageId >= 0) { + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { - SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; - - char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); - size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; - memset(s, 0, size); - - RESET_RESULT_INFO(pResultInfo); + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { + SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; + + char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); + size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; + memset(s, 0, size); + + RESET_RESULT_INFO(pResultInfo); + } } - + pWindowRes->numOfRows = 0; pWindowRes->pageId = -1; pWindowRes->rowId = -1; @@ -327,6 +330,8 @@ SResultRow* getNewResultRow(SResultRowPool* p) { } p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock; + initResultRow(ptr); + return ptr; } diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 10767ee30f..22b5da1491 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -114,10 +114,12 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p * @param dsize size of actual data * @return hash node */ -static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { +static FORCE_INLINE SHashNode *doUpdateHashNode(SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { assert(pNode->keyLen == pNewNode->keyLen); if (prev != NULL) { prev->next = pNewNode; + } else { + pe->next = pNewNode; } pNewNode->next = pNode->next; @@ -242,7 +244,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } else { // not support the update operation, return error if (pHashObj->enableUpdate) { - doUpdateHashNode(prev, pNode, pNewNode); + doUpdateHashNode(pe, prev, pNode, pNewNode); + DO_FREE_HASH_NODE(pNode); + } else { + DO_FREE_HASH_NODE(pNewNode); } if (pHashObj->type == HASH_ENTRY_LOCK) { @@ -252,7 +257,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // enable resize __rd_unlock(&pHashObj->lock, pHashObj->type); - DO_FREE_HASH_NODE(pNewNode); return pHashObj->enableUpdate ? 0 : -1; } } -- GitLab