diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 3932650e1f0b2bcd7511dd8a98ba2be8baef36f2..03027329950ce1fa2dc0cc1b9659eb75cd52bb49 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -160,7 +160,7 @@ typedef struct SQueryRuntimeEnv { SQueryCostSummary summary; bool stableQuery; // super table query or not void* pQueryHandle; - void* pSubQueryHandle; // another thread for + void* pSecQueryHandle; // another thread for SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file } SQueryRuntimeEnv; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 0f74cf1bebb5f5259b7270652b0ebf9c54c9d5ed..8015360919a829695f11df9ccb8b131343678036 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1552,6 +1552,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { destroyResultBuf(pRuntimeEnv->pResultBuf); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -2565,7 +2567,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); - tsdb_query_handle_t pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSubQueryHandle; + tsdb_query_handle_t pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSecQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { @@ -3557,7 +3559,10 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { taosArrayPush(cols, &pQuery->colList[i]); } - pRuntimeEnv->pSubQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols); + if (pRuntimeEnv->pSecQueryHandle != NULL) { + pRuntimeEnv->pSecQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols); + } + taosArrayDestroy(cols); status = pQuery->status; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 966debda8d9117bc153d2aab979f36141a3f58ea..ee95199e5bf15d0d9a00f57a24351e33ffb4bf08 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -218,9 +218,31 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { if (pTable->mem == NULL && pTable->imem == NULL) { return false; } + + STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + pTable = pCheckInfo->pTableObj; + + if (pCheckInfo->iter == NULL) { + pCheckInfo->iter = tSkipListCreateIter(pTable->mem->pData); + if (pCheckInfo->iter == NULL) { + return false; + } + } + + if (!tSkipListIterNext(pCheckInfo->iter)) { // buffer is empty + return false; + } + + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + if (node == NULL) { + return false; + } + SDataRow row = SL_GET_NODE_DATA(node); + pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer + // all data in mem are checked already. - if (pTableCheckInfo->lastKey > pTable->mem->keyLast) { + if (pTableCheckInfo->lastKey > pHandle->window.ekey) { return false; } @@ -522,13 +544,15 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { int numOfPoints; TSKEY* keyList; + assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); + if (num <= 0) return -1; keyList = (TSKEY*)pValue; firstPos = 0; lastPos = num - 1; - if (order == 0) { + if (order == TSDB_ORDER_DESC) { // find the first position which is smaller than the key while (1) { if (key >= keyList[lastPos]) return lastPos; @@ -596,8 +620,8 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf pQueryHandle->realNumOfRows = cur->pos + 1; pCheckInfo->lastKey = blockInfo.window.ekey - 1; } else { - endPos = - vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order); + int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { if (endPos < cur->pos) { @@ -1042,7 +1066,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); *skey = INT64_MIN; - while (tSkipListIterNext(pIter)) { + do/* (1) */{ SSkipListNode* node = tSkipListIterGet(pIter); if (node == NULL) break; @@ -1063,8 +1087,11 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max } numOfRows++; - if (numOfRows >= maxRowsToRead) break; - }; + if (numOfRows >= maxRowsToRead) { + break; + } + + } while(tSkipListIterNext(pIter)); return numOfRows; } @@ -1107,10 +1134,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet - if (pCheckInfo->iter == NULL) { - pCheckInfo->iter = tSkipListCreateIter(pTable->mem->pData); - } - rows = tsdbReadRowsFromCache(pCheckInfo->iter, INT64_MAX, 2, &skey, &ekey, pHandle); + assert(pCheckInfo->iter != NULL); + rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle); // update the last key value pCheckInfo->lastKey = ekey + 1;