diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 85009d620f573ab97adb149dd417a86237bf3d92..194f2850fa57274d6a620c72c403ba1c5f1868dd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1240,6 +1240,7 @@ public class BrokerController { } } - - + public ExecutorService getSendMessageExecutor() { + return sendMessageExecutor; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index dac4c27856c7064863c20cdaada265ed37012fe0..20b5cfaae829351d6d89496540f9bd5bb71d14f1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -79,7 +79,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement @Override public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { - asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback); + asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor()); } public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx,