diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index a269bdc2589099d3dc7f392e09c1084993f9a58a..437c38a8a4e2af91d350ad2a91ead49fb52f3fcf 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -51,19 +51,21 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes); // ----------------- TSDB SCHEMA DEFINITION typedef struct { int numOfCols; // Number of columns appended - int totalCols; // Total columns allocated + int padding; // Total columns allocated STColumn columns[]; } STSchema; #define schemaNCols(s) ((s)->numOfCols) -#define schemaTCols(s) ((s)->totalCols) #define schemaColAt(s, i) ((s)->columns + i) STSchema *tdNewSchema(int32_t nCols); -int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes); +int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); STSchema *tdDupSchema(STSchema *pSchema); void tdFreeSchema(STSchema *pSchema); void tdUpdateSchema(STSchema *pSchema); +int tdGetSchemaEncodeSize(STSchema *pSchema); +void * tdEncodeSchema(void *dst, STSchema *pSchema); +STSchema *tdDecodeSchema(void **psrc); // ----------------- Data row structure @@ -99,33 +101,6 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); void tdDataRowReset(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); -/* Data rows definition, the format of it is like below: - * +---------+-----------------------+--------+-----------------------+ - * | int32_t | | | | - * +---------+-----------------------+--------+-----------------------+ - * | len | SDataRow | .... | SDataRow | - * +---------+-----------------------+--------+-----------------------+ - */ -typedef void *SDataRows; - -#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t) - -#define dataRowsLen(rs) (*(int32_t *)(rs)) -#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l)) -#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t)) - -void tdDataRowsAppendRow(SDataRows rows, SDataRow row); - -// Data rows iterator -typedef struct { - int32_t totalLen; - int32_t len; - SDataRow row; -} SDataRowsIter; - -void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter); -SDataRow tdDataRowsNext(SDataRowsIter *pIter); - /* Data column definition * +---------+---------+-----------------------+ * | int32_t | int32_t | | diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 58530c5e3d54bdefe7a38b9e6c9c13112f0a2602..419e37639257b2172ffe4eaa0c68436bb8ff6f6c 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -91,10 +91,9 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) { STSchema *tdNewSchema(int32_t nCols) { int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols; - STSchema *pSchema = (STSchema *)malloc(size); + STSchema *pSchema = (STSchema *)calloc(1, size); if (pSchema == NULL) return NULL; pSchema->numOfCols = 0; - pSchema->totalCols = nCols; return pSchema; } @@ -102,8 +101,8 @@ STSchema *tdNewSchema(int32_t nCols) { /** * Append a column to the schema */ -int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes) { - if (pSchema->numOfCols >= pSchema->totalCols) return -1; +int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) { + // if (pSchema->numOfCols >= pSchema->totalCols) return -1; if (!isValidDataType(type, 0)) return -1; STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema)); @@ -159,6 +158,53 @@ void tdUpdateSchema(STSchema *pSchema) { } } +/** + * Return the size of encoded schema + */ +int tdGetSchemaEncodeSize(STSchema *pSchema) { + return sizeof(STSchema) + schemaNCols(pSchema) * (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + + T_MEMBER_SIZE(STColumn, bytes)); +} + +/** + * Encode a schema to dst, and return the next pointer + */ +void *tdEncodeSchema(void *dst, STSchema *pSchema) { + T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols); + for (int i = 0; i < schemaNCols(pSchema); i++) { + STColumn *pCol = schemaColAt(pSchema, i); + T_APPEND_MEMBER(dst, pCol, STColumn, type); + T_APPEND_MEMBER(dst, pCol, STColumn, colId); + T_APPEND_MEMBER(dst, pCol, STColumn, bytes); + } + + return dst; +} + +/** + * Decode a schema from a binary. + */ +STSchema *tdDecodeSchema(void **psrc) { + int numOfCols = 0; + + T_READ_MEMBER(*psrc, int, numOfCols); + + STSchema *pSchema = tdNewSchema(numOfCols); + if (pSchema == NULL) return NULL; + for (int i = 0; i < numOfCols; i++) { + int8_t type = 0; + int16_t colId = 0; + int32_t bytes = 0; + T_READ_MEMBER(*psrc, int8_t, type); + T_READ_MEMBER(*psrc, int16_t, colId); + T_READ_MEMBER(*psrc, int32_t, bytes); + + tdSchemaAppendCol(pSchema, type, colId, bytes); + } + + return pSchema; +} + /** * Initialize a data row */ @@ -234,6 +280,8 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) { dataRowFLen(row) += TYPE_BYTES[colType(pCol)]; break; } + + return 0; } void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); } @@ -246,40 +294,6 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } -void tdDataRowsAppendRow(SDataRows rows, SDataRow row) { - dataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row); - dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row)); -} - -// Initialize the iterator -void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) { - if (pIter == NULL) return; - pIter->totalLen = dataRowsLen(rows); - - if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) { - pIter->row = NULL; - return; - } - - pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN); - pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row); -} - -// Get the next row in Rows -SDataRow tdDataRowsNext(SDataRowsIter *pIter) { - SDataRow row = pIter->row; - if (row == NULL) return NULL; - - if (pIter->len >= pIter->totalLen) { - pIter->row = NULL; - } else { - pIter->row = (char *)row + dataRowLen(row); - pIter->len += dataRowLen(row); - } - - return row; -} - /** * Return the first part length of a data row for a schema */ diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 7aac36786a32288a9d2947f010ad4a50f0495ace..7308715a87ffe7c69f5def0aab723b670644ab03 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -82,6 +82,17 @@ extern const int32_t TYPE_BYTES[11]; #define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MICRO_STR "us" +#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member) +#define T_APPEND_MEMBER(dst, ptr, type, member) \ +do {\ + memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\ + dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\ +} while(0) +#define T_READ_MEMBER(src, type, target) \ +do { \ + (target) = *(type *)(src); \ + (src) = (void *)((char *)src + sizeof(type));\ +} while(0) #define TSDB_KEYSIZE sizeof(TSKEY) diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index de29dc86375abe08ad77c5509f0b00d8d7b1fb8e..b18d16d0d94a6e242cb2f198ddfc960b366ea629 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -35,7 +35,7 @@ extern "C" { // ---------- TSDB TABLE DEFINITION typedef struct STable { - TSDB_TABLE_TYPE type; + int8_t type; STableId tableId; int32_t superUid; // Super table UID int32_t sversion; diff --git a/src/vnode/tsdb/inc/tsdbMetaFile.h b/src/vnode/tsdb/inc/tsdbMetaFile.h index 9fad703842889e61fea73fe7686ce3f71add65b0..a0cf2a005cd549acd4500bd20f811d3b1afcbd3d 100644 --- a/src/vnode/tsdb/inc/tsdbMetaFile.h +++ b/src/vnode/tsdb/inc/tsdbMetaFile.h @@ -25,15 +25,21 @@ extern "C" { #define TSDB_META_FILE_NAME "META" #define TSDB_META_HASH_FRACTION 1.1 +typedef int (*iterFunc)(void *, void *cont, int contLen); +typedef void (*afterFunc)(void *); + typedef struct { - int fd; // File descriptor - int nDel; // number of deletions - int nRecord; // Number of records - int64_t size; // Total file size - void * map; // Map from uid ==> position + int fd; // File descriptor + int nDel; // number of deletions + int tombSize; // deleted size + int64_t size; // Total file size + void * map; // Map from uid ==> position + iterFunc iFunc; + afterFunc aFunc; + void * appH; } SMetaFile; -SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables); +SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH); int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen); int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid); int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 2ca7b9a940345f3a92ec8d94be1644b4f8c7bf0c..073321816db29939f5ba28400ad5ad5f598bfa29 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -77,8 +77,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int tsdbOpenMetaFile(char *tsdbDir); -static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); +static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -219,25 +219,25 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } - int fd = tsdbOpenMetaFile(tsdbDir); - if (fd < 0) { - free(pRepo); - return NULL; - } + pRepo->rootDir = strdup(tsdbDir); - if (tsdbRecoverRepo(fd, &(pRepo->config)) < 0) { - close(fd); + tsdbRestoreCfg(pRepo, &(pRepo->config)); + + pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); + if (pRepo->tsdbMeta == NULL) { + free(pRepo->rootDir); free(pRepo); return NULL; } - pRepo->tsdbCache = tsdbInitCache(5); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize); if (pRepo->tsdbCache == NULL) { - // TODO: deal with error + tsdbFreeMeta(pRepo->tsdbMeta); + free(pRepo->rootDir); + free(pRepo); return NULL; } - pRepo->rootDir = strdup(tsdbDir); pRepo->state = TSDB_REPO_STATE_ACTIVE; return (tsdb_repo_t *)pRepo; @@ -459,7 +459,7 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { if (pIter->len >= pIter->totalLen) { pIter->pBlock = NULL; } else { - pIter->pBlock = (char *)pBlock + pBlock->len + sizeof(SSubmitBlk); + pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk)); } return pBlock; @@ -623,12 +623,6 @@ static int tsdbOpenMetaFile(char *tsdbDir) { return 0; } -static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) { - // TODO: read tsdb configuration from file - // recover tsdb meta - return 0; -} - static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) { // TODO int32_t level = 0; diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 3277476aa19559f0f9ee2feea059577f6353c44b..98dcd45bedece4351a67430a213c597663fb35cd 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -13,7 +13,7 @@ static int tsdbFreeTable(STable *pTable); static int32_t tsdbCheckTableCfg(STableCfg *pCfg); -static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable); +static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx); static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); @@ -39,8 +39,21 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) { void *ret = malloc(*contLen); if (ret == NULL) return NULL; - // TODO: encode the object to the memory - {} + void *ptr = ret; + T_APPEND_MEMBER(ptr, pTable, STable, type); + T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid); + T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid); + T_APPEND_MEMBER(ptr, pTable, STable, superUid); + T_APPEND_MEMBER(ptr, pTable, STable, sversion); + + if (pTable->type == TSDB_SUPER_TABLE) { + ptr = tdEncodeSchema(ptr, pTable->schema); + ptr = tdEncodeSchema(ptr, pTable->tagSchema); + } else if (pTable->type == TSDB_CHILD_TABLE) { + dataRowCpy(ptr, pTable->tagVal); + } else { + ptr = tdEncodeSchema(ptr, pTable->schema); + } return ret; } @@ -59,8 +72,20 @@ STable *tsdbDecodeTable(void *cont, int contLen) { STable *pTable = (STable *)calloc(1, sizeof(STable)); if (pTable == NULL) return NULL; - { - // TODO recover from the binary content + void *ptr = cont; + T_READ_MEMBER(ptr, int8_t, pTable->type); + T_READ_MEMBER(ptr, int64_t, pTable->tableId.uid); + T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid); + T_READ_MEMBER(ptr, int32_t, pTable->superUid); + T_READ_MEMBER(ptr, int32_t, pTable->sversion); + + if (pTable->type == TSDB_SUPER_TABLE) { + pTable->schema = tdDecodeSchema(&ptr); + pTable->tagSchema = tdDecodeSchema(&ptr); + } else if (pTable->type == TSDB_CHILD_TABLE) { + pTable->tagVal = tdDataRowDup(ptr); + } else { + pTable->schema = tdDecodeSchema(&ptr); } return pTable; @@ -70,6 +95,36 @@ void *tsdbFreeEncode(void *cont) { if (cont != NULL) free(cont); } +int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { + STsdbMeta *pMeta = (STsdbMeta *)pHandle; + + STable *pTable = tsdbDecodeTable(cont, contLen); + if (pTable == NULL) return -1; + + if (pTable->type == TSDB_SUPER_TABLE) { + pTable->content.pIndex = + tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); + } else { + pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + } + + tsdbAddTableToMeta(pMeta, pTable, false); + + return 0; +} + +void tsdbOrgMeta(void *pHandle) { + STsdbMeta *pMeta = (STsdbMeta *)pHandle; + + for (int i = 0; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) { + tsdbAddTableIntoIndex(pMeta, pTable); + } + } +} + /** * Initialize the meta handle * ASSUMPTIONS: VALID PARAMETER @@ -94,7 +149,7 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) { return NULL; } - pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables); + pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables, tsdbRestoreTable, tsdbOrgMeta, pMeta); if (pMeta->mfh == NULL) { taosHashCleanup(pMeta->map); free(pMeta->tables); @@ -186,8 +241,21 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { } table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); - if (newSuper) tsdbAddTableToMeta(pMeta, super); - tsdbAddTableToMeta(pMeta, table); + // Register to meta + if (newSuper) tsdbAddTableToMeta(pMeta, super, true); + tsdbAddTableToMeta(pMeta, table, true); + + // Write to meta file + int bufLen = 0; + if (newSuper) { + void *buf = tsdbEncodeTable(super, &bufLen); + tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); + tsdbFreeEncode(buf); + } + + void *buf = tsdbEncodeTable(table, &bufLen); + tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); + tsdbFreeEncode(buf); return 0; } @@ -268,7 +336,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) { return *(STable **)ptr; } -static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) { +static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { if (pTable->type == TSDB_SUPER_TABLE) { // add super table to the linked list if (pMeta->superList == NULL) { @@ -318,8 +386,22 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { } static int tsdbEstimateTableEncodeSize(STable *pTable) { - // TODO - return 0; + int size = 0; + size += T_MEMBER_SIZE(STable, type); + size += T_MEMBER_SIZE(STable, tableId); + size += T_MEMBER_SIZE(STable, superUid); + size += T_MEMBER_SIZE(STable, sversion); + + if (pTable->type == TSDB_SUPER_TABLE) { + size += tdGetSchemaEncodeSize(pTable->schema); + size += tdGetSchemaEncodeSize(pTable->tagSchema); + } else if (pTable->type == TSDB_CHILD_TABLE) { + size += dataRowLen(pTable->tagVal); + } else { + size += tdGetSchemaEncodeSize(pTable->schema); + } + + return size; } static char *getTupleKey(const void * data) { diff --git a/src/vnode/tsdb/src/tsdbMetaFile.c b/src/vnode/tsdb/src/tsdbMetaFile.c index 70ae0611068dfd9e743524153c9a878b6bcad432..689f8033db895781ebfed49727c4f2b54b357969 100644 --- a/src/vnode/tsdb/src/tsdbMetaFile.c +++ b/src/vnode/tsdb/src/tsdbMetaFile.c @@ -19,11 +19,14 @@ #include "hash.h" #include "tsdbMetaFile.h" +#define TSDB_META_FILE_VERSION_MAJOR 1 +#define TSDB_META_FILE_VERSION_MINOR 0 #define TSDB_META_FILE_HEADER_SIZE 512 typedef struct { int32_t offset; int32_t size; + int64_t uid; } SRecordInfo; static int32_t tsdbGetMetaFileName(char *rootDir, char *fname); @@ -32,14 +35,20 @@ static int32_t tsdbWriteMetaHeader(int fd); static int tsdbCreateMetaFile(char *fname); static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh); -SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) { - // TODO +SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH) { char fname[128] = "\0"; if (tsdbGetMetaFileName(rootDir, fname) < 0) return NULL; SMetaFile *mfh = (SMetaFile *)calloc(1, sizeof(SMetaFile)); if (mfh == NULL) return NULL; + mfh->iFunc = iFunc; + mfh->aFunc = aFunc; + mfh->appH = appH; + mfh->nDel = 0; + mfh->tombSize = 0; + mfh->size = 0; + // OPEN MAP mfh->map = taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); @@ -56,6 +65,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) { free(mfh); return NULL; } + mfh->size += TSDB_META_FILE_HEADER_SIZE; } else { // file exists, recover from file if (tsdbRestoreFromMetaFile(fname, mfh) < 0) { taosHashCleanup(mfh->map); @@ -74,7 +84,8 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co SRecordInfo info; info.offset = mfh->size; - info.size = contLen; // TODO: Here is not correct + info.size = contLen; + info.uid = uid; mfh->size += (contLen + sizeof(SRecordInfo)); @@ -83,7 +94,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co } // TODO: make below a function to implement - if (lseek(mfh->fd, info.offset, SEEK_CUR) < 0) { + if (lseek(mfh->fd, info.offset, SEEK_SET) < 0) { return -1; } @@ -97,7 +108,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co fsync(mfh->fd); - mfh->nRecord++; + mfh->tombSize++; return 0; } @@ -182,6 +193,15 @@ static int32_t tsdbCheckMetaHeader(int fd) { static int32_t tsdbWriteMetaHeader(int fd) { // TODO: write the meta file header to file + char head[TSDB_META_FILE_HEADER_SIZE] = "\0"; + sprintf(head, "version: %d.%d", TSDB_META_FILE_VERSION_MAJOR, TSDB_META_FILE_VERSION_MINOR); + + write(fd, (void *)head, TSDB_META_FILE_HEADER_SIZE); + return 0; +} + +static int32_t tsdbReadMetaHeader(int fd) { + lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET); return 0; } @@ -218,8 +238,44 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) { return -1; } + mfh->size += TSDB_META_FILE_HEADER_SIZE; + mfh->fd = fd; - // TODO: iterate to read the meta file to restore the meta data + + void *buf = NULL; + int buf_size = 0; + + SRecordInfo info; + while (1) { + if (read(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) == 0) break; + if (info.offset < 0) { + mfh->size += (info.size + sizeof(SRecordInfo)); + mfh->tombSize += (info.size + sizeof(SRecordInfo)); + lseek(mfh->fd, info.size, SEEK_CUR); + mfh->size = mfh->size + sizeof(SRecordInfo) + info.size; + mfh->tombSize = mfh->tombSize + sizeof(SRecordInfo) + info.size; + } else { + if (taosHashPut(mfh->map, (char *)(&info.uid), sizeof(info.uid), (void *)(&info), sizeof(SRecordInfo)) < 0) { + if (buf) free(buf); + return -1; + } + + buf = realloc(buf, info.size); + if (buf == NULL) return -1; + + if (read(mfh->fd, buf, info.size) < 0) { + if (buf) free(buf); + return -1; + } + (*mfh->iFunc)(mfh->appH, buf, info.size); + + mfh->size = mfh->size + sizeof(SRecordInfo) + info.size; + } + + } + (*mfh->aFunc)(mfh->appH); + + if (buf) free(buf); return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 5ce25d2fd7dd88fb6d8dbfed3eda0333a3e172e5..46ae3940d2f260bbf3afa5cdf5c970ae6fe544c1 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -3,6 +3,44 @@ #include "tsdb.h" #include "dataformat.h" +#include "tsdbMeta.h" + +TEST(TsdbTest, tableEncodeDecode) { + STable *pTable = (STable *)malloc(sizeof(STable)); + + pTable->type = TSDB_NORMAL_TABLE; + pTable->tableId.uid = 987607499877672L; + pTable->tableId.tid = 0; + pTable->superUid = -1; + pTable->sversion = 0; + pTable->tagSchema = NULL; + pTable->tagVal = NULL; + int nCols = 5; + STSchema *schema = tdNewSchema(nCols); + + for (int i = 0; i < nCols; i++) { + if (i == 0) { + tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); + } else { + tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); + } + } + + pTable->schema = schema; + + int bufLen = 0; + void *buf = tsdbEncodeTable(pTable, &bufLen); + + STable *tTable = tsdbDecodeTable(buf, bufLen); + + ASSERT_EQ(pTable->type, tTable->type); + ASSERT_EQ(pTable->tableId.uid, tTable->tableId.uid); + ASSERT_EQ(pTable->tableId.tid, tTable->tableId.tid); + ASSERT_EQ(pTable->superUid, tTable->superUid); + ASSERT_EQ(pTable->sversion, tTable->sversion); + ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); + ASSERT_EQ(tTable->content.pData, nullptr); +} TEST(TsdbTest, createRepo) { STsdbCfg config; @@ -65,3 +103,7 @@ TEST(TsdbTest, createRepo) { int k = 0; } +TEST(TsdbTest, openRepo) { + tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); + ASSERT_NE(pRepo, nullptr); +} \ No newline at end of file