diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index 17840df4a4062002b54f4aea5293427a5a7a1c3b..7efcd54cfda619d24e3457f1834ff77a8139828d 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -31,8 +31,8 @@ extern "C" {
#include "tutil.h"
#include "qExecutor.h"
+#include "qSqlparser.h"
#include "qTsbuf.h"
-#include "qsqlparser.h"
#include "tcmdtype.h"
// forward declaration
diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c
index 5662b4a8856e638ebb41c0e21b5e23647494224c..42bf27c45a5f2eff9acf4eb40c4b34ac0194f2da 100644
--- a/src/client/src/tscSystem.c
+++ b/src/client/src/tscSystem.c
@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) {
- tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
+ tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
}
tscDebug("client is initialized successfully");
diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h
index 127c38a6f836196641ec1179da680d872e0eac8e..92fe078c3f8921cfb3875bfb9c64169502791a62 100644
--- a/src/query/inc/qExecutor.h
+++ b/src/query/inc/qExecutor.h
@@ -20,8 +20,8 @@
#include "hash.h"
#include "qFill.h"
#include "qResultbuf.h"
+#include "qSqlparser.h"
#include "qTsbuf.h"
-#include "qsqlparser.h"
#include "query.h"
#include "taosdef.h"
#include "tarray.h"
diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h
index 8c8afb0957c862042e1da99e211ed06e091dee4a..d9da6bb63edfc4f58d39f21216a7897e9e6f4734 100644
--- a/src/query/inc/qResultbuf.h
+++ b/src/query/inc/qResultbuf.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_VNODEQUERYUTIL_H
-#define TDENGINE_VNODEQUERYUTIL_H
+#ifndef TDENGINE_QRESULTBUF_H
+#define TDENGINE_QRESULTBUF_H
#ifdef __cplusplus
extern "C" {
@@ -26,11 +26,18 @@ extern "C" {
typedef struct SArray* SIDList;
+typedef struct SPageInfo {
+ int32_t pageId;
+ int32_t offset;
+ int32_t lengthOnDisk;
+} SPageInfo;
+
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
- int32_t fd; // data file fd
+ FILE* file;
+// int32_t fd; // data file fd
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
void* pBuf; // mmap buffer pointer
@@ -43,6 +50,8 @@ typedef struct SDiskbasedResultBuf {
void* iBuf; // inmemory buf
void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list
+ bool comp;
+
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
@@ -56,7 +65,7 @@ typedef struct SDiskbasedResultBuf {
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
- int32_t inMemPages, void* handle);
+ int32_t inMemPages, const void* handle);
/**
*
@@ -126,4 +135,4 @@ int32_t getLastPageId(SIDList pList);
}
#endif
-#endif // TDENGINE_VNODEQUERYUTIL_H
+#endif // TDENGINE_QRESULTBUF_H
diff --git a/src/query/inc/qsqlparser.h b/src/query/inc/qSqlparser.h
similarity index 100%
rename from src/query/inc/qsqlparser.h
rename to src/query/inc/qSqlparser.h
diff --git a/src/query/src/qAst.c b/src/query/src/qAst.c
index c2578c15c0536ea222f376e833a8fe11fa269229..e3c0c1dbb0bc1e5856094c058619fa2b5cca149f 100644
--- a/src/query/src/qAst.c
+++ b/src/query/src/qAst.c
@@ -18,8 +18,8 @@
#include "exception.h"
#include "qAst.h"
+#include "qSqlparser.h"
#include "qSyntaxtreefunction.h"
-#include "qsqlparser.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index 906d0cfe678e89d2fb357bf3572f7e61ade70cb2..30d6cc288f452962f4a296989f59d38c7bfb5813 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -6617,14 +6617,16 @@ void* qOpenQueryMgmt(int32_t vgId) {
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
- SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
+ SQueryMgmt* pQueryMgmt = calloc(1, sizeof(SQueryMgmt));
- pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
- pQueryHandle->closed = false;
- pthread_mutex_init(&pQueryHandle->lock, NULL);
+ pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
+ pQueryMgmt->closed = false;
+ pQueryMgmt->vgId = vgId;
+
+ pthread_mutex_init(&pQueryMgmt->lock, NULL);
qDebug("vgId:%d, open querymgmt success", vgId);
- return pQueryHandle;
+ return pQueryMgmt;
}
static void queryMgmtKillQueryFn(void* handle) {
@@ -6664,7 +6666,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
- qDebug("vgId:%d querymgmt cleanup completed", vgId);
+ qDebug("vgId:%d queryMgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
diff --git a/src/query/src/qParserImpl.c b/src/query/src/qParserImpl.c
index ecc11f8f4d76769d0d52b03f44813349aed29adc..1e58dbbe0b75ee5ed2ddc0627b1590e314456b94 100644
--- a/src/query/src/qParserImpl.c
+++ b/src/query/src/qParserImpl.c
@@ -14,7 +14,7 @@
*/
#include "os.h"
-#include "qsqlparser.h"
+#include "qSqlparser.h"
#include "queryLog.h"
#include "taosdef.h"
#include "taosmsg.h"
diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c
index de59676e59679dca4feb3540e3f97981f98f10ba..2443381194342c90582492536b1826409d1f5fca 100644
--- a/src/query/src/qResultbuf.c
+++ b/src/query/src/qResultbuf.c
@@ -5,7 +5,7 @@
#include "taoserror.h"
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
- int32_t pagesize, int32_t inMemPages, void* handle) {
+ int32_t pagesize, int32_t inMemPages, const void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf;
@@ -24,6 +24,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->incStep = 4;
pResBuf->allocateId = -1;
+ // todo opt perf by on demand create in memory buffer
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
// init id hash table
@@ -31,10 +32,10 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
char path[PATH_MAX] = {0};
- getTmpfilePath("tsdb_qbuf", path);
+ getTmpfilePath("qbuf", path);
pResBuf->path = strdup(path);
- pResBuf->fd = FD_INITIALIZER;
+ pResBuf->file = NULL;
pResBuf->pBuf = NULL;
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
@@ -52,8 +53,9 @@ int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->tota
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
- pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
- if (!FD_VALID(pResultBuf->fd)) {
+// pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
+ pResultBuf->file = fopen(pResultBuf->path, "r+");
+ if (pResultBuf->file == NULL) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
@@ -61,13 +63,15 @@ static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
pResultBuf->numOfPages += pResultBuf->incStep;
- int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
+ int32_t ret = ftruncate(fileno(pResultBuf->file), NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
- pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
+ pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED,
+ fileno(pResultBuf->file), 0);
+
if (pResultBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
@@ -82,7 +86,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
int32_t ret = TSDB_CODE_SUCCESS;
if (pResultBuf->pBuf == NULL) {
- assert(pResultBuf->fd == FD_INITIALIZER);
+ assert(pResultBuf->file == NULL);
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
return ret;
@@ -95,7 +99,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
- ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
+ ret = ftruncate(fileno(pResultBuf->file), NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
@@ -103,7 +107,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
- pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
+ pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, fileno(pResultBuf->file), 0);
if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
@@ -185,11 +189,11 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
return;
}
- if (FD_VALID(pResultBuf->fd)) {
+ if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
- close(pResultBuf->fd);
+ fclose(pResultBuf->file);
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->pBuf = NULL;
} else {
diff --git a/src/query/src/sql.c b/src/query/src/sql.c
index ac9952bb97d9c333a5e4209318cf1b26d3fe4b9c..307d5203b34161c024a8dd259d610a7170098438 100644
--- a/src/query/src/sql.c
+++ b/src/query/src/sql.c
@@ -30,7 +30,7 @@
#include
#include
#include
-#include "qsqlparser.h"
+#include "qSqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h"
#include "ttokendef.h"
diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c
index 37784577c498da13f15e2489c5c1aecc38106ad6..3eeca85db7d026e78538e49f2cb1e8e06b025f85 100644
--- a/src/tsdb/src/tsdbRead.c
+++ b/src/tsdb/src/tsdbRead.c
@@ -654,8 +654,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
int64_t elapsedTime = (taosGetTimestampUs() - st);
pQueryHandle->cost.blockLoadTime += elapsedTime;
- tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, elapsedTime);
+ tsdbDebug("%p load file block into buffer, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us",
+ pQueryHandle, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime);
return blockLoaded;
}
@@ -971,6 +972,52 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
}
}
+static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
+ if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
+ return;
+ }
+
+ // if the buffer is not full in case of descending order query, move the data in the front of the buffer
+ if (numOfRows < pQueryHandle->outputCapacity) {
+ int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
+ for(int32_t i = 0; i < numOfCols; ++i) {
+ SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
+ memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
+ }
+ }
+}
+
+static void getQualifiedRowsPos(STsdbQueryHandle* pQueryHandle, int32_t startPos, int32_t endPos,
+ int32_t numOfExisted, int32_t *start, int32_t *end) {
+ *start = -1;
+
+ if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
+ int32_t remain = endPos - startPos + 1;
+ if (remain + numOfExisted > pQueryHandle->outputCapacity) {
+ *end = (pQueryHandle->outputCapacity - numOfExisted) + startPos - 1;
+ }
+
+ *start = startPos;
+ } else {
+ int32_t remain = (startPos - endPos) + 1;
+ if (remain + numOfExisted > pQueryHandle->outputCapacity) {
+ *end = startPos + 1 - (pQueryHandle->outputCapacity - numOfExisted);
+ }
+
+ *start = *end;
+ *end = startPos;
+ }
+}
+
+static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
+ SQueryFilePos* cur = &pQueryHandle->cur;
+
+ pCheckInfo->lastKey = cur->lastKey;
+ pQueryHandle->realNumOfRows = numOfRows;
+ cur->rows = numOfRows;
+ cur->pos = endPos;
+}
+
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
@@ -978,7 +1025,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo);
+
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
+ assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
+ TSKEY* tsArray = pCols->cols[0].pData;
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
@@ -1004,9 +1054,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = cur->pos;
- assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
- TSKEY* tsArray = pCols->cols[0].pData;
-
int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
@@ -1014,34 +1061,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos;
int32_t end = endPos;
+
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
- end = cur->pos;
- start = endPos;
+ SWAP(start, end, int32_t);
}
- cur->win.skey = tsArray[start];
- cur->win.ekey = tsArray[end];
-
- // todo opt in case of no data in buffer
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
-
- // if the buffer is not full in case of descending order query, move the data in the front of the buffer
- if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
- int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
+ cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
- for(int32_t i = 0; i < numOfCols; ++i) {
- SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
- memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
- }
- }
-
pos += (end - start + 1) * step;
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
-
- pCheckInfo->lastKey = cur->lastKey;
- pQueryHandle->realNumOfRows = numOfRows;
- cur->rows = numOfRows;
+
+ // if the buffer is not full in case of descending order query, move the data in the front of the buffer
+ moveDataToFront(pQueryHandle, numOfRows, numOfCols);
+ updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
+
return;
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL;
@@ -1087,27 +1122,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
moveToNextRowInMem(pCheckInfo);
}
-
- int32_t start = -1;
- if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
- int32_t remain = end - pos + 1;
- if (remain + numOfRows > pQueryHandle->outputCapacity) {
- end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
- }
- start = pos;
- } else {
- int32_t remain = (pos - end) + 1;
- if (remain + numOfRows > pQueryHandle->outputCapacity) {
- end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
- }
+ int32_t qstart = 0, qend = 0;
+ getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
- start = end;
- end = pos;
- }
+ numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
+ pos += (qend - qstart + 1) * step;
- numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
- pos += (end - start + 1) * step;
+ cur->win.ekey = tsArray[end];
+ cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
@@ -1124,30 +1147,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->win.skey = tsArray[pos];
}
- int32_t start = -1;
- int32_t end = -1;
-
- // all remain data are qualified, but check the remain capacity in the first place.
- if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
- int32_t remain = endPos - pos + 1;
- if (remain + numOfRows > pQueryHandle->outputCapacity) {
- endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
- }
-
- start = pos;
- end = endPos;
- } else {
- int32_t remain = pos + 1;
- if (remain + numOfRows > pQueryHandle->outputCapacity) {
- endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
- }
-
- start = endPos;
- end = pos;
- }
+ int32_t start = -1, end = -1;
+ getQualifiedRowsPos(pQueryHandle, pos, endPos, numOfRows, &start, &end);
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
+
+ cur->win.ekey = tsArray[end];
+ cur->lastKey = cur->win.ekey + step;
}
}
}
@@ -1157,21 +1164,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
-
- // if the buffer is not full in case of descending order query, move the data in the front of the buffer
- if (numOfRows < pQueryHandle->outputCapacity) {
- int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
- for(int32_t i = 0; i < numOfCols; ++i) {
- SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
- memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
- }
- }
}
-
- pCheckInfo->lastKey = cur->lastKey;
- pQueryHandle->realNumOfRows = numOfRows;
- cur->rows = numOfRows;
- cur->pos = pos;
+
+ moveDataToFront(pQueryHandle, numOfRows, numOfCols);
+ updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
+
+ if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
+ assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey);
+ } else {
+ assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
+ }
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c
index 92d4b2caacb061bf144920900548875b06904a21..89199f035d5c1ba2ac4c9935ebbaf185bd2504e6 100644
--- a/src/util/src/tcache.c
+++ b/src/util/src/tcache.c
@@ -674,6 +674,7 @@ void* taosCacheTimedRefresh(void *handle) {
// check if current cache object will be deleted every 500ms.
if (pCacheObj->deleting) {
+ uDebug("%s refresh threads quit", pCacheObj->name);
break;
}