diff --git a/src/vnode/tsdb/inc/tsdbMetaFile.h b/src/vnode/tsdb/inc/tsdbMetaFile.h index b6b3fd4e4e1082830327c3da626d2f30f9e64133..a0cf2a005cd549acd4500bd20f811d3b1afcbd3d 100644 --- a/src/vnode/tsdb/inc/tsdbMetaFile.h +++ b/src/vnode/tsdb/inc/tsdbMetaFile.h @@ -31,7 +31,7 @@ typedef void (*afterFunc)(void *); typedef struct { int fd; // File descriptor int nDel; // number of deletions - int tombSize; // Number of records + int tombSize; // deleted size int64_t size; // Total file size void * map; // Map from uid ==> position iterFunc iFunc; diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 7f1cd0fd1aad2a306f31942e6360a7d42bade49f..98dcd45bedece4351a67430a213c597663fb35cd 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -241,9 +241,22 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { } table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + // Register to meta if (newSuper) tsdbAddTableToMeta(pMeta, super, true); tsdbAddTableToMeta(pMeta, table, true); + // Write to meta file + int bufLen = 0; + if (newSuper) { + void *buf = tsdbEncodeTable(super, &bufLen); + tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); + tsdbFreeEncode(buf); + } + + void *buf = tsdbEncodeTable(table, &bufLen); + tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); + tsdbFreeEncode(buf); + return 0; } diff --git a/src/vnode/tsdb/src/tsdbMetaFile.c b/src/vnode/tsdb/src/tsdbMetaFile.c index 9b5d4daa2eb10b30b870fff45d524f30cfe8a456..689f8033db895781ebfed49727c4f2b54b357969 100644 --- a/src/vnode/tsdb/src/tsdbMetaFile.c +++ b/src/vnode/tsdb/src/tsdbMetaFile.c @@ -45,6 +45,9 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, af mfh->iFunc = iFunc; mfh->aFunc = aFunc; mfh->appH = appH; + mfh->nDel = 0; + mfh->tombSize = 0; + mfh->size = 0; // OPEN MAP mfh->map = @@ -62,6 +65,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, af free(mfh); return NULL; } + mfh->size += TSDB_META_FILE_HEADER_SIZE; } else { // file exists, recover from file if (tsdbRestoreFromMetaFile(fname, mfh) < 0) { taosHashCleanup(mfh->map); @@ -80,7 +84,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co SRecordInfo info; info.offset = mfh->size; - info.size = contLen; // TODO: Here is not correct + info.size = contLen; info.uid = uid; mfh->size += (contLen + sizeof(SRecordInfo)); @@ -90,7 +94,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co } // TODO: make below a function to implement - if (lseek(mfh->fd, info.offset, SEEK_CUR) < 0) { + if (lseek(mfh->fd, info.offset, SEEK_SET) < 0) { return -1; } @@ -196,6 +200,11 @@ static int32_t tsdbWriteMetaHeader(int fd) { return 0; } +static int32_t tsdbReadMetaHeader(int fd) { + lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET); + return 0; +} + static int tsdbCreateMetaFile(char *fname) { int fd = open(fname, O_RDWR | O_CREAT, 0755); if (fd < 0) return -1; @@ -229,6 +238,8 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) { return -1; } + mfh->size += TSDB_META_FILE_HEADER_SIZE; + mfh->fd = fd; void *buf = NULL; @@ -241,20 +252,30 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) { mfh->size += (info.size + sizeof(SRecordInfo)); mfh->tombSize += (info.size + sizeof(SRecordInfo)); lseek(mfh->fd, info.size, SEEK_CUR); + mfh->size = mfh->size + sizeof(SRecordInfo) + info.size; + mfh->tombSize = mfh->tombSize + sizeof(SRecordInfo) + info.size; } else { if (taosHashPut(mfh->map, (char *)(&info.uid), sizeof(info.uid), (void *)(&info), sizeof(SRecordInfo)) < 0) { + if (buf) free(buf); return -1; } buf = realloc(buf, info.size); if (buf == NULL) return -1; - if (read(mfh->fd, buf, info.size) < 0) return -1; + if (read(mfh->fd, buf, info.size) < 0) { + if (buf) free(buf); + return -1; + } (*mfh->iFunc)(mfh->appH, buf, info.size); + + mfh->size = mfh->size + sizeof(SRecordInfo) + info.size; } } (*mfh->aFunc)(mfh->appH); + if (buf) free(buf); + return 0; } \ No newline at end of file