提交 9c0e5360 编写于 作者: L Li Zhanhui

Tag language of clients initialized through OMS as 'OMS'

上级 12a51c41
...@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
/** /**
* Client Common configuration * Client Common configuration
...@@ -48,6 +49,8 @@ public class ClientConfig { ...@@ -48,6 +49,8 @@ public class ClientConfig {
private boolean useTLS = TlsSystemConfig.tlsEnable; private boolean useTLS = TlsSystemConfig.tlsEnable;
private LanguageCode language = LanguageCode.JAVA;
public String buildMQClientId() { public String buildMQClientId() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP()); sb.append(this.getClientIP());
...@@ -96,6 +99,7 @@ public class ClientConfig { ...@@ -96,6 +99,7 @@ public class ClientConfig {
this.unitName = cc.unitName; this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled; this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS; this.useTLS = cc.useTLS;
this.language = cc.language;
} }
public ClientConfig cloneClientConfig() { public ClientConfig cloneClientConfig() {
...@@ -111,6 +115,7 @@ public class ClientConfig { ...@@ -111,6 +115,7 @@ public class ClientConfig {
cc.unitName = unitName; cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled; cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS; cc.useTLS = useTLS;
cc.language = language;
return cc; return cc;
} }
...@@ -186,12 +191,20 @@ public class ClientConfig { ...@@ -186,12 +191,20 @@ public class ClientConfig {
this.useTLS = useTLS; this.useTLS = useTLS;
} }
public LanguageCode getLanguage() {
return language;
}
public void setLanguage(LanguageCode language) {
this.language = language;
}
@Override @Override
public String toString() { public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + "]"; + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
} }
} }
...@@ -18,7 +18,7 @@ package org.apache.rocketmq.client.impl; ...@@ -18,7 +18,7 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
...@@ -27,7 +27,6 @@ import java.util.Map; ...@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
...@@ -49,7 +48,6 @@ import org.apache.rocketmq.common.TopicConfig; ...@@ -49,7 +48,6 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
...@@ -137,6 +135,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; ...@@ -137,6 +135,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
...@@ -156,7 +155,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; ...@@ -156,7 +155,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class MQClientAPIImpl { public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog(); private final static InternalLogger log = ClientLogger.getLog();
public static boolean sendSmartMsg = private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
static { static {
...@@ -217,13 +216,9 @@ public class MQClientAPIImpl { ...@@ -217,13 +216,9 @@ public class MQClientAPIImpl {
} }
public void updateNameServerAddressList(final String addrs) { public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";"); String[] addrArray = addrs.split(";");
for (String addr : addrArray) { List<String> list = Arrays.asList(addrArray);
lst.add(addr); this.remotingClient.updateNameServerAddressList(list);
}
this.remotingClient.updateNameServerAddressList(lst);
} }
public void start() { public void start() {
...@@ -857,7 +852,7 @@ public class MQClientAPIImpl { ...@@ -857,7 +852,7 @@ public class MQClientAPIImpl {
final long timeoutMillis final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException { ) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode()); request.setBody(heartbeatData.encode());
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null; assert response != null;
......
...@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger; ...@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PullConsumerImpl implements PullConsumer { public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer; private final DefaultMQPullConsumer rocketmqPullConsumer;
...@@ -46,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -46,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer {
private final LocalMessageCache localMessageCache; private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
final static InternalLogger log = ClientLogger.getLog(); private final static InternalLogger log = ClientLogger.getLog();
public PullConsumerImpl(final KeyValue properties) { public PullConsumerImpl(final KeyValue properties) {
this.properties = properties; this.properties = properties;
...@@ -77,6 +78,8 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -77,6 +78,8 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer.setInstanceName(consumerId); this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
} }
......
...@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; ...@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PushConsumerImpl implements PushConsumer { public class PushConsumerImpl implements PushConsumer {
private final DefaultMQPushConsumer rocketmqPushConsumer; private final DefaultMQPushConsumer rocketmqPushConsumer;
...@@ -73,6 +74,7 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -73,6 +74,7 @@ public class PushConsumerImpl implements PushConsumer {
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId); this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
} }
......
...@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
...@@ -45,7 +46,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -45,7 +46,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final KeyValue properties; final KeyValue properties;
final DefaultMQProducer rocketmqProducer; final DefaultMQProducer rocketmqProducer;
private boolean started = false; private boolean started = false;
final ClientConfig clientConfig; private final ClientConfig clientConfig;
AbstractOMSProducer(final KeyValue properties) { AbstractOMSProducer(final KeyValue properties) {
this.properties = properties; this.properties = properties;
...@@ -67,6 +68,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -67,6 +68,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout()); this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
this.rocketmqProducer.setLanguage(LanguageCode.OMS);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
} }
......
...@@ -28,7 +28,8 @@ public enum LanguageCode { ...@@ -28,7 +28,8 @@ public enum LanguageCode {
OTHER((byte) 7), OTHER((byte) 7),
HTTP((byte) 8), HTTP((byte) 8),
GO((byte) 9), GO((byte) 9),
PHP((byte) 10); PHP((byte) 10),
OMS((byte) 11);
private byte code; private byte code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册