提交 c6052fad 编写于 作者: L Liu Jicong

feat(wal): add append interface

上级 9fd5ec90
...@@ -45,7 +45,6 @@ extern "C" { ...@@ -45,7 +45,6 @@ extern "C" {
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDULL
typedef enum { typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2, TAOS_WAL_FSYNC = 2,
} EWalType; } EWalType;
...@@ -74,7 +73,7 @@ typedef struct { ...@@ -74,7 +73,7 @@ typedef struct {
int8_t isWeek; int8_t isWeek;
uint64_t seqNum; uint64_t seqNum;
uint64_t term; uint64_t term;
} SSyncLogMeta; } SWalSyncInfo;
typedef struct { typedef struct {
int8_t protoVer; int8_t protoVer;
...@@ -84,7 +83,7 @@ typedef struct { ...@@ -84,7 +83,7 @@ typedef struct {
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
// sync meta // sync meta
SSyncLogMeta syncMeta; SWalSyncInfo syncMeta;
char body[]; char body[];
} SWalCont; } SWalCont;
...@@ -149,11 +148,22 @@ SWal *walOpen(const char *path, SWalCfg *pCfg); ...@@ -149,11 +148,22 @@ SWal *walOpen(const char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *); void walClose(SWal *);
// write // write interfaces
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen); // By assigning index by the caller, wal gurantees linearizability
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen);
// This interface assign version automatically and return to caller.
// When using this interface with concurrent writes,
// wal will write all logs atomically,
// but not sure which one will be actually write first,
// and then the unique index of successful writen is returned.
// -1 will be returned for failed writes
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management
int32_t walCommit(SWal *, int64_t ver); int32_t walCommit(SWal *, int64_t ver);
......
...@@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} else { } else {
ASSERT(pHandle->fetchMeta); ASSERT(pHandle->fetchMeta);
ASSERT(IS_META_MSG(pHead->msgType)); ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
/*metaRsp.reqOffset = pReq->reqOffset.version;*/ /*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/ /*metaRsp.rspOffset = fetchVer;*/
......
...@@ -122,8 +122,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI ...@@ -122,8 +122,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, err, "wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
err, errStr, sysErr, sysErrStr); snapshotIndex, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf); syncNodeErrorLog(pData->pSyncNode, logBuf);
return -1; return -1;
...@@ -207,13 +207,13 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -207,13 +207,13 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SyncIndex writeIndex = raftLogWriteIndex(pLogStore); SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
if (pEntry->index != writeIndex) { if (pEntry->index != writeIndex) {
sError("vgId:%d wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId, pEntry->index, sError("vgId:%d wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId,
writeIndex); pEntry->index, writeIndex);
pEntry->index = writeIndex; pEntry->index = writeIndex;
} }
int code = 0; int code = 0;
SSyncLogMeta syncMeta; SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
...@@ -272,8 +272,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ...@@ -272,8 +272,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
do { do {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err, snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
err, errStr, sysErr, sysErrStr); index, err, err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
syncNodeEventLog(pData->pSyncNode, logBuf); syncNodeEventLog(pData->pSyncNode, logBuf);
} else { } else {
...@@ -369,7 +369,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -369,7 +369,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
ASSERT(pEntry->index == lastIndex + 1); ASSERT(pEntry->index == lastIndex + 1);
int code = 0; int code = 0;
SSyncLogMeta syncMeta; SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
...@@ -418,8 +418,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -418,8 +418,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
do { do {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
err, err, errStr, sysErr, sysErrStr); index, err, err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
syncNodeEventLog(pData->pSyncNode, logBuf); syncNodeEventLog(pData->pSyncNode, logBuf);
} else { } else {
......
...@@ -146,12 +146,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); ...@@ -146,12 +146,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// seek section // seek section
int walChangeWrite(SWal* pWal, int64_t ver); int walChangeWrite(SWal* pWal, int64_t ver);
int walSetWrite(SWal* pWal); int walInitWriteFile(SWal* pWal);
// seek section end // seek section end
int64_t walGetSeq(); int64_t walGetSeq();
int walSeekWriteVer(SWal* pWal, int64_t ver); int walSeekWriteVer(SWal* pWal, int64_t ver);
int walRoll(SWal* pWal); int32_t walRollImpl(SWal* pWal);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -51,10 +51,10 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -51,10 +51,10 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &file_size, NULL); taosStatFile(fnameStr, &fileSize, NULL);
int readSize = TMIN(WAL_MAX_SIZE + 2, file_size); int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize);
pLastFileInfo->fileSize = file_size; pLastFileInfo->fileSize = fileSize;
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -145,6 +145,26 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -145,6 +145,26 @@ int walCheckAndRepairMeta(SWal* pWal) {
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet); int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(pLogInfoArray); int actualFileNum = taosArrayGetSize(pLogInfoArray);
#if 0
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, fileNo);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
if (fileSize == 0) {
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
taosArrayPop(pLogInfoArray);
} else {
break;
}
}
actualFileNum = taosArrayGetSize(pLogInfoArray);
#endif
if (metaFileNum > actualFileNum) { if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) { } else if (metaFileNum < actualFileNum) {
...@@ -164,6 +184,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -164,6 +184,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t fileSize = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL); taosStatFile(fnameStr, &fileSize, NULL);
/*ASSERT(fileSize != 0);*/
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) { if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = fileSize; pLastFileInfo->fileSize = fileSize;
...@@ -380,9 +401,9 @@ int walLoadMeta(SWal* pWal) { ...@@ -380,9 +401,9 @@ int walLoadMeta(SWal* pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr); walBuildMetaName(pWal, metaVer, fnameStr);
// read metafile // read metafile
int64_t file_size = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &file_size, NULL); taosStatFile(fnameStr, &fileSize, NULL);
int size = (int)file_size; int size = (int)fileSize;
char* buf = taosMemoryMalloc(size + 5); char* buf = taosMemoryMalloc(size + 5);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
......
...@@ -48,7 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { ...@@ -48,7 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
return 0; return 0;
} }
int walSetWrite(SWal* pWal) { int walInitWriteFile(SWal* pWal) {
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
...@@ -70,6 +70,7 @@ int walSetWrite(SWal* pWal) { ...@@ -70,6 +70,7 @@ int walSetWrite(SWal* pWal) {
// switch file // switch file
pWal->pWriteIdxTFile = pIdxTFile; pWal->pWriteIdxTFile = pIdxTFile;
pWal->pWriteLogTFile = pLogTFile; pWal->pWriteLogTFile = pLogTFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
return 0; return 0;
} }
......
...@@ -207,12 +207,35 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -207,12 +207,35 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return 0; return 0;
} }
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
/*pWal->vers.firstVer = index;*/
if (walRollImpl(pWal) < 0) {
return -1;
}
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
if (walRollImpl(pWal) < 0) {
return -1;
}
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
if (walRollImpl(pWal) < 0) {
return -1;
}
}
}
return 0;
}
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
walRoll(pWal); if (walGetLastFileSize(pWal) != 0) {
walRollImpl(pWal);
}
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
} }
...@@ -282,7 +305,7 @@ END: ...@@ -282,7 +305,7 @@ END:
return code; return code;
} }
int walRoll(SWal *pWal) { int32_t walRollImpl(SWal *pWal) {
int32_t code = 0; int32_t code = 0;
if (pWal->pWriteIdxTFile != NULL) { if (pWal->pWriteIdxTFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile); code = taosCloseFile(&pWal->pWriteIdxTFile);
...@@ -330,11 +353,13 @@ int walRoll(SWal *pWal) { ...@@ -330,11 +353,13 @@ int walRoll(SWal *pWal) {
pWal->lastRollSeq = walGetSeq(); pWal->lastRollSeq = walGetSeq();
walSaveMeta(pWal);
END: END:
return code; return code;
} }
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END); int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset, wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
...@@ -348,61 +373,14 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { ...@@ -348,61 +373,14 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0; return 0;
} }
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, // TODO gurantee atomicity by truncate failed writing
int32_t bodyLen) { static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
int32_t code = 0; const void *body, int32_t bodyLen) {
int64_t code = 0;
// no wal
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
if (index == pWal->vers.lastVer + 1) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->vers.firstVer = index;
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
}
} else {
// reject skip log or rewrite log
// must truncate explicitly first
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
ASSERT(pWal->writeCur >= 0); int64_t offset = walGetCurFileOffset(pWal);
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteLogTFile == NULL) {
walSetWrite(pWal);
taosLSeekFile(pWal->pWriteLogTFile, 0, SEEK_END);
taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
}
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
...@@ -417,7 +395,8 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -417,7 +395,8 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
return -1; code = -1;
goto END;
} }
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
...@@ -425,13 +404,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -425,13 +404,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
return -1; code = -1;
goto END;
} }
code = walWriteIndex(pWal, index, offset); code = walWriteIndex(pWal, index, offset);
if (code != 0) { if (code < 0) {
// TODO // TODO ftruncate
return -1; goto END;
} }
// set status // set status
...@@ -444,13 +424,88 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -444,13 +424,88 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen;
return 0;
END:
return -1;
}
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
int64_t index = pWal->vers.lastVer + 1;
if (walCheckAndRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return index;
}
return 0; int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen) {
int32_t code = 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
// concurrency control:
// if logs are write with assigned index,
// smaller index must be write before larger one
if (index != pWal->vers.lastVer + 1) {
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (walCheckAndRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
taosThreadMutexUnlock(&pWal->mutex);
return code;
} }
int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
SSyncLogMeta syncMeta = { SWalSyncInfo syncMeta = {
.isWeek = -1, .isWeek = -1,
.seqNum = UINT64_MAX, .seqNum = UINT64_MAX,
.term = UINT64_MAX, .term = UINT64_MAX,
......
...@@ -106,8 +106,8 @@ int32_t taosMkDir(const char *dirname) { ...@@ -106,8 +106,8 @@ int32_t taosMkDir(const char *dirname) {
int32_t taosMulMkDir(const char *dirname) { int32_t taosMulMkDir(const char *dirname) {
if (dirname == NULL) return -1; if (dirname == NULL) return -1;
char temp[1024]; char temp[1024];
char * pos = temp; char *pos = temp;
int32_t code = 0; int32_t code = 0;
#ifdef WINDOWS #ifdef WINDOWS
taosRealPath(dirname, temp, sizeof(temp)); taosRealPath(dirname, temp, sizeof(temp));
...@@ -127,11 +127,11 @@ int32_t taosMulMkDir(const char *dirname) { ...@@ -127,11 +127,11 @@ int32_t taosMulMkDir(const char *dirname) {
for (; *pos != '\0'; pos++) { for (; *pos != '\0'; pos++) {
if (*pos == TD_DIRSEP[0]) { if (*pos == TD_DIRSEP[0]) {
*pos = '\0'; *pos = '\0';
#ifdef WINDOWS #ifdef WINDOWS
code = _mkdir(temp, 0755); code = _mkdir(temp, 0755);
#else #else
code = mkdir(temp, 0755); code = mkdir(temp, 0755);
#endif #endif
if (code < 0 && errno != EEXIST) { if (code < 0 && errno != EEXIST) {
return code; return code;
} }
...@@ -140,11 +140,11 @@ int32_t taosMulMkDir(const char *dirname) { ...@@ -140,11 +140,11 @@ int32_t taosMulMkDir(const char *dirname) {
} }
if (*(pos - 1) != TD_DIRSEP[0]) { if (*(pos - 1) != TD_DIRSEP[0]) {
#ifdef WINDOWS #ifdef WINDOWS
code = _mkdir(temp, 0755); code = _mkdir(temp, 0755);
#else #else
code = mkdir(temp, 0755); code = mkdir(temp, 0755);
#endif #endif
if (code < 0 && errno != EEXIST) { if (code < 0 && errno != EEXIST) {
return code; return code;
} }
...@@ -267,7 +267,7 @@ char *taosDirName(char *name) { ...@@ -267,7 +267,7 @@ char *taosDirName(char *name) {
} else { } else {
name[0] = 0; name[0] = 0;
} }
return name; return name;
#else #else
return dirname(name); return dirname(name);
#endif #endif
...@@ -334,9 +334,9 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) { ...@@ -334,9 +334,9 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) {
} }
char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) { char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) {
if (pDirEntry == NULL) { /*if (pDirEntry == NULL) {*/
return NULL; /*return NULL;*/
} /*}*/
#ifdef WINDOWS #ifdef WINDOWS
return pDirEntry->findFileData.cFileName; return pDirEntry->findFileData.cFileName;
#else #else
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册