diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 06918086ff1a8cc06240569b8d9c33176ed98660..76752529af24f021c6860fb90866f629d4a8a75b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -257,14 +257,13 @@ public abstract class NettyRemotingAbstract { if (responseFuture != null) { responseFuture.setResponseCommand(cmd); - responseFuture.release(); - responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); + responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); @@ -287,6 +286,8 @@ public abstract class NettyRemotingAbstract { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); + } finally { + responseFuture.release(); } } }); @@ -303,6 +304,8 @@ public abstract class NettyRemotingAbstract { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); + } finally { + responseFuture.release(); } } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java new file mode 100644 index 0000000000000000000000000000000000000000..99aab9e075728639f496747d6d2c55c81a7dcd3f --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.netty; + +import java.util.concurrent.Semaphore; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class NettyRemotingAbstractTest { + @Spy + private NettyRemotingAbstract remotingAbstract = new NettyRemotingClient(new NettyClientConfig()); + + @Test + public void testProcessResponseCommand() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + ResponseFuture responseFuture = new ResponseFuture(1, 3000, new InvokeCallback() { + @Override + public void operationComplete(final ResponseFuture responseFuture) { + assertThat(semaphore.availablePermits()).isEqualTo(0); + } + }, new SemaphoreReleaseOnlyOnce(semaphore)); + + remotingAbstract.responseTable.putIfAbsent(1, responseFuture); + + RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo"); + response.setOpaque(1); + remotingAbstract.processResponseCommand(null, response); + + // Acquire the release permit after call back + semaphore.acquire(1); + assertThat(semaphore.availablePermits()).isEqualTo(0); + } + + @Test + public void testProcessResponseCommand_NullCallBack() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + ResponseFuture responseFuture = new ResponseFuture(1, 3000, null, + new SemaphoreReleaseOnlyOnce(semaphore)); + + remotingAbstract.responseTable.putIfAbsent(1, responseFuture); + + RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo"); + response.setOpaque(1); + remotingAbstract.processResponseCommand(null, response); + + assertThat(semaphore.availablePermits()).isEqualTo(1); + } + + @Test + public void testProcessResponseCommand_RunCallBackInCurrentThread() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + ResponseFuture responseFuture = new ResponseFuture(1, 3000, new InvokeCallback() { + @Override + public void operationComplete(final ResponseFuture responseFuture) { + assertThat(semaphore.availablePermits()).isEqualTo(0); + } + }, new SemaphoreReleaseOnlyOnce(semaphore)); + + remotingAbstract.responseTable.putIfAbsent(1, responseFuture); + when(remotingAbstract.getCallbackExecutor()).thenReturn(null); + + RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo"); + response.setOpaque(1); + remotingAbstract.processResponseCommand(null, response); + + // Acquire the release permit after call back finished in current thread + semaphore.acquire(1); + assertThat(semaphore.availablePermits()).isEqualTo(0); + } +} \ No newline at end of file