提交 36cec933 编写于 作者: D duhenglucky

Add async response interface and implementation

上级 9f5383e5
...@@ -50,6 +50,7 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -50,6 +50,7 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private boolean unitMode = false; private boolean unitMode = false;
@CFNullable @CFNullable
private boolean batch = false; private boolean batch = false;
private Integer maxReconsumeTimes; private Integer maxReconsumeTimes;
private String enodeAddr; private String enodeAddr;
......
...@@ -100,8 +100,8 @@ ...@@ -100,8 +100,8 @@
<maven.test.skip>false</maven.test.skip> <maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip> <maven.javadoc.skip>true</maven.javadoc.skip>
<!-- Compiler settings properties --> <!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin> <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- Exclude all generated code --> <!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath> <sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting; package org.apache.rocketmq.remoting;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
...@@ -53,4 +54,5 @@ public interface RemotingServer extends RemotingService { ...@@ -53,4 +54,5 @@ public interface RemotingServer extends RemotingService {
RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener); RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
void sendResponse(final ChannelHandlerContext channel, RemotingCommand remotingCommand);
} }
...@@ -4,6 +4,7 @@ import io.netty.bootstrap.ServerBootstrap; ...@@ -4,6 +4,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
...@@ -240,4 +241,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -240,4 +241,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
return this.channelEventListener; return this.channelEventListener;
} }
@Override
public void sendResponse(ChannelHandlerContext channel, RemotingCommand remotingCommand) {
}
} }
...@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; ...@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
...@@ -344,4 +345,25 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -344,4 +345,25 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
public void push(String addr, String sessionId, RemotingCommand remotingCommand) { public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
} }
@Override
public void sendResponse(ChannelHandlerContext channel, RemotingCommand response) {
if (response != null) {
response.markResponseType();
try {
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(response.toString());
}
}
}
} }
...@@ -135,9 +135,9 @@ public class SnodeController { ...@@ -135,9 +135,9 @@ public class SnodeController {
} }
public void registerProcessor() { public void registerProcessor() {
snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this.snodeOuterService), sendMessageExcutor); snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this), sendMessageExcutor);
snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor); snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this.snodeOuterService), pullMessageExcutor); snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this), pullMessageExcutor);
snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor); snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
} }
......
package org.apache.rocketmq.snode.service.impl;/* package org.apache.rocketmq.snode.constant;/*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -15,31 +15,11 @@ package org.apache.rocketmq.snode.service.impl;/* ...@@ -15,31 +15,11 @@ package org.apache.rocketmq.snode.service.impl;/*
* limitations under the License. * limitations under the License.
*/ */
import org.apache.rocketmq.common.ServiceState; public class SnodeConstant {
import org.apache.rocketmq.remoting.protocol.RemotingCommand; public static final long heartbeatTimeout = 3000;
import org.apache.rocketmq.snode.service.SendTransferService;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class SendTransferServiceImpl implements SendTransferService { public static final long oneWaytimeout = 10;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private SnodeOuterService snodeOuterService;
public SendTransferServiceImpl(SnodeOuterService snodeOuterService) { public static final long defaultTimeoutMills = 3000L;
snodeOuterService = snodeOuterService;
}
@Override
public RemotingCommand sendMessage(RemotingCommand request) {
return snodeOuterService.sendMessage(request);
}
@Override
public boolean start() {
return false;
}
@Override
public void shutdown() {
}
} }
...@@ -80,11 +80,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { ...@@ -80,11 +80,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response.setRemark(null); response.setRemark(null);
return response; return response;
} else { } else {
log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(), log.warn("Get all client failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())); RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
} }
} else { } else {
log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(), log.warn("GetConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())); RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
} }
......
package org.apache.rocketmq.snode.processor;/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -14,16 +14,13 @@ package org.apache.rocketmq.snode.processor;/* ...@@ -14,16 +14,13 @@ package org.apache.rocketmq.snode.processor;/*
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
......
...@@ -16,21 +16,34 @@ package org.apache.rocketmq.snode.processor;/* ...@@ -16,21 +16,34 @@ package org.apache.rocketmq.snode.processor;/*
*/ */
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.SnodeOuterService; import org.apache.rocketmq.snode.SnodeController;
public class PullMessageProcessor implements NettyRequestProcessor { public class PullMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeOuterService snodeOuterService; private final SnodeController snodeController;
public PullMessageProcessor(SnodeOuterService snodeOuterService){ public PullMessageProcessor(SnodeController snodeController) {
this.snodeOuterService = snodeOuterService; this.snodeController = snodeController;
} }
@Override @Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
return snodeOuterService.pullMessage(request); CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().pullMessage(ctx, request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
this.snodeController.getSnodeServer().sendResponse(ctx, data);
} else {
log.error("Pull message error: {}", ex);
}
});
return null;
} }
@Override @Override
......
...@@ -16,25 +16,34 @@ package org.apache.rocketmq.snode.processor;/* ...@@ -16,25 +16,34 @@ package org.apache.rocketmq.snode.processor;/*
*/ */
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.SnodeOuterService; import org.apache.rocketmq.snode.SnodeController;
public class SendMessageProcessor implements NettyRequestProcessor { public class SendMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeOuterService snodeOuterService; private final SnodeController snodeController;
public SendMessageProcessor(final SnodeOuterService snodeOuterService) { public SendMessageProcessor(final SnodeController snodeController) {
this.snodeOuterService = snodeOuterService; this.snodeController = snodeController;
} }
@Override @Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
return snodeOuterService.sendMessage(request); CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().sendMessage(request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
snodeController.getSnodeServer().sendResponse(ctx, data);
} else {
log.error("Send Message error: {}", ex);
}
});
return null;
} }
@Override @Override
......
...@@ -16,6 +16,9 @@ package org.apache.rocketmq.snode.service;/* ...@@ -16,6 +16,9 @@ package org.apache.rocketmq.snode.service;/*
*/ */
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.CompleteFuture;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
...@@ -27,9 +30,10 @@ import org.apache.rocketmq.snode.config.SnodeConfig; ...@@ -27,9 +30,10 @@ import org.apache.rocketmq.snode.config.SnodeConfig;
public interface SnodeOuterService { public interface SnodeOuterService {
void sendHearbeat(RemotingCommand remotingCommand); void sendHearbeat(RemotingCommand remotingCommand);
RemotingCommand sendMessage(RemotingCommand remotingCommand); CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
RemotingCommand pullMessage(RemotingCommand remotingCommand); CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
final RemotingCommand remotingCommand);
void saveSubscriptionData(RemotingCommand remotingCommand); void saveSubscriptionData(RemotingCommand remotingCommand);
......
...@@ -16,11 +16,14 @@ package org.apache.rocketmq.snode.service.impl;/* ...@@ -16,11 +16,14 @@ package org.apache.rocketmq.snode.service.impl;/*
*/ */
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.CompleteFuture;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
...@@ -31,22 +34,23 @@ import org.apache.rocketmq.common.namesrv.TopAddressing; ...@@ -31,22 +34,23 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory; import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.SnodeOuterService; import org.apache.rocketmq.snode.service.SnodeOuterService;
public class SnodeOuterServiceImpl implements SnodeOuterService { public class SnodeOuterServiceImpl implements SnodeOuterService {
...@@ -58,7 +62,6 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -58,7 +62,6 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
private static SnodeOuterServiceImpl snodeOuterService; private static SnodeOuterServiceImpl snodeOuterService;
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable = private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final long defaultTimeoutMills = 3000L;
private SnodeOuterServiceImpl() { private SnodeOuterServiceImpl() {
...@@ -97,7 +100,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -97,7 +100,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
String enodeAddr = entry.getValue().get(MixAll.MASTER_ID); String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
if (enodeAddr != null) { if (enodeAddr != null) {
try { try {
RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, defaultTimeoutMills); RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex); log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
} }
...@@ -106,27 +109,19 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -106,27 +109,19 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
} }
@Override @Override
public RemotingCommand sendMessage(RemotingCommand request) { public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
try { RemotingCommand request) {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
RemotingCommand response =
this.client.invokeSync(sendMessageRequestHeaderV2.getN(), request, defaultTimeoutMills);
return response;
} catch (Exception ex) {
log.error("Send message async error:", ex);
}
return null;
}
@Override
public RemotingCommand pullMessage(RemotingCommand request) {
try { try {
final PullMessageRequestHeader requestHeader = final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
RemotingCommand remotingCommand = this.client.invokeSync(requestHeader.getEnodeAddr(), request, 20 * defaultTimeoutMills); this.client.invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.defaultTimeoutMills, new InvokeCallback() {
log.info("Pull message response:{}", remotingCommand); @Override
log.info("Pull message response:{}", remotingCommand.getBody().length); public void operationComplete(ResponseFuture responseFuture) {
return remotingCommand; RemotingCommand response = responseFuture.getResponseCommand();
snodeController.getSnodeServer().sendResponse(context, response);
}
});
return null;
} catch (Exception ex) { } catch (Exception ex) {
log.error("pull message async error:", ex); log.error("pull message async error:", ex);
} }
...@@ -176,7 +171,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -176,7 +171,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException, public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingSendRequestException, RemotingConnectException, MQBrokerException {
synchronized (this) { synchronized (this) {
ClusterInfo clusterInfo = getBrokerClusterInfo(defaultTimeoutMills); ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
if (clusterInfo != null) { if (clusterInfo != null) {
HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable(); HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) { for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
...@@ -212,7 +207,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -212,7 +207,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
if (nameServerAddressList != null && nameServerAddressList.size() > 0) { if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
for (String nameServer : nameServerAddressList) { for (String nameServer : nameServerAddressList) {
try { try {
this.client.invokeSync(nameSrvAddr, remotingCommand, 3000L); this.client.invokeSync(nameSrvAddr, remotingCommand, SnodeConstant.heartbeatTimeout);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex); log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
} }
...@@ -220,6 +215,21 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -220,6 +215,21 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
} }
} }
@Override
public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
this.client.invokeAsync(sendMessageRequestHeaderV2.getN(), request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
future.complete(responseFuture.getResponseCommand());
});
} catch (Exception ex) {
log.error("Send message async error:{}", ex);
future.completeExceptionally(ex);
}
return future;
}
@Override @Override
public void notifyConsumerIdsChanged( public void notifyConsumerIdsChanged(
final Channel channel, final Channel channel,
...@@ -235,7 +245,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService { ...@@ -235,7 +245,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try { try {
this.snodeController.getSnodeServer().invokeOneway(channel, request, 10); this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
} catch (Exception e) { } catch (Exception e) {
log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册