/*
 * Copyright 2002-2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.messaging;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.*;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.server.HandshakeHandler;

import static org.junit.Assert.*;
import static org.springframework.web.socket.messaging.StompTextMessageBuilder.*;

/**
 * Integration tests with annotated message-handling methods.
 *
 * @author Rossen Stoyanchev
 */
@RunWith(Parameterized.class)
public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {

	@Parameters
	public static Iterable<Object[]> arguments() {
		return Arrays.asList(new Object[][] {
71
				{new JettyWebSocketTestServer(), new JettyWebSocketClient()},
72 73
				{new TomcatWebSocketTestServer(), new StandardWebSocketClient()},
				{new UndertowTestServer(), new StandardWebSocketClient()}
74
		});
75
	}
76 77


78 79
	@Override
	protected Class<?>[] getAnnotatedConfigClasses() {
80
		return new Class<?>[] { TestMessageBrokerConfiguration.class, TestMessageBrokerConfigurer.class };
81 82
	}

83

84
	@Test
85
	public void sendMessageToController() throws Exception {
86

87
		TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
88
		WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws").get();
89

90
		SimpleController controller = this.wac.getBean(SimpleController.class);
91
		try {
92
			assertTrue(controller.latch.await(10, TimeUnit.SECONDS));
93 94 95 96 97 98 99
		}
		finally {
			session.close();
		}
	}

	@Test
100
	public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
101

R
Rossen Stoyanchev 已提交
102 103
		TextMessage message1 = create(StompCommand.SUBSCRIBE)
				.headers("id:subs1", "destination:/topic/increment").build();
104

R
Rossen Stoyanchev 已提交
105 106
		TextMessage message2 = create(StompCommand.SEND)
				.headers("destination:/app/increment").body("5").build();
107 108

		TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
109
		WebSocketSession session = doHandshake(clientHandler, "/ws").get();
110

111 112 113 114 115 116
		try {
			assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
		}
		finally {
			session.close();
		}
117
	}
118

119 120 121 122 123
	// SPR-10930

	@Test
	public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {

124 125
		TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
		TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
126

127
		TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
128 129 130 131 132 133
		WebSocketSession session = doHandshake(clientHandler, "/ws").get();

		try {
			assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));

			String payload = clientHandler.actual.get(0).getPayload();
134
			assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
135 136 137 138 139 140
		}
		finally {
			session.close();
		}
	}

141 142 143 144 145
	// SPR-11648

	@Test
	public void sendSubscribeToControllerAndReceiveReply() throws Exception {

146 147
		String destHeader = "destination:/app/number";
		TextMessage message = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
148 149 150 151 152 153 154

		TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message);
		WebSocketSession session = doHandshake(clientHandler, "/ws").get();

		try {
			assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
			String payload = clientHandler.actual.get(0).getPayload();
155
			assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains(destHeader));
156 157 158 159 160 161 162
			assertTrue("Expected STOMP Payload=42, got " + payload, payload.contains("42"));
		}
		finally {
			session.close();
		}
	}

163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
	@Test
	public void handleExceptionAndSendToUser() throws Exception {

		String destHeader = "destination:/user/queue/error";
		TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
		TextMessage m2 = create(StompCommand.SEND).headers("destination:/app/exception").build();

		TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
		WebSocketSession session = doHandshake(clientHandler, "/ws").get();

		try {
			assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));

			String payload = clientHandler.actual.get(0).getPayload();
			assertTrue(payload.startsWith("MESSAGE\n"));
			assertTrue(payload.contains("destination:/user/queue/error\n"));
			assertTrue(payload.endsWith("\"Got error: Bad input\"\0"));
		}
		finally {
			session.close();
		}
	}

R
Rossen Stoyanchev 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
	/**
	 * Subscribes to {@code /topic/scopedBeanValue}, sends to {@code /app/scopedBeanValue},
	 * and checks that the scoped bean's value {@code "55"} arrives as a MESSAGE frame.
	 */
	@Test
	public void webSocketScope() throws Exception {

		TextMessage message1 = create(StompCommand.SUBSCRIBE)
				.headers("id:subs1", "destination:/topic/scopedBeanValue").build();

		TextMessage message2 = create(StompCommand.SEND)
				.headers("destination:/app/scopedBeanValue").build();

		TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
		WebSocketSession session = doHandshake(clientHandler, "/ws").get();

		try {
			assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));

			// Failure messages include the frame, matching the other tests in this class.
			String payload = clientHandler.actual.get(0).getPayload();
			assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
			assertTrue("Expected STOMP destination=/topic/scopedBeanValue, got " + payload,
					payload.contains("destination:/topic/scopedBeanValue\n"));
			assertTrue("Expected STOMP Payload=\"55\", got " + payload, payload.endsWith("\"55\"\0"));
		}
		finally {
			session.close();
		}
	}


	@Target({ElementType.TYPE})
	@Retention(RetentionPolicy.RUNTIME)
	@Controller
	private @interface IntegrationTestController {
	}
