提交 7b735c67 编写于 作者: wmmhello's avatar wmmhello

fix:move sleep logic from rpc thread to tmq thread & fix some error

上级 8183be4a
...@@ -367,15 +367,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -367,15 +367,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
// 3. update the epoch value
if (pHandle->epoch > reqEpoch) {
tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ",
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->epoch, reqEpoch);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock);
return -1;
}
bool exec = tqIsHandleExec(pHandle); bool exec = tqIsHandleExec(pHandle);
if(!exec) { if(!exec) {
tqSetHandleExec(pHandle); tqSetHandleExec(pHandle);
...@@ -390,6 +381,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -390,6 +381,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(10); taosMsleep(10);
} }
// 3. update the epoch value
if (pHandle->epoch < reqEpoch) { if (pHandle->epoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch);
pHandle->epoch = reqEpoch; pHandle->epoch = reqEpoch;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册