diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index bc934647ec4827faea4814dfbfb46acb34033668..83214d293ea4d11de19417e6dbc72ab57bd34abc 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -73,14 +73,14 @@ typedef struct SResultRowPool { typedef struct SResultRow { int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer - int32_t offset:29; // row index in buffer page + int32_t offset:29; // row index in buffer page bool startInterp; // the time window start timestamp has done the interpolation already. bool endInterp; // the time window end timestamp has done the interpolation already. bool closed; // this result status: closed or opened uint32_t numOfRows; // number of rows of current time window SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo - STimeWindow win; - char* key; // start key of current result row + STimeWindow win; + char *key; // start key of current result row } SResultRow; typedef struct SGroupResInfo { @@ -105,7 +105,7 @@ typedef struct SResultRowInfo { int16_t type:8; // data type for hash key int32_t size:24; // number of result set int32_t capacity; // max capacity - int32_t curIndex; // current start active index + SResultRow* current; // current start active index int64_t prevSKey; // previous (not completed) sliding window start key } SResultRowInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e98c7830f35499b8bfffd45f28faff528e342af9..7089760e351b01bdbf3d4fdfaa7ac24d9f691f0a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -426,13 +426,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes } if (p1 != NULL) { - for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) { - if (pResultRowInfo->pResult[i] == (*p1)) { - pResultRowInfo->curIndex = i; - existed = true; - break; - } - } + pResultRowInfo->current = (*p1); + existed = true; } } else { if (p1 != NULL) { // group by column query @@ -457,8 +452,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes pResult = *p1; } - pResultRowInfo->pResult[pResultRowInfo->size] = pResult; - pResultRowInfo->curIndex = pResultRowInfo->size++; + pResultRowInfo->pResult[pResultRowInfo->size++] = pResult; + pResultRowInfo->current = pResult; } // too many time window in query @@ -466,7 +461,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } - return getResultRow(pResultRowInfo, pResultRowInfo->curIndex); + return pResultRowInfo->current; } static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { @@ -497,7 +492,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) { STimeWindow w = {0}; - if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value + if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { getInitialStartTimeWindow(pQueryAttr, ts, &w); pResultRowInfo->prevSKey = w.skey; @@ -511,8 +506,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t w.ekey = w.skey + pQueryAttr->interval.interval - 1; } } else { - int32_t slot = curTimeWindowIndex(pResultRowInfo); - SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot); +// int32_t slot = curTimeWindowIndex(pResultRowInfo); +// SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot); + SResultRow* pWindowRes = pResultRowInfo->current; w = pWindowRes->win; } @@ -698,7 +694,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, // all result rows are closed, set the last one to be the skey if (skey == TSKEY_INITIAL_VAL) { - pResultRowInfo->curIndex = pResultRowInfo->size - 1; + if (pResultRowInfo->size == 0) { +// assert(pResultRowInfo->current == NULL); + pResultRowInfo->current = NULL; + } else { + pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1]; + } } else { for (i = pResultRowInfo->size - 1; i >= 0; --i) { @@ -709,12 +710,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } if (i == pResultRowInfo->size - 1) { - pResultRowInfo->curIndex = i; + pResultRowInfo->current = pResultRowInfo->pResult[i]; } else { - pResultRowInfo->curIndex = i + 1; // current not closed result object + pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object } - pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey; + pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey; } } @@ -722,7 +723,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); - pResultRowInfo->curIndex = pResultRowInfo->size - 1; + pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1]; } else { int32_t step = ascQuery ? 1 : -1; doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo); @@ -1231,7 +1232,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); + SResultRow* prevRow = pResultRowInfo->current; +// int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); TSKEY* tsCols = NULL; if (pSDataBlock->pDataBlock != NULL) { @@ -1260,9 +1262,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // prev time window not interpolation yet. - int32_t curIndex = curTimeWindowIndex(pResultRowInfo); - if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { - for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. +// int32_t curIndex = curTimeWindowIndex(pResultRowInfo); +// if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { +// for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. + if (prevRow != NULL && prevRow != pResultRowInfo->current && pQueryAttr->timeWindowInterpo) { + int32_t j = 0; + while(pResultRowInfo->pResult[j] != prevRow) { + j++; + } + + for(; pResultRowInfo->pResult[j] != pResultRowInfo->current; ++j) { SResultRow* pRes = pResultRowInfo->pResult[j]; if (pRes->closed) { assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); @@ -3146,7 +3155,7 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl } -static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQueryInfo *pTableQueryInfo) { +static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; } @@ -3158,7 +3167,12 @@ static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQue pTableQueryInfo->cur.vgroupIndex = -1; // set the index to be the end slot of result rows array - pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1; + SResultRowInfo* pResRowInfo = &pTableQueryInfo->resInfo; + if (pResRowInfo->size > 0) { + pResRowInfo->current = pResRowInfo->pResult[pResRowInfo->size - 1]; + } else { + pResRowInfo->current = NULL; + } } static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { @@ -3172,7 +3186,7 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { size_t t = taosArrayGetSize(group); for (int32_t j = 0; j < t; ++j) { STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); - updateTableQueryInfoForReverseScan(pQueryAttr, pCheckInfo); + updateTableQueryInfoForReverseScan(pCheckInfo); // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide // the start check timestamp of tsdbQueryHandle @@ -4571,7 +4585,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { } if (pResultRowInfo->size > 0) { - pResultRowInfo->curIndex = 0; + pResultRowInfo->current = pResultRowInfo->pResult[0]; pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey; } @@ -4597,8 +4611,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { pTableScanInfo->order = cond.order; if (pResultRowInfo->size > 0) { - pResultRowInfo->curIndex = pResultRowInfo->size-1; - pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; + pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1]; + pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey; } p = doTableScanImpl(pOperator, newgroup); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 7b08450d3b9461c5ccfc316485f12c5d4ea84111..7c5a95312e0e5378a5ad47788bf6857143d1190e 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -45,7 +45,7 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t pResultRowInfo->type = type; pResultRowInfo->size = 0; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; - pResultRowInfo->curIndex = -1; + pResultRowInfo->current = NULL; pResultRowInfo->capacity = size; pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES); @@ -90,9 +90,9 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, &groupIndex, sizeof(groupIndex), uid); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex))); } - - pResultRowInfo->curIndex = -1; - pResultRowInfo->size = 0; + + pResultRowInfo->size = 0; + pResultRowInfo->current = NULL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; }