提交 9ee16794 编写于 作者: S Stephan Ewen

[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.

This also cleans up the generics in the RocksDB state classes.

This closes #1608
上级 28c6254e
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.util.HDFSCopyFromLocal;
import org.apache.flink.util.HDFSCopyToLocal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
......@@ -38,7 +40,7 @@ import org.rocksdb.Options;
import org.rocksdb.RestoreOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -60,10 +62,9 @@ import static java.util.Objects.requireNonNull;
* @param <N> The type of the namespace.
* @param <S> The type of {@link State}.
* @param <SD> The type of {@link StateDescriptor}.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
implements KvState<K, N, S, SD, Backend>, State {
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
......@@ -95,9 +96,11 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath) {
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath,
Options options) {
this.keySerializer = requireNonNull(keySerializer);
this.namespaceSerializer = namespaceSerializer;
this.dbPath = dbPath;
......@@ -105,9 +108,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
RocksDB.loadLibrary();
Options options = new Options().setCreateIfMissing(true);
options.setMergeOperator(new StringAppendOperator());
if (!dbPath.exists()) {
if (!dbPath.mkdirs()) {
throw new RuntimeException("Could not create RocksDB data directory.");
......@@ -128,9 +128,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
} catch (RocksDBException e) {
throw new RuntimeException("Error while opening RocksDB instance.", e);
}
options.dispose();
}
/**
......@@ -143,10 +140,11 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
* @param restorePath The path to a backup directory from which to restore RocksDb database.
*/
protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath,
String restorePath) {
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath,
String restorePath,
Options options) {
RocksDB.loadLibrary();
......@@ -162,9 +160,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
this.dbPath = dbPath;
this.checkpointPath = checkpointPath;
Options options = new Options().setCreateIfMissing(true);
options.setMergeOperator(new StringAppendOperator());
if (!dbPath.exists()) {
if (!dbPath.mkdirs()) {
throw new RuntimeException("Could not create RocksDB data directory.");
......@@ -176,8 +171,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
} catch (RocksDBException e) {
throw new RuntimeException("Error while opening RocksDB instance.", e);
}
options.dispose();
}
// ------------------------------------------------------------------------
......@@ -211,12 +204,10 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
this.currentNamespace = namespace;
}
protected abstract KvStateSnapshot<K, N, S, SD, Backend> createRocksDBSnapshot(URI backupUri, long checkpointId);
protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI backupUri, long checkpointId);
@Override
final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
long checkpointId,
long timestamp) throws Exception {
public final AbstractRocksDBSnapshot<K, N, S, SD> snapshot(long checkpointId, long timestamp) throws Exception {
boolean success = false;
final File localBackupPath = new File(dbPath, "backup-" + checkpointId);
......@@ -234,7 +225,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri, checkpointId);
AbstractRocksDBSnapshot<K, N, S, SD> result = createRocksDBSnapshot(backupUri, checkpointId);
success = true;
return result;
} finally {
......@@ -256,7 +247,9 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
}
public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend>
{
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
......@@ -293,12 +286,13 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
protected final SD stateDesc;
public AbstractRocksDBSnapshot(File dbPath,
String checkpointPath,
URI backupUri,
long checkpointId,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc) {
String checkpointPath,
URI backupUri,
long checkpointId,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc) {
this.dbPath = dbPath;
this.checkpointPath = checkpointPath;
this.backupUri = backupUri;
......@@ -309,19 +303,21 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
this.namespaceSerializer = namespaceSerializer;
}
protected abstract KvState<K, N, S, SD, Backend> createRocksDBState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc,
File dbPath,
String backupPath,
String restorePath) throws Exception;
protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception;
@Override
public final KvState<K, N, S, SD, Backend> restoreState(
Backend stateBackend,
TypeSerializer<K> keySerializer,
ClassLoader classLoader,
long recoveryTimestamp) throws Exception {
public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(
RocksDBStateBackend stateBackend,
TypeSerializer<K> keySerializer,
ClassLoader classLoader,
long recoveryTimestamp) throws Exception {
// validity checks
if (!this.keySerializer.equals(keySerializer)) {
......@@ -352,7 +348,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, localBackupPath.getAbsolutePath());
return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.rocksdb.Options;
/**
* A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
*/
public interface OptionsFactory extends java.io.Serializable {
Options createOptions();
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ListState;
......@@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import java.io.ByteArrayInputStream;
......@@ -44,10 +45,9 @@ import static java.util.Objects.requireNonNull;
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the values in the list state.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, Backend>
public class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>>
implements ListState<V> {
/** Serializer for the values */
......@@ -66,11 +66,13 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected RocksDBListState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath);
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
Options options) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
}
......@@ -85,12 +87,14 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected RocksDBListState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
}
......@@ -143,13 +147,16 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
}
@Override
protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
}
private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> {
private static class Snapshot<K, N, V> extends
AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>>
{
private static final long serialVersionUID = 1L;
public Snapshot(File dbPath,
......@@ -169,14 +176,17 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
}
@Override
protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) throws Exception {
return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception {
return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, restorePath, options);
}
}
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
......@@ -35,15 +16,17 @@ package org.apache.flink.contrib.streaming.state;
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import java.io.ByteArrayInputStream;
......@@ -60,10 +43,9 @@ import static java.util.Objects.requireNonNull;
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of value that the state state stores.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend>
public class RocksDBReducingState<K, N, V>
extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
implements ReducingState<V> {
/** Serializer for the values */
......@@ -85,23 +67,27 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected RocksDBReducingState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
File dbPath,
String backupPath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath);
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
Options options) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
this.reduceFunction = stateDesc.getReduceFunction();
}
protected RocksDBReducingState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
this.stateDesc = stateDesc;
this.valueSerializer = stateDesc.getSerializer();
this.reduceFunction = stateDesc.getReduceFunction();
......@@ -150,13 +136,16 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
}
@Override
protected KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
}
private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> {
private static class Snapshot<K, N, V> extends
AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
{
private static final long serialVersionUID = 1L;
public Snapshot(File dbPath,
......@@ -176,14 +165,17 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
}
@Override
protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) throws Exception {
return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception {
return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
dbPath, checkpointPath, restorePath, options);
}
}
}
......
......@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......@@ -31,11 +31,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.api.common.state.StateBackend;
import org.rocksdb.Options;
import org.rocksdb.StringAppendOperator;
import static java.util.Objects.requireNonNull;
/**
*
* A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
* store very large state that exceeds memory and spills to disk.
*
* <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
* For persistence against loss of machines, checkpoints take a snapshot of the
* RocksDB database, and persist that snapshot in a file system (by default) or
* another configurable state backend.
*/
public class RocksDBStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 1L;
......@@ -53,6 +62,13 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private JobID jobId;
private AbstractStateBackend backingStateBackend;
/** The options factory to create the RocksDB options in the cluster */
private OptionsFactory optionsFactory;
/** The options from the options factory, cached */
private transient Options rocksDbOptions;
public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) {
this.dbBasePath = requireNonNull(dbBasePath);
......@@ -71,13 +87,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
public void disposeAllStateForCurrentJob() throws Exception {
}
public void disposeAllStateForCurrentJob() throws Exception {}
@Override
public void close() throws Exception {
Options opt = this.rocksDbOptions;
if (opt != null) {
opt.dispose();
this.rocksDbOptions = null;
}
}
private File getDbPath(String stateName) {
......@@ -93,7 +111,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
ValueStateDescriptor<T> stateDesc) throws Exception {
File dbPath = getDbPath(stateDesc.getName());
String checkpointPath = getCheckpointPath(stateDesc.getName());
return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
return new RocksDBValueState<>(keySerializer, namespaceSerializer,
stateDesc, dbPath, checkpointPath, getRocksDBOptions());
}
@Override
......@@ -101,7 +121,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
ListStateDescriptor<T> stateDesc) throws Exception {
File dbPath = getDbPath(stateDesc.getName());
String checkpointPath = getCheckpointPath(stateDesc.getName());
return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
return new RocksDBListState<>(keySerializer, namespaceSerializer,
stateDesc, dbPath, checkpointPath, getRocksDBOptions());
}
@Override
......@@ -109,7 +131,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
ReducingStateDescriptor<T> stateDesc) throws Exception {
File dbPath = getDbPath(stateDesc.getName());
String checkpointPath = getCheckpointPath(stateDesc.getName());
return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
return new RocksDBReducingState<>(keySerializer, namespaceSerializer,
stateDesc, dbPath, checkpointPath, getRocksDBOptions());
}
@Override
......@@ -124,4 +148,38 @@ public class RocksDBStateBackend extends AbstractStateBackend {
long timestamp) throws Exception {
return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp);
}
// ------------------------------------------------------------------------
// Parametrize with Options
// ------------------------------------------------------------------------
/**
* Defines the {@link org.rocksdb.Options} for the RocksDB instances.
* Because the options are not serializable and hold native code references,
* they must be specified through a factory.
*
* @param optionsFactory The options factory that lazily creates the RocksDB options.
*/
public void setOptions(OptionsFactory optionsFactory) {
this.optionsFactory = optionsFactory;
}
/**
* Gets the options factory that lazily creates the RocksDB options.
*
* @return The options factory.
*/
public OptionsFactory getOptions() {
return optionsFactory;
}
Options getRocksDBOptions() {
if (rocksDbOptions == null) {
Options opt = optionsFactory == null ? new Options() : optionsFactory.createOptions();
opt = opt.setCreateIfMissing(true);
opt = opt.setMergeOperator(new StringAppendOperator());
rocksDbOptions = opt;
}
return rocksDbOptions;
}
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ValueState;
......@@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import java.io.ByteArrayInputStream;
......@@ -41,10 +42,9 @@ import static java.util.Objects.requireNonNull;
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of value that the state state stores.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend>
public class RocksDBValueState<K, N, V>
extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
implements ValueState<V> {
/** Serializer for the values */
......@@ -63,22 +63,26 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected RocksDBValueState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
File dbPath,
String backupPath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath);
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
Options options) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
}
protected RocksDBValueState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
this.stateDesc = stateDesc;
this.valueSerializer = stateDesc.getSerializer();
}
......@@ -120,13 +124,16 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
}
@Override
protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
protected AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
}
private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> {
private static class Snapshot<K, N, V>
extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>>
{
private static final long serialVersionUID = 1L;
public Snapshot(File dbPath,
......@@ -146,14 +153,17 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
}
@Override
protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) throws Exception {
return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception {
return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, restorePath, options);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册