提交 f07bc79d 编写于 作者: 李强 提交者: RongtongJin

[ISSUE #2283] Fix the spelling mistake in ProducerManager & some code optimization

(cherry picked from commit 798d9803b9e831b19957cbdb74571e4c6c341ce3)
上级 2abda6d2
...@@ -33,7 +33,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; ...@@ -33,7 +33,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager { public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3; private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable = private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
...@@ -131,16 +131,14 @@ public class ProducerManager { ...@@ -131,16 +131,14 @@ public class ProducerManager {
} }
} }
public Channel getAvaliableChannel(String groupId) { public Channel getAvailableChannel(String groupId) {
if (groupId == null) { if (groupId == null) {
return null; return null;
} }
List<Channel> channelList = new ArrayList<Channel>(); List<Channel> channelList;
ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId); ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
if (channelClientChannelInfoHashMap != null) { if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) { channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());
channelList.add(channel);
}
} else { } else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId); log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null; return null;
...@@ -158,7 +156,7 @@ public class ProducerManager { ...@@ -158,7 +156,7 @@ public class ProducerManager {
Channel channel = channelList.get(index); Channel channel = channelList.get(index);
int count = 0; int count = 0;
boolean isOk = channel.isActive() && channel.isWritable(); boolean isOk = channel.isActive() && channel.isWritable();
while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) { while (count++ < GET_AVAILABLE_CHANNEL_RETRY_COUNT) {
if (isOk) { if (isOk) {
return channel; return channel;
} }
......
...@@ -69,7 +69,7 @@ public abstract class AbstractTransactionalMessageCheckListener { ...@@ -69,7 +69,7 @@ public abstract class AbstractTransactionalMessageCheckListener {
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0); msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId); Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) { if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else { } else {
......
...@@ -110,20 +110,20 @@ public class ProducerManagerTest { ...@@ -110,20 +110,20 @@ public class ProducerManagerTest {
} }
@Test @Test
public void testGetAvaliableChannel() { public void testGetAvailableChannel() {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
when(channel.isActive()).thenReturn(true); when(channel.isActive()).thenReturn(true);
when(channel.isWritable()).thenReturn(true); when(channel.isWritable()).thenReturn(true);
Channel c = producerManager.getAvaliableChannel(group); Channel c = producerManager.getAvailableChannel(group);
assertThat(c).isSameAs(channel); assertThat(c).isSameAs(channel);
when(channel.isWritable()).thenReturn(false); when(channel.isWritable()).thenReturn(false);
c = producerManager.getAvaliableChannel(group); c = producerManager.getAvailableChannel(group);
assertThat(c).isSameAs(channel); assertThat(c).isSameAs(channel);
when(channel.isActive()).thenReturn(false); when(channel.isActive()).thenReturn(false);
c = producerManager.getAvaliableChannel(group); c = producerManager.getAvailableChannel(group);
assertThat(c).isNull(); assertThat(c).isNull();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册