diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 1f46c68abcffa57c246c561b4c64b931d3d5c7ea..52b2d1e15680073d82220f12b99eef0dbfa53b5a 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -105,11 +105,33 @@ SDataRow tdDataRowDup(SDataRow row); // ----------------- Data column structure typedef struct SDataCol { - int64_t len; - char data[]; + int8_t type; + int16_t colId; + int bytes; + int len; + int offset; + void * pData; } SDataCol; -void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter); +typedef struct { + int maxRowSize; + int maxCols; // max number of columns + int maxPoints; // max number of points + int numOfPoints; + int numOfCols; // Total number of cols + void * buf; + SDataCol cols[]; +} SDataCols; + +#define keyCol(pCols) (&((pCols)->cols[0])) // Key column +#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0] +#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1] + +SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); +void tdResetDataCols(SDataCols *pCols); +void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); +void tdFreeDataCols(SDataCols *pCols); +void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 04826e43ac98852495b5951aaa0ba66315bf2480..8f6a40805fb3117c776a5e79d2438b31bd0c777b 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -294,14 +294,64 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } -void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) { - int row = *iter; +SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { + SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); + if (pCols == NULL) return NULL; + + pCols->maxRowSize = maxRowSize; + pCols->maxCols = maxCols; + pCols->maxPoints = maxRows; + + pCols->buf = malloc(maxRowSize * maxRows); + if (pCols->buf == NULL) { + free(pCols); + return NULL; + } + + return pCols; +} + +void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { + // assert(schemaNCols(pSchema) <= pCols->numOfCols); + tdResetDataCols(pCols); + pCols->numOfCols = schemaNCols(pSchema); + pCols->cols[0].pData = pCols->buf; for (int i = 0; i < schemaNCols(pSchema); i++) { - // TODO + if (i > 0) { + pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints; + } + pCols->cols[i].type = colType(schemaColAt(pSchema, i)); + pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); + pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)); + pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); + } + + return pCols; +} + +void tdFreeDataCols(SDataCols *pCols) { + if (pCols) { + if (pCols->buf) free(pCols->buf); + free(pCols); } +} - *iter = row + 1; +void tdResetDataCols(SDataCols *pCols) { + pCols->numOfPoints = 0; + for (int i = 0; i < pCols->maxCols; i++) { + pCols->cols[i].len = 0; + } +} + +void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { + TSKEY key = dataRowKey(row); + for (int i = 0; i < pCols->numOfCols; i++) { + SDataCol *pCol = pCols->cols + i; + memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); + pCol->len += pCol->bytes; + } + pCols->numOfPoints++; } /** diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index df97dde5ac7b1440b2cdf2abab9f0243f84ddee5..b80aad1cebab3782ad7e2c4751275d854ef5f92c 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -176,6 +176,14 @@ uint32_t ip2uint(const char *const ip_addr); void taosSetAllocMode(int mode, const char* path, bool autoDump); void taosDumpMemoryLeak(); +#define TD_EQ 0x1 +#define TD_GT 0x2 +#define TD_LT 0x4 +#define TD_GE (TD_EQ | TD_GT) +#define TD_LE (TD_EQ | TD_LT) +void *taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, + int (*compar)(const void *, const void *), int flags); + #ifdef TAOS_MEM_CHECK void * taos_malloc(size_t size, const char *file, uint32_t line); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 9c384b25bace047cdd80fe2903c330dbe3ed85d7..cbd08954cc21ffd1f8a0e3cfc9076e0330860c42 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -617,3 +617,72 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } + +#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx)) +void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) { + // TODO: need to check the correctness of this function + int l = 0; + int r = nmemb; + int idx = 0; + int comparison; + + if (flags == TD_EQ) { + return bsearch(key, base, nmemb, size, compar); + } else if (flags == TD_GE) { + if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0); + if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL; + + while (l < r) { + idx = (l + r) / 2; + comparison = (*compar)(key, elePtrAt(base, size, idx)); + if (comparison < 0) { + r = idx; + } else if (comparison > 0) { + l = idx + 1; + } else { + return elePtrAt(base, size, idx); + } + } + + if ((*compar)(key, elePtrAt(base, size, idx) < 0)) { + return elePtrAt(base, size, idx); + } else { + if (idx + 1 > nmemb - 1) { + return NULL; + } else { + return elePtrAt(base, size, idx + 1); + } + } + } else if (flags == TD_LE) { + if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1); + if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL; + + while (l < r) { + idx = (l + r) / 2; + comparison = (*compar)(key, elePtrAt(base, size, idx)); + if (comparison < 0) { + r = idx; + } else if (comparison > 0) { + l = idx + 1; + } else { + return elePtrAt(base, size, idx); + } + } + + if ((*compar)(key, elePtrAt(base, size, idx)) > 0) { + return elePtrAt(base, size, idx); + } else { + if (idx == 0) { + return NULL; + } else { + return elePtrAt(base, size, idx - 1); + } + } + + } else { + assert(0); + return NULL; + } + + return NULL; +} diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 9a4d94c58fc99ef8fc647d188d2d41f2979db212..22563275cd482f3ee666549bca17162a0bd632ad 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -17,6 +17,7 @@ #include +#include "dataformat.h" #include "taosdef.h" #include "tglobalcfg.h" @@ -69,20 +70,26 @@ typedef struct { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +int tsdbOpenFile(SFile *pFile, int oflag); +SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); typedef struct { int32_t len; - int32_t padding; // For padding purpose - int64_t offset; -} SCompIdx; + int32_t offset; + int32_t hasLast : 1; + int32_t numOfSuperBlocks : 31; + int32_t checksum; + TSKEY maxKey; +} SCompIdx; /* sizeof(SCompIdx) = 24 */ /** - * if numOfSubBlocks == -1, then the SCompBlock is a sub-block - * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to - * the data block offset and length - * if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the - * binary + * if numOfSubBlocks == 0, then the SCompBlock is a sub-block + * if numOfSubBlocks >= 1, then the SCompBlock is a super-block + * - if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to + * the data block offset and length + * - if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the + * binary */ typedef struct { int64_t last : 1; // If the block in data file or last file @@ -101,11 +108,12 @@ typedef struct { int32_t delimiter; // For recovery usage int32_t checksum; // TODO: decide if checksum logic in this file or make it one API int64_t uid; - int32_t padding; // For padding purpose - int32_t numOfBlocks; // TODO: make the struct padding SCompBlock blocks[]; } SCompInfo; +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); + // TODO: take pre-calculation into account typedef struct { int16_t colId; // Column ID @@ -122,6 +130,8 @@ typedef struct { SCompCol cols[]; } SCompData; +int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); + void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index f622c38b5ff4acf79e5ec405de0343a5d5197a19..7d0bdbd84548d97abc819b9d50dd486dda32af47 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -39,6 +39,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); +static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -70,9 +71,7 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) || - bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) == - NULL) { + if (tsdbSearchFGroup(pFileH, fid) == NULL) { pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { @@ -107,6 +106,86 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { + SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); + if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + + if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; + // TODO: need to check the correctness + return 0; +} + +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { + SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); + + if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; + + if (read(pFile->fd, buf, pIdx->len) < 0) return -1; + + // TODO: need to check the correctness + + return 0; +} + +static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write + SDataCols * pCols, // Data column buffer + int numOfPointsToWrie, // Number of points to write to the file + SCompBlock *pBlock // SCompBlock to hold block information to return + ) { + // pBlock->last = 0; + // pBlock->offset = lseek(pFile->fd, 0, SEEK_END); + // // pBlock->algorithm = ; + // pBlock->numOfPoints = pCols->numOfPoints; + // // pBlock->sversion = ; + // // pBlock->len = ; + // pBlock->numOfSubBlocks = 1; + // pBlock->keyFirst = dataColsKeyFirst(pCols); + // pBlock->keyLast = dataColsKeyLast(pCols); + // for (int i = 0; i < pCols->numOfCols; i++) { + // // TODO: if all col value is NULL, do not save it + // pBlock->numOfCols++; + // pCompData->numOfCols++; + // SCompCol *pCompCol = pCompData->cols + i; + // pCompCol->colId = pCols->cols[i].colId; + // pCompCol->type = pCols->cols[i].type; + + // // pCompCol->len = ; + // // pCompCol->offset = ; + // } + + return 0; +} + +int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) { + memset((void *)pBlock, 0, sizeof(SCompBlock)); + SFile *pFile = NULL; + SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols); + if (pCompData == NULL) return -1; + pCompData->delimiter = TSDB_FILE_DELIMITER; + // pCompData->uid = ; + + if (isMerge) { + TSKEY keyFirst = dataColsKeyFirst(pCols); + // 1. Binary search the block the data can merged into + + if (1/* the data should only merged into last file */) { + } else { + } + } else { + // Write directly to the file without merge + if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) { + // TODO: write the data to the last file + } else { + // TODO: wirte the data to the data file + } + } + + // TODO: need to update pIdx + + if (pCompData) free(pCompData); + return 0; +} + static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; SFileGroup *pFGroup = (SFileGroup *)fgroup; @@ -158,7 +237,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) return 0; } -static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function +int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function if (TSDB_IS_FILE_OPENED(pFile)) return -1; pFile->fd = open(pFile->fname, oflag, 0755); @@ -167,6 +246,16 @@ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the f return 0; } +SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { + SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); + if (pGroup == NULL) return NULL; + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbOpenFile(&(pGroup->files[type]), O_RDWR); + } + return pGroup; +} + static int tsdbCloseFile(SFile *pFile) { if (!TSDB_IS_FILE_OPENED(pFile)) return -1; int ret = close(pFile->fd); @@ -186,7 +275,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) { + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) { // TODO: deal with the ERROR here return -1; } @@ -212,4 +301,12 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; +} + +static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { + if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) + return NULL; + void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); + if (ptr == NULL) return NULL; + return (SFileGroup *)ptr; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index af3a923d904d62c5fc3af8d8f4ca8dfad64c7282..8e433ecb5bbbb97c9e34775973f9bd728581fb6f 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -84,7 +84,8 @@ static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); -static void * tsdbCommitToFile(void *arg); +static void * tsdbCommitData(void *arg); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -327,7 +328,7 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { pRepo->tsdbCache->curBlock = NULL; // TODO: here should set as detached or use join for memory leak - pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); + pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo); tsdbUnLockRepo(repo); return 0; @@ -761,24 +762,22 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { return 0; } -static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCol **cols, STSchema *pSchema) { +static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { int numOfRows = 0; + do { SSkipListNode *node = tSkipListIterGet(pIter); if (node == NULL) break; SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; - // Convert row data to column data - // for (int i = 0; i < schemaNCols(pSchema); i++) { - // STColumn *pCol = schemaColAt(pSchema, i); - // memcpy(cols[i]->data + TYPE_BYTES[colType(pCol)] * numOfRows, dataRowAt(row, pCol->offset), - // TYPE_BYTES[colType(pCol)]); - // } + + tdAppendDataRowToDataCol(row, pCols); numOfRows++; if (numOfRows > maxRowsToRead) break; } while (tSkipListIterNext(pIter)); + return numOfRows; } @@ -816,14 +815,14 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) } // Commit to file -static void *tsdbCommitToFile(void *arg) { +static void *tsdbCommitData(void *arg) { // TODO printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; STsdbCfg * pCfg = &(pRepo->config); - if (pCache->imem == NULL) return; + if (pCache->imem == NULL) return NULL; // Create the iterator to read from cache SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); @@ -832,52 +831,23 @@ static void *tsdbCommitToFile(void *arg) { return NULL; } - int maxCols = pMeta->maxCols; - int maxBytes = pMeta->maxRowBytes; - SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); - void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); + // Create a data column buffer for commit + SDataCols *pCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); + if (pCols == NULL) { + // TODO: deal with the error + return NULL; + } int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); for (int fid = sfid; fid <= efid; fid++) { - TSKEY minKey = 0, maxKey = 0; - tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - - // tsdbOpenFileForWrite(pRepo, fid); - - for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL || pTable->imem == NULL) continue; - if (iters[tid] == NULL) { // create table iterator - iters[tid] = tSkipListCreateIter(pTable->imem->pData); - // TODO: deal with the error - if (iters[tid] == NULL) break; - if (!tSkipListIterNext(iters[tid])) { - // assert(0); - } - } - - // Init row data part - cols[0] = (SDataCol *)buf; - for (int col = 1; col < schemaNCols(pTable->schema); col++) { - cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); - } - - // Loop the iterator - int rowsRead = 0; - while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > - 0) { - // printf("rowsRead:%d-----------\n", rowsRead); - int k = 0; - } - } + tsdbCommitToFile(pRepo, fid, iters, pCols); } + tdFreeDataCols(pCols); tsdbDestroyTableIters(iters, pCfg->maxTables); - free(buf); - free(cols); tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); @@ -894,4 +864,125 @@ static void *tsdbCommitToFile(void *arg) { tsdbUnLockRepo(arg); return NULL; +} + +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { + int flag = 0; + + STsdbMeta * pMeta = pRepo->tsdbMeta; + STsdbFileH *pFileH = pRepo->tsdbFileH; + STsdbCfg * pCfg = &pRepo->config; + SFile tFile, lFile; + SFileGroup *pGroup = NULL; + SCompIdx * pIndices = NULL; + SCompInfo * pCompInfo = NULL; + size_t compInfoSize = 0; + SCompBlock compBlock; + SCompBlock *pBlock = &compBlock; + + TSKEY minKey = 0, maxKey = 0; + tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + for (int tid = 0; tid < pCfg->maxTables; tid++) { + STable * pTable = pMeta->tables[tid]; + SSkipListIterator *pIter = iters[tid]; + int isLoadCompBlocks = 0; + + if (pIter == NULL) continue; + tdInitDataCols(pCols, pTable->schema); + + int numOfWrites = 0; + // while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) { + // break; + // if (!flag) { + // // There are data to commit to this file, we need to create/open it for read/write. + // // At the meantime, we set the flag to prevent further create/open operations + // if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) { + // // TODO: deal with the ERROR here + // } + // // Open files for commit + // pGroup = tsdbOpenFilesForCommit(pFileH, fid); + // if (pGroup == NULL) { + // // TODO: deal with the ERROR here + // } + // // TODO: open .h file and if neccessary, open .l file + // {} + // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + // if (pIndices == NULL) { + // // TODO: deal with the ERROR + // } + // // load the SCompIdx part + // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { + // // TODO: deal with the ERROR here + // } + + // // TODO: sendfile those not need changed table content + // for (int ttid = 0; ttid < tid; ttid++) { + // // SCompIdx *pIdx = &pIndices[ttid]; + // // if (pIdx->len > 0) { + // // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR); + // // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); + // // } + // } + // flag = 1; + // } + + // SCompIdx *pIdx = &pIndices[tid]; + + // /* The first time to write to the table, need to decide + // * if it is neccessary to load the SComplock part. If it + // * is needed, just load it, or, just use sendfile and + // * append it. + // */ + // if (numOfWrites == 0 && pIdx->offset > 0) { + // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { + // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); + // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { + // // TODO: deal with the ERROR here + // } + // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; + // } else { + // // TODO: sendfile the prefix part + // } + // } + + // // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { + // // // TODO: deal with the ERROR here + // // } + + // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); + + + // // if (1 /* the SCompBlock part is not loaded*/) { + // // // Append to .data file generate a SCompBlock and record it + // // } else { + // // } + + // // // TODO: need to reset the pCols + + // numOfWrites++; + // } + + // if (pCols->numOfPoints > 0) { + // // TODO: still has data to commit, commit it + // } + + // if (1/* SCompBlock part is loaded, write it to .head file*/) { + // // TODO + // } else { + // // TODO: use sendfile send the old part and append the newly added part + // } + } + + // Write the SCompIdx part + + // Close all files and return + if (flag) { + // TODO + } + + if (pIndices) free(pIndices); + if (pCompInfo) free(pCompInfo); + + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 397807f54724b7a166a0a3fb2e77a5808674c327..761006a0095db206eb7be93fef456a7de1951df3 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -381,7 +381,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { int32_t headSize = 0; // first tag column - STColumn* s = pSTable->tagSchema->columns[0]; //??? + STColumn* s = schemaColAt(pSTable->tagSchema, 0); tSkipListRandNodeInfo(pSTable->pIndex, &level, &headSize); SSkipListNode* pNode = calloc(1, headSize + s->bytes + POINTER_BYTES); @@ -389,7 +389,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { SSkipList* list = pSTable->pIndex; - memcpy(SL_GET_NODE_KEY(list, pNode), dataRowTuple(pTable->tagVal), s->columns[0].bytes); + memcpy(SL_GET_NODE_KEY(list, pNode), dataRowTuple(pTable->tagVal), colBytes(s)); memcpy(SL_GET_NODE_DATA(pNode), &pTable, POINTER_BYTES); tSkipListPut(list, pNode);