From 3fe27ac0796bf7fffce0901954bc2eec3eee02be Mon Sep 17 00:00:00 2001 From: "Wright, Eron" Date: Thu, 18 May 2017 17:34:16 -0700 Subject: [PATCH] [FLINK-6379] [mesos] Add Mesos ResourceManager (FLIP-6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Make the RPC gateway of the ResourceManager extensible to allow for framework-specific RPC methods - Introduce FLIP-6 MesosResourceManager w/ tests - Introduce a Mesos-specific RPC gateway for callbacks from child actors and from the Mesos scheduler client - Enhance the persistent Mesos worker store to track the resource profile associated with a worker - Convert RegisteredMesosWorkerNode to Java - Decline TE registration if framework doesn’t recognize the worker This closes #3942. --- .../MesosFlinkResourceManager.java | 2 +- .../MesosResourceManager.java | 677 ++++++++++++++++ .../MesosResourceManagerGateway.java | 37 + .../RegisteredMesosWorkerNode.java | 58 ++ .../store/MesosWorkerStore.java | 34 +- .../mesos/scheduler/SchedulerGateway.java | 89 +++ .../mesos/scheduler/SchedulerProxyV2.java | 103 +++ .../MesosResourceManagerTest.java | 736 ++++++++++++++++++ .../resourcemanager/ResourceManager.java | 11 +- .../ResourceManagerRunner.java | 2 +- .../StandaloneResourceManager.java | 2 +- .../StandaloneResourceManagerGateway.java | 25 + .../apache/flink/runtime/rpc/RpcEndpoint.java | 2 +- .../taskexecutor/TaskExecutorITCase.java | 3 +- .../YarnFlinkApplicationMasterRunner.java | 2 +- .../flink/yarn/YarnResourceManager.java | 2 +- .../yarn/YarnResourceManagerGateway.java | 16 +- 17 files changed, 1777 insertions(+), 24 deletions(-) create mode 100644 flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java create mode 100644 flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java create mode 100644 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java create mode 100644 flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java create mode 100644 flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java create mode 100644 flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java rename flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala => flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java (57%) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java index 6c708fa47cf..d6b5c9d710a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java @@ -498,7 +498,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager { + protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceManager.class); + + /** The Flink configuration */ + private final Configuration flinkConfig; + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Container specification for launching a TM */ + private final ContainerSpecification taskManagerContainerSpec; + + /** Resolver for HTTP artifacts */ + private final MesosArtifactResolver artifactResolver; + + /** Persistent storage of 
allocated containers */ + private final MesosWorkerStore workerStore; + + /** A local actor system for using the helper actors */ + private final ActorSystem actorSystem; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxyV2 schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + /** an adapter to receive messages from Akka actors */ + @VisibleForTesting + ActorRef selfActor; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + /** planning state related to workers - package private for unit test purposes */ + final Map workersInNew; + final Map workersInLaunch; + final Map workersBeingReturned; + + public MesosResourceManager( + // base class + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + ResourceManagerConfiguration resourceManagerConfiguration, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + FatalErrorHandler fatalErrorHandler, + // Mesos specifics + ActorSystem actorSystem, + Configuration flinkConfig, + MesosConfiguration mesosConfig, + MesosWorkerStore workerStore, + MesosTaskManagerParameters taskManagerParameters, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver) { + super( + rpcService, + resourceManagerEndpointId, + resourceId, + resourceManagerConfiguration, + highAvailabilityServices, + heartbeatServices, + slotManager, + metricRegistry, + jobLeaderIdService, + fatalErrorHandler); + + this.actorSystem = actorSystem; + + this.flinkConfig = requireNonNull(flinkConfig); + this.mesosConfig = requireNonNull(mesosConfig); + + this.workerStore = requireNonNull(workerStore); + this.artifactResolver = requireNonNull(artifactResolver); + + 
this.taskManagerParameters = requireNonNull(taskManagerParameters); + this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec); + + this.workersInNew = new HashMap<>(); + this.workersInLaunch = new HashMap<>(); + this.workersBeingReturned = new HashMap<>(); + } + + protected ActorRef createSelfActor() { + return actorSystem.actorOf( + AkkaAdapter.createActorProps(getSelf()),"ResourceManager"); + } + + protected ActorRef createConnectionMonitor() { + return actorSystem.actorOf( + ConnectionMonitor.createActorProps(ConnectionMonitor.class, flinkConfig), + "connectionMonitor"); + } + + protected ActorRef createTaskRouter() { + return actorSystem.actorOf( + Tasks.createActorProps(Tasks.class, flinkConfig, schedulerDriver, TaskMonitor.class), + "tasks"); + } + + protected ActorRef createLaunchCoordinator() { + return actorSystem.actorOf( + LaunchCoordinator.createActorProps(LaunchCoordinator.class, selfActor, flinkConfig, schedulerDriver, createOptimizer()), + "launchCoordinator"); + } + + protected ActorRef createReconciliationCoordinator() { + return actorSystem.actorOf( + ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, flinkConfig, schedulerDriver), + "reconciliationCoordinator"); + } + + // ------------------------------------------------------------------------ + // Resource Manager overrides + // ------------------------------------------------------------------------ + + /** + * Starts the Mesos-specifics. 
+ */ + @Override + protected void initialize() throws ResourceManagerException { + // start the worker store + try { + workerStore.start(); + } + catch(Exception e) { + throw new ResourceManagerException("Unable to initialize the worker store.", e); + } + + // create the scheduler driver to communicate with Mesos + schedulerCallbackHandler = new SchedulerProxyV2(getSelf()); + + // register with Mesos + // TODO : defer connection until RM acquires leadership + + Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo() + .clone() + .setCheckpoint(true); + try { + Option frameworkID = workerStore.getFrameworkID(); + if (frameworkID.isEmpty()) { + LOG.info("Registering as new framework."); + } else { + LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue()); + frameworkInfo.setId(frameworkID.get()); + } + } + catch(Exception e) { + throw new ResourceManagerException("Unable to recover the framework ID.", e); + } + + MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo); + MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig); + schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false); + + // create supporting actors + selfActor = createSelfActor(); + connectionMonitor = createConnectionMonitor(); + launchCoordinator = createLaunchCoordinator(); + reconciliationCoordinator = createReconciliationCoordinator(); + taskRouter = createTaskRouter(); + + // recover state + try { + recoverWorkers(); + } + catch(Exception e) { + throw new ResourceManagerException("Unable to recover Mesos worker state.", e); + } + + // begin scheduling + connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); + schedulerDriver.start(); + + LOG.info("Mesos resource manager initialized."); + } + + /** + * Recover framework/worker information persisted by a prior incarnation of the RM. 
+ */ + private void recoverWorkers() throws Exception { + // if this resource manager is recovering from failure, + // then some worker tasks are most likely still alive and we can re-obtain them + final List tasksFromPreviousAttempts = workerStore.recoverWorkers(); + + assert(workersInNew.isEmpty()); + assert(workersInLaunch.isEmpty()); + assert(workersBeingReturned.isEmpty()); + + if (!tasksFromPreviousAttempts.isEmpty()) { + LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size()); + + List> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size()); + + for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) { + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile()); + + switch(worker.state()) { + case New: + // remove new workers because allocation requests are transient + workerStore.removeWorker(worker.taskID()); + break; + case Launched: + workersInLaunch.put(extractResourceID(worker.taskID()), worker); + toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get())); + break; + case Released: + workersBeingReturned.put(extractResourceID(worker.taskID()), worker); + break; + } + taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor); + } + + // tell the launch coordinator about prior assignments + if(toAssign.size() >= 1) { + launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor); + } + } + } + + @Override + protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + LOG.info("Shutting down and unregistering as a Mesos framework."); + try { + // unregister the framework, which implicitly removes all tasks. 
+ schedulerDriver.stop(false); + } + catch(Exception ex) { + LOG.warn("unable to unregister the framework", ex); + } + + try { + workerStore.stop(true); + } + catch(Exception ex) { + LOG.warn("unable to stop the worker state store", ex); + } + + LOG.info("Shutdown completed."); + } + + @Override + public void startNewWorker(ResourceProfile resourceProfile) { + LOG.info("Starting a new worker."); + try { + // generate new workers into persistent state and launch associated actors + MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), resourceProfile); + workerStore.putWorker(worker); + workersInNew.put(extractResourceID(worker.taskID()), worker); + + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile); + + LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", + launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs()); + + // tell the task router about the new plans + taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor); + + // tell the launch coordinator to launch the new tasks + launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor); + } + catch(Exception ex) { + onFatalErrorAsync(new ResourceManagerException("unable to request new workers", ex)); + } + } + + @Override + public void stopWorker(InstanceID instanceId) { + // TODO implement worker release + } + + /** + * Callback when a worker was started. + * @param resourceID The worker resource id (as provided by the TaskExecutor) + */ + @Override + protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) { + + // note: this may occur more than once for a given worker. 
+ MesosWorkerStore.Worker inLaunch = workersInLaunch.get(resourceID); + if (inLaunch != null) { + return new RegisteredMesosWorkerNode(inLaunch); + } + + // the worker is unrecognized or was already released + // return null to indicate that TaskExecutor registration should be declined + return null; + } + + // ------------------------------------------------------------------------ + // RPC methods + // ------------------------------------------------------------------------ + + @RpcMethod + public void registered(Registered message) { + connectionMonitor.tell(message, selfActor); + try { + workerStore.setFrameworkID(Option.apply(message.frameworkId())); + } + catch(Exception ex) { + onFatalError(new ResourceManagerException("unable to store the assigned framework ID", ex)); + return; + } + launchCoordinator.tell(message, selfActor); + reconciliationCoordinator.tell(message, selfActor); + taskRouter.tell(message, selfActor); + } + + /** + * Called when reconnected to Mesos following a failover event. + */ + @RpcMethod + public void reregistered(ReRegistered message) { + connectionMonitor.tell(message, selfActor); + launchCoordinator.tell(message, selfActor); + reconciliationCoordinator.tell(message, selfActor); + taskRouter.tell(message, selfActor); + } + + /** + * Called when disconnected from Mesos. + */ + @RpcMethod + public void disconnected(Disconnected message) { + connectionMonitor.tell(message, selfActor); + launchCoordinator.tell(message, selfActor); + reconciliationCoordinator.tell(message, selfActor); + taskRouter.tell(message, selfActor); + } + + /** + * Called when resource offers are made to the framework. + */ + @RpcMethod + public void resourceOffers(ResourceOffers message) { + launchCoordinator.tell(message, selfActor); + } + + /** + * Called when resource offers are rescinded. 
+ */ + @RpcMethod + public void offerRescinded(OfferRescinded message) { + launchCoordinator.tell(message, selfActor); + } + + /** + * Accept offers as advised by the launch coordinator. + * + * Acceptance is routed through the RM to update the persistent state before + * forwarding the message to Mesos. + */ + @RpcMethod + public void acceptOffers(AcceptOffers msg) { + try { + List toMonitor = new ArrayList<>(msg.operations().size()); + + // transition the persistent state of some tasks to Launched + for (Protos.Offer.Operation op : msg.operations()) { + if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) { + continue; + } + for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) { + MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId())); + assert (worker != null); + + worker = worker.launchWorker(info.getSlaveId(), msg.hostname()); + workerStore.putWorker(worker); + workersInLaunch.put(extractResourceID(worker.taskID()), worker); + + LOG.info("Launching Mesos task {} on host {}.", + worker.taskID().getValue(), worker.hostname().get()); + + toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker))); + } + } + + // tell the task router about the new plans + for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) { + taskRouter.tell(update, selfActor); + } + + // send the acceptance message to Mesos + schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters()); + } + catch(Exception ex) { + onFatalError(new ResourceManagerException("unable to accept offers", ex)); + } + } + + /** + * Handles a task status update from Mesos. + */ + @RpcMethod + public void statusUpdate(StatusUpdate message) { + taskRouter.tell(message, selfActor); + reconciliationCoordinator.tell(message, selfActor); + schedulerDriver.acknowledgeStatusUpdate(message.status()); + } + + /** + * Handles a reconciliation request from a task monitor. 
+ */ + @RpcMethod + public void reconcile(ReconciliationCoordinator.Reconcile message) { + // forward to the reconciliation coordinator + reconciliationCoordinator.tell(message, selfActor); + } + + /** + * Handles a termination notification from a task monitor by removing the worker from the worker store and, if it was known, dropping it from the in-memory maps and calling closeTaskManagerConnection. + */ + @RpcMethod + public void taskTerminated(TaskMonitor.TaskTerminated message) { + Protos.TaskID taskID = message.taskID(); + Protos.TaskStatus status = message.status(); + + // note: this callback occurs for failed containers and for released containers alike + final ResourceID id = extractResourceID(taskID); + + boolean existed; + try { + existed = workerStore.removeWorker(taskID); + } + catch(Exception ex) { + onFatalError(new ResourceManagerException("unable to remove worker", ex)); + return; + } + + if(!existed) { + LOG.info("Received a termination notice for an unrecognized worker: {}", id); + return; + } + + // check if this is a failed task or a released task + assert(!workersInNew.containsKey(id)); + if (workersBeingReturned.remove(id) != null) { + // regular finished worker that we released + LOG.info("Worker {} finished successfully with message: {}", + id, status.getMessage()); + } else { + // failed worker, either at startup, or running + final MesosWorkerStore.Worker launched = workersInLaunch.remove(id); + assert(launched != null); + LOG.info("Worker {} failed with status: {}, reason: {}, message: {}.", + id, status.getState(), status.getReason(), status.getMessage()); + + // TODO : launch a replacement worker? + } + + closeTaskManagerConnection(id, new Exception(status.getMessage())); + } + + @RpcMethod + public void frameworkMessage(FrameworkMessage message) {} + + @RpcMethod + public void slaveLost(SlaveLost message) {} + + @RpcMethod + public void executorLost(ExecutorLost message) {} + + /** + * Called when an error is reported by the scheduler callback.
+ */ + @RpcMethod + public void error(Error message) { + onFatalError(new ResourceManagerException("Connection to Mesos failed", new Exception(message.message()))); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Creates a launchable task for Fenzo to process. + */ + private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) { + + // create the specific TM parameters from the resource profile and some defaults + MesosTaskManagerParameters params = new MesosTaskManagerParameters( + resourceProfile.getCpuCores() < 1 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(), + taskManagerParameters.containerType(), + taskManagerParameters.containerImageName(), + new ContaineredTaskManagerParameters( + resourceProfile.getMemoryInMB() < 0 ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(), + resourceProfile.getHeapMemoryInMB(), + resourceProfile.getDirectMemoryInMB(), + 1, + new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())), + taskManagerParameters.containerVolumes(), + taskManagerParameters.constraints(), + taskManagerParameters.bootstrapCommand(), + taskManagerParameters.getTaskManagerHostname() + ); + + LaunchableMesosWorker launchable = + new LaunchableMesosWorker( + artifactResolver, + params, + taskManagerContainerSpec, + taskID, + mesosConfig); + + return launchable; + } + + /** + * Extracts a unique ResourceID from the Mesos task. + * + * @param taskId the Mesos TaskID + * @return The ResourceID for the container + */ + static ResourceID extractResourceID(Protos.TaskID taskId) { + return new ResourceID(taskId.getValue()); + } + + /** + * Extracts the Mesos task goal state from the worker information. + * @param worker the persistent worker information. 
+ * @return goal state information for the {@link TaskMonitor}. + */ + static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) { + switch(worker.state()) { + case New: return new TaskMonitor.New(worker.taskID()); + case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get()); + case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get()); + default: throw new IllegalArgumentException("unsupported worker state"); + } + } + + /** + * Creates the Fenzo optimizer (builder). + * The builder is an indirection to facilitate unit testing of the Launch Coordinator. + */ + private static TaskSchedulerBuilder createOptimizer() { + return new TaskSchedulerBuilder() { + TaskScheduler.Builder builder = new TaskScheduler.Builder(); + + @Override + public TaskSchedulerBuilder withLeaseRejectAction(Action1 action) { + builder.withLeaseRejectAction(action); + return this; + } + + @Override + public TaskScheduler build() { + return builder.build(); + } + }; + } + + /** + * Adapts incoming Akka messages as RPC calls to the resource manager.
+ */ + static class AkkaAdapter extends UntypedActor { + private final MesosResourceManagerGateway gateway; + AkkaAdapter(MesosResourceManagerGateway gateway) { + this.gateway = gateway; + } + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof ReconciliationCoordinator.Reconcile) { + gateway.reconcile((ReconciliationCoordinator.Reconcile) message); + } else if (message instanceof TaskMonitor.TaskTerminated) { + gateway.taskTerminated((TaskMonitor.TaskTerminated) message); + } else if (message instanceof AcceptOffers) { + gateway.acceptOffers((AcceptOffers) message); + } else { + MesosResourceManager.LOG.error("unrecognized message: " + message); + } + } + + public static Props createActorProps(MesosResourceManagerGateway gateway) { + return Props.create(AkkaAdapter.class, gateway); + } + } +} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java new file mode 100644 index 00000000000..e353dcca19f --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerGateway; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; + +/** + * The {@link MesosResourceManager}'s RPC gateway interface. + */ +public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway { + + void acceptOffers(AcceptOffers msg); + + void reconcile(ReconciliationCoordinator.Reconcile message); + + void taskTerminated(TaskMonitor.TaskTerminated message); +} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java new file mode 100644 index 00000000000..c65c482661d --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}. + */ +public class RegisteredMesosWorkerNode implements Serializable, ResourceIDRetrievable { + + private static final long serialVersionUID = 2; + + private final MesosWorkerStore.Worker worker; + + public RegisteredMesosWorkerNode(MesosWorkerStore.Worker worker) { + this.worker = Preconditions.checkNotNull(worker); + Preconditions.checkArgument(worker.slaveID().isDefined()); + Preconditions.checkArgument(worker.hostname().isDefined()); + } + + public MesosWorkerStore.Worker getWorker() { + return worker; + } + + @Override + public ResourceID getResourceID() { + return MesosResourceManager.extractResourceID(worker.taskID()); + } + + @Override + public String toString() { + return "RegisteredMesosWorkerNode{" + + "worker=" + worker + + '}'; + } +} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java index f1f54ce1d5c..e76ff63536b 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java @@ -18,6 +18,7 @@ package org.apache.flink.mesos.runtime.clusterframework.store; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.mesos.Protos; 
import java.io.Serializable; @@ -93,19 +94,24 @@ public interface MesosWorkerStore { private final Protos.TaskID taskID; + private final ResourceProfile profile; + private final Option slaveID; private final Option hostname; private final WorkerState state; - private Worker(Protos.TaskID taskID, Option slaveID, Option hostname, WorkerState state) { + private Worker(Protos.TaskID taskID, ResourceProfile profile, + Option slaveID, Option hostname, WorkerState state) { requireNonNull(taskID, "taskID"); + requireNonNull(profile, "profile"); requireNonNull(slaveID, "slaveID"); requireNonNull(hostname, "hostname"); requireNonNull(state, "state"); this.taskID = taskID; + this.profile = profile; this.slaveID = slaveID; this.hostname = hostname; this.state = state; @@ -118,6 +124,14 @@ public interface MesosWorkerStore { return taskID; } + /** + * Get the resource profile associated with the worker. + * @return the resource profile given to this worker when it was created (never null). + */ + public ResourceProfile profile() { + return profile; + } + /** * Get the worker's assigned slave ID. */ @@ -148,6 +162,19 @@ public interface MesosWorkerStore { public static Worker newWorker(Protos.TaskID taskID) { return new Worker( taskID, + ResourceProfile.UNKNOWN, + Option.empty(), Option.empty(), + WorkerState.New); + } + + /** + * Create a new worker with the given taskID and resource profile; the worker starts in the New state with no slave ID or hostname assigned. + * @return a new worker instance. + */ + public static Worker newWorker(Protos.TaskID taskID, ResourceProfile profile) { + return new Worker( + taskID, + profile, Option.empty(), Option.empty(), WorkerState.New); } @@ -157,7 +184,7 @@ public interface MesosWorkerStore { * @return a new worker instance (does not mutate the current instance). 
*/ public Worker launchWorker(Protos.SlaveID slaveID, String hostname) { - return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), WorkerState.Launched); + return new Worker(taskID, profile, Option.apply(slaveID), Option.apply(hostname), WorkerState.Launched); } /** @@ -165,7 +192,7 @@ public interface MesosWorkerStore { * @return a new worker instance (does not mutate the current instance). */ public Worker releaseWorker() { - return new Worker(taskID, slaveID, hostname, WorkerState.Released); + return new Worker(taskID, profile, slaveID, hostname, WorkerState.Released); } @Override @@ -195,6 +222,7 @@ public interface MesosWorkerStore { ", slaveID=" + slaveID + ", hostname=" + hostname + ", state=" + state + + ", profile=" + profile + '}'; } } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java new file mode 100644 index 00000000000..0dea4e7017a --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.ExecutorLost; +import org.apache.flink.mesos.scheduler.messages.FrameworkMessage; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.runtime.rpc.RpcGateway; + +/** + * A scheduler's RPC gateway interface. + * + * <p>Implemented by RPC endpoints that accept Mesos scheduler messages. Every method takes a single message object describing the event and returns nothing. + */ +public interface SchedulerGateway extends RpcGateway { + + /** + * Called when connected to Mesos as a new framework. + */ + void registered(Registered message); + + /** + * Called when reconnected to Mesos following a failover event. + */ + void reregistered(ReRegistered message); + + /** + * Called when disconnected from Mesos. + */ + void disconnected(Disconnected message); + + /** + * Called when resource offers are made to the framework. + */ + void resourceOffers(ResourceOffers message); + + /** + * Called when resource offers are rescinded. + */ + void offerRescinded(OfferRescinded message); + + /** + * Called when a status update arrives from the Mesos master. + */ + void statusUpdate(StatusUpdate message); + + /** + * Called when a framework message arrives from a custom Mesos task executor. + */ + void frameworkMessage(FrameworkMessage message); + + /** + * Called when a Mesos slave is lost. + */ + void slaveLost(SlaveLost message); + + /** + * Called when a custom Mesos task executor is lost. 
+ */ + void executorLost(ExecutorLost message); + + /** + * Called when an error is reported by the scheduler callback. + */ + void error(Error message); +} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java new file mode 100644 index 00000000000..21c8346a36d --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.ExecutorLost; +import org.apache.flink.mesos.scheduler.messages.FrameworkMessage; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. + * + * Forwards incoming messages to the {@link MesosResourceManager} RPC gateway. 
+ * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxyV2 implements Scheduler { + + /** The RPC gateway to which the scheduler callbacks are forwarded; each callback's arguments (except the driver) are wrapped in a message object. */ + private final SchedulerGateway gateway; + + public SchedulerProxyV2(SchedulerGateway gateway) { + this.gateway = gateway; + } + + @Override + public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) { + gateway.registered(new Registered(frameworkId, masterInfo)); + } + + @Override + public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { + gateway.reregistered(new ReRegistered(masterInfo)); + } + + @Override + public void disconnected(SchedulerDriver driver) { + gateway.disconnected(new Disconnected()); + } + + @Override + public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) { + gateway.resourceOffers(new ResourceOffers(offers)); + } + + @Override + public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) { + gateway.offerRescinded(new OfferRescinded(offerId)); + } + + @Override + public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) { + gateway.statusUpdate(new StatusUpdate(status)); + } + + @Override + public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) { + gateway.frameworkMessage(new FrameworkMessage(executorId, slaveId, data)); + } + + @Override + public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) { + gateway.slaveLost(new SlaveLost(slaveId)); + } + + @Override + public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) { + gateway.executorLost(new ExecutorLost(executorId, slaveId, status)); + } + + @Override + public void error(SchedulerDriver driver, String message) { + gateway.error(new Error(message)); + } +} diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java new file mode 100644 index 00000000000..0c715baa6d6 --- /dev/null +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -0,0 +1,736 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import com.netflix.fenzo.ConstraintEvaluator; +import junit.framework.AssertionFailedError; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.messages.*; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; +import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.*; +import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.TestLogger; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; +import org.junit.*; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.singletonList; +import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState; +import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.*; + +/** + * General tests for the Mesos resource manager component (v2). 
+ */ +public class MesosResourceManagerTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(MesosResourceManagerTest.class); + + private static Configuration flinkConfig = new Configuration(); + + private static ActorSystem system; + + @Before + public void setup() { + system = AkkaUtils.createLocalActorSystem(flinkConfig); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** + * The RM with some test-specific behavior. + */ + static class TestingMesosResourceManager extends MesosResourceManager { + + public TestProbe connectionMonitor = new TestProbe(system); + public TestProbe taskRouter = new TestProbe(system); + public TestProbe launchCoordinator = new TestProbe(system); + public TestProbe reconciliationCoordinator = new TestProbe(system); + + public final Set closedTaskManagerConnections = new HashSet<>(); + + public TestingMesosResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + ResourceManagerConfiguration resourceManagerConfiguration, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + FatalErrorHandler fatalErrorHandler, + // Mesos specifics + ActorSystem actorSystem, + Configuration flinkConfig, + MesosConfiguration mesosConfig, + MesosWorkerStore workerStore, + MesosTaskManagerParameters taskManagerParameters, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver) { + super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration, + highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, + jobLeaderIdService, fatalErrorHandler, actorSystem, flinkConfig, mesosConfig, workerStore, + taskManagerParameters, taskManagerContainerSpec, artifactResolver); + } + + @Override + protected ActorRef 
createConnectionMonitor() { return connectionMonitor.ref(); } + @Override + protected ActorRef createTaskRouter() { return taskRouter.ref(); } + @Override + protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); } + @Override + protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); } + + @Override + protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause) { + super.closeTaskManagerConnection(resourceID, cause); + closedTaskManagerConnections.add(resourceID); + } + } + + /** + * The context fixture. + */ + static class Context implements AutoCloseable { + + // services + TestingSerialRpcService rpcService; + TestingFatalErrorHandler fatalErrorHandler; + MockMesosResourceManagerRuntimeServices rmServices; + + // RM + ResourceManagerConfiguration rmConfiguration; + ResourceID rmResourceID; + static final String rmAddress = "/resourceManager"; + TestingMesosResourceManager resourceManager; + + // domain objects for test purposes + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1); + final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 2); + + Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build(); + public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build(); + public String slave1host = "localhost"; + public Protos.OfferID offer1 = Protos.OfferID.newBuilder().setValue("offer1").build(); + public Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build(); + public Protos.TaskID task2 = Protos.TaskID.newBuilder().setValue("taskmanager-00002").build(); + public Protos.TaskID task3 = Protos.TaskID.newBuilder().setValue("taskmanager-00003").build(); + + // task executors + SlotReport slotReport = new SlotReport(); + public MockTaskExecutor task1Executor; + public MockTaskExecutor task2Executor; + public MockTaskExecutor task3Executor; + + // job masters + public 
MockJobMaster jobMaster1; + + /** + * Create mock RM dependencies. + */ + public Context() { + try { + rpcService = new TestingSerialRpcService(); + fatalErrorHandler = new TestingFatalErrorHandler(); + rmServices = new MockMesosResourceManagerRuntimeServices(); + + // TaskExecutor templating + ContainerSpecification containerSpecification = new ContainerSpecification(); + ContaineredTaskManagerParameters containeredParams = + new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap()); + MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters( + 1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.empty(), containeredParams, + Collections.emptyList(), Collections.emptyList(), Option.empty(), + Option.empty()); + + // resource manager + rmConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L)); + rmResourceID = ResourceID.generate(); + resourceManager = + new TestingMesosResourceManager( + rpcService, + rmAddress, + rmResourceID, + rmConfiguration, + rmServices.highAvailabilityServices, + rmServices.heartbeatServices, + rmServices.slotManager, + rmServices.metricRegistry, + rmServices.jobLeaderIdService, + fatalErrorHandler, + // Mesos specifics + system, + flinkConfig, + rmServices.mesosConfig, + rmServices.workerStore, + tmParams, + containerSpecification, + rmServices.artifactResolver + ); + + // TaskExecutors + task1Executor = mockTaskExecutor(task1); + task2Executor = mockTaskExecutor(task2); + task3Executor = mockTaskExecutor(task3); + + // JobMaster + jobMaster1 = mockJobMaster(rmServices, new JobID(1,0)); + + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * Mock services needed by the resource manager. 
+ */ + class MockResourceManagerRuntimeServices { + + public final ScheduledExecutor scheduledExecutor; + public final TestingHighAvailabilityServices highAvailabilityServices; + public final HeartbeatServices heartbeatServices; + public final MetricRegistry metricRegistry; + public final TestingLeaderElectionService rmLeaderElectionService; + public final JobLeaderIdService jobLeaderIdService; + public final SlotManager slotManager; + public ResourceManagerActions rmActions; + + public UUID rmLeaderSessionId; + + public MockResourceManagerRuntimeServices() throws Exception { + scheduledExecutor = mock(ScheduledExecutor.class); + highAvailabilityServices = new TestingHighAvailabilityServices(); + rmLeaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); + metricRegistry = mock(MetricRegistry.class); + slotManager = mock(SlotManager.class); + jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + Time.minutes(5L)); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class); + return null; + } + }).when(slotManager).start(any(UUID.class),any(Executor.class),any(ResourceManagerActions.class)); + + when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true); + +// when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenAnswer(new Answer() { +// @Override +// public Object answer(InvocationOnMock invocationOnMock) throws Throwable { +// SlotRequest request = invocationOnMock.getArgumentAt(0, SlotRequest.class); +// return new RMSlotRequestRegistered(request.getAllocationId()); +// } +// }); + } + + public void grantLeadership() { + rmLeaderSessionId = UUID.randomUUID(); + 
rmLeaderElectionService.isLeader(rmLeaderSessionId); + } + } + + class MockMesosResourceManagerRuntimeServices extends MockResourceManagerRuntimeServices { + public SchedulerDriver schedulerDriver; + public MesosConfiguration mesosConfig; + public MesosWorkerStore workerStore; + public MesosArtifactResolver artifactResolver; + + public MockMesosResourceManagerRuntimeServices() throws Exception { + super(); + schedulerDriver = mock(SchedulerDriver.class); + + mesosConfig = mock(MesosConfiguration.class); + when(mesosConfig.frameworkInfo()).thenReturn(Protos.FrameworkInfo.newBuilder()); + when(mesosConfig.withFrameworkInfo(any(Protos.FrameworkInfo.Builder.class))).thenReturn(mesosConfig); + when(mesosConfig.createDriver(any(Scheduler.class), anyBoolean())).thenReturn(schedulerDriver); + + workerStore = mock(MesosWorkerStore.class); + when(workerStore.getFrameworkID()).thenReturn(Option.empty()); + + artifactResolver = mock(MesosArtifactResolver.class); + } + } + + class MockJobMaster { + public final JobID jobID; + public final ResourceID resourceID; + public final String address; + public final JobMasterGateway gateway; + public final UUID leaderSessionID; + public final TestingLeaderRetrievalService leaderRetrievalService; + + public MockJobMaster(JobID jobID) { + this.jobID = jobID; + this.resourceID = new ResourceID(jobID.toString()); + this.address = "/" + jobID; + this.gateway = mock(JobMasterGateway.class); + this.leaderSessionID = UUID.randomUUID(); + this.leaderRetrievalService = new TestingLeaderRetrievalService(this.address, this.leaderSessionID); + } + } + + private MockJobMaster mockJobMaster(MockResourceManagerRuntimeServices rmServices, JobID jobID) { + MockJobMaster jm = new MockJobMaster(jobID); + rpcService.registerGateway(jm.address, jm.gateway); + rmServices.highAvailabilityServices.setJobMasterLeaderRetriever(jm.jobID, jm.leaderRetrievalService); + return jm; + } + + static class MockTaskExecutor { + public final Protos.TaskID taskID; + public 
final String address; + public final ResourceID resourceID; + public final TaskExecutorGateway gateway; + + public MockTaskExecutor(Protos.TaskID taskID) { + this.taskID = taskID; + this.address = "/" + taskID; + this.gateway = mock(TaskExecutorGateway.class); + this.resourceID = MesosResourceManager.extractResourceID(this.taskID); + } + } + + private MockTaskExecutor mockTaskExecutor(Protos.TaskID taskID) { + MockTaskExecutor task = new MockTaskExecutor(taskID); + rpcService.registerGateway(task.address, task.gateway); + return task; + } + + /** + * Start the resource manager and grant leadership to it. + */ + public void startResourceManager() { + try { + resourceManager.start(); + rmServices.grantLeadership(); + + // drain probe events + verify(rmServices.schedulerDriver).start(); + resourceManager.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class); + } catch (Exception e) { + throw new RuntimeException("unable to initialize the RM", e); + } + } + + /** + * Register a job master with the RM. + */ + public void registerJobMaster(MockJobMaster jobMaster) throws Exception { + Future registration = resourceManager.registerJobManager( + rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID); + assertTrue(registration.get() instanceof JobMasterRegistrationSuccess); + } + + /** + * Allocate a worker using the RM. 
+ */ + public MesosWorkerStore.Worker allocateWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) throws Exception { + when(rmServices.workerStore.newTaskID()).thenReturn(taskID); + rmServices.rmActions.allocateResource(resourceProfile); + MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(taskID, resourceProfile); + + // drain the probe messages + verify(rmServices.workerStore).putWorker(expected); + assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(taskID), expected)); + resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class); + resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class); + return expected; + } + + /** + * Prepares a launch operation. + */ + public Protos.Offer.Operation launch(Protos.TaskInfo... taskInfo) { + return Protos.Offer.Operation.newBuilder() + .setType(Protos.Offer.Operation.Type.LAUNCH) + .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addAllTaskInfos(Arrays.asList(taskInfo)) + ).build(); + } + + @Override + public void close() throws Exception { + rpcService.stopService(); + } + } + + @Test + public void testInitialize() throws Exception { + new Context() {{ + startResourceManager(); + LOG.info("initialized"); + }}; + } + + /** + * Test recovery of persistent workers. 
+ */ + @Test + public void testRecoverWorkers() throws Exception { + new Context() {{ + // set the initial persistent state then initialize the RM + MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1); + MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host); + MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker(); + when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1)); + when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3)); + startResourceManager(); + + // verify that the internal state was updated, the task router was notified, + // and the launch coordinator was asked to launch a task. + // note: "new" workers are discarded + assertThat(resourceManager.workersInNew.entrySet(), empty()); + assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task2), worker2)); + assertThat(resourceManager.workersBeingReturned, hasEntry(extractResourceID(task3), worker3)); + resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class); + LaunchCoordinator.Assign actualAssign = + resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class); + assertThat(actualAssign.tasks(), hasSize(1)); + assertThat(actualAssign.tasks().get(0).f0.getId(), equalTo(task2.getValue())); + assertThat(actualAssign.tasks().get(0).f1, equalTo(slave1host)); + resourceManager.launchCoordinator.expectNoMsg(); + }}; + } + + /** + * Test request for new workers. 
+ */ + @Test + public void testRequestNewWorkers() throws Exception { + new Context() {{ + startResourceManager(); + + // allocate a worker (the store hands out task1 exactly once; any further task ID request fails the test) + when(rmServices.workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError()); + rmServices.rmActions.allocateResource(resourceProfile1); + + // verify that a new worker carrying the requested profile was persisted, the internal state was updated, the task router was notified, + // and the launch coordinator was asked to launch a task + MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1, resourceProfile1); + verify(rmServices.workerStore).putWorker(expected); + assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(task1), expected)); + resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class); + resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class); + }}; + } + + /** + * Test offer handling: resource offers and rescinded offers are both passed on to the launch coordinator. + */ + @Test + public void testOfferHandling() throws Exception { + new Context() {{ + startResourceManager(); + + // Verify that the RM forwards offers to the launch coordinator. + resourceManager.resourceOffers(new ResourceOffers(Collections.emptyList())); + resourceManager.launchCoordinator.expectMsgClass(ResourceOffers.class); + resourceManager.offerRescinded(new OfferRescinded(offer1)); + resourceManager.launchCoordinator.expectMsgClass(OfferRescinded.class); + }}; + } + + /** + * Test offer acceptance. 
+ */ + @Test + public void testAcceptOffers() throws Exception { + new Context() {{ + startResourceManager(); + + // allocate a new worker + MesosWorkerStore.Worker worker1 = allocateWorker(task1, resourceProfile1); + + // send an AcceptOffers message as the LaunchCoordinator would + // to launch task1 onto slave1 with offer1 + Protos.TaskInfo task1info = Protos.TaskInfo.newBuilder() + .setTaskId(task1).setName("").setSlaveId(slave1).build(); + AcceptOffers msg = new AcceptOffers(slave1host, singletonList(offer1), singletonList(launch(task1info))); + resourceManager.acceptOffers(msg); + + // verify that the worker was persisted, the internal state was updated, + // Mesos was asked to launch task1, and the task router was notified + MesosWorkerStore.Worker worker1launched = worker1.launchWorker(slave1, slave1host); + verify(rmServices.workerStore).putWorker(worker1launched); + assertThat(resourceManager.workersInNew.entrySet(), empty()); + assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched)); + resourceManager.taskRouter.expectMsg( + new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched))); + verify(rmServices.schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters()); + }}; + } + + /** + * Test status handling. + */ + @Test + public void testStatusHandling() throws Exception { + new Context() {{ + startResourceManager(); + + // Verify that the RM forwards status updates to the reconciliation coordinator and task router. + resourceManager.statusUpdate(new StatusUpdate(Protos.TaskStatus.newBuilder() + .setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_LOST).build())); + resourceManager.reconciliationCoordinator.expectMsgClass(StatusUpdate.class); + resourceManager.taskRouter.expectMsgClass(StatusUpdate.class); + }}; + } + + + /** + * Test worker registration after launch. 
+	 */
+	@Test
+	public void testWorkerStarted() throws Exception {
+		new Context() {{
+			// set the initial state with a (recovered) launched worker
+			MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
+			startResourceManager();
+			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
+
+			// send registration message; the future is parameterized so that get() yields a
+			// RegistrationResponse directly (a raw Future would only yield Object)
+			Future<RegistrationResponse> successfulFuture =
+				resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport);
+			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// verify the internal state: the worker is still present in workersInLaunch after registration
+			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
+		}};
+	}
+
+
+	/**
+	 * Test the planned release of registered workers.
+	 * Currently ignored because the RM does not support it.
+	 */
+	@Test
+	@Ignore
+	public void testReleaseRegisteredWorker() throws Exception {
+		// not supported by RM
+	}
+
+	/**
+	 * Test unplanned task failure of a pending worker.
+	 */
+	@Test
+	public void testWorkerFailed() throws Exception {
+		new Context() {{
+			// set the initial persistent state with a launched worker
+			MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
+			startResourceManager();
+
+			// tell the RM that a task failed (the stubbed store reports a successful removal)
+			when(rmServices.workerStore.removeWorker(task1)).thenReturn(true);
+			resourceManager.taskTerminated(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
+				.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+			// verify that the instance state was updated: the worker was removed from the store
+			// and is tracked neither as launched nor as being returned
+			verify(rmServices.workerStore).removeWorker(task1);
+			assertThat(resourceManager.workersInLaunch.entrySet(), empty());
+			assertThat(resourceManager.workersBeingReturned.entrySet(), empty());
+
+			// verify that `closeTaskManagerConnection` was called
+			assertThat(resourceManager.closedTaskManagerConnections, hasItem(extractResourceID(task1)));
+		}};
+	}
+
+	/**
+	 * Test application shutdown handling: shutting down the cluster must stop
+	 * the scheduler driver and the worker store.
+	 */
+	@Test
+	public void testShutdownApplication() throws Exception {
+		new Context() {{
+			startResourceManager();
+			resourceManager.shutDownCluster(ApplicationStatus.SUCCEEDED, "");
+
+			// verify that the Mesos framework is shut down and the worker store is stopped
+			verify(rmServices.schedulerDriver).stop(false);
+			verify(rmServices.workerStore).stop(true);
+		}};
+	}
+
+	// ------------- connectivity tests -----------------------------
+
+	/**
+	 * Test Mesos registration handling.
+	 */
+	@Test
+	public void testRegistered() throws Exception {
+		new Context() {{
+			startResourceManager();
+
+			Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
+				.setId("master1").setIp(0).setPort(5050).build();
+			resourceManager.registered(new Registered(framework1, masterInfo));
+
+			// verify that the framework ID was persisted and that the message was forwarded to the
+			// connection monitor, reconciliation coordinator, launch coordinator and task router
+			verify(rmServices.workerStore).setFrameworkID(Option.apply(framework1));
+			resourceManager.connectionMonitor.expectMsgClass(Registered.class);
+			resourceManager.reconciliationCoordinator.expectMsgClass(Registered.class);
+			resourceManager.launchCoordinator.expectMsgClass(Registered.class);
+			resourceManager.taskRouter.expectMsgClass(Registered.class);
+		}};
+	}
+
+
+	/**
+	 * Test Mesos re-registration handling: the ReRegistered message must be forwarded to the
+	 * connection monitor, reconciliation coordinator, launch coordinator and task router.
+	 */
+	@Test
+	public void testReRegistered() throws Exception {
+		new Context() {{
+			// start from a persisted framework ID
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			startResourceManager();
+
+			Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
+				.setId("master1").setIp(0).setPort(5050).build();
+			resourceManager.reregistered(new ReRegistered(masterInfo));
+
+			resourceManager.connectionMonitor.expectMsgClass(ReRegistered.class);
+			resourceManager.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
+			resourceManager.launchCoordinator.expectMsgClass(ReRegistered.class);
+			resourceManager.taskRouter.expectMsgClass(ReRegistered.class);
+		}};
+	}
+
+	/**
+	 * Test Mesos disconnection handling.
+	 */
+	@Test
+	public void testDisconnected() throws Exception {
+		new Context() {{
+			// start from a persisted framework ID
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			startResourceManager();
+
+			resourceManager.disconnected(new Disconnected());
+
+			// the message must reach the connection monitor, reconciliation coordinator,
+			// launch coordinator and task router
+			resourceManager.connectionMonitor.expectMsgClass(Disconnected.class);
+			resourceManager.reconciliationCoordinator.expectMsgClass(Disconnected.class);
+			resourceManager.launchCoordinator.expectMsgClass(Disconnected.class);
+			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
+		}};
+	}
+
+	/**
+	 * Test Mesos scheduler error: an error reported to the RM must be
+	 * recorded by the fatal error handler.
+	 */
+	@Test
+	public void testError() throws Exception {
+		new Context() {{
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			startResourceManager();
+			resourceManager.error(new Error("test"));
+			assertTrue(fatalErrorHandler.hasExceptionOccurred());
+		}};
+	}
+
+	/**
+	 * Test that the Akka adapter actor relays Reconcile, TaskTerminated and AcceptOffers
+	 * messages to the corresponding methods of the RM gateway.
+	 */
+	@Test
+	public void testAdapter() throws Exception {
+		Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
+		Protos.TaskStatus status1 = Protos.TaskStatus.newBuilder().setTaskId(task1).setState(Protos.TaskState.TASK_KILLED).build();
+		String host1 = "host1";
+
+		MesosResourceManagerGateway gateway = mock(MesosResourceManagerGateway.class);
+		ActorRef adapter = system.actorOf(MesosResourceManager.AkkaAdapter.createActorProps(gateway));
+
+		// NOTE(review): each verify() runs right after tell(); if the adapter actor processes
+		// messages asynchronously this could race - consider a timeout-based verification. TODO confirm
+		List<Protos.TaskStatus> tasks = Collections.singletonList(status1);
+		ReconciliationCoordinator.Reconcile msg1 = new ReconciliationCoordinator.Reconcile(
+			scala.collection.JavaConverters.asScalaBufferConverter(tasks).asScala(), false);
+		adapter.tell(msg1, ActorRef.noSender());
+		verify(gateway).reconcile(eq(msg1));
+
+		TaskMonitor.TaskTerminated msg2 = new TaskMonitor.TaskTerminated(task1, status1);
+		adapter.tell(msg2, ActorRef.noSender());
+		verify(gateway).taskTerminated(eq(msg2));
+
+		AcceptOffers msg3 = new AcceptOffers(host1, Collections.emptyList(), Collections.emptyList());
+		adapter.tell(msg3, ActorRef.noSender());
+		verify(gateway).acceptOffers(eq(msg3));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index bef0aa32567..92f007f1b3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -83,8 +83,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
    <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  • * */ -public abstract class ResourceManager - extends RpcEndpoint +public abstract class ResourceManager + extends RpcEndpoint implements LeaderContender { public static final String RESOURCE_MANAGER_NAME = "resourcemanager"; @@ -419,6 +419,11 @@ public abstract class ResourceManager } WorkerType newWorker = workerStarted(taskExecutorResourceId); + if(newWorker == null) { + log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " + + "not recognize it", taskExecutorResourceId, taskExecutorAddress); + return new RegistrationResponse.Decline("unrecognized TaskExecutor"); + } WorkerRegistration registration = new WorkerRegistration<>(taskExecutorGateway, newWorker); @@ -783,7 +788,7 @@ public abstract class ResourceManager * * @param t The exception describing the fatal error */ - void onFatalError(Throwable t) { + protected void onFatalError(Throwable t) { log.error("Fatal error occurred.", t); fatalErrorHandler.onFatalError(t); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index d0c411ceea6..12d3a7d47b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -42,7 +42,7 @@ public class ResourceManagerRunner implements FatalErrorHandler { private final ResourceManagerRuntimeServices resourceManagerRuntimeServices; - private final ResourceManager resourceManager; + private final ResourceManager resourceManager; public ResourceManagerRunner( final ResourceID resourceId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index a921a29183b..afeddc4b993 100644 
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.rpc.RpcService; * * This ResourceManager doesn't acquire new resources. */ -public class StandaloneResourceManager extends ResourceManager { +public class StandaloneResourceManager extends ResourceManager { public StandaloneResourceManager( RpcService rpcService, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java new file mode 100644 index 00000000000..6c8de666c04 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +/** + * The {@link StandaloneResourceManager}'s RPC gateway interface. 
+ */ +public interface StandaloneResourceManagerGateway extends ResourceManagerGateway { +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 40b9568377f..311fa4905a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * of Erlang or Akka. * *

    The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)} - * and the {@link #getMainThreadExecutor()} to execute code in the RPC endoint's main thread. + * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread. * * @param The RPC gateway counterpart for the implementing RPC endpoint */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 6a0bd8712d2..85ed9505d61 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; @@ -126,7 +127,7 @@ public class TaskExecutorITCase extends TestLogger { TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); - ResourceManager resourceManager = new StandaloneResourceManager( + ResourceManager resourceManager = new StandaloneResourceManager( rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, rmResourceId, diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index 2ad90655f62..cef33783690 100644 --- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -193,7 +193,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); } - private ResourceManager createResourceManager(Configuration config) throws Exception { + private ResourceManager createResourceManager(Configuration config) throws Exception { final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config); final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config); final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6099d18cef9..4ee30f4e65e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -63,7 +63,7 @@ import scala.concurrent.duration.FiniteDuration; * The yarn implementation of the resource manager. Used when the system is started * via the resource framework YARN. */ -public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler { +public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler { /** The process environment variables. 
*/ private final Map env; diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java similarity index 57% rename from flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala rename to flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java index 7ca388f03ca..485fb90dda8 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java @@ -16,18 +16,12 @@ * limitations under the License. */ -package org.apache.flink.mesos.runtime.clusterframework +package org.apache.flink.yarn; -import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore -import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable} +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; /** - * A representation of a registered Mesos task managed by the [[MesosFlinkResourceManager]]. - */ -case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable { - - require(task.slaveID().isDefined) - require(task.hostname().isDefined) - - override val getResourceID: ResourceID = MesosFlinkResourceManager.extractResourceID(task.taskID()) + * The {@link YarnResourceManager}'s RPC gateway interface. + */ +public interface YarnResourceManagerGateway extends ResourceManagerGateway { } -- GitLab