提交 b3fa1b40 编写于 作者: R Rossen Stoyanchev

Synchronized updates of STOMP header key cache

Issue: SPR-14901
上级 9b76dc2a
......@@ -25,6 +25,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -53,15 +54,26 @@ public class StompEncoder {
private static final int HEADER_KEY_CACHE_LIMIT = 32;
private final Map<String, byte[]> headerKeyAccessCache =
new ConcurrentHashMap<>(HEADER_KEY_CACHE_LIMIT);
@SuppressWarnings("serial")
private final Map<String, byte[]> headerKeyCache =
private final Map<String, byte[]> headerKeyUpdateCache =
new LinkedHashMap<String, byte[]>(HEADER_KEY_CACHE_LIMIT, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
return size() > HEADER_KEY_CACHE_LIMIT;
if (size() > HEADER_KEY_CACHE_LIMIT) {
headerKeyAccessCache.remove(eldest.getKey());
return true;
}
else {
return false;
}
}
};
/**
* Encodes the given STOMP {@code message} into a {@code byte[]}
* @param message the message to encode
......@@ -160,21 +172,23 @@ public class StompEncoder {
private byte[] encodeHeaderKey(String input, boolean escape) {
String inputToUse = (escape ? escape(input) : input);
if (headerKeyCache.containsKey(inputToUse)) {
return headerKeyCache.get(inputToUse);
if (this.headerKeyAccessCache.containsKey(inputToUse)) {
return this.headerKeyAccessCache.get(inputToUse);
}
synchronized (this.headerKeyUpdateCache) {
byte[] bytes = this.headerKeyUpdateCache.get(inputToUse);
if (bytes == null) {
bytes = inputToUse.getBytes(StandardCharsets.UTF_8);
this.headerKeyAccessCache.put(inputToUse, bytes);
this.headerKeyUpdateCache.put(inputToUse, bytes);
}
return bytes;
}
byte[] bytes = encodeHeaderString(inputToUse);
headerKeyCache.put(inputToUse, bytes);
return bytes;
}
private byte[] encodeHeaderValue(String input, boolean escape) {
String inputToUse = (escape ? escape(input) : input);
return encodeHeaderString(inputToUse);
}
private byte[] encodeHeaderString(String input) {
return input.getBytes(StandardCharsets.UTF_8);
return inputToUse.getBytes(StandardCharsets.UTF_8);
}
/**
......
......@@ -88,13 +88,14 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private StompSubProtocolErrorHandler errorHandler;
private int messageSizeLimit = 64 * 1024;
private StompEncoder stompEncoder;
private StompEncoder stompEncoder = new StompEncoder();
private StompDecoder stompDecoder;
private StompDecoder stompDecoder = new StompDecoder();
private final Map<String, BufferingStompDecoder> decoders = new ConcurrentHashMap<>();
......@@ -106,10 +107,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
private final Stats stats = new Stats();
public StompSubProtocolHandler() {
setEncoder(new StompEncoder());
setDecoder(new StompDecoder());
}
/**
* Configure a handler for error messages sent to clients which allows
......@@ -129,24 +126,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
return this.errorHandler;
}
/**
* Configure a {@link StompEncoder} for encoding STOMP frames
* @param encoder the encoder
* @since 4.3.5
*/
public void setEncoder(StompEncoder encoder) {
this.stompEncoder = encoder;
}
/**
* Configure a {@link StompDecoder} for decoding STOMP frames
* @param decoder the decoder
* @since 4.3.5
*/
public void setDecoder(StompDecoder decoder) {
this.stompDecoder = decoder;
}
/**
* Configure the maximum size allowed for an incoming STOMP message.
* Since a STOMP message can be received in multiple WebSocket messages,
......@@ -167,6 +146,24 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
return this.messageSizeLimit;
}
/**
* Configure a {@link StompEncoder} for encoding STOMP frames
* @param encoder the encoder
* @since 4.3.5
*/
public void setEncoder(StompEncoder encoder) {
this.stompEncoder = encoder;
}
/**
* Configure a {@link StompDecoder} for decoding STOMP frames
* @param decoder the decoder
* @since 4.3.5
*/
public void setDecoder(StompDecoder decoder) {
this.stompDecoder = decoder;
}
/**
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
* messages created from decoded STOMP frames and other messages sent to the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册