提交 ace193ca 编写于 作者: H Haojun Liao

fix(stream): set the seek version if the reader's version is earlier than the first version in wal.

上级 777ed176
...@@ -138,6 +138,8 @@ typedef struct { ...@@ -138,6 +138,8 @@ typedef struct {
int8_t enableRef; int8_t enableRef;
} SWalFilterCond; } SWalFilterCond;
typedef struct SWalReader SWalReader;
// todo hide this struct // todo hide this struct
typedef struct SWalReader { typedef struct SWalReader {
SWal *pWal; SWal *pWal;
...@@ -198,6 +200,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver); ...@@ -198,6 +200,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead); int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader); int64_t walReaderGetCurrentVer(const SWalReader* pReader);
int64_t walReaderGetValidFirstVer(const SWalReader* pReader);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
......
...@@ -121,17 +121,27 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -121,17 +121,27 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false; *pScanIdle = false;
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit if (pTask->chkInfo.currentVer < firstVer) {
streamMetaReleaseTask(pStreamMeta, pTask); pTask->chkInfo.currentVer = firstVer;
continue; tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
} }
// append the data for the stream int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); if (currentVer != pTask->chkInfo.currentVer) {
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
}
SPackedData packData = {0}; SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
if (code != TSDB_CODE_SUCCESS) { // failed, continue if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
......
...@@ -101,6 +101,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -101,6 +101,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
} }
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0; int64_t ret = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册