提交 f722b737 编写于 作者: M Maximilian Michels

[FLINK-4141] remove leaderUpdated() method from ResourceManager

This removes the leaderUpdated method from the framework. Further it
lets the RM client thread communicate directly with the
ResourceManager actor. This is fine since the two are always spawned
together. Failures of the ResourceManager actor will lead to dropped
messages of the RM client thread. Failures of the RM client thread will
inform the JobManager.

The leaderUpdated() method was used to signal the ResourceManager
framework that a new leader was elected. However, the method was not
always called when the leader changed, only when a new leader was
elected. This dropped all messages from the async Yarn RM client
thread (YarnResourceManagerCallbackHandler) for the time that the old
leader had failed and no new leader had been elected. The Yarn RM client
thread used leader tagged messages to communicate with the main Flink
ResourceManager actor.

This closes #2190
上级 16cdb612
......@@ -494,9 +494,6 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
jobManager = newJobManagerLeader;
// inform the framework that we have updated the leader
leaderUpdated();
if (workers.size() > 0) {
LOG.info("Received TaskManagers that were registered at the leader JobManager. " +
"Trying to consolidate.");
......@@ -644,12 +641,6 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
*/
protected abstract void initialize() throws Exception;
/**
* Provides codes to handle an update of the leader (relevant for HA). The framework has to deal
* with the consequences of a leader update.
*/
protected abstract void leaderUpdated();
/**
* The framework specific code for shutting down the application. This should report the
* application's final status and shut down the resource manager cleanly.
......
......@@ -58,11 +58,6 @@ public class StandaloneResourceManager extends FlinkResourceManager<ResourceID>
// nothing to initialize
}
@Override
protected void leaderUpdated() {
// nothing to update
}
@Override
protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
}
......
......@@ -29,8 +29,6 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
......@@ -181,8 +179,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
LOG.info("Initializing YARN resource master");
// create the client to communicate with the ResourceManager
ActorGateway selfGateway = new AkkaActorGateway(self(), getLeaderSessionID());
resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(selfGateway);
resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(self());
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
yarnHeartbeatIntervalMillis, resourceManagerCallbackHandler);
......@@ -223,12 +220,6 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
}
}
@Override
protected void leaderUpdated() {
AkkaActorGateway newGateway = new AkkaActorGateway(self(), getLeaderSessionID());
resourceManagerCallbackHandler.setCurrentLeaderGateway(newGateway);
}
@Override
protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
// first, de-register from YARN
......
......@@ -18,8 +18,8 @@
package org.apache.flink.yarn;
import akka.actor.ActorRef;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
......@@ -38,13 +38,13 @@ import java.util.List;
public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.CallbackHandler {
/** The yarn master to which we report the callbacks */
private ActorGateway yarnFrameworkMaster;
private ActorRef yarnFrameworkMaster;
/** The progress we report */
private float currentProgress;
public YarnResourceManagerCallbackHandler(ActorGateway yarnFrameworkMaster) {
public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) {
this.yarnFrameworkMaster = yarnFrameworkMaster;
}
......@@ -65,12 +65,16 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb
@Override
public void onContainersCompleted(List<ContainerStatus> list) {
yarnFrameworkMaster.tell(new ContainersComplete(list));
yarnFrameworkMaster.tell(
new ContainersComplete(list),
ActorRef.noSender());
}
@Override
public void onContainersAllocated(List<Container> containers) {
yarnFrameworkMaster.tell(new ContainersAllocated(containers));
yarnFrameworkMaster.tell(
new ContainersAllocated(containers),
ActorRef.noSender());
}
@Override
......@@ -85,14 +89,8 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb
@Override
public void onError(Throwable error) {
yarnFrameworkMaster.tell(new FatalErrorOccurred("Connection to YARN Resource Manager failed", error));
}
/**
* Leaders may change. The current gateway can be adjusted here.
* @param gateway The current gateway to the leading job manager.
*/
public void setCurrentLeaderGateway(ActorGateway gateway) {
this.yarnFrameworkMaster = gateway;
yarnFrameworkMaster.tell(
new FatalErrorOccurred("Connection to YARN Resource Manager failed", error),
ActorRef.noSender());
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.yarn.messages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.yarn.YarnFlinkResourceManager;
import org.apache.hadoop.yarn.api.records.Container;
......@@ -30,7 +29,7 @@ import java.util.List;
*
* NOTE: This message is not serializable, because the Container object is not serializable.
*/
public class ContainersAllocated implements RequiresLeaderSessionID {
public class ContainersAllocated {
private final List<Container> containers;
......
......@@ -18,7 +18,6 @@
package org.apache.flink.yarn.messages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.yarn.YarnFlinkResourceManager;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
......@@ -31,7 +30,7 @@ import java.util.List;
*
* NOTE: This message is not serializable, because the ContainerStatus object is not serializable.
*/
public class ContainersComplete implements RequiresLeaderSessionID {
public class ContainersComplete {
private final List<ContainerStatus> containers;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册