提交 a6890b28 编写于 作者: T Till Rohrmann

[FLINK-2354] [runtime] Replace old StateHandleProvider by StateStorageHelper...

[FLINK-2354] [runtime] Replace old StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore

The old StateHandleProvider used in ZooKeeperStateHandleStore had to be replaced because the state backend implementation has changed. Since the new state backend could not be used anymore, a new StateStorageHelper interface has been created. The default implementation FileSystemStateStorageHelper stores the given state onto the specified file system and returns a FileSerializableStateHandle.

Various fixes due to rebasing.
上级 630798d3
...@@ -192,8 +192,6 @@ public class WebRuntimeMonitor implements WebMonitor { ...@@ -192,8 +192,6 @@ public class WebRuntimeMonitor implements WebMonitor {
// job manager configuration, log and stdout // job manager configuration, log and stdout
.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config))) .GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
.GET("/jobmanager/log", new StaticFileServerHandler(logDir))
.GET("/jobmanager/stdout", new StaticFileServerHandler(outDir))
// overview over jobs // overview over jobs
.GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true))) .GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))
...@@ -222,6 +220,8 @@ public class WebRuntimeMonitor implements WebMonitor { ...@@ -222,6 +220,8 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logDir))
.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, outDir))
// this handler serves all the static contents // this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir)); .GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));
......
...@@ -58,10 +58,12 @@ import scala.concurrent.duration.FiniteDuration; ...@@ -58,10 +58,12 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.file.Files; import java.nio.file.Files;
import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
...@@ -198,7 +200,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> ...@@ -198,7 +200,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
* Response when running with leading JobManager. * Response when running with leading JobManager.
*/ */
private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String requestPath) private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
throws ParseException, IOException { throws IOException, ParseException {
// convert to absolute path // convert to absolute path
final File file = new File(rootPath, requestPath); final File file = new File(rootPath, requestPath);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -46,6 +46,7 @@ import scala.concurrent.duration.Deadline; ...@@ -46,6 +46,7 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.io.File; import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Scanner; import java.util.Scanner;
...@@ -62,7 +63,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { ...@@ -62,7 +63,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
private final static FiniteDuration TestTimeout = new FiniteDuration(2, TimeUnit.MINUTES); private final static FiniteDuration TestTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
private final String MAIN_RESOURCES_PATH = getClass().getResource("/../classes/web").getPath(); private final String MAIN_RESOURCES_PATH = getClass().getResource("/web").getPath();
/** /**
* Tests operation of the monitor in standalone operation. * Tests operation of the monitor in standalone operation.
...@@ -82,10 +83,13 @@ public class WebRuntimeMonitorITCase extends TestLogger { ...@@ -82,10 +83,13 @@ public class WebRuntimeMonitorITCase extends TestLogger {
ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head(); ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
ActorRef jmActor = flink.jobManagerActors().get().head(); ActorRef jmActor = flink.jobManagerActors().get().head();
File logDir = temporaryFolder.newFolder("log");
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
Configuration monitorConfig = new Configuration(); Configuration monitorConfig = new Configuration();
monitorConfig.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, MAIN_RESOURCES_PATH);
monitorConfig.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
// Needs to match the leader address from the leader retrieval service // Needs to match the leader address from the leader retrieval service
String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor); String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
...@@ -143,9 +147,13 @@ public class WebRuntimeMonitorITCase extends TestLogger { ...@@ -143,9 +147,13 @@ public class WebRuntimeMonitorITCase extends TestLogger {
final Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( final Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
zooKeeper.getConnectString(), zooKeeper.getConnectString(),
temporaryFolder.getRoot().getPath()); temporaryFolder.getRoot().getPath());
config.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, MAIN_RESOURCES_PATH);
config.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true); File logDir = temporaryFolder.newFolder();
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
for (int i = 0; i < jobManagerSystem.length; i++) { for (int i = 0; i < jobManagerSystem.length; i++) {
jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(), jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
...@@ -280,10 +288,13 @@ public class WebRuntimeMonitorITCase extends TestLogger { ...@@ -280,10 +288,13 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try (TestingServer zooKeeper = new TestingServer()) { try (TestingServer zooKeeper = new TestingServer()) {
File logDir = temporaryFolder.newFolder();
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
final Configuration config = new Configuration(); final Configuration config = new Configuration();
config.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, MAIN_RESOURCES_PATH);
config.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString()); config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
......
...@@ -43,7 +43,7 @@ var path = require('path'); ...@@ -43,7 +43,7 @@ var path = require('path');
var environment = 'development'; var environment = 'development';
var paths = { var paths = {
src: './app/', src: './app/',
dest: '../src/main/resources/web/', dest: './web/',
vendor: './bower_components/', vendor: './bower_components/',
vendorLocal: './vendor-local/', vendorLocal: './vendor-local/',
assets: './assets/', assets: './assets/',
...@@ -168,7 +168,7 @@ gulp.task('watch', function () { ...@@ -168,7 +168,7 @@ gulp.task('watch', function () {
}); });
gulp.task('serve', serve({ gulp.task('serve', serve({
root: '../src/main/resources/web/', root: 'web',
port: 3001 port: 3001
})); }));
......
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class="navbar navbar-default navbar-fixed-top navbar-main">
<div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
<div class="navbar-title">Task Managers</div>
</nav>
<div id="content-inner">
<table class="table table-clickable table-hover">
<thead>
<tr>
<th>Path, ID</th>
<th>Data Port</th>
<th>Last Heartbeat</th>
<th>All Slots</th>
<th>Free Slots</th>
<th>CPU Cores</th>
<th>Physical Memory</th>
<th>Free Memory</th>
<th>Flink Managed Memory</th>
</tr>
</thead>
<tbody>
<tr ng-repeat="manager in managers" ui-sref="single-manager.metrics({taskmanagerid: manager.id})">
<td>
{{ manager.path }}
<div class="small-label">{{ manager.id }}</div>
</td>
<td>{{ manager.dataPort }}</td>
<td>{{ manager.timeSinceLastHeartbeat | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</td>
<td>{{ manager.slotsNumber }}</td>
<td>{{ manager.freeSlots }}</td>
<td>{{ manager.cpuCores }}</td>
<td>{{ manager.physicalMemory | bytes:MB }}</td>
<td>{{ manager.freeMemory | bytes:MB }}</td>
<td>{{ manager.managedMemory | bytes:MB }}</td>
</tr>
</tbody>
</table>
</div>
\ No newline at end of file
...@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; ...@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.IOUtils; import org.apache.flink.runtime.util.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -51,40 +50,27 @@ class FileSystemBlobStore implements BlobStore { ...@@ -51,40 +50,27 @@ class FileSystemBlobStore implements BlobStore {
private final String basePath; private final String basePath;
FileSystemBlobStore(Configuration config) throws IOException { FileSystemBlobStore(Configuration config) throws IOException {
StateBackend stateBackend = StateBackend.fromConfig(config); String stateBackendBasePath = config.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
if (stateBackend == StateBackend.FILESYSTEM) { if (stateBackendBasePath.equals("")) {
String stateBackendBasePath = config.getString( throw new IllegalConfigurationException(String.format("Missing configuration for " +
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); "file system state backend recovery path. Please specify via " +
"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
if (stateBackendBasePath.equals("")) { }
throw new IllegalConfigurationException(String.format("Missing configuration for " +
"file system state backend recovery path. Please specify via " +
"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
}
stateBackendBasePath += "/blob"; stateBackendBasePath += "/blob";
this.basePath = stateBackendBasePath; this.basePath = stateBackendBasePath;
try { try {
FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath)); FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath));
}
catch (URISyntaxException e) {
throw new IOException(e);
}
LOG.info("Created blob directory {}.", basePath);
} }
else { catch (URISyntaxException e) {
// Nothing else support at the moment throw new IOException(e);
throw new IllegalConfigurationException(
String.format("Illegal state backend " +
"configuration '%s'. Please configure '%s' as state " +
"backend and specify the recovery path via '%s' key.",
stateBackend, StateBackend.FILESYSTEM,
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
} }
LOG.info("Created blob directory {}.", basePath);
} }
// - Put ------------------------------------------------------------------ // - Put ------------------------------------------------------------------
......
...@@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths; ...@@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -92,7 +92,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -92,7 +92,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* @param client The Curator ZooKeeper client * @param client The Curator ZooKeeper client
* @param checkpointsPath The ZooKeeper path for the checkpoints (needs to * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to
* start with a '/') * start with a '/')
* @param stateHandleProvider The state handle provider for checkpoints * @param stateStorage State storage to be used to persist the completed
* checkpoint
* @throws Exception * @throws Exception
*/ */
public ZooKeeperCompletedCheckpointStore( public ZooKeeperCompletedCheckpointStore(
...@@ -100,16 +101,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -100,16 +101,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
ClassLoader userClassLoader, ClassLoader userClassLoader,
CuratorFramework client, CuratorFramework client,
String checkpointsPath, String checkpointsPath,
StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception { StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage");
this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
checkNotNull(client, "Curator client"); checkNotNull(client, "Curator client");
checkNotNull(checkpointsPath, "Checkpoints path"); checkNotNull(checkpointsPath, "Checkpoints path");
checkNotNull(stateHandleProvider, "State handle provider");
// Ensure that the checkpoints path exists // Ensure that the checkpoints path exists
client.newNamespaceAwareEnsurePath(checkpointsPath) client.newNamespaceAwareEnsurePath(checkpointsPath)
...@@ -118,8 +119,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -118,8 +119,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
// All operations will have the path as root // All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>( this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
this.client, stateHandleProvider);
this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
......
...@@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths; ...@@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -87,13 +87,21 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { ...@@ -87,13 +87,21 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
/** Flag indicating whether this instance is running. */ /** Flag indicating whether this instance is running. */
private boolean isRunning; private boolean isRunning;
/**
* Submitted job graph store backed by ZooKeeper
*
* @param client ZooKeeper client
* @param currentJobsPath ZooKeeper path for current job graphs
* @param stateStorage State storage used to persist the submitted jobs
* @throws Exception
*/
public ZooKeeperSubmittedJobGraphStore( public ZooKeeperSubmittedJobGraphStore(
CuratorFramework client, CuratorFramework client,
String currentJobsPath, String currentJobsPath,
StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception { StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
checkNotNull(currentJobsPath, "Current jobs path"); checkNotNull(currentJobsPath, "Current jobs path");
checkNotNull(stateHandleProvider, "State handle provider"); checkNotNull(stateStorage, "State storage");
// Keep a reference to the original client and not the namespace facade. The namespace // Keep a reference to the original client and not the namespace facade. The namespace
// facade cannot be closed. // facade cannot be closed.
...@@ -104,11 +112,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { ...@@ -104,11 +112,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
.ensure(client.getZookeeperClient()); .ensure(client.getZookeeperClient());
// All operations will have the path as root // All operations will have the path as root
client = client.usingNamespace(client.getNamespace() + currentJobsPath); CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider); this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
this.pathCache = new PathChildrenCache(client, "/", false); this.pathCache = new PathChildrenCache(facade, "/", false);
pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener()); pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.OperatorState;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
......
...@@ -18,22 +18,196 @@ ...@@ -18,22 +18,196 @@
package org.apache.flink.runtime.state; package org.apache.flink.runtime.state;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
public enum StateBackend { import java.io.IOException;
JOBMANAGER, FILESYSTEM; import java.io.OutputStream;
import java.io.Serializable;
/**
* A state backend defines how state is stored and snapshotted during checkpoints.
*
* @param <Backend> The type of backend itself. This generic parameter is used to refer to the
* type of backend when creating state backed by this backend.
*/
public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
private static final long serialVersionUID = 4620413814639220247L;
// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------
/**
* This method is called by the task upon deployment to initialize the state backend for
* data for a specific job.
*
* @param job The ID of the job for which the state backend instance checkpoints data.
* @throws Exception Overwritten versions of this method may throw exceptions, in which
* case the job that uses the state backend is considered failed during
* deployment.
*/
public abstract void initializeForJob(JobID job) throws Exception;
/**
* Disposes all state associated with the current job.
*
* @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
*/
public abstract void disposeAllStateForCurrentJob() throws Exception;
/**
* Closes the state backend, releasing all internal resources, but does not delete any persistent
* checkpoint data.
*
* @throws Exception Exceptions can be forwarded and will be logged by the system
*/
public abstract void close() throws Exception;
// ------------------------------------------------------------------------
// key/value state
// ------------------------------------------------------------------------
/** /**
* Returns the configured {@link StateBackend}. * Creates a key/value state backed by this state backend.
*
* @param keySerializer The serializer for the key.
* @param valueSerializer The serializer for the value.
* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
* @param <K> The type of the key.
* @param <V> The type of the value.
*
* @return A new key/value state backed by this backend.
*
* @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
*/
public abstract <K, V> KvState<K, V, Backend> createKvState(
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
V defaultValue) throws Exception;
// ------------------------------------------------------------------------
// storing state for a checkpoint
// ------------------------------------------------------------------------
/**
* Creates an output stream that writes into the state of the given checkpoint. When the stream
* is closes, it returns a state handle that can retrieve the state back.
*
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @return An output stream that writes state for the given checkpoint.
*
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;
/**
* Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
* When the stream is closes, it returns a state handle that can retrieve the state back.
* *
* @param config The config to parse * @param checkpointID The ID of the checkpoint.
* @return Configured state backend or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not * @param timestamp The timestamp of the checkpoint.
* configured. * @return An DataOutputView stream that writes state for the given checkpoint.
*/ *
public static StateBackend fromConfig(Configuration config) { * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
return StateBackend.valueOf(config.getString( */
ConfigConstants.STATE_BACKEND, public CheckpointStateOutputView createCheckpointStateOutputView(
ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase()); long checkpointID, long timestamp) throws Exception {
return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
}
/**
* Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
*
* @param state The state to be checkpointed.
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param <S> The type of the state.
*
* @return A state handle that can retrieve the checkpoined state.
*
* @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
*/
public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
S state, long checkpointID, long timestamp) throws Exception;
// ------------------------------------------------------------------------
// Checkpoint state output stream
// ------------------------------------------------------------------------
/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
*/
public static abstract class CheckpointStateOutputStream extends OutputStream {
/**
* Closes the stream and gets a state handle that can create an input stream
* producing the data written to this stream.
*
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
public abstract StreamStateHandle closeAndGetHandle() throws IOException;
}
/**
* A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
*/
public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
private final CheckpointStateOutputStream out;
public CheckpointStateOutputView(CheckpointStateOutputStream out) {
super(out);
this.out = out;
}
/**
* Closes the stream and gets a state handle that can create a DataInputView.
* producing the data written to this stream.
*
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
return new DataInputViewHandle(out.closeAndGetHandle());
}
@Override
public void close() throws IOException {
out.close();
}
}
/**
* Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
*/
private static final class DataInputViewHandle implements StateHandle<DataInputView> {
private static final long serialVersionUID = 2891559813513532079L;
private final StreamStateHandle stream;
private DataInputViewHandle(StreamStateHandle stream) {
this.stream = stream;
}
@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
}
@Override
public void discardState() throws Exception {
stream.discardState();
}
} }
} }
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import java.io.Serializable;
/**
* State handler provider factory.
*
* <p>This is going to be superseded soon.
*/
public class StateHandleProviderFactory {
/**
* Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
* the configured recovery path.
*/
public static <T extends Serializable> StateHandleProvider<T> createRecoveryFileStateHandleProvider(
Configuration config) {
StateBackend stateBackend = StateBackend.fromConfig(config);
if (stateBackend == StateBackend.FILESYSTEM) {
String recoveryPath = config.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
if (recoveryPath.equals("")) {
throw new IllegalConfigurationException("Missing recovery path. Specify via " +
"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
}
else {
return FileStateHandle.createProvider(recoveryPath);
}
}
else {
throw new IllegalConfigurationException("Unexpected state backend configuration " +
stateBackend);
}
}
}
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
......
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.state.StreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.InputStream; import java.io.InputStream;
......
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.streaming.api.state.AbstractHeapKvState; import org.apache.flink.runtime.state.AbstractHeapKvState;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.util.HashMap; import java.util.HashMap;
......
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.streaming.api.state.KvStateSnapshot; import org.apache.flink.runtime.state.KvStateSnapshot;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.util.HashMap; import java.util.HashMap;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
...@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; ...@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
...@@ -16,15 +16,15 @@ ...@@ -16,15 +16,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.filesystem; package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.state.StateBackendFactory; import org.apache.flink.runtime.state.StateBackendFactory;
/** /**
* A factory that creates an {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend} * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
* from a configuration. * from a configuration.
*/ */
public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> { public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.memory; package org.apache.flink.runtime.state.memory;
import org.apache.flink.streaming.api.state.StreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
......
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.memory; package org.apache.flink.runtime.state.memory;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.util.DataOutputSerializer; import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.streaming.api.state.AbstractHeapKvState; import org.apache.flink.runtime.state.AbstractHeapKvState;
import java.util.HashMap; import java.util.HashMap;
......
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.memory; package org.apache.flink.runtime.state.memory;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.streaming.api.state.KvStateSnapshot; import org.apache.flink.runtime.state.KvStateSnapshot;
import java.util.HashMap; import java.util.HashMap;
......
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.memory; package org.apache.flink.runtime.state.memory;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.StreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state.memory; package org.apache.flink.runtime.state.memory;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue; import org.apache.flink.util.SerializedValue;
......
...@@ -34,11 +34,14 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; ...@@ -34,11 +34,14 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService; import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.state.StateHandleProviderFactory; import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
public class ZooKeeperUtils { public class ZooKeeperUtils {
...@@ -170,7 +173,7 @@ public class ZooKeeperUtils { ...@@ -170,7 +173,7 @@ public class ZooKeeperUtils {
String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH, String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH); ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH, String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH); ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath); return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
} }
...@@ -188,8 +191,7 @@ public class ZooKeeperUtils { ...@@ -188,8 +191,7 @@ public class ZooKeeperUtils {
checkNotNull(configuration, "Configuration"); checkNotNull(configuration, "Configuration");
StateHandleProvider<SubmittedJobGraph> stateHandleProvider = StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
// ZooKeeper submitted jobs root dir // ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath = configuration.getString( String zooKeeperSubmittedJobsPath = configuration.getString(
...@@ -197,7 +199,7 @@ public class ZooKeeperUtils { ...@@ -197,7 +199,7 @@ public class ZooKeeperUtils {
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
return new ZooKeeperSubmittedJobGraphStore( return new ZooKeeperSubmittedJobGraphStore(
client, zooKeeperSubmittedJobsPath, stateHandleProvider); client, zooKeeperSubmittedJobsPath, stateStorage);
} }
/** /**
...@@ -219,21 +221,23 @@ public class ZooKeeperUtils { ...@@ -219,21 +221,23 @@ public class ZooKeeperUtils {
checkNotNull(configuration, "Configuration"); checkNotNull(configuration, "Configuration");
StateHandleProvider<CompletedCheckpoint> stateHandleProvider = String checkpointsPath = configuration.getString(
StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration); ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
String completedCheckpointsPath = configuration.getString(
ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
completedCheckpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
configuration,
"completedCheckpoint");
checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
return new ZooKeeperCompletedCheckpointStore( return new ZooKeeperCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain, maxNumberOfCheckpointsToRetain,
userClassLoader, userClassLoader,
client, client,
completedCheckpointsPath, checkpointsPath,
stateHandleProvider); stateStorage);
} }
/** /**
...@@ -258,6 +262,30 @@ public class ZooKeeperUtils { ...@@ -258,6 +262,30 @@ public class ZooKeeperUtils {
return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath); return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath);
} }
/**
* Creates a {@link FileSystemStateStorageHelper} instance.
*
* @param configuration {@link Configuration} object
* @param prefix Prefix for the created files
* @param <T> Type of the state objects
* @return {@link FileSystemStateStorageHelper} instance
* @throws IOException
*/
private static <T extends Serializable> FileSystemStateStorageHelper<T> createFileSystemStateStorage(
Configuration configuration,
String prefix) throws IOException {
String rootPath = configuration.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
if (rootPath.equals("")) {
throw new IllegalConfigurationException("Missing recovery path. Specify via " +
"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
} else {
return new FileSystemStateStorageHelper<T>(rootPath, prefix);
}
}
/** /**
* Private constructor to prevent instantiation. * Private constructor to prevent instantiation.
*/ */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.zookeeper;
import org.apache.flink.runtime.state.StateHandle;
import java.io.Serializable;
/**
* State storage helper which is used by {@ZooKeeperStateHandleStore} to persiste state before
* the state handle is written to ZooKeeper.
*
* @param <T>
*/
public interface StateStorageHelper<T extends Serializable> {
/**
* Stores the given state and returns a state handle to it.
*
* @param state State to be stored
* @return State handle to the stored state
* @throws Exception
*/
StateHandle<T> store(T state) throws Exception;
}
...@@ -23,12 +23,14 @@ import org.apache.curator.framework.api.BackgroundCallback; ...@@ -23,12 +23,14 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.InstantiationUtil;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -65,11 +67,12 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -65,11 +67,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/ */
public class ZooKeeperStateHandleStore<T extends Serializable> { public class ZooKeeperStateHandleStore<T extends Serializable> {
public static Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
/** Curator ZooKeeper client */ /** Curator ZooKeeper client */
private final CuratorFramework client; private final CuratorFramework client;
/** State handle provider */ private final StateStorageHelper<T> storage;
private final StateHandleProvider<T> stateHandleProvider;
/** /**
* Creates a {@link ZooKeeperStateHandleStore}. * Creates a {@link ZooKeeperStateHandleStore}.
...@@ -78,14 +81,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { ...@@ -78,14 +81,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* expected that the client's namespace ensures that the root * expected that the client's namespace ensures that the root
* path is exclusive for all state handles managed by this * path is exclusive for all state handles managed by this
* instance, e.g. <code>client.usingNamespace("/stateHandles")</code> * instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
* @param stateHandleProvider The state handle provider for the state
*/ */
public ZooKeeperStateHandleStore( public ZooKeeperStateHandleStore(
CuratorFramework client, CuratorFramework client,
StateHandleProvider<T> stateHandleProvider) { StateStorageHelper storage) throws IOException {
this.client = checkNotNull(client, "Curator client"); this.client = checkNotNull(client, "Curator client");
this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); this.storage = checkNotNull(storage, "State storage");
} }
/** /**
...@@ -112,12 +114,14 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { ...@@ -112,12 +114,14 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* @return Created {@link StateHandle} * @return Created {@link StateHandle}
* @throws Exception If a ZooKeeper or state handle operation fails * @throws Exception If a ZooKeeper or state handle operation fails
*/ */
public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception { public StateHandle<T> add(
String pathInZooKeeper,
T state,
CreateMode createMode) throws Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State"); checkNotNull(state, "State");
// Create the state handle. Nothing persisted yet. StateHandle<T> stateHandle = storage.store(state);
StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
boolean success = false; boolean success = false;
...@@ -159,7 +163,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { ...@@ -159,7 +163,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
StateHandle<T> oldStateHandle = get(pathInZooKeeper); StateHandle<T> oldStateHandle = get(pathInZooKeeper);
StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state); StateHandle<T> stateHandle = storage.store(state);
boolean success = false; boolean success = false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.zookeeper.filesystem;
import com.google.common.base.Preconditions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
import org.apache.flink.runtime.util.FileUtils;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* {@link StateStorageHelper} implementation which stores the state in the given filesystem path.
*
* @param <T>
*/
public class FileSystemStateStorageHelper<T extends Serializable> implements StateStorageHelper<T> {
private final Path rootPath;
private final String prefix;
private final FileSystem fs;
public FileSystemStateStorageHelper(String rootPath, String prefix) throws IOException {
this(new Path(rootPath), prefix);
}
public FileSystemStateStorageHelper(Path rootPath, String prefix) throws IOException {
this.rootPath = Preconditions.checkNotNull(rootPath, "Root path");
this.prefix = Preconditions.checkNotNull(prefix, "Prefix");
fs = FileSystem.get(rootPath.toUri());
}
@Override
public StateHandle<T> store(T state) throws Exception {
Exception latestException = null;
for (int attempt = 0; attempt < 10; attempt++) {
Path filePath = getNewFilePath();
FSDataOutputStream outStream;
try {
outStream = fs.create(filePath, false);
}
catch (Exception e) {
latestException = e;
continue;
}
try(ObjectOutputStream os = new ObjectOutputStream(outStream)) {
os.writeObject(state);
}
return new FileSerializableStateHandle<>(filePath);
}
throw new Exception("Could not open output stream for state backend", latestException);
}
private Path getNewFilePath() {
return new Path(rootPath, FileUtils.getRandomFilename(prefix));
}
}
...@@ -1542,30 +1542,25 @@ object JobManager { ...@@ -1542,30 +1542,25 @@ object JobManager {
} }
} }
val webMonitor: Option[WebMonitor] = val address = AkkaUtils.getAddress(jobManagerSystem)
if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
val address = AkkaUtils.getAddress(jobManagerSystem)
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get) configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get) configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
// start the job manager web frontend val webMonitor: Option[WebMonitor] =
if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) { if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
val leaderRetrievalService = LeaderRetrievalUtils LOG.info("Starting JobManger web frontend")
.createLeaderRetrievalService(configuration) val leaderRetrievalService = LeaderRetrievalUtils
.createLeaderRetrievalService(configuration)
LOG.info("Starting NEW JobManger web frontend") // start the web frontend. we need to load this dynamically
// start the new web frontend. we need to load this dynamically // because it is not in the same project/dependencies
// because it is not in the same project/dependencies val webServer = WebMonitorUtils.startWebRuntimeMonitor(
Some(startWebRuntimeMonitor(configuration, leaderRetrievalService, jobManagerSystem)) configuration,
} leaderRetrievalService,
else { jobManagerSystem)
LOG.info("Starting JobManger web frontend")
// The old web frontend does not work with recovery mode Option(webServer)
val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(configuration)
Some(new WebInfoServer(configuration, leaderRetrievalService, jobManagerSystem))
}
} }
else { else {
None None
...@@ -1624,16 +1619,8 @@ object JobManager { ...@@ -1624,16 +1619,8 @@ object JobManager {
monitor => monitor =>
val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration) val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration)
monitor.start(jobManagerAkkaUrl) monitor.start(jobManagerAkkaUrl)
LOG.info("Starting JobManger web frontend")
// start the web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
val webServer = WebMonitorUtils.startWebRuntimeMonitor(
configuration,
leaderRetrievalService,
jobManagerSystem)
} }
(jobManagerSystem, jobManager, archive, webMonitor) (jobManagerSystem, jobManager, archive, webMonitor)
} }
catch { catch {
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package org.apache.flink.runtime.checkpoint; package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.LocalStateHandle; import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
...@@ -56,8 +58,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ...@@ -56,8 +58,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
ClassLoader userLoader) throws Exception { ClassLoader userLoader) throws Exception {
return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader, return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
ZooKeeper.createClient(), CheckpointsPath, new LocalStateHandle ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() {
.LocalStateHandleProvider<CompletedCheckpoint>()); @Override
public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
return new LocalStateHandle<>(state);
}
});
} }
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.flink.runtime.execution.librarycache; package org.apache.flink.runtime.execution.librarycache;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -28,9 +27,9 @@ import org.apache.flink.runtime.blob.BlobKey; ...@@ -28,9 +27,9 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.junit.After; import org.junit.Rule;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
...@@ -46,23 +45,8 @@ import static org.junit.Assert.assertEquals; ...@@ -46,23 +45,8 @@ import static org.junit.Assert.assertEquals;
public class BlobLibraryCacheRecoveryITCase { public class BlobLibraryCacheRecoveryITCase {
private File recoveryDir; @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() throws Exception {
recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
throw new IllegalStateException("Failed to create temp directory for test");
}
}
@After
public void cleanUp() throws Exception {
if (recoveryDir != null) {
FileUtils.deleteDirectory(recoveryDir);
}
}
/** /**
* Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
* participating BlobLibraryCacheManager. * participating BlobLibraryCacheManager.
...@@ -81,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase { ...@@ -81,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
Configuration config = new Configuration(); Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath()); config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
for (int i = 0; i < server.length; i++) { for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config); server[i] = new BlobServer(config);
...@@ -170,7 +154,7 @@ public class BlobLibraryCacheRecoveryITCase { ...@@ -170,7 +154,7 @@ public class BlobLibraryCacheRecoveryITCase {
} }
// Verify everything is clean // Verify everything is clean
File[] recoveryFiles = recoveryDir.listFiles(); File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
} }
} }
...@@ -24,7 +24,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; ...@@ -24,7 +24,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider; import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import org.junit.AfterClass; import org.junit.AfterClass;
...@@ -54,8 +56,13 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { ...@@ -54,8 +56,13 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
private final static LocalStateHandleProvider<SubmittedJobGraph> StateHandleProvider = private final static StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>() {
new LocalStateHandleProvider<>(); @Override
public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception {
return new LocalStateHandle<>(state);
}
};
@AfterClass @AfterClass
public static void tearDown() throws Exception { public static void tearDown() throws Exception {
...@@ -72,8 +79,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { ...@@ -72,8 +79,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test @Test
public void testPutAndRemoveJobGraph() throws Exception { public void testPutAndRemoveJobGraph() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testPutAndRemoveJobGraph", ZooKeeper.createClient(),
StateHandleProvider); "/testPutAndRemoveJobGraph",
localStateStorage);
try { try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class); SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
...@@ -125,7 +133,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { ...@@ -125,7 +133,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test @Test
public void testRecoverJobGraphs() throws Exception { public void testRecoverJobGraphs() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testRecoverJobGraphs", StateHandleProvider); ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
try { try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class); SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
...@@ -175,10 +183,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { ...@@ -175,10 +183,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
try { try {
jobGraphs = new ZooKeeperSubmittedJobGraphStore( jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider); ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
otherJobGraphs = new ZooKeeperSubmittedJobGraphStore( otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider); ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0); SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
...@@ -234,10 +242,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { ...@@ -234,10 +242,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception { public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider); ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore( ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider); ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
jobGraphs.start(null); jobGraphs.start(null);
otherJobGraphs.start(null); otherJobGraphs.start(null);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
...@@ -28,9 +28,13 @@ import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; ...@@ -28,9 +28,13 @@ import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.types.StringValue; import org.apache.flink.types.StringValue;
import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.OperatingSystem;
......
...@@ -16,15 +16,19 @@ ...@@ -16,15 +16,19 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.state; package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.types.StringValue; import org.apache.flink.types.StringValue;
import org.junit.Test; import org.junit.Test;
...@@ -36,7 +40,7 @@ import java.util.HashMap; ...@@ -36,7 +40,7 @@ import java.util.HashMap;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
* Tests for the {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend}. * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
*/ */
public class MemoryStateBackendTest { public class MemoryStateBackendTest {
......
...@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils; ...@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
...@@ -79,7 +80,7 @@ public class ZooKeeperTestUtils { ...@@ -79,7 +80,7 @@ public class ZooKeeperTestUtils {
// File system state backend // File system state backend
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery"); config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
// Akka failure detection and execution retries // Akka failure detection and execution retries
......
...@@ -23,7 +23,6 @@ import org.apache.curator.framework.api.BackgroundCallback; ...@@ -23,7 +23,6 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
...@@ -83,11 +82,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -83,11 +82,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/ */
@Test @Test
public void testAdd() throws Exception { public void testAdd() throws Exception {
// Setup LongStateStorage longStateStorage = new LongStateStorage();
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
ZooKeeper.getClient(), longStateStorage);
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
// Config // Config
final String pathInZooKeeper = "/testAdd"; final String pathInZooKeeper = "/testAdd";
...@@ -98,8 +95,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -98,8 +95,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
// Verify // Verify
// State handle created // State handle created
assertEquals(1, stateHandleProvider.getStateHandles().size()); assertEquals(1, store.getAll().size());
assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null)); assertEquals(state, store.get(pathInZooKeeper).getState(null));
// Path created and is persistent // Path created and is persistent
Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
...@@ -120,10 +117,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -120,10 +117,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/ */
@Test @Test
public void testAddWithCreateMode() throws Exception { public void testAddWithCreateMode() throws Exception {
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage longStateStorage = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), longStateStorage);
ZooKeeper.getClient(), stateHandleProvider);
// Config // Config
Long state = 3457347234L; Long state = 3457347234L;
...@@ -151,8 +147,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -151,8 +147,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
// Verify // Verify
// State handle created // State handle created
assertEquals(i + 1, stateHandleProvider.getStateHandles().size()); assertEquals(i + 1, store.getAll().size());
assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null)); assertEquals(state, longStateStorage.getStateHandles().get(i).getState(null));
// Path created // Path created
Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
...@@ -182,7 +178,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -182,7 +178,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/ */
@Test(expected = Exception.class) @Test(expected = Exception.class)
public void testAddAlreadyExistingPath() throws Exception { public void testAddAlreadyExistingPath() throws Exception {
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -198,7 +194,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -198,7 +194,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testAddDiscardStateHandleAfterFailure() throws Exception { public void testAddDiscardStateHandleAfterFailure() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
CuratorFramework client = spy(ZooKeeper.getClient()); CuratorFramework client = spy(ZooKeeper.getClient());
when(client.create()).thenThrow(new RuntimeException("Expected test Exception.")); when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
...@@ -231,7 +227,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -231,7 +227,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testReplace() throws Exception { public void testReplace() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -270,10 +266,10 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -270,10 +266,10 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/ */
@Test(expected = Exception.class) @Test(expected = Exception.class)
public void testReplaceNonExistingPath() throws Exception { public void testReplaceNonExistingPath() throws Exception {
StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider(); StateStorageHelper<Long> stateStorage = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateStorage);
store.replace("/testReplaceNonExistingPath", 0, 1L); store.replace("/testReplaceNonExistingPath", 0, 1L);
} }
...@@ -284,7 +280,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -284,7 +280,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testReplaceDiscardStateHandleAfterFailure() throws Exception { public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
CuratorFramework client = spy(ZooKeeper.getClient()); CuratorFramework client = spy(ZooKeeper.getClient());
when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
...@@ -329,7 +325,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -329,7 +325,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testGetAndExists() throws Exception { public void testGetAndExists() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -354,7 +350,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -354,7 +350,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/ */
@Test(expected = Exception.class) @Test(expected = Exception.class)
public void testGetNonExistingPath() throws Exception { public void testGetNonExistingPath() throws Exception {
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -368,7 +364,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -368,7 +364,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testGetAll() throws Exception { public void testGetAll() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -399,7 +395,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -399,7 +395,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testGetAllSortedByName() throws Exception { public void testGetAllSortedByName() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -429,7 +425,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -429,7 +425,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testRemove() throws Exception { public void testRemove() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -453,7 +449,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -453,7 +449,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testRemoveWithCallback() throws Exception { public void testRemoveWithCallback() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -492,7 +488,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -492,7 +488,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testRemoveAndDiscardState() throws Exception { public void testRemoveAndDiscardState() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -514,7 +510,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -514,7 +510,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test @Test
public void testRemoveAndDiscardAllState() throws Exception { public void testRemoveAndDiscardAllState() throws Exception {
// Setup // Setup
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider); ZooKeeper.getClient(), stateHandleProvider);
...@@ -543,21 +539,19 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { ...@@ -543,21 +539,19 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
// Simple test helpers // Simple test helpers
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
private static class LongStateHandleProvider implements StateHandleProvider<Long> { private static class LongStateStorage implements StateStorageHelper<Long> {
private static final long serialVersionUID = 4572084854499402276L;
private final List<LongStateHandle> stateHandles = new ArrayList<>(); private final List<LongStateHandle> stateHandles = new ArrayList<>();
@Override @Override
public StateHandle<Long> createStateHandle(Long state) { public StateHandle<Long> store(Long state) throws Exception {
LongStateHandle stateHandle = new LongStateHandle(state); LongStateHandle stateHandle = new LongStateHandle(state);
stateHandles.add(stateHandle); stateHandles.add(stateHandle);
return stateHandle; return stateHandle;
} }
public List<LongStateHandle> getStateHandles() { List<LongStateHandle> getStateHandles() {
return stateHandles; return stateHandles;
} }
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# limitations under the License. # limitations under the License.
################################################################################ ################################################################################
log4j.rootLogger=INFO, console log4j.rootLogger=OFF, console
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Console (use 'console') # Console (use 'console')
...@@ -36,3 +36,4 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m ...@@ -36,3 +36,4 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
# suppress the irrelevant (wrong) warnings from the netty channel handler # suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
log4j.logger.org.apache.flink.runtime.blob=DEBUG
...@@ -27,11 +27,11 @@ import org.apache.flink.core.fs.FileSystem; ...@@ -27,11 +27,11 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.StreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
......
...@@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; ...@@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.types.StringValue; import org.apache.flink.types.StringValue;
import org.apache.flink.util.SplittableIterator; import org.apache.flink.util.SplittableIterator;
...@@ -372,11 +372,11 @@ public abstract class StreamExecutionEnvironment { ...@@ -372,11 +372,11 @@ public abstract class StreamExecutionEnvironment {
* the key/value state, and for checkpointed functions (implementing the interface * the key/value state, and for checkpointed functions (implementing the interface
* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}). * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
* *
* <p>The {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
* maintains the state in heap memory, as objects. It is lightweight without extra dependencies, * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
* but can checkpoint only small states (some counters). * but can checkpoint only small states (some counters).
* *
* <p>In contrast, the {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend} * <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
* stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
* file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
* failures of individual nodes and that streaming program can be executed highly available and strongly * failures of individual nodes and that streaming program can be executed highly available and strongly
......
...@@ -26,8 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; ...@@ -26,8 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.runtime.state.SerializedCheckpointData;
import org.apache.flink.streaming.api.state.SerializedCheckpointData;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
......
...@@ -31,7 +31,7 @@ import org.apache.flink.runtime.util.ClassLoaderUtil; ...@@ -31,7 +31,7 @@ import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException; import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.InstantiationUtil;
......
...@@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; ...@@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
......
...@@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; ...@@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.state.KvState; import org.apache.flink.runtime.state.KvState;
import org.apache.flink.streaming.api.state.KvStateSnapshot; import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
......
...@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandle; ...@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateHandle;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
/**
* A state backend defines how state is stored and snapshotted during checkpoints.
*
* @param <Backend> The type of backend itself. This generic parameter is used to refer to the
* type of backend when creating state backed by this backend.
*/
public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
private static final long serialVersionUID = 4620413814639220247L;
// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------
/**
* This method is called by the task upon deployment to initialize the state backend for
* data for a specific job.
*
* @param job The ID of the job for which the state backend instance checkpoints data.
* @throws Exception Overwritten versions of this method may throw exceptions, in which
* case the job that uses the state backend is considered failed during
* deployment.
*/
public abstract void initializeForJob(JobID job) throws Exception;
/**
* Disposes all state associated with the current job.
*
* @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
*/
public abstract void disposeAllStateForCurrentJob() throws Exception;
/**
* Closes the state backend, releasing all internal resources, but does not delete any persistent
* checkpoint data.
*
* @throws Exception Exceptions can be forwarded and will be logged by the system
*/
public abstract void close() throws Exception;
// ------------------------------------------------------------------------
// key/value state
// ------------------------------------------------------------------------
/**
* Creates a key/value state backed by this state backend.
*
* @param keySerializer The serializer for the key.
* @param valueSerializer The serializer for the value.
* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
* @param <K> The type of the key.
* @param <V> The type of the value.
*
* @return A new key/value state backed by this backend.
*
* @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
*/
public abstract <K, V> KvState<K, V, Backend> createKvState(
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
V defaultValue) throws Exception;
// ------------------------------------------------------------------------
// storing state for a checkpoint
// ------------------------------------------------------------------------
/**
* Creates an output stream that writes into the state of the given checkpoint. When the stream
* is closes, it returns a state handle that can retrieve the state back.
*
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @return An output stream that writes state for the given checkpoint.
*
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;
/**
* Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
* When the stream is closes, it returns a state handle that can retrieve the state back.
*
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @return An DataOutputView stream that writes state for the given checkpoint.
*
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
public CheckpointStateOutputView createCheckpointStateOutputView(
long checkpointID, long timestamp) throws Exception {
return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
}
/**
* Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
*
* @param state The state to be checkpointed.
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param <S> The type of the state.
*
* @return A state handle that can retrieve the checkpoined state.
*
* @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
*/
public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
S state, long checkpointID, long timestamp) throws Exception;
// ------------------------------------------------------------------------
// Checkpoint state output stream
// ------------------------------------------------------------------------
/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
*/
public static abstract class CheckpointStateOutputStream extends OutputStream {
/**
* Closes the stream and gets a state handle that can create an input stream
* producing the data written to this stream.
*
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
public abstract StreamStateHandle closeAndGetHandle() throws IOException;
}
/**
* A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
*/
public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
private final CheckpointStateOutputStream out;
public CheckpointStateOutputView(CheckpointStateOutputStream out) {
super(out);
this.out = out;
}
/**
* Closes the stream and gets a state handle that can create a DataInputView.
* producing the data written to this stream.
*
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
return new DataInputViewHandle(out.closeAndGetHandle());
}
@Override
public void close() throws IOException {
out.close();
}
}
/**
* Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
*/
private static final class DataInputViewHandle implements StateHandle<DataInputView> {
private static final long serialVersionUID = 2891559813513532079L;
private final StreamStateHandle stream;
private DataInputViewHandle(StreamStateHandle stream) {
this.stream = stream;
}
@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
}
@Override
public void discardState() throws Exception {
stream.discardState();
}
}
}
...@@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.MathUtils; ...@@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
......
...@@ -30,17 +30,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; ...@@ -30,17 +30,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.StateBackendFactory; import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
...@@ -493,55 +492,52 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> ...@@ -493,55 +492,52 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
private StateBackend<?> createStateBackend() throws Exception { private StateBackend<?> createStateBackend() throws Exception {
StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader); StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
if (configuredBackend != null) { if (configuredBackend != null) {
// backend has been configured on the environment // backend has been configured on the environment
LOG.info("Using user-defined state backend: " + configuredBackend); LOG.info("Using user-defined state backend: " + configuredBackend);
return configuredBackend; return configuredBackend;
} } else {
else {
// see if we have a backend specified in the configuration // see if we have a backend specified in the configuration
Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null); String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
if (backendName == null) { if (backendName == null) {
LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)"); LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
backendName = "jobmanager"; backendName = "jobmanager";
} }
backendName = backendName.toLowerCase(); backendName = backendName.toLowerCase();
switch (backendName) { switch (backendName) {
case "jobmanager": case "jobmanager":
LOG.info("State backend is set to heap memory (checkpoint to jobmanager)"); LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
return MemoryStateBackend.defaultInstance(); return MemoryStateBackend.defaultInstance();
case "filesystem": case "filesystem":
FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
+ backend.getBasePath() + "\")"); + backend.getBasePath() + "\")");
return backend; return backend;
default: default:
try { try {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Class<? extends StateBackendFactory> clazz = Class<? extends StateBackendFactory> clazz =
Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class); Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
return (StateBackend<?>) clazz.newInstance(); return (StateBackend<?>) clazz.newInstance();
} } catch (ClassNotFoundException e) {
catch (ClassNotFoundException e) {
throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName); throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
} } catch (ClassCastException e) {
catch (ClassCastException e) {
throw new IllegalConfigurationException("The class configured under '" + throw new IllegalConfigurationException("The class configured under '" +
ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" + ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
backendName + ')'); backendName + ')');
} } catch (Throwable t) {
catch (Throwable t) {
throw new IllegalConfigurationException("Cannot create configured state backend", t); throw new IllegalConfigurationException("Cannot create configured state backend", t);
} }
} }
} }
}
/** /**
* Registers a timer. * Registers a timer.
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package org.apache.flink.streaming.runtime.tasks; package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.KvStateSnapshot; import org.apache.flink.runtime.state.KvStateSnapshot;
import java.io.Serializable; import java.io.Serializable;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
......
...@@ -28,8 +28,8 @@ import org.apache.flink.runtime.execution.Environment; ...@@ -28,8 +28,8 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......
...@@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration; ...@@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
......
...@@ -36,8 +36,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; ...@@ -36,8 +36,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
......
...@@ -30,8 +30,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; ...@@ -30,8 +30,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
......
...@@ -28,8 +28,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; ...@@ -28,8 +28,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
......
...@@ -26,7 +26,7 @@ import org.apache.flink.api.common.io.{FileInputFormat, InputFormat} ...@@ -26,7 +26,7 @@ import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.streaming.api.state.StateBackend import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode} import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
......
...@@ -23,7 +23,7 @@ import java.io.File; ...@@ -23,7 +23,7 @@ import java.io.File;
import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.test.testdata.KMeansData; import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Rule; import org.junit.Rule;
......
...@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; ...@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess; import org.apache.flink.runtime.testutils.JobManagerProcess;
...@@ -542,7 +543,7 @@ public class ChaosMonkeyITCase { ...@@ -542,7 +543,7 @@ public class ChaosMonkeyITCase {
LOG.info("Checking file system backend state..."); LOG.info("Checking file system backend state...");
File fsCheckpoints = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_DIR, "")); File fsCheckpoints = new File(config.getString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, ""));
LOG.info("Checking " + fsCheckpoints); LOG.info("Checking " + fsCheckpoints);
......
...@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; ...@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -68,7 +69,7 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; ...@@ -68,7 +69,7 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
public class JobManagerCheckpointRecoveryITCase { public class JobManagerCheckpointRecoveryITCase extends TestLogger {
private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
...@@ -144,7 +145,7 @@ public class JobManagerCheckpointRecoveryITCase { ...@@ -144,7 +145,7 @@ public class JobManagerCheckpointRecoveryITCase {
JobGraph jobGraph = env.getStreamGraph().getJobGraph(); JobGraph jobGraph = env.getStreamGraph().getJobGraph();
Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
.getConnectString(), FileStateBackendBasePath.getPath()); .getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
ActorSystem testSystem = null; ActorSystem testSystem = null;
......
...@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; ...@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.junit.Assert; import org.junit.Assert;
......
...@@ -95,7 +95,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { ...@@ -95,7 +95,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath()); configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
...@@ -144,7 +144,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { ...@@ -144,7 +144,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath()); configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
# Set root logger level to OFF to not flood build logs # Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes # set manually to INFO for debugging purposes
log4j.rootLogger=INFO, testlogger log4j.rootLogger=INFO, testlogger
log4j.logger.org.apache.flink.runtime.client.JobClientActor=DEBUG
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
......
...@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.ActorGateway; ...@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
...@@ -114,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { ...@@ -114,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" + flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
"@@" + ConfigConstants.STATE_BACKEND_FS_DIR + "=" + fsStateHandlePath + "/checkpoints" + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); "@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
......
...@@ -123,7 +123,7 @@ abstract class ApplicationMasterBase { ...@@ -123,7 +123,7 @@ abstract class ApplicationMasterBase {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0. config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
} }
val (actorSystem, jmActor, archivActor, webMonitor) = val (actorSystem, jmActor, archiveActor, webMonitor) =
JobManager.startActorSystemAndJobManagerActors( JobManager.startActorSystemAndJobManagerActors(
config, config,
JobManagerMode.CLUSTER, JobManagerMode.CLUSTER,
......
...@@ -735,7 +735,7 @@ under the License. ...@@ -735,7 +735,7 @@ under the License.
<exclude>flink-runtime-web/web-dashboard/assets/fonts/fontawesome*</exclude> <exclude>flink-runtime-web/web-dashboard/assets/fonts/fontawesome*</exclude>
<!-- generated contents --> <!-- generated contents -->
<exclude>flink-runtime-web/src/main/resources/web/**</exclude> <exclude>flink-runtime-web/web-dashboard/web/**</exclude>
<!-- downloaded and generated web libraries. --> <!-- downloaded and generated web libraries. -->
<exclude>flink-runtime-web/web-dashboard/node_modules/**</exclude> <exclude>flink-runtime-web/web-dashboard/node_modules/**</exclude>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册