From cc543aaf50cb312c943add51357746cb8c99e549 Mon Sep 17 00:00:00 2001 From: hzcheng <hzcheng@taosdata.com> Date: Thu, 26 Mar 2020 23:26:48 +0800 Subject: [PATCH] TD-34 --- src/common/inc/dataformat.h | 17 +++--- src/common/src/dataformat.c | 1 + src/util/inc/tutil.h | 8 +++ src/util/src/tutil.c | 69 +++++++++++++++++++++ src/vnode/tsdb/inc/tsdbFile.h | 30 ++++++--- src/vnode/tsdb/src/tsdbFile.c | 107 ++++++++++++++++++++++++++++++-- src/vnode/tsdb/src/tsdbMain.c | 112 ++++++++++++++++++++++++++++++++-- 7 files changed, 315 insertions(+), 29 deletions(-) diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index e123efd11e..52b2d1e156 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -105,11 +105,12 @@ SDataRow tdDataRowDup(SDataRow row); // ----------------- Data column structure typedef struct SDataCol { - int8_t type; - int bytes; - int len; - int offset; - void * pData; + int8_t type; + int16_t colId; + int bytes; + int len; + int offset; + void * pData; } SDataCol; typedef struct { @@ -122,9 +123,9 @@ typedef struct { SDataCol cols[]; } SDataCols; -#define keyCol(cols) (&((cols)->cols[0])) // Key column -#define dataColsKeyFirst(cols) ((int64_t *)(keyCol(cols)->pData))[0] -#define dataColsKeyLast(cols) ((int64_t *)(keyCol(cols)->pData))[(cols)->numOfPoints - 1] +#define keyCol(pCols) (&((pCols)->cols[0])) // Key column +#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0] +#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1] SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index da55663d0b..8f6a40805f 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -324,6 +324,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { pCols->cols[i].type = colType(schemaColAt(pSchema, i)); pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)); + pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); } return pCols; diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index df97dde5ac..b80aad1ceb 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -176,6 +176,14 @@ uint32_t ip2uint(const char *const ip_addr); void taosSetAllocMode(int mode, const char* path, bool autoDump); void taosDumpMemoryLeak(); +#define TD_EQ 0x1 +#define TD_GT 0x2 +#define TD_LT 0x4 +#define TD_GE (TD_EQ | TD_GT) +#define TD_LE (TD_EQ | TD_LT) +void *taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, + int (*compar)(const void *, const void *), int flags); + #ifdef TAOS_MEM_CHECK void * taos_malloc(size_t size, const char *file, uint32_t line); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 9c384b25ba..cbd08954cc 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -617,3 +617,72 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } + +#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx)) +void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) { + // TODO: need to check the correctness of this function + int l = 0; + int r = nmemb; + int idx = 0; + int comparison; + + if (flags == TD_EQ) { + return bsearch(key, base, nmemb, size, compar); + } else if (flags == TD_GE) { + if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0); + if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL; + + while (l < r) { + idx = (l + r) / 2; + comparison = (*compar)(key, elePtrAt(base, size, idx)); + if (comparison < 0) { + r = idx; + } else if (comparison > 0) { + l = idx + 1; + } else { + return elePtrAt(base, size, idx); + } + } + + if ((*compar)(key, elePtrAt(base, size, idx) < 0)) { + return elePtrAt(base, size, idx); + } else { + if (idx + 1 > nmemb - 1) { + return NULL; + } else { + return elePtrAt(base, size, idx + 1); + } + } + } else if (flags == TD_LE) { + if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1); + if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL; + + while (l < r) { + idx = (l + r) / 2; + comparison = (*compar)(key, elePtrAt(base, size, idx)); + if (comparison < 0) { + r = idx; + } else if (comparison > 0) { + l = idx + 1; + } else { + return elePtrAt(base, size, idx); + } + } + + if ((*compar)(key, elePtrAt(base, size, idx)) > 0) { + return elePtrAt(base, size, idx); + } else { + if (idx == 0) { + return NULL; + } else { + return elePtrAt(base, size, idx - 1); + } + } + + } else { + assert(0); + return NULL; + } + + return NULL; +} diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 9a4d94c58f..22563275cd 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -17,6 +17,7 @@ #include <stdint.h> +#include "dataformat.h" #include "taosdef.h" #include "tglobalcfg.h" @@ -69,20 +70,26 @@ typedef struct { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +int tsdbOpenFile(SFile *pFile, int oflag); +SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); typedef struct { int32_t len; - int32_t padding; // For padding purpose - int64_t offset; -} SCompIdx; + int32_t offset; + int32_t hasLast : 1; + int32_t numOfSuperBlocks : 31; + int32_t checksum; + TSKEY maxKey; +} SCompIdx; /* sizeof(SCompIdx) = 24 */ /** - * if numOfSubBlocks == -1, then the SCompBlock is a sub-block - * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to - * the data block offset and length - * if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the - * binary + * if numOfSubBlocks == 0, then the SCompBlock is a sub-block + * if numOfSubBlocks >= 1, then the SCompBlock is a super-block + * - if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to + * the data block offset and length + * - if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the + * binary */ typedef struct { int64_t last : 1; // If the block in data file or last file @@ -101,11 +108,12 @@ typedef struct { int32_t delimiter; // For recovery usage int32_t checksum; // TODO: decide if checksum logic in this file or make it one API int64_t uid; - int32_t padding; // For padding purpose - int32_t numOfBlocks; // TODO: make the struct padding SCompBlock blocks[]; } SCompInfo; +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); + // TODO: take pre-calculation into account typedef struct { int16_t colId; // Column ID @@ -122,6 +130,8 @@ typedef struct { SCompCol cols[]; } SCompData; +int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); + void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index f622c38b5f..7d0bdbd845 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -39,6 +39,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); +static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -70,9 +71,7 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) || - bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) == - NULL) { + if (tsdbSearchFGroup(pFileH, fid) == NULL) { pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { @@ -107,6 +106,86 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { + SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); + if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + + if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; + // TODO: need to check the correctness + return 0; +} + +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { + SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); + + if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; + + if (read(pFile->fd, buf, pIdx->len) < 0) return -1; + + // TODO: need to check the correctness + + return 0; +} + +static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write + SDataCols * pCols, // Data column buffer + int numOfPointsToWrie, // Number of points to write to the file + SCompBlock *pBlock // SCompBlock to hold block information to return + ) { + // pBlock->last = 0; + // pBlock->offset = lseek(pFile->fd, 0, SEEK_END); + // // pBlock->algorithm = ; + // pBlock->numOfPoints = pCols->numOfPoints; + // // pBlock->sversion = ; + // // pBlock->len = ; + // pBlock->numOfSubBlocks = 1; + // pBlock->keyFirst = dataColsKeyFirst(pCols); + // pBlock->keyLast = dataColsKeyLast(pCols); + // for (int i = 0; i < pCols->numOfCols; i++) { + // // TODO: if all col value is NULL, do not save it + // pBlock->numOfCols++; + // pCompData->numOfCols++; + // SCompCol *pCompCol = pCompData->cols + i; + // pCompCol->colId = pCols->cols[i].colId; + // pCompCol->type = pCols->cols[i].type; + + // // pCompCol->len = ; + // // pCompCol->offset = ; + // } + + return 0; +} + +int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) { + memset((void *)pBlock, 0, sizeof(SCompBlock)); + SFile *pFile = NULL; + SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols); + if (pCompData == NULL) return -1; + pCompData->delimiter = TSDB_FILE_DELIMITER; + // pCompData->uid = ; + + if (isMerge) { + TSKEY keyFirst = dataColsKeyFirst(pCols); + // 1. Binary search the block the data can merged into + + if (1/* the data should only merged into last file */) { + } else { + } + } else { + // Write directly to the file without merge + if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) { + // TODO: write the data to the last file + } else { + // TODO: wirte the data to the data file + } + } + + // TODO: need to update pIdx + + if (pCompData) free(pCompData); + return 0; +} + static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; SFileGroup *pFGroup = (SFileGroup *)fgroup; @@ -158,7 +237,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) return 0; } -static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function +int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function if (TSDB_IS_FILE_OPENED(pFile)) return -1; pFile->fd = open(pFile->fname, oflag, 0755); @@ -167,6 +246,16 @@ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the f return 0; } +SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { + SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); + if (pGroup == NULL) return NULL; + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbOpenFile(&(pGroup->files[type]), O_RDWR); + } + return pGroup; +} + static int tsdbCloseFile(SFile *pFile) { if (!TSDB_IS_FILE_OPENED(pFile)) return -1; int ret = close(pFile->fd); @@ -186,7 +275,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) { + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) { // TODO: deal with the ERROR here return -1; } @@ -212,4 +301,12 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; +} + +static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { + if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) + return NULL; + void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); + if (ptr == NULL) return NULL; + return (SFileGroup *)ptr; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index a8e04e216a..8e433ecb5b 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -764,6 +764,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { int numOfRows = 0; + do { SSkipListNode *node = tSkipListIterGet(pIter); if (node == NULL) break; @@ -776,6 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max numOfRows++; if (numOfRows > maxRowsToRead) break; } while (tSkipListIterNext(pIter)); + return numOfRows; } @@ -865,24 +867,122 @@ static void *tsdbCommitData(void *arg) { } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { + int flag = 0; + STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbCfg * pCfg = &pRepo->config; + SFile tFile, lFile; + SFileGroup *pGroup = NULL; + SCompIdx * pIndices = NULL; + SCompInfo * pCompInfo = NULL; + size_t compInfoSize = 0; + SCompBlock compBlock; + SCompBlock *pBlock = &compBlock; + TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; + STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; + int isLoadCompBlocks = 0; if (pIter == NULL) continue; tdInitDataCols(pCols, pTable->schema); - while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) { - // TODO - int k = 0; - } - } + int numOfWrites = 0; + // while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) { + // break; + // if (!flag) { + // // There are data to commit to this file, we need to create/open it for read/write. + // // At the meantime, we set the flag to prevent further create/open operations + // if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) { + // // TODO: deal with the ERROR here + // } + // // Open files for commit + // pGroup = tsdbOpenFilesForCommit(pFileH, fid); + // if (pGroup == NULL) { + // // TODO: deal with the ERROR here + // } + // // TODO: open .h file and if neccessary, open .l file + // {} + // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + // if (pIndices == NULL) { + // // TODO: deal with the ERROR + // } + // // load the SCompIdx part + // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { + // // TODO: deal with the ERROR here + // } + + // // TODO: sendfile those not need changed table content + // for (int ttid = 0; ttid < tid; ttid++) { + // // SCompIdx *pIdx = &pIndices[ttid]; + // // if (pIdx->len > 0) { + // // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR); + // // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); + // // } + // } + // flag = 1; + // } + + // SCompIdx *pIdx = &pIndices[tid]; + + // /* The first time to write to the table, need to decide + // * if it is neccessary to load the SComplock part. If it + // * is needed, just load it, or, just use sendfile and + // * append it. + // */ + // if (numOfWrites == 0 && pIdx->offset > 0) { + // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { + // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); + // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { + // // TODO: deal with the ERROR here + // } + // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; + // } else { + // // TODO: sendfile the prefix part + // } + // } + + // // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { + // // // TODO: deal with the ERROR here + // // } + + // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); + + + // // if (1 /* the SCompBlock part is not loaded*/) { + // // // Append to .data file generate a SCompBlock and record it + // // } else { + // // } + + // // // TODO: need to reset the pCols + + // numOfWrites++; + // } + + // if (pCols->numOfPoints > 0) { + // // TODO: still has data to commit, commit it + // } + + // if (1/* SCompBlock part is loaded, write it to .head file*/) { + // // TODO + // } else { + // // TODO: use sendfile send the old part and append the newly added part + // } + } + + // Write the SCompIdx part + + // Close all files and return + if (flag) { + // TODO + } + + if (pIndices) free(pIndices); + if (pCompInfo) free(pCompInfo); return 0; } \ No newline at end of file -- GitLab