From 4e3390ae04f0e553fc03117ed20c813004006cb6 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 15 Oct 2013 10:41:54 -0400 Subject: [PATCH] Upgrade to reactor 1.0 RC1 and remove MessageChannel This change upgrades reactor to 1.0 RC1 and also removes the reactor-based message channel in favor of the one available from org.projectreactor:reactor-spring. --- build.gradle | 5 +- .../channel/AbstractSubscribableChannel.java | 9 ++ .../channel/ReactorSubscribableChannel.java | 101 ------------------ 3 files changed, 11 insertions(+), 104 deletions(-) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/channel/ReactorSubscribableChannel.java diff --git a/build.gradle b/build.gradle index bf53b9381f..a4d05cf220 100644 --- a/build.gradle +++ b/build.gradle @@ -384,9 +384,8 @@ project("spring-messaging") { optional(project(":spring-websocket")) optional(project(":spring-webmvc")) optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") - optional("org.projectreactor:reactor-core:1.0.0.M3") - optional("org.projectreactor:reactor-tcp:1.0.0.M3") - optional("com.lmax:disruptor:3.1.1") + optional("org.projectreactor:reactor-core:1.0.0.RC1") + optional("org.projectreactor:reactor-tcp:1.0.0.RC1") optional("org.eclipse.jetty.websocket:websocket-server:9.0.5.v20130815") optional("org.eclipse.jetty.websocket:websocket-client:9.0.5.v20130815") testCompile(project(":spring-test")) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractSubscribableChannel.java index 9740e14971..3cc8dd278a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractSubscribableChannel.java @@ -41,8 +41,14 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel return subscribeInternal(handler); } + /** + * Whether the given {@link MessageHandler} is already subscribed. + */ protected abstract boolean hasSubscription(MessageHandler handler); + /** + * Subscribe the given {@link MessageHandler}. + */ protected abstract boolean subscribeInternal(MessageHandler handler); @Override @@ -53,6 +59,9 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel return unsubscribeInternal(handler); } + /** + * Unsubscribe the given {@link MessageHandler}. + */ protected abstract boolean unsubscribeInternal(MessageHandler handler); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ReactorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ReactorSubscribableChannel.java deleted file mode 100644 index d60ccea291..0000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ReactorSubscribableChannel.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.support.channel; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; - -import reactor.core.Reactor; -import reactor.event.Event; -import reactor.event.registry.Registration; -import reactor.event.selector.ObjectSelector; -import reactor.event.selector.Selector; -import reactor.function.Consumer; - - -/** - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class ReactorSubscribableChannel extends AbstractSubscribableChannel { - - private final Reactor reactor; - - private final Object key = new Object(); - - private final Map> registrations = new HashMap>(); - - - public ReactorSubscribableChannel(Reactor reactor) { - this.reactor = reactor; - } - - - @Override - protected boolean hasSubscription(MessageHandler handler) { - return this.registrations.containsKey(handler); - } - - @Override - public boolean sendInternal(Message message, long timeout) { - this.reactor.notify(this.key, Event.wrap(message)); - return true; - } - - @Override - public boolean subscribeInternal(final MessageHandler handler) { - Selector selector = ObjectSelector.objectSelector(this.key); - MessageHandlerConsumer consumer = new MessageHandlerConsumer(handler); - Registration>>> registration = this.reactor.on(selector, consumer); - this.registrations.put(handler, registration); - return true; - } - - @Override - public boolean unsubscribeInternal(MessageHandler handler) { - Registration registration = this.registrations.remove(handler); - if (registration != null) { - registration.cancel(); - return true; - } - return false; - } - - - private final class MessageHandlerConsumer implements Consumer>> { - - private final MessageHandler handler; - - private MessageHandlerConsumer(MessageHandler handler) { - this.handler = handler; - } - - @Override - public void accept(Event> event) { - Message message = event.getData(); - try { - this.handler.handleMessage(message); - } - catch (Throwable t) { - logger.error("Failed to process message " + message, t); - } - } - } -} -- GitLab