From 958eb7498112429aecb19785feff117080c01465 Mon Sep 17 00:00:00 2001 From: huangli Date: Fri, 10 Jan 2020 20:50:59 +0800 Subject: [PATCH] [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix problem that tx check may loss when channel is busy (#1627) --- .../broker/client/ProducerManager.java | 262 +++++++----------- .../processor/AdminBrokerProcessor.java | 2 +- .../broker/client/ProducerManagerTest.java | 30 +- .../processor/ClientManageProcessorTest.java | 3 +- 4 files changed, 127 insertions(+), 170 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 12f632b4..860b3493 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -18,15 +18,11 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -36,205 +32,145 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; public class ProducerManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3; - private final Lock groupChannelLock = new ReentrantLock(); - private final HashMap> groupChannelTable = - new HashMap>(); + private final ConcurrentHashMap> groupChannelTable = + new ConcurrentHashMap<>(); private final ConcurrentHashMap clientChannelTable = new ConcurrentHashMap<>(); private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); public ProducerManager() { } - public HashMap> getGroupChannelTable() { - HashMap> newGroupChannelTable = - new HashMap>(); - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - Iterator>> iter = groupChannelTable.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry> entry = iter.next(); - String key = entry.getKey(); - HashMap val = entry.getValue(); - HashMap tmp = new HashMap(); - tmp.putAll(val); - newGroupChannelTable.put(key, tmp); - } - } finally { - groupChannelLock.unlock(); - } - } - } catch (InterruptedException e) { - log.error("", e); - } - return newGroupChannelTable; + public ConcurrentHashMap> getGroupChannelTable() { + return groupChannelTable; } public void scanNotActiveChannel() { - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - for (final Map.Entry> entry : this.groupChannelTable - .entrySet()) { - final String group = entry.getKey(); - final HashMap chlMap = entry.getValue(); - - Iterator> it = chlMap.entrySet().iterator(); - while (it.hasNext()) { - Entry item = it.next(); - // final Integer id = item.getKey(); - final ClientChannelInfo info = item.getValue(); - - long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); - if (diff > CHANNEL_EXPIRED_TIMEOUT) { - it.remove(); - clientChannelTable.remove(info.getClientId()); - log.warn( - "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", - RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); - RemotingUtil.closeChannel(info.getChannel()); - } - } - } - } finally { - this.groupChannelLock.unlock(); + for (final Map.Entry> entry : this.groupChannelTable + .entrySet()) { + final String group = entry.getKey(); + final ConcurrentHashMap chlMap = entry.getValue(); + + Iterator> it = chlMap.entrySet().iterator(); + while (it.hasNext()) { + Entry item = it.next(); + // final Integer id = item.getKey(); + final ClientChannelInfo info = item.getValue(); + + long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); + if (diff > CHANNEL_EXPIRED_TIMEOUT) { + it.remove(); + clientChannelTable.remove(info.getClientId()); + log.warn( + "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", + RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); + RemotingUtil.closeChannel(info.getChannel()); } - } else { - log.warn("ProducerManager scanNotActiveChannel lock timeout"); } - } catch (InterruptedException e) { - log.error("", e); } } - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { + public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) { if (channel != null) { - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - for (final Map.Entry> entry : this.groupChannelTable - .entrySet()) { - final String group = entry.getKey(); - final HashMap clientChannelInfoTable = - entry.getValue(); - final ClientChannelInfo clientChannelInfo = - clientChannelInfoTable.remove(channel); - if (clientChannelInfo != null) { - clientChannelTable.remove(clientChannelInfo.getClientId()); - log.info( - "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", - clientChannelInfo.toString(), remoteAddr, group); - } - - } - } finally { - this.groupChannelLock.unlock(); - } - } else { - log.warn("ProducerManager doChannelCloseEvent lock timeout"); + for (final Map.Entry> entry : this.groupChannelTable + .entrySet()) { + final String group = entry.getKey(); + final ConcurrentHashMap clientChannelInfoTable = + entry.getValue(); + final ClientChannelInfo clientChannelInfo = + clientChannelInfoTable.remove(channel); + if (clientChannelInfo != null) { + clientChannelTable.remove(clientChannelInfo.getClientId()); + log.info( + "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", + clientChannelInfo.toString(), remoteAddr, group); } - } catch (InterruptedException e) { - log.error("", e); + } } } - public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { - try { - ClientChannelInfo clientChannelInfoFound = null; - - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - HashMap channelTable = this.groupChannelTable.get(group); - if (null == channelTable) { - channelTable = new HashMap<>(); - this.groupChannelTable.put(group, channelTable); - } - - clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); - if (null == clientChannelInfoFound) { - channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); - clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel()); - log.info("new producer connected, group: {} channel: {}", group, - clientChannelInfo.toString()); - } - } finally { - this.groupChannelLock.unlock(); - } + public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { + ClientChannelInfo clientChannelInfoFound = null; - if (clientChannelInfoFound != null) { - clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); - } - } else { - log.warn("ProducerManager registerProducer lock timeout"); - } - } catch (InterruptedException e) { - log.error("", e); + ConcurrentHashMap channelTable = this.groupChannelTable.get(group); + if (null == channelTable) { + channelTable = new ConcurrentHashMap<>(); + this.groupChannelTable.put(group, channelTable); + } + + clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); + if (null == clientChannelInfoFound) { + channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); + clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel()); + log.info("new producer connected, group: {} channel: {}", group, + clientChannelInfo.toString()); + } + + + if (clientChannelInfoFound != null) { + clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); } } - public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - HashMap channelTable = this.groupChannelTable.get(group); - if (null != channelTable && !channelTable.isEmpty()) { - ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); - clientChannelTable.remove(clientChannelInfo.getClientId()); - if (old != null) { - log.info("unregister a producer[{}] from groupChannelTable {}", group, - clientChannelInfo.toString()); - } - - if (channelTable.isEmpty()) { - this.groupChannelTable.remove(group); - log.info("unregister a producer group[{}] from groupChannelTable", group); - } - } - } finally { - this.groupChannelLock.unlock(); - } - } else { - log.warn("ProducerManager unregisterProducer lock timeout"); + public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { + ConcurrentHashMap channelTable = this.groupChannelTable.get(group); + if (null != channelTable && !channelTable.isEmpty()) { + ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); + clientChannelTable.remove(clientChannelInfo.getClientId()); + if (old != null) { + log.info("unregister a producer[{}] from groupChannelTable {}", group, + clientChannelInfo.toString()); + } + + if (channelTable.isEmpty()) { + this.groupChannelTable.remove(group); + log.info("unregister a producer group[{}] from groupChannelTable", group); } - } catch (InterruptedException e) { - log.error("", e); } } public Channel getAvaliableChannel(String groupId) { - HashMap channelClientChannelInfoHashMap = groupChannelTable.get(groupId); + if (groupId == null) { + return null; + } List channelList = new ArrayList(); + ConcurrentHashMap channelClientChannelInfoHashMap = groupChannelTable.get(groupId); if (channelClientChannelInfoHashMap != null) { for (Channel channel : channelClientChannelInfoHashMap.keySet()) { channelList.add(channel); } - int size = channelList.size(); - if (0 == size) { - log.warn("Channel list is empty. groupId={}", groupId); - return null; - } - - int index = positiveAtomicCounter.incrementAndGet() % size; - Channel channel = channelList.get(index); - int count = 0; - boolean isOk = channel.isActive() && channel.isWritable(); - while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) { - if (isOk) { - return channel; - } - index = (++index) % size; - channel = channelList.get(index); - isOk = channel.isActive() && channel.isWritable(); - } } else { log.warn("Check transaction failed, channel table is empty. groupId={}", groupId); return null; } - return null; + + int size = channelList.size(); + if (0 == size) { + log.warn("Channel list is empty. groupId={}", groupId); + return null; + } + + Channel lastActiveChannel = null; + + int index = positiveAtomicCounter.incrementAndGet() % size; + Channel channel = channelList.get(index); + int count = 0; + boolean isOk = channel.isActive() && channel.isWritable(); + while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) { + if (isOk) { + return channel; + } + if (channel.isActive()) { + lastActiveChannel = channel; + } + index = (++index) % size; + channel = channelList.get(index); + isOk = channel.isActive() && channel.isWritable(); + } + + return lastActiveChannel; } public Channel findChannel(String clientId) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index df0ec905..b2edc1a2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -815,7 +815,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); ProducerConnection bodydata = new ProducerConnection(); - HashMap channelInfoHashMap = + Map channelInfoHashMap = this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); if (channelInfoHashMap != null) { Iterator> it = channelInfoHashMap.entrySet().iterator(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java index d9539b67..4791ab1f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java @@ -19,7 +19,8 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import java.lang.reflect.Field; -import java.util.HashMap; +import java.util.Map; + import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.junit.Before; import org.junit.Test; @@ -74,7 +75,7 @@ public class ProducerManagerTest { @Test public void testRegisterProducer() throws Exception { producerManager.registerProducer(group, clientInfo); - HashMap channelMap = producerManager.getGroupChannelTable().get(group); + Map channelMap = producerManager.getGroupChannelTable().get(group); Channel channel1 = producerManager.findChannel("clientId"); assertThat(channelMap).isNotNull(); assertThat(channel1).isNotNull(); @@ -85,7 +86,7 @@ public class ProducerManagerTest { @Test public void unregisterProducer() throws Exception { producerManager.registerProducer(group, clientInfo); - HashMap channelMap = producerManager.getGroupChannelTable().get(group); + Map channelMap = producerManager.getGroupChannelTable().get(group); assertThat(channelMap).isNotNull(); assertThat(channelMap.get(channel)).isEqualTo(clientInfo); Channel channel1 = producerManager.findChannel("clientId"); @@ -102,9 +103,28 @@ public class ProducerManagerTest { @Test public void testGetGroupChannelTable() throws Exception { producerManager.registerProducer(group, clientInfo); - HashMap oldMap = producerManager.getGroupChannelTable().get(group); + Map oldMap = producerManager.getGroupChannelTable().get(group); producerManager.unregisterProducer(group, clientInfo); - assertThat(oldMap.size()).isNotEqualTo(0); + assertThat(oldMap.size()).isEqualTo(0); } + + @Test + public void testGetAvaliableChannel() { + producerManager.registerProducer(group, clientInfo); + + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + Channel c = producerManager.getAvaliableChannel(group); + assertThat(c).isSameAs(channel); + + when(channel.isWritable()).thenReturn(false); + c = producerManager.getAvaliableChannel(group); + assertThat(c).isSameAs(channel); + + when(channel.isActive()).thenReturn(false); + c = producerManager.getAvaliableChannel(group); + assertThat(c).isNull(); + } + } \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java index 147c7323..3d893ac1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; @@ -81,7 +82,7 @@ public class ClientManageProcessorTest { @Test public void processRequest_UnRegisterProducer() throws Exception { brokerController.getProducerManager().registerProducer(group, clientChannelInfo); - HashMap channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); + Map channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); assertThat(channelMap).isNotNull(); assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo); -- GitLab