提交 e0087819 编写于 作者: C CalvinKirs

rpc

上级 cf41fb77
......@@ -47,6 +47,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
/**
* Invoker
*/
public interface Invoker {
RpcResponse invoke(RpcRequest req) throws Throwable;
}
package org.apache.dolphinscheduler.remote.rpc.client;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain;
import java.lang.reflect.Method;
import java.util.UUID;
/**
* ConsumerInterceptor
*/
public class ConsumerInterceptor {
private Invoker invoker;
private FilterChain filterChain;
public ConsumerInterceptor(Invoker invoker) {
this.filterChain = new FilterChain(invoker);
this.invoker = this.filterChain.buildFilterChain();
}
@RuntimeType
public Object intercept(@AllArguments Object[] args, @Origin Method method) throws Throwable {
RpcRequest request = buildReq(args, method);
//todo
System.out.println(invoker.invoke(request));
return null;
}
private RpcRequest buildReq(Object[] args, Method method) {
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
return request;
}
}
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
/**
* @author jiangli
* @date 2021-01-09 15:27
*/
public class ConsumerInvoker implements Invoker {
@Override
public RpcResponse invoke(RpcRequest req) throws Throwable {
System.out.println(req.getRequestId()+"kris");
return null;
}
}
package org.apache.dolphinscheduler.remote.rpc.client;
/**
* @author jiangli
* @date 2021-01-09 10:58
*/
public interface IRpcClient {
<T> T create(Class<T> clazz) throws Exception;
}
package org.apache.dolphinscheduler.remote.rpc.client;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author jiangli
* @date 2021-01-09 10:59
*/
public class RpcClient implements IRpcClient{
private ConcurrentHashMap<String,Object> classMap=new ConcurrentHashMap<>();
@Override
public <T> T create(Class<T> clazz) throws Exception {
if(!classMap.containsKey(clazz.getName())){
T proxy = new ByteBuddy()
.subclass(clazz)
.method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker())))
.make()
.load(getClass().getClassLoader())
.getLoaded()
.getDeclaredConstructor().newInstance();
classMap.putIfAbsent(clazz.getName(),proxy);
}
return (T) classMap.get(clazz.getName());
}
}
package org.apache.dolphinscheduler.remote.rpc.common;
/**
* @author jiangli
* @date 2021-01-09 13:21
*/
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
package org.apache.dolphinscheduler.remote.rpc.common;
/**
* RpcResponse
*/
public class RpcResponse {
private String requestId;
private String msg;
private Object result;
private Byte status;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public Byte getStatus() {
return status;
}
public void setStatus(Byte status) {
this.status = status;
}
}
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
public interface Filter {
RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable;
}
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import java.util.List;
/**
* FilterChain
*/
public class FilterChain {
private List<Filter> filters;
private Invoker invoker;
public FilterChain(List<Filter> filters, Invoker invoker) {
this.filters = filters;
this.invoker = invoker;
}
public FilterChain(Invoker invoker) {
this(LoaderFilters.create().getFilters(), invoker);
}
public Invoker buildFilterChain() {
// 最后一个
Invoker last = invoker;
for (int i = filters.size() - 1; i >= 0; i--) {
last = new FilterWrapper(filters.get(i), last);
}
// 第一个
return last;
}
}
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
/**
* @author jiangli
* @date 2021-01-11 11:48
*/
public class FilterWrapper implements Invoker {
private Filter next;
private Invoker invoker;
public FilterWrapper(Filter next, Invoker invoker) {
this.next = next;
this.invoker = invoker;
}
@Override
public RpcResponse invoke(RpcRequest args) throws Throwable {
if (next != null) {
return next.filter(invoker, args);
} else {
return invoker.invoke(args);
}
}
}
package org.apache.dolphinscheduler.remote.rpc.filter;
import java.util.ArrayList;
import java.util.List;
/**
* LoaderFilters
*/
public class LoaderFilters {
private List<Filter> filterList = new ArrayList<>();
private LoaderFilters() {
}
public static LoaderFilters create() {
return new LoaderFilters();
}
public List<Filter> getFilters() {
filterList.add(SelectorFilter.getInstance());
return filterList;
}
}
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SelectorFilter
*/
public class SelectorFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(SelectorFilter.class);
private SelectorFilter selectorFilter = SelectorFilter.getInstance();
public static SelectorFilter getInstance() {
return SelectorFilterInner.INSTANCE;
}
private static class SelectorFilterInner {
private static final SelectorFilter INSTANCE = new SelectorFilter();
}
private SelectorFilter() {
}
@Override
public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable {
RpcResponse rsp = new RpcResponse();
rsp.setMsg("ms");
return rsp;
}
}
package org.apache.dolphinscheduler.remote.rpc.selector;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collection;
/**
* AbstractSelector
*/
public abstract class AbstractSelector<T> implements Selector<T>{
@Override
public T select(Collection<T> source) {
if (CollectionUtils.isEmpty(source)) {
throw new IllegalArgumentException("Empty source.");
}
/**
* if only one , return directly
*/
if (source.size() == 1) {
return (T)source.toArray()[0];
}
return doSelect(source);
}
protected abstract T doSelect(Collection<T> source);
}
\ No newline at end of file
package org.apache.dolphinscheduler.remote.rpc.selector;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author jiangli
* @date 2021-01-11 12:00
*/
public class RandomSelector extends AbstractSelector<Host> {
@Override
public Host doSelect(final Collection<Host> source) {
List<Host> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
for (Host host : hosts) {
totalWeight += host.getWeight();
weights[index] = host.getWeight();
index++;
}
if (totalWeight > 0) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < size; i++) {
offset -= weights[i];
if (offset < 0) {
return hosts.get(i);
}
}
}
return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
}
package org.apache.dolphinscheduler.remote.rpc.selector;
import java.util.Collection;
/**
* Selector
*/
public interface Selector<T> {
/**
* select
* @param source source
* @return T
*/
T select(Collection<T> source);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册