diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ff0ebefb42baa7b93a92f2f38e2788ef8397db06..9294718550cd218e11ed5249b4da3e28bf400882 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -879,14 +879,14 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, } } - -static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) { +static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, + TDRowVerT maxVer) { STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { rmem = (STSRow*)SL_GET_NODE_DATA(node); -#if 0 // TODO: skiplist refactor +#if 0 // TODO: skiplist refactor if (TD_ROW_VER(rmem) > maxVer) { rmem = NULL; } @@ -898,7 +898,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { rimem = (STSRow*)SL_GET_NODE_DATA(node); -#if 0 // TODO: skiplist refactor +#if 0 // TODO: skiplist refactor if (TD_ROW_VER(rimem) > maxVer) { rimem = NULL; } @@ -1677,7 +1677,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa colIdOfRow2 = tdKvRowColIdAt(row2, k); } - if (colIdOfRow1 < colIdOfRow2) { // the most probability + if (colIdOfRow1 < colIdOfRow2) { // the most probability if (colIdOfRow1 < pColInfo->info.colId) { ++j; continue; @@ -1720,7 +1720,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ++(*curRow); } ++nResult; - } else if (update){ + } else if (update) { mergeOption = 2; } else { mergeOption = 0; @@ -1741,7 +1741,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ++(*curRow); } ++nResult; - } else if(update) { + } else if (update) { mergeOption = 2; } else { mergeOption = 0; @@ -2018,9 +2018,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf rv2 = TD_ROW_SVER(row2); } - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - // numOfRows += 1; + numOfRows += + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } @@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf cur->win.ekey = key; cur->lastKey = key + step; cur->mixBlock = true; - moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it #if 0 @@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } #endif if (TD_SUPPORT_UPDATE(pCfg->update)) { - doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); + if (lastKeyAppend != key) { + lastKeyAppend = key; + ++curRow; + } + numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); if (rv1 != TD_ROW_SVER(row1)) { // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); @@ -2074,10 +2077,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); rv2 = TD_ROW_SVER(row2); } + numOfRows += + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - // ++numOfRows; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } @@ -2117,15 +2120,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf int32_t qstart = 0, qend = 0; getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); - lastKeyAppend = tsArray[qend]; - numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend); + if ((lastKeyAppend != TSKEY_INITIAL_VAL) && + (lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) { + ++curRow; + } + numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend); pos += (qend - qstart + 1) * step; - if(numOfRows > 0) { + if (numOfRows > 0) { curRow = numOfRows - 1; } + cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; cur->lastKey = cur->win.ekey + step; + lastKeyAppend = cur->win.ekey; } } while (numOfRows < pTsdbReadHandle->outputCapacity); @@ -2429,7 +2437,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); - STimeWindow win = TSWINDOW_INITIALIZER; + STimeWindow win = TSWINDOW_INITIALIZER; while (true) { tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); @@ -2735,7 +2743,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STSchema* pSchema = NULL; TSKEY lastRowKey = TSKEY_INITIAL_VAL; - do { STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX); if (row == NULL) { @@ -2760,8 +2767,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); rv = TD_ROW_SVER(row); } - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, - NULL, pCfg->update, &lastRowKey); + numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, + pSchema, NULL, pCfg->update, &lastRowKey); if (numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); @@ -2770,7 +2777,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } while (moveToNextRowInMem(pCheckInfo)); - taosMemoryFreeClear(pSchema); // free the STSChema + taosMemoryFreeClear(pSchema); // free the STSChema assert(numOfRows <= maxRowsToRead); @@ -2898,8 +2905,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { // if (ret != TSDB_CODE_SUCCESS) { // return false; // } - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId, - NULL, NULL, true, &lastRowKey); + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, + pCheckInfo->tableId, NULL, NULL, true, &lastRowKey); taosMemoryFreeClear(pRow); // update the last key value @@ -3468,7 +3475,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; -// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); + // ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); } /*