StompWebSocketIntegrationTests.java 8.6 KB
Newer Older
1
/*
2
 * Copyright 2002-2014 the original author or authors.
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.web.socket.messaging;
18

19 20 21 22
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
23
import java.util.Arrays;
24 25
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
26 27 28
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

29
import org.junit.Test;
30 31 32 33
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.springframework.beans.factory.annotation.Autowired;
34
import org.springframework.context.annotation.Bean;
35
import org.springframework.context.annotation.ComponentScan;
36
import org.springframework.context.annotation.Configuration;
37
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
38
import org.springframework.messaging.handler.annotation.MessageMapping;
39
import org.springframework.messaging.simp.annotation.SubscribeMapping;
40
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
41
import org.springframework.messaging.simp.stomp.StompCommand;
42 43
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
44
import org.springframework.stereotype.Controller;
45
import org.springframework.web.socket.*;
46
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
47 48
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
49
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
50 51
import org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
52
import org.springframework.web.socket.server.HandshakeHandler;
53 54

import static org.junit.Assert.*;
55
import static org.springframework.web.socket.messaging.StompTextMessageBuilder.*;
56 57

/**
 * Integration tests for STOMP over WebSocket with annotated message-handling methods.
 *
 * @author Rossen Stoyanchev
 */
62
@RunWith(Parameterized.class)
63
public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
64

65 66 67
	@Parameters
	public static Iterable<Object[]> arguments() {
		return Arrays.asList(new Object[][] {
68
				{new JettyWebSocketTestServer(), new JettyWebSocketClient()},
69 70
				{new TomcatWebSocketTestServer(), new StandardWebSocketClient()},
				{new UndertowTestServer(), new StandardWebSocketClient()}
71
		});
72
	}
73 74


75 76
	@Override
	protected Class<?>[] getAnnotatedConfigClasses() {
77
		return new Class<?>[] { TestMessageBrokerConfiguration.class, TestMessageBrokerConfigurer.class };
78 79
	}

80

81
	@Test
82
	public void sendMessageToController() throws Exception {
83

84
		TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
85
		WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws").get();
86

87
		SimpleController controller = this.wac.getBean(SimpleController.class);
88
		try {
89
			assertTrue(controller.latch.await(10, TimeUnit.SECONDS));
90 91 92 93 94 95 96
		}
		finally {
			session.close();
		}
	}

	@Test
97
	public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
98 99

		TextMessage message1 = create(StompCommand.SUBSCRIBE).headers(
100
				"id:subs1", "destination:/topic/increment").build();
101 102

		TextMessage message2 = create(StompCommand.SEND).headers(
103
				"destination:/app/increment").body("5").build();
104 105

		TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
106
		WebSocketSession session = doHandshake(clientHandler, "/ws").get();
107

108 109 110 111 112 113
		try {
			assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
		}
		finally {
			session.close();
		}
114
	}
115

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
	// SPR-10930

	/**
	 * Subscribes to {@code /topic/foo}, sends a message straight to that same
	 * destination, and checks that the first frame the client receives starts
	 * with the STOMP {@code MESSAGE} command.
	 */
	@Test
	public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
		// Both frames target the same destination header.
		String topicHeader = "destination:/topic/foo";
		TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE).headers("id:subs1", topicHeader).build();
		TextMessage sendFrame = create(StompCommand.SEND).headers(topicHeader).body("5").build();

		TestClientWebSocketHandler handler = new TestClientWebSocketHandler(1, subscribeFrame, sendFrame);
		WebSocketSession session = doHandshake(handler, "/ws").get();

		try {
			assertTrue(handler.latch.await(2, TimeUnit.SECONDS));

			TextMessage reply = handler.actual.get(0);
			String payload = reply.getPayload();
			boolean isMessageFrame = payload.startsWith("MESSAGE\n");
			assertTrue("Expected STOMP Command=MESSAGE, got " + payload, isMessageFrame);
		}
		finally {
			session.close();
		}
	}

