提交 4927c905 编写于 作者: J Juergen Hoeller

Revised SingleConnectionFactory for individual proxies with ExceptionListener and start/stop state

Issue: SPR-10397
上级 38ef6dec
...@@ -21,7 +21,9 @@ import java.lang.reflect.InvocationTargetException; ...@@ -21,7 +21,9 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
...@@ -73,9 +75,8 @@ import org.springframework.util.Assert; ...@@ -73,9 +75,8 @@ import org.springframework.util.Assert;
* @see org.springframework.jms.listener.SimpleMessageListenerContainer * @see org.springframework.jms.listener.SimpleMessageListenerContainer
* @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel
*/ */
public class SingleConnectionFactory public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ExceptionListener, TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {
InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
...@@ -87,17 +88,17 @@ public class SingleConnectionFactory ...@@ -87,17 +88,17 @@ public class SingleConnectionFactory
private boolean reconnectOnException = false; private boolean reconnectOnException = false;
/** Wrapped Connection */ /** The target Connection */
private Connection target;
/** Proxy Connection */
private Connection connection; private Connection connection;
/** A hint whether to create a queue or topic connection */ /** A hint whether to create a queue or topic connection */
private Boolean pubSubMode; private Boolean pubSubMode;
/** An internal aggregator allowing for per-connection ExceptionListeners */
private AggregatedExceptionListener aggregatedExceptionListener;
/** Whether the shared Connection has been started */ /** Whether the shared Connection has been started */
private boolean started = false; private int startedCount = 0;
/** Synchronization monitor for the shared Connection */ /** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object(); private final Object connectionMonitor = new Object();
...@@ -112,18 +113,16 @@ public class SingleConnectionFactory ...@@ -112,18 +113,16 @@ public class SingleConnectionFactory
/** /**
* Create a new SingleConnectionFactory that always returns the given Connection. * Create a new SingleConnectionFactory that always returns the given Connection.
* @param target the single Connection * @param targetConnection the single Connection
*/ */
public SingleConnectionFactory(Connection target) { public SingleConnectionFactory(Connection targetConnection) {
Assert.notNull(target, "Target Connection must not be null"); Assert.notNull(targetConnection, "Target Connection must not be null");
this.target = target; this.connection = targetConnection;
this.connection = getSharedConnectionProxy(target);
} }
/** /**
* Create a new SingleConnectionFactory that always returns a single * Create a new SingleConnectionFactory that always returns a single Connection
* Connection that it will lazily create via the given target * that it will lazily create via the given target ConnectionFactory.
* ConnectionFactory.
* @param targetConnectionFactory the target ConnectionFactory * @param targetConnectionFactory the target ConnectionFactory
*/ */
public SingleConnectionFactory(ConnectionFactory targetConnectionFactory) { public SingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
...@@ -171,7 +170,7 @@ public class SingleConnectionFactory ...@@ -171,7 +170,7 @@ public class SingleConnectionFactory
/** /**
* Specify an JMS ExceptionListener implementation that should be * Specify an JMS ExceptionListener implementation that should be
* registered with with the single Connection created by this factory. * registered with the single Connection created by this factory.
* @see #setReconnectOnException * @see #setReconnectOnException
*/ */
public void setExceptionListener(ExceptionListener exceptionListener) { public void setExceptionListener(ExceptionListener exceptionListener) {
...@@ -180,7 +179,7 @@ public class SingleConnectionFactory ...@@ -180,7 +179,7 @@ public class SingleConnectionFactory
/** /**
* Return the JMS ExceptionListener implementation that should be registered * Return the JMS ExceptionListener implementation that should be registered
* with with the single Connection created by this factory, if any. * with the single Connection created by this factory, if any.
*/ */
protected ExceptionListener getExceptionListener() { protected ExceptionListener getExceptionListener() {
return this.exceptionListener; return this.exceptionListener;
...@@ -215,19 +214,14 @@ public class SingleConnectionFactory ...@@ -215,19 +214,14 @@ public class SingleConnectionFactory
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
if (this.connection == null && getTargetConnectionFactory() == null) { if (this.connection == null && getTargetConnectionFactory() == null) {
throw new IllegalArgumentException("Connection or 'targetConnectionFactory' is required"); throw new IllegalArgumentException("Target Connection or ConnectionFactory is required");
} }
} }
@Override @Override
public Connection createConnection() throws JMSException { public Connection createConnection() throws JMSException {
synchronized (this.connectionMonitor) { return getSharedConnectionProxy(getConnection());
if (this.connection == null) {
initConnection();
}
return this.connection;
}
} }
@Override @Override
...@@ -277,11 +271,27 @@ public class SingleConnectionFactory ...@@ -277,11 +271,27 @@ public class SingleConnectionFactory
} }
/**
* Obtain an initialized shared Connection.
* @return the Connection (never {@code null})
* @throws javax.jms.JMSException if thrown by JMS API methods
* @see #initConnection()
*/
protected Connection getConnection() throws JMSException {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
initConnection();
}
return this.connection;
}
}
/** /**
* Initialize the underlying shared Connection. * Initialize the underlying shared Connection.
* <p>Closes and reinitializes the Connection if an underlying * <p>Closes and reinitializes the Connection if an underlying
* Connection is present already. * Connection is present already.
* @throws javax.jms.JMSException if thrown by JMS API methods * @throws javax.jms.JMSException if thrown by JMS API methods
* @see #prepareConnection
*/ */
public void initConnection() throws JMSException { public void initConnection() throws JMSException {
if (getTargetConnectionFactory() == null) { if (getTargetConnectionFactory() == null) {
...@@ -289,20 +299,23 @@ public class SingleConnectionFactory ...@@ -289,20 +299,23 @@ public class SingleConnectionFactory
"'targetConnectionFactory' is required for lazily initializing a Connection"); "'targetConnectionFactory' is required for lazily initializing a Connection");
} }
synchronized (this.connectionMonitor) { synchronized (this.connectionMonitor) {
if (this.target != null) { if (this.connection != null) {
closeConnection(this.target); closeConnection(this.connection);
}
this.connection = doCreateConnection();
prepareConnection(this.connection);
if (this.startedCount > 0) {
this.connection.start();
} }
this.target = doCreateConnection();
prepareConnection(this.target);
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Established shared JMS Connection: " + this.target); logger.info("Established shared JMS Connection: " + this.connection);
} }
this.connection = getSharedConnectionProxy(this.target);
} }
} }
/** /**
* Exception listener callback that renews the underlying single Connection. * Exception listener callback that renews the underlying single Connection.
* @see #resetConnection()
*/ */
@Override @Override
public void onException(JMSException ex) { public void onException(JMSException ex) {
...@@ -315,6 +328,7 @@ public class SingleConnectionFactory ...@@ -315,6 +328,7 @@ public class SingleConnectionFactory
* The provider of this ConnectionFactory needs to care for proper shutdown. * The provider of this ConnectionFactory needs to care for proper shutdown.
* <p>As this bean implements DisposableBean, a bean factory will * <p>As this bean implements DisposableBean, a bean factory will
* automatically invoke this on destruction of its cached singletons. * automatically invoke this on destruction of its cached singletons.
* @see #resetConnection()
*/ */
@Override @Override
public void destroy() { public void destroy() {
...@@ -323,13 +337,13 @@ public class SingleConnectionFactory ...@@ -323,13 +337,13 @@ public class SingleConnectionFactory
/** /**
* Reset the underlying shared Connection, to be reinitialized on next access. * Reset the underlying shared Connection, to be reinitialized on next access.
* @see #closeConnection
*/ */
public void resetConnection() { public void resetConnection() {
synchronized (this.connectionMonitor) { synchronized (this.connectionMonitor) {
if (this.target != null) { if (this.connection != null) {
closeConnection(this.target); closeConnection(this.connection);
} }
this.target = null;
this.connection = null; this.connection = null;
} }
} }
...@@ -365,10 +379,18 @@ public class SingleConnectionFactory ...@@ -365,10 +379,18 @@ public class SingleConnectionFactory
if (getClientId() != null) { if (getClientId() != null) {
con.setClientID(getClientId()); con.setClientID(getClientId());
} }
if (getExceptionListener() != null || isReconnectOnException()) { if (this.aggregatedExceptionListener != null) {
con.setExceptionListener(this.aggregatedExceptionListener);
}
else if (getExceptionListener() != null || isReconnectOnException()) {
ExceptionListener listenerToUse = getExceptionListener(); ExceptionListener listenerToUse = getExceptionListener();
if (isReconnectOnException()) { if (isReconnectOnException()) {
listenerToUse = new InternalChainedExceptionListener(this, listenerToUse); this.aggregatedExceptionListener = new AggregatedExceptionListener();
this.aggregatedExceptionListener.delegates.add(this);
if (listenerToUse != null) {
this.aggregatedExceptionListener.delegates.add(listenerToUse);
}
listenerToUse = this.aggregatedExceptionListener;
} }
con.setExceptionListener(listenerToUse); con.setExceptionListener(listenerToUse);
} }
...@@ -422,12 +444,11 @@ public class SingleConnectionFactory ...@@ -422,12 +444,11 @@ public class SingleConnectionFactory
*/ */
protected void closeConnection(Connection con) { protected void closeConnection(Connection con) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Closing shared JMS Connection: " + this.target); logger.debug("Closing shared JMS Connection: " + con);
} }
try { try {
try { try {
if (this.started) { if (this.startedCount > 0) {
this.started = false;
con.stop(); con.stop();
} }
} }
...@@ -463,7 +484,7 @@ public class SingleConnectionFactory ...@@ -463,7 +484,7 @@ public class SingleConnectionFactory
return (Connection) Proxy.newProxyInstance( return (Connection) Proxy.newProxyInstance(
Connection.class.getClassLoader(), Connection.class.getClassLoader(),
classes.toArray(new Class<?>[classes.size()]), classes.toArray(new Class<?>[classes.size()]),
new SharedConnectionInvocationHandler(target)); new SharedConnectionInvocationHandler());
} }
...@@ -472,28 +493,34 @@ public class SingleConnectionFactory ...@@ -472,28 +493,34 @@ public class SingleConnectionFactory
*/ */
private class SharedConnectionInvocationHandler implements InvocationHandler { private class SharedConnectionInvocationHandler implements InvocationHandler {
private final Connection target; private ExceptionListener localExceptionListener;
public SharedConnectionInvocationHandler(Connection target) { private boolean locallyStarted = false;
this.target = target;
}
@Override @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("equals")) { if (method.getName().equals("equals")) {
// Only consider equal when proxies are identical. Object other = args[0];
return (proxy == args[0]); if (proxy == other) {
return true;
}
if (other == null || !Proxy.isProxyClass(other.getClass())) {
return false;
}
InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
return (otherHandler instanceof SharedConnectionInvocationHandler &&
factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
} }
else if (method.getName().equals("hashCode")) { else if (method.getName().equals("hashCode")) {
// Use hashCode of Connection proxy. // Use hashCode of containing SingleConnectionFactory.
return System.identityHashCode(proxy); return System.identityHashCode(factory());
} }
else if (method.getName().equals("toString")) { else if (method.getName().equals("toString")) {
return "Shared JMS Connection: " + this.target; return "Shared JMS Connection: " + getConnection();
} }
else if (method.getName().equals("setClientID")) { else if (method.getName().equals("setClientID")) {
// Handle setClientID method: throw exception if not compatible. // Handle setClientID method: throw exception if not compatible.
String currentClientId = this.target.getClientID(); String currentClientId = getConnection().getClientID();
if (currentClientId != null && currentClientId.equals(args[0])) { if (currentClientId != null && currentClientId.equals(args[0])) {
return null; return null;
} }
...@@ -505,35 +532,57 @@ public class SingleConnectionFactory ...@@ -505,35 +532,57 @@ public class SingleConnectionFactory
} }
else if (method.getName().equals("setExceptionListener")) { else if (method.getName().equals("setExceptionListener")) {
// Handle setExceptionListener method: add to the chain. // Handle setExceptionListener method: add to the chain.
ExceptionListener currentExceptionListener = this.target.getExceptionListener(); synchronized (connectionMonitor) {
if (currentExceptionListener instanceof InternalChainedExceptionListener && args[0] != null) { if (aggregatedExceptionListener != null) {
((InternalChainedExceptionListener) currentExceptionListener).addDelegate((ExceptionListener) args[0]); ExceptionListener listener = (ExceptionListener) args[0];
return null; if (listener != this.localExceptionListener) {
} if (this.localExceptionListener != null) {
else { aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
throw new javax.jms.IllegalStateException( }
"setExceptionListener call not supported on proxy for shared Connection. " + if (listener != null) {
"Set the 'exceptionListener' property on the SingleConnectionFactory instead. " + aggregatedExceptionListener.delegates.add(listener);
"Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " + }
"which will allow for registering further ExceptionListeners to the recovery chain."); this.localExceptionListener = listener;
}
return null;
}
else {
throw new javax.jms.IllegalStateException(
"setExceptionListener call not supported on proxy for shared Connection. " +
"Set the 'exceptionListener' property on the SingleConnectionFactory instead. " +
"Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " +
"which will allow for registering further ExceptionListeners to the recovery chain.");
}
} }
} }
else if (method.getName().equals("start")) { else if (method.getName().equals("getExceptionListener")) {
// Handle start method: track started state.
synchronized (connectionMonitor) { synchronized (connectionMonitor) {
if (!started) { if (this.localExceptionListener != null) {
this.target.start(); return this.localExceptionListener;
started = true; }
else {
return getExceptionListener();
} }
} }
}
else if (method.getName().equals("start")) {
localStart();
return null; return null;
} }
else if (method.getName().equals("stop")) { else if (method.getName().equals("stop")) {
// Handle stop method: don't pass the call on. localStop();
return null; return null;
} }
else if (method.getName().equals("close")) { else if (method.getName().equals("close")) {
// Handle close method: don't pass the call on. localStop();
synchronized (connectionMonitor) {
if (this.localExceptionListener != null) {
if (aggregatedExceptionListener != null) {
aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
}
this.localExceptionListener = null;
}
}
return null; return null;
} }
else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") || else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
...@@ -552,7 +601,7 @@ public class SingleConnectionFactory ...@@ -552,7 +601,7 @@ public class SingleConnectionFactory
mode = (transacted ? Session.SESSION_TRANSACTED : ackMode); mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
} }
} }
Session session = getSession(this.target, mode); Session session = getSession(getConnection(), mode);
if (session != null) { if (session != null) {
if (!method.getReturnType().isInstance(session)) { if (!method.getReturnType().isInstance(session)) {
String msg = "JMS Session does not implement specific domain: " + session; String msg = "JMS Session does not implement specific domain: " + session;
...@@ -568,42 +617,61 @@ public class SingleConnectionFactory ...@@ -568,42 +617,61 @@ public class SingleConnectionFactory
} }
} }
try { try {
Object retVal = method.invoke(this.target, args); return method.invoke(getConnection(), args);
if (method.getName().equals("getExceptionListener") && retVal instanceof InternalChainedExceptionListener) {
// Handle getExceptionListener method: hide internal chain.
InternalChainedExceptionListener listener = (InternalChainedExceptionListener) retVal;
return listener.getUserListener();
}
else {
return retVal;
}
} }
catch (InvocationTargetException ex) { catch (InvocationTargetException ex) {
throw ex.getTargetException(); throw ex.getTargetException();
} }
} }
private void localStart() throws JMSException {
synchronized (connectionMonitor) {
if (!this.locallyStarted) {
this.locallyStarted = true;
if (startedCount == 0 && connection != null) {
connection.start();
}
startedCount++;
}
}
}
private void localStop() throws JMSException {
synchronized (connectionMonitor) {
if (this.locallyStarted) {
this.locallyStarted = false;
if (startedCount == 1 && connection != null) {
connection.stop();
}
if (startedCount > 0) {
startedCount--;
}
}
}
}
private SingleConnectionFactory factory() {
return SingleConnectionFactory.this;
}
} }
/** /**
* Internal chained ExceptionListener for handling the internal recovery listener * Internal aggregated ExceptionListener for handling the internal
* in combination with a user-specified listener. * recovery listener in combination with user-specified listeners.
*/ */
private static class InternalChainedExceptionListener extends ChainedExceptionListener { private class AggregatedExceptionListener implements ExceptionListener {
private ExceptionListener userListener; final Set<ExceptionListener> delegates = new LinkedHashSet<ExceptionListener>(2);
public InternalChainedExceptionListener(ExceptionListener internalListener, ExceptionListener userListener) { @Override
addDelegate(internalListener); public void onException(JMSException ex) {
if (userListener != null) { synchronized (connectionMonitor) {
addDelegate(userListener); for (ExceptionListener listener : this.delegates) {
this.userListener = userListener; listener.onException(ex);
}
} }
} }
public ExceptionListener getUserListener() {
return this.userListener;
}
} }
} }
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -46,16 +46,16 @@ public class SingleConnectionFactoryTests { ...@@ -46,16 +46,16 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(con); SingleConnectionFactory scf = new SingleConnectionFactory(con);
Connection con1 = scf.createConnection(); Connection con1 = scf.createConnection();
con1.start(); con1.start();
con1.stop(); // should be ignored con1.stop();
con1.close(); // should be ignored con1.close();
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
con2.start(); // should be ignored con2.start();
con2.stop(); // should be ignored con2.stop();
con2.close(); // should be ignored con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con, times(2)).start();
verify(con).stop(); verify(con, times(2)).stop();
verify(con).close(); verify(con).close();
verifyNoMoreInteractions(con); verifyNoMoreInteractions(con);
} }
...@@ -67,16 +67,16 @@ public class SingleConnectionFactoryTests { ...@@ -67,16 +67,16 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(con); SingleConnectionFactory scf = new SingleConnectionFactory(con);
QueueConnection con1 = scf.createQueueConnection(); QueueConnection con1 = scf.createQueueConnection();
con1.start(); con1.start();
con1.stop(); // should be ignored con1.stop();
con1.close(); // should be ignored con1.close();
QueueConnection con2 = scf.createQueueConnection(); QueueConnection con2 = scf.createQueueConnection();
con2.start(); con2.start();
con2.stop(); // should be ignored con2.stop();
con2.close(); // should be ignored con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con, times(2)).start();
verify(con).stop(); verify(con, times(2)).stop();
verify(con).close(); verify(con).close();
verifyNoMoreInteractions(con); verifyNoMoreInteractions(con);
} }
...@@ -88,16 +88,16 @@ public class SingleConnectionFactoryTests { ...@@ -88,16 +88,16 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(con); SingleConnectionFactory scf = new SingleConnectionFactory(con);
TopicConnection con1 = scf.createTopicConnection(); TopicConnection con1 = scf.createTopicConnection();
con1.start(); con1.start();
con1.stop(); // should be ignored con1.stop();
con1.close(); // should be ignored con1.close();
TopicConnection con2 = scf.createTopicConnection(); TopicConnection con2 = scf.createTopicConnection();
con2.start(); con2.start();
con2.stop(); // should be ignored con2.stop();
con2.close(); // should be ignored con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con, times(2)).start();
verify(con).stop(); verify(con, times(2)).stop();
verify(con).close(); verify(con).close();
verifyNoMoreInteractions(con); verifyNoMoreInteractions(con);
} }
...@@ -111,11 +111,11 @@ public class SingleConnectionFactoryTests { ...@@ -111,11 +111,11 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createConnection(); Connection con1 = scf.createConnection();
con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
con1.start();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con).start();
...@@ -133,11 +133,11 @@ public class SingleConnectionFactoryTests { ...@@ -133,11 +133,11 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createConnection(); Connection con1 = scf.createConnection();
con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
con1.start();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con).start();
...@@ -155,11 +155,11 @@ public class SingleConnectionFactoryTests { ...@@ -155,11 +155,11 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createQueueConnection(); Connection con1 = scf.createQueueConnection();
con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createQueueConnection(); Connection con2 = scf.createQueueConnection();
con1.start();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con).start();
...@@ -177,11 +177,11 @@ public class SingleConnectionFactoryTests { ...@@ -177,11 +177,11 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createConnection(); Connection con1 = scf.createConnection();
con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
con1.start();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con).start();
...@@ -199,11 +199,11 @@ public class SingleConnectionFactoryTests { ...@@ -199,11 +199,11 @@ public class SingleConnectionFactoryTests {
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createTopicConnection(); Connection con1 = scf.createTopicConnection();
con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createTopicConnection(); Connection con2 = scf.createTopicConnection();
con1.start();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).start(); verify(con).start();
...@@ -212,21 +212,52 @@ public class SingleConnectionFactoryTests { ...@@ -212,21 +212,52 @@ public class SingleConnectionFactoryTests {
verifyNoMoreInteractions(con); verifyNoMoreInteractions(con);
} }
@Test
public void testWithConnectionAggregatedStartStop() throws JMSException {
Connection con = mock(Connection.class);
SingleConnectionFactory scf = new SingleConnectionFactory(con);
Connection con1 = scf.createConnection();
con1.start();
verify(con).start();
con1.stop();
verify(con).stop();
Connection con2 = scf.createConnection();
con2.start();
verify(con, times(2)).start();
con2.stop();
verify(con, times(2)).stop();
con2.start();
verify(con, times(3)).start();
con1.start();
con2.stop();
con1.stop();
verify(con, times(3)).stop();
con1.start();
verify(con, times(4)).start();
con1.close();
verify(con, times(4)).stop();
con2.close();
scf.destroy();
verify(con).close();
verifyNoMoreInteractions(con);
}
@Test @Test
public void testWithConnectionFactoryAndClientId() throws JMSException { public void testWithConnectionFactoryAndClientId() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class); ConnectionFactory cf = mock(ConnectionFactory.class);
Connection con = mock(Connection.class); Connection con = mock(Connection.class);
given(cf.createConnection()).willReturn(con); given(cf.createConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
scf.setClientId("myId"); scf.setClientId("myId");
Connection con1 = scf.createConnection(); Connection con1 = scf.createConnection();
con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
con1.start();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).setClientID("myId"); verify(con).setClientID("myId");
...@@ -250,17 +281,17 @@ public class SingleConnectionFactoryTests { ...@@ -250,17 +281,17 @@ public class SingleConnectionFactoryTests {
Connection con1 = scf.createConnection(); Connection con1 = scf.createConnection();
assertEquals(listener, con1.getExceptionListener()); assertEquals(listener, con1.getExceptionListener());
con1.start(); con1.start();
con1.stop(); // should be ignored con1.stop();
con1.close(); // should be ignored con1.close();
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
con2.start(); con2.start();
con2.stop(); // should be ignored con2.stop();
con2.close(); // should be ignored con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(con).setExceptionListener(listener); verify(con).setExceptionListener(listener);
verify(con).start(); verify(con, times(2)).start();
verify(con).stop(); verify(con, times(2)).stop();
verify(con).close(); verify(con).close();
} }
...@@ -268,7 +299,6 @@ public class SingleConnectionFactoryTests { ...@@ -268,7 +299,6 @@ public class SingleConnectionFactoryTests {
public void testWithConnectionFactoryAndReconnectOnException() throws JMSException { public void testWithConnectionFactoryAndReconnectOnException() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class); ConnectionFactory cf = mock(ConnectionFactory.class);
TestConnection con = new TestConnection(); TestConnection con = new TestConnection();
given(cf.createConnection()).willReturn(con); given(cf.createConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf); SingleConnectionFactory scf = new SingleConnectionFactory(cf);
...@@ -289,7 +319,6 @@ public class SingleConnectionFactoryTests { ...@@ -289,7 +319,6 @@ public class SingleConnectionFactoryTests {
public void testWithConnectionFactoryAndExceptionListenerAndReconnectOnException() throws JMSException { public void testWithConnectionFactoryAndExceptionListenerAndReconnectOnException() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class); ConnectionFactory cf = mock(ConnectionFactory.class);
TestConnection con = new TestConnection(); TestConnection con = new TestConnection();
given(cf.createConnection()).willReturn(con); given(cf.createConnection()).willReturn(con);
TestExceptionListener listener = new TestExceptionListener(); TestExceptionListener listener = new TestExceptionListener();
...@@ -309,6 +338,78 @@ public class SingleConnectionFactoryTests { ...@@ -309,6 +338,78 @@ public class SingleConnectionFactoryTests {
assertEquals(1, listener.getCount()); assertEquals(1, listener.getCount());
} }
@Test
public void testWithConnectionFactoryAndLocalExceptionListenerWithCleanup() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class);
TestConnection con = new TestConnection();
given(cf.createConnection()).willReturn(con);
TestExceptionListener listener0 = new TestExceptionListener();
TestExceptionListener listener1 = new TestExceptionListener();
TestExceptionListener listener2 = new TestExceptionListener();
SingleConnectionFactory scf = new SingleConnectionFactory(cf) {
@Override
public void onException(JMSException ex) {
// no-op
}
};
scf.setReconnectOnException(true);
scf.setExceptionListener(listener0);
Connection con1 = scf.createConnection();
con1.setExceptionListener(listener1);
assertSame(listener1, con1.getExceptionListener());
Connection con2 = scf.createConnection();
con2.setExceptionListener(listener2);
assertSame(listener2, con2.getExceptionListener());
con.getExceptionListener().onException(new JMSException(""));
con2.close();
con.getExceptionListener().onException(new JMSException(""));
con1.close();
con.getExceptionListener().onException(new JMSException(""));
scf.destroy(); // should trigger actual close
assertEquals(0, con.getStartCount());
assertEquals(1, con.getCloseCount());
assertEquals(3, listener0.getCount());
assertEquals(2, listener1.getCount());
assertEquals(1, listener2.getCount());
}
@Test
public void testWithConnectionFactoryAndLocalExceptionListenerWithReconnect() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class);
TestConnection con = new TestConnection();
given(cf.createConnection()).willReturn(con);
TestExceptionListener listener0 = new TestExceptionListener();
TestExceptionListener listener1 = new TestExceptionListener();
TestExceptionListener listener2 = new TestExceptionListener();
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
scf.setReconnectOnException(true);
scf.setExceptionListener(listener0);
Connection con1 = scf.createConnection();
con1.setExceptionListener(listener1);
assertSame(listener1, con1.getExceptionListener());
con1.start();
Connection con2 = scf.createConnection();
con2.setExceptionListener(listener2);
assertSame(listener2, con2.getExceptionListener());
con.getExceptionListener().onException(new JMSException(""));
con2.close();
con1.getMetaData();
con.getExceptionListener().onException(new JMSException(""));
con1.close();
scf.destroy(); // should trigger actual close
assertEquals(2, con.getStartCount());
assertEquals(2, con.getCloseCount());
assertEquals(2, listener0.getCount());
assertEquals(2, listener1.getCount());
assertEquals(1, listener2.getCount());
}
@Test @Test
public void testCachingConnectionFactory() throws JMSException { public void testCachingConnectionFactory() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class); ConnectionFactory cf = mock(ConnectionFactory.class);
...@@ -328,17 +429,17 @@ public class SingleConnectionFactoryTests { ...@@ -328,17 +429,17 @@ public class SingleConnectionFactoryTests {
session1.getTransacted(); session1.getTransacted();
session1.close(); // should lead to rollback session1.close(); // should lead to rollback
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE); session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close(); // should be ignored session1.close();
con1.start(); con1.start();
con1.close(); // should be ignored
Connection con2 = scf.createConnection(); Connection con2 = scf.createConnection();
Session session2 = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session2 = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close(); // should be ignored session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE); session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.commit(); session2.commit();
session2.close(); // should be ignored session2.close();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(txSession).commit(); verify(txSession).commit();
...@@ -366,19 +467,19 @@ public class SingleConnectionFactoryTests { ...@@ -366,19 +467,19 @@ public class SingleConnectionFactoryTests {
Connection con1 = scf.createQueueConnection(); Connection con1 = scf.createQueueConnection();
Session session1 = con1.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session1 = con1.createSession(true, Session.AUTO_ACKNOWLEDGE);
session1.rollback(); session1.rollback();
session1.close(); // should be ignored session1.close();
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE); session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close(); // should be ignored session1.close();
con1.start(); con1.start();
con1.close(); // should be ignored
QueueConnection con2 = scf.createQueueConnection(); QueueConnection con2 = scf.createQueueConnection();
Session session2 = con2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); Session session2 = con2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close(); // should be ignored session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE); session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.getTransacted(); session2.getTransacted();
session2.close(); // should lead to rollback session2.close(); // should lead to rollback
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(txSession).rollback(); verify(txSession).rollback();
...@@ -408,17 +509,17 @@ public class SingleConnectionFactoryTests { ...@@ -408,17 +509,17 @@ public class SingleConnectionFactoryTests {
session1.getTransacted(); session1.getTransacted();
session1.close(); // should lead to rollback session1.close(); // should lead to rollback
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE); session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close(); // should be ignored session1.close();
con1.start(); con1.start();
con1.close(); // should be ignored
TopicConnection con2 = scf.createTopicConnection(); TopicConnection con2 = scf.createTopicConnection();
Session session2 = con2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); Session session2 = con2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close(); // should be ignored session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE); session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.getTransacted(); session2.getTransacted();
session2.close(); // should be ignored session2.close();
con2.start(); con2.start();
con2.close(); // should be ignored con1.close();
con2.close();
scf.destroy(); // should trigger actual close scf.destroy(); // should trigger actual close
verify(txSession).close(); verify(txSession).close();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册