diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java index 04a32276f62021d56d0fbd6df5dfc8ccdfa3243d..6d25e18f283223b273ae5eb9606a2b0bfc32a583 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +32,7 @@ public class SavepointStoreFactory { public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend"; public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir"; + public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager"; public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class); @@ -52,55 +53,33 @@ public class SavepointStoreFactory { Configuration config) throws Exception { // Try a the savepoint-specific configuration first. - String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, null); + String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND); if (savepointBackend == null) { LOG.info("No savepoint state backend configured. " + "Using job manager savepoint state backend."); return createJobManagerSavepointStore(); - } - else if (savepointBackend.equals("jobmanager")) { + } else if (savepointBackend.equals("jobmanager")) { LOG.info("Using job manager savepoint state backend."); return createJobManagerSavepointStore(); - } - else if (savepointBackend.equals("filesystem")) { - // Sanity check that the checkpoints are not stored on the job manager only - String checkpointBackend = config.getString( - ConfigConstants.STATE_BACKEND, "jobmanager"); - - if (checkpointBackend.equals("jobmanager")) { - LOG.warn("The combination of file system backend for savepoints and " + - "jobmanager backend for checkpoints does not work. The savepoint " + - "will *not* be recoverable after the job manager shuts down. " + - "Falling back to job manager savepoint state backend."); - - return createJobManagerSavepointStore(); + } else if (savepointBackend.equals("filesystem")) { + String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null); + + if (rootPath == null) { + throw new IllegalConfigurationException("Using filesystem as savepoint state backend, " + + "but did not specify directory. Please set the " + + "following configuration key: '" + SAVEPOINT_DIRECTORY_KEY + + "' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " + + "Falling back to job manager savepoint backend."); + } else { + LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath); + + return createFileSystemSavepointStore(rootPath); } - else { - String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null); - - if (rootPath == null) { - LOG.warn("Using filesystem as savepoint state backend, " + - "but did not specify directory. Please set the " + - "following configuration key: '" + SAVEPOINT_DIRECTORY_KEY + - "' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " + - "Falling back to job manager savepoint backend."); - - return createJobManagerSavepointStore(); - } - else { - LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath); - - return createFileSystemSavepointStore(rootPath); - } - } - } - else { - // Fallback - LOG.warn("Unexpected savepoint backend configuration '{}'. " + - "Falling back to job manager savepoint state backend.", savepointBackend); - - return createJobManagerSavepointStore(); + } else { + throw new IllegalConfigurationException("Unexpected savepoint backend " + + "configuration '" + savepointBackend + "'. " + + "Falling back to job manager savepoint state backend."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java index 69b6f814a75a862944287f7ce556661a08223afc..c0605f7e0c3e0f383bef180035e4d1887248c8b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java @@ -20,11 +20,13 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SavepointStoreFactoryTest { @@ -61,28 +63,34 @@ public class SavepointStoreFactoryTest { @Test public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception { Configuration config = new Configuration(); - + String rootPath = System.getProperty("java.io.tmpdir"); // This combination does not make sense, because the checkpoints will be // lost after the job manager shuts down. config.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); + config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath); + SavepointStore store = SavepointStoreFactory.createFromConfig(config); - assertTrue(store.getStateStore() instanceof HeapStateStore); + assertTrue(store.getStateStore() instanceof FileSystemStateStore); + + FileSystemStateStore stateStore = (FileSystemStateStore) + store.getStateStore(); + assertEquals(new Path(rootPath), stateStore.getRootPath()); } - @Test + @Test(expected = IllegalConfigurationException.class) public void testSavepointBackendFileSystemButNoDirectory() throws Exception { Configuration config = new Configuration(); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); - SavepointStore store = SavepointStoreFactory.createFromConfig(config); - assertTrue(store.getStateStore() instanceof HeapStateStore); + SavepointStoreFactory.createFromConfig(config); + fail("Did not throw expected Exception"); } - @Test + @Test(expected = IllegalConfigurationException.class) public void testUnexpectedSavepointBackend() throws Exception { Configuration config = new Configuration(); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected"); - SavepointStore store = SavepointStoreFactory.createFromConfig(config); - assertTrue(store.getStateStore() instanceof HeapStateStore); + SavepointStoreFactory.createFromConfig(config); + fail("Did not throw expected Exception"); } }