提交 59646be2 编写于 作者: L Liu Jicong

Merge branch 'feature/stream' into fix/wal

...@@ -103,8 +103,8 @@ typedef struct SWal { ...@@ -103,8 +103,8 @@ typedef struct SWal {
int32_t fsyncSeq; int32_t fsyncSeq;
// meta // meta
SWalVer vers; SWalVer vers;
TdFilePtr pWriteLogTFile; TdFilePtr pLogFile;
TdFilePtr pWriteIdxTFile; TdFilePtr pIdxFile;
int32_t writeCur; int32_t writeCur;
SArray *fileInfoSet; // SArray<SWalFileInfo> SArray *fileInfoSet; // SArray<SWalFileInfo>
// status // status
......
...@@ -101,8 +101,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -101,8 +101,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
// open meta // open meta
walResetVer(&pWal->vers); walResetVer(&pWal->vers);
pWal->pWriteLogTFile = NULL; pWal->pLogFile = NULL;
pWal->pWriteIdxTFile = NULL; pWal->pIdxFile = NULL;
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) { if (pWal->fileInfoSet == NULL) {
...@@ -179,10 +179,10 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { ...@@ -179,10 +179,10 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
void walClose(SWal *pWal) { void walClose(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
taosCloseFile(&pWal->pWriteLogTFile); taosCloseFile(&pWal->pLogFile);
pWal->pWriteLogTFile = NULL; pWal->pLogFile = NULL;
taosCloseFile(&pWal->pWriteIdxTFile); taosCloseFile(&pWal->pIdxFile);
pWal->pWriteIdxTFile = NULL; pWal->pIdxFile = NULL;
walSaveMeta(pWal); walSaveMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL; pWal->fileInfoSet = NULL;
...@@ -223,7 +223,7 @@ static void walFsyncAll() { ...@@ -223,7 +223,7 @@ static void walFsyncAll() {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
atomic_load_32(&tsWal.seq)); atomic_load_32(&tsWal.seq));
int32_t code = taosFsyncFile(pWal->pWriteLogTFile); int32_t code = taosFsyncFile(pWal->pLogFile);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(code)); strerror(code));
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
int64_t code = 0; int64_t code = 0;
TdFilePtr pIdxTFile = pWal->pWriteIdxTFile; TdFilePtr pIdxTFile = pWal->pIdxFile;
TdFilePtr pLogTFile = pWal->pWriteLogTFile; TdFilePtr pLogTFile = pWal->pLogFile;
// seek position // seek position
int64_t idxOff = walGetVerIdxOffset(pWal, ver); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
...@@ -68,8 +68,8 @@ int walInitWriteFile(SWal* pWal) { ...@@ -68,8 +68,8 @@ int walInitWriteFile(SWal* pWal) {
return -1; return -1;
} }
// switch file // switch file
pWal->pWriteIdxTFile = pIdxTFile; pWal->pIdxFile = pIdxTFile;
pWal->pWriteLogTFile = pLogTFile; pWal->pLogFile = pLogTFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
return 0; return 0;
} }
...@@ -78,15 +78,15 @@ int walChangeWrite(SWal* pWal, int64_t ver) { ...@@ -78,15 +78,15 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
int code; int code;
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (pWal->pWriteLogTFile != NULL) { if (pWal->pLogFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile); code = taosCloseFile(&pWal->pLogFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
} }
if (pWal->pWriteIdxTFile != NULL) { if (pWal->pIdxFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile); code = taosCloseFile(&pWal->pIdxFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -106,7 +106,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) { ...@@ -106,7 +106,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxTFile == NULL) { if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pWal->pWriteIdxTFile = NULL; pWal->pIdxFile = NULL;
return -1; return -1;
} }
walBuildLogName(pWal, fileFirstVer, fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr);
...@@ -114,12 +114,12 @@ int walChangeWrite(SWal* pWal, int64_t ver) { ...@@ -114,12 +114,12 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
if (pLogTFile == NULL) { if (pLogTFile == NULL) {
taosCloseFile(&pIdxTFile); taosCloseFile(&pIdxTFile);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pWal->pWriteLogTFile = NULL; pWal->pLogFile = NULL;
return -1; return -1;
} }
pWal->pWriteLogTFile = pLogTFile; pWal->pLogFile = pLogTFile;
pWal->pWriteIdxTFile = pIdxTFile; pWal->pIdxFile = pIdxTFile;
pWal->writeCur = idx; pWal->writeCur = idx;
return fileFirstVer; return fileFirstVer;
} }
......
...@@ -32,8 +32,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { ...@@ -32,8 +32,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
} }
} }
taosCloseFile(&pWal->pWriteLogTFile); taosCloseFile(&pWal->pLogFile);
taosCloseFile(&pWal->pWriteIdxTFile); taosCloseFile(&pWal->pIdxFile);
if (pWal->vers.firstVer != -1) { if (pWal->vers.firstVer != -1) {
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet); int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
...@@ -324,34 +324,34 @@ END: ...@@ -324,34 +324,34 @@ END:
int32_t walRollImpl(SWal *pWal) { int32_t walRollImpl(SWal *pWal) {
int32_t code = 0; int32_t code = 0;
if (pWal->pWriteIdxTFile != NULL) { if (pWal->pIdxFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile); code = taosCloseFile(&pWal->pIdxFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto END; goto END;
} }
} }
if (pWal->pWriteLogTFile != NULL) { if (pWal->pLogFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile); code = taosCloseFile(&pWal->pLogFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto END; goto END;
} }
} }
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxFile, pLogFile;
// create new file // create new file
int64_t newFileFirstVersion = pWal->vers.lastVer + 1; int64_t newFileFirstVer = pWal->vers.lastVer + 1;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVersion, fnameStr); walBuildIdxName(pWal, newFileFirstVer, fnameStr);
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxTFile == NULL) { if (pIdxFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
code = -1; code = -1;
goto END; goto END;
} }
walBuildLogName(pWal, newFileFirstVersion, fnameStr); walBuildLogName(pWal, newFileFirstVer, fnameStr);
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogTFile == NULL) { if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
code = -1; code = -1;
goto END; goto END;
...@@ -363,8 +363,8 @@ int32_t walRollImpl(SWal *pWal) { ...@@ -363,8 +363,8 @@ int32_t walRollImpl(SWal *pWal) {
} }
// switch file // switch file
pWal->pWriteIdxTFile = pIdxTFile; pWal->pIdxFile = pIdxFile;
pWal->pWriteLogTFile = pLogTFile; pWal->pLogFile = pLogFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
ASSERT(pWal->writeCur >= 0); ASSERT(pWal->writeCur >= 0);
...@@ -378,10 +378,10 @@ END: ...@@ -378,10 +378,10 @@ END:
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END); int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset, wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
idxOffset); idxOffset);
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry)); int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) { if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate // TODO truncate
...@@ -407,7 +407,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -407,7 +407,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
// TODO ftruncate // TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
...@@ -416,7 +416,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -416,7 +416,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
goto END; goto END;
} }
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) {
// TODO ftruncate // TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
...@@ -456,14 +456,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo ...@@ -456,14 +456,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo
return -1; return -1;
} }
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) { if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) { if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
} }
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0); ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -494,14 +494,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync ...@@ -494,14 +494,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
return -1; return -1;
} }
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) { if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) { if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
} }
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0); ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -524,7 +524,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in ...@@ -524,7 +524,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pWriteLogTFile) < 0) { if (taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
strerror(errno)); strerror(errno));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册