提交 ed31f4c7 编写于 作者: G Gary Yao

[FLINK-12432][runtime] Add SchedulerNG stub implementation

Add new SchedulerNG stub implementation, which will represent the future
default scheduler.

Add feature toggle to switch between existing scheduler and stub
implementation.

Add ThrowingRestartStrategy to validate that in new scheduling code paths, the
legacy restart strategies are not used.

This closes #8452.
上级 f9f43a51
......@@ -160,6 +160,19 @@ public class JobManagerOptions {
// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
/**
* Config parameter determining the scheduler implementation.
*
* <p>The value is a plain string. The description below lists two accepted values:
* {@code "legacy"} (the default) and {@code "ng"}. The value is read by
* {@code SchedulerNGFactoryFactory#createSchedulerNGFactory}, which rejects any other value
* with an {@link IllegalArgumentException}.
*
* <p>The option is excluded from the generated documentation because the new scheduler is
* still in development.
*/
@Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
public static final ConfigOption<String> SCHEDULER =
key("jobmanager.scheduler")
// keep the existing scheduler as the default so that behavior is unchanged unless opted in
.defaultValue("legacy")
.withDescription(Description.builder()
.text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
.list(
text("'legacy': legacy scheduler"),
text("'ng': new generation scheduler"))
.build());
// ---------------------------------------------------------------------------------------------
......
......@@ -57,8 +57,7 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
jobManagerServices.getRestartStrategyFactory());
final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, jobManagerServices.getRestartStrategyFactory());
final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
jobMasterConfiguration,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
/**
 * Static utility that selects the {@link SchedulerNGFactory} implementation according to the
 * value of {@link JobManagerOptions#SCHEDULER}.
 */
final class SchedulerNGFactoryFactory {

	/** Utility class; not meant to be instantiated. */
	private SchedulerNGFactoryFactory() {}

	/**
	 * Creates the {@link SchedulerNGFactory} matching the configured scheduler name.
	 *
	 * @param configuration configuration from which {@link JobManagerOptions#SCHEDULER} is read
	 * @param restartStrategyFactory handed to the {@link LegacySchedulerFactory}; not used when
	 *                               the {@code "ng"} scheduler is selected
	 * @return a {@link LegacySchedulerFactory} for {@code "legacy"}, a
	 *         {@link DefaultSchedulerFactory} for {@code "ng"}
	 * @throws IllegalArgumentException if the configured value is neither of the above
	 */
	static SchedulerNGFactory createSchedulerNGFactory(
			final Configuration configuration,
			final RestartStrategyFactory restartStrategyFactory) {

		final String configuredScheduler = configuration.getString(JobManagerOptions.SCHEDULER);

		switch (configuredScheduler) {
			case "legacy":
				return new LegacySchedulerFactory(restartStrategyFactory);

			case "ng":
				return new DefaultSchedulerFactory();

			default:
				throw new IllegalArgumentException(illegalValueMessage(configuredScheduler));
		}
	}

