From 63b13c5ffe89a2bb44b735a71615a2b07e3536ad Mon Sep 17 00:00:00 2001 From: Xintong Song Date: Tue, 10 Mar 2020 15:32:35 +0800 Subject: [PATCH] [hotfix][runtime] Code deduplication in ResourceManagerFactory and its implementations. --- .../StandaloneJobClusterEntryPoint.java | 2 +- .../KubernetesResourceManagerFactory.java | 24 +++---- .../MesosResourceManagerFactory.java | 22 ++++--- .../StandaloneSessionClusterEntrypoint.java | 2 +- .../runtime/minicluster/MiniCluster.java | 2 +- .../ActiveResourceManagerFactory.java | 17 +---- .../ResourceManagerFactory.java | 66 +++++++++++++++---- .../StandaloneResourceManagerFactory.java | 45 ++++++++----- .../ProcessFailureCancelingITCase.java | 2 +- .../YarnResourceManagerFactory.java | 22 ++++--- 10 files changed, 125 insertions(+), 79 deletions(-) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index e07501ada39..d28052c1594 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -80,7 +80,7 @@ public final class StandaloneJobClusterEntryPoint extends ClusterEntrypoint { new DefaultDispatcherRunnerFactory( ApplicationDispatcherLeaderProcessFactoryFactory .create(configuration, SessionDispatcherFactory.INSTANCE, program)), - StandaloneResourceManagerFactory.INSTANCE, + StandaloneResourceManagerFactory.getInstance(), JobRestEndpointFactory.INSTANCE); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java index 74edb72260e..707988a3b71 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ConfigurationException; import javax.annotation.Nullable; @@ -57,7 +58,7 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto } @Override - public ResourceManager createActiveResourceManager( + public ResourceManager createResourceManager( Configuration configuration, ResourceID resourceId, RpcService rpcService, @@ -66,14 +67,9 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception { - final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = - ResourceManagerRuntimeServicesConfiguration.fromConfiguration( - configuration, KubernetesWorkerResourceSpecFactory.INSTANCE); - final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( - rmServicesConfiguration, - highAvailabilityServices, - rpcService.getScheduledExecutor()); + ResourceManagerMetricGroup resourceManagerMetricGroup, + ResourceManagerRuntimeServices resourceManagerRuntimeServices) { + final KubernetesResourceManagerConfiguration kubernetesResourceManagerConfiguration = new KubernetesResourceManagerConfiguration( configuration.getString(KubernetesConfigOptions.CLUSTER_ID), @@ -85,13 +81,19 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto configuration, highAvailabilityServices, heartbeatServices, - rmRuntimeServices.getSlotManager(), + resourceManagerRuntimeServices.getSlotManager(), ResourceManagerPartitionTrackerImpl::new, - rmRuntimeServices.getJobLeaderIdService(), + resourceManagerRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, KubeClientFactory.fromConfiguration(configuration), kubernetesResourceManagerConfiguration); } + + @Override + protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration( + Configuration configuration) throws ConfigurationException { + return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, KubernetesWorkerResourceSpecFactory.INSTANCE); + } } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java index 9aab235634b..8f5cd5d57e6 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,7 @@ public class MesosResourceManagerFactory extends ActiveResourceManagerFactory createActiveResourceManager( + public ResourceManager createResourceManager( Configuration configuration, ResourceID resourceId, RpcService rpcService, @@ -71,13 +72,8 @@ public class MesosResourceManagerFactory extends ActiveResourceManagerFactory type of the {@link ResourceIDRetrievable} */ -public abstract class ActiveResourceManagerFactory implements ResourceManagerFactory { +public abstract class ActiveResourceManagerFactory extends ResourceManagerFactory { @Override public ResourceManager createResourceManager( @@ -54,7 +54,7 @@ public abstract class ActiveResourceManagerFactory createActiveResourceManager( - Configuration configuration, - ResourceID resourceId, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - FatalErrorHandler fatalErrorHandler, - ClusterInformation clusterInformation, - @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java index f7d5400e052..89d0cd8c46a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ConfigurationException; import javax.annotation.Nullable; @@ -35,16 +36,57 @@ import javax.annotation.Nullable; * * @param type of the workers of the ResourceManager */ -public interface ResourceManagerFactory { - - ResourceManager createResourceManager( - Configuration configuration, - ResourceID resourceId, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - FatalErrorHandler fatalErrorHandler, - ClusterInformation clusterInformation, - @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception; +public abstract class ResourceManagerFactory { + + public ResourceManager createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl, + ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception { + + final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices( + configuration, rpcService, highAvailabilityServices); + + return createResourceManager( + configuration, + resourceId, + rpcService, + highAvailabilityServices, + heartbeatServices, + fatalErrorHandler, + clusterInformation, + webInterfaceUrl, + resourceManagerMetricGroup, + resourceManagerRuntimeServices); + } + + protected abstract ResourceManager createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl, + ResourceManagerMetricGroup resourceManagerMetricGroup, + ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception; + + private ResourceManagerRuntimeServices createResourceManagerRuntimeServices( + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices) throws ConfigurationException { + return ResourceManagerRuntimeServices.fromConfiguration( + createResourceManagerRuntimeServicesConfiguration(configuration), + highAvailabilityServices, + rpcService.getScheduledExecutor()); + } + + protected abstract ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration( + Configuration configuration) throws ConfigurationException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java index 5fcd3f363d6..c79a7dd251d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java @@ -30,32 +30,35 @@ import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTra import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ConfigurationException; import javax.annotation.Nullable; /** * {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}. */ -public enum StandaloneResourceManagerFactory implements ResourceManagerFactory { - INSTANCE; +public final class StandaloneResourceManagerFactory extends ResourceManagerFactory { + + private static final StandaloneResourceManagerFactory INSTANCE = new StandaloneResourceManagerFactory(); + + private StandaloneResourceManagerFactory() {} + + public static StandaloneResourceManagerFactory getInstance() { + return INSTANCE; + } @Override - public ResourceManager createResourceManager( - Configuration configuration, - ResourceID resourceId, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - FatalErrorHandler fatalErrorHandler, - ClusterInformation clusterInformation, - @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception { - final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = - ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, ArbitraryWorkerResourceSpecFactory.INSTANCE); - final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( - resourceManagerRuntimeServicesConfiguration, - highAvailabilityServices, - rpcService.getScheduledExecutor()); + protected ResourceManager createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl, + ResourceManagerMetricGroup resourceManagerMetricGroup, + ResourceManagerRuntimeServices resourceManagerRuntimeServices) { final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); @@ -73,4 +76,10 @@ public enum StandaloneResourceManagerFactory implements ResourceManagerFactory createActiveResourceManager( + public ResourceManager createResourceManager( Configuration configuration, ResourceID resourceId, RpcService rpcService, @@ -60,13 +61,8 @@ public class YarnResourceManagerFactory extends ActiveResourceManagerFactory