diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java index 77198b7fdc83b7c485da01e223e49d4cad012c0e..09d940a1cfaf59741210e59274ef5b66f005aad6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java @@ -76,19 +76,19 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue } - Collection cidNodes = new ArrayList<>(); + Collection cidNodes = new ArrayList(); for (String cid : cidAll) { cidNodes.add(new ClientNode(cid)); } final ConsistentHashRouter router; //for building hash ring if (customHashFunction != null) { - router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction); + router = new ConsistentHashRouter(cidNodes, virtualNodeCnt, customHashFunction); } else { - router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt); + router = new ConsistentHashRouter(cidNodes, virtualNodeCnt); } - List results = new ArrayList<>(); + List results = new ArrayList(); for (MessageQueue mq : mqAll) { ClientNode clientNode = router.routeNode(mq.toString()); if (clientNode != null && currentCID.equals(clientNode.getKey())) { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java index fc7ab9fcf4d154399a576aed890bd1e4a435e66b..e9e5db7efd82235d1d48583a16743bb2e791385c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Assert; import org.junit.Before; @@ -113,13 +114,13 @@ public class AllocateMessageQueueConsitentHashTest { //System.out.println("mqAll:" + mqAll.toString()); List cidAll = createConsumerIdList(consumerSize); - List allocatedResAll = new ArrayList<>(); + List allocatedResAll = new ArrayList(); - Map allocateToAllOrigin = new TreeMap<>(); + Map allocateToAllOrigin = new TreeMap(); //test allocate all { - List cidBegin = new ArrayList<>(cidAll); + List cidBegin = new ArrayList(cidAll); //System.out.println("cidAll:" + cidBegin.toString()); for (String cid : cidBegin) { @@ -135,13 +136,13 @@ public class AllocateMessageQueueConsitentHashTest { verifyAllocateAll(cidBegin,mqAll, allocatedResAll)); } - Map allocateToAllAfterRemoveOne = new TreeMap<>(); - List cidAfterRemoveOne = new ArrayList<>(cidAll); + Map allocateToAllAfterRemoveOne = new TreeMap(); + List cidAfterRemoveOne = new ArrayList(cidAll); //test allocate remove one cid { String removeCID = cidAfterRemoveOne.remove(0); //System.out.println("removing one cid "+removeCID); - List mqShouldOnlyChanged = new ArrayList<>(); + List mqShouldOnlyChanged = new ArrayList(); Iterator> it = allocateToAllOrigin.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); @@ -151,7 +152,7 @@ public class AllocateMessageQueueConsitentHashTest { } //System.out.println("cidAll:" + cidAfterRemoveOne.toString()); - List allocatedResAllAfterRemove = new ArrayList<>(); + List allocatedResAllAfterRemove = new ArrayList(); for (String cid : cidAfterRemoveOne) { List rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne); allocatedResAllAfterRemove.addAll(rs); @@ -166,16 +167,16 @@ public class AllocateMessageQueueConsitentHashTest { verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID); } - List cidAfterAdd = new ArrayList<>(cidAfterRemoveOne); + List cidAfterAdd = new ArrayList(cidAfterRemoveOne); //test allocate add one more cid { String newCid = CID_PREFIX+"NEW"; //System.out.println("add one more cid "+newCid); cidAfterAdd.add(newCid); - List mqShouldOnlyChanged = new ArrayList<>(); + List mqShouldOnlyChanged = new ArrayList(); //System.out.println("cidAll:" + cidAfterAdd.toString()); - List allocatedResAllAfterAdd = new ArrayList<>(); - Map allocateToAll3 = new TreeMap<>(); + List allocatedResAllAfterAdd = new ArrayList(); + Map allocateToAll3 = new TreeMap(); for (String cid : cidAfterAdd) { List rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd); allocatedResAllAfterAdd.addAll(rs); @@ -225,7 +226,7 @@ public class AllocateMessageQueueConsitentHashTest { } private List createConsumerIdList(int size) { - List consumerIdList = new ArrayList<>(size); + List consumerIdList = new ArrayList(size); for (int i = 0; i < size; i++) { consumerIdList.add(CID_PREFIX + String.valueOf(i)); } @@ -233,7 +234,7 @@ public class AllocateMessageQueueConsitentHashTest { } private List createMessageQueueList(int size) { - List messageQueueList = new ArrayList<>(size); + List messageQueueList = new ArrayList(size); for (int i = 0; i < size; i++) { MessageQueue mq = new MessageQueue(topic, "brokerName", i); messageQueueList.add(mq); diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java index 8606c4330e130c670f51b2fcd1351510eed793ce..a6fce51e2265e28005aa184973cbb9a58aa62c82 100644 --- a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java +++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java @@ -30,7 +30,7 @@ import java.util.TreeMap; * @param */ public class ConsistentHashRouter { - private final SortedMap> ring = new TreeMap<>(); + private final SortedMap> ring = new TreeMap>(); private final HashFunction hashFunction; public ConsistentHashRouter(Collection pNodes, int vNodeCount) { @@ -64,7 +64,7 @@ public class ConsistentHashRouter { if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount); int existingReplicas = getExistingReplicas(pNode); for (int i = 0; i < vNodeCount; i++) { - VirtualNode vNode = new VirtualNode<>(pNode, i + existingReplicas); + VirtualNode vNode = new VirtualNode(pNode, i + existingReplicas); ring.put(hashFunction.hash(vNode.getKey()), vNode); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7b2926318a2dcd44f6acff02ed23941366bcca78..b44211c5712cf5e0c2828212d0db5a267b221e9e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -722,7 +722,7 @@ public class CommitLog { messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); - lockForPutMessage(); //spin... + putMessageLock.lock(); try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -771,7 +771,7 @@ public class CommitLog { eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { - releasePutMessageLock(); + putMessageLock.unlock(); }