diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 85009d620f573ab97adb149dd417a86237bf3d92..194f2850fa57274d6a620c72c403ba1c5f1868dd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1240,6 +1240,7 @@ public class BrokerController { } } - - + public ExecutorService getSendMessageExecutor() { + return sendMessageExecutor; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 801d886c43be168a4bb37dabbf32d0900f97fe12..4dc311db913b0189c8702d2b2bd6f7bed01821d2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -79,7 +79,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement @Override public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { - asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback); + asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor()); } public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx,