提交 8641cfe2 编写于 作者: L Liu Jicong

change test

上级 dff06409
...@@ -1044,7 +1044,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { ...@@ -1044,7 +1044,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) { if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscDebug("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt); tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0; if (epSkipCnt < 5000) return 0;
} }
atomic_store_32(&tmq->epSkipCnt, 0); atomic_store_32(&tmq->epSkipCnt, 0);
...@@ -1235,7 +1235,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1235,7 +1235,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscDebug("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
continue; continue;
/*if (vgSkipCnt < 10000) continue;*/ /*if (vgSkipCnt < 10000) continue;*/
#if 0 #if 0
......
...@@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0; return 0;
} }
vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName); vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, pTq->pVnode->vgId);
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0; rsp.skipLogNum = 0;
...@@ -325,8 +325,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -325,8 +325,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO // TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch); consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > pReq->epoch) { if (consumerEpoch > reqEpoch) {
// TODO: return vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
break; break;
} }
SWalReadHead* pHead; SWalReadHead* pHead;
......
...@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) { ...@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
while (running) { while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 4000); tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册