提交 958eb749 编写于 作者: H huangli 提交者: Heng Du

[for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix...

[for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix problem that tx check may loss when channel is busy (#1627)
上级 366e6e72
...@@ -18,15 +18,11 @@ package org.apache.rocketmq.broker.client; ...@@ -18,15 +18,11 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
...@@ -36,51 +32,25 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; ...@@ -36,51 +32,25 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager { public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3; private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock(); private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable = new ConcurrentHashMap<>();
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() { public ProducerManager() {
} }
public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() { public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable = return groupChannelTable;
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
Iterator<Map.Entry<String, HashMap<Channel, ClientChannelInfo>>> iter = groupChannelTable.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry = iter.next();
String key = entry.getKey();
HashMap<Channel, ClientChannelInfo> val = entry.getValue();
HashMap<Channel, ClientChannelInfo> tmp = new HashMap<Channel, ClientChannelInfo>();
tmp.putAll(val);
newGroupChannelTable.put(key, tmp);
}
} finally {
groupChannelLock.unlock();
}
}
} catch (InterruptedException e) {
log.error("", e);
}
return newGroupChannelTable;
} }
public void scanNotActiveChannel() { public void scanNotActiveChannel() {
try { for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) { .entrySet()) {
final String group = entry.getKey(); final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue(); final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator(); Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
...@@ -99,26 +69,14 @@ public class ProducerManager { ...@@ -99,26 +69,14 @@ public class ProducerManager {
} }
} }
} }
} finally {
this.groupChannelLock.unlock();
}
} else {
log.warn("ProducerManager scanNotActiveChannel lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
} }
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
if (channel != null) { if (channel != null) {
try { for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) { .entrySet()) {
final String group = entry.getKey(); final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable = final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
entry.getValue(); entry.getValue();
final ClientChannelInfo clientChannelInfo = final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel); clientChannelInfoTable.remove(channel);
...@@ -130,27 +88,15 @@ public class ProducerManager { ...@@ -130,27 +88,15 @@ public class ProducerManager {
} }
} }
} finally {
this.groupChannelLock.unlock();
}
} else {
log.warn("ProducerManager doChannelCloseEvent lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
} }
} }
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
ClientChannelInfo clientChannelInfoFound = null; ClientChannelInfo clientChannelInfoFound = null;
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
try {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) { if (null == channelTable) {
channelTable = new HashMap<>(); channelTable = new ConcurrentHashMap<>();
this.groupChannelTable.put(group, channelTable); this.groupChannelTable.put(group, channelTable);
} }
...@@ -161,26 +107,15 @@ public class ProducerManager { ...@@ -161,26 +107,15 @@ public class ProducerManager {
log.info("new producer connected, group: {} channel: {}", group, log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString()); clientChannelInfo.toString());
} }
} finally {
this.groupChannelLock.unlock();
}
if (clientChannelInfoFound != null) { if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
} }
} else {
log.warn("ProducerManager registerProducer lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
} }
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try { ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) { if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId()); clientChannelTable.remove(clientChannelInfo.getClientId());
...@@ -194,30 +129,31 @@ public class ProducerManager { ...@@ -194,30 +129,31 @@ public class ProducerManager {
log.info("unregister a producer group[{}] from groupChannelTable", group); log.info("unregister a producer group[{}] from groupChannelTable", group);
} }
} }
} finally {
this.groupChannelLock.unlock();
}
} else {
log.warn("ProducerManager unregisterProducer lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
} }
public Channel getAvaliableChannel(String groupId) { public Channel getAvaliableChannel(String groupId) {
HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId); if (groupId == null) {
return null;
}
List<Channel> channelList = new ArrayList<Channel>(); List<Channel> channelList = new ArrayList<Channel>();
ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
if (channelClientChannelInfoHashMap != null) { if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) { for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel); channelList.add(channel);
} }
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
int size = channelList.size(); int size = channelList.size();
if (0 == size) { if (0 == size) {
log.warn("Channel list is empty. groupId={}", groupId); log.warn("Channel list is empty. groupId={}", groupId);
return null; return null;
} }
Channel lastActiveChannel = null;
int index = positiveAtomicCounter.incrementAndGet() % size; int index = positiveAtomicCounter.incrementAndGet() % size;
Channel channel = channelList.get(index); Channel channel = channelList.get(index);
int count = 0; int count = 0;
...@@ -226,15 +162,15 @@ public class ProducerManager { ...@@ -226,15 +162,15 @@ public class ProducerManager {
if (isOk) { if (isOk) {
return channel; return channel;
} }
if (channel.isActive()) {
lastActiveChannel = channel;
}
index = (++index) % size; index = (++index) % size;
channel = channelList.get(index); channel = channelList.get(index);
isOk = channel.isActive() && channel.isWritable(); isOk = channel.isActive() && channel.isWritable();
} }
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId); return lastActiveChannel;
return null;
}
return null;
} }
public Channel findChannel(String clientId) { public Channel findChannel(String clientId) {
......
...@@ -815,7 +815,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -815,7 +815,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
(GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
ProducerConnection bodydata = new ProducerConnection(); ProducerConnection bodydata = new ProducerConnection();
HashMap<Channel, ClientChannelInfo> channelInfoHashMap = Map<Channel, ClientChannelInfo> channelInfoHashMap =
this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
if (channelInfoHashMap != null) { if (channelInfoHashMap != null) {
Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator(); Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
......
...@@ -19,7 +19,8 @@ package org.apache.rocketmq.broker.client; ...@@ -19,7 +19,8 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.HashMap; import java.util.Map;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -74,7 +75,7 @@ public class ProducerManagerTest { ...@@ -74,7 +75,7 @@ public class ProducerManagerTest {
@Test @Test
public void testRegisterProducer() throws Exception { public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group); Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
Channel channel1 = producerManager.findChannel("clientId"); Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull(); assertThat(channelMap).isNotNull();
assertThat(channel1).isNotNull(); assertThat(channel1).isNotNull();
...@@ -85,7 +86,7 @@ public class ProducerManagerTest { ...@@ -85,7 +86,7 @@ public class ProducerManagerTest {
@Test @Test
public void unregisterProducer() throws Exception { public void unregisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group); Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull(); assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo); assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
Channel channel1 = producerManager.findChannel("clientId"); Channel channel1 = producerManager.findChannel("clientId");
...@@ -102,9 +103,28 @@ public class ProducerManagerTest { ...@@ -102,9 +103,28 @@ public class ProducerManagerTest {
@Test @Test
public void testGetGroupChannelTable() throws Exception { public void testGetGroupChannelTable() throws Exception {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
HashMap<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group); Map<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
producerManager.unregisterProducer(group, clientInfo); producerManager.unregisterProducer(group, clientInfo);
assertThat(oldMap.size()).isNotEqualTo(0); assertThat(oldMap.size()).isEqualTo(0);
} }
@Test
public void testGetAvaliableChannel() {
producerManager.registerProducer(group, clientInfo);
when(channel.isActive()).thenReturn(true);
when(channel.isWritable()).thenReturn(true);
Channel c = producerManager.getAvaliableChannel(group);
assertThat(c).isSameAs(channel);
when(channel.isWritable()).thenReturn(false);
c = producerManager.getAvaliableChannel(group);
assertThat(c).isSameAs(channel);
when(channel.isActive()).thenReturn(false);
c = producerManager.getAvaliableChannel(group);
assertThat(c).isNull();
}
} }
\ No newline at end of file
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ClientChannelInfo;
...@@ -81,7 +82,7 @@ public class ClientManageProcessorTest { ...@@ -81,7 +82,7 @@ public class ClientManageProcessorTest {
@Test @Test
public void processRequest_UnRegisterProducer() throws Exception { public void processRequest_UnRegisterProducer() throws Exception {
brokerController.getProducerManager().registerProducer(group, clientChannelInfo); brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
HashMap<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); Map<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull(); assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo); assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册