diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index bb200600e7b21b785b0a8855e2747358564ad22e..43b58bce31a02fff120da14024066cbee62a39a0 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -18,15 +18,6 @@ package org.apache.flink.mesos.runtime.clusterframework; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.UntypedActor; -import com.netflix.fenzo.TaskRequest; -import com.netflix.fenzo.TaskScheduler; -import com.netflix.fenzo.VirtualMachineLease; -import com.netflix.fenzo.functions.Action1; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; @@ -34,13 +25,11 @@ import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; import org.apache.flink.mesos.scheduler.LaunchableTask; import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; -import org.apache.flink.mesos.scheduler.SchedulerProxyV2; import org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; import org.apache.flink.mesos.scheduler.Tasks; import org.apache.flink.mesos.scheduler.messages.AcceptOffers; import org.apache.flink.mesos.scheduler.messages.Disconnected; -import org.apache.flink.mesos.scheduler.messages.Error; import org.apache.flink.mesos.scheduler.messages.ExecutorLost; import org.apache.flink.mesos.scheduler.messages.FrameworkMessage; import org.apache.flink.mesos.scheduler.messages.OfferRescinded; @@ -66,14 +55,23 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import java.util.ArrayList; import java.util.Collections; @@ -81,42 +79,40 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import scala.Option; + /** * The Mesos implementation of the resource manager. */ -public class MesosResourceManager extends ResourceManager { +public class MesosResourceManager extends ResourceManager { protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceManager.class); - /** The Flink configuration */ + /** The Flink configuration. */ private final Configuration flinkConfig; - /** The Mesos configuration (master and framework info) */ + /** The Mesos configuration (master and framework info). */ private final MesosConfiguration mesosConfig; - /** The TaskManager container parameters (like container memory size) */ + /** The TaskManager container parameters (like container memory size). */ private final MesosTaskManagerParameters taskManagerParameters; - /** Container specification for launching a TM */ + /** Container specification for launching a TM. */ private final ContainerSpecification taskManagerContainerSpec; - /** Resolver for HTTP artifacts */ + /** Resolver for HTTP artifacts. */ private final MesosArtifactResolver artifactResolver; - /** Persistent storage of allocated containers */ + /** Persistent storage of allocated containers. */ private final MesosWorkerStore workerStore; - /** A local actor system for using the helper actors */ + /** A local actor system for using the helper actors. */ private final ActorSystem actorSystem; - /** Callback handler for the asynchronous Mesos scheduler */ - private SchedulerProxyV2 schedulerCallbackHandler; - - /** Mesos scheduler driver */ + /** Mesos scheduler driver. */ private SchedulerDriver schedulerDriver; - /** an adapter to receive messages from Akka actors */ - @VisibleForTesting - ActorRef selfActor; + /** an adapter to receive messages from Akka actors. */ + private ActorRef selfActor; private ActorRef connectionMonitor; @@ -126,7 +122,7 @@ public class MesosResourceManager extends ResourceManager workersInNew; final Map workersInLaunch; final Map workersBeingReturned; @@ -181,7 +177,8 @@ public class MesosResourceManager extends ResourceManager> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size()); + List> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size()); for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) { LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile()); @@ -310,31 +305,38 @@ public class MesosResourceManager extends ResourceManager= 1) { + if (toAssign.size() >= 1) { launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor); } } } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + protected void shutDownApplication( + ApplicationStatus finalStatus, + String optionalDiagnostics) throws ResourceManagerException { LOG.info("Shutting down and unregistering as a Mesos framework."); + + Exception exception = null; + try { // unregister the framework, which implicitly removes all tasks. schedulerDriver.stop(false); - } - catch(Exception ex) { - LOG.warn("unable to unregister the framework", ex); + } catch (Exception ex) { + exception = new Exception("Could not unregister the Mesos framework.", ex); } try { workerStore.stop(true); - } - catch(Exception ex) { - LOG.warn("unable to stop the worker state store", ex); + } catch (Exception ex) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not stop the Mesos worker store.", ex), + exception); } - LOG.info("Shutdown completed."); + if (exception != null) { + throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception); + } } @Override @@ -356,8 +358,7 @@ public class MesosResourceManager extends ResourceManagerAcceptance is routed through the RM to update the persistent state before * forwarding the message to Mesos. */ - @RpcMethod public void acceptOffers(AcceptOffers msg) { try { List toMonitor = new ArrayList<>(msg.operations().size()); @@ -481,26 +490,14 @@ public class MesosResourceManager extends ResourceManager offers) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.resourceOffers(new ResourceOffers(offers)); + } + }); + } + + @Override + public void offerRescinded(SchedulerDriver driver, final Protos.OfferID offerId) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.offerRescinded(new OfferRescinded(offerId)); + } + }); + } + + @Override + public void statusUpdate(SchedulerDriver driver, final Protos.TaskStatus status) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.statusUpdate(new StatusUpdate(status)); + } + }); + } + + @Override + public void frameworkMessage(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] data) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.frameworkMessage(new FrameworkMessage(executorId, slaveId, data)); + } + }); + } + + @Override + public void disconnected(SchedulerDriver driver) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.disconnected(new Disconnected()); + } + }); + } + + @Override + public void slaveLost(SchedulerDriver driver, final Protos.SlaveID slaveId) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.slaveLost(new SlaveLost(slaveId)); + } + }); + } + + @Override + public void executorLost(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int status) { + runAsync(new Runnable() { + @Override + public void run() { + MesosResourceManager.this.executorLost(new ExecutorLost(executorId, slaveId, status)); + } + }); + } + + @Override + public void error(SchedulerDriver driver, final String message) { + runAsync(new Runnable() { + @Override + public void run() { + onFatalError(new ResourceManagerException(message)); + } + }); + } + } + /** * Adapts incoming Akka messages as RPC calls to the resource manager. */ - static class AkkaAdapter extends UntypedActor { - private final MesosResourceManagerGateway gateway; - AkkaAdapter(MesosResourceManagerGateway gateway) { - this.gateway = gateway; - } + private class AkkaAdapter extends UntypedActor { @Override - public void onReceive(Object message) throws Exception { + public void onReceive(final Object message) throws Exception { if (message instanceof ReconciliationCoordinator.Reconcile) { - gateway.reconcile((ReconciliationCoordinator.Reconcile) message); + runAsync(new Runnable() { + @Override + public void run() { + reconcile((ReconciliationCoordinator.Reconcile) message); + } + }); } else if (message instanceof TaskMonitor.TaskTerminated) { - gateway.taskTerminated((TaskMonitor.TaskTerminated) message); + runAsync(new Runnable() { + @Override + public void run() { + taskTerminated((TaskMonitor.TaskTerminated) message); + } + }); } else if (message instanceof AcceptOffers) { - gateway.acceptOffers((AcceptOffers) message); + runAsync(new Runnable() { + @Override + public void run() { + acceptOffers((AcceptOffers) message); + } + }); } else { MesosResourceManager.LOG.error("unrecognized message: " + message); } } - - public static Props createActorProps(MesosResourceManagerGateway gateway) { - return Props.create(AkkaAdapter.class, gateway); - } } } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java similarity index 78% rename from flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java rename to flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java index 70ed47d9b9ffd9f08beefcacc6e205b212a227cd..e1b0300ece13c0a6ed1ff7b04c01b39a852b51a4 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java @@ -20,20 +20,21 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.mesos.scheduler.LaunchCoordinator; import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; -import org.apache.flink.mesos.scheduler.SchedulerGateway; import org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.messages.AcceptOffers; -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; /** - * The {@link MesosResourceManager}'s RPC gateway interface. + * Actions defined by the MesosResourceManager. + * + *

These are called by the MesosResourceManager components such + * as {@link LaunchCoordinator}, and {@link TaskMonitor}. */ -public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway { +public interface MesosResourceManagerActions { /** * Accept the given offers as advised by the launch coordinator. * - * Note: This method is a callback for the {@link LaunchCoordinator}. + *

Note: This method is a callback for the {@link LaunchCoordinator}. * * @param offersToAccept Offers to accept from Mesos */ @@ -42,7 +43,7 @@ public interface MesosResourceManagerGateway extends ResourceManagerGateway, Sch /** * Trigger reconciliation with the Mesos master. * - * Note: This method is a callback for the {@link TaskMonitor}. + *

Note: This method is a callback for the {@link TaskMonitor}. * * @param reconciliationRequest Message containing the tasks which shall be reconciled */ @@ -51,7 +52,7 @@ public interface MesosResourceManagerGateway extends ResourceManagerGateway, Sch /** * Notify that the given Mesos task has been terminated. * - * Note: This method is a callback for the {@link TaskMonitor}. + *

Note: This method is a callback for the {@link TaskMonitor}. * * @param terminatedTask Message containing the terminated task */ diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java index a9ec892c413c83a5fd716dd2e7d48b4e5135171e..06aeb59e839b190b8931c582dd69c86c49b607c9 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java @@ -19,6 +19,7 @@ package org.apache.flink.mesos.runtime.clusterframework.store; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + import org.apache.mesos.Protos; import java.io.Serializable; diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java deleted file mode 100644 index 0dea4e7017aaf78fac6d834338612e1765f8fd05..0000000000000000000000000000000000000000 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.mesos.scheduler; - -import org.apache.flink.mesos.scheduler.messages.Disconnected; -import org.apache.flink.mesos.scheduler.messages.Error; -import org.apache.flink.mesos.scheduler.messages.ExecutorLost; -import org.apache.flink.mesos.scheduler.messages.FrameworkMessage; -import org.apache.flink.mesos.scheduler.messages.OfferRescinded; -import org.apache.flink.mesos.scheduler.messages.ReRegistered; -import org.apache.flink.mesos.scheduler.messages.Registered; -import org.apache.flink.mesos.scheduler.messages.ResourceOffers; -import org.apache.flink.mesos.scheduler.messages.SlaveLost; -import org.apache.flink.mesos.scheduler.messages.StatusUpdate; -import org.apache.flink.runtime.rpc.RpcGateway; - -/** - * A scheduler's RPC gateway interface. - * - * Implemented by RPC endpoints that accept Mesos scheduler messages. - */ -public interface SchedulerGateway extends RpcGateway { - - /** - * Called when connected to Mesos as a new framework. - */ - void registered(Registered message); - - /** - * Called when reconnected to Mesos following a failover event. - */ - void reregistered(ReRegistered message); - - /** - * Called when disconnected from Mesos. - */ - void disconnected(Disconnected message); - - /** - * Called when resource offers are made to the framework. - */ - void resourceOffers(ResourceOffers message); - - /** - * Called when resource offers are rescinded. - */ - void offerRescinded(OfferRescinded message); - - /** - * Called when a status update arrives from the Mesos master. - */ - void statusUpdate(StatusUpdate message); - - /** - * Called when a framework message arrives from a custom Mesos task executor. - */ - void frameworkMessage(FrameworkMessage message); - - /** - * Called when a Mesos slave is lost. - */ - void slaveLost(SlaveLost message); - - /** - * Called when a custom Mesos task executor is lost. - */ - void executorLost(ExecutorLost message); - - /** - * Called when an error is reported by the scheduler callback. - */ - void error(Error message); -} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java deleted file mode 100644 index 21c8346a36ddf0609d924008fcc76fa882dd78ee..0000000000000000000000000000000000000000 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.mesos.scheduler; - -import org.apache.flink.mesos.scheduler.messages.Disconnected; -import org.apache.flink.mesos.scheduler.messages.Error; -import org.apache.flink.mesos.scheduler.messages.ExecutorLost; -import org.apache.flink.mesos.scheduler.messages.FrameworkMessage; -import org.apache.flink.mesos.scheduler.messages.OfferRescinded; -import org.apache.flink.mesos.scheduler.messages.ReRegistered; -import org.apache.flink.mesos.scheduler.messages.Registered; -import org.apache.flink.mesos.scheduler.messages.ResourceOffers; -import org.apache.flink.mesos.scheduler.messages.SlaveLost; -import org.apache.flink.mesos.scheduler.messages.StatusUpdate; -import org.apache.mesos.Protos; -import org.apache.mesos.Scheduler; -import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; -import org.apache.mesos.SchedulerDriver; - -import java.util.List; - -/** - * This class reacts to callbacks from the Mesos scheduler driver. - * - * Forwards incoming messages to the {@link MesosResourceManager} RPC gateway. - * - * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html - */ -public class SchedulerProxyV2 implements Scheduler { - - /** The actor to which we report the callbacks */ - private final SchedulerGateway gateway; - - public SchedulerProxyV2(SchedulerGateway gateway) { - this.gateway = gateway; - } - - @Override - public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) { - gateway.registered(new Registered(frameworkId, masterInfo)); - } - - @Override - public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { - gateway.reregistered(new ReRegistered(masterInfo)); - } - - @Override - public void disconnected(SchedulerDriver driver) { - gateway.disconnected(new Disconnected()); - } - - @Override - public void resourceOffers(SchedulerDriver driver, List offers) { - gateway.resourceOffers(new ResourceOffers(offers)); - } - - @Override - public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) { - gateway.offerRescinded(new OfferRescinded(offerId)); - } - - @Override - public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) { - gateway.statusUpdate(new StatusUpdate(status)); - } - - @Override - public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) { - gateway.frameworkMessage(new FrameworkMessage(executorId, slaveId, data)); - } - - @Override - public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) { - gateway.slaveLost(new SlaveLost(slaveId)); - } - - @Override - public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) { - gateway.executorLost(new ExecutorLost(executorId, slaveId, status)); - } - - @Override - public void error(SchedulerDriver driver, String message) { - gateway.error(new Error(message)); - } -} diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 929e927467c501d234f988cc8954d48014752827..6e6a59c3e24f4bfde19a4c782d95a87ec93bbf75 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -18,23 +18,20 @@ package org.apache.flink.mesos.runtime.clusterframework; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import akka.testkit.TestProbe; -import com.netflix.fenzo.ConstraintEvaluator; -import junit.framework.AssertionFailedError; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; -import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; import org.apache.flink.mesos.scheduler.TaskMonitor; -import org.apache.flink.mesos.scheduler.messages.*; -import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; import org.apache.flink.mesos.util.MesosArtifactResolver; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -55,7 +52,9 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.resourcemanager.*; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -66,30 +65,51 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import com.netflix.fenzo.ConstraintEvaluator; +import junit.framework.AssertionFailedError; import org.apache.mesos.Protos; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; -import org.junit.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; -import scala.collection.JavaConverters; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import scala.Option; + import static java.util.Collections.singletonList; import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState; import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * General tests for the Mesos resource manager component (v2). @@ -151,13 +171,24 @@ public class MesosResourceManagerTest extends TestLogger { } @Override - protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); } + protected ActorRef createConnectionMonitor() { + return connectionMonitor.ref(); + } + @Override - protected ActorRef createTaskRouter() { return taskRouter.ref(); } + protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) { + return taskRouter.ref(); + } + @Override - protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); } + protected ActorRef createLaunchCoordinator(SchedulerDriver schedulerDriver, ActorRef selfActorRef) { + return launchCoordinator.ref(); + } + @Override - protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); } + protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) { + return reconciliationCoordinator.ref(); + } @Override protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause) { @@ -179,7 +210,7 @@ public class MesosResourceManagerTest extends TestLogger { // RM ResourceManagerConfiguration rmConfiguration; ResourceID rmResourceID; - static final String rmAddress = "/resourceManager"; + static final String RM_ADDRESS = "/resourceManager"; TestingMesosResourceManager resourceManager; // domain objects for test purposes @@ -227,7 +258,7 @@ public class MesosResourceManagerTest extends TestLogger { resourceManager = new TestingMesosResourceManager( rpcService, - rmAddress, + RM_ADDRESS, rmResourceID, rmConfiguration, rmServices.highAvailabilityServices, @@ -252,7 +283,7 @@ public class MesosResourceManagerTest extends TestLogger { task3Executor = mockTaskExecutor(task3); // JobMaster - jobMaster1 = mockJobMaster(rmServices, new JobID(1,0)); + jobMaster1 = mockJobMaster(rmServices, new JobID(1, 0)); } /** @@ -290,7 +321,7 @@ public class MesosResourceManagerTest extends TestLogger { rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class); return null; } - }).when(slotManager).start(any(UUID.class),any(Executor.class),any(ResourceManagerActions.class)); + }).when(slotManager).start(any(UUID.class), any(Executor.class), any(ResourceManagerActions.class)); when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true); } @@ -669,41 +700,4 @@ public class MesosResourceManagerTest extends TestLogger { resourceManager.taskRouter.expectMsgClass(Disconnected.class); }}; } - - /** - * Test Mesos scheduler error. - */ - @Test - public void testError() throws Exception { - new Context() {{ - when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1)); - startResourceManager(); - resourceManager.error(new Error("test")); - assertTrue(fatalErrorHandler.hasExceptionOccurred()); - }}; - } - - @Test - public void testAdapter() throws Exception { - Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build(); - Protos.TaskStatus status1 = Protos.TaskStatus.newBuilder().setTaskId(task1).setState(Protos.TaskState.TASK_KILLED).build(); - String host1 = "host1"; - - MesosResourceManagerGateway gateway = mock(MesosResourceManagerGateway.class); - ActorRef adapter = system.actorOf(MesosResourceManager.AkkaAdapter.createActorProps(gateway)); - - List tasks = Collections.singletonList(status1); - ReconciliationCoordinator.Reconcile msg1 = new ReconciliationCoordinator.Reconcile( - JavaConverters.asScalaBufferConverter(tasks).asScala(), false); - adapter.tell(msg1, ActorRef.noSender()); - verify(gateway, timeout(1000L)).reconcile(eq(msg1)); - - TaskMonitor.TaskTerminated msg2 = new TaskMonitor.TaskTerminated(task1, status1); - adapter.tell(msg2, ActorRef.noSender()); - verify(gateway, timeout(1000L)).taskTerminated(eq(msg2)); - - AcceptOffers msg3 = new AcceptOffers(host1, Collections.emptyList(), Collections.emptyList()); - adapter.tell(msg3, ActorRef.noSender()); - verify(gateway, timeout(1000L)).acceptOffers(eq(msg3)); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index df452e3a96b7108493f9daed8fb1e33abc1a7c8c..6e7c6afaccee9170d4ef039cff440ad085d20552 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -83,8 +83,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; *

  • {@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager
  • * */ -public abstract class ResourceManager - extends RpcEndpoint +public abstract class ResourceManager + extends RpcEndpoint implements LeaderContender { public static final String RESOURCE_MANAGER_NAME = "resourcemanager"; @@ -609,8 +609,13 @@ public abstract class ResourceManager resourceManager; + private final ResourceManager resourceManager; public ResourceManagerRunner( final ResourceID resourceId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index afeddc4b993dce00720fa727cd7c10506bc21f05..a921a29183ba820c65f31ad2ebecf051da9676d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.rpc.RpcService; * * This ResourceManager doesn't acquire new resources. */ -public class StandaloneResourceManager extends ResourceManager { +public class StandaloneResourceManager extends ResourceManager { public StandaloneResourceManager( RpcService rpcService, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java deleted file mode 100644 index 6c8de666c041ff9f396517803025fbc442805e63..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -/** - * The {@link StandaloneResourceManager}'s RPC gateway interface. - */ -public interface StandaloneResourceManagerGateway extends ResourceManagerGateway { -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 85ed9505d61a95b56e0587ef08c9514f9877ee49..6a0bd8712d258702413c8943ca85881d3ee586b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; -import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; @@ -127,7 +126,7 @@ public class TaskExecutorITCase extends TestLogger { TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); - ResourceManager resourceManager = new StandaloneResourceManager( + ResourceManager resourceManager = new StandaloneResourceManager( rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, rmResourceId, diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index cef33783690d2cea7d4d5e7cc4884e690b913d55..2ad90655f629719b29a4551704b7e976f2af1e38 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -193,7 +193,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); } - private ResourceManager createResourceManager(Configuration config) throws Exception { + private ResourceManager createResourceManager(Configuration config) throws Exception { final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config); final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config); final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 4ee30f4e65efa2548024a77bea3b5da25fc707c2..6099d18cef9ac6e1bee834e8aee0862f09b3c951 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -63,7 +63,7 @@ import scala.concurrent.duration.FiniteDuration; * The yarn implementation of the resource manager. Used when the system is started * via the resource framework YARN. */ -public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler { +public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler { /** The process environment variables. */ private final Map env; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java deleted file mode 100644 index 485fb90dda8c5a569beba1d28976cf44e18f3711..0000000000000000000000000000000000000000 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; - -/** - * The {@link YarnResourceManager}'s RPC gateway interface. - */ -public interface YarnResourceManagerGateway extends ResourceManagerGateway { -}