未验证 提交 4e965b31 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15388 from taosdata/feature/stream

feat(wal): ref
...@@ -114,21 +114,30 @@ typedef struct SWal { ...@@ -114,21 +114,30 @@ typedef struct SWal {
int64_t refId; int64_t refId;
TdThreadMutex mutex; TdThreadMutex mutex;
// ref // ref
SHashObj *pRefHash; // ref -> SWalRef SHashObj *pRefHash; // refId -> SWalRef
// path // path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
// reusable write head // reusable write head
SWalCkHead writeHead; SWalCkHead writeHead;
} SWal; // WAL HANDLE } SWal;
typedef struct {
int64_t refId;
int64_t refVer;
int64_t refFile;
SWal *pWal;
} SWalRef;
typedef struct { typedef struct {
int8_t scanUncommited; int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta; int8_t scanMeta;
int8_t enableRef; int8_t enableRef;
} SWalFilterCond; } SWalFilterCond;
typedef struct { typedef struct {
SWal *pWal; SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile; TdFilePtr pLogFile;
TdFilePtr pIdxFile; TdFilePtr pIdxFile;
int64_t curFileFirstVer; int64_t curFileFirstVer;
...@@ -138,7 +147,8 @@ typedef struct { ...@@ -138,7 +147,8 @@ typedef struct {
int8_t curStopped; int8_t curStopped;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;
SWalCkHead *pHead; // TODO remove it
SWalCkHead *pHead;
} SWalReader; } SWalReader;
// module initialization // module initialization
...@@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_ ...@@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen); int32_t bodyLen);
// This interface assign version automatically and return to caller. // Assign version automatically and return to caller,
// When using this interface with concurrent writes,
// wal will write all logs atomically,
// but not sure which one will be actually write first,
// and then the unique index of successful writen is returned.
// -1 will be returned for failed writes // -1 will be returned for failed writes
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
...@@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); ...@@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId; SWalRef *walRefCommittedVer(SWal *);
int64_t ver;
} SWalRef;
SWalRef *walOpenRef(SWal *); SWalRef *walOpenRef(SWal *);
void walCloseRef(SWalRef *); void walCloseRef(SWal *pWal, int64_t refId);
int32_t walRefVer(SWalRef *, int64_t ver); int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *); void walUnrefVer(SWalRef *);
// help function for raft // helper function for raft
bool walLogExist(SWal *, int64_t ver); bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *); bool walIsEmpty(SWal *);
......
...@@ -104,6 +104,8 @@ typedef struct { ...@@ -104,6 +104,8 @@ typedef struct {
// TODO remove // TODO remove
SWalReader* pWalReader; SWalReader* pWalReader;
SWalRef* pRef;
// push // push
STqPushHandle pushHandle; STqPushHandle pushHandle;
......
...@@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
return -1;
}
}
/*}*/ /*}*/
/*}*/ /*}*/
...@@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
code = -1; code = -1;
goto OVER; goto OVER;
...@@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType; pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta; pHandle->fetchMeta = req.withMeta;
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) {
ASSERT(0);
}
int64_t ver = pRef->refVer;
pHandle->pRef = pRef;
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
// TODO version should be assigned in preprocess
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
...@@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader); ASSERT(pHandle->execHandle.pExecReader);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
......
...@@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT(0); ASSERT(0);
} }
TXN txn; TXN txn = {0};
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0); ASSERT(0);
...@@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle; STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = { SReadHandle reader = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
...@@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) {
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader); ASSERT(handle.execHandle.pExecReader);
} else { } else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }
......
...@@ -93,7 +93,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -93,7 +93,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
} }
// init ref // init ref
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (pWal->pRefHash == NULL) { if (pWal->pRefHash == NULL) {
taosMemoryFree(pWal); taosMemoryFree(pWal);
return NULL; return NULL;
......
...@@ -21,107 +21,112 @@ static int32_t walFetchBodyNew(SWalReader *pRead); ...@@ -21,107 +21,112 @@ static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead); static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader)); SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
if (pRead == NULL) { if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pRead->pWal = pWal; pReader->pWal = pWal;
pRead->pIdxFile = NULL; pReader->readerId = tGenIdPI64();
pRead->pLogFile = NULL; pReader->pIdxFile = NULL;
pRead->curVersion = -1; pReader->pLogFile = NULL;
pRead->curFileFirstVer = -1; pReader->curVersion = -1;
pRead->curInvalid = 1; pReader->curFileFirstVer = -1;
pRead->capacity = 0; pReader->curInvalid = 1;
pReader->capacity = 0;
if (cond) { if (cond) {
pRead->cond = *cond; pReader->cond = *cond;
} else { } else {
pRead->cond.scanMeta = 0; pReader->cond.scanUncommited = 0;
pRead->cond.scanUncommited = 0; pReader->cond.scanNotApplied = 0;
pRead->cond.enableRef = 0; pReader->cond.scanMeta = 0;
pReader->cond.enableRef = 0;
} }
taosThreadMutexInit(&pRead->mutex, NULL); taosThreadMutexInit(&pReader->mutex, NULL);
/*if (pRead->cond.enableRef) {*/ pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
/*walOpenRef(pWal);*/ if (pReader->pHead == NULL) {
/*}*/
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosMemoryFree(pRead); taosMemoryFree(pReader);
return NULL; return NULL;
} }
return pRead; /*if (pReader->cond.enableRef) {*/
/* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
/*}*/
return pReader;
} }
void walCloseReader(SWalReader *pRead) { void walCloseReader(SWalReader *pReader) {
taosCloseFile(&pRead->pIdxFile); taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pRead->pLogFile); taosCloseFile(&pReader->pLogFile);
taosMemoryFreeClear(pRead->pHead); /*if (pReader->cond.enableRef) {*/
taosMemoryFree(pRead); /*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/
/*}*/
taosMemoryFreeClear(pReader->pHead);
taosMemoryFree(pReader);
} }
int32_t walNextValidMsg(SWalReader *pRead) { int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pRead->curVersion; int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pRead->pWal); int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pRead->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pRead->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal);
int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer; int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
endVer = TMIN(appliedVer, endVer); endVer = TMIN(appliedVer, endVer);
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld", wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
pRead->curStopped = 0; pReader->curStopped = 0;
while (fetchVer <= endVer) { while (fetchVer <= endVer) {
if (walFetchHeadNew(pRead, fetchVer) < 0) { if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1; return -1;
} }
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT || if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) { (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
if (walFetchBodyNew(pRead) < 0) { if (walFetchBodyNew(pReader) < 0) {
return -1; return -1;
} }
return 0; return 0;
} else { } else {
if (walSkipFetchBodyNew(pRead) < 0) { if (walSkipFetchBodyNew(pReader) < 0) {
return -1; return -1;
} }
fetchVer++; fetchVer++;
ASSERT(fetchVer == pRead->curVersion); ASSERT(fetchVer == pReader->curVersion);
} }
} }
pRead->curStopped = 1; pReader->curStopped = 1;
return -1; return -1;
} }
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) { static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0; int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pIdxFile; TdFilePtr pIdxTFile = pReader->pIdxFile;
TdFilePtr pLogTFile = pRead->pLogFile; TdFilePtr pLogTFile = pReader->pLogFile;
// seek position // seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
offset, terrstr()); ver, offset, terrstr());
return -1; return -1;
} }
SWalIdxEntry entry = {0}; SWalIdxEntry entry = {0};
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) { if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to read idx file, since %s", pRead->pWal->cfg.vgId, terrstr()); wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64, wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64,
pRead->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry)); pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
} }
return -1; return -1;
} }
...@@ -130,79 +135,79 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64 ...@@ -130,79 +135,79 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
entry.offset, terrstr()); ver, entry.offset, terrstr());
return -1; return -1;
} }
return ret; return ret;
} }
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
taosCloseFile(&pRead->pIdxFile); taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pRead->pLogFile); taosCloseFile(&pReader->pLogFile);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pLogTFile == NULL) { if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr()); wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
return -1; return -1;
} }
pRead->pLogFile = pLogTFile; pReader->pLogFile = pLogFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pIdxTFile == NULL) { if (pIdxFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr()); wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
return -1; return -1;
} }
pRead->pIdxFile = pIdxTFile; pReader->pIdxFile = pIdxFile;
return 0; return 0;
} }
int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) { int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pReader->pWal;
// bsearch in fileSet
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
// bsearch in fileSet
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
if (pRead->curFileFirstVer != pRet->firstVer) { if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner // error code was set inner
if (walReadChangeFile(pRead, pRet->firstVer) < 0) { if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
return -1; return -1;
} }
} }
// error code was set inner // error code was set inner
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) {
return -1; return -1;
} }
wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver); wDebug("wal version reset from %ld(invalid: %d) to %ld", pReader->curVersion, pReader->curInvalid, ver);
pRead->curVersion = ver; pReader->curVersion = ver;
return 0; return 0;
} }
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pReader->pWal;
if (!pRead->curInvalid && ver == pRead->curVersion) { if (!pReader->curInvalid && ver == pReader->curVersion) {
wDebug("wal version %ld match, no need to reset", ver); wDebug("wal version %ld match, no need to reset", ver);
return 0; return 0;
} }
pRead->curInvalid = 1; pReader->curInvalid = 1;
pRead->curVersion = ver; pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer); ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
...@@ -210,7 +215,7 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { ...@@ -210,7 +215,7 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
} }
if (walReadSeekVerImpl(pRead, ver) < 0) { if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1; return -1;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
#include "tutil.h"
#include "walInt.h"
SWalRef *walOpenRef(SWal *pWal) {
SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef));
if (pRef == NULL) {
return NULL;
}
pRef->refId = tGenIdPI64();
pRef->refVer = -1;
pRef->refFile = -1;
pRef->pWal = pWal;
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
return pRef;
}
void walCloseRef(SWal *pWal, int64_t refId) {
SWalRef *pRef = *(SWalRef **)taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
taosMemoryFree(pRef);
}
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal;
if (pRef->refVer != ver) {
taosThreadMutexLock(&pWal->mutex);
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
taosThreadMutexUnlock(&pWal->mutex);
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
pRef->refVer = ver;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
}
return 0;
}
void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1;
pRef->refFile = -1;
}
SWalRef *walRefCommittedVer(SWal *pWal) {
SWalRef *pRef = walOpenRef(pWal);
if (pRef == NULL) {
return NULL;
}
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetCommittedVer(pWal);
pRef->refVer = ver;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
return pRef;
}
...@@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { ...@@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pIter = taosHashIterate(pWal->pRefHash, pIter); pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SWalRef *pRef = (SWalRef *)pIter; SWalRef *pRef = (SWalRef *)pIter;
if (pRef->ver != -1) { if (pRef->refVer != -1) {
taosHashCancelIterate(pWal->pRefHash, pIter); taosHashCancelIterate(pWal->pRefHash, pIter);
return -1; return -1;
} }
...@@ -215,22 +215,23 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -215,22 +215,23 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
/*pWal->vers.firstVer = index;*/
if (walRollImpl(pWal) < 0) { if (walRollImpl(pWal) < 0) {
return -1; return -1;
} }
} else { return 0;
int64_t passed = walGetSeq() - pWal->lastRollSeq; }
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
if (walRollImpl(pWal) < 0) { int64_t passed = walGetSeq() - pWal->lastRollSeq;
return -1; if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
} if (walRollImpl(pWal) < 0) {
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { return -1;
if (walRollImpl(pWal) < 0) { }
return -1; } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
} if (walRollImpl(pWal) < 0) {
return -1;
} }
} }
return 0; return 0;
} }
...@@ -260,6 +261,16 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -260,6 +261,16 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.snapshotVer = ver; pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec(); int ts = taosGetTimestampSec();
int64_t minVerToDelete = ver;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer == -1) continue;
minVerToDelete = TMIN(minVerToDelete, pRef->refVer);
}
int deleteCnt = 0; int deleteCnt = 0;
int64_t newTotSize = pWal->totSize; int64_t newTotSize = pWal->totSize;
SWalFileInfo tmp; SWalFileInfo tmp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册