	/** Builds the error message reported for an unrecognized scheduler name. */
	private static String illegalValueMessage(final String value) {
		return String.format(
			"Illegal value [%s] for config option [%s]",
			value,
			JobManagerOptions.SCHEDULER.key());
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
/**
 * A {@link RestartStrategy} that must never be consulted: every method fails with an
 * {@link IllegalStateException}, which makes accidental use of a restart strategy visible.
 */
public class ThrowingRestartStrategy implements RestartStrategy {

	private static final String UNEXPECTED_CAN_RESTART = "Unexpected canRestart() call";

	private static final String UNEXPECTED_RESTART = "Unexpected restart() call";

	/**
	 * Always fails.
	 *
	 * @throws IllegalStateException on every invocation
	 */
	@Override
	public boolean canRestart() {
		throw new IllegalStateException(UNEXPECTED_CAN_RESTART);
	}

	/**
	 * Always fails; neither the callback nor the executor is touched.
	 *
	 * @throws IllegalStateException on every invocation
	 */
	@Override
	public void restart(final RestartCallback restarter, final ScheduledExecutor executor) {
		throw new IllegalStateException(UNEXPECTED_RESTART);
	}

	/**
	 * {@link RestartStrategyFactory} producing {@link ThrowingRestartStrategy} instances.
	 */
	public static class ThrowingRestartStrategyFactory extends RestartStrategyFactory {

		private static final long serialVersionUID = 1L;

		/** Returns a fresh {@link ThrowingRestartStrategy} on each call. */
		@Override
		public RestartStrategy createRestartStrategy() {
			return new ThrowingRestartStrategy();
		}
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.slf4j.Logger;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* Stub implementation of the future default scheduler.
*
* <p>For now this class only extends {@link LegacyScheduler} and differs from it in two ways:
* it installs a {@link ThrowingRestartStrategy.ThrowingRestartStrategyFactory} instead of a
* caller-supplied restart strategy factory, and {@link #startScheduling()} is not supported yet.
*/
public class DefaultScheduler extends LegacyScheduler {
/**
* Creates the scheduler by forwarding all arguments unchanged to the {@link LegacyScheduler}
* constructor.
*
* <p>Unlike the legacy scheduler, no restart strategy factory is accepted. A
* {@link ThrowingRestartStrategy.ThrowingRestartStrategyFactory} is passed to the superclass
* instead, so that any use of a legacy restart strategy fails with an
* {@link IllegalStateException}.
*
* @throws Exception if the {@link LegacyScheduler} constructor throws
*/
public DefaultScheduler(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout) throws Exception {
super(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
slotProvider,
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
// legacy restart strategies must not be used by the new scheduler; fail loudly if they are
new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout);
}
/**
* Not implemented in this stub.
*
* @throws UnsupportedOperationException always
*/
@Override
public void startScheduling() {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.slf4j.Logger;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* Factory for {@link DefaultScheduler}.
*
* <p>The factory holds no state; each call to {@code createInstance} constructs a new
* {@link DefaultScheduler}.
*/
public class DefaultSchedulerFactory implements SchedulerNGFactory {
/**
* Creates a new {@link DefaultScheduler}, passing every argument through to its constructor
* unchanged and in the same order.
*
* @return the newly created {@link DefaultScheduler}
* @throws Exception if the {@link DefaultScheduler} constructor throws
*/
@Override
public SchedulerNG createInstance(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout) throws Exception {
return new DefaultScheduler(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
slotProvider,
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
 * Tests for {@link SchedulerNGFactoryFactory}.
 */
public class SchedulerNGFactoryFactoryTest extends TestLogger {

	private static final NoRestartStrategy.NoRestartStrategyFactory TEST_RESTART_STRATEGY_FACTORY =
		new NoRestartStrategy.NoRestartStrategyFactory();

	/** An empty configuration must select the legacy scheduler factory. */
	@Test
	public void createLegacySchedulerFactoryByDefault() {
		final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(new Configuration());

		assertThat(schedulerNGFactory, is(instanceOf(LegacySchedulerFactory.class)));
	}

	/** Setting the scheduler option to {@code "ng"} must select the new scheduler factory. */
	@Test
	public void createSchedulerNGFactoryIfConfigured() {
		final Configuration configuration = new Configuration();
		configuration.setString(JobManagerOptions.SCHEDULER, "ng");

		final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(configuration);

		assertThat(schedulerNGFactory, is(instanceOf(DefaultSchedulerFactory.class)));
	}

	/** An unknown scheduler name must be rejected with a message naming the offending value. */
	@Test
	public void throwsExceptionIfSchedulerNameIsInvalid() {
		final Configuration configuration = new Configuration();
		configuration.setString(JobManagerOptions.SCHEDULER, "invalid-scheduler-name");

		try {
			createSchedulerNGFactory(configuration);
			// Without this the test would pass silently if no exception were thrown at all.
			// fail() raises an AssertionError, which the catch clause below does not intercept.
			fail("Expected exception not thrown");
		} catch (IllegalArgumentException e) {
			assertThat(e.getMessage(), containsString("Illegal value [invalid-scheduler-name]"));
		}
	}

	/** Invokes the factory under test with a fixed restart strategy factory. */
	private static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
		return SchedulerNGFactoryFactory.createSchedulerNGFactory(
			configuration,
			TEST_RESTART_STRATEGY_FACTORY);
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
 * Tests for {@link ThrowingRestartStrategy}: each of its methods is expected to fail with an
 * {@link IllegalStateException} carrying a method-specific message.
 */
public class ThrowingRestartStrategyFactoryTest extends TestLogger {

	private RestartStrategy restartStrategy;

	@Before
	public void setUp() {
		restartStrategy = new ThrowingRestartStrategy();
	}

	/** {@code restart()} must throw and must not have queued anything on the executor. */
	@Test
	public void restartShouldThrowException() {
		final ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();

		try {
			restartStrategy.restart(new NoOpRestarter(), executor);
			fail("Expected exception not thrown");
		} catch (IllegalStateException expected) {
			assertThat(expected.getMessage(), is("Unexpected restart() call"));
			assertThat(executor.numQueuedRunnables(), is(0));
		}
	}

	/** {@code canRestart()} must throw instead of returning a value. */
	@Test
	public void canRestartShouldThrowException() {
		try {
			restartStrategy.canRestart();
			fail("Expected exception not thrown");
		} catch (IllegalStateException expected) {
			assertThat(expected.getMessage(), is("Unexpected canRestart() call"));
		}
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册