提交 7820ba78 编写于 作者: H Hongze Cheng

fix bugs

上级 ddf53cd7
...@@ -44,6 +44,7 @@ IF (TD_LINUX) ...@@ -44,6 +44,7 @@ IF (TD_LINUX)
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMENT "prepare taosd environment") COMMENT "prepare taosd environment")
ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD}) ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD})
ENDIF () ENDIF ()
...@@ -40,7 +40,7 @@ typedef struct { ...@@ -40,7 +40,7 @@ typedef struct {
int tfsInit(SDiskCfg *pDiskCfg, int ndisk); int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
void tfsDestroy(); void tfsDestroy();
void tfsUpdateInfo(); void tfsUpdateInfo(SFSMeta *pFSMeta);
void tfsGetMeta(SFSMeta *pMeta); void tfsGetMeta(SFSMeta *pMeta);
void tfsAllocDisk(int expLevel, int *level, int *id); void tfsAllocDisk(int expLevel, int *level, int *id);
......
...@@ -102,7 +102,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { ...@@ -102,7 +102,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
return -1; return -1;
} }
tfsUpdateInfo(); tfsUpdateInfo(NULL);
for (int level = 0; level < TFS_NLEVEL(); level++) { for (int level = 0; level < TFS_NLEVEL(); level++) {
tfsPosNextId(TFS_TIER_AT(level)); tfsPosNextId(TFS_TIER_AT(level));
} }
......
...@@ -73,8 +73,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg); ...@@ -73,8 +73,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs); void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdbRepo *pRepo); int tsdbOpenFS(STsdbRepo *pRepo);
void tsdbCloseFS(STsdbRepo *pRepo); void tsdbCloseFS(STsdbRepo *pRepo);
void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd); void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdbFS *pfs); int tsdbEndFSTxn(STsdbRepo *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs); int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
......
...@@ -37,6 +37,7 @@ typedef struct { ...@@ -37,6 +37,7 @@ typedef struct {
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
SArray * aBlkIdx; // SBlockIdx array SArray * aBlkIdx; // SBlockIdx array
STable * pTable;
SArray * aSupBlk; // Table super-block array SArray * aSupBlk; // Table super-block array
SArray * aSubBlk; // table sub-block array SArray * aSubBlk; // table sub-block array
SDataCols * pDataCols; SDataCols * pDataCols;
...@@ -45,7 +46,7 @@ typedef struct { ...@@ -45,7 +46,7 @@ typedef struct {
#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) #define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh))
#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) #define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) #define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet))
#define TSDB_COMMIT_TABLE(ch) TSDB_READ_TABLE(&(ch->readh)) #define TSDB_COMMIT_TABLE(ch) ((ch)->pTable)
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
...@@ -325,12 +326,13 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { ...@@ -325,12 +326,13 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
cfid = pSet->fid; cfid = pSet->fid;
pSet = tsdbFSIterNext(&(commith.fsIter)); pSet = tsdbFSIterNext(&(commith.fsIter));
} }
fid = tsdbNextCommitFid(&commith);
if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) { if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) {
tsdbDestroyCommitH(&commith); tsdbDestroyCommitH(&commith);
return -1; return -1;
} }
fid = tsdbNextCommitFid(&commith);
} }
} }
...@@ -346,7 +348,7 @@ static int tsdbStartCommit(STsdbRepo *pRepo) { ...@@ -346,7 +348,7 @@ static int tsdbStartCommit(STsdbRepo *pRepo) {
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
tsdbStartFSTxn(REPO_FS(pRepo), pMem->pointsAdd, pMem->storageAdd); tsdbStartFSTxn(pRepo, pMem->pointsAdd, pMem->storageAdd);
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
return 0; return 0;
...@@ -356,7 +358,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { ...@@ -356,7 +358,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) { if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo)); tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else { } else {
tsdbEndFSTxn(REPO_FS(pRepo)); tsdbEndFSTxn(pRepo);
} }
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
...@@ -699,6 +701,8 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -699,6 +701,8 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pCommith->pTable = pTable;
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
...@@ -1246,6 +1250,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { ...@@ -1246,6 +1250,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
tdResetDataCols(pCommith->pDataCols); tdResetDataCols(pCommith->pDataCols);
taosArrayClear(pCommith->aSubBlk); taosArrayClear(pCommith->aSubBlk);
taosArrayClear(pCommith->aSupBlk); taosArrayClear(pCommith->aSupBlk);
pCommith->pTable = NULL;
} }
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
static int tsdbComparFidFSet(const void *arg1, const void *arg2); static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus); static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbApplyFSTxn(STsdbFS *pfs); static int tsdbApplyFSTxn(STsdbFS *pfs, int vid);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
// ================== CURRENT file header info // ================== CURRENT file header info
...@@ -238,7 +238,8 @@ void tsdbCloseFS(STsdbRepo *pRepo) { ...@@ -238,7 +238,8 @@ void tsdbCloseFS(STsdbRepo *pRepo) {
} }
// Start a new transaction to modify the file system // Start a new transaction to modify the file system
void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd) { void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) {
STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(pfs->intxn == false); ASSERT(pfs->intxn == false);
pfs->intxn = true; pfs->intxn = true;
...@@ -251,12 +252,13 @@ void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd) { ...@@ -251,12 +252,13 @@ void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd) {
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
int tsdbEndFSTxn(STsdbFS *pfs) { int tsdbEndFSTxn(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(FS_IN_TXN(pfs)); ASSERT(FS_IN_TXN(pfs));
SFSStatus *pStatus; SFSStatus *pStatus;
// Write current file system snapshot // Write current file system snapshot
if (tsdbApplyFSTxn(pfs) < 0) { if (tsdbApplyFSTxn(pfs, REPO_ID(pRepo)) < 0) {
tsdbEndFSTxnWithError(pfs); tsdbEndFSTxnWithError(pfs);
return -1; return -1;
} }
...@@ -286,14 +288,19 @@ void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pf ...@@ -286,14 +288,19 @@ void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pf
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbApplyFSTxn(STsdbFS *pfs) { static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) {
ASSERT(FS_IN_TXN(pfs)); ASSERT(FS_IN_TXN(pfs));
SFSHeader fsheader; SFSHeader fsheader;
void * pBuf = NULL; void * pBuf = NULL;
void * ptr; void * ptr;
char hbuf[TSDB_FILE_HEAD_SIZE] = "\0"; char hbuf[TSDB_FILE_HEAD_SIZE] = "\0";
char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0";
int fd = open(TSDB_FS_TEMP_FNAME, O_WRONLY | O_CREAT | O_TRUNC, 0755); snprintf(tfname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), vid, TSDB_FS_TEMP_FNAME);
snprintf(cfname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), vid, TSDB_FS_CURRENT_FNAME);
int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC, 0755);
if (fd < 0) { if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -317,7 +324,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) { ...@@ -317,7 +324,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) {
if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
remove(TSDB_FS_TEMP_FNAME); remove(tfname);
return -1; return -1;
} }
...@@ -325,7 +332,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) { ...@@ -325,7 +332,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) {
if (fsheader.len > 0) { if (fsheader.len > 0) {
if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) { if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) {
close(fd); close(fd);
remove(TSDB_FS_TEMP_FNAME); remove(tfname);
return -1; return -1;
} }
...@@ -336,7 +343,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) { ...@@ -336,7 +343,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) {
if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) { if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
remove(TSDB_FS_TEMP_FNAME); remove(tfname);
taosTZfree(pBuf); taosTZfree(pBuf);
return -1; return -1;
} }
...@@ -346,13 +353,13 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) { ...@@ -346,13 +353,13 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) {
if (fsync(fd) < 0) { if (fsync(fd) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
remove(TSDB_FS_TEMP_FNAME); remove(tfname);
taosTZfree(pBuf); taosTZfree(pBuf);
return -1; return -1;
} }
(void)close(fd); (void)close(fd);
(void)rename(TSDB_FS_TEMP_FNAME, TSDB_FS_CURRENT_FNAME); (void)rename(tfname, cfname);
taosTZfree(pBuf); taosTZfree(pBuf);
return 0; return 0;
...@@ -398,13 +405,11 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) { ...@@ -398,13 +405,11 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
} }
} else if (pSetFrom == NULL || pSetFrom->fid > pSetTo->fid) { } else if (pSetFrom == NULL || pSetFrom->fid > pSetTo->fid) {
// Do nothing // Do nothing
if (pSetFrom) { ito++;
ito++; if (ito >= sizeTo) {
if (ito >= sizeTo) { pSetTo = NULL;
pSetTo = NULL; } else {
} else { pSetTo = taosArrayGet(pTo->df, ito);
pSetTo = taosArrayGet(pTo->df, ito);
}
} }
} else { } else {
tsdbApplyDFileSetChange(pSetFrom, pSetTo); tsdbApplyDFileSetChange(pSetFrom, pSetTo);
......
...@@ -91,7 +91,9 @@ int tsdbCreateMFile(SMFile *pMFile) { ...@@ -91,7 +91,9 @@ int tsdbCreateMFile(SMFile *pMFile) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
if (tsdbOpenMFile(pMFile, O_WRONLY | O_CREAT | O_EXCL) < 0) { pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), O_WRONLY | O_CREAT | O_EXCL, 0755);
if (pMFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -212,7 +214,9 @@ int tsdbCreateDFile(SDFile *pDFile) { ...@@ -212,7 +214,9 @@ int tsdbCreateDFile(SDFile *pDFile) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
if (tsdbOpenDFile(pDFile, O_WRONLY | O_CREAT | O_EXCL) < 0) { pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_EXCL, 0755);
if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -353,7 +357,9 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) { ...@@ -353,7 +357,9 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbApplyDFileChange(TSDB_DFILE_IN_SET(from, ftype), TSDB_DFILE_IN_SET(to, ftype)) < 0) { SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL;
SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL;
if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) {
return -1; return -1;
} }
} }
...@@ -387,9 +393,9 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c ...@@ -387,9 +393,9 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c
if (ftype < TSDB_FILE_MAX) { if (ftype < TSDB_FILE_MAX) {
if (ver == 0) { if (ver == 0) {
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]); snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
} else { } else {
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-ver%" PRIu32, vid, vid, fid, snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d%s-ver%" PRIu32, vid, vid, fid,
TSDB_FNAME_SUFFIX[ftype], ver); TSDB_FNAME_SUFFIX[ftype], ver);
} }
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册