diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index c3e4efa252c1c8fb4e3d9b2d0214e2b5b965d736..d0ae5e1b8315370c3ce3373c330e9a7d0df2ca84 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -53,6 +53,7 @@ public class ClientConfig { * Offset persistent interval for consumer */ private int persistConsumerOffsetInterval = 1000 * 5; + private long pullTimeDelayMillsWhenException = 1000; private boolean unitMode = false; private String unitName; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false")); @@ -148,6 +149,7 @@ public class ClientConfig { this.pollNameServerInterval = cc.pollNameServerInterval; this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval; this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval; + this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException; this.unitMode = cc.unitMode; this.unitName = cc.unitName; this.vipChannelEnabled = cc.vipChannelEnabled; @@ -165,6 +167,7 @@ public class ClientConfig { cc.pollNameServerInterval = pollNameServerInterval; cc.heartbeatBrokerInterval = heartbeatBrokerInterval; cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + cc.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; cc.unitMode = unitMode; cc.unitName = unitName; cc.vipChannelEnabled = vipChannelEnabled; @@ -222,6 +225,14 @@ public class ClientConfig { this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; } + public long getPullTimeDelayMillsWhenException() { + return pullTimeDelayMillsWhenException; + } + + public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) { + this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; + } + public String getUnitName() { return unitName; } @@ -287,12 +298,13 @@ public class ClientConfig { this.accessChannel = accessChannel; } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval - + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" - + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 9308fad7c755301b3e6d06f5d5a306acb00d3b8a..d094f661f76774d7511f919efcee2f0cd222ea5e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -110,7 +110,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { /** * Delay some time when exception occur */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000; + private long pullTimeDelayMillsWhenException = 1000; /** * Flow control interval */ @@ -160,6 +160,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return new Thread(r, "MonitorMessageQueueChangeThread"); } }); + this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException(); } private void checkServiceState() { @@ -787,7 +788,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); } catch (Throwable e) { - pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; + pullDelayTimeMills = pullTimeDelayMillsWhenException; log.error("An error occurred in pull message process.", e); } @@ -1075,6 +1076,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } + public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) { + this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; + } + @Override public SendResult reply(final Message requestMsg, final byte[] replyContent, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ac92dc9ff527eaba63f8c8a219119cb192fb1e74..20c59609ed3a01aad978440ca1c89f3ac7e0371b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -85,7 +85,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { /** * Delay some time when exception occur */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; + private long pullTimeDelayMillsWhenException = 3000; /** * Flow control interval */ @@ -117,6 +117,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { this.defaultMQPushConsumer = defaultMQPushConsumer; this.rpcHook = rpcHook; + this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException(); } public void registerFilterMessageHook(final FilterMessageHook hook) { @@ -224,7 +225,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); return; } @@ -284,7 +285,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest.setNextOffset(offset); } } else { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; } @@ -292,7 +293,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; } @@ -399,7 +400,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { log.warn("execute the pull request exception", e); } - DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }; @@ -446,7 +447,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ); } catch (Exception e) { log.error("pullKernelImpl exception", e); - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } } @@ -1171,6 +1172,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } + public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) { + this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; + } + @Override public SendResult reply(final Message requestMsg, final byte[] replyContent, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java index a130722a714a8719e51f91fa30d7bf6179881160..f3b3fd115680f2a093d87625cf07eea010a9d22e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java @@ -37,6 +37,8 @@ public class ResponseConsumer { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + //recommend client configs + consumer.setPullTimeDelayMillsWhenException(0L); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override