提交 7eb6a7a9 编写于 作者: H huzongtang

add http2 unit test case,fix http2 shutdown issue and adjust some codes

上级 e4c0a4c4
......@@ -42,6 +42,7 @@ public class RemotingUtil {
private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;
public static final String DEFAULT_PROTOCOL = "rocketmq";
public static final String HTTP2_PROTOCOL = "http2";
public static final String REMOTING_CHARSET = "UTF-8";
static {
......
......@@ -279,13 +279,10 @@ public abstract class NettyRemotingAbstract {
/**
* Process response from remote peer to the previous issued requests.
*
* @param remotingChannel channel handler context.
* @param remotingChannel remotingChannel.
* @param cmd response command instance.
*/
public void processResponseCommand(final RemotingChannel remotingChannel, RemotingCommand cmd) {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
final ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
......@@ -300,6 +297,8 @@ public abstract class NettyRemotingAbstract {
responseFuture.release();
}
} else {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
final ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
log.warn("receive response, but not matched any request: {}, cmd: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
}
......
......@@ -43,7 +43,6 @@ import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
......@@ -68,7 +67,7 @@ import org.apache.rocketmq.remoting.util.JvmUtils;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class Http2ServerImpl extends NettyRemotingServerAbstract implements RemotingServer {
private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ServerBootstrap serverBootstrap;
private EventLoopGroup bossGroup;
......@@ -131,7 +130,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
.sslProvider(provider)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
} catch (Exception e) {
LOG.error("Can not build SSL context !", e);
log.error("Can not build SSL context !", e);
}
}
......@@ -228,8 +227,28 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
@Override
public void shutdown() {
super.shutdown();
ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
try {
super.shutdown();
if (this.bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (this.ioGroup != null) {
this.ioGroup.shutdownGracefully();
}
if (this.workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("Http2RemotingServer shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
} catch (Exception e) {
log.error("Http2RemotingServer shutdown exception, ", e);
}
}
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
import java.util.concurrent.Executors;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
public class RemotingHttp2InvokingTest {
private RemotingServer remotingHttp2Server;
private RemotingClient remotingHttp2Client;
private int defaultRequestCode = 0;
public RemotingServer createHttp2RemotingServer() throws InterruptedException {
RemotingServer remotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.HTTP2_PROTOCOL).init(new ServerConfig()
,null);
remotingServer.registerProcessor(defaultRequestCode, new RequestProcessor() {
@Override
public RemotingCommand processRequest(RemotingChannel ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newSingleThreadExecutor());
remotingServer.start();
return remotingServer;
}
public RemotingClient createHttp2RemotingClient() {
RemotingClient client = RemotingClientFactory.getInstance().createRemotingClient(RemotingUtil.HTTP2_PROTOCOL).init(new ClientConfig(), null);
client.start();
return client;
}
@Before
public void setup() throws InterruptedException {
remotingHttp2Server = createHttp2RemotingServer();
remotingHttp2Client = createHttp2RemotingClient();
}
@After
public void destroy() {
remotingHttp2Client.shutdown();
remotingHttp2Server.shutdown();
}
@Test
public void testHttp2InvokeSync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
Http2RequestHeader http2RequestHeader = new Http2RequestHeader();
http2RequestHeader.setCount(1);
http2RequestHeader.setMessageTitle("Welcome");
RemotingCommand request = RemotingCommand.createRequestCommand(0, http2RequestHeader);
RemotingCommand response = remotingHttp2Client.invokeSync("localhost:8888", request, 1000 * 5);
assertTrue(response != null);
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(response.getExtFields()).hasSize(2);
}
class Http2RequestHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@CFNullable
private String messageTitle;
@Override
public void checkFields() throws RemotingCommandException {
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public String getMessageTitle() {
return messageTitle;
}
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
}
}
}
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.remoting;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -26,25 +27,24 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
public class RemotingServerTest {
private static RemotingServer remotingServer;
private static RemotingClient remotingClient;
public class RemotingNettyInvokingTest {
private RemotingServer remotingServer;
private RemotingClient remotingClient;
private int defaultRequestCode = 0;
public static RemotingServer createRemotingServer() throws InterruptedException {
ServerConfig config = new ServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new RequestProcessor() {
public RemotingServer createRemotingServer() throws InterruptedException {
RemotingServer remotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(new ServerConfig()
,null);
remotingServer.registerProcessor(defaultRequestCode, new RequestProcessor() {
@Override
public RemotingCommand processRequest(RemotingChannel ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.remoteAddress());
......@@ -55,31 +55,25 @@ public class RemotingServerTest {
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
}, Executors.newSingleThreadExecutor());
remotingServer.start();
return remotingServer;
}
public static RemotingClient createRemotingClient() {
return createRemotingClient(new ClientConfig());
}
public static RemotingClient createRemotingClient(ClientConfig nettyClientConfig) {
RemotingClient client = new NettyRemotingClient(nettyClientConfig);
public RemotingClient createRemotingClient() {
RemotingClient client = RemotingClientFactory.getInstance().createRemotingClient().init(new ClientConfig(), null);
client.start();
return client;
}
@BeforeClass
public static void setup() throws InterruptedException {
@Before
public void setup() throws InterruptedException {
remotingServer = createRemotingServer();
remotingClient = createRemotingClient();
}
@AfterClass
public static void destroy() {
@After
public void destroy() {
remotingClient.shutdown();
remotingServer.shutdown();
}
......@@ -123,35 +117,34 @@ public class RemotingServerTest {
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
}
});
latch.await();
latch.await(3000, TimeUnit.SECONDS);
}
}
class RequestHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
class RequestHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@CFNullable
private String messageTitle;
@CFNullable
private String messageTitle;
@Override
public void checkFields() throws RemotingCommandException {
}
@Override
public void checkFields() throws RemotingCommandException {
}
public Integer getCount() {
return count;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public void setCount(Integer count) {
this.count = count;
}
public String getMessageTitle() {
return messageTitle;
}
public String getMessageTitle() {
return messageTitle;
}
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
}
}
}
......@@ -22,7 +22,10 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.Executors;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
......@@ -34,8 +37,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
......@@ -66,10 +67,35 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
public class TlsTest {
private RemotingServer remotingServer;
private RemotingClient remotingClient;
private int defaultRequestCode = 0;
public RemotingServer createRemotingServer() throws InterruptedException {
RemotingServer remotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(new ServerConfig()
,null);
remotingServer.registerProcessor(defaultRequestCode, new RequestProcessor() {
@Override
public RemotingCommand processRequest(RemotingChannel ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newSingleThreadExecutor());
remotingServer.start();
return remotingServer;
}
public RemotingClient createRemotingClient(ClientConfig clientConfig) {
RemotingClient client = RemotingClientFactory.getInstance().createRemotingClient().init(clientConfig, null);
client.start();
return client;
}
@Rule
public TestName name = new TestName();
......@@ -139,8 +165,8 @@ public class TlsTest {
tlsServerNeedClientAuth = "none";
}
remotingServer = RemotingServerTest.createRemotingServer();
remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
remotingServer = createRemotingServer();
remotingClient = createRemotingClient(clientConfig);
}
@After
......@@ -175,7 +201,7 @@ public class TlsTest {
//Start another client
ClientConfig clientConfig = new ClientConfig();
clientConfig.setUseTLS(true);
RemotingClient remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
RemotingClient remotingClient = createRemotingClient(clientConfig);
requestThenAssertResponse(remotingClient);
}
......@@ -322,4 +348,32 @@ public class TlsTest {
assertThat(response.getExtFields()).hasSize(2);
assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome");
}
static class RequestHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@CFNullable
private String messageTitle;
@Override
public void checkFields() throws RemotingCommandException {
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public String getMessageTitle() {
return messageTitle;
}
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
}
}
}
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient
http2=org.apache.rocketmq.remoting.transport.http2.Http2ClientImpl
\ No newline at end of file
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册