diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 7d5779c43f7c05ed7675cb59ef3036c14f852938..3dd0c58ae217d38d280d2bd30d85e13b33e876b0 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -8,25 +8,26 @@ extern "C" { #include "taosdef.h" #define GET_TYPED_DATA(_v, _finalType, _type, _data) \ - switch (_type) { \ - case TSDB_DATA_TYPE_TINYINT: \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ (_v) = (_finalType)GET_INT8_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_SMALLINT: \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ (_v) = (_finalType)GET_INT16_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_BIGINT: \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ (_v) = (_finalType)(GET_INT64_VAL(_data)); \ - break; \ - case TSDB_DATA_TYPE_FLOAT: \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ (_v) = (_finalType)GET_FLOAT_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_DOUBLE: \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ (_v) = (_finalType)GET_DOUBLE_VAL(_data); \ - break; \ - default: \ + break; \ + default: \ (_v) = (_finalType)GET_INT32_VAL(_data); \ - break; \ + break; \ }; #ifdef __cplusplus diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index acba5df2f7fac6ade5ccad774aecfbedd8bbbdb2..dde2e398459ebef7f870cd5f4482f5755247ecf9 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -29,31 +29,30 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery); -void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); -void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); -SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); +size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv); +int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); +void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); -int32_t initWindowResInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); +void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); +void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num); +void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo); +int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); +void closeAllResultRows(SResultRowInfo* pResultRowInfo); +void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); -void cleanupTimeWindowInfo(SResultRowInfo* pResultRowInfo); -void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); -void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); +int32_t initResultRow(SResultRow *pResultRow); +void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); +bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); +void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); +void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); -void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); -int32_t numOfClosedTimeWindow(SResultRowInfo* pResultRowInfo); -void closeTimeWindow(SResultRowInfo* pResultRowInfo, int32_t slot); -void closeAllTimeWindow(SResultRowInfo* pResultRowInfo); -void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); +SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); return pResultRowInfo->pResult[slot]; } -bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot); - -int32_t initResultRow(SResultRow *pResultRow); - static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, tFilePage* page) { assert(pResult != NULL && pRuntimeEnv != NULL); @@ -71,8 +70,6 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); __filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type); -size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv); - SResultRowPool* initResultRowPool(size_t size); SResultRow* getNewResultRow(SResultRowPool* p); int64_t getResultRowPoolMemSize(SResultRowPool* p); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0a8916b13fae81e3707ab42907e1c2873e322df9..1f07e2bf6a18e8a74689694d84f49a95f5219254 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -614,14 +614,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, +static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo, STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // todo refactor int64_t uid = getResultInfoUId(pRuntimeEnv); - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); if (pResultRow == NULL) { *newWind = false; @@ -717,7 +717,7 @@ static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY TSKEY ekey = pResult->win.ekey; if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeTimeWindow(pWindowResInfo, i); + closeResultRow(pWindowResInfo, i); } else { skey = pResult->win.skey; break; @@ -751,7 +751,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe // query completed if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); @@ -1351,14 +1351,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } int64_t v = -1; - switch(type) { - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break; - case TSDB_DATA_TYPE_SMALLINT: v = GET_INT16_VAL(pData); break; - case TSDB_DATA_TYPE_INT: v = GET_INT32_VAL(pData); break; - case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break; - } - + GET_TYPED_DATA(v, int64_t, type, pData); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (pResultRow->key == NULL) { pResultRow->key = malloc(varDataTLen(pData)); @@ -1790,7 +1783,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl if (QUERY_IS_INTERVAL_QUERY(pQuery)) { numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); } else if (pRuntimeEnv->groupbyNormalCol) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); numOfRes = pWindowResInfo->size; } else { // projection query numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); @@ -2094,7 +2087,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p teardown runtime env", pQInfo); - cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo); + cleanupResultRowInfo(&pRuntimeEnv->windowResInfo); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -2928,7 +2921,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - closeAllTimeWindow(&pRuntimeEnv->windowResInfo); + closeAllResultRows(&pRuntimeEnv->windowResInfo); pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window } else { assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); @@ -3707,8 +3700,8 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); - pResultRow->pageId = -1; - pResultRow->rowId = -1; + pResultRow->pageId = -1; + pResultRow->rowId = -1; return TSDB_CODE_SUCCESS; } @@ -4057,12 +4050,12 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { // for each group result, call the finalize function for each column SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pRuntimeEnv->groupbyNormalCol) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SResultRow *buf = pWindowResInfo->pResult[i]; - if (!isWindowResClosed(pWindowResInfo, i)) { + if (!isResultRowClosed(pWindowResInfo, i)) { continue; } @@ -4112,7 +4105,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void // set more initial size of interval/groupby query if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { int32_t initialSize = 128; - int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); + int32_t code = initResultRowInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4128,7 +4121,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { } tVariantDestroy(&pTableQueryInfo->tag); - cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo); + cleanupResultRowInfo(&pTableQueryInfo->windowResInfo); } /** @@ -4360,7 +4353,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_ int32_t step = -1; qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); - int32_t totalSet = numOfClosedTimeWindow(pResultInfo); + int32_t totalSet = numOfClosedResultRows(pResultInfo); SResultRow** result = pResultInfo->pResult; if (orderType == TSDB_ORDER_ASC) { @@ -4481,7 +4474,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc // TODO refactor if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; } else { updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); @@ -5031,7 +5024,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_INT; // group id } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type); + code = initResultRowInfo(&pRuntimeEnv->windowResInfo, 8, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5051,7 +5044,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_TIMESTAMP; } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); + code = initResultRowInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5479,7 +5472,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); - clearClosedTimeWindow(pRuntimeEnv); + clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo); break; } } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL && !isTSCompQuery(pQuery)) { @@ -5641,7 +5634,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } resetDefaultResInfoOutputBuf(pRuntimeEnv); - resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); + resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); SArray *group = GET_TABLEGROUP(pQInfo, 0); assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && @@ -5796,11 +5789,11 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { STableQueryInfo* item = taosArrayGetP(group, j); - closeAllTimeWindow(&item->windowResInfo); + closeAllResultRows(&item->windowResInfo); } } } else { // close results for group result - closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo); + closeAllResultRows(&pQInfo->runtimeEnv.windowResInfo); } } @@ -6048,10 +6041,10 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && pQuery->fillType == TSDB_FILL_NONE) { // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset)); - clearFirstNWindowRes(pRuntimeEnv, c); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, c); pQuery->limit.offset -= c; } @@ -6088,7 +6081,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); } // no result generated, abort @@ -6121,16 +6114,16 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { // all data scanned, the group by normal column can return if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) { // skip offset result rows - clearFirstNWindowRes(pRuntimeEnv, (int32_t) pQuery->limit.offset); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset); pQuery->rec.rows = 0; pQInfo->groupIndex = 0; copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); doSecondaryArithmeticProcess(pQuery); limitResults(pRuntimeEnv); @@ -6164,7 +6157,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); if (pQuery->rec.rows > 0) { qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -7029,7 +7022,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); - pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv)); + pQInfo->runtimeEnv.pool = initResultRowPool(getResultRowSize(&pQInfo->runtimeEnv)); pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize); char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 9b0569d9e6da3fe7ff34eb00edc5e24df66b1ba5..65ac60e91fc9901e8bf96cca9b799b767b7e4d4a 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { return size; } -int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { +int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { pResultRowInfo->capacity = size; pResultRowInfo->type = type; @@ -59,10 +59,11 @@ int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t return TSDB_CODE_SUCCESS; } -void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { +void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL) { return; } + if (pResultRowInfo->capacity == 0) { assert(pResultRowInfo->pResult == NULL); return; @@ -77,7 +78,7 @@ void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { tfree(pResultRowInfo->pResult); } -void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { +void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { return; } @@ -100,13 +101,12 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultR pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } -void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { - SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; +void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); + int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); assert(num >= 0 && num <= numOfClosed); int16_t type = pResultRowInfo->type; @@ -159,17 +159,16 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { pResultRowInfo->curIndex = -1; } -void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { - SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; +void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); - clearFirstNWindowRes(pRuntimeEnv, numOfClosed); + int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed); } -int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { +int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { int32_t i = 0; while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { ++i; @@ -178,7 +177,7 @@ int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { return i; } -void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { +void closeAllResultRows(SResultRowInfo *pResultRowInfo) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); for (int32_t i = 0; i < pResultRowInfo->size; ++i) { @@ -195,7 +194,7 @@ void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * NOTE: remove redundant, only when the result set order equals to traverse order */ -void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { +void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); if (pResultRowInfo->size <= 1) { return; @@ -224,11 +223,11 @@ void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_ } } -bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { +bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { return (getResultRow(pResultRowInfo, slot)->closed == true); } -void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) { +void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { getResultRow(pResultRowInfo, slot)->closed = true; } @@ -310,7 +309,7 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); } -size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { +size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow); }