未验证 提交 f01deae2 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #19681 from taosdata/fix/wal_ref

fix: wal ref
...@@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); ...@@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
SWalRef *walRefFirstVer(SWal *, SWalRef *);
SWalRef *walRefCommittedVer(SWal *); SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *); SWalRef *walOpenRef(SWal *);
......
...@@ -521,7 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -521,7 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqOffsetResetToData(&fetchOffsetNew, 0, 0); tqOffsetResetToData(&fetchOffsetNew, 0, 0);
} }
} else { } else {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
...@@ -719,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -719,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (code != 0) {
......
...@@ -77,6 +77,31 @@ void walUnrefVer(SWalRef *pRef) { ...@@ -77,6 +77,31 @@ void walUnrefVer(SWalRef *pRef) {
} }
#endif #endif
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
if (pRef == NULL) {
pRef = walOpenRef(pWal);
if (pRef == NULL) {
return NULL;
}
}
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
pRef->refVer = ver;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
return pRef;
}
SWalRef *walRefCommittedVer(SWal *pWal) { SWalRef *walRefCommittedVer(SWal *pWal) {
SWalRef *pRef = walOpenRef(pWal); SWalRef *pRef = walOpenRef(pWal);
if (pRef == NULL) { if (pRef == NULL) {
...@@ -86,6 +111,8 @@ SWalRef *walRefCommittedVer(SWal *pWal) { ...@@ -86,6 +111,8 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
int64_t ver = walGetCommittedVer(pWal); int64_t ver = walGetCommittedVer(pWal);
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
pRef->refVer = ver; pRef->refVer = ver;
// bsearch in fileSet // bsearch in fileSet
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册