diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index badfbe2c7e44f5eea6574d5d07d1a81dfa595be3..91fbba60df530aba70965916779f61f3391fe23d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -179,13 +179,7 @@ public abstract class ResourceManager Exception exception = null; try { - jobLeaderIdService.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { - leaderElectionService.stop(); + super.shutDown(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } @@ -193,7 +187,7 @@ public abstract class ResourceManager clearState(); try { - super.shutDown(); + leaderElectionService.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java index d04d852c972da310ebb6cedf4e9b60a6d1a7fa41..fa75bbba4b3604443e96d8368aa8c089ba95ae67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; import org.apache.flink.util.Preconditions; import scala.concurrent.duration.Duration; @@ -33,15 +32,12 @@ public class ResourceManagerConfiguration { private final Time timeout; private final Time heartbeatInterval; - private final Time jobTimeout; public ResourceManagerConfiguration( Time timeout, - Time heartbeatInterval, - Time jobTimeout) { + Time heartbeatInterval) { this.timeout = Preconditions.checkNotNull(timeout, "timeout"); this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval"); - this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout"); } public Time getTimeout() { @@ -52,10 +48,6 @@ public class ResourceManagerConfiguration { return heartbeatInterval; } - public Time getJobTimeout() { - return jobTimeout; - } - // -------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------- @@ -81,16 +73,6 @@ public class ResourceManagerConfiguration { "value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e); } - final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT); - final Time jobTimeout; - - try { - jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis()); - } catch (NumberFormatException e) { - throw new ConfigurationException("Could not parse the resource manager's job timeout " + - "value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e); - } - - return new ResourceManagerConfiguration(timeout, heartbeatInterval, jobTimeout); + return new ResourceManagerConfiguration(timeout, heartbeatInterval); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index 749b4075223bfd57fa016dfcfdedca6975e8c2ff..73b27b5417dd62e84b756b2e5984bae378839dd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -21,10 +21,9 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +38,8 @@ public class ResourceManagerRunner implements FatalErrorHandler { private final Object lock = new Object(); + private final ResourceManagerRuntimeServices resourceManagerRuntimeServices; + private final ResourceManager resourceManager; public ResourceManagerRunner( @@ -53,19 +54,21 @@ public class ResourceManagerRunner implements FatalErrorHandler { Preconditions.checkNotNull(metricRegistry); final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); - final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + + final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); + + resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + resourceManagerRuntimeServicesConfiguration, highAvailabilityServices, - rpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + rpcService.getScheduledExecutor()); this.resourceManager = new StandaloneResourceManager( rpcService, resourceManagerConfiguration, highAvailabilityServices, - slotManagerFactory, + resourceManagerRuntimeServices.getSlotManagerFactory(), metricRegistry, - jobLeaderIdService, + resourceManagerRuntimeServices.getJobLeaderIdService(), this); } @@ -82,8 +85,24 @@ public class ResourceManagerRunner implements FatalErrorHandler { } private void shutDownInternally() throws Exception { + Exception exception = null; synchronized (lock) { - resourceManager.shutDown(); + + try { + resourceManager.shutDown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + resourceManagerRuntimeServices.shutDown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + ExceptionUtils.rethrow(exception, "Error while shutting down the resource manager runner."); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java new file mode 100644 index 0000000000000000000000000000000000000000..56edde4c1a3701c6745996b919d96e4e234c2a44 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.util.Preconditions; + +/** + * Container class for the {@link ResourceManager} services. + */ +public class ResourceManagerRuntimeServices { + + private final SlotManagerFactory slotManagerFactory; + private final JobLeaderIdService jobLeaderIdService; + + public ResourceManagerRuntimeServices(SlotManagerFactory slotManagerFactory, JobLeaderIdService jobLeaderIdService) { + this.slotManagerFactory = Preconditions.checkNotNull(slotManagerFactory); + this.jobLeaderIdService = Preconditions.checkNotNull(jobLeaderIdService); + } + + public SlotManagerFactory getSlotManagerFactory() { + return slotManagerFactory; + } + + public JobLeaderIdService getJobLeaderIdService() { + return jobLeaderIdService; + } + + // -------------------- Lifecycle methods ----------------------------------- + + public void shutDown() throws Exception { + jobLeaderIdService.stop(); + } + + // -------------------- Static methods -------------------------------------- + + public static ResourceManagerRuntimeServices fromConfiguration( + ResourceManagerRuntimeServicesConfiguration configuration, + HighAvailabilityServices highAvailabilityServices, + ScheduledExecutor scheduledExecutor) throws Exception { + + final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); + final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + configuration.getJobTimeout()); + + return new ResourceManagerRuntimeServices(slotManagerFactory, jobLeaderIdService); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..6de5f4d6153571ed720ce4a897f8835c48590e97 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; +import org.apache.flink.util.Preconditions; +import scala.concurrent.duration.Duration; + +/** + * Configuration class for the {@link ResourceManagerRuntimeServices} class. + */ +public class ResourceManagerRuntimeServicesConfiguration { + + private final Time jobTimeout; + + public ResourceManagerRuntimeServicesConfiguration(Time jobTimeout) { + this.jobTimeout = Preconditions.checkNotNull(jobTimeout); + } + + public Time getJobTimeout() { + return jobTimeout; + } + + // ---------------------------- Static methods ---------------------------------- + + public static ResourceManagerRuntimeServicesConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { + + final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT); + final Time jobTimeout; + + try { + jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis()); + } catch (NumberFormatException e) { + throw new ConfigurationException("Could not parse the resource manager's job timeout " + + "value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e); + } + + return new ResourceManagerRuntimeServicesConfiguration(jobTimeout); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 58dedc33f3ae845fdb9b9d00e7295331010c0373..1aa799b4a2320087a33c47f4544bb61b382dc7f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -48,14 +48,17 @@ public class ResourceManagerHATest { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), - Time.seconds(5L), - Time.minutes(5L)); + Time.seconds(5L)); + + ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L)); + ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + resourceManagerRuntimeServicesConfiguration, + highAvailabilityServices, + rpcService.getScheduledExecutor()); + SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( - highAvailabilityServices, - rpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = @@ -65,7 +68,7 @@ public class ResourceManagerHATest { highAvailabilityServices, slotManagerFactory, metricRegistry, - jobLeaderIdService, + resourceManagerRuntimeServices.getJobLeaderIdService(), testingFatalErrorHandler); resourceManager.start(); // before grant leadership, resourceManager's leaderId is null diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 031f76ec753f5a62c69e27583007e00f3486d0de..fb166d41e49cf6c6901dbbaef9f487aca2855581 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -195,14 +195,13 @@ public class ResourceManagerJobMasterTest { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), - Time.seconds(5L), - Time.minutes(5L)); + Time.seconds(5L)); SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + Time.minutes(5L)); ResourceManager resourceManager = new StandaloneResourceManager( rpcService, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 4456235f5aab0832e3225b01f9bc74a43022464d..0a1addb4c5ab140f57c8a20eff0f2abae1922e3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -148,14 +148,13 @@ public class ResourceManagerTaskExecutorTest { TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), - Time.seconds(5L), - Time.minutes(5L)); + Time.seconds(5L)); + MetricRegistry metricRegistry = mock(MetricRegistry.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); - + Time.minutes(5L)); StandaloneResourceManager resourceManager = new StandaloneResourceManager( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 1e5edbed364d849b43f72f7806a6fbc5af1aa0e1..ea660f84188d6688aa42e070593312c65c98b33f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -109,13 +109,12 @@ public class SlotProtocolTest extends TestLogger { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), - Time.seconds(5L), - Time.minutes(5L)); + Time.seconds(5L)); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( testingHaServices, testRpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + Time.seconds(5L)); final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); SpiedResourceManager resourceManager = @@ -217,13 +216,12 @@ public class SlotProtocolTest extends TestLogger { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), - Time.seconds(5L), - Time.minutes(5L)); + Time.seconds(5L)); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( testingHaServices, testRpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + Time.seconds(5L)); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 16edbf70e7701efbfadea55759e6130903de85c9..43f33a31e471194a8b5e1c78b3a0eea293f35ef9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -97,13 +97,12 @@ public class TaskExecutorITCase { TestingSerialRpcService rpcService = new TestingSerialRpcService(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.milliseconds(500L), - Time.milliseconds(500L), - Time.minutes(5L)); + Time.milliseconds(500L)); SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( testingHAServices, rpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + Time.minutes(5L)); MetricRegistry metricRegistry = mock(MetricRegistry.class); HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index 6fb7c8605acf6740409b6d79d9903ee67b2a5018..7a0dbbeaa95cfbd30f5204e841865bd8cbf796e8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -35,11 +35,10 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; -import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; -import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; @@ -64,8 +63,8 @@ import java.io.ObjectInputStream; *

The lifetime of the YARN application bound to that of the Flink job. Other * YARN Application Master implementations are for example the YARN session. * - * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner} - * and {@link org.apache.flink.yarn.YarnResourceManager}. + * It starts actor system and the actors for {@link JobManagerRunner} + * and {@link YarnResourceManager}. * * The JobMasnagerRunner start a {@link org.apache.flink.runtime.jobmaster.JobMaster} * JobMaster handles Flink job execution, while the YarnResourceManager handles container @@ -193,20 +192,20 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati private ResourceManager createResourceManager(Configuration config) throws Exception { final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config); - final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config); + final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + resourceManagerRuntimeServicesConfiguration, haServices, - commonRpcService.getScheduledExecutor(), - resourceManagerConfiguration.getJobTimeout()); + commonRpcService.getScheduledExecutor()); return new YarnResourceManager(config, ENV, commonRpcService, resourceManagerConfiguration, haServices, - slotManagerFactory, + resourceManagerRuntimeServices.getSlotManagerFactory(), metricRegistry, - jobLeaderIdService, + resourceManagerRuntimeServices.getJobLeaderIdService(), this); }