提交 6a0e9941 编写于 作者: wmmhello's avatar wmmhello

fix:avoid request offset type is 0

上级 d7bb7e11
...@@ -1860,8 +1860,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p ...@@ -1860,8 +1860,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) { if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.beginOffset = *reqOffset; if(reqOffset->type != 0) pVg->offsetInfo.beginOffset = *reqOffset;
pVg->offsetInfo.endOffset = *rspOffset; if(rspOffset->type != 0) pVg->offsetInfo.endOffset = *rspOffset;
} else { } else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
} }
......
...@@ -7207,11 +7207,6 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { ...@@ -7207,11 +7207,6 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
return pLeft->uid == pRight->uid; return pLeft->uid == pRight->uid;
} else {
ASSERT(0);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
/*return true;*/
} }
} }
return false; return false;
......
...@@ -344,9 +344,11 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ ...@@ -344,9 +344,11 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if (blockReturned) { if (blockReturned) {
return 0; return 0;
} }
} else { // use the consumer specified offset } else if(reqOffset.type != 0){ // use the consumer specified offset
// the offset value can not be monotonious increase?? // the offset value can not be monotonious increase??
offset = reqOffset; offset = reqOffset;
} else {
return TSDB_CODE_TMQ_INVALID_MSG;
} }
// this is a normal subscribe requirement // this is a normal subscribe requirement
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册