diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b37d5632dde3838307a43a49abab227a7fd2ebaf..6686789e12d027ffe660ea4656418bd63e8c3d1b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -82,10 +82,12 @@ typedef struct STsdbFSState STsdbFSState; // tsdbUtil.c ============================================================================================== // TSDBROW +#define TSDBROW_TS(ROW) (((ROW)->type == 0) ? (ROW)->pTSRow->ts : (ROW)->pBlockData->aTSKEY[(ROW)->iRow]) +#define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow]) #define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow) +#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)}) #define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}); -#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .pTSRow = (IROW)}); -TSDBKEY tsdbRowKey(TSDBROW *pRow); +#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)}); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); @@ -132,6 +134,8 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataPCmprFn(const void *p1, const void *p2); // SBlockData +#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) +#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); @@ -175,7 +179,8 @@ TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); // tsdbFile.c ============================================================================================== typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT; -void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]); +void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]); +int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype); // SDelFile #define tsdbDelFileCreate() \ ((SDelFile){ \ @@ -195,16 +200,18 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); // tsdbReaderWriter.c ============================================================================================== // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); -int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync); +int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, SBlockIdx *pBlockIdx, SBlock *pBlock); int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); + +SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); -int32_t tsdbDataFReaderClose(SDataFReader *pReader); +int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, @@ -470,11 +477,13 @@ struct SBlockSMA { SColSMA *aColSMA; }; +#pragma pack(push, 1) struct SBlockDataHdr { uint32_t delimiter; int64_t suid; int64_t uid; }; +#pragma pack(pop) struct SHeadFile { int64_t commitID; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 02d659cc1643f6ffac548a4145d88099b7eb4916..64ed6bb92ea8b6c18e4ca81c69f4a54bf404bff5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -16,9 +16,9 @@ #include "tsdb.h" int32_t tsdbOpenCache(STsdb *pTsdb) { - int32_t code = 0; + int32_t code = 0; SLRUCache *pCache = NULL; - size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config + size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config pCache = taosLRUCacheInit(cfgCapacity, -1, .5); if (pCache == NULL) { @@ -44,35 +44,33 @@ void tsdbCloseCache(SLRUCache *pCache) { static void getTableCacheKey(tb_uid_t uid, const char *cacheType, char *key, int *len) { int keyLen = 0; - snprintf(key, 30, "%"PRIi64 "%s", uid, cacheType); + snprintf(key, 30, "%" PRIi64 "%s", uid, cacheType); *len = strlen(key); } -static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { - taosMemoryFree(value); -} +static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); } int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { int32_t code = 0; STSRow *cacheRow = NULL; - char key[32] = {0}; - int keyLen = 0; + char key[32] = {0}; + int keyLen = 0; getTableCacheKey(uid, "lr", key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { - cacheRow = (STSRow *) taosLRUCacheValue(pCache, h); + cacheRow = (STSRow *)taosLRUCacheValue(pCache, h); if (row->ts >= cacheRow->ts) { if (row->ts > cacheRow->ts) { - tdRowCpy(cacheRow, row); + tdRowCpy(cacheRow, row); } } } else { cacheRow = tdRowDup(row); _taos_lru_deleter_t deleter = deleteTableCacheLastrow; - LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), - deleter, NULL, TAOS_LRU_PRIORITY_LOW); + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { code = -1; } @@ -87,7 +85,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { SMetaReader mr = {0}; metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); if (metaGetTableEntryByUid(&mr, uid) < 0) { - metaReaderClear(&mr); // table not esist + metaReaderClear(&mr); // table not esist return 0; } @@ -116,9 +114,9 @@ static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow tsdbTbDataIterCreate(pMem, NULL, 1, &iter); if (iter != NULL) { - TSDBROW *row = tsdbTbDataIterGet(iter); + TSDBROW *row = tsdbTbDataIterGet(iter); - tsdbTbDataIterDestroy(iter); + tsdbTbDataIterDestroy(iter); } } } else { @@ -153,7 +151,7 @@ _err: } static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { - int32_t code = 0; + int32_t code = 0; SDelData *pDelData = pTbData ? pTbData->pHead : NULL; for (; pDelData; pDelData = pDelData->pNext) { @@ -163,7 +161,8 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { return code; } -static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) { +static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, + SArray *aDelData) { int32_t code = 0; if (pMem) { @@ -185,7 +184,8 @@ _err: return code; } -static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aSkyline) { +static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, + SArray *aSkyline) { int32_t code = 0; SArray *aDelData = taosArrayInit(32, sizeof(SDelData)); @@ -219,10 +219,8 @@ _err: return code; } -static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, - SArray *pSkyline, - STsdb *pTsdb, - STSRow **pLastRow) { +static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline, + STsdb *pTsdb, STSRow **pLastRow) { int32_t code = 0; TSDBROW *pMemRow = NULL; @@ -247,7 +245,7 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile SBlockData *pBlockData; - tsdbDataFReaderClose(pDataFReader); + tsdbDataFReaderClose(&pDataFReader); _err: return code; @@ -258,10 +256,10 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { tb_uid_t suid = getTableSuidByUid(uid, pTsdb); - STbData *pMem = NULL; - STbData *pIMem = NULL; - STbDataIter iter; // mem buffer skip list iterator - STbDataIter iiter; // imem buffer skip list iterator + STbData *pMem = NULL; + STbData *pIMem = NULL; + STbDataIter iter; // mem buffer skip list iterator + STbDataIter iiter; // imem buffer skip list iterator if (pTsdb->mem) { tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); @@ -280,7 +278,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { *ppRow = NULL; SDelFReader *pDelFReader; - //code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL); + // code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL); if (code) goto _err; SDelIdx delIdx; @@ -297,18 +295,18 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter); do { */ - SDFileSet *pFileSet = NULL; - //pFileSet = tsdbFSIterGet(fsiter); - - code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow); - if (code < 0) { - goto _err; - } + SDFileSet *pFileSet = NULL; + // pFileSet = tsdbFSIterGet(fsiter); - if (*ppRow != NULL) { - //break; - } - /* + code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow); + if (code < 0) { + goto _err; + } + + if (*ppRow != NULL) { + // break; + } + /* } while (fsHasNext = tsdbFSIterNext(fsiter)) */ @@ -323,13 +321,13 @@ _err: int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; - char key[32] = {0}; - int keyLen = 0; + char key[32] = {0}; + int keyLen = 0; getTableCacheKey(uid, "lr", key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { - *ppRow = (STSRow *) taosLRUCacheValue(pCache, h); + *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } else { STSRow *pRow = NULL; code = mergeLastRow(uid, pTsdb, &pRow); @@ -344,14 +342,14 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { int32_t code = 0; - char key[32] = {0}; - int keyLen = 0; + char key[32] = {0}; + int keyLen = 0; getTableCacheKey(uid, "lr", key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { taosLRUCacheRelease(pCache, h, true); - //void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 853c950862289e0f080f630696821dc5173c65ea..09ef37ab62b37e6862d61080eb642fd0556cb4d9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -23,6 +23,7 @@ typedef struct { int8_t precision; int32_t minRow; int32_t maxRow; + int8_t cmprAlg; // -------------- TSKEY nextKey; // reset by each table commit int32_t commitFid; @@ -39,6 +40,9 @@ typedef struct { SMapData nBlockMap; // SMapData SBlock nBlock; SBlockData nBlockData; + int64_t suid; + int64_t uid; + STSchema *pTSchema; /* commit del */ SDelFReader *pDelFReader; SMapData oDelIdxMap; // SMapData, old @@ -710,50 +714,119 @@ _err: return code; } +static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { + int32_t code = 0; + + if (pCommitter->pTSchema) { + if (pCommitter->suid == suid) { + if (suid == 0) { + if (pCommitter->uid == uid && sver == pCommitter->pTSchema->version) goto _exit; + } else { + if (sver == pCommitter->pTSchema->version) goto _exit; + } + } + } + +_update_schema: + pCommitter->suid = suid; + pCommitter->uid = uid; + tTSchemaDestroy(pCommitter->pTSchema); + pCommitter->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); + if (pCommitter->pTSchema == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + +_exit: + return code; +} + static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; STbDataIter *pIter = &(STbDataIter){0}; TSDBKEY key = {.ts = pCommitter->minKey, .version = VERSION_MIN}; + TSDBROW row; TSDBROW *pRow; // create iter tsdbTbDataIterOpen(pTbData, &key, 0, pIter); pRow = tsdbTbDataIterGet(pIter); - if (pRow == NULL || tsdbRowKey(pRow).ts > pCommitter->maxKey) goto _exit; + if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) goto _exit; // main loop + SMapData *mBlock = &pCommitter->nBlockMap; SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pTbData->suid, .uid = pTbData->uid}; SBlock *pBlock = &pCommitter->nBlock; SBlockData *pBlockData = &pCommitter->nBlockData; + tMapDataReset(mBlock); tBlockIdxReset(pBlockIdx); tBlockReset(pBlock); tBlockDataReset(pBlockData); - while (pRow != NULL && tsdbRowKey(pRow).ts <= pCommitter->maxKey) { - code = tBlockDataAppendRow(pBlockData, pRow, NULL); + while (pRow != NULL && TSDBROW_TS(pRow) <= pCommitter->maxKey) { + code = tsdbCommitterUpdateSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (code) goto _err; - pBlock->minVersion = TMIN(pBlock->minVersion, tsdbRowKey(pRow).version); - pBlock->maxVersion = TMAX(pBlock->maxVersion, tsdbRowKey(pRow).version); + pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); + pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow)); + pBlock->nRow++; + // next tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - // write the block and do something + ASSERT(0); + // // SBlock + // pBlock->last = 0; + // pBlock->cmprAlg = pCommitter->cmprAlg; + // code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock); + // if (code) goto _err; + + // // SBlockIdx + // pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts); + // pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts); + // pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion); + // pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion); + + // tBlockReset(pBlock); + // tBlockDataReset(pBlockData); } } if (pBlockData->nRow > 0) { - // write the block to file + // SBlock + row = tBlockDataFirstRow(pBlockData); + if (tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&row)) > 0) pBlock->minKey = TSDBROW_KEY(&row); + row = tBlockDataLastRow(pBlockData); + if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&row)) < 0) pBlock->maxKey = TSDBROW_KEY(&row); + pBlock->last = 1; + pBlock->cmprAlg = pCommitter->cmprAlg; + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock); + if (code) goto _err; + + // SBlockIdx + code = tMapDataPutItem(mBlock, pBlock, tPutBlock); + if (code) goto _err; + pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts); + pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts); + pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion); + pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion); } + // write block + code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); + if (code) goto _err; + _exit: - if (pRow) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, tsdbRowKey(pRow).ts); - } + if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); return code; _err: @@ -856,70 +929,6 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } } -#if 0 - while (true) { - if (pTbData == NULL && pBlockIdx == NULL) break; - - if (pTbData && pBlockIdx) { - c = tTABLEIDCmprFn(pTbData, pBlockIdx); - - if (c == 0) { - goto _commit_mem_and_disk_data; - } else if (c < 0) { - goto _commit_mem_data; - } else { - goto _commit_disk_data; - } - } else if (pTbData) { - goto _commit_mem_data; - } else { - goto _commit_disk_data; - } - - _commit_mem_data: - code = tsdbCommitTableData(pCommitter, pTbData, NULL); - if (code) goto _err; - - iTbData++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } - continue; - - _commit_disk_data: - code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx); - if (code) goto _err; - - iBlockIdx++; - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); - } else { - pBlockIdx = NULL; - } - continue; - - _commit_mem_and_disk_data: - code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx); - if (code) goto _err; - - iTbData++; - iBlockIdx++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); - } else { - pBlockIdx = NULL; - } - continue; - } -#endif - return code; _err: @@ -938,12 +947,16 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL); if (code) goto _err; + // upsert SDFileSet + code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter)); + if (code) goto _err; + // close and sync - code = tsdbDataFWriterClose(pCommitter->pWriter, 1); + code = tsdbDataFWriterClose(&pCommitter->pWriter, 1); if (code) goto _err; if (pCommitter->pReader) { - code = tsdbDataFReaderClose(pCommitter->pReader); + code = tsdbDataFReaderClose(&pCommitter->pReader); goto _err; } @@ -995,6 +1008,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->precision = pTsdb->keepCfg.precision; pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; code = tsdbFSBegin(pTsdb->fs); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index a7800c76102d8f7057d775b9d4e75009c253500a..84495520478f614159835239eea661209b5a50c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -45,6 +45,38 @@ void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char } } +int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) { + int32_t n = 0; + + switch (ftype) { + case TSDB_HEAD_FILE: { + SHeadFile *pHeadFile = &pSet->fHead; + n += tPutI64(p + n, pHeadFile->commitID); + n += tPutI64(p + n, pHeadFile->size); + n += tPutI64(p + n, pHeadFile->offset); + } break; + case TSDB_DATA_FILE: { + SDataFile *pDataFile = &pSet->fData; + n += tPutI64(p + n, pDataFile->commitID); + n += tPutI64(p + n, pDataFile->size); + } break; + case TSDB_LAST_FILE: { + SLastFile *pLastFile = &pSet->fLast; + n += tPutI64(p + n, pLastFile->commitID); + n += tPutI64(p + n, pLastFile->size); + } break; + case TSDB_SMA_FILE: { + SSmaFile *pSmaFile = &pSet->fSma; + n += tPutI64(p + n, pSmaFile->commitID); + n += tPutI64(p + n, pSmaFile->size); + } break; + default: + ASSERT(0); + } + + return n; +} + // SHeadFile =============================================== // SDataFile =============================================== diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index b322a9833370a17e9414eec1d31d3551a148a19f..984f1f11a3f085cdfe25523e431094068cf3ec9f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -460,33 +460,35 @@ _err: return code; } -int32_t tsdbDataFReaderClose(SDataFReader *pReader) { +int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { int32_t code = 0; - if (taosCloseFile(&pReader->pHeadFD) < 0) { + if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&pReader->pDataFD) < 0) { + if (taosCloseFile(&(*ppReader)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&pReader->pLastFD) < 0) { + if (taosCloseFile(&(*ppReader)->pLastFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&pReader->pSmaFD) < 0) { + if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + taosMemoryFree(*ppReader); + *ppReader = NULL; return code; _err: - tsdbError("vgId:%d data file reader close failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d data file reader close failed since %s", TD_VID((*ppReader)->pTsdb->pVnode), tstrerror(code)); return code; } @@ -615,6 +617,8 @@ struct SDataFWriter { TdFilePtr pSmaFD; }; +SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter) { return &pWriter->wSet; } + int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { int32_t code = 0; int32_t flag; @@ -751,55 +755,58 @@ _err: return code; } -int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync) { +int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { int32_t code = 0; + STsdb *pTsdb = (*ppWriter)->pTsdb; if (sync) { - if (taosFsyncFile(pWriter->pHeadFD) < 0) { + if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile(pWriter->pDataFD) < 0) { + if (taosFsyncFile((*ppWriter)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile(pWriter->pLastFD) < 0) { + if (taosFsyncFile((*ppWriter)->pLastFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile(pWriter->pSmaFD) < 0) { + if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } } - if (taosCloseFile(&pWriter->pHeadFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&pWriter->pDataFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&pWriter->pLastFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&pWriter->pSmaFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + taosMemoryFree(*ppWriter); + *ppWriter = NULL; return code; _err: - tsdbError("vgId:%d data file writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d data file writer close failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -821,7 +828,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { // head ============== // build memset(*ppBuf, 0, size); - // tPutHeadFileHdr(*ppBuf, pHeadFile); + tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_HEAD_FILE); taosCalcChecksumAppend(0, *ppBuf, size); // seek @@ -839,7 +846,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { // data ============== memset(*ppBuf, 0, size); - // tPutDataFileHdr(*ppBuf, pDataFile); + tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_DATA_FILE); taosCalcChecksumAppend(0, *ppBuf, size); // seek @@ -857,7 +864,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { // last ============== memset(*ppBuf, 0, size); - // tPutLastFileHdr(*ppBuf, pLastFile); + tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_LAST_FILE); taosCalcChecksumAppend(0, *ppBuf, size); // seek @@ -875,7 +882,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { // sma ============== memset(*ppBuf, 0, size); - // tPutSmaFileHdr(*ppBuf, pSmaFile); + tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_SMA_FILE); taosCalcChecksumAppend(0, *ppBuf, size); // seek @@ -902,12 +909,13 @@ _err: int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) { int32_t code = 0; - int64_t size = 0; + int64_t size; SHeadFile *pHeadFile = &pWriter->wSet.fHead; - int64_t n = 0; + int64_t n; uint8_t *pBuf = NULL; // prepare + size = 0; size += tPutU32(NULL, TSDB_FILE_DLMT); size = size + tPutMapData(NULL, mBlockIdx) + sizeof(TSCKSUM); @@ -917,6 +925,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t ** if (code) goto _err; // build + n = 0; n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); n += tPutMapData(*ppBuf, mBlockIdx); taosCalcChecksumAppend(0, *ppBuf, size); @@ -944,20 +953,17 @@ _err: } int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { - int32_t code = 0; - SHeadFile *pHeadFile = &pWriter->wSet.fHead; - uint8_t *pBuf = NULL; - int64_t size; - int64_t n; + int32_t code = 0; + SHeadFile *pHeadFile = &pWriter->wSet.fHead; + SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + uint8_t *pBuf = NULL; + int64_t size; + int64_t n; ASSERT(mBlock->nItem > 0); // prepare - size = 0; - size += tPutU32(NULL, TSDB_FILE_DLMT); - size += tPutI64(NULL, pBlockIdx->suid); - size += tPutI64(NULL, pBlockIdx->uid); - size = size + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); + size = sizeof(SBlockDataHdr) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); // alloc if (!ppBuf) ppBuf = &pBuf; @@ -966,9 +972,8 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, // build n = 0; - n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); - n += tPutI64(*ppBuf + n, pBlockIdx->suid); - n += tPutI64(*ppBuf + n, pBlockIdx->uid); + *(SBlockDataHdr *)(*ppBuf) = hdr; + n += sizeof(hdr); n += tPutMapData(*ppBuf + n, mBlock); taosCalcChecksumAppend(0, *ppBuf, size); @@ -1001,17 +1006,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ SBlockIdx *pBlockIdx, SBlock *pBlock) { int32_t code = 0; SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; - SBlockCol bCol; + SBlockCol *pBlockCol = &(SBlockCol){0}; int64_t size; int64_t n; - TdFilePtr pFileFD = pWriter->pDataFD; // TODO + TdFilePtr pFileFD = pBlock->last ? pWriter->pLastFD : pWriter->pDataFD; SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; TSCKSUM cksm; uint8_t *p; int64_t offset; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; - pSubBlock->offset = 0; // TODO: set as file offset + if (!ppBuf1) ppBuf1 = &pBuf1; + if (!ppBuf2) ppBuf2 = &pBuf2; + if (pBlock->last) { + pSubBlock->offset = pWriter->wSet.fLast.size; + } else { + pSubBlock->offset = pWriter->wSet.fData.size; + } pSubBlock->bsize = 0; // HDR @@ -1100,7 +1113,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ // other columns offset = 0; - tMapDataClear(&pSubBlock->mBlockCol); + tMapDataReset(&pSubBlock->mBlockCol); for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) { SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol); @@ -1108,18 +1121,18 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ if (pColData->flag == HAS_NONE) continue; - bCol.cid = pColData->cid; - bCol.type = pColData->type; - bCol.flag = pColData->flag; + pBlockCol->cid = pColData->cid; + pBlockCol->type = pColData->type; + pBlockCol->flag = pColData->flag; if (pColData->flag != HAS_NULL) { cksm = 0; - bCol.offset = offset; - bCol.size = 0; + pBlockCol->offset = offset; + pBlockCol->size = 0; // bitmap if (pColData->flag != HAS_VALUE) { - // TODO: optimize bitmap part + // optimize bitmap storage (todo) n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -1127,7 +1140,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ } cksm = taosCalcChecksum(cksm, pColData->pBitMap, n); - bCol.size += n; + pBlockCol->size += n; } // data @@ -1138,7 +1151,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TAOS_SYSTEM_ERROR(errno); goto _err; } - bCol.size += n; + pBlockCol->size += n; // checksum cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData); @@ -1147,7 +1160,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TAOS_SYSTEM_ERROR(errno); goto _err; } - bCol.size += n; + pBlockCol->size += n; } else { size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM); @@ -1160,8 +1173,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ } // data - n = tDataTypes->compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, pBlock->cmprAlg, - *ppBuf2, size); + n = tDataTypes[pColData->type].compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, + pBlock->cmprAlg, *ppBuf2, size); if (n <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; @@ -1171,29 +1184,40 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ n += sizeof(TSCKSUM); ASSERT(n <= size); taosCalcChecksumAppend(cksm, *ppBuf1, n); - bCol.size += n; // write - n = taosWriteFile(pFileFD, *ppBuf1, bCol.size); + n = taosWriteFile(pFileFD, *ppBuf1, n); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + + pBlockCol->size += n; } // state - offset += bCol.size; - pSubBlock->bsize += bCol.size; + offset += pBlockCol->size; + pSubBlock->bsize += pBlockCol->size; } - code = tMapDataPutItem(&pSubBlock->mBlockCol, &bCol, tPutBlockCol); + code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol); if (code) goto _err; } + if (pBlock->last) { + pWriter->wSet.fLast.size += pSubBlock->bsize; + } else { + pWriter->wSet.fData.size += pSubBlock->bsize; + } + + tsdbFree(pBuf1); + tsdbFree(pBuf2); return code; _err: tsdbError("vgId:%d write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf1); + tsdbFree(pBuf2); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 158aab3f126504422409d5120893047c28ef0414..d64e4173853886beff1d49c83a482181923c6cdb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -326,6 +326,7 @@ void tBlockReset(SBlock *pBlock) { pBlock->maxVersion = VERSION_MIN; pBlock->nRow = 0; pBlock->last = -1; + pBlock->hasDup = 0; pBlock->cmprAlg = -1; for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { pBlock->aSubBlock[iSubBlock].offset = -1; @@ -566,14 +567,6 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK // } // TSDBROW ====================================================== -TSDBKEY tsdbRowKey(TSDBROW *pRow) { - if (pRow->type == 0) { - return (TSDBKEY){.version = pRow->version, .ts = pRow->pTSRow->ts}; - } else { - return (TSDBKEY){.version = pRow->pBlockData->aVersion[pRow->iRow], .ts = pRow->pBlockData->aTSKEY[pRow->iRow]}; - } -} - void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { STColumn *pTColumn = &pTSchema->columns[iCol]; SValue value; @@ -661,7 +654,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { // SRowMerger ====================================================== int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; - TSDBKEY key = tsdbRowKey(pRow); + TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; STColumn *pTColumn; @@ -702,7 +695,7 @@ void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t code = 0; - TSDBKEY key = tsdbRowKey(pRow); + TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); @@ -1067,15 +1060,14 @@ static SColData *tBlockDataAddBlockCol(SBlockData *pBlockData, int32_t iColData, int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; - TSDBKEY key = tsdbRowKey(pRow); // TSDBKEY code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); if (code) goto _err; code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); if (code) goto _err; - pBlockData->aVersion[pBlockData->nRow] = key.version; - pBlockData->aTSKEY[pBlockData->nRow] = key.ts; + pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); + pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); // OTHER int32_t iColData = 0;