From aca8e3644b0825ac8c7244a91090fe1492f5df1c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 16 Sep 2020 17:26:08 +0800 Subject: [PATCH] TD-1439 --- src/tsdb/src/tsdbRead.c | 43 ++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index d829a85754..806f0ea828 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -432,7 +432,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { tSkipListDestroyIter(pCheckInfo->iiter); } -static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) { +static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { SDataRow rmem = NULL, rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); @@ -466,9 +466,15 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order TSKEY r2 = dataRowKey(rimem); if (r1 == r2) { // data ts are duplicated, ignore the data in mem - tSkipListIterNext(pCheckInfo->iter); - pCheckInfo->chosen = 1; - return rimem; + if (!update) { + tSkipListIterNext(pCheckInfo->iter); + pCheckInfo->chosen = 1; + return rimem; + } else { + tSkipListIterNext(pCheckInfo->iiter); + pCheckInfo->chosen = 0; + return rmem; + } } else { if (ASCENDING_TRAVERSE(order)) { if (r1 < r2) { @@ -522,6 +528,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { } static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { + STsdbCfg *pCfg = &pHandle->pTsdb->config; size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); pHandle->cur.fid = -1; @@ -535,7 +542,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { initTableMemIterator(pHandle, pCheckInfo); } - SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update); if (row == NULL) { return false; } @@ -741,11 +748,12 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; + STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); int32_t code = TSDB_CODE_SUCCESS; /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); - SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); assert(cur->pos >= 0 && cur->pos <= binfo.rows); @@ -1208,6 +1216,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); + STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; initTableMemIterator(pQueryHandle, pCheckInfo); @@ -1256,7 +1265,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; do { - SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); if (row == NULL) { break; } @@ -1286,7 +1295,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it - moveToNextRowInMem(pCheckInfo); + if (pCfg->update) { + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); + numOfRows += 1; + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = key; + } + + cur->win.ekey = key; + cur->lastKey = key + step; + cur->mixBlock = true; + + moveToNextRowInMem(pCheckInfo); + pos += step; + } else { + moveToNextRowInMem(pCheckInfo); + } } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if (cur->win.skey == TSKEY_INITIAL_VAL) { @@ -1765,13 +1789,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STsdbQueryHandle* pQueryHandle) { int numOfRows = 0; int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryHandle->pColumns); + STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; win->skey = TSKEY_INITIAL_VAL; int64_t st = taosGetTimestampUs(); STable* pTable = pCheckInfo->pTableObj; do { - SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); if (row == NULL) { break; } -- GitLab