diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index c92a9d7cb7af42efd59dbf29eff113f771f7fe9e..978fd9013a24b8b14f99fe87137ac11b04cd72b1 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -36,12 +36,10 @@ target_sources( # tsdb "src/tsdb/tsdbCommit.c" - # "src/tsdb/tsdbCommit2.c" "src/tsdb/tsdbFile.c" "src/tsdb/tsdbFS.c" "src/tsdb/tsdbOpen.c" "src/tsdb/tsdbMemTable.c" - # "src/tsdb/tsdbMemTable2.c" "src/tsdb/tsdbRead.c" "src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbWrite.c" diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 101f3fdc69a07ae1df58ecaae77eaeafdc5c0823..657b55a0c651a360176cca30acf48ebda4ec1059 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -58,9 +58,6 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, ST bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); - // tsdbFile.c ============================================================================================== typedef int32_t TSDB_FILE_T; typedef struct SDFInfo SDFInfo; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 80f71b217e219f2e3889de6744fd64496ee99bd7..17b8afda4b46f3d909705e9e7574bf4c304485d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -93,6 +93,9 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); +static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, + SMergeInfo *pMergeInfo); int32_t tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return 0; @@ -1658,4 +1661,171 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p } return false; +} + +static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, + bool merge) { + if (pCols) { + if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { + *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row)); + if (*ppSchema == NULL) { + ASSERT(false); + return -1; + } + } + + tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge); + } + + return 0; +} + +static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, + SMergeInfo *pMergeInfo) { + ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); + if (pIter == NULL) return 0; + STSchema *pSchema = NULL; + TSKEY rowKey = 0; + TSKEY fKey = 0; + // only fetch lastKey from mem data as file data not used in this function actually + TSKEY lastKey = TSKEY_INITIAL_VAL; + bool isRowDel = false; + int filterIter = 0; + STSRow *row = NULL; + SMergeInfo mInfo; + + // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by + // query handle) + + if (pMergeInfo == NULL) pMergeInfo = &mInfo; + + memset(pMergeInfo, 0, sizeof(*pMergeInfo)); + pMergeInfo->keyFirst = INT64_MAX; + pMergeInfo->keyLast = INT64_MIN; + if (pCols) tdResetDataCols(pCols); + + row = tsdbNextIterRow(pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { + rowKey = INT64_MAX; + isRowDel = false; + } else { + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); + } + + if (filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); + } + // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols + // 2. rowKey - would dup since Multi-Version supported + while (true) { + if (fKey == INT64_MAX && rowKey == INT64_MAX) break; + + if (fKey < rowKey) { + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); + + filterIter++; + if (filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); + } +#if 1 + } else if (fKey > rowKey) { + if (isRowDel) { + // TODO: support delete function + pMergeInfo->rowsDeleteFailed++; + } else { + if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break; + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + + if (lastKey != rowKey) { + pMergeInfo->rowsInserted++; + pMergeInfo->nOperations++; + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); + if (pCols) { + if (lastKey != TSKEY_INITIAL_VAL) { + ++pCols->numOfRows; + } + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); + } + lastKey = rowKey; + } else { + if (keepDup) { + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); + } else { + // discard + } + } + } + + tsdbTbDataIterNext(pIter); + row = tsdbNextIterRow(pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { + rowKey = INT64_MAX; + isRowDel = false; + } else { + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); + } + } else { // fkey == rowKey + if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?) + ASSERT(!keepDup); + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + pMergeInfo->rowsDeleteSucceed++; + pMergeInfo->nOperations++; + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); + } else { + if (keepDup) { + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + if (lastKey != rowKey) { + pMergeInfo->rowsUpdated++; + pMergeInfo->nOperations++; + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); + if (pCols) { + if (lastKey != TSKEY_INITIAL_VAL) { + ++pCols->numOfRows; + } + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); + } + lastKey = rowKey; + } else { + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); + } + } else { + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); + } + } + + tsdbTbDataIterNext(pIter); + row = tsdbNextIterRow(pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { + rowKey = INT64_MAX; + isRowDel = false; + } else { + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); + } + + filterIter++; + if (filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); + } + } +#endif + } + if (pCols && (lastKey != TSKEY_INITIAL_VAL)) { + ++pCols->numOfRows; + } + + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c deleted file mode 100644 index 07d4ef86563dc24895e302e2773411e25af25a45..0000000000000000000000000000000000000000 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -typedef struct { - SMemTable2 *pMemTable; - int32_t minutes; - int8_t precision; - TSKEY nCommitKey; - int32_t fid; - TSKEY minKey; - TSKEY maxKey; - SReadH readh; - SDFileSet wSet; - SArray *aBlkIdx; - SArray *aSupBlk; - SArray *aSubBlk; - SArray *aDelInfo; -} SCommitH; - -static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb); -static int32_t tsdbCommitEnd(SCommitH *pCHandle); -static int32_t tsdbCommitImpl(SCommitH *pCHandle); - -int32_t tsdbBegin2(STsdb *pTsdb) { - int32_t code = 0; - - ASSERT(pTsdb->mem == NULL); - code = tsdbMemTableCreate2(pTsdb, (SMemTable2 **)&pTsdb->mem); - if (code) { - tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - goto _exit; - } - -_exit: - return code; -} - -int32_t tsdbCommit2(STsdb *pTsdb) { - int32_t code = 0; - SCommitH ch = {0}; - - // start to commit - code = tsdbCommitStart(&ch, pTsdb); - if (code) { - goto _exit; - } - - // commit - code = tsdbCommitImpl(&ch); - if (code) { - goto _err; - } - - // end commit - code = tsdbCommitEnd(&ch); - if (code) { - goto _exit; - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) { - int32_t code = 0; - SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; - - tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode)); - - // switch to commit - ASSERT(pTsdb->imem == NULL && pTsdb->mem); - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - - // open handle - pCHandle->pMemTable = pMemTable; - pCHandle->minutes = pTsdb->keepCfg.days; - pCHandle->precision = pTsdb->keepCfg.precision; - pCHandle->nCommitKey = pMemTable->minKey.ts; - - code = tsdbInitReadH(&pCHandle->readh, pTsdb); - if (code) { - goto _err; - } - pCHandle->aBlkIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCHandle->aBlkIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pCHandle->aSupBlk = taosArrayInit(0, sizeof(SBlock)); - if (pCHandle->aSupBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pCHandle->aSubBlk = taosArrayInit(0, sizeof(SBlock)); - if (pCHandle->aSubBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pCHandle->aDelInfo = taosArrayInit(0, sizeof(SDelInfo)); - if (pCHandle->aDelInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - // start FS transaction - tsdbStartFSTxn(pTsdb, 0, 0); - - return code; - -_err: - return code; -} - -static int32_t tsdbCommitEnd(SCommitH *pCHandle) { - int32_t code = 0; - STsdb *pTsdb = pCHandle->pMemTable->pTsdb; - SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->imem; - - // end transaction - code = tsdbEndFSTxn(pTsdb); - if (code) { - goto _err; - } - - // close handle - taosArrayClear(pCHandle->aDelInfo); - taosArrayClear(pCHandle->aSubBlk); - taosArrayClear(pCHandle->aSupBlk); - taosArrayClear(pCHandle->aBlkIdx); - tsdbDestroyReadH(&pCHandle->readh); - - // destroy memtable (todo: unref it) - pTsdb->imem = NULL; - tsdbMemTableDestroy2(pMemTable); - - tsdbInfo("vgId:%d commit over", TD_VID(pTsdb->pVnode)); - return code; - -_err: - return code; -} - -static int32_t tsdbCommitTableStart(SCommitH *pCHandle) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbCommitTableEnd(SCommitH *pCHandle) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbCommitTable(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) { - int32_t code = 0; - SMemDataIter iter = {0}; - - // commit table start - code = tsdbCommitTableStart(pCHandle); - if (code) { - goto _err; - } - - // commit table impl - if (pMemData && pBlockIdx) { - // TODO - } else if (pMemData) { - // TODO - } else { - // TODO - } - - // commit table end - code = tsdbCommitTableEnd(pCHandle); - if (code) { - goto _err; - } - - return code; - -_err: - return code; -} - -static int32_t tsdbTableIdCmprFn(const void *p1, const void *p2) { - TABLEID *pId1 = (TABLEID *)p1; - TABLEID *pId2 = (TABLEID *)p2; - - if (pId1->suid < pId2->suid) { - return -1; - } else if (pId1->suid > pId2->suid) { - return 1; - } - - if (pId1->uid < pId2->uid) { - return -1; - } else if (pId1->uid > pId2->uid) { - return 1; - } - - return 0; -} - -static int32_t tsdbWriteBlockIdx(SDFile *pFile, SArray *pArray, uint8_t **ppBuf) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbCommitFileStart(SCommitH *pCHandle) { - int32_t code = 0; - STsdb *pTsdb = pCHandle->pMemTable->pTsdb; - SDFileSet *pSet = NULL; - - taosArrayClear(pCHandle->aBlkIdx); - - return code; -} - -static int32_t tsdbCommitFileEnd(SCommitH *pCHandle) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbCommitFile(SCommitH *pCHandle) { - int32_t code = 0; - SMemData *pMemData; - SBlockIdx *pBlockIdx; - int32_t iMemData; - int32_t nMemData; - int32_t iBlockIdx; - int32_t nBlockIdx; - - // commit file start - code = tsdbCommitFileStart(pCHandle); - if (code) { - goto _err; - } - - // commit file impl - iMemData = 0; - nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData); - iBlockIdx = 0; - nBlockIdx = 0; // todo - - for (;;) { - if (iMemData >= nMemData && iBlockIdx >= nBlockIdx) break; - - pMemData = NULL; - pBlockIdx = NULL; - if (iMemData < nMemData) { - pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData); - } - if (iBlockIdx < nBlockIdx) { - // pBlockIdx = ; - } - - if (pMemData && pBlockIdx) { - int32_t c = tsdbTableIdCmprFn(pMemData, pBlockIdx); - if (c < 0) { - iMemData++; - pBlockIdx = NULL; - } else if (c == 0) { - iMemData++; - iBlockIdx++; - } else { - iBlockIdx++; - pMemData = NULL; - } - } else { - if (pMemData) { - iMemData++; - } else { - iBlockIdx++; - } - } - - code = tsdbCommitTable(pCHandle, pMemData, pBlockIdx); - if (code) { - goto _err; - } - } - - // commit file end - code = tsdbCommitFileEnd(pCHandle); - if (code) { - goto _err; - } - - return code; - -_err: - return code; -} - -static int32_t tsdbCommitData(SCommitH *pCHandle) { - int32_t code = 0; - int32_t fid; - - if (pCHandle->pMemTable->nRows == 0) goto _exit; - - // loop to commit to each file - for (;;) { - if (pCHandle->nCommitKey == TSKEY_MAX) break; - - pCHandle->fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision); - tsdbGetFidKeyRange(pCHandle->minutes, pCHandle->precision, pCHandle->fid, &pCHandle->minKey, &pCHandle->maxKey); - code = tsdbCommitFile(pCHandle); - if (code) { - goto _err; - } - } - -_exit: - return code; - -_err: - return code; -} - -static int32_t delInfoCmprFn(const void *p1, const void *p2) { - SDelInfo *pDelInfo1 = (SDelInfo *)p1; - SDelInfo *pDelInfo2 = (SDelInfo *)p2; - - if (pDelInfo1->suid < pDelInfo2->suid) { - return -1; - } else if (pDelInfo1->suid > pDelInfo2->suid) { - return 1; - } - - if (pDelInfo1->uid < pDelInfo2->uid) { - return -1; - } else if (pDelInfo1->uid > pDelInfo2->uid) { - return 1; - } - - if (pDelInfo1->version < pDelInfo2->version) { - return -1; - } else if (pDelInfo1->version > pDelInfo2->version) { - return 1; - } - - return 0; -} -static int32_t tsdbCommitDelete(SCommitH *pCHandle) { - int32_t code = 0; - SDelInfo delInfo; - SMemData *pMemData; - - if (pCHandle->pMemTable->nDelOp == 0) goto _exit; - - // load del array (todo) - - // loop to append SDelInfo - for (int32_t iMemData = 0; iMemData < taosArrayGetSize(pCHandle->pMemTable->aMemData); iMemData++) { - pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData); - - for (SDelOp *pDelOp = pMemData->delOpHead; pDelOp; pDelOp = pDelOp->pNext) { - delInfo = (SDelInfo){.suid = pMemData->suid, - .uid = pMemData->uid, - .version = pDelOp->version, - .sKey = pDelOp->sKey, - .eKey = pDelOp->eKey}; - if (taosArrayPush(pCHandle->aDelInfo, &delInfo) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - } - - taosArraySort(pCHandle->aDelInfo, delInfoCmprFn); - - // write to new file - -_exit: - return code; - -_err: - return code; -} - -static int32_t tsdbCommitCache(SCommitH *pCHandle) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbCommitImpl(SCommitH *pCHandle) { - int32_t code = 0; - - // commit data - code = tsdbCommitData(pCHandle); - if (code) { - goto _err; - } - - // commit delete - code = tsdbCommitDelete(pCHandle); - if (code) { - goto _err; - } - - // commit cache if need (todo) - if (0) { - code = tsdbCommitCache(pCHandle); - if (code) { - goto _err; - } - } - - return code; - -_err: - return code; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 3a4ada04a83fa0477ab5cd9a984e095b5c36e11e..ffbef4e765fd073034cce7c58005a64e2e5a54fd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -188,23 +188,6 @@ _err: return code; } -static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, - bool merge) { - if (pCols) { - if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { - *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row)); - if (*ppSchema == NULL) { - ASSERT(false); - return -1; - } - } - - tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge); - } - - return 0; -} - int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) { int32_t code = 0; @@ -310,166 +293,6 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) { return true; } -/** - * This is an important function to load data or try to load data from memory skiplist iterator. - * - * This function load memory data until: - * 1. iterator ends - * 2. data key exceeds maxKey - * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead - * 4. operations in pCols not exceeds its max capacity if pCols is given - * - * The function tries to procceed AS MUCH AS POSSIBLE. - */ -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { - ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); - if (pIter == NULL) return 0; - STSchema *pSchema = NULL; - TSKEY rowKey = 0; - TSKEY fKey = 0; - // only fetch lastKey from mem data as file data not used in this function actually - TSKEY lastKey = TSKEY_INITIAL_VAL; - bool isRowDel = false; - int filterIter = 0; - STSRow *row = NULL; - SMergeInfo mInfo; - - // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by - // query handle) - - if (pMergeInfo == NULL) pMergeInfo = &mInfo; - - memset(pMergeInfo, 0, sizeof(*pMergeInfo)); - pMergeInfo->keyFirst = INT64_MAX; - pMergeInfo->keyLast = INT64_MIN; - if (pCols) tdResetDataCols(pCols); - - row = tsdbNextIterRow(pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - rowKey = INT64_MAX; - isRowDel = false; - } else { - rowKey = TD_ROW_KEY(row); - isRowDel = TD_ROW_IS_DELETED(row); - } - - if (filterIter >= nFilterKeys) { - fKey = INT64_MAX; - } else { - fKey = tdGetKey(filterKeys[filterIter]); - } - // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols - // 2. rowKey - would dup since Multi-Version supported - while (true) { - if (fKey == INT64_MAX && rowKey == INT64_MAX) break; - - if (fKey < rowKey) { - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); - - filterIter++; - if (filterIter >= nFilterKeys) { - fKey = INT64_MAX; - } else { - fKey = tdGetKey(filterKeys[filterIter]); - } -#if 1 - } else if (fKey > rowKey) { - if (isRowDel) { - // TODO: support delete function - pMergeInfo->rowsDeleteFailed++; - } else { - if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break; - if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; - - if (lastKey != rowKey) { - pMergeInfo->rowsInserted++; - pMergeInfo->nOperations++; - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); - if (pCols) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pCols->numOfRows; - } - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); - } - lastKey = rowKey; - } else { - if (keepDup) { - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); - } else { - // discard - } - } - } - - tsdbTbDataIterNext(pIter); - row = tsdbNextIterRow(pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - rowKey = INT64_MAX; - isRowDel = false; - } else { - rowKey = TD_ROW_KEY(row); - isRowDel = TD_ROW_IS_DELETED(row); - } - } else { // fkey == rowKey - if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?) - ASSERT(!keepDup); - if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; - pMergeInfo->rowsDeleteSucceed++; - pMergeInfo->nOperations++; - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); - } else { - if (keepDup) { - if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; - if (lastKey != rowKey) { - pMergeInfo->rowsUpdated++; - pMergeInfo->nOperations++; - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); - if (pCols) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pCols->numOfRows; - } - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); - } - lastKey = rowKey; - } else { - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); - } - } else { - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); - } - } - - tsdbTbDataIterNext(pIter); - row = tsdbNextIterRow(pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - rowKey = INT64_MAX; - isRowDel = false; - } else { - rowKey = TD_ROW_KEY(row); - isRowDel = TD_ROW_IS_DELETED(row); - } - - filterIter++; - if (filterIter >= nFilterKeys) { - fKey = INT64_MAX; - } else { - fKey = tdGetKey(filterKeys[filterIter]); - } - } -#endif - } - if (pCols && (lastKey != TSKEY_INITIAL_VAL)) { - ++pCols->numOfRows; - } - - return 0; -} - static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) { int32_t code = 0; int32_t idx = 0;