138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
	// SPR-11648

	/**
	 * Subscribes to {@code /app/number} and checks that the first frame the
	 * client receives mentions that destination and contains "42".
	 */
	@Test
	public void sendSubscribeToControllerAndReceiveReply() throws Exception {
		TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE)
				.headers("id:subs1", "destination:/app/number")
				.build();

		TestClientWebSocketHandler handler = new TestClientWebSocketHandler(1, subscribeFrame);
		WebSocketSession session = doHandshake(handler, "/ws").get();

		try {
			assertTrue(handler.latch.await(2, TimeUnit.SECONDS));

			String payload = handler.actual.get(0).getPayload();
			boolean hasDestination = payload.contains("destination:/app/number");
			boolean hasAnswer = payload.contains("42");
			assertTrue("Expected STOMP destination=/app/number, got " + payload, hasDestination);
			assertTrue("Expected STOMP Payload=42, got " + payload, hasAnswer);
		}
		finally {
			session.close();
		}
	}

160 161 162 163 164 165

	@IntegrationTestController
	static class SimpleController {

		private CountDownLatch latch = new CountDownLatch(1);

166
		@MessageMapping(value="/simple")
167 168 169
		public void handle() {
			this.latch.countDown();
		}
170 171 172 173 174 175 176 177 178 179 180

		@MessageMapping(value="/exception")
		public void handleWithError() {
			throw new IllegalArgumentException("Bad input");
		}

		@MessageExceptionHandler
		public void handleException(IllegalArgumentException ex) {

		}

181 182
	}

183 184 185
	@IntegrationTestController
	static class IncrementController {

186
		@MessageMapping(value="/increment")
187 188 189
		public int handle(int i) {
			return i + 1;
		}
190 191 192 193 194

		@SubscribeMapping("/number")
		public int number() {
			return 42;
		}
195 196 197
	}


198
	private static class TestClientWebSocketHandler extends TextWebSocketHandler {
199

200
		private final TextMessage[] messagesToSend;
201

202 203
		private final int expected;

204
		private final List<TextMessage> actual = new CopyOnWriteArrayList<>();
205 206 207 208

		private final CountDownLatch latch;


209 210
		public TestClientWebSocketHandler(int expectedNumberOfMessages, TextMessage... messagesToSend) {
			this.messagesToSend = messagesToSend;
211 212
			this.expected = expectedNumberOfMessages;
			this.latch = new CountDownLatch(this.expected);
213 214 215
		}

		@Override
216
		public void afterConnectionEstablished(WebSocketSession session) throws Exception {
217 218 219
			for (TextMessage message : this.messagesToSend) {
				session.sendMessage(message);
			}
220 221
		}

222 223 224 225
		@Override
		protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
			this.actual.add(message);
			this.latch.countDown();
226 227 228 229
		}
	}

	@Configuration
230
	@ComponentScan(basePackageClasses=StompWebSocketIntegrationTests.class,
231
			useDefaultFilters=false,
232
			includeFilters=@ComponentScan.Filter(IntegrationTestController.class))
233
	static class TestMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {
234

235 236 237
		@Autowired
		private HandshakeHandler handshakeHandler; // can't rely on classpath for server detection

238 239
		@Override
		public void registerStompEndpoints(StompEndpointRegistry registry) {
240
			registry.addEndpoint("/ws").setHandshakeHandler(this.handshakeHandler);
241 242 243
		}

		@Override
244
		public void configureMessageBroker(MessageBrokerRegistry configurer) {
245
			configurer.setApplicationDestinationPrefixes("/app");
246
			configurer.enableSimpleBroker("/topic", "/queue");
247 248 249
		}
	}

250 251
	@Configuration
	static class TestMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {
252

253 254
		@Override
		@Bean
255
		public AbstractSubscribableChannel clientInboundChannel() {
256 257
			return new ExecutorSubscribableChannel(); // synchronous
		}
258

259 260
		@Override
		@Bean
261
		public AbstractSubscribableChannel clientOutboundChannel() {
262
			return new ExecutorSubscribableChannel(); // synchronous
263 264 265
		}
	}

266 267 268 269 270 271
	/**
	 * Marker annotation, itself meta-annotated with {@link Controller}, that
	 * the component scan in {@link TestMessageBrokerConfigurer} uses as its
	 * include filter.
	 */
	@Controller
	@Retention(RetentionPolicy.RUNTIME)
	@Target(ElementType.TYPE)
	private @interface IntegrationTestController {
	}

272
}