diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 52b2d1e15680073d82220f12b99eef0dbfa53b5a..4e8afd4f0eec26b42842fd4b4ddfcff05d876a13 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -119,19 +119,22 @@ typedef struct { int maxPoints; // max number of points int numOfPoints; int numOfCols; // Total number of cols + int sversion; // TODO: set sversion void * buf; SDataCol cols[]; } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0] -#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1] +#define dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)] +#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) +#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1) SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); +void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 8f6a40805fb3117c776a5e79d2438b31bd0c777b..0496fc6feb313597b343f10e366e838a46f1697f 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -353,6 +353,21 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { } pCols->numOfPoints++; } +// Pop pointsToPop points from the SDataCols +void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { + int pointsLeft = pCols->numOfPoints - pointsToPop; + + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SDataCol *p_col = pCols->cols + iCol; + if (p_col->len > 0) { + p_col->len = TYPE_BYTES[p_col->type] * pointsLeft; + if (pointsLeft > 0) { + memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len); + } + } + } + pCols->numOfPoints = pointsLeft; +} /** * Return the first part length of a data row for a schema diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index badcb7802f510b2978abace6b21a1098e1cdc44d..aaedc7672677dcd2c27d961412955c0ad519d326 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -138,11 +138,10 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { // Move all node elements from src to dst, the dst is assumed as an empty list void tdListMove(SList *src, SList *dst) { // assert(dst->eleSize == src->eleSize); - dst->numOfEles = src->numOfEles; - dst->head = src->head; - dst->tail = src->tail; - src->numOfEles = 0; - src->head = src->tail = NULL; + SListNode *node = NULL; + while ((node = tdListPopHead(src)) != NULL) { + tdListAppendNode(dst, node); + } } void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 22563275cd482f3ee666549bca17162a0bd632ad..8c106d1067f92d9eabcb3dbf8eb89081a93f015a 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -25,6 +25,9 @@ extern "C" { #endif +#define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F + #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) @@ -40,13 +43,16 @@ typedef enum { extern const char *tsdbFileSuffix[]; typedef struct { - int8_t type; - int fd; - char fname[128]; int64_t size; // total size of the file int64_t tombSize; // unused file size int32_t totalBlocks; int32_t totalSubBlocks; +} SFileInfo; + +typedef struct { + int fd; + char fname[128]; + SFileInfo info; } SFile; #define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1) @@ -69,9 +75,10 @@ typedef struct { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); +int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbOpenFile(SFile *pFile, int oflag); -SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); +int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); typedef struct { @@ -104,6 +111,9 @@ typedef struct { TSKEY keyLast; } SCompBlock; +#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1) +#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) + typedef struct { int32_t delimiter; // For recovery usage int32_t checksum; // TODO: decide if checksum logic in this file or make it one API @@ -111,8 +121,16 @@ typedef struct { SCompBlock blocks[]; } SCompInfo; -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); +#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) +#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\ +do {\ + if (pCompBlock->numOfSubBlocks > 1) {\ + pCompBlock = pCompInfo->blocks + pCompBlock->offset;\ + size = pCompBlock->numOfSubBlocks;\ + } else {\ + size = 1;\ + }\ +} while (0) // TODO: take pre-calculation into account typedef struct { @@ -130,7 +148,17 @@ typedef struct { SCompCol cols[]; } SCompData; -int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); +int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols); + +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); +int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); +int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); +int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); + +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); + +// TODO: need an API to merge all sub-block data into one void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 7d0bdbd84548d97abc819b9d50dd486dda32af47..5240a99a370727a773422877c6e88a8984ea42dc 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -24,9 +24,6 @@ #include "tsdbFile.h" -#define TSDB_FILE_HEAD_SIZE 512 -#define TSDB_FILE_DELIMITER 0xF00AFA0F - const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA @@ -35,11 +32,9 @@ const char *tsdbFileSuffix[] = { static int compFGroupKey(const void *key, const void *fgroup); static int compFGroup(const void *arg1, const void *arg2); -static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); +static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); -static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -71,10 +66,10 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (tsdbSearchFGroup(pFileH, fid) == NULL) { + if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { + if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) { // TODO: deal with the ERROR here, remove those creaed file return -1; } @@ -106,6 +101,61 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { + SCompBlock *pBlock = pStartBlock; + for (int i = 0; i < numOfBlocks; i++) { + if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; + for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { + SCompCol *pCompCol = &(pCompData->cols[iCol]); + pCols->numOfPoints += pBlock->numOfPoints; + int k = 0; + for (; k < pCols->numOfCols; k++) { + if (pCompCol->colId == pCols->cols[k].colId) break; + } + + if (tsdbLoadColData(pFile, pCompCol, pBlock->offset, + (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0) + return -1; + } + pStartBlock++; + } + return 0; +} + +int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) { + SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx); + SCompBlock *pStartBlock = NULL; + SCompBlock *pBlock = NULL; + int numOfBlocks = pSuperBlock->numOfSubBlocks; + + if (numOfBlocks == 1) + pStartBlock = pSuperBlock; + else + pStartBlock = TSDB_COMPBLOCK_AT(pCompInfo, pSuperBlock->offset); + + int maxNumOfCols = 0; + pBlock = pStartBlock; + for (int i = 0; i < numOfBlocks; i++) { + if (pBlock->numOfCols > maxNumOfCols) maxNumOfCols = pBlock->numOfCols; + pBlock++; + } + + SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * maxNumOfCols); + if (pCompData == NULL) return -1; + + // Load data from the block + if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData)); + + // Write data block to the file + { + // TODO + } + + + if (pCompData) free(pCompData); + return 0; +} + int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; @@ -127,62 +177,19 @@ int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { return 0; } -static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write - SDataCols * pCols, // Data column buffer - int numOfPointsToWrie, // Number of points to write to the file - SCompBlock *pBlock // SCompBlock to hold block information to return - ) { - // pBlock->last = 0; - // pBlock->offset = lseek(pFile->fd, 0, SEEK_END); - // // pBlock->algorithm = ; - // pBlock->numOfPoints = pCols->numOfPoints; - // // pBlock->sversion = ; - // // pBlock->len = ; - // pBlock->numOfSubBlocks = 1; - // pBlock->keyFirst = dataColsKeyFirst(pCols); - // pBlock->keyLast = dataColsKeyLast(pCols); - // for (int i = 0; i < pCols->numOfCols; i++) { - // // TODO: if all col value is NULL, do not save it - // pBlock->numOfCols++; - // pCompData->numOfCols++; - // SCompCol *pCompCol = pCompData->cols + i; - // pCompCol->colId = pCols->cols[i].colId; - // pCompCol->type = pCols->cols[i].type; - - // // pCompCol->len = ; - // // pCompCol->offset = ; - // } +int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { + // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); + + if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; + size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; + if (read(pFile->fd, buf, size) < 0) return -1; return 0; } -int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) { - memset((void *)pBlock, 0, sizeof(SCompBlock)); - SFile *pFile = NULL; - SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols); - if (pCompData == NULL) return -1; - pCompData->delimiter = TSDB_FILE_DELIMITER; - // pCompData->uid = ; - - if (isMerge) { - TSKEY keyFirst = dataColsKeyFirst(pCols); - // 1. Binary search the block the data can merged into - - if (1/* the data should only merged into last file */) { - } else { - } - } else { - // Write directly to the file without merge - if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) { - // TODO: write the data to the last file - } else { - // TODO: wirte the data to the data file - } - } - - // TODO: need to update pIdx - - if (pCompData) free(pCompData); +int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { + if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; + if (read(pFile->fd, buf, pCol->len) < 0) return -1; return 0; } @@ -199,7 +206,7 @@ static int compFGroup(const void *arg1, const void *arg2) { static int tsdbWriteFileHead(SFile *pFile) { char head[TSDB_FILE_HEAD_SIZE] = "\0"; - pFile->size += TSDB_FILE_HEAD_SIZE; + pFile->info.size += TSDB_FILE_HEAD_SIZE; // TODO: write version and File statistic to the head lseek(pFile->fd, 0, SEEK_SET); @@ -223,16 +230,16 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return -1; } - pFile->size += size; + pFile->info.size += size; free(buf); return 0; } -static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) { - if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1; +static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname) { + if (dataDir == NULL || fname == NULL) return -1; - sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]); + sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); return 0; } @@ -246,6 +253,12 @@ int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function return 0; } +int tsdbCloseFile(SFile *pFile) { + int ret = close(pFile->fd); + pFile->fd = -1; + return ret; +} + SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); if (pGroup == NULL) return NULL; @@ -256,20 +269,12 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { return pGroup; } -static int tsdbCloseFile(SFile *pFile) { - if (!TSDB_IS_FILE_OPENED(pFile)) return -1; - int ret = close(pFile->fd); - pFile->fd = -1; - - return ret; -} - -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { +int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) { memset((void *)pFile, 0, sizeof(SFile)); - pFile->type = type; pFile->fd = -1; - tsdbGetFileName(dataDir, fileId, type, pFile->fname); + tsdbGetFileName(dataDir, fileId, suffix, pFile->fname); + if (access(pFile->fname, F_OK) == 0) { // File already exists return -1; @@ -280,7 +285,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - if (type == TSDB_FILE_TYPE_HEAD) { + if (writeHeader) { if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) { tsdbCloseFile(pFile); return -1; @@ -292,7 +297,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - tsdbCloseFile(pFile); + if (toClose) tsdbCloseFile(pFile); return 0; } @@ -303,7 +308,7 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } -static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) return NULL; void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 8e433ecb5bbbb97c9e34775973f9bd728581fb6f..769fc238153987b4cf14e09ca2692d260fdf9b3a 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -8,6 +8,7 @@ #include #include #include +#include #include // #include "taosdef.h" @@ -45,6 +46,7 @@ #define TSDB_CFG_FILE_NAME "CONFIG" #define TSDB_DATA_DIR_NAME "data" #define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7 +#define TSDB_MAX_LAST_FILE_SIZE (1024 * 1024 * 10) // 10M enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; @@ -86,6 +88,10 @@ static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); +static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); +static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); +static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, + int64_t uid); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -326,10 +332,13 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->mem = NULL; pRepo->tsdbCache->curBlock = NULL; + tsdbUnLockRepo(repo); // TODO: here should set as detached or use join for memory leak - pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo); - tsdbUnLockRepo(repo); + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo); return 0; } @@ -775,7 +784,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max tdAppendDataRowToDataCol(row, pCols); numOfRows++; - if (numOfRows > maxRowsToRead) break; + if (numOfRows >= maxRowsToRead) break; } while (tSkipListIterNext(pIter)); return numOfRows; @@ -807,7 +816,9 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) } if (!tSkipListIterNext(iters[tid])) { - assert(false); + // No data in this iterator + tSkipListDestroyIter(iters[tid]); + iters[tid] = NULL; } } @@ -832,8 +843,8 @@ static void *tsdbCommitData(void *arg) { } // Create a data column buffer for commit - SDataCols *pCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); - if (pCols == NULL) { + SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); + if (pDataCols == NULL) { // TODO: deal with the error return NULL; } @@ -842,13 +853,15 @@ static void *tsdbCommitData(void *arg) { int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); for (int fid = sfid; fid <= efid; fid++) { - tsdbCommitToFile(pRepo, fid, iters, pCols); + if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) { + // TODO: deal with the error here + // assert(0); + } } - tdFreeDataCols(pCols); + tdFreeDataCols(pDataCols); tsdbDestroyTableIters(iters, pCfg->maxTables); - tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); free(pCache->imem); @@ -867,12 +880,12 @@ static void *tsdbCommitData(void *arg) { } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { - int flag = 0; + int isNewLastFile = 0; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbCfg * pCfg = &pRepo->config; - SFile tFile, lFile; + SFile hFile, lFile; SFileGroup *pGroup = NULL; SCompIdx * pIndices = NULL; SCompInfo * pCompInfo = NULL; @@ -883,106 +896,305 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + // Check if there are data to commit to this file + int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); + if (!hasDataToCommit) return 0; // No data to commit, just return + + // Create and open files for commit + tsdbGetDataDirName(pRepo, dataDir); + if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ + } + pGroup = tsdbOpenFilesForCommit(pFileH, fid); + if (pGroup == NULL) { /* TODO */ + } + tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); + if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + // TODO: make it not to write the last file every time + tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); + isNewLastFile = 1; + } + + // Load the SCompIdx + pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + if (pIndices == NULL) { /* TODO*/ + } + if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ + } + + lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + + // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; - int isLoadCompBlocks = 0; + SCompIdx * pIdx = &pIndices[tid]; + + if (pTable == NULL || pIter == NULL) continue; + + /* If no new data to write for this table, just write the old data to new file + * if there are. + */ + if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { + // has old data + if (pIdx->offset > 0) { + if (isNewLastFile && pIdx->hasLast) { + // need to move the last block to new file + if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ + } + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ + } + + tdInitDataCols(pCols, pTable->schema); + + SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); + int nBlocks = 0; + + TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); + + SCompBlock tBlock; + int64_t toffset, tlen; + tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); + + tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, tlen, pTable->tableId.uid); + pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); + pTBlock->offset = toffset; + pTBlock->len = tlen; + pTBlock->numOfPoints = pCols->numOfPoints; + pTBlock->numOfSubBlocks = 1; + + pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); + if (nBlocks > 1) { + pIdx->len -= (sizeof(SCompBlock) * nBlocks); + } + write(hFile.fd, (void *)pCompInfo, pIdx->len); + } else { + pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); + sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); + hFile.info.size += pIdx->len; + } + } + continue; + } + + // Load SCompBlock part if neccessary + int isCompBlockLoaded = 0; + if (pIdx->offset > 0) { + if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { + // has last block || cache key overlap with commit key + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ + } + if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; + } else { + // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part + // and write those new blocks to it + } + } - if (pIter == NULL) continue; tdInitDataCols(pCols, pTable->schema); - int numOfWrites = 0; - // while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) { - // break; - // if (!flag) { - // // There are data to commit to this file, we need to create/open it for read/write. - // // At the meantime, we set the flag to prevent further create/open operations - // if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - // // Open files for commit - // pGroup = tsdbOpenFilesForCommit(pFileH, fid); - // if (pGroup == NULL) { - // // TODO: deal with the ERROR here - // } - // // TODO: open .h file and if neccessary, open .l file - // {} - // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - // if (pIndices == NULL) { - // // TODO: deal with the ERROR - // } - // // load the SCompIdx part - // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - - // // TODO: sendfile those not need changed table content - // for (int ttid = 0; ttid < tid; ttid++) { - // // SCompIdx *pIdx = &pIndices[ttid]; - // // if (pIdx->len > 0) { - // // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR); - // // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); - // // } - // } - // flag = 1; - // } - - // SCompIdx *pIdx = &pIndices[tid]; - - // /* The first time to write to the table, need to decide - // * if it is neccessary to load the SComplock part. If it - // * is needed, just load it, or, just use sendfile and - // * append it. - // */ - // if (numOfWrites == 0 && pIdx->offset > 0) { - // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); - // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { - // // TODO: deal with the ERROR here - // } - // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; - // } else { - // // TODO: sendfile the prefix part - // } - // } - - // // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { - // // // TODO: deal with the ERROR here - // // } - - // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); - - - // // if (1 /* the SCompBlock part is not loaded*/) { - // // // Append to .data file generate a SCompBlock and record it - // // } else { - // // } - - // // // TODO: need to reset the pCols - - // numOfWrites++; - // } - - // if (pCols->numOfPoints > 0) { - // // TODO: still has data to commit, commit it - // } - - // if (1/* SCompBlock part is loaded, write it to .head file*/) { - // // TODO - // } else { - // // TODO: use sendfile send the old part and append the newly added part - // } + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; + while (1) { + tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); + if (pCols->numOfPoints == 0) break; + + int pointsWritten = 0; + // { // TODO : try to write the block data to file + // if (!isCompBlockLoaded) { // Just append + // if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file + // lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END); + + // } else { + // if (isNewLastFile) { // write directly to .l file + + // } else { // write directly to .last file + + // } + // } + // } else { // Need to append + // // SCompBlock *pTBlock = NULL; + // } + // } + // pointsWritten = pCols->numOfPoints; + tdPopDataColsPoints(pCols, pointsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; + } + + // Write the SCompBlock part + if (isCompBlockLoaded) { + // merge the block into old and update pIdx + } else { + // sendfile the SCompBlock part and update the pIdx + } } // Write the SCompIdx part + if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */} + if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */} - // Close all files and return - if (flag) { - // TODO + // close the files + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbCloseFile(&pGroup->files[type]); + } + tsdbCloseFile(&hFile); + if (isNewLastFile) tsdbCloseFile(&lFile); + // TODO: replace the .head and .last file + rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname); + pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info; + if (isNewLastFile) { + rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname); + pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info; } if (pIndices) free(pIndices); if (pCompInfo) free(pCompInfo); return 0; +} + +static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) { + if (pIter == NULL) return 0; + + SSkipListNode *node = tSkipListIterGet(pIter); + if (node == NULL) return 0; + + SDataRow row = SL_GET_NODE_DATA(node); + if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1; + + return 0; +} + +static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) { + for (int i = 0; i < nIters; i++) { + SSkipListIterator *pIter = iters[i]; + if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; + } + return 0; +} + +static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { + size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; + SCompData *pCompData = (SCompData *)malloc(size); + if (pCompData == NULL) return -1; + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = uid; + pCompData->numOfCols = pCols->numOfCols; + + *offset = lseek(pFile->fd, 0, SEEK_END); + *len = size; + + int toffset = size; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SCompCol *pCompCol = pCompData->cols + iCol; + SDataCol *pDataCol = pCols->cols + iCol; + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + pCompCol->offset = toffset; + + // TODO: add compression + pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; + toffset += pCompCol->len; + } + + // Write the block + if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; + *len += size; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SDataCol *pDataCol = pCols->cols + iCol; + SCompCol *pCompCol = pCompData->cols + iCol; + if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; + *len += pCompCol->len; + } + + if (pCompData == NULL) free((void *)pCompData); + return 0; + +_err: + if (pCompData == NULL) free((void *)pCompData); + return -1; +} + +static int compareKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SCompBlock *pBlock = (SCompBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } + + return 0; +} + +static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { + STsdbCfg * pCfg = &(pRepo->config); + SCompData *pCompData = NULL; + SFile * pFile = NULL; + int numOfPointsToWrite = 0; + int64_t offset = 0; + int32_t len = 0; + + memset((void *)pCompBlock, 0, sizeof(SCompBlock)); + + if (pCompInfo == NULL) { + // Just append the data block to .data or .l or .last file + numOfPointsToWrite = pCols->numOfPoints; + if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file + pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); + } else { // Write to .last or .l file + pCompBlock->last = 1; + if (lFile) { + pFile = lFile; + } else { + pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); + } + } + tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); + pCompBlock->offset = offset; + pCompBlock->len = len; + pCompBlock->algorithm = 2; // TODO : add to configuration + pCompBlock->sversion = pCols->sversion; + pCompBlock->numOfPoints = pCols->numOfPoints; + pCompBlock->numOfSubBlocks = 1; + pCompBlock->numOfCols = pCols->numOfCols; + pCompBlock->keyFirst = dataColsKeyFirst(pCols); + pCompBlock->keyLast = dataColsKeyLast(pCols); + } else { + // Need to merge the block to either the last block or the other block + TSKEY keyFirst = dataColsKeyFirst(pCols); + SCompBlock *pMergeBlock = NULL; + + // Search the block to merge in + void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, + compareKeyBlock, TD_GE); + if (ptr == NULL) { + // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block + // and merge the data + pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); + } else { + pMergeBlock = (SCompBlock *)ptr; + } + + if (pMergeBlock->last) { + if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { + // Need to load the data from .last and combine data in pCols to write to .data file + + } else { // Just append the block to .last or .l file + if (lFile) { + // read the block from .last file and merge with pCols, write to .l file + + } else { + // tsdbWriteBlockToFileImpl(); + } + } + } else { // The block need to merge in .data file + + } + + } + + return numOfPointsToWrite; } \ No newline at end of file