From 9c0e5360e109b2a5c4c86ed7053a59f868b078ee Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 21 May 2018 11:30:12 +0800 Subject: [PATCH] Tag language of clients initialized through OMS as 'OMS' --- .../apache/rocketmq/client/ClientConfig.java | 15 ++++++++++++++- .../rocketmq/client/impl/MQClientAPIImpl.java | 17 ++++++----------- .../rocketmq/consumer/PullConsumerImpl.java | 5 ++++- .../rocketmq/consumer/PushConsumerImpl.java | 2 ++ .../rocketmq/producer/AbstractOMSProducer.java | 4 +++- .../remoting/protocol/LanguageCode.java | 3 ++- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index a9eabfe6..d798164c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; +import org.apache.rocketmq.remoting.protocol.LanguageCode; /** * Client Common configuration @@ -48,6 +49,8 @@ public class ClientConfig { private boolean useTLS = TlsSystemConfig.tlsEnable; + private LanguageCode language = LanguageCode.JAVA; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -96,6 +99,7 @@ public class ClientConfig { this.unitName = cc.unitName; this.vipChannelEnabled = cc.vipChannelEnabled; this.useTLS = cc.useTLS; + this.language = cc.language; } public ClientConfig cloneClientConfig() { @@ -111,6 +115,7 @@ public class ClientConfig { cc.unitName = unitName; cc.vipChannelEnabled = vipChannelEnabled; cc.useTLS = useTLS; + cc.language = language; return cc; } @@ -186,12 +191,20 @@ public class ClientConfig { this.useTLS = useTLS; } + public LanguageCode getLanguage() { + return language; + } + + public void setLanguage(LanguageCode language) { + this.language = language; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + ", useTLS=" + useTLS + "]"; + + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index d4ed1ec4..ade69905 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.client.impl; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -49,7 +48,6 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -137,6 +135,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; @@ -156,7 +155,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class MQClientAPIImpl { private final static InternalLogger log = ClientLogger.getLog(); - public static boolean sendSmartMsg = + private static boolean sendSmartMsg = Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); static { @@ -217,13 +216,9 @@ public class MQClientAPIImpl { } public void updateNameServerAddressList(final String addrs) { - List lst = new ArrayList(); String[] addrArray = addrs.split(";"); - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); + List list = Arrays.asList(addrArray); + this.remotingClient.updateNameServerAddressList(list); } public void start() { @@ -857,7 +852,7 @@ public class MQClientAPIImpl { final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); - + request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index c11da585..d6735100 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.protocol.LanguageCode; public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer; @@ -46,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer { private final LocalMessageCache localMessageCache; private final ClientConfig clientConfig; - final static InternalLogger log = ClientLogger.getLog(); + private final static InternalLogger log = ClientLogger.getLog(); public PullConsumerImpl(final KeyValue properties) { this.properties = properties; @@ -77,6 +78,8 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer.setInstanceName(consumerId); properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); + this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); + this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 46f6775e..d5d394a6 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.protocol.LanguageCode; public class PushConsumerImpl implements PushConsumer { private final DefaultMQPushConsumer rocketmqPushConsumer; @@ -73,6 +74,7 @@ public class PushConsumerImpl implements PushConsumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); + this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS); this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index e40e2d45..53fc0f90 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; @@ -45,7 +46,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { final KeyValue properties; final DefaultMQProducer rocketmqProducer; private boolean started = false; - final ClientConfig clientConfig; + private final ClientConfig clientConfig; AbstractOMSProducer(final KeyValue properties) { this.properties = properties; @@ -67,6 +68,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); + this.rocketmqProducer.setLanguage(LanguageCode.OMS); properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java index 17ce9190..4382af35 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java @@ -28,7 +28,8 @@ public enum LanguageCode { OTHER((byte) 7), HTTP((byte) 8), GO((byte) 9), - PHP((byte) 10); + PHP((byte) 10), + OMS((byte) 11); private byte code; -- GitLab