Description
I have a problem with reconnection when SingleConnectionFactory (CachingConnectionFactory) is used.
I think it is similar to (or the same as) the problem reported in issue #23058.
From all the sources I have (logs, heap dump, ...), it seems that the problem is caused by a missing ExceptionListener on the connection (ActiveMQConnection) that is stored in SingleConnectionFactory.
I have the following environment:
- Apache ActiveMQ Broker 5.16.2
- Broker URL: failover:(tcp://localhost:61616)?maxReconnectAttempts=5
- SingleConnectionFactory (CachingConnectionFactory) with setReconnectOnException(true)
Logs
- It is visible that "resetting the underlying JMS Connection" on CachingConnectionFactory is invoked, but suddenly these logs are no longer present and only logs from DefaultMessageListenerContainer are present.
  - I know that it is not recommended to use DMLC with CachingConnectionFactory, but please stay with me for a moment (because I think the problem is in SingleConnectionFactory).
2022-09-06T05:16:06.024+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:18:37.210+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:21:08.254+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:23:39.319+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:26:15.367+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:26:25.498+00:00 {taskExecutor-151} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: The JMS connection has failed: ...cluster.local
2022-09-06T05:26:30.499+00:00 {taskExecutor-151} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: The JMS
...
2022-09-06T05:46:30.559+00:00 {taskExecutor-142} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=242, maxAttempts=unlimited}. Cause: The JMS connection has failed: ...cluster.local
2022-09-06T05:46:35.559+00:00 {taskExecutor-151} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=243, maxAttempts=unlimited}. Cause: The JMS connection has failed: ...cluster.local
...
HeapDump
- I see only one CachingConnectionFactory
- I see SingleConnectionFactory$AggregatedExceptionListener in aggregatedExceptionListener
- I see ActiveMQConnection under the connection field
  - I see that exceptionListener (on ActiveMQConnection) is null (the ExceptionListener is missing, but this listener must be present to trigger a reset of the underlying JMS Connection stored in SingleConnectionFactory)
  - I see that transportFailed (on ActiveMQConnection) is true (this is probably the reason why the ExceptionListener is missing, see below)
  - I see that firstFailureError (on ActiveMQConnection) is java.net.UnknownHostException (but the type of the failure is not too important)
Debugger
- I am able to simulate the problem in a debugger
- It is enough to "delay" the thread that invokes SingleConnectionFactory.initConnection()
  - after the new connection is created
  - but before the connection is prepared, which is where the exception listener is established
Code (SingleConnectionFactory.initConnection())
    public void initConnection() throws JMSException {
        if (getTargetConnectionFactory() == null) {
            throw new IllegalStateException(
                    "'targetConnectionFactory' is required for lazily initializing a Connection");
        }
        synchronized (this.connectionMonitor) {
            if (this.connection != null) {
                closeConnection(this.connection);
            }
            this.connection = doCreateConnection();
            prepareConnection(this.connection);
            if (this.startedCount > 0) {
                this.connection.start();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Established shared JMS Connection: " + this.connection);
            }
        }
    }
Why is the ExceptionListener on the JMS Connection stored in SingleConnectionFactory missing?
- Please suppose the following situation:
  - the new connection is created successfully (this.connection = doCreateConnection();)
  - but one of the subsequent methods (in my case prepareConnection(this.connection)) throws a JMSException
- We get a new connection (stored in SingleConnectionFactory), but without a JMS ExceptionListener
  - as a result, SingleConnectionFactory.resetConnection() is never invoked and the "invalid" connection stays stored in SingleConnectionFactory (until a restart or a "manual" invocation of SingleConnectionFactory.resetConnection())
I know it is (nearly) impossible to get into this situation, because the setup of the exception listener (in prepareConnection(Connection)) should (nearly) always be done before ActiveMQConnection.onException(IOException) is invoked. That method sets transportFailed and firstFailureError, which in turn causes ActiveMQConnection.setExceptionListener(ExceptionListener) (invoked in prepareConnection(this.connection)) to throw ConnectionFailedException. Nevertheless, I see exactly this state in the application heap dump (I am sorry).
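The ordering problem described above can be reproduced without a broker. The following is only a minimal sketch using toy stand-ins: ToyConnection, ToyFactory and the betweenCreateAndPrepare hook are hypothetical names, not Spring or ActiveMQ API, and an IllegalStateException stands in for the ConnectionFailedException. The hook plays the role of the "delayed" thread from the debugger experiment.

```java
import java.util.function.Consumer;

// Toy stand-ins for illustration only; these are NOT the Spring or ActiveMQ classes.
public class StaleConnectionDemo {

    // Hypothetical connection: once its transport has failed, registering a listener throws.
    static class ToyConnection {
        private boolean transportFailed;
        private Consumer<Exception> exceptionListener;

        void onTransportFailure() {
            this.transportFailed = true;
        }

        void setExceptionListener(Consumer<Exception> listener) {
            if (this.transportFailed) {
                // Stands in for the ConnectionFailedException mentioned in the report.
                throw new IllegalStateException("connection has already failed");
            }
            this.exceptionListener = listener;
        }

        Consumer<Exception> getExceptionListener() {
            return this.exceptionListener;
        }
    }

    // Hypothetical factory that assigns the shared field BEFORE preparing the connection.
    static class ToyFactory {
        private final Object connectionMonitor = new Object();
        private ToyConnection connection;

        // The hook runs between creation and preparation, like a delayed thread in a debugger.
        void initConnection(Consumer<ToyConnection> betweenCreateAndPrepare) {
            synchronized (this.connectionMonitor) {
                this.connection = new ToyConnection();
                betweenCreateAndPrepare.accept(this.connection);
                // Throws if the transport failed in the meantime; the field is already set.
                this.connection.setExceptionListener(ex -> resetConnection());
            }
        }

        void resetConnection() {
            synchronized (this.connectionMonitor) {
                this.connection = null;
            }
        }

        ToyConnection getConnection() {
            synchronized (this.connectionMonitor) {
                return this.connection;
            }
        }
    }

    public static void main(String[] args) {
        ToyFactory factory = new ToyFactory();
        try {
            // Simulate a transport failure right after the connection was created.
            factory.initConnection(ToyConnection::onTransportFailure);
        }
        catch (IllegalStateException ex) {
            System.out.println("initConnection failed: " + ex.getMessage());
        }
        ToyConnection stale = factory.getConnection();
        System.out.println("connection stored: " + (stale != null));
        System.out.println("listener present: " + (stale.getExceptionListener() != null));
    }
}
```

In this toy model the factory ends up holding a connection that has no listener, so nothing can ever trigger resetConnection(), which matches the state seen in the heap dump.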
My proposal is to change SingleConnectionFactory.initConnection() so that the new connection is created and configured/prepared in a method-local variable, and is assigned to the instance connection field only when everything succeeded. Something like this:
    public void initConnection() throws JMSException {
        ...
        synchronized (this.connectionMonitor) {
            if (this.connection != null) {
                closeConnection(this.connection);
            }
            Connection con = doCreateConnection();
            try {
                prepareConnection(con);
                this.connection = con;
            }
            catch (JMSException ex) {
                try {
                    con.close();
                }
                catch (Throwable th) {
                    logger.warn("Could not close new (but not used as shared) JMS Connection", th);
                }
                throw ex;
            }
            if (this.startedCount > 0) {
                this.connection.start();
            }
            ...
        }
    }
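To illustrate the effect of the proposed ordering in isolation, here is a minimal, broker-free sketch. All names in it (LocalPrepareDemo, ToyConnection, ToyFactory, the betweenCreateAndPrepare hook) are hypothetical stand-ins, not Spring or ActiveMQ API, and an IllegalStateException stands in for the JMSException. It assumes the shared field starts out empty.

```java
import java.util.function.Consumer;

// Toy stand-ins for illustration only; these are NOT the Spring or ActiveMQ classes.
public class LocalPrepareDemo {

    // Hypothetical connection: once its transport has failed, registering a listener throws.
    static class ToyConnection {
        private boolean transportFailed;
        private boolean closed;
        private Consumer<Exception> exceptionListener;

        void onTransportFailure() {
            this.transportFailed = true;
        }

        void setExceptionListener(Consumer<Exception> listener) {
            if (this.transportFailed) {
                throw new IllegalStateException("connection has already failed");
            }
            this.exceptionListener = listener;
        }

        Consumer<Exception> getExceptionListener() {
            return this.exceptionListener;
        }

        void close() {
            this.closed = true;
        }

        boolean isClosed() {
            return this.closed;
        }
    }

    // Hypothetical factory that publishes the connection only AFTER it was prepared.
    static class ToyFactory {
        private final Object connectionMonitor = new Object();
        private ToyConnection connection;

        // The hook runs between creation and preparation to simulate a badly timed failure.
        void initConnection(Consumer<ToyConnection> betweenCreateAndPrepare) {
            synchronized (this.connectionMonitor) {
                ToyConnection con = new ToyConnection();
                betweenCreateAndPrepare.accept(con);
                try {
                    con.setExceptionListener(ex -> resetConnection());
                    this.connection = con;  // assigned only when preparation succeeded
                }
                catch (IllegalStateException ex) {
                    con.close();            // release the unused connection
                    throw ex;
                }
            }
        }

        void resetConnection() {
            synchronized (this.connectionMonitor) {
                this.connection = null;
            }
        }

        ToyConnection getConnection() {
            synchronized (this.connectionMonitor) {
                return this.connection;
            }
        }
    }

    public static void main(String[] args) {
        ToyFactory factory = new ToyFactory();
        try {
            factory.initConnection(ToyConnection::onTransportFailure);
        }
        catch (IllegalStateException ex) {
            System.out.println("first attempt failed: " + ex.getMessage());
        }
        System.out.println("connection stored after failure: " + (factory.getConnection() != null));

        // A later attempt without a transport failure succeeds and has its listener.
        factory.initConnection(con -> { });
        System.out.println("listener present after retry: "
                + (factory.getConnection().getExceptionListener() != null));
    }
}
```

With this ordering a failed preparation leaves nothing half-initialized in the shared field in the toy model, so a later call to initConnection() can start from a clean state.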