未验证 提交 aea7461a 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge branch 'feature_acl' into develop-acl

...@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public abstract class AbstractPluginMessageStore implements MessageStore { public abstract class AbstractPluginMessageStore implements MessageStore {
protected MessageStore next = null; protected MessageStore next = null;
...@@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore { ...@@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
public ConsumeQueue getConsumeQueue(String topic, int queueId) { public ConsumeQueue getConsumeQueue(String topic, int queueId) {
return next.getConsumeQueue(topic, queueId); return next.getConsumeQueue(topic, queueId);
} }
@Override
public BrokerStatsManager getBrokerStatsManager() {
return next.getBrokerStatsManager();
};
} }
...@@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue; ...@@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor { public class AdminBrokerProcessor implements NettyRequestProcessor {
...@@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Delay offset not supported in this messagetore");
return response;
}
String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) { if (content != null && content.length() > 0) {
try { try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
log.error("get all delay offset from master error.", e); log.error("Get all delay offset from master error.", e);
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e); response.setRemark("UnsupportedEncodingException " + e);
...@@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final ViewBrokerStatsDataRequestHeader requestHeader = final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); MessageStore messageStore = this.brokerController.getMessageStore();
StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) { if (null == statsItem) {
......
...@@ -33,7 +33,7 @@ public class MixAllTest { ...@@ -33,7 +33,7 @@ public class MixAllTest {
List<String> localInetAddress = MixAll.getLocalInetAddress(); List<String> localInetAddress = MixAll.getLocalInetAddress();
String local = InetAddress.getLocalHost().getHostAddress(); String local = InetAddress.getLocalHost().getHostAddress();
assertThat(localInetAddress).contains("127.0.0.1"); assertThat(localInetAddress).contains("127.0.0.1");
assertThat(local).isNotNull();
} }
@Test @Test
......
...@@ -323,6 +323,9 @@ ...@@ -323,6 +323,9 @@
<skipAfterFailureCount>1</skipAfterFailureCount> <skipAfterFailureCount>1</skipAfterFailureCount>
<forkCount>1</forkCount> <forkCount>1</forkCount>
<reuseForks>true</reuseForks> <reuseForks>true</reuseForks>
<excludes>
<exclude>**/IT*.java</exclude>
</excludes>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>
...@@ -335,23 +338,6 @@ ...@@ -335,23 +338,6 @@
<artifactId>sonar-maven-plugin</artifactId> <artifactId>sonar-maven-plugin</artifactId>
<version>3.0.2</version> <version>3.0.2</version>
</plugin> </plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<skipAfterFailureCount>1</skipAfterFailureCount>
<excludes>
<exclude>**/NormalMsgDelayIT.java</exclude>
</excludes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
<pluginManagement> <pluginManagement>
......
...@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; ...@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
...@@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract { ...@@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract {
*/ */
protected volatile SslContext sslContext; protected volatile SslContext sslContext;
/**
* custom rpc hooks
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
static { static {
NettyLogger.initNettyLogger(); NettyLogger.initNettyLogger();
} }
...@@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract { ...@@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract {
} }
} }
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
}
protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doAfterResponse(addr, request, response);
}
}
}
/** /**
* Process incoming request command issued by remote peer. * Process incoming request command issued by remote peer.
* *
...@@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract { ...@@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract {
@Override @Override
public void run() { public void run() {
try { try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
if (!cmd.isOnewayRPC()) { if (!cmd.isOnewayRPC()) {
if (response != null) { if (response != null) {
...@@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract { ...@@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract {
} }
} }
/** /**
* Custom RPC hook. * Custom RPC hook.
* Just be compatible with the previous version, use getRPCHooks instead.
*/
@Deprecated
protected RPCHook getRPCHook() {
if (rpcHooks.size() > 0) {
return rpcHooks.get(0);
}
return null;
}
/**
* Custom RPC hooks.
* *
* @return RPC hook if specified; null otherwise. * @return RPC hooks if specified; null otherwise.
*/ */
public abstract RPCHook getRPCHook(); public List<RPCHook> getRPCHooks() {
return rpcHooks;
}
/** /**
* This method specifies thread pool to use while invoking callback methods. * This method specifies thread pool to use while invoking callback methods.
......
...@@ -34,6 +34,7 @@ import io.netty.handler.timeout.IdleState; ...@@ -34,6 +34,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
...@@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
...@@ -64,8 +67,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; ...@@ -64,8 +67,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
...@@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private ExecutorService callbackExecutor; private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener; private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup; private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) { public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null); this(nettyClientConfig, null);
...@@ -283,7 +283,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -283,7 +283,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override @Override
public void registerRPCHook(RPCHook rpcHook) { public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook; if (!rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
} }
public void closeChannel(final Channel channel) { public void closeChannel(final Channel channel) {
...@@ -357,6 +359,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -357,6 +359,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
} }
} }
@Override @Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
...@@ -364,17 +368,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -364,17 +368,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr); final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) { doBeforeRpcHooks(addr, request);
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) { if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout"); throw new RemotingTimeoutException("invokeSync call timeout");
} }
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response; return response;
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr); log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
...@@ -522,9 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -522,9 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr); final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) { doBeforeRpcHooks(addr, request);
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) { if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout"); throw new RemotingTooMuchRequestException("invokeAsync call timeout");
...@@ -547,9 +545,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -547,9 +545,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr); final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) { doBeforeRpcHooks(addr, request);
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeOnewayImpl(channel, request, timeoutMillis); this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
...@@ -592,10 +588,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -592,10 +588,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelEventListener; return channelEventListener;
} }
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override @Override
public ExecutorService getCallbackExecutor() { public ExecutorService getCallbackExecutor() {
......
...@@ -40,6 +40,8 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; ...@@ -40,6 +40,8 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
...@@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ServerHouseKeepingService", true); private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup; private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
private int port = 0; private int port = 0;
...@@ -266,7 +267,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -266,7 +267,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override @Override
public void registerRPCHook(RPCHook rpcHook) { public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook; if (!rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
} }
@Override @Override
...@@ -318,10 +321,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -318,10 +321,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return channelEventListener; return channelEventListener;
} }
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override @Override
public ExecutorService getCallbackExecutor() { public ExecutorService getCallbackExecutor() {
......
...@@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore {
cq.putMessagePositionInfoWrapper(dispatchRequest); cq.putMessagePositionInfoWrapper(dispatchRequest);
} }
@Override
public BrokerStatsManager getBrokerStatsManager() { public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager; return brokerStatsManager;
} }
......
...@@ -21,6 +21,7 @@ import java.util.LinkedList; ...@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/** /**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store. * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
...@@ -358,4 +359,11 @@ public interface MessageStore { ...@@ -358,4 +359,11 @@ public interface MessageStore {
* @return Consume queue. * @return Consume queue.
*/ */
ConsumeQueue getConsumeQueue(String topic, int queueId); ConsumeQueue getConsumeQueue(String topic, int queueId);
/**
* Get BrokerStatsManager of the messageStore.
*
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册