提交 6b305bf7 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 aca5760c
...@@ -195,7 +195,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); ...@@ -195,7 +195,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead); void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader); void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead); int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader); int64_t walReaderGetCurrentVer(const SWalReader* pReader);
int64_t walReaderGetValidFirstVer(const SWalReader* pReader); int64_t walReaderGetValidFirstVer(const SWalReader* pReader);
......
...@@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) { ...@@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) {
} }
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) { if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
return -1; return -1;
} }
tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id); tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id);
......
...@@ -112,11 +112,17 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -112,11 +112,17 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
pTask->chkInfo.currentVer = firstVer; pTask->chkInfo.currentVer = firstVer;
tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
}
// todo need retry if failed
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
} else {
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (currentVer != pTask->chkInfo.currentVer) { if (currentVer != pTask->chkInfo.currentVer) {
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
...@@ -125,6 +131,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -125,6 +131,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
// append the data for the stream // append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
} }
}
SPackedData packData = {0}; SPackedData packData = {0};
int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
......
...@@ -184,6 +184,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { ...@@ -184,6 +184,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
if (pReader->curFileFirstVer != pRet->firstVer) { if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner // error code was set inner
if (walReadChangeFile(pReader, pRet->firstVer) < 0) { if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
...@@ -203,7 +204,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { ...@@ -203,7 +204,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return 0; return 0;
} }
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal; SWal *pWal = pReader->pWal;
if (ver == pReader->curVersion) { if (ver == pReader->curVersion) {
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
...@@ -233,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -233,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curVersion != fetchVer) { if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReaderSeekVer(pRead, fetchVer) < 0) {
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -336,7 +337,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -336,7 +337,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} }
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReaderSeekVer(pRead, ver);
if (code < 0) { if (code < 0) {
// pRead->curVersion = ver; // pRead->curVersion = ver;
// pRead->curInvalid = 1; // pRead->curInvalid = 1;
...@@ -471,7 +472,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -471,7 +472,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
taosThreadMutexLock(&pReader->mutex); taosThreadMutexLock(&pReader->mutex);
if (pReader->curVersion != ver) { if (pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) { if (walReaderSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册