提交 cfa882c4 编写于 作者: Q qiaozhanwei

Merge remote-tracking branch 'remotes/upstream/refactor-architecture' into dev

......@@ -28,20 +28,19 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.rmi.RemoteException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -56,18 +55,20 @@ public class NettyRemotingClient {
private final NettyEncoder encoder = new NettyEncoder();
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(128);
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final NioEventLoopGroup workerGroup;
private final NettyClientHandler clientHandler = new NettyClientHandler(this);
private final NettyClientConfig clientConfig;
private final Semaphore asyncSemaphore = new Semaphore(200, true);
private final ExecutorService callbackExecutor;
private final NettyClientHandler clientHandler;
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
......@@ -78,6 +79,10 @@ public class NettyRemotingClient {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.start();
}
......@@ -103,65 +108,79 @@ public class NettyRemotingClient {
isStarted.compareAndSet(false, true);
}
//TODO
public void send(final Address address, final Command command, final InvokeCallback invokeCallback) throws RemotingException {
public void sendAsync(final Address address, final Command command, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
}
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
logger.info("sent command {} to {}", command, address);
} else{
logger.error("send command {} to {} failed, error {}", command, address, future.cause());
final long opaque = command.getOpaque();
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if(acquired){
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore);
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
try {
responseFuture.executeInvokeCallback();
} catch (Throwable ex){
logger.error("execute callback error", ex);
} finally{
responseFuture.release();
}
}
}
});
} catch (Exception ex) {
String msg = String.format("send command %s to address %s encounter error", command, address);
throw new RemotingException(msg, ex);
});
} catch (Throwable ex){
responseFuture.release();
throw new RemotingException(String.format("send command to address: %s failed", address), ex);
}
} else{
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message);
}
}
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", address));
}
final long opaque = command.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null);
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
responseFuture.setSendOk(true);
return;
} else{
responseFuture.setSendOk(false);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
logger.error("send command {} to address {} failed", command, address);
}
}
});
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
} else{
throw new RemoteException(address.toString(), responseFuture.getCause());
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to address {} failed", command, address);
}
});
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
} else{
throw new RemotingException(address.toString(), responseFuture.getCause());
}
return result;
} catch (Exception ex) {
String msg = String.format("send command %s to address %s error", command, address);
throw new RemotingException(msg, ex);
}
return result;
}
public Channel getChannel(Address address) {
......@@ -192,10 +211,6 @@ public class NettyRemotingClient {
return null;
}
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
......@@ -203,8 +218,8 @@ public class NettyRemotingClient {
if(workerGroup != null){
this.workerGroup.shutdownGracefully();
}
if(defaultExecutor != null){
defaultExecutor.shutdown();
if(callbackExecutor != null){
this.callbackExecutor.shutdownNow();
}
} catch (Exception ex) {
logger.error("netty client close exception", ex);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* @Author: Tboy
*/
public class RemotingTimeoutException extends RemotingException{
public RemotingTimeoutException(String message) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
public class RemotingTooMuchRequestException extends RemotingException{
public RemotingTooMuchRequestException(String message) {
super(message);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.future;
/**
* @Author: Tboy
* invoke callback
*/
public interface InvokeCallback {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* release semaphore
*/
public class ReleaseSemaphore {
private final Semaphore semaphore;
private final AtomicBoolean released;
public ReleaseSemaphore(Semaphore semaphore){
this.semaphore = semaphore;
this.released = new AtomicBoolean(false);
}
public void release(){
if(this.released.compareAndSet(false, true)){
this.semaphore.release();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* @Author: Tboy
* response future
*/
public class ResponseFuture {
......@@ -19,6 +34,8 @@ public class ResponseFuture {
private final InvokeCallback invokeCallback;
private final ReleaseSemaphore releaseSemaphore;
private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis();
......@@ -29,11 +46,11 @@ public class ResponseFuture {
private volatile Throwable cause;
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback) {
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback, ReleaseSemaphore releaseSemaphore) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.releaseSemaphore = releaseSemaphore;
FUTURE_TABLE.put(opaque, this);
}
......@@ -95,7 +112,17 @@ public class ResponseFuture {
return responseCommand;
}
public void setResponseCommand(Command responseCommand) {
this.responseCommand = responseCommand;
}
public InvokeCallback getInvokeCallback() {
return invokeCallback;
}
public void release() {
if(this.releaseSemaphore != null){
this.releaseSemaphore.release();
}
}
}
......@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
/**
* netty client request handler
*/
......@@ -34,8 +36,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final NettyRemotingClient nettyRemotingClient;
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
private final ExecutorService callbackExecutor;
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
}
@Override
......@@ -52,8 +57,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private void processReceived(final Command responseCommand) {
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
if(future != null){
future.putResponse(responseCommand);
future.executeInvokeCallback();
future.setResponseCommand(responseCommand);
future.release();
if(future.getInvokeCallback() != null){
this.callbackExecutor.submit(new Runnable() {
@Override
public void run() {
future.executeInvokeCallback();
}
});
} else{
future.putResponse(responseCommand);
}
} else{
logger.warn("receive response {}, but not matched any request ", responseCommand);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* caller thread execute
*/
public class CallerThreadExecutePolicy implements RejectedExecutionHandler {
private final Logger logger = LoggerFactory.getLogger(CallerThreadExecutePolicy.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.warn("queue is full, trigger caller thread execute");
r.run();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger increment = new AtomicInteger(1);
private final String name;
private final int count;
public NamedThreadFactory(String name){
this(name, 0);
}
public NamedThreadFactory(String name, int count){
this.name = name;
this.count = count;
}
@Override
public Thread newThread(Runnable r) {
final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
: String.format(name + "_%d", increment.getAndIncrement());
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}
}
......@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.command.Ping;
import org.apache.dolphinscheduler.remote.command.Pong;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.junit.Assert;
......@@ -42,7 +44,7 @@ public class NettyRemotingClientTest {
* test ping
*/
@Test
public void testSend(){
public void testSendSync(){
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
......@@ -52,6 +54,8 @@ public class NettyRemotingClientTest {
channel.writeAndFlush(Pong.create(command.getOpaque()));
}
});
server.start();
//
final NettyClientConfig clientConfig = new NettyClientConfig();
......@@ -64,4 +68,37 @@ public class NettyRemotingClientTest {
e.printStackTrace();
}
}
@Test
public void testSendAsync(){
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
@Override
public void process(Channel channel, Command command) {
channel.writeAndFlush(Pong.create(command.getOpaque()));
}
});
server.start();
//
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
CountDownLatch latch = new CountDownLatch(1);
Command commandPing = Ping.create();
try {
final AtomicLong opaque = new AtomicLong(0);
client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
opaque.set(responseFuture.getOpaque());
latch.countDown();
}
});
latch.await();
Assert.assertEquals(commandPing.getOpaque(), opaque.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册