提交 ee326208 编写于 作者: wmmhello's avatar wmmhello

fix:[TS-3347]set ver to first version if version stored is smaller than first...

fix:[TS-3347]set ver to first version if version stored is smaller than first version in wal when subscribe db
上级 db0ad956
...@@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int ...@@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetOpen(qTaskInfo_t tinfo);
......
...@@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) { if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1; int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
......
...@@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { ...@@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
} }
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
...@@ -1083,12 +1091,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1083,12 +1091,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderClose(pScanBaseInfo->dataReader); tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL; pScanBaseInfo->dataReader = NULL;
// let's seek to the next version in wal file verifyOffset(pInfo->tqReader->pWalReader, pOffset);
int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册