提交 0c47fcf7 编写于 作者: Q qqeasonchen

add recommend client configs in rpc mode

上级 77728501
...@@ -53,6 +53,7 @@ public class ClientConfig { ...@@ -53,6 +53,7 @@ public class ClientConfig {
* Offset persistent interval for consumer * Offset persistent interval for consumer
*/ */
private int persistConsumerOffsetInterval = 1000 * 5; private int persistConsumerOffsetInterval = 1000 * 5;
private long pullTimeDelayMillsWhenException = 1000;
private boolean unitMode = false; private boolean unitMode = false;
private String unitName; private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false")); private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
...@@ -148,6 +149,7 @@ public class ClientConfig { ...@@ -148,6 +149,7 @@ public class ClientConfig {
this.pollNameServerInterval = cc.pollNameServerInterval; this.pollNameServerInterval = cc.pollNameServerInterval;
this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval; this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval; this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException;
this.unitMode = cc.unitMode; this.unitMode = cc.unitMode;
this.unitName = cc.unitName; this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled; this.vipChannelEnabled = cc.vipChannelEnabled;
...@@ -165,6 +167,7 @@ public class ClientConfig { ...@@ -165,6 +167,7 @@ public class ClientConfig {
cc.pollNameServerInterval = pollNameServerInterval; cc.pollNameServerInterval = pollNameServerInterval;
cc.heartbeatBrokerInterval = heartbeatBrokerInterval; cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval; cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
cc.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
cc.unitMode = unitMode; cc.unitMode = unitMode;
cc.unitName = unitName; cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled; cc.vipChannelEnabled = vipChannelEnabled;
...@@ -222,6 +225,14 @@ public class ClientConfig { ...@@ -222,6 +225,14 @@ public class ClientConfig {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
} }
public long getPullTimeDelayMillsWhenException() {
return pullTimeDelayMillsWhenException;
}
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
public String getUnitName() { public String getUnitName() {
return unitName; return unitName;
} }
...@@ -287,12 +298,13 @@ public class ClientConfig { ...@@ -287,12 +298,13 @@ public class ClientConfig {
this.accessChannel = accessChannel; this.accessChannel = accessChannel;
} }
@Override @Override
public String toString() { public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]"; + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
} }
} }
...@@ -110,7 +110,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -110,7 +110,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
/** /**
* Delay some time when exception occur * Delay some time when exception occur
*/ */
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000; private long pullTimeDelayMillsWhenException = 1000;
/** /**
* Flow control interval * Flow control interval
*/ */
...@@ -160,6 +160,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -160,6 +160,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return new Thread(r, "MonitorMessageQueueChangeThread"); return new Thread(r, "MonitorMessageQueueChangeThread");
} }
}); });
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
} }
private void checkServiceState() { private void checkServiceState() {
...@@ -787,7 +788,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -787,7 +788,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) { } catch (Throwable e) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error("An error occurred in pull message process.", e); log.error("An error occurred in pull message process.", e);
} }
...@@ -1075,6 +1076,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -1075,6 +1076,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
@Override @Override
public SendResult reply(final Message requestMsg, final byte[] replyContent, public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
......
...@@ -85,7 +85,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -85,7 +85,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/** /**
* Delay some time when exception occur * Delay some time when exception occur
*/ */
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; private long pullTimeDelayMillsWhenException = 3000;
/** /**
* Flow control interval * Flow control interval
*/ */
...@@ -117,6 +117,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -117,6 +117,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer; this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
} }
public void registerFilterMessageHook(final FilterMessageHook hook) { public void registerFilterMessageHook(final FilterMessageHook hook) {
...@@ -224,7 +225,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -224,7 +225,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.makeSureStateOK(); this.makeSureStateOK();
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e); log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return; return;
} }
...@@ -284,7 +285,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -284,7 +285,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.setNextOffset(offset); pullRequest.setNextOffset(offset);
} }
} else { } else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest); log.info("pull message later because not locked in broker, {}", pullRequest);
return; return;
} }
...@@ -292,7 +293,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -292,7 +293,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) { if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest); log.warn("find the consumer's subscription failed, {}", pullRequest);
return; return;
} }
...@@ -399,7 +400,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -399,7 +400,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
log.warn("execute the pull request exception", e); log.warn("execute the pull request exception", e);
} }
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
} }
}; };
...@@ -446,7 +447,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -446,7 +447,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
); );
} catch (Exception e) { } catch (Exception e) {
log.error("pullKernelImpl exception", e); log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
} }
} }
...@@ -1171,6 +1172,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1171,6 +1172,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
@Override @Override
public SendResult reply(final Message requestMsg, final byte[] replyContent, public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
......
...@@ -37,6 +37,8 @@ public class ResponseConsumer { ...@@ -37,6 +37,8 @@ public class ResponseConsumer {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//recommend client configs
consumer.setPullTimeDelayMillsWhenException(0L);
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册