提交 28174744 编写于 作者: A Andy Wilkinson

Fix race when flushing messages

The use of an AtomicBoolean and no lock meant that it was possible
for a message to be queued and then never be flushed and sent to the
broker:

1. On t1, a message is received and isConnected is false. The message
   will be queued.
2. On t2, CONNECTED is received from the broker. isConnected is set
   to true, the queue is drained and the queued messages are forwarded
3. On t1, the message is added to the queue

To fix this, checking that isConnected is false (step 1 above) and the
queueing of a message (step 3 above) need to be performed as a unit
so that the flushing of the queued messages can't be interleaved. This
is achieved by synchronizing on a monitor and performing steps 1
and 3 and synchronizing on the same monitor while performing step 2.

The monitor is held while the messages are actually being forwarded
to the broker. An alternative would be to drain the queue into
a local variable, release the monitor, and then forward the messages.
The main advantage of this alternative is that the monitor is held for
less time. It also reduces the theoretical risk of deadlock by not
holding the monitor while making an alien call. The downside of the
alternative is that it may lead to messages being forwarded out of
order. For this reason the alternative approach was rejected.
上级 3c6c56fe
......@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
......@@ -171,10 +170,11 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
private final Promise<TcpConnection<String, String>> promise;
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final BlockingQueue<M> messageQueue = new LinkedBlockingQueue<M>(50);
private final Object monitor = new Object();
private boolean isConnected = false;
public RelaySession(final M message, final StompHeaders stompHeaders) {
......@@ -224,8 +224,10 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
StompHeaders headers = StompHeaders.fromMessageHeaders(message.getHeaders());
if (StompCommand.CONNECTED == headers.getStompCommand()) {
this.isConnected.set(true);
flushMessages(promise.get());
synchronized(this.monitor) {
this.isConnected = true;
flushMessages(promise.get());
}
return;
}
if (StompCommand.ERROR == headers.getStompCommand()) {
......@@ -248,14 +250,14 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
public void forward(M message, StompHeaders headers) {
if (!this.isConnected.get()) {
@SuppressWarnings("unchecked")
M m = (M) MessageBuilder.fromPayloadAndHeaders(message.getPayload(), headers.toMessageHeaders()).build();
if (logger.isTraceEnabled()) {
logger.trace("Adding to queue message " + m + ", queue size=" + this.messageQueue.size());
synchronized(this.monitor) {
if (!this.isConnected) {
if (logger.isTraceEnabled()) {
logger.trace("Adding to queue message " + message + ", queue size=" + this.messageQueue.size());
}
this.messageQueue.add(message);
return;
}
this.messageQueue.add(m);
return;
}
TcpConnection<String, String> connection = this.promise.get();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册