217 218 219 220 221 222

	@IntegrationTestController
	static class SimpleController {

		private CountDownLatch latch = new CountDownLatch(1);

223
		@MessageMapping(value="/simple")
224 225 226
		public void handle() {
			this.latch.countDown();
		}
227 228 229 230 231 232 233

		@MessageMapping(value="/exception")
		public void handleWithError() {
			throw new IllegalArgumentException("Bad input");
		}

		@MessageExceptionHandler
234 235 236
		@SendToUser("/queue/error")
		public String handleException(IllegalArgumentException ex) {
			return "Got error: " + ex.getMessage();
237
		}
238 239
	}

240 241 242
	@IntegrationTestController
	static class IncrementController {

243
		@MessageMapping(value="/increment")
244 245 246
		public int handle(int i) {
			return i + 1;
		}
247 248 249 250 251

		@SubscribeMapping("/number")
		public int number() {
			return 42;
		}
252 253
	}

R
Rossen Stoyanchev 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
	/**
	 * Controller for the {@code /scopedBeanValue} mapping; it returns the value
	 * of the {@link ScopedBean} injected through its constructor.
	 */
	@IntegrationTestController
	static class ScopedBeanController {

		private final ScopedBean bean;

		@Autowired
		public ScopedBeanController(ScopedBean scopedBean) {
			this.bean = scopedBean;
		}

		/** Returns the injected bean's current value. */
		@MessageMapping(value="/scopedBeanValue")
		public String getValue() {
			String value = this.bean.getValue();
			return value;
		}
	}


	/**
	 * Contract of the scoped bean injected into {@code ScopedBeanController}.
	 * Declared without {@code static} because a member interface is implicitly static.
	 */
	interface ScopedBean {

		/** Returns the value held by this bean. */
		String getValue();
	}

	static class ScopedBeanImpl implements ScopedBean {

		private final String value;

		public ScopedBeanImpl(String value) {
			this.value = value;
		}

		@Override
		public String getValue() {
			return this.value;
		}
	}

290

291
	private static class TestClientWebSocketHandler extends TextWebSocketHandler {
292

293
		private final TextMessage[] messagesToSend;
294

295 296
		private final int expected;

297
		private final List<TextMessage> actual = new CopyOnWriteArrayList<>();
298 299 300 301

		private final CountDownLatch latch;


302 303
		public TestClientWebSocketHandler(int expectedNumberOfMessages, TextMessage... messagesToSend) {
			this.messagesToSend = messagesToSend;
304 305
			this.expected = expectedNumberOfMessages;
			this.latch = new CountDownLatch(this.expected);
306 307 308
		}

		@Override
309
		public void afterConnectionEstablished(WebSocketSession session) throws Exception {
310 311 312
			for (TextMessage message : this.messagesToSend) {
				session.sendMessage(message);
			}
313 314
		}

315 316 317 318
		@Override
		protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
			this.actual.add(message);
			this.latch.countDown();
319 320 321 322
		}
	}

	@Configuration
R
Rossen Stoyanchev 已提交
323 324
	@ComponentScan(
			basePackageClasses=StompWebSocketIntegrationTests.class,
325
			useDefaultFilters=false,
326
			includeFilters=@ComponentScan.Filter(IntegrationTestController.class))
327
	static class TestMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {
328

329 330 331
		@Autowired
		private HandshakeHandler handshakeHandler; // can't rely on classpath for server detection

332 333
		@Override
		public void registerStompEndpoints(StompEndpointRegistry registry) {
334
			registry.addEndpoint("/ws").setHandshakeHandler(this.handshakeHandler);
335 336 337
		}

		@Override
338
		public void configureMessageBroker(MessageBrokerRegistry configurer) {
339
			configurer.setApplicationDestinationPrefixes("/app");
340
			configurer.enableSimpleBroker("/topic", "/queue");
341
		}
R
Rossen Stoyanchev 已提交
342 343 344 345 346 347

		@Bean
		@Scope(value="websocket", proxyMode=ScopedProxyMode.INTERFACES)
		public ScopedBean scopedBean() {
			return new ScopedBeanImpl("55");
		}
348 349
	}

350 351
	@Configuration
	static class TestMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {
352

353 354
		@Override
		@Bean
355
		public AbstractSubscribableChannel clientInboundChannel() {
356 357
			return new ExecutorSubscribableChannel(); // synchronous
		}
358

359 360
		@Override
		@Bean
361
		public AbstractSubscribableChannel clientOutboundChannel() {
362
			return new ExecutorSubscribableChannel(); // synchronous
363 364 365 366
		}
	}

}