未验证 提交 3fe27ac0 编写于 作者: W Wright, Eron 提交者: Till Rohrmann

[FLINK-6379] [mesos] Add Mesos ResourceManager (FLIP-6)

- Make the RPC gateway of the ResourceManager extensible to allow for framework-specific RPC methods
- Introduce FLIP-6 MesosResourceManager w/ tests
- Introduce a Mesos-specific RPC gateway for callbacks from child actors and from the Mesos scheduler client
- Enhance the persistent Mesos worker store to track the resource profile associated with a worker
- Convert RegisteredMesosWorkerNode to Java
- Decline TE registration if framework doesn’t recognize the worker

This closes #3942.
上级 b59148cf
......@@ -498,7 +498,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
*/
@Override
protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
releaseWorker(worker.task());
releaseWorker(worker.getWorker());
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.SchedulerGateway;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
/**
* The {@link MesosResourceManager}'s RPC gateway interface.
*/
public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway {
void acceptOffers(AcceptOffers msg);
void reconcile(ReconciliationCoordinator.Reconcile message);
void taskTerminated(TaskMonitor.TaskTerminated message);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
/**
* A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
*/
public class RegisteredMesosWorkerNode implements Serializable, ResourceIDRetrievable {
private static final long serialVersionUID = 2;
private final MesosWorkerStore.Worker worker;
public RegisteredMesosWorkerNode(MesosWorkerStore.Worker worker) {
this.worker = Preconditions.checkNotNull(worker);
Preconditions.checkArgument(worker.slaveID().isDefined());
Preconditions.checkArgument(worker.hostname().isDefined());
}
public MesosWorkerStore.Worker getWorker() {
return worker;
}
@Override
public ResourceID getResourceID() {
return MesosResourceManager.extractResourceID(worker.taskID());
}
@Override
public String toString() {
return "RegisteredMesosWorkerNode{" +
"worker=" + worker +
'}';
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.mesos.runtime.clusterframework.store;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.mesos.Protos;
import java.io.Serializable;
......@@ -93,19 +94,24 @@ public interface MesosWorkerStore {
private final Protos.TaskID taskID;
private final ResourceProfile profile;
private final Option<Protos.SlaveID> slaveID;
private final Option<String> hostname;
private final WorkerState state;
private Worker(Protos.TaskID taskID, Option<Protos.SlaveID> slaveID, Option<String> hostname, WorkerState state) {
private Worker(Protos.TaskID taskID, ResourceProfile profile,
Option<Protos.SlaveID> slaveID, Option<String> hostname, WorkerState state) {
requireNonNull(taskID, "taskID");
requireNonNull(profile, "profile");
requireNonNull(slaveID, "slaveID");
requireNonNull(hostname, "hostname");
requireNonNull(state, "state");
this.taskID = taskID;
this.profile = profile;
this.slaveID = slaveID;
this.hostname = hostname;
this.state = state;
......@@ -118,6 +124,14 @@ public interface MesosWorkerStore {
return taskID;
}
/**
* Get the resource profile associated with the worker.
* @return
*/
public ResourceProfile profile() {
return profile;
}
/**
* Get the worker's assigned slave ID.
*/
......@@ -148,6 +162,19 @@ public interface MesosWorkerStore {
public static Worker newWorker(Protos.TaskID taskID) {
return new Worker(
taskID,
ResourceProfile.UNKNOWN,
Option.<Protos.SlaveID>empty(), Option.<String>empty(),
WorkerState.New);
}
/**
* Create a new worker with the given taskID.
* @return a new worker instance.
*/
public static Worker newWorker(Protos.TaskID taskID, ResourceProfile profile) {
return new Worker(
taskID,
profile,
Option.<Protos.SlaveID>empty(), Option.<String>empty(),
WorkerState.New);
}
......@@ -157,7 +184,7 @@ public interface MesosWorkerStore {
* @return a new worker instance (does not mutate the current instance).
*/
public Worker launchWorker(Protos.SlaveID slaveID, String hostname) {
return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), WorkerState.Launched);
return new Worker(taskID, profile, Option.apply(slaveID), Option.apply(hostname), WorkerState.Launched);
}
/**
......@@ -165,7 +192,7 @@ public interface MesosWorkerStore {
* @return a new worker instance (does not mutate the current instance).
*/
public Worker releaseWorker() {
return new Worker(taskID, slaveID, hostname, WorkerState.Released);
return new Worker(taskID, profile, slaveID, hostname, WorkerState.Released);
}
@Override
......@@ -195,6 +222,7 @@ public interface MesosWorkerStore {
", slaveID=" + slaveID +
", hostname=" + hostname +
", state=" + state +
", profile=" + profile +
'}';
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mesos.scheduler;
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.runtime.rpc.RpcGateway;
/**
* A scheduler's RPC gateway interface.
*
* Implemented by RPC endpoints that accept Mesos scheduler messages.
*/
public interface SchedulerGateway extends RpcGateway {
/**
* Called when connected to Mesos as a new framework.
*/
void registered(Registered message);
/**
* Called when reconnected to Mesos following a failover event.
*/
void reregistered(ReRegistered message);
/**
* Called when disconnected from Mesos.
*/
void disconnected(Disconnected message);
/**
* Called when resource offers are made to the framework.
*/
void resourceOffers(ResourceOffers message);
/**
* Called when resource offers are rescinded.
*/
void offerRescinded(OfferRescinded message);
/**
* Called when a status update arrives from the Mesos master.
*/
void statusUpdate(StatusUpdate message);
/**
* Called when a framework message arrives from a custom Mesos task executor.
*/
void frameworkMessage(FrameworkMessage message);
/**
* Called when a Mesos slave is lost.
*/
void slaveLost(SlaveLost message);
/**
* Called when a custom Mesos task executor is lost.
*/
void executorLost(ExecutorLost message);
/**
* Called when an error is reported by the scheduler callback.
*/
void error(Error message);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mesos.scheduler;
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
import org.apache.mesos.SchedulerDriver;
import java.util.List;
/**
* This class reacts to callbacks from the Mesos scheduler driver.
*
* Forwards incoming messages to the {@link MesosResourceManager} RPC gateway.
*
* See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
*/
public class SchedulerProxyV2 implements Scheduler {
/** The actor to which we report the callbacks */
private final SchedulerGateway gateway;
public SchedulerProxyV2(SchedulerGateway gateway) {
this.gateway = gateway;
}
@Override
public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) {
gateway.registered(new Registered(frameworkId, masterInfo));
}
@Override
public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
gateway.reregistered(new ReRegistered(masterInfo));
}
@Override
public void disconnected(SchedulerDriver driver) {
gateway.disconnected(new Disconnected());
}
@Override
public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
gateway.resourceOffers(new ResourceOffers(offers));
}
@Override
public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
gateway.offerRescinded(new OfferRescinded(offerId));
}
@Override
public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
gateway.statusUpdate(new StatusUpdate(status));
}
@Override
public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) {
gateway.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
}
@Override
public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) {
gateway.slaveLost(new SlaveLost(slaveId));
}
@Override
public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) {
gateway.executorLost(new ExecutorLost(executorId, slaveId, status));
}
@Override
public void error(SchedulerDriver driver, String message) {
gateway.error(new Error(message));
}
}
......@@ -83,8 +83,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
public abstract class ResourceManager<WorkerType extends Serializable>
extends RpcEndpoint<ResourceManagerGateway>
public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerType extends Serializable>
extends RpcEndpoint<C>
implements LeaderContender {
public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
......@@ -419,6 +419,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
WorkerType newWorker = workerStarted(taskExecutorResourceId);
if(newWorker == null) {
log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
"not recognize it", taskExecutorResourceId, taskExecutorAddress);
return new RegistrationResponse.Decline("unrecognized TaskExecutor");
}
WorkerRegistration<WorkerType> registration =
new WorkerRegistration<>(taskExecutorGateway, newWorker);
......@@ -783,7 +788,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
*
* @param t The exception describing the fatal error
*/
void onFatalError(Throwable t) {
protected void onFatalError(Throwable t) {
log.error("Fatal error occurred.", t);
fatalErrorHandler.onFatalError(t);
}
......
......@@ -42,7 +42,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
private final ResourceManagerRuntimeServices resourceManagerRuntimeServices;
private final ResourceManager<?> resourceManager;
private final ResourceManager<? extends ResourceManagerGateway, ?> resourceManager;
public ResourceManagerRunner(
final ResourceID resourceId,
......
......@@ -36,7 +36,7 @@ import org.apache.flink.runtime.rpc.RpcService;
*
* This ResourceManager doesn't acquire new resources.
*/
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public class StandaloneResourceManager extends ResourceManager<StandaloneResourceManagerGateway, ResourceID> {
public StandaloneResourceManager(
RpcService rpcService,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.resourcemanager;
/**
* The {@link StandaloneResourceManager}'s RPC gateway interface.
*/
public interface StandaloneResourceManagerGateway extends ResourceManagerGateway {
}
......@@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* of Erlang or Akka.
*
* <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
* and the {@link #getMainThreadExecutor()} to execute code in the RPC endoint's main thread.
* and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
*
* @param <C> The RPC gateway counterpart for the implementing RPC endpoint
*/
......
......@@ -47,6 +47,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
......@@ -126,7 +127,7 @@ public class TaskExecutorITCase extends TestLogger {
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime());
ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
ResourceManager<StandaloneResourceManagerGateway,ResourceID> resourceManager = new StandaloneResourceManager(
rpcService,
FlinkResourceManager.RESOURCE_MANAGER_NAME,
rmResourceId,
......
......@@ -193,7 +193,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
}
private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
private ResourceManager<?,?> createResourceManager(Configuration config) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
......
......@@ -63,7 +63,7 @@ import scala.concurrent.duration.FiniteDuration;
* The yarn implementation of the resource manager. Used when the system is started
* via the resource framework YARN.
*/
public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
public class YarnResourceManager extends ResourceManager<YarnResourceManagerGateway, ResourceID> implements AMRMClientAsync.CallbackHandler {
/** The process environment variables. */
private final Map<String, String> env;
......
......@@ -16,18 +16,12 @@
* limitations under the License.
*/
package org.apache.flink.mesos.runtime.clusterframework
package org.apache.flink.yarn;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore
import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
/**
* A representation of a registered Mesos task managed by the [[MesosFlinkResourceManager]].
*/
case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable {
require(task.slaveID().isDefined)
require(task.hostname().isDefined)
override val getResourceID: ResourceID = MesosFlinkResourceManager.extractResourceID(task.taskID())
* The {@link YarnResourceManager}'s RPC gateway interface.
*/
public interface YarnResourceManagerGateway extends ResourceManagerGateway {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册