提交 8c110561 编写于 作者: T Till Rohrmann 提交者: Stephan Ewen

[FLINK-4346] [rpc] Add new RPC abstraction

上级 5b4e3d88
......@@ -202,6 +202,11 @@ under the License.
<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
</dependencies>
<build>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
import akka.util.Timeout;
import scala.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
/**
* Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
* rpc server.
*
* This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
* implementation which allows to dispatch local procedures to the main thread of the underlying
* rpc server.
*/
public interface MainThreadExecutor {
/**
* Execute the runnable in the main thread of the underlying rpc server.
*
* @param runnable Runnable to be executed
*/
void runAsync(Runnable runnable);
/**
* Execute the callable in the main thread of the underlying rpc server and return a future for
* the callable result. If the future is not completed within the given timeout, the returned
* future will throw a {@link TimeoutException}.
*
* @param callable Callable to be executed
* @param timeout Timeout for the future to complete
* @param <V> Return value of the callable
* @return Future of the callable result
*/
<V> Future<V> callAsync(Callable<V> callable, Timeout timeout);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
import akka.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import java.util.concurrent.Callable;
/**
* Base class for rpc endpoints. Distributed components which offer remote procedure calls have to
* extend the rpc endpoint base class.
*
* The main idea is that a rpc endpoint is backed by a rpc server which has a single thread
* processing the rpc calls. Thus, by executing all state changing operations within the main
* thread, we don't have to reason about concurrent accesses. The rpc provides provides
* {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
* {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread.
*
* @param <C> Rpc gateway counterpart for the implementing rpc endpoint
*/
public abstract class RpcEndpoint<C extends RpcGateway> {
protected final Logger log = LoggerFactory.getLogger(getClass());
/** Rpc service to be used to start the rpc server and to obtain rpc gateways */
private final RpcService rpcService;
/** Self gateway which can be used to schedule asynchronous calls on yourself */
private C self;
/**
* The main thread execution context to be used to execute future callbacks in the main thread
* of the executing rpc server.
*
* IMPORTANT: The main thread context is only available after the rpc server has been started.
*/
private MainThreadExecutionContext mainThreadExecutionContext;
public RpcEndpoint(RpcService rpcService) {
this.rpcService = rpcService;
}
/**
* Get self-gateway which should be used to run asynchronous rpc calls on this endpoint.
*
* IMPORTANT: Always issue local method calls via the self-gateway if the current thread
* is not the main thread of the underlying rpc server, e.g. from within a future callback.
*
* @return Self gateway
*/
public C getSelf() {
return self;
}
/**
* Execute the runnable in the main thread of the underlying rpc server.
*
* @param runnable Runnable to be executed in the main thread of the underlying rpc server
*/
public void runAsync(Runnable runnable) {
((MainThreadExecutor) self).runAsync(runnable);
}
/**
* Execute the callable in the main thread of the underlying rpc server returning a future for
* the result of the callable. If the callable is not completed within the given timeout, then
* the future will be failed with a {@link java.util.concurrent.TimeoutException}.
*
* @param callable Callable to be executed in the main thread of the underlying rpc server
* @param timeout Timeout for the callable to be completed
* @param <V> Return type of the callable
* @return Future for the result of the callable.
*/
public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
return ((MainThreadExecutor) self).callAsync(callable, timeout);
}
/**
* Gets the main thread execution context. The main thread execution context can be used to
* execute tasks in the main thread of the underlying rpc server.
*
* @return Main thread execution context
*/
public ExecutionContext getMainThreadExecutionContext() {
return mainThreadExecutionContext;
}
/**
* Gets the used rpc service.
*
* @return Rpc service
*/
public RpcService getRpcService() {
return rpcService;
}
/**
* Starts the underlying rpc server via the rpc service and creates the main thread execution
* context. This makes the rpc endpoint effectively reachable from the outside.
*
* Can be overriden to add rpc endpoint specific start up code. Should always call the parent
* start method.
*/
public void start() {
self = rpcService.startServer(this);
mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
}
/**
* Shuts down the underlying rpc server via the rpc service.
*
* Can be overriden to add rpc endpoint specific shut down code. Should always call the parent
* shut down method.
*/
public void shutDown() {
rpcService.stopServer(self);
}
/**
* Gets the address of the underlying rpc server. The address should be fully qualified so that
* a remote system can connect to this rpc server via this address.
*
* @return Fully qualified address of the underlying rpc server
*/
public String getAddress() {
return rpcService.getAddress(self);
}
/**
* Execution context which executes runnables in the main thread context. A reported failure
* will cause the underlying rpc server to shut down.
*/
private class MainThreadExecutionContext implements ExecutionContext {
private final MainThreadExecutor gateway;
MainThreadExecutionContext(MainThreadExecutor gateway) {
this.gateway = gateway;
}
@Override
public void execute(Runnable runnable) {
gateway.runAsync(runnable);
}
@Override
public void reportFailure(final Throwable t) {
gateway.runAsync(new Runnable() {
@Override
public void run() {
log.error("Encountered failure in the main thread execution context.", t);
shutDown();
}
});
}
@Override
public ExecutionContext prepare() {
return this;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
/**
* Rpc gateway interface which has to be implemented by Rpc gateways.
*/
public interface RpcGateway {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation for rpc method in a {@link RpcEndpoint} implementation. Every rpc method must have a
* respective counterpart in the {@link RpcGateway} implementation for this rpc server. The
* RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of
* gateway methods in the corresponding gateway implementation are identical.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcMethod {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
import scala.concurrent.Future;
/**
* Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}.
* Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote
* procedures.
*/
public interface RpcService {
/**
* Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
* be used to communicate with the rpc server.
*
* @param address Address of the remote rpc server
* @param clazz Class of the rpc gateway to return
* @param <C> Type of the rpc gateway to return
* @return Future containing the rpc gateway
*/
<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
/**
* Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
*
* @param rpcEndpoint Rpc protocl to dispath the rpcs to
* @param <S> Type of the rpc endpoint
* @param <C> Type of the self rpc gateway associated with the rpc server
* @return Self gateway to dispatch remote procedure calls to oneself
*/
<S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint);
/**
* Stop the underlying rpc server of the provided self gateway.
*
* @param selfGateway Self gateway describing the underlying rpc server
* @param <C> Type of the rpc gateway
*/
<C extends RpcGateway> void stopServer(C selfGateway);
/**
* Stop the rpc service shutting down all started rpc servers.
*/
void stopService();
/**
* Get the fully qualified address of the underlying rpc server represented by the self gateway.
* It must be possible to connect from a remote host to the rpc server via the returned fully
* qualified address.
*
* @param selfGateway Self gateway associated with the underlying rpc server
* @param <C> Type of the rpc gateway
* @return Fully qualified address
*/
<C extends RpcGateway> String getAddress(C selfGateway);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation for {@link RpcGateway} methods to specify an additional timeout parameter for the
* returned future to be completed. The rest of the provided parameters is passed to the remote rpc
* server for the rpc.
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcTimeout {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
/**
* Interface for Akka based rpc gateways
*/
public interface AkkaGateway {
ActorRef getActorRef();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Mapper;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
import scala.concurrent.Future;
import java.util.HashSet;
import java.util.Set;
public class AkkaRpcService implements RpcService {
private final ActorSystem actorSystem;
private final Timeout timeout;
private final Set<ActorRef> actors = new HashSet<>();
public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
this.actorSystem = actorSystem;
this.timeout = timeout;
}
@Override
public <C extends RpcGateway> Future<C> connect(String address, final Class<C> clazz) {
ActorSelection actorSel = actorSystem.actorSelection(address);
AskableActorSelection asker = new AskableActorSelection(actorSel);
Future<Object> identify = asker.ask(new Identify(42), timeout);
return identify.map(new Mapper<Object, C>(){
public C apply(Object obj) {
ActorRef actorRef = ((ActorIdentity) obj).getRef();
if (clazz == TaskExecutorGateway.class) {
return (C) new TaskExecutorAkkaGateway(actorRef, timeout);
} else if (clazz == ResourceManagerGateway.class) {
return (C) new ResourceManagerAkkaGateway(actorRef, timeout);
} else if (clazz == JobMasterGateway.class) {
return (C) new JobMasterAkkaGateway(actorRef, timeout);
} else {
throw new RuntimeException("Could not find remote endpoint " + clazz);
}
}
}, actorSystem.dispatcher());
}
@Override
public <S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint) {
ActorRef ref;
C self;
if (rpcEndpoint instanceof TaskExecutor) {
ref = actorSystem.actorOf(
Props.create(TaskExecutorAkkaActor.class, rpcEndpoint)
);
self = (C) new TaskExecutorAkkaGateway(ref, timeout);
} else if (rpcEndpoint instanceof ResourceManager) {
ref = actorSystem.actorOf(
Props.create(ResourceManagerAkkaActor.class, rpcEndpoint)
);
self = (C) new ResourceManagerAkkaGateway(ref, timeout);
} else if (rpcEndpoint instanceof JobMaster) {
ref = actorSystem.actorOf(
Props.create(JobMasterAkkaActor.class, rpcEndpoint)
);
self = (C) new JobMasterAkkaGateway(ref, timeout);
} else {
throw new RuntimeException("Could not start RPC server for class " + rpcEndpoint.getClass());
}
actors.add(ref);
return self;
}
@Override
public <C extends RpcGateway> void stopServer(C selfGateway) {
if (selfGateway instanceof AkkaGateway) {
AkkaGateway akkaClient = (AkkaGateway) selfGateway;
if (actors.contains(akkaClient.getActorRef())) {
akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
} else {
// don't stop this actor since it was not started by this RPC service
}
}
}
@Override
public void stopService() {
actorSystem.shutdown();
actorSystem.awaitTermination();
}
@Override
public <C extends RpcGateway> String getAddress(C selfGateway) {
if (selfGateway instanceof AkkaGateway) {
return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) selfGateway).getActorRef());
} else {
throw new RuntimeException("Cannot get address for non " + AkkaGateway.class.getName() + ".");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import akka.actor.Status;
import akka.actor.UntypedActor;
import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BaseAkkaActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(BaseAkkaActor.class);
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof RunnableMessage) {
try {
((RunnableMessage) message).getRunnable().run();
} catch (Exception e) {
LOG.error("Encountered error while executing runnable.", e);
}
} else if (message instanceof CallableMessage<?>) {
try {
Object result = ((CallableMessage<?>) message).getCallable().call();
sender().tell(new Status.Success(result), getSelf());
} catch (Exception e) {
sender().tell(new Status.Failure(e), getSelf());
}
} else {
throw new RuntimeException("Unknown message " + message);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.runtime.rpc.MainThreadExecutor;
import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
import scala.concurrent.Future;
import java.util.concurrent.Callable;
public abstract class BaseAkkaGateway implements MainThreadExecutor, AkkaGateway {
@Override
public void runAsync(Runnable runnable) {
getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender());
}
@Override
public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
return (Future<V>) Patterns.ask(getActorRef(), new CallableMessage(callable), timeout);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.jobmaster;
import akka.actor.ActorRef;
import akka.actor.Status;
import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
public class JobMasterAkkaActor extends BaseAkkaActor {
private final JobMaster jobMaster;
public JobMasterAkkaActor(JobMaster jobMaster) {
this.jobMaster = jobMaster;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof UpdateTaskExecutionState) {
final ActorRef sender = getSender();
UpdateTaskExecutionState updateTaskExecutionState = (UpdateTaskExecutionState) message;
try {
Acknowledge result = jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState());
sender.tell(new Status.Success(result), getSelf());
} catch (Exception e) {
sender.tell(new Status.Failure(e), getSelf());
}
} else if (message instanceof RegisterAtResourceManager) {
RegisterAtResourceManager registerAtResourceManager = (RegisterAtResourceManager) message;
jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress());
} else {
super.onReceive(message);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.jobmaster;
import akka.actor.ActorRef;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;
public class JobMasterAkkaGateway extends BaseAkkaGateway implements JobMasterGateway {
private final AskableActorRef actorRef;
private final Timeout timeout;
public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
this.actorRef = new AskableActorRef(actorRef);
this.timeout = timeout;
}
@Override
public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout)
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
}
@Override
public void registerAtResourceManager(String address) {
actorRef.actorRef().tell(new RegisterAtResourceManager(address), actorRef.actorRef());
}
@Override
public ActorRef getActorRef() {
return actorRef.actorRef();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import java.util.concurrent.Callable;
public class CallableMessage<V> {
private final Callable<V> callable;
public CallableMessage(Callable<V> callable) {
this.callable = callable;
}
public Callable<V> getCallable() {
return callable;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import java.io.Serializable;
public class CancelTask implements Serializable {
private static final long serialVersionUID = -2998176874447950595L;
private final ExecutionAttemptID executionAttemptID;
public CancelTask(ExecutionAttemptID executionAttemptID) {
this.executionAttemptID = executionAttemptID;
}
public ExecutionAttemptID getExecutionAttemptID() {
return executionAttemptID;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import java.io.Serializable;
public class ExecuteTask implements Serializable {
private static final long serialVersionUID = -6769958430967048348L;
private final TaskDeploymentDescriptor taskDeploymentDescriptor;
public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
this.taskDeploymentDescriptor = taskDeploymentDescriptor;
}
public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
return taskDeploymentDescriptor;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import java.io.Serializable;
public class RegisterAtResourceManager implements Serializable {
private static final long serialVersionUID = -4175905742620903602L;
private final String address;
public RegisterAtResourceManager(String address) {
this.address = address;
}
public String getAddress() {
return address;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
import java.io.Serializable;
public class RegisterJobMaster implements Serializable{
private static final long serialVersionUID = -4616879574192641507L;
private final JobMasterRegistration jobMasterRegistration;
public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
this.jobMasterRegistration = jobMasterRegistration;
}
public JobMasterRegistration getJobMasterRegistration() {
return jobMasterRegistration;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
import java.io.Serializable;
public class RequestSlot implements Serializable {
private static final long serialVersionUID = 7207463889348525866L;
private final SlotRequest slotRequest;
public RequestSlot(SlotRequest slotRequest) {
this.slotRequest = slotRequest;
}
public SlotRequest getSlotRequest() {
return slotRequest;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
public class RunnableMessage {
private final Runnable runnable;
public RunnableMessage(Runnable runnable) {
this.runnable = runnable;
}
public Runnable getRunnable() {
return runnable;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.messages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import java.io.Serializable;
public class UpdateTaskExecutionState implements Serializable{
private static final long serialVersionUID = -6662229114427331436L;
private final TaskExecutionState taskExecutionState;
public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
this.taskExecutionState = taskExecutionState;
}
public TaskExecutionState getTaskExecutionState() {
return taskExecutionState;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.resourcemanager;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.pattern.Patterns;
import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
import scala.concurrent.Future;
public class ResourceManagerAkkaActor extends BaseAkkaActor {
private final ResourceManager resourceManager;
public ResourceManagerAkkaActor(ResourceManager resourceManager) {
this.resourceManager = resourceManager;
}
@Override
public void onReceive(Object message) throws Exception {
final ActorRef sender = getSender();
if (message instanceof RegisterJobMaster) {
RegisterJobMaster registerJobMaster = (RegisterJobMaster) message;
try {
Future<RegistrationResponse> response = resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration());
Patterns.pipe(response, getContext().dispatcher()).to(sender());
} catch (Exception e) {
sender.tell(new Status.Failure(e), getSelf());
}
} else if (message instanceof RequestSlot) {
RequestSlot requestSlot = (RequestSlot) message;
try {
SlotAssignment response = resourceManager.requestSlot(requestSlot.getSlotRequest());
sender.tell(new Status.Success(response), getSelf());
} catch (Exception e) {
sender.tell(new Status.Failure(e), getSelf());
}
} else {
super.onReceive(message);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.resourcemanager;
import akka.actor.ActorRef;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements ResourceManagerGateway {
private final AskableActorRef actorRef;
private final Timeout timeout;
public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
this.actorRef = new AskableActorRef(actorRef);
this.timeout = timeout;
}
@Override
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout) {
return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), new Timeout(timeout))
.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
}
@Override
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), timeout)
.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
}
@Override
public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
return actorRef.ask(new RequestSlot(slotRequest), timeout)
.mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
}
@Override
public ActorRef getActorRef() {
return actorRef.actorRef();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.taskexecutor;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.dispatch.OnComplete;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
public class TaskExecutorAkkaActor extends BaseAkkaActor {
private final TaskExecutorGateway taskExecutor;
public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Override
public void onReceive(Object message) throws Exception {
final ActorRef sender = getSender();
if (message instanceof ExecuteTask) {
ExecuteTask executeTask = (ExecuteTask) message;
taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete(
new OnComplete<Acknowledge>() {
@Override
public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
if (failure != null) {
sender.tell(new Status.Failure(failure), getSelf());
} else {
sender.tell(new Status.Success(Acknowledge.get()), getSelf());
}
}
},
getContext().dispatcher()
);
} else if (message instanceof CancelTask) {
CancelTask cancelTask = (CancelTask) message;
taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete(
new OnComplete<Acknowledge>() {
@Override
public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
if (failure != null) {
sender.tell(new Status.Failure(failure), getSelf());
} else {
sender.tell(new Status.Success(Acknowledge.get()), getSelf());
}
}
},
getContext().dispatcher()
);
} else {
super.onReceive(message);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka.taskexecutor;
import akka.actor.ActorRef;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;
public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements TaskExecutorGateway {
private final AskableActorRef actorRef;
private final Timeout timeout;
public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
this.actorRef = new AskableActorRef(actorRef);
this.timeout = timeout;
}
@Override
public Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), timeout)
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
}
@Override
public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId) {
return actorRef.ask(new CancelTask(executionAttemptId), timeout)
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
}
@Override
public ActorRef getActorRef() {
return actorRef.actorRef();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.jobmaster;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* JobMaster implementation. The job master is responsible for the execution of a single
* {@link org.apache.flink.runtime.jobgraph.JobGraph}.
*
* It offers the following methods as part of its rpc interface to interact with the JobMaster
* remotely:
* <ul>
* <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li>
* <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
* given task</li>
* </ul>
*/
public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Execution context for future callbacks */
private final ExecutionContext executionContext;
/** Execution context for scheduled runnables */
private final ScheduledExecutorService scheduledExecutorService;
private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS);
private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
private final long failedRegistrationDelay = 10000;
/** Gateway to connected resource manager, null iff not connected */
private ResourceManagerGateway resourceManager = null;
/** UUID to filter out old registration runs */
private UUID currentRegistrationRun;
public JobMaster(RpcService rpcService, ExecutorService executorService) {
super(rpcService);
executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
}
public ResourceManagerGateway getResourceManager() {
return resourceManager;
}
//----------------------------------------------------------------------------------------------
// RPC methods
//----------------------------------------------------------------------------------------------
/**
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
* @return Acknowledge the task execution state update
*/
@RpcMethod
public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
System.out.println("TaskExecutionState: " + taskExecutionState);
return Acknowledge.get();
}
/**
* Triggers the registration of the job master at the resource manager.
*
* @param address Address of the resource manager
*/
@RpcMethod
public void registerAtResourceManager(final String address) {
currentRegistrationRun = UUID.randomUUID();
Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
handleResourceManagerRegistration(
new JobMasterRegistration(getAddress()),
1,
resourceManagerFuture,
currentRegistrationRun,
initialRegistrationTimeout,
maxRegistrationTimeout,
registrationDuration.fromNow());
}
//----------------------------------------------------------------------------------------------
// Helper methods
//----------------------------------------------------------------------------------------------
/**
* Helper method to handle the resource manager registration process. If a registration attempt
* times out, then a new attempt with the doubled time out is initiated. The whole registration
* process has a deadline. Once this deadline is overdue without successful registration, the
* job master shuts down.
*
* @param jobMasterRegistration Job master registration info which is sent to the resource
* manager
* @param attemptNumber Registration attempt number
* @param resourceManagerFuture Future of the resource manager gateway
* @param registrationRun UUID describing the current registration run
* @param timeout Timeout of the last registration attempt
* @param maxTimeout Maximum timeout between registration attempts
* @param deadline Deadline for the registration
*/
void handleResourceManagerRegistration(
final JobMasterRegistration jobMasterRegistration,
final int attemptNumber,
final Future<ResourceManagerGateway> resourceManagerFuture,
final UUID registrationRun,
final FiniteDuration timeout,
final FiniteDuration maxTimeout,
final Deadline deadline) {
// filter out concurrent registration runs
if (registrationRun.equals(currentRegistrationRun)) {
log.info("Start registration attempt #{}.", attemptNumber);
if (deadline.isOverdue()) {
// we've exceeded our registration deadline. This means that we have to shutdown the JobMaster
log.error("Exceeded registration deadline without successfully registering at the ResourceManager.");
shutDown();
} else {
Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
@Override
public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) {
return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway));
}
}, executionContext);
registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
@Override
public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable {
if (failure != null) {
if (failure instanceof TimeoutException) {
// we haven't received an answer in the given timeout interval,
// so increase it and try again.
final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
handleResourceManagerRegistration(
jobMasterRegistration,
attemptNumber + 1,
resourceManagerFuture,
registrationRun,
newTimeout,
maxTimeout,
deadline);
} else {
log.error("Received unknown error while registering at the ResourceManager.", failure);
shutDown();
}
} else {
final RegistrationResponse response = tuple._1();
final ResourceManagerGateway gateway = tuple._2();
if (response.isSuccess()) {
finishResourceManagerRegistration(gateway, response.getInstanceID());
} else {
log.info("The registration was refused. Try again.");
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// we have to execute scheduled runnable in the main thread
// because we need consistency wrt currentRegistrationRun
runAsync(new Runnable() {
@Override
public void run() {
// our registration attempt was refused. Start over.
handleResourceManagerRegistration(
jobMasterRegistration,
1,
resourceManagerFuture,
registrationRun,
initialRegistrationTimeout,
maxTimeout,
deadline);
}
});
}
}, failedRegistrationDelay, TimeUnit.MILLISECONDS);
}
}
}
}, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread
}
} else {
log.info("Discard out-dated registration run.");
}
}
/**
* Finish the resource manager registration by setting the new resource manager gateway.
*
* @param resourceManager New resource manager gateway
* @param instanceID Instance id assigned by the resource manager
*/
void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) {
log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID);
this.resourceManager = resourceManager;
}
/**
* Return if the job master is connected to a resource manager.
*
* @return true if the job master is connected to the resource manager
*/
public boolean isConnected() {
return resourceManager != null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.jobmaster;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import scala.concurrent.Future;
/**
* {@link JobMaster} rpc gateway interface
*/
public interface JobMasterGateway extends RpcGateway {
/**
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
* @return Future acknowledge of the task execution state update
*/
Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
/**
* Triggers the registration of the job master at the resource manager.
*
* @param address Address of the resource manager
*/
void registerAtResourceManager(final String address);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.resourcemanager;
import java.io.Serializable;
public class JobMasterRegistration implements Serializable {
private static final long serialVersionUID = 8411214999193765202L;
private final String address;
public JobMasterRegistration(String address) {
this.address = address;
}
public String getAddress() {
return address;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.resourcemanager;
import org.apache.flink.runtime.instance.InstanceID;
import java.io.Serializable;
public class RegistrationResponse implements Serializable {
private static final long serialVersionUID = -2379003255993119993L;
private final boolean isSuccess;
private final InstanceID instanceID;
public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
this.isSuccess = isSuccess;
this.instanceID = instanceID;
}
public boolean isSuccess() {
return isSuccess;
}
public InstanceID getInstanceID() {
return instanceID;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.resourcemanager;
import akka.dispatch.Mapper;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
* and bookkeeping.
*
* It offers the following methods as part of its rpc interface to interact with the him remotely:
* <ul>
* <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
private final ExecutionContext executionContext;
private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
public ResourceManager(RpcService rpcService, ExecutorService executorService) {
super(rpcService);
this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
this.jobMasterGateways = new HashMap<>();
}
/**
* Register a {@link JobMaster} at the resource manager.
*
* @param jobMasterRegistration Job master registration information
* @return Future registration response
*/
@RpcMethod
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
@Override
public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
InstanceID instanceID;
if (jobMasterGateways.containsKey(jobMasterGateway)) {
instanceID = jobMasterGateways.get(jobMasterGateway);
} else {
instanceID = new InstanceID();
jobMasterGateways.put(jobMasterGateway, instanceID);
}
return new RegistrationResponse(true, instanceID);
}
}, getMainThreadExecutionContext());
}
/**
* Requests a slot from the resource manager.
*
* @param slotRequest Slot request
* @return Slot assignment
*/
@RpcMethod
public SlotAssignment requestSlot(SlotRequest slotRequest) {
System.out.println("SlotRequest: " + slotRequest);
return new SlotAssignment();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.resourcemanager;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
* {@link ResourceManager} rpc gateway interface.
*/
public interface ResourceManagerGateway extends RpcGateway {
/**
* Register a {@link JobMaster} at the resource manager.
*
* @param jobMasterRegistration Job master registration information
* @param timeout Timeout for the future to complete
* @return Future registration response
*/
Future<RegistrationResponse> registerJobMaster(
JobMasterRegistration jobMasterRegistration,
@RpcTimeout FiniteDuration timeout);
/**
* Register a {@link JobMaster} at the resource manager.
*
* @param jobMasterRegistration Job master registration information
* @return Future registration response
*/
Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
/**
* Requests a slot from the resource manager.
*
* @param slotRequest Slot request
* @return Future slot assignment
*/
Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.resourcemanager;
import java.io.Serializable;
public class SlotAssignment implements Serializable{
private static final long serialVersionUID = -6990813455942742322L;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.resourcemanager;
import java.io.Serializable;
public class SlotRequest implements Serializable{
private static final long serialVersionUID = -6586877187990445986L;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.taskexecutor;
import akka.dispatch.ExecutionContexts$;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import scala.concurrent.ExecutionContext;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
* TaskExecutor implementation. The task executor is responsible for the execution of multiple
* {@link org.apache.flink.runtime.taskmanager.Task}.
*
* It offers the following methods as part of its rpc interface to interact with him remotely:
* <ul>
* <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li>
* <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li>
* </ul>
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private final ExecutionContext executionContext;
private final Set<ExecutionAttemptID> tasks = new HashSet<>();
public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
super(rpcService);
this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
}
/**
* Execute the given task on the task executor. The task is described by the provided
* {@link TaskDeploymentDescriptor}.
*
* @param taskDeploymentDescriptor Descriptor for the task to be executed
* @return Acknowledge the start of the task execution
*/
@RpcMethod
public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
tasks.add(taskDeploymentDescriptor.getExecutionId());
return Acknowledge.get();
}
/**
* Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
* the method throws an {@link Exception}.
*
* @param executionAttemptId Execution attempt ID identifying the task to be canceled.
* @return Acknowledge the task canceling
* @throws Exception if the task with the given execution attempt id could not be found
*/
@RpcMethod
public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception {
if (tasks.contains(executionAttemptId)) {
return Acknowledge.get();
} else {
throw new Exception("Could not find task.");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.taskexecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import scala.concurrent.Future;
/**
* {@link TaskExecutor} rpc gateway interface
*/
public interface TaskExecutorGateway extends RpcGateway {
/**
* Execute the given task on the task executor. The task is described by the provided
* {@link TaskDeploymentDescriptor}.
*
* @param taskDeploymentDescriptor Descriptor for the task to be executed
* @return Future acknowledge of the start of the task execution
*/
Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor);
/**
* Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
* the method throws an {@link Exception}.
*
* @param executionAttemptId Execution attempt ID identifying the task to be canceled.
* @return Future acknowledge of the task canceling
*/
Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.reflections.Reflections;
import scala.concurrent.Future;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RpcCompletenessTest extends TestLogger {
private static final Class<?> futureClass = Future.class;
@Test
public void testRpcCompleteness() {
Reflections reflections = new Reflections("org.apache.flink");
Set<Class<? extends RpcEndpoint>> classes = reflections.getSubTypesOf(RpcEndpoint.class);
Class<? extends RpcEndpoint> c;
for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
c = rpcEndpoint;
Type superClass = c.getGenericSuperclass();
Class<?> rpcGatewayType = extractTypeParameter(superClass, 0);
if (rpcGatewayType != null) {
checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
} else {
fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
}
}
}
private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
Method[] serverMethods = rpcEndpoint.getDeclaredMethods();
Map<String, Set<Method>> rpcMethods = new HashMap<>();
Set<Method> unmatchedRpcMethods = new HashSet<>();
for (Method serverMethod : serverMethods) {
if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
if (rpcMethods.containsKey(serverMethod.getName())) {
Set<Method> methods = rpcMethods.get(serverMethod.getName());
methods.add(serverMethod);
rpcMethods.put(serverMethod.getName(), methods);
} else {
Set<Method> methods = new HashSet<>();
methods.add(serverMethod);
rpcMethods.put(serverMethod.getName(), methods);
}
unmatchedRpcMethods.add(serverMethod);
}
}
for (Method gatewayMethod : gatewayMethods) {
assertTrue(
"The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " +
"annotated method with the same name and signature " +
generateEndpointMethodSignature(gatewayMethod) + ".",
rpcMethods.containsKey(gatewayMethod.getName()));
checkGatewayMethod(gatewayMethod);
if (!matchGatewayMethodWithEndpoint(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) {
fail("Could not find a RpcMethod annotated method in rpc endpoint " +
rpcEndpoint.getName() + " matching the rpc gateway method " +
generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " +
rpcGateway.getName() + ".");
}
}
if (!unmatchedRpcMethods.isEmpty()) {
StringBuilder builder = new StringBuilder();
for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
builder.append(unmatchedRpcMethod).append("\n");
}
fail("The rpc endpoint " + rpcEndpoint.getName() + " contains rpc methods which " +
"are not matched to gateway methods of " + rpcGateway.getName() + ":\n" +
builder.toString());
}
}
/**
* Checks whether the gateway method fulfills the gateway method requirements.
* <ul>
* <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
* <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
* </ul>
*
* @param gatewayMethod Gateway method to check
*/
private void checkGatewayMethod(Method gatewayMethod) {
if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
assertTrue(
"The return type of method " + gatewayMethod.getName() + " in the rpc gateway " +
gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
"future. Non-void return types have to be returned as a future.",
gatewayMethod.getReturnType().equals(futureClass));
}
Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations();
int rpcTimeoutParameters = 0;
for (Annotation[] parameterAnnotation : parameterAnnotations) {
for (Annotation annotation : parameterAnnotation) {
if (annotation.equals(RpcTimeout.class)) {
rpcTimeoutParameters++;
}
}
}
assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " +
"annotated parameter.", rpcTimeoutParameters <= 1);
}
/**
* Checks whether we find a matching overloaded version for the gateway method among the methods
* with the same name in the rpc endpoint.
*
* @param gatewayMethod Gateway method
* @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway
* method
* @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far)
*/
private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) {
for (Method endpointMethod : endpointMethods) {
if (checkMethod(gatewayMethod, endpointMethod)) {
unmatchedRpcMethods.remove(endpointMethod);
return true;
}
}
return false;
}
private boolean checkMethod(Method gatewayMethod, Method endpointMethod) {
Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations();
Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes();
List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>();
assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length);
// filter out the RpcTimeout parameters
for (int i = 0; i < gatewayParameterTypes.length; i++) {
if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
}
}
if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) {
return false;
} else {
// check the parameter types
for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) {
if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) {
return false;
}
}
// check the return types
if (endpointMethod.getReturnType() == void.class) {
if (gatewayMethod.getReturnType() != void.class) {
return false;
}
} else {
// has return value. The gateway method should be wrapped in a future
Class<?> futureClass = gatewayMethod.getReturnType();
// sanity check that the return type of a gateway method must be void or a future
if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
return false;
} else {
Class<?> valueClass = extractTypeParameter(futureClass, 0);
if (endpointMethod.getReturnType().equals(futureClass)) {
Class<?> rpcEndpointValueClass = extractTypeParameter(endpointMethod.getReturnType(), 0);
// check if we have the same future value types
if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
return false;
}
} else {
if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) {
return false;
}
}
}
}
return gatewayMethod.getName().equals(endpointMethod.getName());
}
}
private boolean checkType(Class<?> firstType, Class<?> secondType) {
return firstType.equals(secondType);
}
/**
* Generates from a gateway rpc method signature the corresponding rpc endpoint signature.
*
* For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway
* signature which is not relevant on the server side.
*
* @param method Method to generate the signature string for
* @return String of the respective server side rpc method signature
*/
private String generateEndpointMethodSignature(Method method) {
StringBuilder builder = new StringBuilder();
if (method.getReturnType().equals(Void.TYPE)) {
builder.append("void").append(" ");
} else if (method.getReturnType().equals(futureClass)) {
Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0);
builder
.append(futureClass.getSimpleName())
.append("<")
.append(valueClass != null ? valueClass.getSimpleName() : "")
.append(">");
if (valueClass != null) {
builder.append("/").append(valueClass.getSimpleName());
}
builder.append(" ");
} else {
return "Invalid rpc method signature.";
}
builder.append(method.getName()).append("(");
Class<?>[] parameterTypes = method.getParameterTypes();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
assertEquals(parameterTypes.length, parameterAnnotations.length);
for (int i = 0; i < parameterTypes.length; i++) {
// filter out the RpcTimeout parameters
if (!isRpcTimeout(parameterAnnotations[i])) {
builder.append(parameterTypes[i].getName());
if (i < parameterTypes.length -1) {
builder.append(", ");
}
}
}
builder.append(")");
return builder.toString();
}
private Class<?> extractTypeParameter(Type genericType, int position) {
if (genericType instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) genericType;
Type[] typeArguments = parameterizedType.getActualTypeArguments();
if (position < 0 || position >= typeArguments.length) {
throw new IndexOutOfBoundsException("The generic type " +
parameterizedType.getRawType() + " only has " + typeArguments.length +
" type arguments.");
} else {
Type typeArgument = typeArguments[position];
if (typeArgument instanceof Class<?>) {
return (Class<?>) typeArgument;
} else {
return null;
}
}
} else {
return null;
}
}
private boolean isRpcTimeout(Annotation[] annotations) {
for (Annotation annotation : annotations) {
if (annotation.annotationType().equals(RpcTimeout.class)) {
return true;
}
}
return false;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import akka.util.Timeout;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AkkaRpcServiceTest extends TestLogger {
/**
* Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
* {@link AkkaRpcService}.
*/
@Test
public void testJobMasterResourceManagerRegistration() throws Exception {
Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout);
AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout);
ExecutorService executorService = new ForkJoinPool();
ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
resourceManager.start();
ResourceManagerGateway rm = resourceManager.getSelf();
assertTrue(rm instanceof AkkaGateway);
AkkaGateway akkaClient = (AkkaGateway) rm;
jobMaster.start();
jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef()));
// wait for successful registration
FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
Deadline deadline = timeout.fromNow();
while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
Thread.sleep(100);
}
assertFalse(deadline.isOverdue());
jobMaster.shutDown();
resourceManager.shutDown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rpc.taskexecutor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.net.URL;
import java.util.Collections;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class TaskExecutorTest extends TestLogger {
/**
* Tests that we can deploy and cancel a task on the TaskExecutor without exceptions
*/
@Test
public void testTaskExecution() throws Exception {
RpcService testingRpcService = mock(RpcService.class);
DirectExecutorService directExecutorService = null;
TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
new JobID(),
"Test job",
new JobVertexID(),
new ExecutionAttemptID(),
new SerializedValue<ExecutionConfig>(null),
"Test task",
0,
1,
0,
new Configuration(),
new Configuration(),
"Invokable",
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
0
);
Acknowledge ack = taskExecutor.executeTask(tdd);
ack = taskExecutor.cancelTask(tdd.getExecutionId());
}
/**
* Tests that cancelling a non-existing task will return an exception
*/
@Test(expected=Exception.class)
public void testWrongTaskCancellation() throws Exception {
RpcService testingRpcService = mock(RpcService.class);
DirectExecutorService directExecutorService = null;
TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
taskExecutor.cancelTask(new ExecutionAttemptID());
fail("The cancellation should have thrown an exception.");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DirectExecutorService implements ExecutorService {
private boolean _shutdown = false;
@Override
public void shutdown() {
_shutdown = true;
}
@Override
public List<Runnable> shutdownNow() {
_shutdown = true;
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return _shutdown;
}
@Override
public boolean isTerminated() {
return _shutdown;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return _shutdown;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
try {
T result = task.call();
return new CompletedFuture<>(result, null);
} catch (Exception e) {
return new CompletedFuture<>(null, e);
}
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
task.run();
return new CompletedFuture<>(result, null);
}
@Override
public Future<?> submit(Runnable task) {
task.run();
return new CompletedFuture<>(null, null);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
ArrayList<Future<T>> result = new ArrayList<>();
for (Callable<T> task : tasks) {
try {
result.add(new CompletedFuture<T>(task.call(), null));
} catch (Exception e) {
result.add(new CompletedFuture<T>(null, e));
}
}
return result;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
long end = System.currentTimeMillis() + unit.toMillis(timeout);
Iterator<? extends Callable<T>> iterator = tasks.iterator();
ArrayList<Future<T>> result = new ArrayList<>();
while (end > System.currentTimeMillis() && iterator.hasNext()) {
Callable<T> callable = iterator.next();
try {
result.add(new CompletedFuture<T>(callable.call(), null));
} catch (Exception e) {
result.add(new CompletedFuture<T>(null, e));
}
}
while(iterator.hasNext()) {
iterator.next();
result.add(new Future<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return true;
}
@Override
public boolean isDone() {
return false;
}
@Override
public T get() throws InterruptedException, ExecutionException {
throw new CancellationException("Task has been cancelled.");
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new CancellationException("Task has been cancelled.");
}
});
}
return result;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
Exception exception = null;
for (Callable<T> task : tasks) {
try {
return task.call();
} catch (Exception e) {
// try next task
exception = e;
}
}
throw new ExecutionException("No tasks finished successfully.", exception);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long end = System.currentTimeMillis() + unit.toMillis(timeout);
Exception exception = null;
Iterator<? extends Callable<T>> iterator = tasks.iterator();
while (end > System.currentTimeMillis() && iterator.hasNext()) {
Callable<T> callable = iterator.next();
try {
return callable.call();
} catch (Exception e) {
// ignore exception and try next
exception = e;
}
}
if (iterator.hasNext()) {
throw new TimeoutException("Could not finish execution of tasks within time.");
} else {
throw new ExecutionException("No tasks finished successfully.", exception);
}
}
@Override
public void execute(Runnable command) {
command.run();
}
public static class CompletedFuture<V> implements Future<V> {
private final V value;
private final Exception exception;
public CompletedFuture(V value, Exception exception) {
this.value = value;
this.exception = exception;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public V get() throws InterruptedException, ExecutionException {
if (exception != null) {
throw new ExecutionException(exception);
} else {
return value;
}
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
}
}
......@@ -202,7 +202,6 @@ under the License.
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
</dependencies>
......
......@@ -447,6 +447,13 @@ under the License.
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册