未验证 提交 81b4293a 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #745 from zongtanghu/snode

[ISSUE #743]adjust some codes of supporting Http2 for rocketmq.
...@@ -89,11 +89,11 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -89,11 +89,11 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
10000, "Remoting-PublicExecutor", true); 10000, "Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
buildSslContext(); buildHttp2SslClientContext();
return this; return this;
} }
private void buildSslContext() { private void buildHttp2SslClientContext() {
SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
try { try {
sslContext = SslContextBuilder.forClient() sslContext = SslContextBuilder.forClient()
...@@ -104,7 +104,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -104,7 +104,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
.trustManager(InsecureTrustManagerFactory.INSTANCE) .trustManager(InsecureTrustManagerFactory.INSTANCE)
.build(); .build();
} catch (SSLException e) { } catch (SSLException e) {
e.printStackTrace(); log.error("Can not build Http2 SSL Client context !", e);
} }
} }
...@@ -147,16 +147,12 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -147,16 +147,12 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
if (this.defaultEventExecutorGroup != null) { if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully(); this.defaultEventExecutorGroup.shutdownGracefully();
} }
} catch (Exception e) {
log.error("Http2ClientImpl shutdown exception, ", e);
}
if (this.publicExecutor != null) { if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown(); this.publicExecutor.shutdown();
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
} }
} catch (Exception e) {
log.error("Http2ClientImpl shutdown exception, ", e);
} }
} }
......
...@@ -78,12 +78,11 @@ public class Http2Handler extends Http2ConnectionHandler { ...@@ -78,12 +78,11 @@ public class Http2Handler extends Http2ConnectionHandler {
frameReader); frameReader);
Http2Settings settings = new Http2Settings(); Http2Settings settings = new Http2Settings();
if (!isServer) { if (!isServer) {
settings.pushEnabled(true); settings.pushEnabled(true);
} }
//10MiB
settings.initialWindowSize(1048576 * 10); //10MiB settings.initialWindowSize(10485760);
settings.maxConcurrentStreams(Integer.MAX_VALUE); settings.maxConcurrentStreams(Integer.MAX_VALUE);
return newHandler(decoder, encoder, settings, isServer); return newHandler(decoder, encoder, settings, isServer);
...@@ -98,8 +97,10 @@ public class Http2Handler extends Http2ConnectionHandler { ...@@ -98,8 +97,10 @@ public class Http2Handler extends Http2ConnectionHandler {
public void write(final ChannelHandlerContext ctx, final Object msg, public void write(final ChannelHandlerContext ctx, final Object msg,
final ChannelPromise promise) throws Exception { final ChannelPromise promise) throws Exception {
if (isServer) { if (isServer) {
assert msg instanceof ByteBuf; if (!(msg instanceof ByteBuf)) {
sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg); return;
}
sendAndPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg);
} else { } else {
final Http2Headers headers = new DefaultHttp2Headers(); final Http2Headers headers = new DefaultHttp2Headers();
...@@ -111,13 +112,13 @@ public class Http2Handler extends Http2ConnectionHandler { ...@@ -111,13 +112,13 @@ public class Http2Handler extends Http2ConnectionHandler {
encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise()); encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
ctx.flush(); ctx.flush();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("Http2Handler write data exception,", e);
} }
} }
} }
private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId, private void sendAndPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
ByteBuf payload) throws Http2Exception { ByteBuf payload) throws Http2Exception {
encoder().writePushPromise(ctx, streamId, pushPromiseStreamId, encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
......
...@@ -115,11 +115,11 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -115,11 +115,11 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(), this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
this.port = nettyServerConfig.getListenPort(); this.port = nettyServerConfig.getListenPort();
buildHttp2SslContext(); buildHttp2SslServerContext();
return this; return this;
} }
private void buildHttp2SslContext() { private void buildHttp2SslServerContext() {
try { try {
SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
SelfSignedCertificate ssc; SelfSignedCertificate ssc;
......
...@@ -17,13 +17,17 @@ ...@@ -17,13 +17,17 @@
package org.apache.rocketmq.remoting; package org.apache.rocketmq.remoting;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.junit.After; import org.junit.After;
...@@ -90,6 +94,34 @@ public class RemotingHttp2InvokingTest { ...@@ -90,6 +94,34 @@ public class RemotingHttp2InvokingTest {
} }
@Test
public void testInvokeOneway() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark("messi");
remotingHttp2Client.invokeOneway("localhost:8888", request, 1000 * 3);
}
@Test
public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
final CountDownLatch latch = new CountDownLatch(1);
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark("messi");
remotingHttp2Client.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
latch.countDown();
assertTrue(responseFuture != null);
assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
}
});
latch.await(3000, TimeUnit.SECONDS);
}
class Http2RequestHeader implements CommandCustomHeader { class Http2RequestHeader implements CommandCustomHeader {
@CFNullable @CFNullable
private Integer count; private Integer count;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册