PublishSubscribeChannel.java 2.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright 2002-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.channel;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

/**
29
 * A {@link SubscribableChannel} that sends messages to each of its subscribers.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
 *
 * @author Phillip Webb
 * @since 4.0
 */
public class PublishSubscribeChannel implements SubscribableChannel {

	private Executor executor;

	private Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();


	/**
	 * Create a new {@link PublishSubscribeChannel} instance where messages will be sent
	 * in the callers thread.
	 */
	public PublishSubscribeChannel() {
		this(null);
	}

	/**
	 * Create a new {@link PublishSubscribeChannel} instance where messages will be sent
	 * via the specified executor.
	 * @param executor the executor used to send the message or {@code null} to execute in
	 *        the callers thread.
	 */
	public PublishSubscribeChannel(Executor executor) {
		this.executor = executor;
	}

	@Override
	public boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}

	@Override
	public boolean send(Message<?> message, long timeout) {
		Assert.notNull(message, "Message must not be null");
		Assert.notNull(message.getPayload(), "Message payload must not be null");
		for (final MessageHandler handler : this.handlers) {
			dispatchToHandler(message, handler);
		}
		return true;
	}

	private void dispatchToHandler(final Message<?> message, final MessageHandler handler) {
		if (this.executor == null) {
			handler.handleMessage(message);
		}
		else {
			this.executor.execute(new Runnable() {
				@Override
				public void run() {
					handler.handleMessage(message);
				}
			});
		}
	}

	@Override
	public boolean subscribe(MessageHandler handler) {
		return this.handlers.add(handler);
	}

	@Override
	public boolean unsubscribe(MessageHandler handler) {
		return this.handlers.remove(handler);
	}

}