diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 385d1f59ea929f00a7ca1aae56b483859f93aaea..9a4d94c58fc99ef8fc647d188d2d41f2979db212 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -34,15 +34,22 @@ typedef enum { TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; +#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) + extern const char *tsdbFileSuffix[]; typedef struct { int8_t type; + int fd; char fname[128]; int64_t size; // total size of the file int64_t tombSize; // unused file size + int32_t totalBlocks; + int32_t totalSubBlocks; } SFile; +#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1) + typedef struct { int32_t fileId; SFile files[TSDB_FILE_TYPE_MAX]; @@ -50,14 +57,26 @@ typedef struct { // TSDB file handle typedef struct { - int32_t daysPerFile; - int32_t keep; - int32_t minRowPerFBlock; - int32_t maxRowsPerFBlock; - int32_t maxTables; + int maxFGroups; + int numOfFGroups; + SFileGroup fGroup[]; } STsdbFileH; +#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId +#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId + +STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); +void tsdbCloseFileH(STsdbFileH *pFileH); +int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); + +typedef struct { + int32_t len; + int32_t padding; // For padding purpose + int64_t offset; +} SCompIdx; + /** * if numOfSubBlocks == -1, then the SCompBlock is a sub-block * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to @@ -78,14 +97,32 @@ typedef struct { TSKEY keyLast; } SCompBlock; -#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) +typedef struct { + int32_t delimiter; // For recovery usage + int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int64_t uid; + int32_t padding; // For padding purpose + int32_t numOfBlocks; // TODO: make the struct padding + SCompBlock blocks[]; +} SCompInfo; -STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock, int32_t maxTables); +// TODO: take pre-calculation into account +typedef struct { + int16_t colId; // Column ID + int16_t len; // Column length + int32_t type : 8; + int32_t offset : 24; +} SCompCol; + +// TODO: Take recover into account +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + int64_t uid; // For recovery usage + SCompCol cols[]; +} SCompData; -void tsdbCloseFile(STsdbFileH *pFileH); -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); -void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); +void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 98562be0cce5e057049c20d4f586453522908750..f622c38b5ff4acf79e5ec405de0343a5d5197a19 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -27,72 +27,126 @@ #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F -typedef struct { - int32_t len; - int32_t padding; // For padding purpose - int64_t offset; -} SCompIdx; - -typedef struct { - int32_t delimiter; // For recovery usage - int32_t checksum; // TODO: decide if checksum logic in this file or make it one API - int64_t uid; - int32_t padding; // For padding purpose - int32_t numOfBlocks; // TODO: make the struct padding - SCompBlock blocks[]; -} SCompInfo; - -// TODO: take pre-calculation into account -typedef struct { - int16_t colId; // Column ID - int16_t len; // Column length - int32_t type : 8; - int32_t offset : 24; -} SCompCol; - -// TODO: Take recover into account -typedef struct { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - int64_t uid; // For recovery usage - SCompCol cols[]; -} SCompData; - const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA ".last" // TSDB_FILE_TYPE_LAST }; -static int tsdbWriteFileHead(int fd, SFile *pFile) { +static int compFGroupKey(const void *key, const void *fgroup); +static int compFGroup(const void *arg1, const void *arg2); +static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); +static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); +static int tsdbWriteFileHead(SFile *pFile); +static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); + +STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { + STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); + if (pFileH == NULL) { // TODO: deal with ERROR here + return NULL; + } + + pFileH->maxFGroups = maxFiles; + + DIR *dir = opendir(dataDir); + if (dir == NULL) { + free(pFileH); + return NULL; + } + + struct dirent *dp; + while ((dp = readdir(dir)) != NULL) { + if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; + // TODO + } + + return pFileH; +} + +void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } + +int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { + if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; + + SFileGroup fGroup; + SFileGroup *pFGroup = &fGroup; + if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) || + bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) == + NULL) { + pFGroup->fileId = fid; + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { + // TODO: deal with the ERROR here, remove those creaed file + return -1; + } + } + + pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; + qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + } + return 0; +} + +int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { + SFileGroup *pGroup = + bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); + if (pGroup == NULL) return -1; + + // Remove from disk + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + remove(pGroup->files[type].fname); + } + + // Adjust the memory + int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1); + if (filesBehind > 0) { + memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind); + } + pFileH->numOfFGroups--; + + return 0; +} + +static int compFGroupKey(const void *key, const void *fgroup) { + int fid = *(int *)key; + SFileGroup *pFGroup = (SFileGroup *)fgroup; + return (fid - pFGroup->fileId); +} + +static int compFGroup(const void *arg1, const void *arg2) { + return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; +} + +static int tsdbWriteFileHead(SFile *pFile) { char head[TSDB_FILE_HEAD_SIZE] = "\0"; pFile->size += TSDB_FILE_HEAD_SIZE; // TODO: write version and File statistic to the head - lseek(fd, 0, SEEK_SET); - if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; + lseek(pFile->fd, 0, SEEK_SET); + if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; return 0; } -static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) { +static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { int size = sizeof(SCompIdx) * maxTables; void *buf = calloc(1, size); if (buf == NULL) return -1; - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { free(buf); return -1; } - if (write(fd, buf, size) < 0) { + if (write(pFile->fd, buf, size) < 0) { free(buf); return -1; } pFile->size += size; + free(buf); return 0; } @@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) return 0; } -/** - * Create a file and set the SFile object - */ +static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function + if (TSDB_IS_FILE_OPENED(pFile)) return -1; + + pFile->fd = open(pFile->fname, oflag, 0755); + if (pFile->fd < 0) return -1; + + return 0; +} + +static int tsdbCloseFile(SFile *pFile) { + if (!TSDB_IS_FILE_OPENED(pFile)) return -1; + int ret = close(pFile->fd); + pFile->fd = -1; + + return ret; +} + static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { memset((void *)pFile, 0, sizeof(SFile)); pFile->type = type; + pFile->fd = -1; tsdbGetFileName(dataDir, fileId, type, pFile->fname); if (access(pFile->fname, F_OK) == 0) { @@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755); - if (fd < 0) return -1; + if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) { + // TODO: deal with the ERROR here + return -1; + } if (type == TSDB_FILE_TYPE_HEAD) { - if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) { - close(fd); + if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) { + tsdbCloseFile(pFile); return -1; } } - if (tsdbWriteFileHead(fd, pFile) < 0) { - close(fd); + if (tsdbWriteFileHead(pFile) < 0) { + tsdbCloseFile(pFile); return -1; } - close(fd); - - return 0; -} - -static int tsdbRemoveFile(SFile *pFile) { - if (pFile == NULL) return -1; - return remove(pFile->fname); -} - -// Create a file group with fileId and return a SFileGroup object -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) { - if (dataDir == NULL || pFGroup == NULL) return -1; - - memset((void *)pFGroup, 0, sizeof(SFileGroup)); - - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) { - // TODO: deal with the error here, remove the created files - return -1; - } - } - - pFGroup->fileId = fileId; + tsdbCloseFile(pFile); return 0; } -/** - * Initialize the TSDB file handle - */ -STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock, int32_t maxTables) { - STsdbFileH *pTsdbFileH = - (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile)); - if (pTsdbFileH == NULL) return NULL; - - pTsdbFileH->daysPerFile = daysPerFile; - pTsdbFileH->keep = keep; - pTsdbFileH->minRowPerFBlock = minRowsPerFBlock; - pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock; - pTsdbFileH->maxTables = maxTables; - - // Open the directory to read information of each file - DIR *dir = opendir(dataDir); - if (dir == NULL) { - free(pTsdbFileH); - return NULL; - } - - char fname[256]; - - struct dirent *dp; - while ((dp = readdir(dir)) != NULL) { - if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; - if (true /* check if the file is the .head file */) { - int fileId = 0; - int vgId = 0; - sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId); - // TODO - - // Open head file - - // Open data file - - // Open last file - } - } - - return pTsdbFileH; -} - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index afd57947b7fa78c32496b88ae2f9831d60a82ac8..07ea9bd11ab86dca6ed10c431190be2540ee37f6 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO char dataDir[128] = "\0"; tsdbGetDataDirName(pRepo, dataDir); pRepo->tsdbFileH = - tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables); + tsdbInitFileH(dataDir, pCfg->maxTables); if (pRepo->tsdbFileH == NULL) { free(pRepo->rootDir); tsdbFreeCache(pRepo->tsdbCache); @@ -787,7 +787,7 @@ static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { for (int tid = 0; tid < maxTables; tid++) { if (iters[tid] == NULL) continue; - tSkipListDestroy(iters[tid]); + tSkipListDestroyIter(iters[tid]); } free(iters); @@ -836,42 +836,42 @@ static void *tsdbCommitToFile(void *arg) { SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); - int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); - - for (int fid = sfid; fid <= efid; fid++) { - TSKEY minKey = 0, maxKey = 0; - tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - - // tsdbOpenFileForWrite(pRepo, fid); - - for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL || pTable->imem == NULL) continue; - if (iters[tid] == NULL) { // create table iterator - iters[tid] = tSkipListCreateIter(pTable->imem->pData); - // TODO: deal with the error - if (iters[tid] == NULL) break; - if (!tSkipListIterNext(iters[tid])) { - // assert(0); - } - } - - // Init row data part - cols[0] = (SDataCol *)buf; - for (int col = 1; col < schemaNCols(pTable->schema); col++) { - cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); - } - - // Loop the iterator - int rowsRead = 0; - while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > - 0) { - // printf("rowsRead:%d-----------\n", rowsRead); - int k = 0; - } - } - } + // int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); + // int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + + // for (int fid = sfid; fid <= efid; fid++) { + // TSKEY minKey = 0, maxKey = 0; + // tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + // // tsdbOpenFileForWrite(pRepo, fid); + + // for (int tid = 0; tid < pCfg->maxTables; tid++) { + // STable *pTable = pMeta->tables[tid]; + // if (pTable == NULL || pTable->imem == NULL) continue; + // if (iters[tid] == NULL) { // create table iterator + // iters[tid] = tSkipListCreateIter(pTable->imem->pData); + // // TODO: deal with the error + // if (iters[tid] == NULL) break; + // if (!tSkipListIterNext(iters[tid])) { + // // assert(0); + // } + // } + + // // Init row data part + // cols[0] = (SDataCol *)buf; + // for (int col = 1; col < schemaNCols(pTable->schema); col++) { + // cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); + // } + + // // Loop the iterator + // int rowsRead = 0; + // while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > + // 0) { + // // printf("rowsRead:%d-----------\n", rowsRead); + // int k = 0; + // } + // } + // } tsdbDestroyTableIters(iters, pCfg->maxTables);