diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 72a4c58cea1e2da7e7d3dd17f975268a574979c0..e3b4f4d2eccca5aa634d569f8fe89a8e5fdc5a04 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -83,10 +83,10 @@ public class RocksDBStateBackend extends AbstractStateBackend { // DB storage directories /** Base paths for RocksDB directory, as configured. May be null. */ - private Path[] dbBasePaths; + private Path[] configuredDbBasePaths; /** Base paths for RocksDB directory, as initialized */ - private File[] dbStorageDirectories; + private File[] initializedDbBasePaths; private int nextDirectory; @@ -171,15 +171,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { this.jobId = env.getJobID(); // initialize the paths where the local RocksDB files should be stored - if (dbBasePaths == null) { + if (configuredDbBasePaths == null) { // initialize from the temp directories - dbStorageDirectories = env.getIOManager().getSpillingDirectories(); + initializedDbBasePaths = env.getIOManager().getSpillingDirectories(); } else { - List dirs = new ArrayList<>(dbBasePaths.length); + List dirs = new ArrayList<>(configuredDbBasePaths.length); String errorMessage = ""; - for (Path path : dbBasePaths) { + for (Path path : configuredDbBasePaths) { File f = new File(path.toUri().getPath()); if (!f.exists() && !f.mkdirs()) { String msg = "Local DB files directory '" + f.getAbsolutePath() @@ -193,11 +193,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { if (dirs.isEmpty()) { throw new Exception("No local storage directories available. " + errorMessage); } else { - dbStorageDirectories = dirs.toArray(new File[dirs.size()]); + initializedDbBasePaths = dirs.toArray(new File[dirs.size()]); } } - nextDirectory = new Random().nextInt(dbStorageDirectories.length); + nextDirectory = new Random().nextInt(initializedDbBasePaths.length); } @Override @@ -225,15 +225,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { } File[] getStoragePaths() { - return dbStorageDirectories; + return initializedDbBasePaths; } File getNextStoragePath() { int ni = nextDirectory + 1; - ni = ni >= dbStorageDirectories.length ? 0 : ni; + ni = ni >= initializedDbBasePaths.length ? 0 : ni; nextDirectory = ni; - return dbStorageDirectories[ni]; + return initializedDbBasePaths[ni]; } // ------------------------------------------------------------------------ @@ -330,7 +330,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { */ public void setDbStoragePaths(String... paths) { if (paths == null) { - dbBasePaths = null; + configuredDbBasePaths = null; } else if (paths.length == 0) { throw new IllegalArgumentException("empty paths"); @@ -350,7 +350,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { } } - dbBasePaths = pp; + configuredDbBasePaths = pp; } } @@ -359,12 +359,12 @@ public class RocksDBStateBackend extends AbstractStateBackend { * @return The configured DB storage paths, or null, if none were configured. */ public String[] getDbStoragePaths() { - if (dbBasePaths == null) { + if (configuredDbBasePaths == null) { return null; } else { - String[] paths = new String[dbBasePaths.length]; + String[] paths = new String[configuredDbBasePaths.length]; for (int i = 0; i < paths.length; i++) { - paths[i] = dbBasePaths[i].toString(); + paths[i] = configuredDbBasePaths[i].toString(); } return paths; }