提交 9f58eb7e 编写于 作者: C CalvinKirs

add rpc config

上级 a2d245dd
...@@ -56,7 +56,7 @@ public class NettyServerConfig { ...@@ -56,7 +56,7 @@ public class NettyServerConfig {
/** /**
* listen port * listen port
*/ */
private int listenPort = 12636; private int listenPort = 12346;
public int getListenPort() { public int getListenPort() {
return listenPort; return listenPort;
......
...@@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf; ...@@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils;
/** /**
......
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
/**
* Invoker
*/
public interface Invoker {
RpcResponse invoke(RpcRequest req) throws Throwable;
}
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer;
/**
* @author jiangli
* @date 2021-01-20 14:54
*/
public class MainServerTest {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(new NettyServerConfig());
}
}
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.remote.rpc.client.RpcClient;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* @author jiangli
* @date 2021-01-11 21:06
*/
public class MainTest {
public static void main(String[] args) throws Exception {
// NettyServer nettyServer = new NettyServer(new NettyServerConfig());
// NettyClient nettyClient=new NettyClient(new NettyClientConfig());
Host host = new Host("127.0.0.1", 12636);
IRpcClient rpcClient = new RpcClient();
IUserService userService = rpcClient.create(IUserService.class, host);
boolean result = userService.say("calvin");
System.out.println("异步回掉成功" + result);
System.out.println(userService.hi(10));
System.out.println(userService.hi(188888888));
IUserService user = rpcClient.create(IUserService.class, host);
System.out.println(user.hi(99999));
System.out.println(user.hi(998888888));
System.out.println(IUserService.class.getSimpleName());
System.out.println(UserService.class.getSimpleName());
}
}
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
/**
* @author jiangli
* @date 2021-01-15 07:32
*/
public class UserCallback extends AbstractRpcCallBack {
@Override
public void run(Object object) {
Boolean msg= (Boolean) object;
System.out.println("我是异步回调handle Kris"+msg);
}
}
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.base; package org.apache.dolphinscheduler.rpc.base;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.base; package org.apache.dolphinscheduler.rpc.base;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack;
import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants; import org.apache.dolphinscheduler.rpc.common.ConsumerConfigConstants;
/** /**
* We will cache the consumer configuration, when the rpc call is generated, the consumer configuration will be first obtained from here * We will cache the consumer configuration, when the rpc call is generated, the consumer configuration will be first obtained from here
...@@ -39,7 +39,7 @@ public class ConsumerConfig { ...@@ -39,7 +39,7 @@ public class ConsumerConfig {
return serviceCallBackClass; return serviceCallBackClass;
} }
public void setServiceCallBackClass(Class<? extends AbstractRpcCallBack> serviceCallBackClass) { void setServiceCallBackClass(Class<? extends AbstractRpcCallBack> serviceCallBackClass) {
this.serviceCallBackClass = serviceCallBackClass; this.serviceCallBackClass = serviceCallBackClass;
} }
...@@ -47,7 +47,7 @@ public class ConsumerConfig { ...@@ -47,7 +47,7 @@ public class ConsumerConfig {
return ackCallBackClass; return ackCallBackClass;
} }
public void setAckCallBackClass(Class<? extends AbstractRpcCallBack> ackCallBackClass) { void setAckCallBackClass(Class<? extends AbstractRpcCallBack> ackCallBackClass) {
this.ackCallBackClass = ackCallBackClass; this.ackCallBackClass = ackCallBackClass;
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
......
...@@ -15,14 +15,14 @@ ...@@ -15,14 +15,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.rpc.base.Rpc;
import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.remote.NettyClient;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.UUID; import java.util.UUID;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
......
...@@ -15,14 +15,12 @@ ...@@ -15,14 +15,12 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.concurrent.ConcurrentHashMap;
import net.bytebuddy.ByteBuddy; import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.MethodDelegation;
...@@ -31,22 +29,14 @@ import net.bytebuddy.implementation.MethodDelegation; ...@@ -31,22 +29,14 @@ import net.bytebuddy.implementation.MethodDelegation;
*/ */
public class RpcClient implements IRpcClient { public class RpcClient implements IRpcClient {
private ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<>();
@Override @Override
public <T> T create(Class<T> clazz, Host host) throws Exception { public <T> T create(Class<T> clazz, Host host) throws Exception {
// if(!classMap.containsKey(clazz.getName())){ return new ByteBuddy()
T proxy = new ByteBuddy()
.subclass(clazz) .subclass(clazz)
.method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(host))) .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(host)))
.make() .make()
.load(getClass().getClassLoader()) .load(getClass().getClassLoader())
.getLoaded() .getLoaded()
.getDeclaredConstructor().newInstance(); .getDeclaredConstructor().newInstance();
// classMap.putIfAbsent(clazz.getName(),proxy);
return proxy;
// }
// return (T) classMap.get(clazz.getName());
} }
} }
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
/** /**
* RpcRequestCache * RpcRequestCache
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.common; package org.apache.dolphinscheduler.rpc.common;
/** /**
* AbstractRpcCallBack * AbstractRpcCallBack
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.common; package org.apache.dolphinscheduler.rpc.common;
/** /**
* ConsumerConfigConstants * ConsumerConfigConstants
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.common; package org.apache.dolphinscheduler.rpc.common;
public enum RequestEventType { public enum RequestEventType {
......
package org.apache.dolphinscheduler.remote.rpc.common;/* package org.apache.dolphinscheduler.rpc.common;/*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -29,7 +29,6 @@ public enum ResponseEventType { ...@@ -29,7 +29,6 @@ public enum ResponseEventType {
this.description = description; this.description = description;
} }
public Byte getType() { public Byte getType() {
return type; return type;
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.common; package org.apache.dolphinscheduler.rpc.common;
/** /**
* RpcRequest * RpcRequest
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.common; package org.apache.dolphinscheduler.rpc.common;
/** /**
* RpcResponse * RpcResponse
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc.common;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
import java.util.concurrent.TimeUnit;
public enum ThreadPoolManager {
INSTANCE;
ExecutorService executorService;
ThreadPoolManager() {
int SIZE_WORK_QUEUE = 200;
long KEEP_ALIVE_TIME = 60;
int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
int MAXI_MUM_POOL_SIZE = CORE_POOL_SIZE * 4;
executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXI_MUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(SIZE_WORK_QUEUE),
new DiscardPolicy());
}
public void addExecuteTask(Runnable task) {
executorService.submit(task);
}
}
...@@ -15,16 +15,14 @@ ...@@ -15,16 +15,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.config; package org.apache.dolphinscheduler.rpc.config;
import org.apache.dolphinscheduler.remote.rpc.IUserService; import org.apache.dolphinscheduler.rpc.base.RpcService;
import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -32,7 +30,9 @@ import org.reflections.Reflections; ...@@ -32,7 +30,9 @@ import org.reflections.Reflections;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* ServiceBean find all rpcService
*/
public class ServiceBean { public class ServiceBean {
private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class); private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class);
...@@ -42,9 +42,8 @@ public class ServiceBean { ...@@ -42,9 +42,8 @@ public class ServiceBean {
private static AtomicBoolean initialized = new AtomicBoolean(false); private static AtomicBoolean initialized = new AtomicBoolean(false);
private static synchronized void init() { private static synchronized void init() {
Reflections f = new Reflections("org/apache/dolphinscheduler/remote/rpc"); // todo config
Reflections f = new Reflections("org/apache/dolphinscheduler/rpc");
List<Class<?>> list = new ArrayList<>(f.getTypesAnnotatedWith(RpcService.class)); List<Class<?>> list = new ArrayList<>(f.getTypesAnnotatedWith(RpcService.class));
list.forEach(rpcClass -> { list.forEach(rpcClass -> {
RpcService rpcService = rpcClass.getAnnotation(RpcService.class); RpcService rpcService = rpcClass.getAnnotation(RpcService.class);
...@@ -52,10 +51,6 @@ public class ServiceBean { ...@@ -52,10 +51,6 @@ public class ServiceBean {
}); });
} }
public static void main(String[] args) {
init();
}
public static Class getServiceClass(String className) { public static Class getServiceClass(String className) {
if (initialized.get()) { if (initialized.get()) {
return (Class) serviceMap.get(className); return (Class) serviceMap.get(className);
......
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.future; package org.apache.dolphinscheduler.rpc.future;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
......
...@@ -15,16 +15,16 @@ ...@@ -15,16 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; import org.apache.dolphinscheduler.remote.decoder.NettyDecoder;
import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; import org.apache.dolphinscheduler.remote.decoder.NettyEncoder;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
...@@ -98,7 +98,7 @@ public class NettyClient { ...@@ -98,7 +98,7 @@ public class NettyClient {
/** /**
* get channel * get channel
*/ */
public Channel getChannel(Host host) { private Channel getChannel(Host host) {
Channel channel = channels.get(host); Channel channel = channels.get(host);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
return channel; return channel;
...@@ -138,7 +138,7 @@ public class NettyClient { ...@@ -138,7 +138,7 @@ public class NettyClient {
* *
* @param clientConfig client config * @param clientConfig client config
*/ */
public NettyClient(final NettyClientConfig clientConfig) { private NettyClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
if (NettyUtils.useEpoll()) { if (NettyUtils.useEpoll()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
...@@ -169,24 +169,24 @@ public class NettyClient { ...@@ -169,24 +169,24 @@ public class NettyClient {
private void start() { private void start() {
this.bootstrap this.bootstrap
.group(this.workerGroup) .group(this.workerGroup)
.channel(NettyUtils.getSocketChannelClass()) .channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new LoggingHandler(LogLevel.DEBUG)) .handler(new LoggingHandler(LogLevel.DEBUG))
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) { public void initChannel(SocketChannel ch) {
ch.pipeline() ch.pipeline()
.addLast(new NettyEncoder(RpcRequest.class)) .addLast(new NettyEncoder(RpcRequest.class))
.addLast(new NettyDecoder(RpcResponse.class)) .addLast(new NettyDecoder(RpcResponse.class))
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyClientHandler()); .addLast(new NettyClientHandler());
} }
}); });
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
...@@ -196,9 +196,8 @@ public class NettyClient { ...@@ -196,9 +196,8 @@ public class NettyClient {
Channel channel = getChannel(host); Channel channel = getChannel(host);
assert channel != null; assert channel != null;
RpcRequestCache rpcRequestCache = new RpcRequestCache(); RpcRequestCache rpcRequestCache = new RpcRequestCache();
rpcRequestCache.setServiceName(request.getClassName() + request.getMethodName()); String serviceName = request.getClassName() + request.getMethodName();
rpcRequestCache.setServiceName(serviceName);
RpcFuture future = null; RpcFuture future = null;
if (!async) { if (!async) {
future = new RpcFuture(); future = new RpcFuture();
...@@ -209,15 +208,15 @@ public class NettyClient { ...@@ -209,15 +208,15 @@ public class NettyClient {
RpcResponse result = null; RpcResponse result = null;
if (async) { if (async) {
result=new RpcResponse(); result = new RpcResponse();
result.setStatus((byte)0); result.setStatus((byte) 0);
result.setResult(true); result.setResult(true);
return result; return result;
} }
try { try {
result = future.get(); result = future.get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
logger.error("send msg error",e); logger.error("send msg error,service name is {}", serviceName, e);
} }
return result; return result;
} }
......
...@@ -15,21 +15,16 @@ ...@@ -15,21 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import io.netty.channel.ChannelHandler; import org.apache.dolphinscheduler.rpc.client.ConsumerConfig;
import io.netty.channel.ChannelHandlerContext; import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache;
import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import io.netty.handler.timeout.IdleStateEvent; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfig; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfigCache; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -37,6 +32,12 @@ import java.net.InetSocketAddress; ...@@ -37,6 +32,12 @@ import java.net.InetSocketAddress;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.FastThreadLocalThread;
/** /**
* NettyClientHandler * NettyClientHandler
*/ */
...@@ -46,6 +47,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -46,6 +47,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE;
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
...@@ -58,10 +61,13 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -58,10 +61,13 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId()); RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId());
if (null == rpcRequest) { if (null == rpcRequest) {
logger.warn("未知响应"); logger.warn("rpc read error,this request does not exist");
return; return;
} }
threadPoolManager.addExecuteTask(() -> readHandler(rsp, rpcRequest));
}
private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest) {
String serviceName = rpcRequest.getServiceName(); String serviceName = rpcRequest.getServiceName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (!consumerConfig.getAsync()) { if (!consumerConfig.getAsync()) {
...@@ -71,7 +77,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -71,7 +77,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
return; return;
} }
//async //async
new FastThreadLocalThread(() -> { new FastThreadLocalThread(() -> {
try { try {
...@@ -79,18 +84,16 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -79,18 +84,16 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
try { try {
consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult()); consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult());
} catch (InvocationTargetException | NoSuchMethodException e) { } catch (InvocationTargetException | NoSuchMethodException e) {
logger.error("rpc call back error",e); logger.error("rpc call back error, serviceName {} ", serviceName, e);
} }
} else { } else {
logger.error("xxxx fail"); logger.error("rpc response error ,serviceName {}", serviceName);
} }
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
logger.error("execute async error", e); logger.error("execute async error,serviceName {}", serviceName, e);
} }
}).start(); }).start();
} }
@Override @Override
...@@ -99,7 +102,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -99,7 +102,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
RpcRequest request = new RpcRequest(); RpcRequest request = new RpcRequest();
request.setEventType((byte)0); request.setEventType((byte) 0);
ctx.channel().writeAndFlush(request); ctx.channel().writeAndFlush(request);
logger.debug("send heart beat msg..."); logger.debug("send heart beat msg...");
...@@ -110,12 +113,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -110,12 +113,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("exceptionCaught");
logger.error("exceptionCaught : {}", cause.getMessage(), cause); logger.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close(); ctx.channel().close();
} }
private void executeAsyncHandler(){
}
} }
...@@ -15,29 +15,15 @@ ...@@ -15,29 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder;
import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; import org.apache.dolphinscheduler.remote.decoder.NettyEncoder;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -47,12 +33,24 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -47,12 +33,24 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
/** /**
* NettyServer * NettyServer
*/ */
public class NettyServer { public class NettyServer {
private static final Logger logger =LoggerFactory.getLogger(NettyServer.class); private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
/** /**
* boss group * boss group
...@@ -126,29 +124,28 @@ public class NettyServer { ...@@ -126,29 +124,28 @@ public class NettyServer {
this.start(); this.start();
} }
/** /**
* server start * server start
*/ */
public void start() { public void start() {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap this.serverBootstrap
.group(this.bossGroup, this.workGroup) .group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass()) .channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.handler(new LoggingHandler(LogLevel.DEBUG)) .handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch){ protected void initChannel(SocketChannel ch) {
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
ChannelFuture future; ChannelFuture future;
try { try {
...@@ -165,7 +162,6 @@ public class NettyServer { ...@@ -165,7 +162,6 @@ public class NettyServer {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
} }
} }
System.out.println("netty ser ver start");
} }
/** /**
...@@ -175,13 +171,12 @@ public class NettyServer { ...@@ -175,13 +171,12 @@ public class NettyServer {
*/ */
private void initNettyChannel(SocketChannel ch) { private void initNettyChannel(SocketChannel ch) {
ch.pipeline() ch.pipeline()
.addLast(new NettyDecoder(RpcRequest.class)) .addLast(new NettyDecoder(RpcRequest.class))
.addLast(new NettyEncoder(RpcResponse.class)) .addLast(new NettyEncoder(RpcResponse.class))
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", new NettyServerHandler()); .addLast("handler", new NettyServerHandler());
} }
public void close() { public void close() {
if (isStarted.compareAndSet(true, false)) { if (isStarted.compareAndSet(true, false)) {
try { try {
...@@ -199,5 +194,4 @@ public class NettyServer { ...@@ -199,5 +194,4 @@ public class NettyServer {
} }
} }
} }
...@@ -15,18 +15,14 @@ ...@@ -15,18 +15,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import org.apache.dolphinscheduler.remote.rpc.IUserService; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.base.RpcService; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.config.ServiceBean;
import org.apache.dolphinscheduler.remote.rpc.config.ServiceBean;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -42,19 +38,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -42,19 +38,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE;
@Override @Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx); super.channelRegistered(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
logger.info("channel close"); logger.info("channel close");
ctx.channel().close(); ctx.channel().close();
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) { public void channelActive(ChannelHandlerContext ctx) {
logger.info("client connect success !" + ctx.channel().remoteAddress()); logger.info("client connect success !" + ctx.channel().remoteAddress());
...@@ -65,19 +61,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -65,19 +61,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
RpcRequest req = (RpcRequest) msg; RpcRequest req = (RpcRequest) msg;
RpcResponse response = new RpcResponse();
if (req.getEventType() == 0) { if (req.getEventType() == 0) {
logger.info("accept heartbeat msg"); logger.info("accept heartbeat msg");
return; return;
} }
//todo 使用业务线程池去处理 不要占用netty的资源 threadPoolManager.addExecuteTask(() -> readHandler(ctx, req));
response.setRequestId(req.getRequestId()); }
private void readHandler(ChannelHandlerContext ctx, RpcRequest req) {
RpcResponse response = new RpcResponse();
response.setRequestId(req.getRequestId());
response.setStatus((byte) 0); response.setStatus((byte) 0);
String classname = req.getClassName(); String classname = req.getClassName();
String methodName = req.getMethodName(); String methodName = req.getMethodName();
...@@ -95,7 +92,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -95,7 +92,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
result = method.invoke(object, arguments); result = method.invoke(object, arguments);
} catch (Exception e) { } catch (Exception e) {
logger.error("netty server execute error", e); logger.error("netty server execute error,service name {}", classname + methodName, e);
response.setStatus((byte) -1); response.setStatus((byte) -1);
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.rpc.client.RpcClient;
import org.apache.dolphinscheduler.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.rpc.remote.NettyServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class RpcTest {
private NettyServer nettyServer;
private IUserService userService;
private Host host;
@Before
public void before() throws Exception {
nettyServer = new NettyServer(new NettyServerConfig());
IRpcClient rpcClient = new RpcClient();
host = new Host("127.0.0.1", 12346);
userService = rpcClient.create(IUserService.class, host);
}
@Test
public void sendTest() {
Integer result = userService.hi(3);
Assert.assertSame(4, result);
result = userService.hi(4);
Assert.assertSame(5, result);
userService.say("sync");
}
@After
public void after() {
NettyClient.getInstance().close();
nettyServer.close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc;
import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack;
/**
* UserCallback
*/
public class UserCallback extends AbstractRpcCallBack {
@Override
public void run(Object object) {
}
}
package org.apache.dolphinscheduler.remote.rpc; package org.apache.dolphinscheduler.rpc;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc; import org.apache.dolphinscheduler.rpc.base.RpcService;
import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
/** /**
* @author jiangli * UserService
* @date 2021-01-11 21:05
*/ */
@RpcService("IUserService") @RpcService("IUserService")
public class UserService implements IUserService{ public class UserService implements IUserService{
@Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999)
@Override @Override
public Boolean say(String s) { public Boolean say(String s) {
return true; return true;
} }
@Override @Override
public String hi(int num) { public Integer hi(int num) {
return "this world has " + num + "sun"; return ++num;
} }
} }
...@@ -868,6 +868,7 @@ ...@@ -868,6 +868,7 @@
<include>**/dao/entity/UdfFuncTest.java</include> <include>**/dao/entity/UdfFuncTest.java</include>
<include>**/remote/JsonSerializerTest.java</include> <include>**/remote/JsonSerializerTest.java</include>
<include>**/remote/RemoveTaskLogResponseCommandTest.java</include> <include>**/remote/RemoveTaskLogResponseCommandTest.java</include>
<include>**/rpc/RpcTest.java</include>
<include>**/remote/RemoveTaskLogRequestCommandTest.java</include> <include>**/remote/RemoveTaskLogRequestCommandTest.java</include>
<include>**/remote/NettyRemotingClientTest.java</include> <include>**/remote/NettyRemotingClientTest.java</include>
<include>**/remote/NettyUtilTest.java</include> <include>**/remote/NettyUtilTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册