From 4c87167c9852fc9213c4d9d59d2d8c5e024dcc95 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 17 Jun 2016 16:54:54 -0400 Subject: [PATCH] Add heartbeat lock to SockJS server sessions Even before this change SockJS sessions always cancelled the heartbeat task first prior to sending messages. However when the heartbeat task is already in progress, cancellation of it is not enough and we must wait until the heartbeat is sent. This commit adds a heartbeat write lock which is obtained and held during the sending of a heartbeat. Now when sessions send a message they still cancel the heartbeat task but if that fails they also wait for the heartbeat write lock. Issue: SPR-14356 --- .../session/AbstractSockJsSession.java | 40 +++++++++++++++---- .../transport/session/SockJsSessionTests.java | 7 ++-- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java index 90351ba51b..f2fbadeef5 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -106,6 +108,8 @@ public abstract class AbstractSockJsSession implements SockJsSession { private volatile ScheduledFuture heartbeatTask; + private final Lock heartbeatLock = new ReentrantLock(); + private volatile boolean heartbeatDisabled; @@ -246,8 +250,15 @@ public abstract class AbstractSockJsSession implements SockJsSession { public void sendHeartbeat() throws SockJsTransportFailureException { if (isActive()) { - writeFrame(SockJsFrame.heartbeatFrame()); - scheduleHeartbeat(); + if (heartbeatLock.tryLock()) { + try { + writeFrame(SockJsFrame.heartbeatFrame()); + scheduleHeartbeat(); + } + finally { + heartbeatLock.unlock(); + } + } } } @@ -282,13 +293,26 @@ public abstract class AbstractSockJsSession implements SockJsSession { try { ScheduledFuture task = this.heartbeatTask; this.heartbeatTask = null; + if (task == null || task.isCancelled()) { + return; + } - if ((task != null) && !task.isDone()) { - if (logger.isTraceEnabled()) { - logger.trace("Cancelling heartbeat in session " + getId()); - } - task.cancel(false); + if (logger.isTraceEnabled()) { + logger.trace("Cancelling heartbeat in session " + getId()); + } + if (task.cancel(false)) { + return; + } + + if (logger.isTraceEnabled()) { + logger.trace("Failed to cancel heartbeat, acquiring heartbeat write lock."); + } + this.heartbeatLock.lock(); + + if (logger.isTraceEnabled()) { + logger.trace("Releasing heartbeat lock."); } + this.heartbeatLock.unlock(); } catch (Throwable ex) { logger.debug("Failure while cancelling heartbeat in session " + getId(), ex); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java index 9b699b4089..8741ef315e 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -287,11 +287,12 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests