diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 22563275cd482f3ee666549bca17162a0bd632ad..c761f8520e1f76646f7a66ed34f58e82d908eeb8 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -25,6 +25,8 @@ extern "C" { #endif +#define TSDB_FILE_HEAD_SIZE 512 + #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) @@ -69,6 +71,7 @@ typedef struct { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); +int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbOpenFile(SFile *pFile, int oflag); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); @@ -104,6 +107,9 @@ typedef struct { TSKEY keyLast; } SCompBlock; +#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1) +#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) + typedef struct { int32_t delimiter; // For recovery usage int32_t checksum; // TODO: decide if checksum logic in this file or make it one API @@ -111,8 +117,7 @@ typedef struct { SCompBlock blocks[]; } SCompInfo; -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); +#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) // TODO: take pre-calculation into account typedef struct { @@ -129,6 +134,13 @@ typedef struct { int64_t uid; // For recovery usage SCompCol cols[]; } SCompData; +int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast); + +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); +int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); +int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); +// TODO: need an API to merge all sub-block data into one int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 7d0bdbd84548d97abc819b9d50dd486dda32af47..f22274531da45a7b68195b806cebadcf1d008238 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -24,7 +24,6 @@ #include "tsdbFile.h" -#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F const char *tsdbFileSuffix[] = { @@ -35,8 +34,7 @@ const char *tsdbFileSuffix[] = { static int compFGroupKey(const void *key, const void *fgroup); static int compFGroup(const void *arg1, const void *arg2); -static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); +static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); @@ -71,10 +69,10 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (tsdbSearchFGroup(pFileH, fid) == NULL) { + if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { + if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) { // TODO: deal with the ERROR here, remove those creaed file return -1; } @@ -105,6 +103,10 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast) { + // TODO + return 0; +} int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); @@ -127,6 +129,22 @@ int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { return 0; } +int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { + // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); + + if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; + size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; + if (read(pFile->fd, buf, size) < 0) return -1; + + return 0; +} + +int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { + if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; + if (read(pFile->fd, buf, pCol->len) < 0) return -1; + return 0; +} + static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write SDataCols * pCols, // Data column buffer int numOfPointsToWrie, // Number of points to write to the file @@ -229,10 +247,10 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return 0; } -static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) { - if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1; +static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname) { + if (dataDir == NULL || fname == NULL) return -1; - sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]); + sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); return 0; } @@ -264,12 +282,12 @@ static int tsdbCloseFile(SFile *pFile) { return ret; } -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { +int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) { memset((void *)pFile, 0, sizeof(SFile)); - pFile->type = type; pFile->fd = -1; - tsdbGetFileName(dataDir, fileId, type, pFile->fname); + tsdbGetFileName(dataDir, fileId, suffix, pFile->fname); + if (access(pFile->fname, F_OK) == 0) { // File already exists return -1; @@ -280,7 +298,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - if (type == TSDB_FILE_TYPE_HEAD) { + if (writeHeader) { if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) { tsdbCloseFile(pFile); return -1; @@ -292,7 +310,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - tsdbCloseFile(pFile); + if (toClose) tsdbCloseFile(pFile); return 0; } diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 8e433ecb5bbbb97c9e34775973f9bd728581fb6f..3945ffabdad18e01548a7b7c9ab112b6ba63dff0 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -8,6 +8,7 @@ #include #include #include +#include #include // #include "taosdef.h" @@ -45,6 +46,7 @@ #define TSDB_CFG_FILE_NAME "CONFIG" #define TSDB_DATA_DIR_NAME "data" #define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7 +#define TSDB_MAX_LAST_FILE_SIZE (1024 * 1024 * 10) // 10M enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; @@ -775,7 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max tdAppendDataRowToDataCol(row, pCols); numOfRows++; - if (numOfRows > maxRowsToRead) break; + if (numOfRows >= maxRowsToRead) break; } while (tSkipListIterNext(pIter)); return numOfRows; @@ -842,7 +844,10 @@ static void *tsdbCommitData(void *arg) { int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); for (int fid = sfid; fid <= efid; fid++) { - tsdbCommitToFile(pRepo, fid, iters, pCols); + if (tsdbCommitToFile(pRepo, fid, iters, pCols) < 0) { + // TODO: deal with the error here + // assert(0); + } } tdFreeDataCols(pCols); @@ -867,7 +872,8 @@ static void *tsdbCommitData(void *arg) { } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { - int flag = 0; + int hasDataToCommit = 0; + int isNewLastFile = 0; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; @@ -887,97 +893,125 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; int isLoadCompBlocks = 0; + char dataDir[128] = "\0"; if (pIter == NULL) continue; tdInitDataCols(pCols, pTable->schema); int numOfWrites = 0; - // while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) { - // break; - // if (!flag) { - // // There are data to commit to this file, we need to create/open it for read/write. - // // At the meantime, we set the flag to prevent further create/open operations - // if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - // // Open files for commit - // pGroup = tsdbOpenFilesForCommit(pFileH, fid); - // if (pGroup == NULL) { - // // TODO: deal with the ERROR here - // } - // // TODO: open .h file and if neccessary, open .l file - // {} - // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - // if (pIndices == NULL) { - // // TODO: deal with the ERROR - // } - // // load the SCompIdx part - // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - - // // TODO: sendfile those not need changed table content - // for (int ttid = 0; ttid < tid; ttid++) { - // // SCompIdx *pIdx = &pIndices[ttid]; - // // if (pIdx->len > 0) { - // // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR); - // // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); - // // } - // } - // flag = 1; - // } - - // SCompIdx *pIdx = &pIndices[tid]; - - // /* The first time to write to the table, need to decide - // * if it is neccessary to load the SComplock part. If it - // * is needed, just load it, or, just use sendfile and - // * append it. - // */ - // if (numOfWrites == 0 && pIdx->offset > 0) { - // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); - // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { - // // TODO: deal with the ERROR here - // } - // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; - // } else { - // // TODO: sendfile the prefix part - // } - // } - - // // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { - // // // TODO: deal with the ERROR here - // // } - - // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); - - - // // if (1 /* the SCompBlock part is not loaded*/) { - // // // Append to .data file generate a SCompBlock and record it - // // } else { - // // } - - // // // TODO: need to reset the pCols - - // numOfWrites++; - // } - - // if (pCols->numOfPoints > 0) { - // // TODO: still has data to commit, commit it - // } - - // if (1/* SCompBlock part is loaded, write it to .head file*/) { - // // TODO - // } else { - // // TODO: use sendfile send the old part and append the newly added part - // } + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose + // Loop to read columns from cache + while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) { + if (!hasDataToCommit) { + // There are data to commit to this fileId, we need to create/open it for read/write. + // At the meantime, we set the flag to prevent further create/open operations + tsdbGetDataDirName(pRepo, dataDir); + + if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { + // TODO: deal with the ERROR here + } + // Open files for commit + pGroup = tsdbOpenFilesForCommit(pFileH, fid); + if (pGroup == NULL) { + // TODO: deal with the ERROR here + } + // TODO: open .h file and if neccessary, open .l file + tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0); + if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + // TODO: make it not to write the last file every time + tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); + isNewLastFile = 1; + } + + // load the SCompIdx part + pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + if (pIndices == NULL) { // TODO: deal with the ERROR + } + if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here + } + + // sendfile those not need to changed table content + lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, + SEEK_SET); + lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + for (int ttid = 0; ttid < tid; ttid++) { + SCompIdx * tIdx= &pIndices[ttid]; + if (tIdx->len <= 0) continue; + if (isNewLastFile && tIdx->hasLast) { + // TODO: Need to load the SCompBlock part and copy to new last file + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len); + if (pCompInfo == NULL) { /* TODO */} + if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */} + SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1); + int numOfSubBlocks = pLastBlock->numOfSubBlocks; + assert(pLastBlock->last); + if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */} + { + if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks); + tIdx->checksum = 0; + } + write(tFile.fd, (void *)pCompInfo, tIdx->len); + tFile.size += tIdx->len; + } else { + sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len); + tFile.size += tIdx->len; + } + } + + hasDataToCommit = 1; + } + + SCompIdx *pIdx = &pIndices[tid]; + + /* The first time to write to the table, need to decide + * if it is neccessary to load the SComplock part. If it + * is needed, just load it, or, just use sendfile and + * append it. + */ + if (numOfWrites == 0 && pIdx->offset > 0) { + if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { + // TODO: deal with the ERROR here + } + if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; + } else { + // TODO: sendfile the prefix part + } + } + + // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { + // // TODO: deal with the ERROR here + // } + + // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); + + + // if (1 /* the SCompBlock part is not loaded*/) { + // // Append to .data file generate a SCompBlock and record it + // } else { + // } + + // // TODO: need to reset the pCols + + numOfWrites++; + } + + if (pCols->numOfPoints > 0) { + // TODO: still has data to commit, commit it + } + + if (1/* SCompBlock part is loaded, write it to .head file*/) { + // TODO + } else { + // TODO: use sendfile send the old part and append the newly added part + } } // Write the SCompIdx part // Close all files and return - if (flag) { + if (hasDataToCommit) { // TODO }