StompEncoder.java 7.1 KB
Newer Older
A
Andy Wilkinson 已提交
1
/*
P
Phillip Webb 已提交
2
 * Copyright 2002-2018 the original author or authors.
A
Andy Wilkinson 已提交
3 4 5 6 7
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
A
Andy Wilkinson 已提交
9 10 11 12 13 14 15 16 17 18 19 20 21
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.stomp;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
22
import java.nio.charset.StandardCharsets;
J
Juergen Hoeller 已提交
23
import java.util.Collections;
24
import java.util.LinkedHashMap;
A
Andy Wilkinson 已提交
25
import java.util.List;
26
import java.util.Map;
A
Andy Wilkinson 已提交
27
import java.util.Map.Entry;
28
import java.util.concurrent.ConcurrentHashMap;
A
Andy Wilkinson 已提交
29

30
import org.apache.commons.logging.Log;
31

32
import org.springframework.lang.Nullable;
A
Andy Wilkinson 已提交
33
import org.springframework.messaging.Message;
34
import org.springframework.messaging.simp.SimpLogging;
35
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
36
import org.springframework.messaging.simp.SimpMessageType;
37 38
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.util.Assert;
A
Andy Wilkinson 已提交
39 40

/**
41
 * An encoder for STOMP frames.
A
Andy Wilkinson 已提交
42 43
 *
 * @author Andy Wilkinson
44
 * @author Rossen Stoyanchev
A
Andy Wilkinson 已提交
45
 * @since 4.0
J
Juergen Hoeller 已提交
46
 * @see StompDecoder
A
Andy Wilkinson 已提交
47
 */
J
Juergen Hoeller 已提交
48
public class StompEncoder  {
A
Andy Wilkinson 已提交
49 50 51 52 53

	private static final byte LF = '\n';

	private static final byte COLON = ':';

54
	private static final Log logger = SimpLogging.forLogName(StompEncoder.class);
A
Andy Wilkinson 已提交
55

56 57
	private static final int HEADER_KEY_CACHE_LIMIT = 32;

58

59
	private final Map<String, byte[]> headerKeyAccessCache = new ConcurrentHashMap<>(HEADER_KEY_CACHE_LIMIT);
60

61
	@SuppressWarnings("serial")
62
	private final Map<String, byte[]> headerKeyUpdateCache =
63 64 65
			new LinkedHashMap<String, byte[]>(HEADER_KEY_CACHE_LIMIT, 0.75f, true) {
				@Override
				protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
66 67 68 69 70 71 72
					if (size() > HEADER_KEY_CACHE_LIMIT) {
						headerKeyAccessCache.remove(eldest.getKey());
						return true;
					}
					else {
						return false;
					}
73 74
				}
			};
75

76

A
Andy Wilkinson 已提交
77
	/**
P
Phillip Webb 已提交
78
	 * Encodes the given STOMP {@code message} into a {@code byte[]}.
79 80
	 * @param message the message to encode
	 * @return the encoded message
A
Andy Wilkinson 已提交
81 82
	 */
	public byte[] encode(Message<byte[]> message) {
83 84 85 86 87 88 89 90 91 92 93 94
		return encode(message.getHeaders(), message.getPayload());
	}

	/**
	 * Encodes the given payload and headers into a {@code byte[]}.
	 * @param headers the headers
	 * @param payload the payload
	 * @return the encoded message
	 */
	public byte[] encode(Map<String, Object> headers, byte[] payload) {
		Assert.notNull(headers, "'headers' is required");
		Assert.notNull(payload, "'payload' is required");
95

A
Andy Wilkinson 已提交
96
		try {
97
			ByteArrayOutputStream baos = new ByteArrayOutputStream(128 + payload.length);
A
Andy Wilkinson 已提交
98 99
			DataOutputStream output = new DataOutputStream(baos);

100
			if (SimpMessageType.HEARTBEAT.equals(SimpMessageHeaderAccessor.getMessageType(headers))) {
101
				logger.trace("Encoding heartbeat");
102
				output.write(StompDecoder.HEARTBEAT_PAYLOAD);
103
			}
J
Juergen Hoeller 已提交
104

105
			else {
106
				StompCommand command = StompHeaderAccessor.getCommand(headers);
107 108 109 110
				if (command == null) {
					throw new IllegalStateException("Missing STOMP command: " + headers);
				}

111
				output.write(command.toString().getBytes(StandardCharsets.UTF_8));
R
Polish  
Rossen Stoyanchev 已提交
112
				output.write(LF);
113
				writeHeaders(command, headers, payload, output);
114
				output.write(LF);
115
				writeBody(payload, output);
R
Polish  
Rossen Stoyanchev 已提交
116
				output.write((byte) 0);
117
			}
A
Andy Wilkinson 已提交
118 119 120

			return baos.toByteArray();
		}
121 122
		catch (IOException ex) {
			throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers,  ex);
A
Andy Wilkinson 已提交
123 124 125
		}
	}

