提交 975f8c2c 编写于 作者: H Hongze Cheng

update file magic with other method

上级 dd9f62cf
......@@ -42,6 +42,7 @@ extern int tsdbDebugFlag;
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
// Definitions
// ------------------ tsdbMeta.c
......@@ -230,7 +231,7 @@ typedef struct {
typedef struct {
int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
int32_t tid;
uint64_t uid;
SCompBlock blocks[];
} SCompInfo;
......@@ -308,6 +309,7 @@ typedef struct {
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
......
......@@ -264,6 +264,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
}
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) {
tsdbCloseFile(pFile);
......
......@@ -212,6 +212,8 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
char *sdup = strdup(pRepo->rootDir);
char *prefix = dirname(sdup);
int prefixLen = strlen(prefix);
tfree(sdup);
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
int fid = (*index) / TSDB_FILE_TYPE_MAX;
......@@ -220,8 +222,8 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
fname = tsdbGetMetaFileName(pRepo->rootDir);
*index = TSDB_META_FILE_INDEX;
magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta);
} else {
tfree(sdup);
return 0;
}
} else {
......@@ -229,42 +231,42 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
if (pFGroup->fileId == fid) {
fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname);
magic = pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].info.magic;
} else {
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < eindex) {
fname = strdup(pFGroup->files[0].fname);
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
magic = pFGroup->files[0].info.magic;
} else {
tfree(sdup);
return 0;
}
}
}
strcpy(name, fname + strlen(prefix));
strcpy(name, fname + prefixLen);
} else { // get the named file at the specified index. If not there, return 0
if (*index == TSDB_META_FILE_INDEX) { // get meta file
fname = tsdbGetMetaFileName(pRepo->rootDir);
magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta);
} else {
int fid = (*index) / TSDB_FILE_TYPE_MAX;
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pFGroup == NULL) { // not found
tfree(sdup);
return 0;
}
SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX];
fname = strdup(pFile->fname);
magic = pFile->info.magic;
}
}
if (stat(fname, &fState) < 0) {
tfree(sdup);
tfree(fname);
return 0;
}
tfree(sdup);
*size = fState.st_size;
magic = *size;
// magic = *size;
tfree(fname);
return magic;
......
......@@ -100,6 +100,7 @@ void tsdbResetHelper(SRWHelper *pHelper) {
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
ASSERT(pHelper != NULL && pGroup != NULL);
SFile *pFile = NULL;
// Clear the helper object
tsdbResetHelper(pHelper);
......@@ -127,18 +128,27 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
#ifdef TSDB_IDX
// Create and open .i file
if (tsdbOpenFile(helperNewIdxF(pHelper), O_WRONLY | O_CREAT) < 0) return -1;
if (tsdbUpdateFileHeader(helperNewIdxF(pHelper), 0) < 0) return -1;
pFile = helperNewIdxF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
#endif
// Create and open .h
if (tsdbOpenFile(helperNewHeadF(pHelper), O_WRONLY | O_CREAT) < 0) return -1;
if (tsdbUpdateFileHeader(helperNewHeadF(pHelper), 0) < 0) return -1;
pFile = helperNewHeadF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
// Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) {
if (tsdbOpenFile(helperNewLastF(pHelper), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsdbUpdateFileHeader(helperNewLastF(pHelper), 0) < 0) return -1;
pFile = helperNewLastF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) goto _err;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
}
} else {
if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err;
......@@ -334,7 +344,15 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last);
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0)
return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
#if 0
if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
......@@ -365,6 +383,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
return -1;
}
}
#endif
pHelper->hasOldLastBlock = false;
}
......@@ -379,50 +398,34 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
if (pIdx->len > 0) {
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pIdx->offset = offset;
pIdx->uid = pHelper->tableInfo.uid;
pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (tsendfile(pFile->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) {
tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
helperHeadF(pHelper)->fname, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
} else {
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
pHelper->pCompInfo->checksum = 0;
pHelper->pCompInfo->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) &&
(pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pIdx->offset = offset;
pIdx->uid = pHelper->tableInfo.uid;
pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
}
if (twrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pFile->info.magic = taosCalcChecksum(
pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)), sizeof(TSCKSUM));
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pIdx->offset = offset;
pIdx->uid = pHelper->tableInfo.uid;
pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (twrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
#ifdef TSDB_IDX
......@@ -463,6 +466,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
}
}
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len);
pFile->info.magic = taosCalcChecksum(
pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pWIdx, pFile->info.len - sizeof(TSCKSUM)), sizeof(TSCKSUM));
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
......@@ -594,7 +599,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
return -1;
}
ASSERT(pIdx->uid == pHelper->pCompInfo->uid);
ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid);
}
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
......@@ -813,6 +818,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
ASSERT(flen > 0);
flen += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
pFile->info.magic =
taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
if (ncol != 0) {
pCompCol->offset = toffset;
......@@ -831,6 +838,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)),
sizeof(TSCKSUM));
// Write the whole block to file
if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
......
......@@ -25,10 +25,11 @@ typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int64_t size; // including 512 bytes of header size
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
int64_t size; // including 512 bytes of header size
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
uint32_t magic;
} SStoreInfo;
typedef struct {
......@@ -45,6 +46,8 @@ typedef struct {
SStoreInfo info;
} SKVStore;
#define KVSTORE_MAGIC(s) (s)->info.magic
int tdCreateKVStore(char *fname);
int tdDestroyKVStore(char *fname);
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
......
......@@ -34,6 +34,7 @@
#define TD_KVSTORE_MAINOR_VERSION 0
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
#define TD_KVSTORE_NEW_SUFFIX ".new"
#define TD_KVSTORE_INIT_MAGIC 0xFFFFFFFF
typedef struct {
uint64_t uid;
......@@ -251,6 +252,8 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
return -1;
}
pStore->info.magic =
taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
pStore->info.size += (sizeof(SKVRecord) + contLen);
SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid));
if (pRecord != NULL) { // just to insert
......@@ -288,6 +291,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
return -1;
}
pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, POINTER_DISTANCE(pBuf, buf));
pStore->info.size += POINTER_DISTANCE(pBuf, buf);
pStore->info.nDels++;
pStore->info.nRecords--;
......@@ -371,7 +375,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
}
static int tdInitKVStoreHeader(int fd, char *fname) {
SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0};
SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0, TD_KVSTORE_INIT_MAGIC};
return tdUpdateKVStoreHeader(fd, fname, &info);
}
......@@ -382,6 +386,7 @@ static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) {
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
......@@ -391,6 +396,7 @@ static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册