提交 d7decc84 编写于 作者: Y yukon

[ROCKETMQ-139] Degrade the client related modules' JDK version to 1.6

上级 a146646b
...@@ -22,12 +22,16 @@ ...@@ -22,12 +22,16 @@
<version>4.1.0-incubating-SNAPSHOT</version> <version>4.1.0-incubating-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging> <packaging>jar</packaging>
<artifactId>rocketmq-client</artifactId> <artifactId>rocketmq-client</artifactId>
<name>rocketmq-client ${project.version}</name> <name>rocketmq-client ${project.version}</name>
<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
......
...@@ -133,7 +133,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -133,7 +133,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Subscription relationship * Subscription relationship
*/ */
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<>(); private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
/** /**
* Message listener * Message listener
......
...@@ -842,7 +842,11 @@ public class MQClientInstance { ...@@ -842,7 +842,11 @@ public class MQClientInstance {
try { try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException | InterruptedException | MQBrokerException e) { } catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (InterruptedException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e); log.error("unregister client exception from broker: " + addr, e);
} }
} }
......
...@@ -99,7 +99,7 @@ public class DefaultMQPullConsumerTest { ...@@ -99,7 +99,7 @@ public class DefaultMQPullConsumerTest {
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1); assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123); assertThat(pullResult.getMinOffset()).isEqualTo(123);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048); assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<>()); assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<Object>());
} }
@Test @Test
...@@ -137,7 +137,7 @@ public class DefaultMQPullConsumerTest { ...@@ -137,7 +137,7 @@ public class DefaultMQPullConsumerTest {
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1); assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123); assertThat(pullResult.getMinOffset()).isEqualTo(123);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048); assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<>()); assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<Object>());
} }
@Override public void onException(Throwable e) { @Override public void onException(Throwable e) {
......
...@@ -143,7 +143,7 @@ public class DefaultMQPushConsumerTest { ...@@ -143,7 +143,7 @@ public class DefaultMQPushConsumerTest {
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
Set<MessageQueue> messageQueueSet = new HashSet<>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class)); doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
......
...@@ -69,7 +69,7 @@ public class LocalFileOffsetStoreTest { ...@@ -69,7 +69,7 @@ public class LocalFileOffsetStoreTest {
offsetStore.updateOffset(messageQueue, 1024, false); offsetStore.updateOffset(messageQueue, 1024, false);
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
offsetStore.persistAll(new HashSet<>(Collections.singletonList(messageQueue))); offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue)));
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
} }
} }
\ No newline at end of file
...@@ -119,7 +119,7 @@ public class RemoteBrokerOffsetStoreTest { ...@@ -119,7 +119,7 @@ public class RemoteBrokerOffsetStoreTest {
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1023); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1023);
offsetStore.updateOffset(messageQueue, 1025, false); offsetStore.updateOffset(messageQueue, 1025, false);
offsetStore.persistAll(new HashSet<>(Collections.singletonList(messageQueue))); offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue)));
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
} }
......
...@@ -48,17 +48,17 @@ public class MQClientInstanceTest { ...@@ -48,17 +48,17 @@ public class MQClientInstanceTest {
TopicRouteData topicRouteData = new TopicRouteData(); TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<>(); List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData(); BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA"); brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster"); brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<>(); HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911"); brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs); brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData); brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList); topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<>(); List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData(); QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA"); queueData.setBrokerName("BrokerA");
queueData.setPerm(6); queueData.setPerm(6);
......
...@@ -194,17 +194,17 @@ public class DefaultMQProducerTest { ...@@ -194,17 +194,17 @@ public class DefaultMQProducerTest {
TopicRouteData topicRouteData = new TopicRouteData(); TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<>(); List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData(); BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA"); brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster"); brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<>(); HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911"); brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs); brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData); brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList); topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<>(); List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData(); QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA"); queueData.setBrokerName("BrokerA");
queueData.setPerm(6); queueData.setPerm(6);
......
...@@ -34,7 +34,7 @@ public class SelectMessageQueueByHashTest { ...@@ -34,7 +34,7 @@ public class SelectMessageQueueByHashTest {
Message message = new Message(topic, new byte[] {}); Message message = new Message(topic, new byte[] {});
List<MessageQueue> messageQueues = new ArrayList<>(); List<MessageQueue> messageQueues = new ArrayList<MessageQueue>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
MessageQueue messageQueue = new MessageQueue(topic, "DefaultBroker", i); MessageQueue messageQueue = new MessageQueue(topic, "DefaultBroker", i);
messageQueues.add(messageQueue); messageQueues.add(messageQueue);
......
...@@ -27,6 +27,10 @@ ...@@ -27,6 +27,10 @@
<artifactId>rocketmq-common</artifactId> <artifactId>rocketmq-common</artifactId>
<name>rocketmq-common ${project.version}</name> <name>rocketmq-common ${project.version}</name>
<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
......
...@@ -256,7 +256,7 @@ public class MixAll { ...@@ -256,7 +256,7 @@ public class MixAll {
if (null == value) { if (null == value) {
value = ""; value = "";
} }
} catch (IllegalArgumentException | IllegalAccessException e) { } catch (IllegalAccessException e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -313,7 +313,7 @@ public class MixAll { ...@@ -313,7 +313,7 @@ public class MixAll {
try { try {
field.setAccessible(true); field.setAccessible(true);
value = field.get(object); value = field.get(object);
} catch (IllegalArgumentException | IllegalAccessException e) { } catch (IllegalAccessException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
...@@ -59,9 +59,9 @@ public class MixAllTest { ...@@ -59,9 +59,9 @@ public class MixAllTest {
file.delete(); file.delete();
} }
file.createNewFile(); file.createNewFile();
try (PrintWriter out = new PrintWriter(fileName)) { PrintWriter out = new PrintWriter(fileName);
out.write("TestForMixAll"); out.write("TestForMixAll");
} out.close();
String string = MixAll.file2String(fileName); String string = MixAll.file2String(fileName);
assertThat(string).isEqualTo("TestForMixAll"); assertThat(string).isEqualTo("TestForMixAll");
file.delete(); file.delete();
......
...@@ -36,7 +36,7 @@ public class FilterAPITest { ...@@ -36,7 +36,7 @@ public class FilterAPITest {
assertThat(subscriptionData.getTopic()).isEqualTo(topic); assertThat(subscriptionData.getTopic()).isEqualTo(topic);
assertThat(subscriptionData.getSubString()).isEqualTo(subString); assertThat(subscriptionData.getSubString()).isEqualTo(subString);
String [] tags = subString.split("\\|\\|"); String [] tags = subString.split("\\|\\|");
Set<String> tagSet = new HashSet<>(); Set<String> tagSet = new HashSet<String>();
for (String tag : tags) { for (String tag : tags) {
tagSet.add(tag.trim()); tagSet.add(tag.trim());
} }
......
...@@ -27,6 +27,10 @@ ...@@ -27,6 +27,10 @@
<artifactId>rocketmq-remoting</artifactId> <artifactId>rocketmq-remoting</artifactId>
<name>rocketmq-remoting ${project.version}</name> <name>rocketmq-remoting ${project.version}</name>
<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
......
...@@ -73,12 +73,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -73,12 +73,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupWorker;
private final Lock lockChannelTables = new ReentrantLock(); private final Lock lockChannelTables = new ReentrantLock();
private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
private final Timer timer = new Timer("ClientHouseKeepingService", true); private final Timer timer = new Timer("ClientHouseKeepingService", true);
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<>(); private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<>(); private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
private final Lock lockNamesrvChannel = new ReentrantLock(); private final Lock lockNamesrvChannel = new ReentrantLock();
...@@ -522,7 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -522,7 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
executorThis = this.publicExecutor; executorThis = this.publicExecutor;
} }
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<>(processor, executorThis); Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair); this.processorTable.put(requestCode, pair);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册