diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index e4a6e3b6806c40c159a16e33725055913ada435f..74fdabc21c9163573406d23f3be8881217873bea 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -51,6 +51,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; +import org.omg.PortableServer.RequestProcessingPolicy; public abstract class NettyRemotingAbstract { @@ -200,7 +201,6 @@ public abstract class NettyRemotingAbstract { public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { @@ -221,7 +221,15 @@ public abstract class NettyRemotingAbstract { } } }; - processor.asyncProcessRequest(ctx, cmd, callback); + if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { + AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); + processor.asyncProcessRequest(ctx, cmd, callback); + } else { + NettyRequestProcessor processor = pair.getObject1(); + RemotingCommand response = processor.processRequest(ctx, cmd); + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); + callback.callback(response); + } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString());