diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7d1a5a52de8cf4982f85e68b388007718a7aa741..bfc1aa89322353316783cfc37f0219e6f601b847 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -96,6 +96,11 @@ typedef struct { } STsdbBufPool; // ------------------ tsdbMemTable.c +typedef struct { + STable * pTable; + SSkipListIterator *pIter; +} SCommitIter; + typedef struct { uint64_t uid; TSKEY keyFirst; @@ -206,10 +211,10 @@ typedef struct { int64_t offset : 63; int32_t algorithm : 8; int32_t numOfRows : 24; - int32_t sversion; int32_t len; + int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols int16_t numOfSubBlocks; - int16_t numOfCols; + int16_t numOfCols; // not including timestamp column TSKEY keyFirst; TSKEY keyLast; } SCompBlock; @@ -377,6 +382,24 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); +int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, + TSKEY* filterKeys, int nFilterKeys); + +static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { + if (pIter == NULL) return NULL; + + SSkipListNode* node = tSkipListIterGet(pIter); + if (node == NULL) return NULL; + + return SL_GET_NODE_DATA(node); +} + +static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) return -1; + + return dataRowKey(row); +} // ------------------ tsdbFile.c #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) @@ -421,25 +444,36 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); #define helperType(h) (h)->type #define helperRepo(h) (h)->pRepo #define helperState(h) (h)->state - -int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -void tsdbDestroyHelper(SRWHelper* pHelper); -void tsdbResetHelper(SRWHelper* pHelper); -int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); -int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); -void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); -int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols); -int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); -int tsdbWriteCompInfo(SRWHelper* pHelper); -int tsdbWriteCompIdx(SRWHelper* pHelper); -int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); -int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); -int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); -void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds, - int numOfColIds); -int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo); +#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) + +int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); +int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); +void tsdbDestroyHelper(SRWHelper* pHelper); +void tsdbResetHelper(SRWHelper* pHelper); +int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); +int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); +void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); +int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); +int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); +int tsdbWriteCompInfo(SRWHelper* pHelper); +int tsdbWriteCompIdx(SRWHelper* pHelper); +int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); +int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); +int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); +void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); +int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds, + int numOfColIds); +int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo); + +static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { + if (*(TSKEY*)key1 > *(TSKEY*)key2) { + return 1; + } else if (*(TSKEY*)key1 == *(TSKEY*)key2) { + return 0; + } else { + return -1; + } +} // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 675e44f458ae78c6798ca38f8e350b944640d6ed..1cd212b216d73c9824323c2c7e0e62a8188bd548 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -18,11 +18,6 @@ #define TSDB_DATA_SKIPLIST_LEVEL 5 -typedef struct { - STable * pTable; - SSkipListIterator *pIter; -} SCommitIter; - static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo); static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); @@ -34,14 +29,11 @@ static char * tsdbGetTsTupleKey(const void *data); static void * tsdbCommitData(void *arg); static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); -static TSKEY tsdbNextIterKey(SCommitIter *pIter); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); -static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo); -static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables); -static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, - int maxRowsToRead, SDataCols *pCols); +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { @@ -252,6 +244,66 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { return 0; } +int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, + TSKEY *filterKeys, int nFilterKeys) { + ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); + if (pIter == NULL) return 0; + STSchema *pSchema = NULL; + int numOfRows = 0; + TSKEY keyNext = 0; + int filterIter = 0; + + if (nFilterKeys != 0) { // for filter purpose + ASSERT(filterKeys != NULL); + keyNext = tsdbNextIterKey(pIter); + if (keyNext < 0 || keyNext > maxKey) return numOfRows; + void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE); + filterIter = (ptr == NULL) ? nFilterKeys : (POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY)); + } + + do { + if (numOfRows >= maxRowsToRead) break; + + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) break; + + keyNext = dataRowKey(row); + if (keyNext < 0 || keyNext > maxKey) break; + + bool keyFiltered = false; + if (nFilterKeys != 0) { + while (true) { + if (filterIter >= nFilterKeys) break; + if (keyNext == filterKeys[filterIter]) { + keyFiltered = true; + filterIter++; + break; + } else if (keyNext < filterKeys[filterIter]) { + break; + } else { + filterIter++; + } + } + } + + if (!keyFiltered) { + if (pCols) { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); + if (pSchema == NULL) { + ASSERT(0); + } + } + + tdAppendDataRowToDataCol(row, pSchema, pCols); + } + numOfRows++; + } + } while (tSkipListIterNext(pIter)); + + return numOfRows; +} + // ---------------- LOCAL FUNCTIONS ---------------- static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { ASSERT(pRepo != NULL); @@ -378,7 +430,7 @@ static void *tsdbCommitData(void *arg) { // Create the iterator to read from cache if (pMem->numOfRows > 0) { - iters = tsdbCreateTableIters(pRepo); + iters = tsdbCreateCommitIters(pRepo); if (iters == NULL) { tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _exit; @@ -418,7 +470,7 @@ static void *tsdbCommitData(void *arg) { _exit: tdFreeDataCols(pDataCols); - tsdbDestroyTableIters(iters, pCfg->maxTables); + tsdbDestroyCommitIters(iters, pCfg->maxTables); tsdbDestroyHelper(&whelper); tsdbEndCommit(pRepo); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); @@ -479,19 +531,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo) { if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); } -static TSKEY tsdbNextIterKey(SCommitIter *pIter) { - if (pIter == NULL) return -1; - - SSkipListNode *node = tSkipListIterGet(pIter->pIter); - if (node == NULL) return -1; - - SDataRow row = SL_GET_NODE_DATA(node); - return dataRowKey(row); -} - static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { for (int i = 0; i < nIters; i++) { - TSKEY nextKey = tsdbNextIterKey(iters + i); + TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; @@ -504,7 +546,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { char * dataDir = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCfg * pCfg = &pRepo->config; STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pGroup = NULL; @@ -549,33 +590,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe if (pIter->pIter != NULL) { tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)); - int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; - int nLoop = 0; - while (true) { - int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols); - ASSERT(rowsRead >= 0); - if (pDataCols->numOfRows == 0) break; - nLoop++; - - ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); - ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); - - int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); - ASSERT(rowsWritten != 0); - if (rowsWritten < 0) { - taosRUnLockLatch(&(pIter->pTable->latch)); - tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), - tstrerror(terrno)); - goto _err; - } - ASSERT(rowsWritten <= pDataCols->numOfRows); - - tdPopDataColsPoints(pDataCols, rowsWritten); - maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows; + if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), + tstrerror(terrno)); + goto _err; } - - ASSERT(pDataCols->numOfRows == 0); } taosRUnLockLatch(&(pIter->pTable->latch)); @@ -615,7 +636,7 @@ _err: return -1; } -static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { STsdbCfg * pCfg = &(pRepo->config); SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -645,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { goto _err; } - if (!tSkipListIterNext(iters[i].pIter)) { - terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM; - goto _err; - } + tSkipListIterNext(iters[i].pIter); } } return iters; _err: - tsdbDestroyTableIters(iters, pCfg->maxTables); + tsdbDestroyCommitIters(iters, pCfg->maxTables); return NULL; } -static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) { +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { if (iters == NULL) return; for (int i = 1; i < maxTables; i++) { @@ -670,35 +688,4 @@ static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) { } free(iters); -} - -static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { - ASSERT(maxRowsToRead > 0); - if (pIter == NULL) return 0; - STSchema *pSchema = NULL; - - int numOfRows = 0; - - do { - if (numOfRows >= maxRowsToRead) break; - - SSkipListNode *node = tSkipListIterGet(pIter); - if (node == NULL) break; - - SDataRow row = SL_GET_NODE_DATA(node); - if (dataRowKey(row) > maxKey) break; - - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row)); - if (pSchema == NULL) { - // TODO: deal with the error here - ASSERT(0); - } - } - - tdAppendDataRowToDataCol(row, pSchema, pCols); - numOfRows++; - } while (tSkipListIterNext(pIter)); - - return numOfRows; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 6d65e1e743724ba456fc3e979dde96deea7f68d3..fcf9b04e4ae07d6a3d2d7fd136534b26bb2839a3 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -727,7 +727,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { T_REF_INC(pTable); - tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), + tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); return pTable; @@ -740,7 +740,7 @@ _err: static void tsdbFreeTable(STable *pTable) { if (pTable) { if (pTable->name != NULL) - tsdbDebug("table %s tid %d uid %" PRIu64 " is destroyed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), + tsdbTrace("table %s tid %d uid %" PRIu64 " is freed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); tfree(TABLE_NAME(pTable)); if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 5dd465147593cd2a581a89dc462daeefe7c86ea5..040b4a5334a148e4242d1c1d01d3eb95b16ba61e 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -22,39 +22,44 @@ #include "tfile.h" #define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) - -static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, - SCompBlock *pCompBlock, bool isLast, bool isSuperBlock); -static int compareKeyBlock(const void *arg1, const void *arg2); -static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); -static int compTSKEY(const void *key1, const void *key2); -static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey); -static void tsdbResetHelperFileImpl(SRWHelper *pHelper); -static int tsdbInitHelperFile(SRWHelper *pHelper); -static void tsdbDestroyHelperFile(SRWHelper *pHelper); -static void tsdbResetHelperTableImpl(SRWHelper *pHelper); -static void tsdbResetHelperTable(SRWHelper *pHelper); -static void tsdbInitHelperTable(SRWHelper *pHelper); -static void tsdbDestroyHelperTable(SRWHelper *pHelper); -static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); -static void tsdbResetHelperBlock(SRWHelper *pHelper); -static int tsdbInitHelperBlock(SRWHelper *pHelper); -static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); -static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize); -static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, - int numOfColIds); -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); -static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); +#define TSDB_KEY_COL_OFFSET 0 +#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock)) + +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, + bool isLast, bool isSuperBlock); +static int compareKeyBlock(const void *arg1, const void *arg2); +static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static void tsdbResetHelperFileImpl(SRWHelper *pHelper); +static int tsdbInitHelperFile(SRWHelper *pHelper); +static void tsdbDestroyHelperFile(SRWHelper *pHelper); +static void tsdbResetHelperTableImpl(SRWHelper *pHelper); +static void tsdbResetHelperTable(SRWHelper *pHelper); +static void tsdbInitHelperTable(SRWHelper *pHelper); +static void tsdbDestroyHelperTable(SRWHelper *pHelper); +static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); +static void tsdbResetHelperBlock(SRWHelper *pHelper); +static int tsdbInitHelperBlock(SRWHelper *pHelper); +static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bufferSize); +static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds); +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); +static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); +static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey); static void tsdbDestroyHelperBlock(SRWHelper *pHelper); static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, SDataCol *pDataCol); +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock); +static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, + int *blkIdx); +static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -225,84 +230,41 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { tdInitDataCols(pHelper->pDataCols[1], pSchema); SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid; - if (pIdx->offset > 0 && pIdx->hasLast) { - pHelper->hasOldLastBlock = true; + if (pIdx->offset > 0) { + if (pIdx->uid != TABLE_UID(pTable)) { + memset((void *)pIdx, 0, sizeof(SCompIdx)); + } else { + if (pIdx->hasLast) pHelper->hasOldLastBlock = true; + } } helperSetState(pHelper, TSDB_HELPER_TABLE_SET); ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); } -/** - * Write part of of points from pDataCols to file - * - * @return: number of points written to file successfully - * -1 for failure - */ -int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { +int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - ASSERT(pDataCols->numOfRows > 0); - SCompBlock compBlock; - int rowsToWrite = 0; - TSKEY keyFirst = dataColsKeyFirst(pDataCols); + SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]); + int blkIdx = 0; - STsdbCfg *pCfg = &pHelper->pRepo->config; + ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); + if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose + while (true) { + ASSERT(blkIdx <= pIdx->numOfBlocks); + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + if (keyFirst < 0 || keyFirst > maxKey) break; // iter over - // Load the SCompInfo part if neccessary - ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); - if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err; - - if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block - ASSERT(pHelper->hasOldLastBlock == false); - rowsToWrite = pDataCols->numOfRows; - SFile *pWFile = NULL; - bool isLast = false; - - if (rowsToWrite >= pCfg->minRowsPerFileBlock) { - pWFile = &(pHelper->files.dataF); + if (pIdx->len <= 0 || keyFirst > pIdx->maxKey) { + if (tsdbProcessAppendCommit(pHelper, pCommitIter, pDataCols, maxKey) < 0) return -1; + blkIdx = pIdx->numOfBlocks; } else { - isLast = true; - pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); - } - - if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err; - - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err; - } else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block - SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), pIdx->numOfBlocks, - sizeof(SCompBlock), compareKeyBlock, TD_GE); - - int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks); - - if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block - ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last); - rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); - if (rowsToWrite < 0) goto _err; - } else { // Has key overlap - - if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { - // Key overlap with the block, must merge with the block - - rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); - if (rowsToWrite < 0) goto _err; - } else { // Save as a super block in the middle - rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst - 1); - ASSERT(rowsToWrite > 0); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) - goto _err; - if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; - } + if (tsdbProcessMergeCommit(pHelper, pCommitIter, pDataCols, maxKey, &blkIdx) < 0) return -1; } } - return rowsToWrite; - -_err: - return -1; + return 0; } int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { @@ -310,29 +272,42 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - SCompBlock compBlock; - if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { + SCompBlock compBlock = {0}; + if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1; + SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last); if (pCompBlock->numOfSubBlocks > 1) { - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], - pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0) + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && + pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - } else { - if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); - if (pCompBlock->offset < 0) return -1; + if (pCompBlock->offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) { + tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), + pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; + } } pHelper->hasOldLastBlock = false; @@ -365,27 +340,30 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } } } else { - pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; - pHelper->pCompInfo->uid = pHelper->tableInfo.uid; - pHelper->pCompInfo->checksum = 0; - ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); - taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pIdx->offset = offset; - pIdx->uid = pHelper->tableInfo.uid; - ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); + if (pIdx->len > 0) { + pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; + pHelper->pCompInfo->uid = pHelper->tableInfo.uid; + pHelper->pCompInfo->checksum = 0; + ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && + (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); + offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pIdx->offset = offset; + pIdx->uid = pHelper->tableInfo.uid; + ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.nHeadF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, + pHelper->files.nHeadF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } } } @@ -397,7 +375,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) return -1; + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } SFile *pFile = &(pHelper->files.nHeadF); pFile->info.offset = offset; @@ -409,6 +392,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int drift = POINTER_DISTANCE(buf, pHelper->pBuffer); if (tsizeof(pHelper->pBuffer) - drift < 128) { pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2); + if (pHelper->pBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } } buf = POINTER_SHIFT(pHelper->pBuffer, drift); taosEncodeVariantU32(&buf, i); @@ -419,7 +406,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1; + if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, + pHelper->files.nHeadF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } pFile->info.len = tsize; return 0; } @@ -496,11 +488,29 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { - if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; + ASSERT(pIdx->uid == pHelper->tableInfo.uid); + if (lseek(fd, pIdx->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); - if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; - if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) return -1; + if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, + pHelper->files.headF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) { + tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo), + pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + ASSERT(pIdx->uid == pHelper->pCompInfo->uid); } helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); @@ -628,13 +638,14 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { return false; } -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, - SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) { +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, + bool isLast, bool isSuperBlock) { STsdbCfg * pCfg = &(pHelper->pRepo->config); SCompData *pCompData = (SCompData *)(pHelper->pBuffer); int64_t offset = 0; + int rowsToWrite = pDataCols->numOfRows; - ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock); + ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); offset = lseek(pFile->fd, 0, SEEK_END); @@ -646,7 +657,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa } int nColsNotAllNull = 0; - for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column SDataCol *pDataCol = pDataCols->cols + ncol; SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; @@ -658,7 +669,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompCol->colId = pDataCol->colId; pCompCol->type = pDataCol->type; - if (tDataTypeDesc[pDataCol->type].getStatisFunc && ncol != 0) { + if (tDataTypeDesc[pDataCol->type].getStatisFunc) { (*tDataTypeDesc[pDataCol->type].getStatisFunc)( (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); @@ -666,24 +677,24 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa nColsNotAllNull++; } - ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols); + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); // Compress the data if neccessary int tcol = 0; int32_t toffset = 0; int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); int32_t lsize = tsize; + int32_t keyLen = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { if (tcol >= nColsNotAllNull) break; SDataCol *pDataCol = pDataCols->cols + ncol; SCompCol *pCompCol = pCompData->cols + tcol; - if (pDataCol->colId != pCompCol->colId) continue; - void *tptr = (void *)((char *)pCompData + lsize); - - pCompCol->offset = toffset; + if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; + void *tptr = POINTER_SHIFT(pCompData, lsize); + int32_t flen = 0; // final length int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); if (pCfg->compression) { @@ -695,22 +706,29 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa } } - pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, - tsizeof(pHelper->pBuffer) - lsize, pCfg->compression, - pHelper->compBuffer, tsizeof(pHelper->compBuffer)); + flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, + tsizeof(pHelper->pBuffer) - lsize, pCfg->compression, + pHelper->compBuffer, tsizeof(pHelper->compBuffer)); } else { - pCompCol->len = tlen; - memcpy(tptr, pDataCol->pData, pCompCol->len); + flen = tlen; + memcpy(tptr, pDataCol->pData, flen); } // Add checksum - ASSERT(pCompCol->len > 0); - pCompCol->len += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len); + ASSERT(flen > 0); + flen += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + + if (ncol != 0) { + pCompCol->offset = toffset; + pCompCol->len = flen; + tcol++; + } else { + keyLen = flen; + } - toffset += pCompCol->len; - lsize += pCompCol->len; - tcol++; + toffset += flen; + lsize += flen; } pCompData->delimiter = TSDB_FILE_DELIMITER; @@ -732,14 +750,14 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompBlock->offset = offset; pCompBlock->algorithm = pCfg->compression; pCompBlock->numOfRows = rowsToWrite; - pCompBlock->sversion = pHelper->tableInfo.sversion; - pCompBlock->len = (int32_t)lsize; + pCompBlock->len = lsize; + pCompBlock->keyLen = keyLen; pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pCompBlock->numOfCols = nColsNotAllNull; pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); - tsdbTrace("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 + tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset), (int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, @@ -764,136 +782,6 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { return 0; } -static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { - // TODO: set pHelper->hasOldBlock - int rowsWritten = 0; - SCompBlock compBlock = {0}; - STsdbCfg * pCfg = &pHelper->pRepo->config; - - ASSERT(pDataCols->numOfRows > 0); - TSKEY keyFirst = dataColsKeyFirst(pDataCols); - - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(blkIdx < pIdx->numOfBlocks); - - // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1); - ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst); - // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); - - if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append - ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks - 1); - int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface - - rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows); - if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && - (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pCfg->minRowsPerFileBlock) && - (pHelper->files.nLastF.fd) < 0) { - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) - goto _err; - if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; - } else { - // Load - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; - ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows); - // Merge - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; - // Write - SFile *pWFile = NULL; - bool isLast = false; - if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) { - pWFile = &(pHelper->files.dataF); - } else { - isLast = true; - pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); - } - if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock, - isLast, true) < 0) - goto _err; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; - } - - ASSERT(pHelper->hasOldLastBlock); - pHelper->hasOldLastBlock = false; - } else { - // Key must overlap with the block - ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast); - - TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1; - - // rows1: number of rows must merge in this block - int rows1 = - tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); - // rows2: max number of rows the block can have more - int rows2 = pCfg->maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows; - // rows3: number of rows between this block and the next block - int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); - - ASSERT(rows3 >= rows1); - - if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && - ((!blockAtIdx(pHelper, blkIdx)->last) || - ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock) && - (pHelper->files.nLastF.fd < 0)))) { - rowsWritten = rows1; - bool isLast = false; - SFile *pFile = NULL; - - if (blockAtIdx(pHelper, blkIdx)->last) { - isLast = true; - pFile = &(pHelper->files.lastF); - } else { - pFile = &(pHelper->files.dataF); - } - - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; - if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; - } else { // Load-Merge-Write - // Load - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; - if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; - - rowsWritten = rows3; - - int iter1 = 0; // iter over pHelper->pDataCols[0] - int iter2 = 0; // iter over pDataCols - int round = 0; - // tdResetDataCols(pHelper->pDataCols[1]); - while (true) { - if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break; - tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows, - pDataCols, &iter2, rowsWritten, pCfg->maxRowsPerFileBlock * 4 / 5); - ASSERT(pHelper->pDataCols[1]->numOfRows > 0); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], - pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) - goto _err; - if (round == 0) { - tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx); - } else { - tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); - } - round++; - blkIdx++; - } - } - } - - return rowsWritten; - -_err: - return -1; -} - -static int compTSKEY(const void *key1, const void *key2) { - if (*(TSKEY *)key1 > *(TSKEY *)key2) { - return 1; - } else if (*(TSKEY *)key1 == *(TSKEY *)key2) { - return 0; - } else { - return -1; - } -} - static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { if (tsizeof((void *)pHelper->pCompInfo) <= esize) { size_t tsize = esize + sizeof(SCompBlock) * 16; @@ -911,7 +799,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ASSERT(pCompBlock->numOfSubBlocks == 1); // Adjust memory if no more room - if (pIdx->len == 0) pIdx->len = sizeof(SCompData) + sizeof(TSCKSUM); + if (pIdx->len == 0) pIdx->len = sizeof(SCompInfo) + sizeof(TSCKSUM); if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err; // Change the offset @@ -925,22 +813,22 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int if (tsize > 0) { ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo)); ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo)); - memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)), - (void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize); + memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)), + POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize); } pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pIdx->numOfBlocks++; pIdx->len += sizeof(SCompBlock); ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo)); - pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; - pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; + pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; + pIdx->hasLast = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; if (pIdx->numOfBlocks > 1) { ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); } - tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, + tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); return 0; @@ -1048,8 +936,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int if (pSCompBlock->numOfSubBlocks > 1) { size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); if (tsize > 0) { - memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset), - (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset), + POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize); } for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { @@ -1062,8 +950,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int *pSCompBlock = *pCompBlock; - pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; - pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; + pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; + pIdx->hasLast = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1071,30 +959,6 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int return 0; } -// Get the number of rows in range [minKey, maxKey] -static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) { - if (pDataCols->numOfRows == 0) return 0; - - ASSERT(minKey <= maxKey); - TSKEY keyFirst = dataColsKeyFirst(pDataCols); - TSKEY keyLast = dataColsKeyLast(pDataCols); - ASSERT(keyFirst <= keyLast); - - if (minKey > keyLast || maxKey < keyFirst) return 0; - - void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY), - compTSKEY, TD_GE); - ASSERT(ptr1 != NULL); - - void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY), - compTSKEY, TD_LE); - ASSERT(ptr2 != NULL); - - if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0; - - return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1; -} - static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); pHelper->files.fid = -1; @@ -1250,7 +1114,8 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl return -1; } - if (lseek(pFile->fd, pCompCol->offset, SEEK_SET) < 0) { + int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset; + if (lseek(pFile->fd, offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -1276,10 +1141,15 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { ASSERT(pCompBlock->numOfSubBlocks <= 1); + ASSERT(colIds[0] == 0); - SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SCompCol compCol = {0}; - if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; + // If only load timestamp column, no need to load SCompData part + if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; + + pDataCols->numOfRows = pCompBlock->numOfRows; int dcol = 0; int ccol = 0; @@ -1298,23 +1168,31 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, ASSERT(pDataCol->colId == colId); - while (ccol < pCompBlock->numOfCols) { - pCompCol = &pHelper->pCompData->cols[ccol]; - if (pCompCol->colId >= colId) break; - ccol++; - } + if (colId == 0) { // load the key row + compCol.colId = colId; + compCol.len = pCompBlock->keyLen; + compCol.type = pDataCol->type; + compCol.offset = TSDB_KEY_COL_OFFSET; + pCompCol = &compCol; + } else { // load non-key rows + while (ccol < pCompBlock->numOfCols) { + pCompCol = &pHelper->pCompData->cols[ccol]; + if (pCompCol->colId >= colId) break; + ccol++; + } - if (ccol >= pCompBlock->numOfCols || pCompCol->colId > colId) { - dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); - dcol++; - continue; - } + if (ccol >= pCompBlock->numOfCols || pCompCol->colId > colId) { + dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); + dcol++; + continue; + } - ASSERT(pCompCol->colId == pDataCol->colId); + ASSERT(pCompCol->colId == pDataCol->colId); + } if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err; dcol++; - ccol++; + if (colId != 0) ccol++; } return 0; @@ -1362,8 +1240,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa pDataCols->numOfRows = pCompBlock->numOfRows; // Recover the data - int ccol = 0; - int dcol = 0; + int ccol = 0; // loop iter for SCompCol object + int dcol = 0; // loop iter for SDataCols object while (dcol < pDataCols->numOfCols) { SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (ccol >= pCompData->numOfCols) { @@ -1373,12 +1251,23 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa continue; } - SCompCol *pCompCol = &(pCompData->cols[ccol]); + int16_t tcolId = 0; + int32_t toffset = TSDB_KEY_COL_OFFSET; + int32_t tlen = pCompBlock->keyLen; + + if (dcol != 0) { + SCompCol *pCompCol = &(pCompData->cols[ccol]); + tcolId = pCompCol->colId; + toffset = pCompCol->offset; + tlen = pCompCol->len; + } else { + ASSERT(pDataCol->colId == tcolId); + } - if (pCompCol->colId == pDataCol->colId) { + if (tcolId == pDataCol->colId) { if (pCompBlock->algorithm == TWO_STAGE_COMP) { int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; - if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) { + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows); } pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize); @@ -1387,16 +1276,16 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa goto _err; } } - if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, - pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints, - pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, - pCompCol->colId, (int64_t)pCompCol->offset); + if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + toffset, tlen, pCompBlock->algorithm, + pCompBlock->numOfRows, pDataCols->maxPoints, pHelper->compBuffer, + tsizeof(pHelper->compBuffer)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d", + REPO_ID(pHelper->pRepo), pFile->fname, tcolId, (int64_t)pCompBlock->offset, toffset); goto _err; } + if (dcol != 0) ccol++; dcol++; - ccol++; - } else if (pCompCol->colId < pDataCol->colId) { + } else if (tcolId < pDataCol->colId) { ccol++; } else { // Set current column as NULL and forward @@ -1442,3 +1331,250 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { return buf; } + +static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SCompBlock compBlock = {0}; + + ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); + if (pIdx->hasLast) { // append to with last block + ASSERT(pIdx->len > 0); + SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); + ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, + pDataCols, NULL, 0); + ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && + pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + } + + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } else { + ASSERT(!pHelper->hasOldLastBlock); + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); + ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + } + + return 0; +} + +static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, + int *blkIdx) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + SCompBlock compBlock = {0}; + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SDataCols *pDataCols0 = pHelper->pDataCols[0]; + + SSkipListIterator slIter = {0}; + + ASSERT(keyFirst <= pIdx->maxKey); + + SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), + pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); + ASSERT(pCompBlock != NULL); + int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); + + if (pCompBlock->last) { + ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); + int16_t colId = 0; + slIter = *(pCommitIter->pIter); + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); + + int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; + int rows2 = + tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); + if (rows2 == 0) { // all data filtered out + *(pCommitIter->pIter) = slIter; + } else { + if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && + !TSDB_NLAST_FILE_OPENED(pHelper)) { + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; + tblkIdx++; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + tdResetDataCols(pDataCols); + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } + + tblkIdx++; + round++; + } + } + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } + } else { + TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); + TSKEY blkKeyFirst = pCompBlock->keyFirst; + TSKEY blkKeyLast = pCompBlock->keyLast; + + if (keyFirst < blkKeyFirst) { + while (true) { + tdResetDataCols(pDataCols); + int rowsRead = + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); + if (rowsRead == 0) break; + + ASSERT(rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + tblkIdx++; + } + } else { + ASSERT(keyFirst <= blkKeyLast); + int16_t colId = 0; + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); + + slIter = *(pCommitIter->pIter); + int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, + pDataCols0->numOfRows); + + if (rows2 == 0) { // all filtered out + *(pCommitIter->pIter) = slIter; + } else { + int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + ASSERT(rows3 >= rows2); + + if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { + int rows = (rows1 >= rows3) ? rows3 : rows2; + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) + return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; + tblkIdx++; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) + return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } + + round++; + tblkIdx++; + } + } + } + } + } + + *blkIdx = tblkIdx; + return 0; +} + +static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows) { + int numOfRows = 0; + TSKEY key1 = INT64_MAX; + TSKEY key2 = INT64_MAX; + STSchema *pSchema = NULL; + + ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); + tdResetDataCols(pTarget); + + while (true) { + key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + SDataRow row = tsdbNextIterRow(pCommitIter->pIter); + key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row); + + if (key1 == INT64_MAX && key2 == INT64_MAX) break; + + if (key1 <= key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } + pTarget->numOfRows++; + (*iter)++; + if (key1 == key2) tSkipListIterNext(pCommitIter->pIter); + } else { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendDataRowToDataCol(row, pSchema, pTarget); + tSkipListIterNext(pCommitIter->pIter); + } + + numOfRows++; + if (numOfRows >= maxRows) break; + ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints); + } + + return numOfRows; +} + +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) { + STsdbCfg *pCfg = &(pHelper->pRepo->config); + SFile * pFile = NULL; + bool isLast = false; + + ASSERT(pDataCols->numOfRows > 0); + + if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + pFile = &(pHelper->files.dataF); + } else { + isLast = true; + pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + } + + ASSERT(pFile->fd > 0); + + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; + + return 0; +} \ No newline at end of file