提交 a3e214b9 编写于 作者: wmmhello's avatar wmmhello

fix:put poll to push manager if wal not exist when offset is latest

上级 50487255
......@@ -205,7 +205,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
SWalRef *walRefFirstVer(SWal *, SWalRef *);
void walRefFirstVer(SWal *, SWalRef *);
void walRefLastVer(SWal *, SWalRef *);
SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *);
......
......@@ -115,18 +115,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
tqOffsetResetToData(pOffsetVal, 0, 0);
}
} else {
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// offset set to previous version when init
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
// offset set to previous version when init
tqOffsetResetToLog(pOffsetVal, walGetLastVer(pTq->pVnode->pWal));
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, vgId, pRequest->subKey);
......
......@@ -1080,9 +1080,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
const char* id = GET_TASKID(pTaskInfo);
// if pOffset equal to current offset, means continue consume
char buf[80] = {0};
tFormatOffset(buf, 80, &pTaskInfo->streamInfo.currentOffset);
qDebug("currentOffset:%s", buf);
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;
}
......
......@@ -63,21 +63,22 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
return 0;
}
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
if (pRef == NULL) {
pRef = walOpenRef(pWal);
if (pRef == NULL) {
return NULL;
}
}
void walRefFirstVer(SWal *pWal, SWalRef *pRef) {
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal);
pRef->refVer = ver;
taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
}
return pRef;
void walRefLastVer(SWal *pWal, SWalRef *pRef) {
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetLastVer(pWal);
pRef->refVer = ver;
taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver);
}
SWalRef *walRefCommittedVer(SWal *pWal) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册