提交 8183be4a 编写于 作者: wmmhello's avatar wmmhello

fix:move sleep logic from rpc thread to tmq thread & fix some error

上级 04858fae
...@@ -1275,7 +1275,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1275,7 +1275,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
// in case of consumer mismatch, wait for 500ms and retry // in case of consumer mismatch, wait for 500ms and retry
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
taosMsleep(500); // taosMsleep(500);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER", tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
tmq->consumerId); tmq->consumerId);
...@@ -1289,8 +1289,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1289,8 +1289,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
} else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert // } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
taosMsleep(500); // taosMsleep(5);
} else{ } else{
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, epoch, tstrerror(code), requestId); vgId, epoch, tstrerror(code), requestId);
...@@ -1731,6 +1731,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p ...@@ -1731,6 +1731,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
// broadcast the poll request to all related vnodes // broadcast the poll request to all related vnodes
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
return 0;
}
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
......
...@@ -360,22 +360,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -360,22 +360,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 2. check re-balance status // 2. check re-balance status
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_INVALID_MSG; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
return -1; return -1;
} }
// 3. update the epoch value // 3. update the epoch value
int32_t savedEpoch = pHandle->epoch; if (pHandle->epoch > reqEpoch) {
if (savedEpoch <= reqEpoch) { tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ",
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->epoch, reqEpoch);
reqEpoch);
pHandle->epoch = reqEpoch;
}else {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ",
consumerId, TD_VID(pTq->pVnode), req.subKey, savedEpoch, reqEpoch);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
return -1; return -1;
...@@ -395,6 +390,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -395,6 +390,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(10); taosMsleep(10);
} }
if (pHandle->epoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch);
pHandle->epoch = reqEpoch;
}
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
...@@ -577,7 +577,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -577,7 +577,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.newConsumerId); req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
} }
atomic_add_fetch_32(&pHandle->epoch, 1); // atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task // kill executing task
if(tqIsHandleExec(pHandle)) { if(tqIsHandleExec(pHandle)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册