diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 437c38a8a4e2af91d350ad2a91ead49fb52f3fcf..aff239712bf1e19ae9d4933d956790e7ed2bec2c 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -101,23 +101,13 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); void tdDataRowReset(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); -/* Data column definition - * +---------+---------+-----------------------+ - * | int32_t | int32_t | | - * +---------+---------+-----------------------+ - * | len | npoints | data | - * +---------+---------+-----------------------+ - */ -typedef char *SDataCol; - -/* Data columns definition - * +---------+---------+-----------------------+--------+-----------------------+ - * | int32_t | int32_t | | | | - * +---------+---------+-----------------------+--------+-----------------------+ - * | len | npoints | SDataCol | .... | SDataCol | - * +---------+---------+-----------------------+--------+-----------------------+ - */ -typedef char *SDataCols; +// ----------------- Data column structure +typedef struct SDataCol { + int64_t len; + char data[]; +} SDataCol; + +void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 419e37639257b2172ffe4eaa0c68436bb8ff6f6c..9c356b0cbc71671ee8a7d917bf18c0b988f0cb1f 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -294,6 +294,16 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } +void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) { + int row = *iter; + + for (int i = 0; i < schemaNCols(pSchema); i++) { + // TODO + } + + *iter = row + 1; +} + /** * Return the first part length of a data row for a schema */ diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h new file mode 100644 index 0000000000000000000000000000000000000000..9e4dfe45801ed448e6c55a6e69c87b22e9e77b88 --- /dev/null +++ b/src/util/inc/tlist.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_LIST_ +#define _TD_LIST_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { TD_LIST_FORWARD, TD_LIST_BACKWARD } TD_LIST_DIRECTION_T; + +typedef struct _list_node { + struct _list_node *next; + struct _list_node *prev; + char data[]; +} SListNode; + +typedef struct { + struct _list_node *head; + struct _list_node *tail; + int numOfEles; + int eleSize; +} SList; + +typedef struct { + SListNode * next; + TD_LIST_DIRECTION_T direction; +} SListIter; + +#define listHead(l) (l)->head +#define listTail(l) (l)->tail +#define listNEles(l) (l)->numOfEles +#define listEleSize(l) (l)->eleSize +#define isListEmpty(l) ((l)->numOfEles == 0) +#define listNodeFree(n) free(n); + +SList * tdListNew(int eleSize); +void tdListFree(SList *list); +void tdListEmpty(SList *list); +void tdListPrependNode(SList *list, SListNode *node); +void tdListAppendNode(SList *list, SListNode *node); +int tdListPrepend(SList *list, void *data); +int tdListAppend(SList *list, void *data); +SListNode *tdListPopHead(SList *list); +SListNode *tdListPopTail(SList *list); +SListNode *tdListPopNode(SList *list, SListNode *node); +void tdListMove(SList *src, SList *dst); + +void tdListNodeGetData(SList *list, SListNode *node, void *target); +void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction); +SListNode *tdListNext(SListIter *pIter); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c new file mode 100644 index 0000000000000000000000000000000000000000..badcb7802f510b2978abace6b21a1098e1cdc44d --- /dev/null +++ b/src/util/src/tlist.c @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include + +#include "tlist.h" + +SList *tdListNew(int eleSize) { + SList *list = (SList *)malloc(sizeof(SList)); + if (list == NULL) return NULL; + + list->eleSize = eleSize; + list->numOfEles = 0; + list->head = list->tail = NULL; + return list; +} + +void tdListEmpty(SList *list) { + SListNode *node = list->head; + while (node) { + list->head = node->next; + free(node); + node = list->head; + } + list->head = list->tail = 0; + list->numOfEles = 0; +} + +void tdListFree(SList *list) { + tdListEmpty(list); + free(list); +} + +void tdListPrependNode(SList *list, SListNode *node) { + if (list->head == NULL) { + list->head = node; + list->tail = node; + } else { + node->next = list->head; + node->prev = NULL; + list->head->prev = node; + list->head = node; + } + list->numOfEles++; +} + +void tdListAppendNode(SList *list, SListNode *node) { + if (list->head == NULL) { + list->head = node; + list->tail = node; + } else { + node->prev = list->tail; + node->next = NULL; + list->tail->next = node; + list->tail = node; + } + + list->numOfEles++; +} + +int tdListPrepend(SList *list, void *data) { + SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); + if (node == NULL) return -1; + + memcpy((void *)(node->data), data, list->eleSize); + tdListPrependNode(list, node); + + return 0; +} + +int tdListAppend(SList *list, void *data) { + SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); + if (node == NULL) return -1; + + memcpy((void *)(node->data), data, list->eleSize); + tdListAppendNode(list, node); + + return 0; +} + +SListNode *tdListPopHead(SList *list) { + if (list->head == NULL) return NULL; + SListNode *node = list->head; + if (node->next == NULL) { + list->head = NULL; + list->tail = NULL; + } else { + list->head = node->next; + } + list->numOfEles--; + return node; +} + +SListNode *tdListPopTail(SList *list) { + if (list->tail == NULL) return NULL; + SListNode *node = list->tail; + if (node->prev == NULL) { + list->head = NULL; + list->tail = NULL; + } else { + list->tail = node->prev; + } + list->numOfEles--; + return node; +} + +SListNode *tdListPopNode(SList *list, SListNode *node) { + if (list->head == node) { + list->head = node->next; + } + if (list->tail == node) { + list->tail = node->prev; + } + + if (node->prev != NULL) { + node->prev->next = node->next; + } + if (node->next != NULL) { + node->next->prev = node->prev; + } + list->numOfEles--; + + return node; +} + +// Move all node elements from src to dst, the dst is assumed as an empty list +void tdListMove(SList *src, SList *dst) { + // assert(dst->eleSize == src->eleSize); + dst->numOfEles = src->numOfEles; + dst->head = src->head; + dst->tail = src->tail; + src->numOfEles = 0; + src->head = src->tail = NULL; +} + +void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } + +void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) { + pIter->direction = direction; + if (direction == TD_LIST_FORWARD) { + pIter->next = list->head; + } else { + pIter->next = list->tail; + } +} + +SListNode *tdListNext(SListIter *pIter) { + SListNode *node = pIter->next; + if (node == NULL) return NULL; + if (pIter->direction == TD_LIST_FORWARD) { + pIter->next = node->next; + } else { + pIter->next = node->prev; + } + + return node; +} \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index e12f51fd44cb768ba530441705acce1f3e1a0cb6..4964ac673f1b25d351d3eb5f0e8e146d510776db 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -58,6 +58,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); +int32_t tsdbTriggerCommit(tsdb_repo_t *repo); // --------- TSDB TABLE DEFINITION typedef struct { diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 1821505eae295f2a82d472625ea2f876be81a59c..3bffa1c6a9ea75688e2e44b0f356a42570856d52 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -17,45 +17,39 @@ #include -// #include "cache.h" +#include "tlist.h" #ifdef __cplusplus extern "C" { #endif -#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */ +#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */ typedef struct { - int64_t skey; // start key - int64_t ekey; // end key - int32_t numOfRows; // numOfRows -} STableCacheInfo; + int blockId; + int offset; + int remain; + int padding; + char data[]; +} STsdbCacheBlock; -typedef struct _tsdb_cache_block { - char * pData; - STableCacheInfo * pTableInfo; - struct _tsdb_cache_block *prev; - struct _tsdb_cache_block *next; -} STSDBCacheBlock; +typedef struct { + int64_t index; + SList * memPool; +} STsdbCachePool; -// Use a doublely linked list to implement this -typedef struct STSDBCache { - // Number of blocks the cache is allocated - int32_t numOfBlocks; - STSDBCacheBlock *cacheList; - void * current; +typedef struct { + int maxBytes; + int cacheBlockSize; + STsdbCachePool pool; + STsdbCacheBlock *curBlock; + SList * mem; + SList * imem; } STsdbCache; -// ---- Operation on STSDBCacheBlock -#define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) -#define TSDB_CACHE_AVAIL_SPACE(pBlock) ((char *)((pBlock)->pTableInfo) - ((pBlock)->pData)) -#define TSDB_TABLE_INFO_OF_CACHE(pBlock, tableId) ((pBlock)->pTableInfo)[tableId] -#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next) -#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev) - -STsdbCache *tsdbInitCache(int64_t maxSize); -int32_t tsdbFreeCache(STsdbCache *pCache); -void * tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes); +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); +void tsdbFreeCache(STsdbCache *pCache); +void * tsdbAllocFromCache(STsdbCache *pCache, int bytes); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index ab10fd8e4977706de403f967c92dd9f54eff9950..89159a06e71af6c95c546c0b149c657026ff5c2e 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -24,10 +24,10 @@ extern "C" { #endif typedef enum { - TSDB_FILE_TYPE_HEAD, // .head file type - TSDB_FILE_TYPE_DATA, // .data file type - TSDB_FILE_TYPE_LAST, // .last file type - TSDB_FILE_TYPE_META // .meta file type + TSDB_FILE_TYPE_HEAD = 0, // .head file type + TSDB_FILE_TYPE_DATA, // .data file type + TSDB_FILE_TYPE_LAST, // .last file type + TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; extern const char *tsdbFileSuffix[]; @@ -38,16 +38,15 @@ typedef struct { } SFileInfo; typedef struct { - int fd; - int64_t size; // total size of the file - int64_t tombSize; // unused file size + int8_t type; + char fname[128]; + int64_t size; // total size of the file + int64_t tombSize; // unused file size } SFile; typedef struct { int32_t fileId; - SFile fhead; - SFile fdata; - SFile flast; + SFile files[TSDB_FILE_TYPE_MAX]; } SFileGroup; // TSDB file handle @@ -56,17 +55,17 @@ typedef struct { int32_t keep; int32_t minRowPerFBlock; int32_t maxRowsPerFBlock; + int32_t maxTables; SFileGroup fGroup[]; } STsdbFileH; -#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META) +#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock); -void tsdbCloseFile(STsdbFileH *pFileH); - -char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type); + int32_t maxRowsPerFBlock, int32_t maxTables); +void tsdbCloseFile(STsdbFileH *pFileH); +int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index b18d16d0d94a6e242cb2f198ddfc960b366ea629..38f0818dfba826ee2300b6c0f2eee3fe2232423f 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -35,20 +35,21 @@ extern "C" { // ---------- TSDB TABLE DEFINITION typedef struct STable { - int8_t type; - STableId tableId; - int32_t superUid; // Super table UID - int32_t sversion; - STSchema * schema; - STSchema * tagSchema; - SDataRow tagVal; + int8_t type; + STableId tableId; + int32_t superUid; // Super table UID + int32_t sversion; + STSchema *schema; + STSchema *tagSchema; + SDataRow tagVal; union { void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index } content; + void * iData; // Skiplist to commit void * eventHandler; // TODO void * streamHandler; // TODO - struct STable *next; // TODO: remove the next + struct STable *next; // TODO: remove the next } STable; void * tsdbEncodeTable(STable *pTable, int *contLen); diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 165c561b5d4ef56cbe6cc8ec10b5c5bb3cb59470..6a0741dced475965527ba61213032a00e3e197dd 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -16,22 +16,106 @@ #include "tsdbCache.h" -STsdbCache *tsdbInitCache(int64_t maxSize) { - STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache)); - if (pCacheHandle == NULL) { - // TODO : deal with the error - return NULL; +static int tsdbAllocBlockFromPool(STsdbCache *pCache); +static void tsdbFreeBlockList(SList *list); + +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { + STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); + if (pCache == NULL) return NULL; + + if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; + + pCache->maxBytes = maxBytes; + pCache->cacheBlockSize = cacheBlockSize; + + int nBlocks = maxBytes / cacheBlockSize + 1; + if (nBlocks <= 1) nBlocks = 2; + + STsdbCachePool *pPool = &(pCache->pool); + pPool->index = 0; + pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *)); + if (pPool->memPool == NULL) goto _err; + + for (int i = 0; i < nBlocks; i++) { + STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); + if (pBlock == NULL) { + goto _err; + } + pBlock->offset = 0; + pBlock->remain = cacheBlockSize; + tdListAppend(pPool->memPool, (void *)(&pBlock)); } - return pCacheHandle; + pCache->mem = tdListNew(sizeof(STsdbCacheBlock *)); + if (pCache->mem == NULL) goto _err; + + pCache->imem = tdListNew(sizeof(STsdbCacheBlock *)); + if (pCache->imem == NULL) goto _err; + + return pCache; + +_err: + tsdbFreeCache(pCache); + return NULL; +} + +void tsdbFreeCache(STsdbCache *pCache) { + tsdbFreeBlockList(pCache->imem); + tsdbFreeBlockList(pCache->mem); + tsdbFreeBlockList(pCache->pool.memPool); + free(pCache); } -int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; } +void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { + if (pCache == NULL) return NULL; + if (bytes > pCache->cacheBlockSize) return NULL; -void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) { - // TODO: implement here - void *ptr = malloc(bytes); - if (ptr == NULL) return NULL; + if (isListEmpty(pCache->mem)) { + if (tsdbAllocBlockFromPool(pCache) < 0) { + // TODO: deal with the error + } + } + + if (pCache->curBlock->remain < bytes) { + if (tsdbAllocBlockFromPool(pCache) < 0) { + // TODO: deal with the error + } + } + + void *ptr = (void *)(pCache->curBlock->data + pCache->curBlock->offset); + pCache->curBlock->offset += bytes; + pCache->curBlock->remain -= bytes; + memset(ptr, 0, bytes); return ptr; +} + +static void tsdbFreeBlockList(SList *list) { + if (list == NULL) return; + SListNode * node = NULL; + STsdbCacheBlock *pBlock = NULL; + while ((node = tdListPopHead(list)) != NULL) { + tdListNodeGetData(list, node, (void *)(&pBlock)); + free(pBlock); + listNodeFree(node); + } + tdListFree(list); +} + +static int tsdbAllocBlockFromPool(STsdbCache *pCache) { + STsdbCachePool *pPool = &(pCache->pool); + if (listNEles(pPool->memPool) == 0) return -1; + + SListNode *node = tdListPopHead(pPool->memPool); + + STsdbCacheBlock *pBlock = NULL; + tdListNodeGetData(pPool->memPool, node, (void *)(&pBlock)); + pBlock->blockId = pPool->index++; + pBlock->offset = 0; + pBlock->remain = pCache->cacheBlockSize; + + tdListAppendNode(pCache->mem, node); + pCache->curBlock = pBlock; + + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 301f2978030b9fb69bca4cd5eb1f5356686fa6eb..8a7e40cabd0d7e864c9190009cd4a52deaf436cd 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -12,82 +12,183 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include +#include +#include #include #include -#include #include -#include +#include +#include +#include -#include "tsdbFile.h" #include "tglobalcfg.h" +#include "tsdbFile.h" -// int64_t tsMsPerDay[] = { -// 86400000L, // TSDB_PRECISION_MILLI -// 86400000000L, // TSDB_PRECISION_MICRO -// 86400000000000L // TSDB_PRECISION_NANO -// }; +#define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) typedef struct { + int32_t len; + int32_t padding; // For padding purpose int64_t offset; -} SCompHeader; - -typedef struct { - int64_t uid; - int64_t last : 1; - int64_t numOfBlocks : 63; - int32_t delimiter; -} SCompInfo; - -typedef struct { - TSKEY keyFirst; - TSKEY keyLast; - int32_t numOfBlocks; - int32_t offset; } SCompIdx; +/** + * if numOfSubBlocks == -1, then the SCompBlock is a sub-block + * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to + * the data block offset and length + * if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the + * binary + */ typedef struct { + int64_t last : 1; // If the block in data file or last file + int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks + int32_t algorithm : 8; // Compression algorithm + int32_t numOfPoints : 24; // Number of total points + int32_t sversion; // Schema version + int32_t len; // Data block length or nothing + int16_t numOfSubBlocks; // Number of sub-blocks; + int16_t numOfCols; TSKEY keyFirst; TSKEY keyLast; - int64_t offset; - int32_t len; - int32_t sversion; } SCompBlock; typedef struct { - int64_t uid; -} SBlock; + int32_t delimiter; // For recovery usage + int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int64_t uid; + int32_t padding; // For padding purpose + int32_t numOfBlocks; // TODO: make the struct padding + SCompBlock blocks[]; +} SCompInfo; +// TODO: take pre-calculation into account typedef struct { - int16_t colId; - int16_t bytes; - int32_t nNullPoints; - int32_t type:8; - int32_t offset:24; - int32_t len; - // fields for pre-aggregate - // TODO: pre-aggregation should be seperated - int64_t sum; - int64_t max; - int64_t min; - int16_t maxIdx; - int16_t minIdx; -} SField; + int16_t colId; // Column ID + int16_t len; // Column length + int32_t type : 8; + int32_t offset : 24; +} SCompCol; + +// TODO: Take recover into account +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + int64_t uid; // For recovery usage + SCompCol cols[]; +} SCompData; const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA - ".last", // TSDB_FILE_TYPE_LAST - ".meta" // TSDB_FILE_TYPE_META + ".last" // TSDB_FILE_TYPE_LAST }; +static int tsdbWriteFileHead(int fd, SFile *pFile) { + char head[TSDB_FILE_HEAD_SIZE] = "\0"; + + pFile->size += TSDB_FILE_HEAD_SIZE; + + // TODO: write version and File statistic to the head + lseek(fd, 0, SEEK_SET); + if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; + + return 0; +} + +static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) { + int size = sizeof(SCompIdx) * maxTables; + void *buf = calloc(1, size); + if (buf == NULL) return -1; + + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + free(buf); + return -1; + } + + if (write(fd, buf, size) < 0) { + free(buf); + return -1; + } + + pFile->size += size; + + return 0; +} + +static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) { + if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1; + + sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]); + + return 0; +} + +/** + * Create a file and set the SFile object + */ +static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { + memset((void *)pFile, 0, sizeof(SFile)); + pFile->type = type; + + tsdbGetFileName(dataDir, fileId, type, pFile->fname); + if (access(pFile->fname, F_OK) == 0) { + // File already exists + return -1; + } + + int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755); + if (fd < 0) return -1; + + if (type == TSDB_FILE_TYPE_HEAD) { + if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) { + close(fd); + return -1; + } + } + + if (tsdbWriteFileHead(fd, pFile) < 0) { + close(fd); + return -1; + } + + close(fd); + + return 0; +} + +static int tsdbRemoveFile(SFile *pFile) { + if (pFile == NULL) return -1; + return remove(pFile->fname); +} + +// Create a file group with fileId and return a SFileGroup object +int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) { + if (dataDir == NULL || pFGroup == NULL) return -1; + + memset((void *)pFGroup, 0, sizeof(SFileGroup)); + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) { + // TODO: deal with the error here, remove the created files + return -1; + } + } + + pFGroup->fileId = fileId; + + return 0; +} + /** * Initialize the TSDB file handle */ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock) { + int32_t maxRowsPerFBlock, int32_t maxTables) { STsdbFileH *pTsdbFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile)); if (pTsdbFileH == NULL) return NULL; @@ -96,6 +197,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 pTsdbFileH->keep = keep; pTsdbFileH->minRowPerFBlock = minRowsPerFBlock; pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock; + pTsdbFileH->maxTables = maxTables; // Open the directory to read information of each file DIR *dir = opendir(dataDir); @@ -104,8 +206,9 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 return NULL; } - struct dirent *dp; char fname[256]; + + struct dirent *dp; while ((dp = readdir(dir)) != NULL) { if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; if (true /* check if the file is the .head file */) { @@ -125,23 +228,6 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 return pTsdbFileH; } -/** - * Closet the file handle - */ -void tsdbCloseFile(STsdbFileH *pFileH) { - // TODO -} - -char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) { - if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL; - - char *fileName = (char *)malloc(strlen(dirName) + strlen(fname) + strlen(tsdbFileSuffix[type]) + 5); - if (fileName == NULL) return NULL; - - sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]); - return fileName; -} - static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 073321816db29939f5ba28400ad5ad5f598bfa29..ed95eac5bc9fc8c41b78fb2944932d4909415ab2 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -58,13 +58,16 @@ typedef struct _tsdb_repo { // The cache Handle STsdbCache *tsdbCache; + // The TSDB file handle + STsdbFileH *tsdbFileH; + // Disk tier handle for multi-tier storage void *diskTier; - // File Store - void *tsdbFiles; + pthread_mutex_t mutex; - pthread_mutex_t tsdbMutex; + int commit; + pthread_t commitThread; // A limiter to monitor the resources used by tsdb void *limiter; @@ -79,6 +82,8 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); +static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); +static void * tsdbCommitToFile(void *arg); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -162,7 +167,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->tsdbMeta = pMeta; // Initialize cache - STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize); + STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1); if (pCache == NULL) { free(pRepo->rootDir); tsdbFreeMeta(pRepo->tsdbMeta); @@ -171,6 +176,19 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO } pRepo->tsdbCache = pCache; + // Initialize file handle + char dataDir[128] = "\0"; + tsdbGetDataDirName(pRepo, dataDir); + pRepo->tsdbFileH = + tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables); + if (pRepo->tsdbFileH == NULL) { + free(pRepo->rootDir); + tsdbFreeCache(pRepo->tsdbCache); + tsdbFreeMeta(pRepo->tsdbMeta); + free(pRepo); + return NULL; + } + pRepo->state = TSDB_REPO_STATE_ACTIVE; return (tsdb_repo_t *)pRepo; @@ -230,7 +248,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } - pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1); if (pRepo->tsdbCache == NULL) { tsdbFreeMeta(pRepo->tsdbMeta); free(pRepo->rootDir); @@ -284,6 +302,32 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { return 0; } +int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + + if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; + if (pRepo->commit) return 0; + pRepo->commit = 1; + // Loop to move pData to iData + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pRepo->tsdbMeta->tables[i]; + if (pTable != NULL) { + void *pData = pTable->content.pData; + pTable->content.pData = NULL; + pTable->iData = pData; + } + } + // Loop to move mem to imem + tdListMove(pRepo->tsdbCache->mem, pRepo->tsdbCache->imem); + + pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); + pthread_mutex_unlock(&(pRepo->mutex)); + + pthread_join(pRepo->commitThread, NULL); + + return 0; +} + /** * Get the TSDB repository information, including some statistics * @param pRepo the TSDB repository handle @@ -612,9 +656,6 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { rmdir(dirName); - char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META); - remove(metaFname); - return 0; } @@ -662,4 +703,23 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } return 0; +} + +static void *tsdbCommitToFile(void *arg) { + // TODO + STsdbRepo *pRepo = (STsdbRepo *)arg; + STsdbMeta *pMeta = pRepo->tsdbMeta; + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + SSkipListIterator *pIter = tSkipListCreateIter(pTable->iData); + while (tSkipListIterNext(pIter)) { + SSkipListNode *node = tSkipListIterGet(pIter); + SDataRow row = SL_GET_NODE_DATA(node); + int k = 0; + + } + } + + return NULL; } \ No newline at end of file diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 46ae3940d2f260bbf3afa5cdf5c970ae6fe544c1..42a22553c7511160c189ab1b5415e29f45dd383e 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -3,6 +3,7 @@ #include "tsdb.h" #include "dataformat.h" +#include "tsdbFile.h" #include "tsdbMeta.h" TEST(TsdbTest, tableEncodeDecode) { @@ -71,39 +72,50 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 10; - SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * nRows); - - SSubmitBlk *pBlock = pMsg->blocks; - pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; - pBlock->sversion = 0; - pBlock->len = 0; + int nRows = 100; + int rowsPerSubmit = 10; int64_t start_time = 1584081000000; - for (int i = 0; i < nRows; i++) { - int64_t ttime = start_time + 1000 * i; - SDataRow row = (SDataRow)(pBlock->data + pBlock->len); - tdInitDataRow(row, schema); - - for (int j = 0; j < schemaNCols(schema); j++) { - if (j == 0) { // Just for timestamp - tdAppendColVal(row, (void *)(&ttime), schemaColAt(schema, j)); - } else { // For int - int val = 10; - tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); - } + SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + + for (int k = 0; k < nRows/rowsPerSubmit; k++) { + SSubmitBlk *pBlock = pMsg->blocks; + pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; + pBlock->sversion = 0; + pBlock->len = 0; + for (int i = 0; i < rowsPerSubmit; i++) { + start_time += 1000; + SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + tdInitDataRow(row, schema); + + for (int j = 0; j < schemaNCols(schema); j++) { + if (j == 0) { // Just for timestamp + tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); + } else { // For int + int val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); + } + } + pBlock->len += dataRowLen(row); } - pBlock->len += dataRowLen(row); + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + tsdbInsertData(pRepo, pMsg); } - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; - tsdbInsertData(pRepo, pMsg); + tsdbTriggerCommit(pRepo); - int k = 0; } TEST(TsdbTest, openRepo) { tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); ASSERT_NE(pRepo, nullptr); +} + +TEST(TsdbTest, createFileGroup) { + SFileGroup fGroup; + + ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); + + int k = 0; } \ No newline at end of file