提交 c9cba277 编写于 作者: U Ufuk Celebi

[FLINK-3512] [runtime] Savepoint backend should not revert to 'jobmanager'

This closes #1712.
上级 d90672fd
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
package org.apache.flink.runtime.checkpoint; package org.apache.flink.runtime.checkpoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -32,6 +32,7 @@ public class SavepointStoreFactory { ...@@ -32,6 +32,7 @@ public class SavepointStoreFactory {
public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend"; public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend";
public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir"; public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";
public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager";
public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class); public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class);
...@@ -52,55 +53,33 @@ public class SavepointStoreFactory { ...@@ -52,55 +53,33 @@ public class SavepointStoreFactory {
Configuration config) throws Exception { Configuration config) throws Exception {
// Try a the savepoint-specific configuration first. // Try a the savepoint-specific configuration first.
String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, null); String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND);
if (savepointBackend == null) { if (savepointBackend == null) {
LOG.info("No savepoint state backend configured. " + LOG.info("No savepoint state backend configured. " +
"Using job manager savepoint state backend."); "Using job manager savepoint state backend.");
return createJobManagerSavepointStore(); return createJobManagerSavepointStore();
} } else if (savepointBackend.equals("jobmanager")) {
else if (savepointBackend.equals("jobmanager")) {
LOG.info("Using job manager savepoint state backend."); LOG.info("Using job manager savepoint state backend.");
return createJobManagerSavepointStore(); return createJobManagerSavepointStore();
} } else if (savepointBackend.equals("filesystem")) {
else if (savepointBackend.equals("filesystem")) { String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null);
// Sanity check that the checkpoints are not stored on the job manager only
String checkpointBackend = config.getString( if (rootPath == null) {
ConfigConstants.STATE_BACKEND, "jobmanager"); throw new IllegalConfigurationException("Using filesystem as savepoint state backend, " +
"but did not specify directory. Please set the " +
if (checkpointBackend.equals("jobmanager")) { "following configuration key: '" + SAVEPOINT_DIRECTORY_KEY +
LOG.warn("The combination of file system backend for savepoints and " + "' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
"jobmanager backend for checkpoints does not work. The savepoint " + "Falling back to job manager savepoint backend.");
"will *not* be recoverable after the job manager shuts down. " + } else {
"Falling back to job manager savepoint state backend."); LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath);
return createJobManagerSavepointStore(); return createFileSystemSavepointStore(rootPath);
} }
else { } else {
String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null); throw new IllegalConfigurationException("Unexpected savepoint backend " +
"configuration '" + savepointBackend + "'. " +
if (rootPath == null) { "Falling back to job manager savepoint state backend.");
LOG.warn("Using filesystem as savepoint state backend, " +
"but did not specify directory. Please set the " +
"following configuration key: '" + SAVEPOINT_DIRECTORY_KEY +
"' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
"Falling back to job manager savepoint backend.");
return createJobManagerSavepointStore();
}
else {
LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath);
return createFileSystemSavepointStore(rootPath);
}
}
}
else {
// Fallback
LOG.warn("Unexpected savepoint backend configuration '{}'. " +
"Falling back to job manager savepoint state backend.", savepointBackend);
return createJobManagerSavepointStore();
} }
} }
......
...@@ -20,11 +20,13 @@ package org.apache.flink.runtime.checkpoint; ...@@ -20,11 +20,13 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SavepointStoreFactoryTest { public class SavepointStoreFactoryTest {
...@@ -61,28 +63,34 @@ public class SavepointStoreFactoryTest { ...@@ -61,28 +63,34 @@ public class SavepointStoreFactoryTest {
@Test @Test
public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception { public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception {
Configuration config = new Configuration(); Configuration config = new Configuration();
String rootPath = System.getProperty("java.io.tmpdir");
// This combination does not make sense, because the checkpoints will be // This combination does not make sense, because the checkpoints will be
// lost after the job manager shuts down. // lost after the job manager shuts down.
config.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
SavepointStore store = SavepointStoreFactory.createFromConfig(config); SavepointStore store = SavepointStoreFactory.createFromConfig(config);
assertTrue(store.getStateStore() instanceof HeapStateStore); assertTrue(store.getStateStore() instanceof FileSystemStateStore);
FileSystemStateStore<CompletedCheckpoint> stateStore = (FileSystemStateStore<CompletedCheckpoint>)
store.getStateStore();
assertEquals(new Path(rootPath), stateStore.getRootPath());
} }
@Test @Test(expected = IllegalConfigurationException.class)
public void testSavepointBackendFileSystemButNoDirectory() throws Exception { public void testSavepointBackendFileSystemButNoDirectory() throws Exception {
Configuration config = new Configuration(); Configuration config = new Configuration();
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
SavepointStore store = SavepointStoreFactory.createFromConfig(config); SavepointStoreFactory.createFromConfig(config);
assertTrue(store.getStateStore() instanceof HeapStateStore); fail("Did not throw expected Exception");
} }
@Test @Test(expected = IllegalConfigurationException.class)
public void testUnexpectedSavepointBackend() throws Exception { public void testUnexpectedSavepointBackend() throws Exception {
Configuration config = new Configuration(); Configuration config = new Configuration();
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected"); config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected");
SavepointStore store = SavepointStoreFactory.createFromConfig(config); SavepointStoreFactory.createFromConfig(config);
assertTrue(store.getStateStore() instanceof HeapStateStore); fail("Did not throw expected Exception");
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册