J
Juergen Hoeller 已提交
126 127
	private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
			DataOutputStream output) throws IOException {
A
Andy Wilkinson 已提交
128

129 130 131
		@SuppressWarnings("unchecked")
		Map<String,List<String>> nativeHeaders =
				(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
R
Polish  
Rossen Stoyanchev 已提交
132

133
		if (logger.isTraceEnabled()) {
134
			logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
135 136 137 138
		}

		if (nativeHeaders == null) {
			return;
139
		}
R
Polish  
Rossen Stoyanchev 已提交
140

141 142 143
		boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);

		for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
144 145 146
			if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
				continue;
			}
147

148
			List<String> values = entry.getValue();
149 150
			if (StompCommand.CONNECT.equals(command) &&
					StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
J
Juergen Hoeller 已提交
151
				values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
152
			}
153

154
			byte[] encodedKey = encodeHeaderKey(entry.getKey(), shouldEscape);
155
			for (String value : values) {
156
				output.write(encodedKey);
A
Andy Wilkinson 已提交
157
				output.write(COLON);
158
				output.write(encodeHeaderValue(value, shouldEscape));
A
Andy Wilkinson 已提交
159 160 161
				output.write(LF);
			}
		}
162

R
Polish  
Rossen Stoyanchev 已提交
163
		if (command.requiresContentLength()) {
164
			int contentLength = payload.length;
165 166
			output.write("content-length:".getBytes(StandardCharsets.UTF_8));
			output.write(Integer.toString(contentLength).getBytes(StandardCharsets.UTF_8));
A
Andy Wilkinson 已提交
167 168 169 170
			output.write(LF);
		}
	}

171 172
	private byte[] encodeHeaderKey(String input, boolean escape) {
		String inputToUse = (escape ? escape(input) : input);
173 174 175 176 177 178 179 180 181 182 183
		if (this.headerKeyAccessCache.containsKey(inputToUse)) {
			return this.headerKeyAccessCache.get(inputToUse);
		}
		synchronized (this.headerKeyUpdateCache) {
			byte[] bytes = this.headerKeyUpdateCache.get(inputToUse);
			if (bytes == null) {
				bytes = inputToUse.getBytes(StandardCharsets.UTF_8);
				this.headerKeyAccessCache.put(inputToUse, bytes);
				this.headerKeyUpdateCache.put(inputToUse, bytes);
			}
			return bytes;
184 185 186 187
		}
	}

	private byte[] encodeHeaderValue(String input, boolean escape) {
188
		String inputToUse = (escape ? escape(input) : input);
189
		return inputToUse.getBytes(StandardCharsets.UTF_8);
190 191 192 193
	}

	/**
	 * See STOMP Spec 1.2:
S
Spring Operator 已提交
194
	 * <a href="https://stomp.github.io/stomp-specification-1.2.html#Value_Encoding">"Value Encoding"</a>.
195 196
	 */
	private String escape(String inString) {
197
		StringBuilder sb = null;
198 199 200
		for (int i = 0; i < inString.length(); i++) {
			char c = inString.charAt(i);
			if (c == '\\') {
201
				sb = getStringBuilder(sb, inString, i);
202 203 204
				sb.append("\\\\");
			}
			else if (c == ':') {
205
				sb = getStringBuilder(sb, inString, i);
206 207 208
				sb.append("\\c");
			}
			else if (c == '\n') {
209 210
				sb = getStringBuilder(sb, inString, i);
				sb.append("\\n");
211 212
			}
			else if (c == '\r') {
213
				sb = getStringBuilder(sb, inString, i);
214 215
				sb.append("\\r");
			}
216
			else if (sb != null){
217 218
				sb.append(c);
			}
A
Andy Wilkinson 已提交
219
		}
220 221 222
		return (sb != null ? sb.toString() : inString);
	}

223
	private StringBuilder getStringBuilder(@Nullable StringBuilder sb, String inString, int i) {
224 225 226 227 228
		if (sb == null) {
			sb = new StringBuilder(inString.length());
			sb.append(inString.substring(0, i));
		}
		return sb;
A
Andy Wilkinson 已提交
229 230
	}

231 232
	private void writeBody(byte[] payload, DataOutputStream output) throws IOException {
		output.write(payload);
A
Andy Wilkinson 已提交
233
	}
234 235

}