diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index f6f8a80afb6d48e178f6d27ead85bac5464e820a..e66cead4121c118a92093fe898ed97363b8a49fd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.stats.BrokerStatsManager; public abstract class AbstractPluginMessageStore implements MessageStore { protected MessageStore next = null; @@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore { public ConsumeQueue getConsumeQueue(String topic, int queueId) { return next.getConsumeQueue(topic, queueId); } + + @Override + public BrokerStatsManager getBrokerStatsManager() { + return next.getBrokerStatsManager(); + }; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 356aafc46fb0637a7411c5042fc6170a33d2560c..73fe43942709136ec2d94c879feb66b9a0e23286 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); + if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) { + log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Delay offset not supported in this messagetore"); + return response; + } + String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { - log.error("get all delay offset from master error.", e); + log.error("Get all delay offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); @@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final ViewBrokerStatsDataRequestHeader requestHeader = (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + MessageStore messageStore = this.brokerController.getMessageStore(); StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); if (null == statsItem) { diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java index 9110517fa530a7a2a42c24637be9c9b9826b70ab..8d86544be696c50eb95dad01b4857488196c483f 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java @@ -33,7 +33,7 @@ public class MixAllTest { List localInetAddress = MixAll.getLocalInetAddress(); String local = InetAddress.getLocalHost().getHostAddress(); assertThat(localInetAddress).contains("127.0.0.1"); - + assertThat(local).isNotNull(); } @Test diff --git a/pom.xml b/pom.xml index 1c2c7146530818e4b970cc2c1a153828ebcd7a3d..535893c21002260509dece08a43f157b98907970 100644 --- a/pom.xml +++ b/pom.xml @@ -323,6 +323,9 @@ 1 1 true + + **/IT*.java + @@ -335,23 +338,6 @@ sonar-maven-plugin 3.0.2 - - maven-failsafe-plugin - 2.19.1 - - 1 - - **/NormalMsgDelayIT.java - - - - - - integration-test - - - - diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 8dccebc04575db85ddc70e8c41de1a2cdf8a93c2..206b96ad1d7694ae54a2d6d8868e5eacfb814ae5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract { */ protected volatile SslContext sslContext; + /** + * custom rpc hooks + */ + protected List rpcHooks = new ArrayList(); + + + static { NettyLogger.initNettyLogger(); } @@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract { } } + protected void doBeforeRpcHooks(String addr, RemotingCommand request) { + if (rpcHooks.size() > 0) { + for (RPCHook rpcHook: rpcHooks) { + rpcHook.doBeforeRequest(addr, request); + } + } + } + + protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) { + if (rpcHooks.size() > 0) { + for (RPCHook rpcHook: rpcHooks) { + rpcHook.doAfterResponse(addr, request, response); + } + } + } + + /** * Process incoming request command issued by remote peer. * @@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract { @Override public void run() { try { - RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); - if (rpcHook != null) { - rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - } - + doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); - if (rpcHook != null) { - rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); - } + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { @@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract { } } + + /** * Custom RPC hook. + * Just be compatible with the previous version, use getRPCHooks instead. + */ + @Deprecated + protected RPCHook getRPCHook() { + if (rpcHooks.size() > 0) { + return rpcHooks.get(0); + } + return null; + } + + /** + * Custom RPC hooks. * - * @return RPC hook if specified; null otherwise. + * @return RPC hooks if specified; null otherwise. */ - public abstract RPCHook getRPCHook(); + public List getRPCHooks() { + return rpcHooks; + } + /** * This method specifies thread pool to use while invoking callback methods. diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 33c2eed8de188e0b21a88d21aad68494460d3ccc..e891ad7299a268b4178a540b52c246abfda9ab88 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -34,6 +34,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; import java.net.SocketAddress; import java.security.cert.CertificateException; @@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -64,8 +67,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { @@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; - private RPCHook rpcHook; public NettyRemotingClient(final NettyClientConfig nettyClientConfig) { this(nettyClientConfig, null); @@ -283,7 +283,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + if (!rpcHooks.contains(rpcHook)) { + rpcHooks.add(rpcHook); + } } public void closeChannel(final Channel channel) { @@ -357,6 +359,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } + + @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { @@ -364,17 +368,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } + doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); - if (this.rpcHook != null) { - this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); - } + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); @@ -522,9 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } + doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTooMuchRequestException("invokeAsync call timeout"); @@ -547,9 +545,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } + doBeforeRpcHooks(addr, request); this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); @@ -592,10 +588,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelEventListener; } - @Override - public RPCHook getRPCHook() { - return this.rpcHook; - } @Override public ExecutorService getCallbackExecutor() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 198484251c0365eaf3bd2cebd97b1456e0dc7de2..90386f37e2658c9acf630bb221b39385511e3af9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -40,6 +40,8 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.io.IOException; import java.net.InetSocketAddress; import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.List; import java.util.NoSuchElementException; import java.util.Timer; import java.util.TimerTask; @@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private final Timer timer = new Timer("ServerHouseKeepingService", true); private DefaultEventExecutorGroup defaultEventExecutorGroup; - private RPCHook rpcHook; private int port = 0; @@ -266,7 +267,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + if (!rpcHooks.contains(rpcHook)) { + rpcHooks.add(rpcHook); + } } @Override @@ -318,10 +321,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return channelEventListener; } - @Override - public RPCHook getRPCHook() { - return this.rpcHook; - } @Override public ExecutorService getCallbackExecutor() { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 1ade7c2838e0382eb8223646fd6ad31a74b77def..ff431ed889a8fadabce0c320b1ac1e7e94ea6907 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore { cq.putMessagePositionInfoWrapper(dispatchRequest); } + @Override public BrokerStatsManager getBrokerStatsManager() { return brokerStatsManager; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 907dfe2093b79d86c5b3cca4cc49e02a425a8f46..0f9b4f0ae6e35ef552c521855078b5638b705313 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.stats.BrokerStatsManager; /** * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store. @@ -358,4 +359,11 @@ public interface MessageStore { * @return Consume queue. */ ConsumeQueue getConsumeQueue(String topic, int queueId); + + /** + * Get BrokerStatsManager of the messageStore. + * + * @return BrokerStatsManager. + */ + BrokerStatsManager getBrokerStatsManager(); }