未验证 提交 451a39d4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2890 from taosdata/feature/2.0tsdb

add file version TSDB file and meta file
......@@ -2379,10 +2379,11 @@ static void bottom_func_second_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
do_bottom_function_add(pOutput, pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp,
pCtx->outputType, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType;
do_bottom_function_add(pOutput, pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp, type,
&pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
if (pOutput->num > 0) {
......
......@@ -44,6 +44,10 @@ extern int tsdbDebugFlag;
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
// NOTE: Any file format change must increase this version number by 1
// Also, implement the convert function
#define TSDB_FILE_VERSION ((uint32_t)0)
// Definitions
// ------------------ tsdbMeta.c
typedef struct STable {
......@@ -443,7 +447,7 @@ void tsdbCloseFile(SFile* pFile);
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
void tsdbFitRetention(STsdbRepo* pRepo);
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
......
......@@ -247,11 +247,14 @@ int tsdbOpenFile(SFile *pFile, int oflag) {
return -1;
}
tsdbTrace("open file %s, fd %d", pFile->fname, pFile->fd);
return 0;
}
void tsdbCloseFile(SFile *pFile) {
if (TSDB_IS_FILE_OPENED(pFile)) {
tsdbTrace("close file %s, fd %d", pFile->fname, pFile->fd);
close(pFile->fd);
pFile->fd = -1;
}
......@@ -276,7 +279,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) {
if (tsdbUpdateFileHeader(pFile) < 0) {
tsdbCloseFile(pFile);
return -1;
}
......@@ -313,11 +316,11 @@ void tsdbFitRetention(STsdbRepo *pRepo) {
pthread_rwlock_unlock(&(pFileH->fhlock));
}
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
int tsdbUpdateFileHeader(SFile *pFile) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
void *pBuf = (void *)buf;
taosEncodeFixedU32((void *)(&pBuf), version);
taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION);
tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
......@@ -409,6 +412,11 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
if (version != TSDB_FILE_VERSION) {
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
}
tsdbCloseFile(pFile);
return 0;
......
......@@ -132,7 +132,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
#endif
// Create and open .h
......@@ -140,7 +140,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
// Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) {
......@@ -149,7 +149,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
pFile->info.len = 0;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
}
} else {
if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) return -1;
......@@ -166,44 +166,36 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
#ifdef TSDB_IDX
pFile = helperIdxF(pHelper);
if (pFile->fd > 0) {
close(pFile->fd);
pFile->fd = -1;
}
tsdbCloseFile(pFile);
#endif
pFile = helperHeadF(pHelper);
if (pFile->fd > 0) {
close(pFile->fd);
pFile->fd = -1;
}
tsdbCloseFile(pFile);
pFile = helperDataF(pHelper);
if (pFile->fd > 0) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else {
// TODO: shrink back to origin
}
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
}
pFile = helperLastF(pHelper);
if (pFile->fd > 0) {
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else {
// TODO: shrink back to origin
}
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
}
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
......@@ -211,11 +203,10 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pFile = helperNewIdxF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
#endif
......@@ -223,22 +214,20 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pFile = helperNewHeadF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
pFile = helperNewLastF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
}
......
......@@ -21,6 +21,8 @@ extern "C" {
#include <stdint.h>
#define KVSTORE_FILE_VERSION ((uint32_t)0)
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
......
......@@ -44,7 +44,7 @@ static char * tdGetKVStoreSnapshotFname(char *fdata);
static char * tdGetKVStoreNewFname(char *fdata);
static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version);
static int tdEncodeKVRecord(void **buf, SKVRecord *pRecord);
static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord);
static int tdRestoreKVStore(SKVStore *pStore);
......@@ -91,6 +91,7 @@ int tdDestroyKVStore(char *fname) {
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SStoreInfo info = {0};
uint32_t version = 0;
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
if (pStore == NULL) return NULL;
......@@ -111,9 +112,14 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
}
} else {
uDebug("file %s exists, try to recover the KV store", pStore->fsnap);
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) {
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info, &version) < 0) {
if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err;
} else {
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fsnap,
version, KVSTORE_FILE_VERSION);
}
if (ftruncate(pStore->fd, info.size) < 0) {
uError("failed to truncate %s to %" PRId64 " size since %s", pStore->fname, info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -132,7 +138,11 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
(void)remove(pStore->fsnap);
}
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info, &version) < 0) goto _err;
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fname, version,
KVSTORE_FILE_VERSION);
}
pStore->info.size = TD_KVSTORE_HEADER_SIZE;
pStore->info.magic = info.magic;
......@@ -320,7 +330,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
return 0;
}
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
if (lseek(fd, 0, SEEK_SET) < 0) {
......@@ -341,7 +351,9 @@ static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
return -1;
}
tdDecodeStoreInfo(buf, pInfo);
void *pBuf = (void *)buf;
pBuf = tdDecodeStoreInfo(pBuf, pInfo);
pBuf = taosDecodeFixedU32(pBuf, version);
return 0;
}
......@@ -357,6 +369,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
void *pBuf = buf;
tdEncodeStoreInfo(&pBuf, pInfo);
taosEncodeFixedU32(&pBuf, KVSTORE_FILE_VERSION);
ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册