提交 41366dda 编写于 作者: D duhenglucky

Add async push logic after send message succeed

上级 6136e2d6
......@@ -248,14 +248,12 @@ public class BrokerController {
result = result && this.messageStore.load();
if (result) {
this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
ServerConfig fastConfig = (ServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.fastRemotingServer = RemotingServerFactory.createInstance();
this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService);
this.fastRemotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
......
......@@ -71,7 +71,7 @@ public class BrokerOuterAPI {
}
public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(nettyClientConfig, null);
this.remotingClient.registerRPCHook(rpcHook);
}
......
......@@ -100,7 +100,7 @@ public class PullMessageProcessor implements RequestProcessor {
response.setOpaque(request.getOpaque());
log.debug("receive PullMessage request command, {}", request);
log.info("receive PullMessage request command, {}", request);
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
......
......@@ -562,7 +562,6 @@ public class MQClientAPIImpl {
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
requestHeader.setEnodeAddr(addr);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
......@@ -58,6 +59,11 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
Set<Integer> queueIdSet = new HashSet<Integer>();
for (MessageQueue messageQueue : mqAll) {
queueIdSet.add(messageQueue.getQueueId());
}
subscriptionData.setQueueIdSet(queueIdSet);
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
......
......@@ -172,4 +172,6 @@ public class RequestCode {
public static final int SNODE_PULL_MESSAGE = 351;
public static final int SNODE_PUSH_MESSAGE = 352;
}
......@@ -48,14 +48,14 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
private Long subVersion;
private String expressionType;
private String enodeAddr;
private String enodeName;
public String getEnodeAddr() {
return enodeAddr;
public String getEnodeName() {
return enodeName;
}
public void setEnodeAddr(String enodeAddr) {
this.enodeAddr = enodeAddr;
public void setEnodeName(String enodeName) {
this.enodeName = enodeName;
}
@Override
......
......@@ -31,6 +31,7 @@ public class ConsumerData {
private ConsumeFromWhere consumeFromWhere;
private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
private boolean unitMode;
private boolean realPushEnable = true;
public String getGroupName() {
return groupName;
......@@ -80,10 +81,24 @@ public class ConsumerData {
this.unitMode = isUnitMode;
}
public boolean isRealPushEnable() {
return realPushEnable;
}
public void setRealPushEnable(boolean realPushEnable) {
this.realPushEnable = realPushEnable;
}
@Override
public String toString() {
return "ConsumerData [groupName=" + groupName + ", consumeType=" + consumeType + ", messageModel="
+ messageModel + ", consumeFromWhere=" + consumeFromWhere + ", unitMode=" + unitMode
+ ", subscriptionDataSet=" + subscriptionDataSet + "]";
return "ConsumerData{" +
"groupName='" + groupName + '\'' +
", consumeType=" + consumeType +
", messageModel=" + messageModel +
", consumeFromWhere=" + consumeFromWhere +
", subscriptionDataSet=" + subscriptionDataSet +
", unitMode=" + unitMode +
", realPushEnable=" + realPushEnable +
'}';
}
}
......@@ -38,8 +38,6 @@ public class SubscriptionGroupConfig {
private boolean notifyConsumerIdsChangedEnable = true;
private boolean realPushEnable = false;
public String getGroupName() {
return groupName;
}
......@@ -112,14 +110,6 @@ public class SubscriptionGroupConfig {
this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
}
public boolean isRealPushEnable() {
return realPushEnable;
}
public void setRealPushEnable(boolean realPushEnable) {
this.realPushEnable = realPushEnable;
}
@Override
public int hashCode() {
final int prime = 31;
......@@ -128,7 +118,6 @@ public class SubscriptionGroupConfig {
result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
result = prime * result + (consumeEnable ? 1231 : 1237);
result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
result = prime * result + (realPushEnable ? 1231 : 1237);
result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
result = prime * result + retryMaxTimes;
......@@ -155,8 +144,6 @@ public class SubscriptionGroupConfig {
return false;
if (consumeFromMinEnable != other.consumeFromMinEnable)
return false;
if (realPushEnable != other.realPushEnable)
return false;
if (groupName == null) {
if (other.groupName != null)
return false;
......@@ -173,18 +160,13 @@ public class SubscriptionGroupConfig {
return true;
}
@Override public String toString() {
return "SubscriptionGroupConfig{" +
"groupName='" + groupName + '\'' +
", consumeEnable=" + consumeEnable +
", consumeFromMinEnable=" + consumeFromMinEnable +
", consumeBroadcastEnable=" + consumeBroadcastEnable +
", retryQueueNums=" + retryQueueNums +
", retryMaxTimes=" + retryMaxTimes +
", brokerId=" + brokerId +
", whichBrokerWhenConsumeSlowly=" + whichBrokerWhenConsumeSlowly +
", notifyConsumerIdsChangedEnable=" + notifyConsumerIdsChangedEnable +
", realPushEnable=" + realPushEnable +
'}';
@Override
public String toString() {
return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+ ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+ consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+ retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+ whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+ notifyConsumerIdsChangedEnable + "]";
}
}
......@@ -77,9 +77,8 @@ public class NamesrvController {
this.kvConfigManager.load();
this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer();
this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
......
......@@ -26,19 +26,19 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
void updateNameServerAddressList(final List<String> addrs);
void updateNameServerAddressList(final List<String> addresses);
List<String> getNameServerAddressList();
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
RemotingCommand invokeSync(final String address, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
void invokeAsync(final String address, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
void invokeOneway(final String address, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
......
......@@ -24,7 +24,11 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingClientFactory {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static RemotingClientFactory instance = new RemotingClientFactory();
public static RemotingClientFactory getInstance(){
return instance;
}
private RemotingClientFactory() {
}
......@@ -37,11 +41,11 @@ public class RemotingClientFactory {
paths = ServiceProvider.loadPath(CLIENT_LOCATION);
}
public static RemotingClient createInstance(String protocol) {
public RemotingClient createRemotingClient(String protocol) {
return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class);
}
public static RemotingClient createInstance() {
public RemotingClient createRemotingClient() {
return ServiceProvider.createInstance(paths.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingClient.class);
}
}
......@@ -24,8 +24,11 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingServerFactory {
private static RemotingServerFactory instance = new RemotingServerFactory();
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
public static RemotingServerFactory getInstance() {
return instance;
}
private RemotingServerFactory() {
}
......@@ -38,11 +41,11 @@ public class RemotingServerFactory {
protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
}
public static RemotingServer createInstance(String protocol) {
public RemotingServer createRemotingServer(String protocol) {
return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingClient.class);
}
public static RemotingServer createInstance() {
public RemotingServer createRemotingServer() {
return ServiceProvider.createInstance(protocolPathMap.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingServer.class);
}
}
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.interceptor;
package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
......@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.interceptor;
package org.apache.rocketmq.remoting.interceptor;
public interface Interceptor {
void beforeSendMessage(RequestContext requestContext);
void beforeRequest(RequestContext requestContext);
void afterSendMessage(ResponseContext responseContext);
void afterRequest(ResponseContext responseContext);
void onException(ExceptionContext exceptionContext);
}
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.interceptor;
package org.apache.rocketmq.remoting.interceptor;
import java.util.List;
import org.apache.rocketmq.remoting.util.ServiceProvider;
......@@ -26,21 +26,12 @@ public class InterceptorFactory {
return ourInstance;
}
private final String SEND_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
private final String CONSUME_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
private InterceptorFactory() {
}
public List loadConsumeMessageInterceptors() {
List<Interceptor> consumeMessageInterceptors = ServiceProvider.loadServiceList(CONSUME_MESSAGE_INTERCEPTOR, Interceptor.class);
return consumeMessageInterceptors;
}
public List loadSendMessageInterceptors() {
List<Interceptor> sendMessageInterceptors = ServiceProvider.loadServiceList(SEND_MESSAGE_INTERCEPTOR, Interceptor.class);
return sendMessageInterceptors;
public List loadInterceptors(String servicePath) {
List<Interceptor> interceptors = ServiceProvider.loadServiceList(servicePath, Interceptor.class);
return interceptors;
}
}
......@@ -14,28 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.interceptor;
package org.apache.rocketmq.remoting.interceptor;
import java.util.ArrayList;
import java.util.List;
public class InterceptorGroup {
private List<Interceptor> interceptors = new ArrayList<>();
private List<Interceptor> interceptors = new ArrayList<Interceptor>();
public void registerInterceptor(Interceptor sendMessageInterceptor) {
if (sendMessageInterceptor != null) {
interceptors.add(sendMessageInterceptor);
public void registerInterceptor(Interceptor interceptor) {
if (interceptor != null) {
interceptors.add(interceptor);
}
}
public void beforeRequest(RequestContext requestContext) {
for (Interceptor interceptor : interceptors) {
interceptor.beforeSendMessage(requestContext);
interceptor.beforeRequest(requestContext);
}
}
public void afterRequest(ResponseContext responseContext) {
for (Interceptor interceptor : interceptors) {
interceptor.afterSendMessage(responseContext);
interceptor.afterRequest(responseContext);
}
}
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.interceptor;
package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.interceptor;
package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
......@@ -468,7 +468,7 @@ public abstract class NettyRemotingAbstract {
public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException {
final long beginStartTime = System.currentTimeMillis();
boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
......
......@@ -29,10 +29,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
......@@ -40,17 +38,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyEvent;
import org.apache.rocketmq.remoting.netty.NettyEventType;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
......@@ -179,7 +171,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
@Override
protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel(timeout);
return getAndCreateNameServerChannel(timeout);
}
ChannelWrapper cw = this.channelTables.get(addr);
......@@ -190,7 +182,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
return this.createChannel(addr, timeout);
}
private Channel getAndCreateNameserverChannel(long timeout) throws InterruptedException {
private Channel getAndCreateNameServerChannel(long timeout) throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
......
......@@ -20,12 +20,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.nio.ByteBuffer;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
......
......@@ -61,7 +61,6 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
private ExecutorService publicExecutor;
private ExecutorService asyncExecutor;
/**
* Invoke the callback methods in this executor when process response.
*/
......
......@@ -20,7 +20,6 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
......@@ -46,6 +45,8 @@ import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.TlsMode;
......@@ -53,10 +54,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
......@@ -28,23 +28,22 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ConsumerManager;
import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager;
import org.apache.rocketmq.snode.client.PushSessionManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.interceptor.InterceptorFactory;
import org.apache.rocketmq.snode.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
......@@ -86,7 +85,6 @@ public class SnodeController {
private HeartbeatProcessor hearbeatProcessor;
private InterceptorGroup consumeMessageInterceptorGroup;
private InterceptorGroup sendMessageInterceptorGroup;
private PushSessionManager pushSessionManager;
private PushService pushService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
......@@ -101,14 +99,14 @@ public class SnodeController {
this.enodeService = new EnodeServiceImpl(this);
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.createInstance().init(this.getNettyClientConfig(), null);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(this.getNettyClientConfig(), null);
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
......@@ -117,7 +115,7 @@ public class SnodeController {
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
......@@ -126,7 +124,7 @@ public class SnodeController {
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
......@@ -135,7 +133,7 @@ public class SnodeController {
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
......@@ -144,7 +142,7 @@ public class SnodeController {
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
......@@ -164,7 +162,6 @@ public class SnodeController {
this.sendMessageProcessor = new SendMessageProcessor(this);
this.hearbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pushSessionManager = new PushSessionManager();
this.pushService = new PushServiceImpl(this);
}
......@@ -174,22 +171,22 @@ public class SnodeController {
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, this.clientHousekeepingService);
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initInterceptorGroup();
return true;
}
private void initInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadConsumeMessageInterceptors();
if (consumeMessageInterceptors != null) {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
}
}
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadSendMessageInterceptors();
if (sendMessageInterceptors != null) {
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) {
this.sendMessageInterceptorGroup.registerInterceptor(interceptor);
......@@ -275,10 +272,6 @@ public class SnodeController {
return sendMessageInterceptorGroup;
}
public PushSessionManager getPushSessionManager() {
return pushSessionManager;
}
public PushService getPushService() {
return pushService;
}
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.client;
import java.nio.channels.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
......
......@@ -85,8 +85,7 @@ public class ConsumerManager {
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
log.info("Unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
log.info("Unregister consumer ok, no any connection, and remove consumer group, {}", next.getKey());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
......@@ -208,7 +207,7 @@ public class ConsumerManager {
}
}
public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
public ClientChannelInfo getClientInfoTable(String topic, Integer queueId) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
return clientChannelInfoMap.get(queueId);
......
......@@ -16,11 +16,8 @@
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
......@@ -31,7 +28,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import java.util.concurrent.ConcurrentHashMap;
public class PushSessionManager {
private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ClientChannelInfo>> topicConsumerTable = new ConcurrentHashMap<>(2048);
public void updateTopicConsumerTable(String topic, int queueId, ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap == null) {
clientChannelInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
if (prev != null) {
clientChannelInfoMap = prev;
}
}
clientChannelInfoMap.put(queueId, clientChannelInfo);
}
public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
return clientChannelInfoMap.get(queueId);
}
return null;
}
public void removeConsumerTopicTable(String topic, Integer queueId, ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
ClientChannelInfo old = clientChannelInfoMap.get(queueId);
//TODO Thread safe issue: wait for the next heartbeat
if (old == clientChannelInfo) {
clientChannelInfoMap.remove(queueId, clientChannelInfo);
}
}
}
}
......@@ -20,7 +20,6 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
......
......@@ -67,6 +67,10 @@ public class SnodeConfig {
private int snodePushMessageThreadPoolQueueCapacity = 10000;
private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
private final String consumeMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
private int listenPort = 11911;
......@@ -264,4 +268,12 @@ public class SnodeConfig {
public void setSnodePushMessageThreadPoolQueueCapacity(int snodePushMessageThreadPoolQueueCapacity) {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
}
public String getSendMessageInterceptorPath() {
return sendMessageInterceptorPath;
}
public String getConsumeMessageInterceptorPath() {
return consumeMessageInterceptorPath;
}
}
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.processor;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -31,6 +30,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ClientChannelInfo;
......@@ -60,7 +61,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
remotingChannel,
new NettyChannelImpl((((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel())),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
......@@ -99,7 +100,7 @@ public class HeartbeatProcessor implements RequestProcessor {
);
}
if (subscriptionGroupConfig.isRealPushEnable()) {
if (data.isRealPushEnable()) {
this.snodeController.getConsumerManager().updateTopicConsumerTable(data.getSubscriptionDataSet(), clientChannelInfo);
}
}
......
......@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
......@@ -24,6 +25,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -32,9 +34,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
import org.apache.rocketmq.snode.interceptor.ExceptionContext;
import org.apache.rocketmq.snode.interceptor.RequestContext;
import org.apache.rocketmq.snode.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class PullMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
......@@ -63,11 +65,9 @@ public class PullMessageProcessor implements RequestProcessor {
private RemotingCommand pullMessage(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo = snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
SubscriptionGroupConfig subscriptionGroupConfig =
this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
......@@ -82,29 +82,53 @@ public class PullMessageProcessor implements RequestProcessor {
return response;
}
if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
}
if ((consumerGroupInfo == null) || (consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()) == null)) {
log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
if (requestHeader.getQueueId() < 0) {
String errorInfo = String.format("QueueId[%d] is illegal, topic:[%s] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), remotingChannel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("The consumer's subscription not latest");
return response;
SubscriptionData subscriptionData;
if (!hasSubscriptionFlag) {
ConsumerGroupInfo consumerGroupInfo =
this.snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
log.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("The consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
}
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("The consumer's subscription not latest");
return response;
}
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(), request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
......
......@@ -15,17 +15,22 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.interceptor.ExceptionContext;
import org.apache.rocketmq.snode.interceptor.RequestContext;
import org.apache.rocketmq.snode.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class SendMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
......@@ -37,12 +42,30 @@ public class SendMessageProcessor implements RequestProcessor {
}
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) {
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
RequestContext requestContext = new RequestContext(request, remotingChannel);
this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext);
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
String enodeName;
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null;
boolean isSendBack = false;
if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN();
} else {
isSendBack = true;
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
final String topic = sendMessageRequestHeaderV2.getB();
final Integer queueId = sendMessageRequestHeaderV2.getE();
final byte[] message = request.getBody();
final boolean isNeedPush = !isSendBack;
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
......@@ -50,6 +73,9 @@ public class SendMessageProcessor implements RequestProcessor {
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
}
remotingChannel.reply(data);
if (isNeedPush) {
this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
}
} else {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
......
......@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
......@@ -29,13 +30,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface EnodeService {
void sendHearbeat(RemotingCommand remotingCommand);
CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
CompletableFuture<RemotingCommand> sendMessage(final String enodeName, final RemotingCommand request);
CompletableFuture<RemotingCommand> pullMessage(final RemotingCommand request);
CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
void notifyConsumerIdsChanged(final RemotingChannel channel, final String consumerGroup);
RemotingCommand creatTopic(String enodeName, TopicConfig topicConfig)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException ;
RemotingCommand creatTopic(String enodeName,
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException;
......
......@@ -16,13 +16,15 @@
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface PushService {
boolean registerPushSession(String consumerGroup);
void unregisterPushSession(String consumerGroup);
void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
final long queueOffset);
void pushMessage(final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response);
void start();
......
......@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
......@@ -79,13 +80,12 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public CompletableFuture<RemotingCommand> pullMessage(RemotingCommand request) {
public CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
this.snodeController.getRemotingClient().invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
......@@ -110,17 +110,9 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
public CompletableFuture<RemotingCommand> sendMessage(String enodeName, RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
String enodeName;
if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN();
} else {
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
}
String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
future.complete(responseFuture.getResponseCommand());
......@@ -149,7 +141,7 @@ public class EnodeServiceImpl implements EnodeService {
try {
this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
} catch (Exception e) {
log.error("NotifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e);
}
}
......
......@@ -21,7 +21,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -45,57 +47,55 @@ public class PushServiceImpl implements PushService {
this.snodeController.getSnodeConfig().getSnodePushMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
"SnodePushMessageThread",
false);
}
public class PushTask implements Runnable {
private AtomicBoolean canceled;
private final String messageId;
private AtomicBoolean canceled = new AtomicBoolean(false);
private final byte[] message;
private final Integer queueId;
private final String topic;
private final long queueOffset;
private final RemotingCommand response;
public PushTask(final String messageId, final byte[] message, final Integer queueId, final String topic,
final long queueOffset) {
this.messageId = messageId;
public PushTask(final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) {
this.message = message;
this.queueId = queueId;
this.topic = topic;
this.queueOffset = queueOffset;
this.response = response;
}
@Override
public void run() {
if (!canceled.get()) {
PushMessageHeader pushMessageHeader = new PushMessageHeader();
pushMessageHeader.setMessageId(this.messageId);
pushMessageHeader.setQueueOffset(queueOffset);
pushMessageHeader.setTopic(topic);
pushMessageHeader.setQueueId(queueId);
RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
pushMessage.setBody(message);
pushMessage.setCustomHeader(pushMessageHeader);
try {
ClientChannelInfo clientChannelInfo = snodeController.getPushSessionManager().getClientInfoTable(topic, queueId);
SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
PushMessageHeader pushMessageHeader = new PushMessageHeader();
pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
pushMessageHeader.setTopic(topic);
pushMessageHeader.setQueueId(queueId);
RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
pushMessage.setBody(message);
pushMessage.setCustomHeader(pushMessageHeader);
pushMessage.setCode(RequestCode.SNODE_PUSH_MESSAGE);
ClientChannelInfo clientChannelInfo = snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (clientChannelInfo != null) {
log.warn("Push message to topic: {} queueId: {}, message:{}", topic, queueId, pushMessage);
RemotingChannel remotingChannel = clientChannelInfo.getChannel();
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills);
} else {
log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId);
}
} catch (Exception ex) {
log.warn("Push message to topic: {} queueId: {} ex:{}", topic, queueId, ex);
log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex);
}
} else {
log.info("Push message to topic: {} queueId: {} canceled!", topic, queueId);
}
}
public AtomicBoolean getCanceled() {
return canceled;
}
public void setCanceled(AtomicBoolean canceled) {
this.canceled = canceled;
}
......@@ -113,10 +113,15 @@ public class PushServiceImpl implements PushService {
}
@Override
public void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
final long queueOffset) {
PushTask pushTask = new PushTask(messageId, message, queueId, topic, queueOffset);
pushMessageExecutorService.submit(pushTask);
public void pushMessage(final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) {
ClientChannelInfo clientChannelInfo = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (clientChannelInfo != null) {
PushTask pushTask = new PushTask(topic, queueId, message, response);
pushMessageExecutorService.submit(pushTask);
} else {
log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
}
}
@Override
......
......@@ -112,7 +112,7 @@ public class ScheduledServiceImpl implements ScheduledService {
log.warn("Update broker addr error:{}", ex);
}
}
}, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册