[FLINK-7754] [rpc] Complete termination future after actor has been stopped

This commit waits not only until the Actor has called postStop but also until the actor
has been completely shut down by the ActorSystem before completing the termination
future.

This closes #4770.
上级 dae21da7
......@@ -223,11 +223,12 @@ public abstract class RpcEndpoint implements RpcGateway {
}
/**
* Return a future which is completed when the rpc endpoint has been terminated.
* Return a future which is completed with true when the rpc endpoint has been terminated.
* In case of a failure, this future is completed with the occurring exception.
*
* @return Future which is completed with true when the rpc endpoint has been terminated, or completed exceptionally if the termination failed.
*/
public CompletableFuture<Void> getTerminationFuture() {
public CompletableFuture<Boolean> getTerminationFuture() {
return rpcServer.getTerminationFuture();
}
......
......@@ -30,5 +30,5 @@ public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGate
*
* @return Future indicating when the rpc endpoint has been terminated
*/
CompletableFuture<Void> getTerminationFuture();
CompletableFuture<Boolean> getTerminationFuture();
}
......@@ -18,13 +18,28 @@
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
import org.apache.flink.runtime.rpc.RpcGateway;
import akka.actor.ActorRef;
import java.util.concurrent.CompletableFuture;
/**
* Interface for Akka based rpc gateways
*/
interface AkkaGateway extends RpcGateway {
interface AkkaBasedEndpoint extends RpcGateway {
/**
* Returns the {@link ActorRef} of the underlying RPC actor.
*
* @return the {@link ActorRef} of the underlying RPC actor
*/
ActorRef getActorRef();
ActorRef getRpcEndpoint();
/**
* Returns the internal termination future.
*
* @return Internal termination future
*/
CompletableFuture<Void> getInternalTerminationFuture();
}
......@@ -57,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
* executed.
*/
class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer {
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer {
private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);
/**
......@@ -82,7 +82,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
private final long maximumFramesize;
// null if gateway; otherwise non-null
private final CompletableFuture<Void> terminationFuture;
@Nullable
private final CompletableFuture<Boolean> terminationFuture;
// null if gateway; otherwise non-null
@Nullable
private final CompletableFuture<Void> internalTerminationFuture;
AkkaInvocationHandler(
String address,
......@@ -90,7 +95,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Void> terminationFuture) {
@Nullable CompletableFuture<Boolean> terminationFuture,
@Nullable CompletableFuture<Void> internalTerminationFuture) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
......@@ -99,6 +105,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
this.terminationFuture = terminationFuture;
this.internalTerminationFuture = internalTerminationFuture;
}
@Override
......@@ -107,7 +114,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
Object result;
if (declaringClass.equals(AkkaGateway.class) ||
if (declaringClass.equals(AkkaBasedEndpoint.class) ||
declaringClass.equals(Object.class) ||
declaringClass.equals(RpcGateway.class) ||
declaringClass.equals(StartStoppable.class) ||
......@@ -127,7 +134,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
}
@Override
public ActorRef getRpcEndpoint() {
public ActorRef getActorRef() {
return rpcEndpoint;
}
......@@ -339,7 +346,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
}
@Override
public CompletableFuture<Void> getTerminationFuture() {
public CompletableFuture<Boolean> getTerminationFuture() {
return terminationFuture;
}
/**
 * Returns the internal termination future this handler was constructed with.
 *
 * <p>The backing field is annotated {@code @Nullable} and documented as null when this
 * handler acts as a plain gateway, so callers must be prepared for a null result.
 */
@Override
public CompletableFuture<Void> getInternalTerminationFuture() {
return internalTerminationFuture;
}
}
......@@ -77,12 +77,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
/** the helper that tracks whether calls come from the main thread. */
private final MainThreadValidatorUtil mainThreadValidator;
private final CompletableFuture<Void> terminationFuture;
private final CompletableFuture<Void> internalTerminationFuture;
AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture) {
AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> internalTerminationFuture) {
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.terminationFuture = checkNotNull(terminationFuture);
this.internalTerminationFuture = checkNotNull(internalTerminationFuture);
}
@Override
......@@ -106,9 +106,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
// Complete the termination future so that others know that we've stopped.
if (shutdownThrowable != null) {
terminationFuture.completeExceptionally(shutdownThrowable);
internalTerminationFuture.completeExceptionally(shutdownThrowable);
} else {
terminationFuture.complete(null);
internalTerminationFuture.complete(null);
}
} finally {
mainThreadValidator.exitMainThread();
......
......@@ -40,6 +40,7 @@ import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Identify;
import akka.actor.Kill;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
......@@ -47,18 +48,24 @@ import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import scala.Option;
......@@ -83,7 +90,10 @@ public class AkkaRpcService implements RpcService {
private final ActorSystem actorSystem;
private final Time timeout;
private final Set<ActorRef> actors = new HashSet<>(4);
@GuardedBy("lock")
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);
private final long maximumFramesize;
private final String address;
......@@ -119,6 +129,8 @@ public class AkkaRpcService implements RpcService {
}
internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
stopped = false;
}
@Override
......@@ -149,6 +161,7 @@ public class AkkaRpcService implements RpcService {
actorRef,
timeout,
maximumFramesize,
null,
null);
});
}
......@@ -169,6 +182,7 @@ public class AkkaRpcService implements RpcService {
timeout,
maximumFramesize,
null,
null,
() -> fencingToken);
});
}
......@@ -177,13 +191,14 @@ public class AkkaRpcService implements RpcService {
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
checkNotNull(rpcEndpoint, "rpc endpoint");
CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();
CompletableFuture<Void> internalTerminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
} else {
akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
}
ActorRef actorRef;
......@@ -191,12 +206,12 @@ public class AkkaRpcService implements RpcService {
synchronized (lock) {
checkState(!stopped, "RpcService is stopped");
actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
actors.add(actorRef);
actors.put(actorRef, rpcEndpoint);
}
LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
......@@ -208,30 +223,32 @@ public class AkkaRpcService implements RpcService {
Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaGateway.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);
final InvocationHandler akkaInvocationHandler;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
address,
akkaAddress,
hostname,
actorRef,
timeout,
maximumFramesize,
terminationFuture,
internalTerminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
akkaInvocationHandler = new AkkaInvocationHandler(
address,
akkaAddress,
hostname,
actorRef,
timeout,
maximumFramesize,
terminationFuture);
terminationFuture,
internalTerminationFuture);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
......@@ -250,15 +267,16 @@ public class AkkaRpcService implements RpcService {
@Override
public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) {
if (rpcServer instanceof AkkaGateway) {
if (rpcServer instanceof AkkaBasedEndpoint) {
InvocationHandler fencedInvocationHandler = new FencedAkkaInvocationHandler<>(
rpcServer.getAddress(),
rpcServer.getHostname(),
((AkkaGateway) rpcServer).getRpcEndpoint(),
((AkkaBasedEndpoint) rpcServer).getActorRef(),
timeout,
maximumFramesize,
null,
null,
() -> fencingToken);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
......@@ -268,7 +286,7 @@ public class AkkaRpcService implements RpcService {
return (RpcServer) Proxy.newProxyInstance(
classLoader,
new Class<?>[]{RpcServer.class, AkkaGateway.class},
new Class<?>[]{RpcServer.class, AkkaBasedEndpoint.class},
fencedInvocationHandler);
} else {
throw new RuntimeException("The given RpcServer must implement the AkkaGateway in order to fence it.");
......@@ -277,24 +295,45 @@ public class AkkaRpcService implements RpcService {
@Override
public void stopServer(RpcServer selfGateway) {
if (selfGateway instanceof AkkaGateway) {
AkkaGateway akkaClient = (AkkaGateway) selfGateway;
if (selfGateway instanceof AkkaBasedEndpoint) {
AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
boolean fromThisService;
synchronized (lock) {
if (stopped) {
return;
} else {
fromThisService = actors.remove(akkaClient.getRpcEndpoint());
fromThisService = actors.remove(akkaClient.getActorRef()) != null;
}
}
if (fromThisService) {
ActorRef selfActorRef = akkaClient.getRpcEndpoint();
ActorRef selfActorRef = akkaClient.getActorRef();
LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path());
actorSystem.stop(selfActorRef);
// Ask Akka to stop the actor and obtain a future that completes once the actor has
// actually terminated (or fails if that does not happen within the timeout).
CompletableFuture<Boolean> akkaTerminationFuture = FutureUtils.toJava(
	Patterns.gracefulStop(
		selfActorRef,
		FutureUtils.toFiniteDuration(timeout),
		Kill.getInstance()));

// The public termination future may only complete after BOTH the actor has been
// terminated by the actor system AND the endpoint's own internal termination future
// has completed; a failure on either side fails the combined future.
akkaTerminationFuture
	.thenCombine(
		akkaClient.getInternalTerminationFuture(),
		(Boolean terminated, Void ignored) -> true)
	.whenComplete(
		(Boolean terminated, Throwable throwable) -> {
			if (throwable != null) {
				LOG.debug("Graceful RPC endpoint shutdown failed.", throwable);

				// fall back to a hard stop and propagate the failure to the caller
				actorSystem.stop(selfActorRef);
				selfGateway.getTerminationFuture().completeExceptionally(throwable);
			} else {
				// The future is a CompletableFuture<Boolean> that is documented to
				// complete with true on success (and stopService() does so as well);
				// completing it with null would break callers that unbox the result.
				selfGateway.getTerminationFuture().complete(true);
			}
		});
} else {
LOG.debug("RPC endpoint {} already stopped or from different RPC service");
LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress());
}
}
}
......@@ -303,6 +342,8 @@ public class AkkaRpcService implements RpcService {
public void stopService() {
LOG.info("Stopping Akka RPC service.");
final List<RpcEndpoint> actorsToTerminate;
synchronized (lock) {
if (stopped) {
return;
......@@ -311,11 +352,36 @@ public class AkkaRpcService implements RpcService {
stopped = true;
actorSystem.shutdown();
actorsToTerminate = new ArrayList<>(actors.values());
actors.clear();
}
actorSystem.awaitTermination();
// complete the termination futures of all actors
for (RpcEndpoint rpcEndpoint : actorsToTerminate) {
final CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
AkkaBasedEndpoint akkaBasedEndpoint = rpcEndpoint.getSelfGateway(AkkaBasedEndpoint.class);
CompletableFuture<Void> internalTerminationFuture = akkaBasedEndpoint.getInternalTerminationFuture();
internalTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(true);
}
});
// make sure that if the internal termination futures haven't completed yet, then they time out
internalTerminationFuture.completeExceptionally(
new TimeoutException("The RpcEndpoint " + rpcEndpoint.getAddress() + " did not terminate in time."));
}
LOG.info("Stopped Akka RPC service.");
}
......@@ -331,6 +397,7 @@ public class AkkaRpcService implements RpcService {
return actorSystem.dispatcher();
}
/**
 * Returns the scheduled executor held by this RPC service (the instance stored in
 * {@code internalScheduledExecutor}).
 */
@Override
public ScheduledExecutor getScheduledExecutor() {
return internalScheduledExecutor;
}
......
......@@ -60,9 +60,10 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Void> terminationFuture,
@Nullable CompletableFuture<Boolean> terminationFuture,
@Nullable CompletableFuture<Void> internalTerminationFuture,
Supplier<F> fencingTokenSupplier) {
super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, internalTerminationFuture);
this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
}
......@@ -84,11 +85,11 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
checkNotNull(runnable, "runnable");
if (isLocal) {
getRpcEndpoint().tell(
getActorRef().tell(
new UnfencedMessage<>(new RunAsync(runnable, 0L)), ActorRef.noSender());
} else {
throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
getRpcEndpoint().path() + ". This is not supported.");
getActorRef().path() + ". This is not supported.");
}
}
......@@ -101,14 +102,14 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
@SuppressWarnings("unchecked")
CompletableFuture<V> resultFuture = (CompletableFuture<V>) FutureUtils.toJava(
Patterns.ask(
getRpcEndpoint(),
getActorRef(),
new UnfencedMessage<>(new CallAsync(callable)),
timeout.toMilliseconds()));
return resultFuture;
} else {
throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
getRpcEndpoint().path() + ". This is not supported.");
getActorRef().path() + ". This is not supported.");
}
}
......
......@@ -38,8 +38,8 @@ import java.util.concurrent.CompletableFuture;
*/
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> terminationFuture) {
super(rpcEndpoint, terminationFuture);
public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> internalTerminationFuture) {
super(rpcEndpoint, internalTerminationFuture);
}
@Override
......
......@@ -163,7 +163,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
}
// export the termination future for caller to know it is terminated
public CompletableFuture<Void> getTerminationFuture() {
public CompletableFuture<Boolean> getTerminationFuture() {
return taskManager.getTerminationFuture();
}
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
......@@ -35,6 +36,7 @@ import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
......@@ -174,7 +176,7 @@ public class AkkaRpcActorTest extends TestLogger {
final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
rpcEndpoint.start();
CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
assertFalse(terminationFuture.isDone());
......@@ -235,7 +237,7 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.shutDown();
CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
try {
terminationFuture.get();
......@@ -254,13 +256,37 @@ public class AkkaRpcActorTest extends TestLogger {
simpleRpcEndpoint.shutDown();
CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
CompletableFuture<Boolean> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
// check that we executed the postStop method in the main thread, otherwise an exception
// would be thrown here.
terminationFuture.get();
}
/**
 * Verifies that stopping the {@link AkkaRpcService} completes the termination future of an
 * endpoint that is still running on that service.
 */
@Test
public void testActorTerminationWhenServiceShutdown() throws Exception {
	final ActorSystem system = AkkaUtils.createDefaultActorSystem();
	final RpcService service = new AkkaRpcService(system, timeout);

	try {
		final String simpleName = SimpleRpcEndpoint.class.getSimpleName();
		final SimpleRpcEndpoint endpoint = new SimpleRpcEndpoint(service, simpleName);
		endpoint.start();

		// obtain the future while the endpoint is still running
		final CompletableFuture<Boolean> endpointTermination = endpoint.getTerminationFuture();

		service.stopService();

		// get() throws if the future fails or does not complete within the timeout,
		// which in turn fails this test
		final long waitMillis = timeout.toMilliseconds();
		endpointTermination.get(waitMillis, TimeUnit.MILLISECONDS);
	} finally {
		// always release the dedicated actor system created for this test
		system.shutdown();
		system.awaitTermination(FutureUtils.toFiniteDuration(timeout));
	}
}
// ------------------------------------------------------------------------
// Test Actors and Interfaces
// ------------------------------------------------------------------------
......
......@@ -18,13 +18,14 @@
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorSystem;
import org.junit.AfterClass;
import org.junit.Test;
......@@ -49,13 +50,16 @@ public class AkkaRpcServiceTest extends TestLogger {
private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
private static final Time timeout = Time.milliseconds(10000);
private static AkkaRpcService akkaRpcService =
new AkkaRpcService(actorSystem, Time.milliseconds(10000));
new AkkaRpcService(actorSystem, timeout);
/**
 * Class-level teardown: stops the shared RPC service first, then shuts down the actor
 * system it was created on and waits for the system to terminate.
 */
@AfterClass
public static void shutdown() {
akkaRpcService.stopService();
actorSystem.shutdown();
// bound the wait by the test timeout instead of blocking indefinitely
actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
}
// ------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册