From 98ba80a993556d12fcc8cffa26d70c711445bc23 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 23 Nov 2020 11:13:46 +0800 Subject: [PATCH] [TD-225] refactor codes. --- src/query/inc/qUtil.h | 2 ++ src/query/src/qExecutor.c | 54 ++++++++++++++++++++++++-------------- src/query/src/qResultbuf.c | 2 +- src/query/src/qUtil.c | 51 +++++++++++++++++------------------ 4 files changed, 63 insertions(+), 46 deletions(-) diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 4cd0e60ebe..b2c4b6e2ec 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -80,5 +80,7 @@ void* destroyResultRowPool(SResultRowPool* p); int32_t getNumOfAllocatedResultRows(SResultRowPool* p); int32_t getNumOfUsedResultRows(SResultRowPool* p); +uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv); + #endif // TDENGINE_QUERYUTIL_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d0874c36bc..c44d65e373 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -186,7 +186,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); -static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); +static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); static void buildTagQueryResult(SQInfo *pQInfo); @@ -283,8 +283,8 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { } } -static UNUSED_FUNC int32_t getGroupResultId(int32_t groupIndex) { - int32_t base = 20000000; +static int32_t getMergeResultGroupId(int32_t groupIndex) { + int32_t base = 50000000; return base + (groupIndex * 10000); } @@ -1115,7 +1115,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - // not assign result buffer yet, add new result buffer + // not assign result buffer yet, add new result buffer, TODO remove it char* d = pData; int16_t len = bytes; if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) { @@ -1128,7 +1128,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - uint64_t uid = groupIndex; // uid is always set to be 0. + uint64_t uid = groupIndex; SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); if (pResultRow == NULL) { return -1; @@ -1714,7 +1714,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order // if it is group by normal column, do not set output buffer, the output buffer is pResult // fixed output query/multi-output query for normal table if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { - resetCtxOutputBuf(pRuntimeEnv); + resetDefaultResInfoOutputBuf(pRuntimeEnv); } if (setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx) != TSDB_CODE_SUCCESS) { @@ -2936,10 +2936,24 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { assert(size == pGroupResInfo->numOfDataPages); bool done = false; + + //TODO add API for release none-dirty pages +// SPageInfo* prev = NULL; + for (int32_t j = pGroupResInfo->pageId; j < size; ++j) { SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j); tFilePage* pData = getResBufPage(pResultBuf, pi->pageId); + // release previous buffer pages +// if (prev == NULL) { +// prev = pi; +// } else { +// if (prev->pageId != pi->pageId) { +// releaseResBufPageInfo(pResultBuf, prev); +// prev = pi; +// } +// } + assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num); int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId); @@ -3058,7 +3072,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool); resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); - pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex); + pQInfo->groupResInfo.groupId = getMergeResultGroupId(pQInfo->groupIndex); // todo add windowRes iterator int64_t lastTimestamp = -1; @@ -3339,12 +3353,12 @@ int32_t initResultRow(SResultRow *pResultRow) { return TSDB_CODE_SUCCESS; } -void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { +void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t groupIndex = 0; - int32_t uid = 0; - SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); + int32_t tid = 0; + int64_t uid = getResultInfoUId(pRuntimeEnv); + SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -3427,7 +3441,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= pQuery->rec.rows; pQuery->rec.rows = 0; - resetCtxOutputBuf(pRuntimeEnv); + resetDefaultResInfoOutputBuf(pRuntimeEnv); // clear the buffer full flag if exists CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL); @@ -3792,7 +3806,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { return; } - uint64_t uid = 0; // uid is always set to be 0 + uint64_t uid = getResultInfoUId(pRuntimeEnv); SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); if (pResultRow == NULL) { @@ -4629,10 +4643,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo int32_t ps = DEFAULT_PAGE_SIZE; int32_t rowsize = 0; getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); - int32_t TWOMB = 1024*1024*2; + int32_t TENMB = 1024*1024*10; if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4660,7 +4674,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo } else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) { int32_t numOfResultRows = getInitialPageNum(pQInfo); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4930,7 +4944,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); if (isPointInterpoQuery(pQuery)) { - resetCtxOutputBuf(pRuntimeEnv); + resetDefaultResInfoOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pQInfo->groupIndex < numOfGroups) { @@ -5096,7 +5110,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { return; } - resetCtxOutputBuf(pRuntimeEnv); + resetDefaultResInfoOutputBuf(pRuntimeEnv); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); SArray *group = GET_TABLEGROUP(pQInfo, 0); @@ -5456,7 +5470,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { - resetCtxOutputBuf(pRuntimeEnv); + resetDefaultResInfoOutputBuf(pRuntimeEnv); } // skip blocks without load the actual data block from file if no filter condition present @@ -5486,7 +5500,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) qDebug("QInfo:%p skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->limit.offset, pQuery->current->lastKey, pQuery->current->win.ekey); - resetCtxOutputBuf(pRuntimeEnv); + resetDefaultResInfoOutputBuf(pRuntimeEnv); } limitResults(pRuntimeEnv); diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index c5ba551f20..67cb7af074 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -407,7 +407,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { } if (pResultBuf->file != NULL) { - qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f", + qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb", pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0, pResultBuf->fileSize/1024.0); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index c5317226c7..650b886c36 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -20,6 +20,18 @@ #include "qExecutor.h" #include "qUtil.h" +static int32_t getResultRowKeyInfo(SResultRow* pResult, int16_t type, char** key, int16_t* bytes) { + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + *key = varDataVal(pResult->key); + *bytes = varDataLen(pResult->key); + } else { + *key = (char*) &pResult->win.skey; + *bytes = tDataTypeDesc[type].nSize; + } + + return 0; +} + int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t size = 0; @@ -94,12 +106,8 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); assert(num >= 0 && num <= numOfClosed); - int16_t type = pWindowResInfo->type; - STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable); // uid is always set to be 0. - int64_t uid = id->uid; - if (pRuntimeEnv->groupbyNormalCol) { - uid = 0; - } + int16_t type = pWindowResInfo->type; + int64_t uid = getResultInfoUId(pRuntimeEnv); char *key = NULL; int16_t bytes = -1; @@ -107,16 +115,7 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { for (int32_t i = 0; i < num; ++i) { SResultRow *pResult = pWindowResInfo->pResult[i]; if (pResult->closed) { // remove the window slot from hash table - - // todo refactor - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - key = varDataVal(pResult->key); - bytes = varDataLen(pResult->key); - } else { - key = (char*) &pResult->win.skey; - bytes = tDataTypeDesc[pWindowResInfo->type].nSize; - } - + getResultRowKeyInfo(pResult, type, &key, &bytes); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); } else { @@ -141,16 +140,9 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { for (int32_t k = 0; k < pWindowResInfo->size; ++k) { SResultRow *pResult = pWindowResInfo->pResult[k]; - - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - key = varDataVal(pResult->key); - bytes = varDataLen(pResult->key); - } else { - key = (char*) &pResult->win.skey; - bytes = tDataTypeDesc[pWindowResInfo->type].nSize; - } - + getResultRowKeyInfo(pResult, type, &key, &bytes); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); assert(p != NULL); @@ -379,3 +371,12 @@ void* destroyResultRowPool(SResultRowPool* p) { tfree(p); return NULL; } + +uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) { + if (!pRuntimeEnv->stableQuery) { + return 0; // for simple table query, the uid is always set to be 0; + } + + STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current); + return id->uid; +} \ No newline at end of file -- GitLab