未验证 提交 63641cd0 编写于 作者: Y yanghua 提交者: Till Rohrmann

[FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor

上级 21043356
...@@ -19,16 +19,14 @@ ...@@ -19,16 +19,14 @@
package org.apache.flink.runtime.metrics; package org.apache.flink.runtime.metrics;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -171,10 +169,7 @@ public class MetricRegistryConfiguration { ...@@ -171,10 +169,7 @@ public class MetricRegistryConfiguration {
} }
} }
final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE); final long maximumFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
// padding to account for serialization overhead // padding to account for serialization overhead
final long messageSizeLimitPadding = 256; final long messageSizeLimitPadding = 256;
......
...@@ -70,6 +70,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; ...@@ -70,6 +70,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
...@@ -262,8 +263,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { ...@@ -262,8 +263,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
// bring up all the RPC services // bring up all the RPC services
LOG.info("Starting RPC Service(s)"); LOG.info("Starting RPC Service(s)");
AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
// we always need the 'commonRpcService' for auxiliary calls // we always need the 'commonRpcService' for auxiliary calls
commonRpcService = createRpcService(configuration, rpcTimeout, false, null); commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem( metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
...@@ -290,12 +292,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { ...@@ -290,12 +292,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress(); final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress); jobManagerRpcService = createRpcService(akkaRpcServiceConfig, true, jobManagerBindAddress);
resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress); resourceManagerRpcService = createRpcService(akkaRpcServiceConfig, true, resourceManagerBindAddress);
for (int i = 0; i < numTaskManagers; i++) { for (int i = 0; i < numTaskManagers; i++) {
taskManagerRpcServices[i] = createRpcService( taskManagerRpcServices[i] = createRpcService(akkaRpcServiceConfig, true, taskManagerBindAddress);
configuration, rpcTimeout, true, taskManagerBindAddress);
} }
this.jobManagerRpcService = jobManagerRpcService; this.jobManagerRpcService = jobManagerRpcService;
...@@ -742,9 +743,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { ...@@ -742,9 +743,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
/** /**
* Factory method to instantiate the RPC service. * Factory method to instantiate the RPC service.
* *
* @param configuration * @param akkaRpcServiceConfig
* The configuration of the mini cluster
* @param askTimeout
* The default RPC timeout for asynchronous "ask" requests. * The default RPC timeout for asynchronous "ask" requests.
* @param remoteEnabled * @param remoteEnabled
* True, if the RPC service should be reachable from other (remote) RPC services. * True, if the RPC service should be reachable from other (remote) RPC services.
...@@ -754,24 +753,23 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { ...@@ -754,24 +753,23 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
* @return The instantiated RPC service * @return The instantiated RPC service
*/ */
protected RpcService createRpcService( protected RpcService createRpcService(
Configuration configuration, AkkaRpcServiceConfiguration akkaRpcServiceConfig,
Time askTimeout,
boolean remoteEnabled, boolean remoteEnabled,
String bindAddress) { String bindAddress) {
final Config akkaConfig; final Config akkaConfig;
if (remoteEnabled) { if (remoteEnabled) {
akkaConfig = AkkaUtils.getAkkaConfig(configuration, bindAddress, 0); akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration(), bindAddress, 0);
} else { } else {
akkaConfig = AkkaUtils.getAkkaConfig(configuration); akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration());
} }
final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig); final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig);
final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig); final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig);
return new AkkaRpcService(actorSystem, askTimeout); return new AkkaRpcService(actorSystem, akkaRpcServiceConfig);
} }
protected ResourceManagerRunner startResourceManager( protected ResourceManagerRunner startResourceManager(
......
...@@ -27,12 +27,14 @@ import org.apache.flink.runtime.rpc.RpcServer; ...@@ -27,12 +27,14 @@ import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync; import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation; import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation; import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync; import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.pattern.Patterns; import akka.pattern.Patterns;
...@@ -48,6 +50,7 @@ import java.lang.reflect.Method; ...@@ -48,6 +50,7 @@ import java.lang.reflect.Method;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkArgument;
...@@ -203,14 +206,29 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc ...@@ -203,14 +206,29 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
tell(rpcInvocation); tell(rpcInvocation);
result = null; result = null;
} else if (Objects.equals(returnType, CompletableFuture.class)) { } else {
// execute an asynchronous call // execute an asynchronous call
result = ask(rpcInvocation, futureTimeout); CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
if (o instanceof SerializedValue) {
try {
return ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException("Could not deserialize the serialized payload of RPC method : "
+ methodName, e));
}
} else { } else {
// execute a synchronous call return o;
CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout); }
});
result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit()); if (!Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
} else {
result = completableFuture;
}
} }
return result; return result;
......
...@@ -33,7 +33,9 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation; ...@@ -33,7 +33,9 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage; import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync; import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Status; import akka.actor.Status;
...@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit; ...@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise; import scala.concurrent.impl.Promise;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
/** /**
...@@ -85,13 +88,22 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { ...@@ -85,13 +88,22 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
private final int version; private final int version;
private final long maximumFramesize;
private State state; private State state;
AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) { AkkaRpcActor(
final T rpcEndpoint,
final CompletableFuture<Boolean> terminationFuture,
final int version,
final long maximumFramesize) {
checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.terminationFuture = checkNotNull(terminationFuture); this.terminationFuture = checkNotNull(terminationFuture);
this.version = version; this.version = version;
this.maximumFramesize = maximumFramesize;
this.state = State.STOPPED; this.state = State.STOPPED;
} }
...@@ -254,6 +266,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { ...@@ -254,6 +266,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
return; return;
} }
boolean remoteSender = isRemoteSender();
final String methodName = rpcMethod.getName();
if (result instanceof CompletableFuture) { if (result instanceof CompletableFuture) {
final CompletableFuture<?> future = (CompletableFuture<?>) result; final CompletableFuture<?> future = (CompletableFuture<?>) result;
Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>(); Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
...@@ -263,14 +278,33 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { ...@@ -263,14 +278,33 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
if (throwable != null) { if (throwable != null) {
promise.failure(throwable); promise.failure(throwable);
} else { } else {
if (!remoteSender) {
promise.success(value); promise.success(value);
} else {
Either<SerializedValue, AkkaRpcException> serializedResult =
serializeRemoteResultAndVerifySize(value, methodName);
if (serializedResult.isLeft()) {
promise.success(serializedResult.left());
} else {
promise.failure(serializedResult.right());
}
}
} }
}); });
Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender()); Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
} else { } else {
// tell the sender the result of the computation if (!remoteSender) {
getSender().tell(new Status.Success(result), getSelf()); getSender().tell(result, getSelf());
} else {
Either<SerializedValue, AkkaRpcException> serializedResult =
serializeRemoteResultAndVerifySize(result, methodName);
if (serializedResult.isLeft()) {
getSender().tell(new Status.Success(serializedResult.left()), getSelf());
} else {
getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
}
}
} }
} }
} catch (Throwable e) { } catch (Throwable e) {
...@@ -281,6 +315,29 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { ...@@ -281,6 +315,29 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
} }
} }
protected boolean isRemoteSender() {
return !getSender().path().address().hasLocalScope();
}
private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
Object result, String methodName) {
try {
SerializedValue serializedResult = new SerializedValue(result);
long resultSize = serializedResult.getByteArray().length;
if (resultSize > maximumFramesize) {
return Either.Right(new AkkaRpcException(
"The method " + methodName + "'s result size " + resultSize
+ " exceeds the maximum size " + maximumFramesize + " ."));
} else {
return Either.Left(serializedResult);
}
} catch (IOException e) {
return Either.Right(new AkkaRpcException(
"Failed to serialize the result for RPC call : " + methodName + ".", e));
}
}
/** /**
* Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable} * Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable}
* in the context of the actor thread. * in the context of the actor thread.
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rpc.akka; package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
...@@ -87,18 +86,14 @@ public class AkkaRpcService implements RpcService { ...@@ -87,18 +86,14 @@ public class AkkaRpcService implements RpcService {
static final int VERSION = 1; static final int VERSION = 1;
static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
private final Object lock = new Object(); private final Object lock = new Object();
private final ActorSystem actorSystem; private final ActorSystem actorSystem;
private final Time timeout; private final AkkaRpcServiceConfiguration configuration;
@GuardedBy("lock") @GuardedBy("lock")
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4); private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);
private final long maximumFramesize;
private final String address; private final String address;
private final int port; private final int port;
...@@ -108,16 +103,9 @@ public class AkkaRpcService implements RpcService { ...@@ -108,16 +103,9 @@ public class AkkaRpcService implements RpcService {
private volatile boolean stopped; private volatile boolean stopped;
public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
this.actorSystem = checkNotNull(actorSystem, "actor system"); this.actorSystem = checkNotNull(actorSystem, "actor system");
this.timeout = checkNotNull(timeout, "timeout"); this.configuration = checkNotNull(configuration, "akka rpc service configuration");
if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
} else {
// only local communication
maximumFramesize = Long.MAX_VALUE;
}
Address actorSystemAddress = AkkaUtils.getAddress(actorSystem); Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
...@@ -174,8 +162,8 @@ public class AkkaRpcService implements RpcService { ...@@ -174,8 +162,8 @@ public class AkkaRpcService implements RpcService {
addressHostname.f0, addressHostname.f0,
addressHostname.f1, addressHostname.f1,
actorRef, actorRef,
timeout, configuration.getTimeout(),
maximumFramesize, configuration.getMaximumFramesize(),
null); null);
}); });
} }
...@@ -193,8 +181,8 @@ public class AkkaRpcService implements RpcService { ...@@ -193,8 +181,8 @@ public class AkkaRpcService implements RpcService {
addressHostname.f0, addressHostname.f0,
addressHostname.f1, addressHostname.f1,
actorRef, actorRef,
timeout, configuration.getTimeout(),
maximumFramesize, configuration.getMaximumFramesize(),
null, null,
() -> fencingToken); () -> fencingToken);
}); });
...@@ -208,9 +196,19 @@ public class AkkaRpcService implements RpcService { ...@@ -208,9 +196,19 @@ public class AkkaRpcService implements RpcService {
final Props akkaRpcActorProps; final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint) { if (rpcEndpoint instanceof FencedRpcEndpoint) {
akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion()); akkaRpcActorProps = Props.create(
FencedAkkaRpcActor.class,
rpcEndpoint,
terminationFuture,
getVersion(),
configuration.getMaximumFramesize());
} else { } else {
akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion()); akkaRpcActorProps = Props.create(
getAkkaRpcActorClass(),
rpcEndpoint,
terminationFuture,
getVersion(),
configuration.getMaximumFramesize());
} }
ActorRef actorRef; ActorRef actorRef;
...@@ -245,8 +243,8 @@ public class AkkaRpcService implements RpcService { ...@@ -245,8 +243,8 @@ public class AkkaRpcService implements RpcService {
akkaAddress, akkaAddress,
hostname, hostname,
actorRef, actorRef,
timeout, configuration.getTimeout(),
maximumFramesize, configuration.getMaximumFramesize(),
terminationFuture, terminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken); ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
...@@ -256,8 +254,8 @@ public class AkkaRpcService implements RpcService { ...@@ -256,8 +254,8 @@ public class AkkaRpcService implements RpcService {
akkaAddress, akkaAddress,
hostname, hostname,
actorRef, actorRef,
timeout, configuration.getTimeout(),
maximumFramesize, configuration.getMaximumFramesize(),
terminationFuture); terminationFuture);
} }
...@@ -283,8 +281,8 @@ public class AkkaRpcService implements RpcService { ...@@ -283,8 +281,8 @@ public class AkkaRpcService implements RpcService {
rpcServer.getAddress(), rpcServer.getAddress(),
rpcServer.getHostname(), rpcServer.getHostname(),
((AkkaBasedEndpoint) rpcServer).getActorRef(), ((AkkaBasedEndpoint) rpcServer).getActorRef(),
timeout, configuration.getTimeout(),
maximumFramesize, configuration.getMaximumFramesize(),
null, null,
() -> fencingToken); () -> fencingToken);
...@@ -392,6 +390,10 @@ public class AkkaRpcService implements RpcService { ...@@ -392,6 +390,10 @@ public class AkkaRpcService implements RpcService {
return FutureUtils.toJava(scalaFuture); return FutureUtils.toJava(scalaFuture);
} }
protected Class getAkkaRpcActorClass() {
return AkkaRpcActor.class;
}
// --------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------
// Private helper methods // Private helper methods
// --------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------
...@@ -421,7 +423,7 @@ public class AkkaRpcService implements RpcService { ...@@ -421,7 +423,7 @@ public class AkkaRpcService implements RpcService {
final ActorSelection actorSel = actorSystem.actorSelection(address); final ActorSelection actorSel = actorSystem.actorSelection(address);
final Future<ActorIdentity> identify = Patterns final Future<ActorIdentity> identify = Patterns
.ask(actorSel, new Identify(42), timeout.toMilliseconds()) .ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class)); .<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify); final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
...@@ -438,7 +440,7 @@ public class AkkaRpcService implements RpcService { ...@@ -438,7 +440,7 @@ public class AkkaRpcService implements RpcService {
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose( final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava( (ActorRef actorRef) -> FutureUtils.toJava(
Patterns Patterns
.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds()) .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class)))); .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
return actorRefFuture.thenCombineAsync( return actorRefFuture.thenCombineAsync(
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import scala.concurrent.duration.FiniteDuration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Configuration object for {@link AkkaRpcService}.
*/
public class AkkaRpcServiceConfiguration {
private final Time timeout;
private final long maximumFramesize;
private final Configuration configuration;
public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
checkNotNull(timeout);
checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
this.timeout = timeout;
this.maximumFramesize = maximumFramesize;
this.configuration = configuration;
}
public Time getTimeout() {
return timeout;
}
public long getMaximumFramesize() {
return maximumFramesize;
}
public Configuration getConfiguration() {
return configuration;
}
public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
FiniteDuration duration = AkkaUtils.getTimeout(configuration);
Time timeout = Time.of(duration.length(), duration.unit());
long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
}
public static AkkaRpcServiceConfiguration defaultConfiguration() {
return fromConfiguration(new Configuration());
}
}
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rpc.akka; package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
...@@ -55,6 +55,12 @@ public class AkkaRpcServiceUtils { ...@@ -55,6 +55,12 @@ public class AkkaRpcServiceUtils {
private static final String AKKA_TCP = "akka.tcp"; private static final String AKKA_TCP = "akka.tcp";
private static final String AKKA_SSL_TCP = "akka.ssl.tcp"; private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
"akka {remote {netty.tcp {maximum-frame-size = %s}}}";
private static final String MAXIMUM_FRAME_SIZE_PATH =
"akka.remote.netty.tcp.maximum-frame-size";
private static final AtomicLong nextNameOffset = new AtomicLong(0L); private static final AtomicLong nextNameOffset = new AtomicLong(0L);
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -99,8 +105,7 @@ public class AkkaRpcServiceUtils { ...@@ -99,8 +105,7 @@ public class AkkaRpcServiceUtils {
@Nonnull @Nonnull
private static RpcService instantiateAkkaRpcService(Configuration configuration, ActorSystem actorSystem) { private static RpcService instantiateAkkaRpcService(Configuration configuration, ActorSystem actorSystem) {
final Time timeout = AkkaUtils.getTimeoutAsTime(configuration); return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
return new AkkaRpcService(actorSystem, timeout);
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -199,6 +204,17 @@ public class AkkaRpcServiceUtils { ...@@ -199,6 +204,17 @@ public class AkkaRpcServiceUtils {
return prefix + '_' + nameOffset; return prefix + '_' + nameOffset;
} }
// ------------------------------------------------------------------------
// RPC service configuration
// ------------------------------------------------------------------------
public static long extractMaximumFramesize(Configuration configuration) {
String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr);
Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH);
}
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
/** This class is not meant to be instantiated. */ /** This class is not meant to be instantiated. */
......
...@@ -39,8 +39,13 @@ import java.util.concurrent.CompletableFuture; ...@@ -39,8 +39,13 @@ import java.util.concurrent.CompletableFuture;
*/ */
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> { public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) { public FencedAkkaRpcActor(
super(rpcEndpoint, terminationFuture, version); T rpcEndpoint,
CompletableFuture<Boolean> terminationFuture,
int version,
final long maximumFramesize) {
super(rpcEndpoint, terminationFuture, version, maximumFramesize);
} }
@Override @Override
......
...@@ -96,6 +96,7 @@ import org.apache.flink.runtime.rpc.RpcService; ...@@ -96,6 +96,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle;
...@@ -265,8 +266,9 @@ public class JobMasterTest extends TestLogger { ...@@ -265,8 +266,9 @@ public class JobMasterTest extends TestLogger {
final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem(); final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout); AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout); rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>(); final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>();
......
...@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate ...@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
...@@ -85,7 +86,7 @@ public class SlotPoolRpcTest extends TestLogger { ...@@ -85,7 +86,7 @@ public class SlotPoolRpcTest extends TestLogger {
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
rpcService = new AkkaRpcService(actorSystem, Time.seconds(10)); rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
} }
@AfterClass @AfterClass
......
...@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; ...@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
...@@ -61,7 +62,7 @@ public class AsyncCallsTest extends TestLogger { ...@@ -61,7 +62,7 @@ public class AsyncCallsTest extends TestLogger {
private static final Time timeout = Time.seconds(10L); private static final Time timeout = Time.seconds(10L);
private static final AkkaRpcService akkaRpcService = private static final AkkaRpcService akkaRpcService =
new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
@AfterClass @AfterClass
public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
......
...@@ -20,11 +20,12 @@ package org.apache.flink.runtime.rpc; ...@@ -20,11 +20,12 @@ package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
...@@ -59,7 +60,9 @@ public class RpcConnectionTest extends TestLogger { ...@@ -59,7 +60,9 @@ public class RpcConnectionTest extends TestLogger {
// we start the RPC service with a very long timeout to ensure that the test // we start the RPC service with a very long timeout to ensure that the test
// can only pass if the connection problem is not recognized merely via a timeout // can only pass if the connection problem is not recognized merely via a timeout
rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS)); Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class); CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
......
...@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; ...@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
...@@ -49,7 +50,7 @@ public class RpcEndpointTest extends TestLogger { ...@@ -49,7 +50,7 @@ public class RpcEndpointTest extends TestLogger {
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
actorSystem = AkkaUtils.createDefaultActorSystem(); actorSystem = AkkaUtils.createDefaultActorSystem();
rpcService = new AkkaRpcService(actorSystem, TIMEOUT); rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
} }
@AfterClass @AfterClass
......
...@@ -18,13 +18,13 @@ ...@@ -18,13 +18,13 @@
package org.apache.flink.runtime.rpc; package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
...@@ -91,8 +91,11 @@ public class RpcSSLAuthITCase extends TestLogger { ...@@ -91,8 +91,11 @@ public class RpcSSLAuthITCase extends TestLogger {
// we start the RPC service with a very long timeout to ensure that the test // we start the RPC service with a very long timeout to ensure that the test
// can only pass if the connection problem is not recognized merely via a timeout // can only pass if the connection problem is not recognized merely via a timeout
rpcService1 = new AkkaRpcService(actorSystem1, Time.of(10000000, TimeUnit.SECONDS)); Configuration configuration = new Configuration();
rpcService2 = new AkkaRpcService(actorSystem2, Time.of(10000000, TimeUnit.SECONDS)); configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
TestEndpoint endpoint = new TestEndpoint(rpcService1); TestEndpoint endpoint = new TestEndpoint(rpcService1);
endpoint.start(); endpoint.start();
......
...@@ -18,11 +18,11 @@ ...@@ -18,11 +18,11 @@
package org.apache.flink.runtime.rpc; package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -66,7 +66,8 @@ public class TestingRpcService extends AkkaRpcService { ...@@ -66,7 +66,8 @@ public class TestingRpcService extends AkkaRpcService {
* Creates a new {@code TestingRpcService}, using the given configuration. * Creates a new {@code TestingRpcService}, using the given configuration.
*/ */
public TestingRpcService(Configuration configuration) { public TestingRpcService(Configuration configuration) {
super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10)); super(AkkaUtils.createLocalActorSystem(configuration),
AkkaRpcServiceConfiguration.fromConfiguration(configuration));
this.registeredConnections = new ConcurrentHashMap<>(); this.registeredConnections = new ConcurrentHashMap<>();
} }
......
...@@ -60,9 +60,11 @@ public class AkkaRpcActorHandshakeTest extends TestLogger { ...@@ -60,9 +60,11 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem(); final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();
akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout); AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.defaultConfiguration();
akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout); akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(wrongVersionActorSystem, timeout); akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(
wrongVersionActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
} }
@AfterClass @AfterClass
...@@ -135,8 +137,8 @@ public class AkkaRpcActorHandshakeTest extends TestLogger { ...@@ -135,8 +137,8 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
private static class WrongVersionAkkaRpcService extends AkkaRpcService { private static class WrongVersionAkkaRpcService extends AkkaRpcService {
WrongVersionAkkaRpcService(ActorSystem actorSystem, Time timeout) { WrongVersionAkkaRpcService(ActorSystem actorSystem, AkkaRpcServiceConfiguration configuration) {
super(actorSystem, timeout); super(actorSystem, configuration);
} }
@Override @Override
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package org.apache.flink.runtime.rpc.akka; package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcEndpoint;
...@@ -38,6 +40,9 @@ import org.junit.AfterClass; ...@@ -38,6 +40,9 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -144,6 +149,92 @@ public class AkkaRpcActorTest extends TestLogger { ...@@ -144,6 +149,92 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.shutDown(); rpcEndpoint.shutDown();
} }
@Test
public void testOversizedResponseMsg() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
OversizedResponseRpcEndpoint rpcEndpoint = null;
ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
try {
rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
rpcEndpoint.start();
CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(
rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
CompletableFuture<String> result = rpcGateway.calculate();
result.get(timeout.getSize(), timeout.getUnit());
fail("Expected an AkkaRpcException.");
} catch (Exception e) {
assertTrue(e.getCause() instanceof IOException);
} finally {
if (rpcEndpoint != null) {
rpcEndpoint.shutDown();
}
final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
terminationFutures.add(rpcService1.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
terminationFutures.add(rpcService2.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
FutureUtils
.waitForAll(terminationFutures)
.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
@Test
public void testNonOversizedResponseMsg() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
OversizedResponseRpcEndpoint rpcEndpoint = null;
ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
try {
rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
rpcEndpoint.start();
CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
CompletableFuture<String> result = rpcGateway.calculate();
String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
assertEquals("hello world", actualTxt);
} finally {
if (rpcEndpoint != null) {
rpcEndpoint.shutDown();
}
final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
terminationFutures.add(rpcService1.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
terminationFutures.add(rpcService2.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
FutureUtils
.waitForAll(terminationFutures)
.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
/** /**
* Tests that we can wait for a RpcEndpoint to terminate. * Tests that we can wait for a RpcEndpoint to terminate.
* *
...@@ -248,7 +339,8 @@ public class AkkaRpcActorTest extends TestLogger { ...@@ -248,7 +339,8 @@ public class AkkaRpcActorTest extends TestLogger {
@Test @Test
public void testActorTerminationWhenServiceShutdown() throws Exception { public void testActorTerminationWhenServiceShutdown() throws Exception {
final ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem(); final ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem();
final RpcService rpcService = new AkkaRpcService(rpcActorSystem, timeout); final RpcService rpcService = new AkkaRpcService(
rpcActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
try { try {
SimpleRpcEndpoint rpcEndpoint = new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName()); SimpleRpcEndpoint rpcEndpoint = new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName());
...@@ -429,4 +521,27 @@ public class AkkaRpcActorTest extends TestLogger { ...@@ -429,4 +521,27 @@ public class AkkaRpcActorTest extends TestLogger {
return postStopFuture; return postStopFuture;
} }
} }
// -------------------------------------------------------------------------
interface OversizedResponseMsgRpcGateway extends RpcGateway {
CompletableFuture<String> calculate();
}
static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
private volatile String txt;
public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
super(rpcService);
this.txt = txt;
}
@Override
public CompletableFuture<String> calculate() {
return CompletableFuture.completedFuture(txt);
}
}
} }
...@@ -56,7 +56,7 @@ public class AkkaRpcServiceTest extends TestLogger { ...@@ -56,7 +56,7 @@ public class AkkaRpcServiceTest extends TestLogger {
private static final Time TIMEOUT = Time.milliseconds(10000L); private static final Time TIMEOUT = Time.milliseconds(10000L);
private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, TIMEOUT); private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, AkkaRpcServiceConfiguration.defaultConfiguration());
@AfterClass @AfterClass
public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
...@@ -136,7 +136,8 @@ public class AkkaRpcServiceTest extends TestLogger { ...@@ -136,7 +136,8 @@ public class AkkaRpcServiceTest extends TestLogger {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTerminationFuture() throws Exception { public void testTerminationFuture() throws Exception {
final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000)); final AkkaRpcService rpcService = new AkkaRpcService(
actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture(); CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rpc.akka; package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcEndpoint;
...@@ -46,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger { ...@@ -46,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
// actual test // actual test
AkkaRpcService akkaRpcService = new AkkaRpcService( AkkaRpcService akkaRpcService = new AkkaRpcService(
AkkaUtils.createDefaultActorSystem(), AkkaUtils.createDefaultActorSystem(),
Time.milliseconds(10000)); AkkaRpcServiceConfiguration.defaultConfiguration());
try { try {
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package org.apache.flink.runtime.rpc.akka; package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcEndpoint;
...@@ -27,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService; ...@@ -27,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import org.hamcrest.core.Is; import org.hamcrest.core.Is;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
...@@ -60,14 +60,15 @@ public class MessageSerializationTest extends TestLogger { ...@@ -60,14 +60,15 @@ public class MessageSerializationTest extends TestLogger {
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
Config akkaConfig = AkkaUtils.getDefaultAkkaConfig(); Configuration configuration = new Configuration();
Config modifiedAkkaConfig = akkaConfig.withValue(AkkaRpcService.MAXIMUM_FRAME_SIZE_PATH, ConfigValueFactory.fromAnyRef(maxFrameSize + "b")); configuration.setString(AkkaOptions.FRAMESIZE, maxFrameSize + "b");
actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig); actorSystem1 = AkkaUtils.createDefaultActorSystem();
actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig); actorSystem2 = AkkaUtils.createDefaultActorSystem();
akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout); AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout); akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
} }
@AfterClass @AfterClass
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* RPC sync invoke test.
*/
public class SyncCallsTest extends TestLogger {
// ------------------------------------------------------------------------
// shared test members
// ------------------------------------------------------------------------
private static final Time timeout = Time.seconds(10L);
private static ActorSystem actorSystem1;
private static ActorSystem actorSystem2;
private static AkkaRpcService akkaRpcService1;
private static AkkaRpcService akkaRpcService2;
@BeforeClass
public static void setup() {
Configuration configuration = new Configuration();
actorSystem1 = AkkaUtils.createDefaultActorSystem();
actorSystem2 = AkkaUtils.createDefaultActorSystem();
AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
}
@AfterClass
public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
terminationFutures.add(akkaRpcService1.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
terminationFutures.add(akkaRpcService2.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
FutureUtils
.waitForAll(terminationFutures)
.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
@Test
public void testSimpleLocalSyncCall() throws Exception {
RpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
rpcEndpoint.start();
try {
DummyRpcGateway gateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
int actualResult = gateway.foobar();
assertEquals(1234, actualResult);
} finally {
rpcEndpoint.shutDown();
}
}
@Test
public void testSimpleRemoteSyncCall() throws Exception {
RpcEndpoint rpcEndpoint = null;
try {
rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
rpcEndpoint.start();
CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
DummyRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
int actualResult = rpcGateway.foobar();
assertEquals(1234, actualResult);
} finally {
if (rpcEndpoint != null) {
rpcEndpoint.shutDown();
}
}
}
@Test
public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
OversizedMsgRpcEndpoint rpcEndpoint = null;
ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
try {
rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
rpcEndpoint.start();
CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(
rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
OversizedMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
String result = rpcGateway.response();
fail("Expected an AkkaRpcException.");
} catch (Exception e) {
assertTrue(e.getCause() instanceof IOException);
} finally {
if (rpcEndpoint != null) {
rpcEndpoint.shutDown();
}
final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
terminationFutures.add(rpcService1.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
terminationFutures.add(rpcService2.stopService());
terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
FutureUtils
.waitForAll(terminationFutures)
.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
/**
* A dummy rpc gateway.
*/
public interface DummyRpcGateway extends RpcGateway {
int foobar();
}
/**
* A dummy rpc endpoint.
*/
public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
DummyRpcEndpoint(RpcService rpcService) {
super(rpcService);
}
@Override
public int foobar() {
return 1234;
}
@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
}
/**
* Oversized message rpc gateway.
*/
private interface OversizedMsgRpcGateway extends RpcGateway {
String response();
}
/**
* Oversized message rpc endpoint.
*/
private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
private String txt;
public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
super(rpcService);
this.txt = txt;
}
@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
@Override
public String response() {
return this.txt;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册