提交 180cd3f6 编写于 作者: S Stephan Ewen

[FLINK-3312] Add simple constructors for State Descriptors

上级 456d0aba
......@@ -62,7 +62,7 @@ import static java.util.Objects.requireNonNull;
* @param <SD> The type of {@link StateDescriptor}.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend>
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
implements KvState<K, N, S, SD, Backend>, State {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
......@@ -258,7 +258,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
}
public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
......
......@@ -195,6 +195,11 @@ public class DbStateBackend extends AbstractStateBackend {
@Override
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception {
if (!stateDesc.isSerializerInitialized()) {
throw new IllegalArgumentException("state descriptor serializer not initialized");
}
String stateName = operatorIdentifier + "_"+ stateDesc.getName();
return new LazyDbValueState<>(
......@@ -210,7 +215,14 @@ public class DbStateBackend extends AbstractStateBackend {
@Override
protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, new ArrayListSerializer<>(stateDesc.getSerializer()));
if (!stateDesc.isSerializerInitialized()) {
throw new IllegalArgumentException("state descriptor serializer not initialized");
}
ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(),
new ArrayListSerializer<>(stateDesc.getSerializer()), null);
ValueState<ArrayList<T>> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
return new GenericListState<>(valueState);
}
......@@ -220,7 +232,13 @@ public class DbStateBackend extends AbstractStateBackend {
protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, stateDesc.getSerializer());
if (!stateDesc.isSerializerInitialized()) {
throw new IllegalArgumentException("state descriptor serializer not initialized");
}
ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(
stateDesc.getName(), stateDesc.getSerializer(), null);
ValueState<T> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
return new GenericReducingState<>(valueState, stateDesc.getReduceFunction());
}
......
......@@ -31,14 +31,13 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
......@@ -210,7 +209,7 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
sum = getRuntimeContext().getState(
new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE));
new ValueStateDescriptor<>("my_state", Long.class, 0L));
}
@Override
......@@ -238,11 +237,9 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {
@Override
public void open(Configuration parameters) throws IOException {
aCounts = getRuntimeContext().getState(
new ValueStateDescriptor<>("a", NonSerializableLong.of(0L),
new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L)));
bCounts = getRuntimeContext().getState(
new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE));
bCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("b", Long.class, 0L));
}
@Override
......
......@@ -63,7 +63,6 @@ import org.junit.Test;
import com.google.common.base.Optional;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
public class DbStateBackendTest {
......@@ -201,7 +200,7 @@ public class DbStateBackendTest {
backend.initializeForJob(env, "dummy_test_kv", IntSerializer.INSTANCE);
ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE,
new ValueStateDescriptor<>("state1", null, StringSerializer.INSTANCE));
new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, null));
LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state;
......@@ -455,9 +454,14 @@ public class DbStateBackendTest {
backend2.initializeForJob(new DummyEnvironment("test", 3, 1), "dummy_2", StringSerializer.INSTANCE);
backend3.initializeForJob(new DummyEnvironment("test", 3, 2), "dummy_3", StringSerializer.INSTANCE);
ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a1", null, StringSerializer.INSTANCE));
ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a2", null, StringSerializer.INSTANCE));
ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a3", null, StringSerializer.INSTANCE));
ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE,
new ValueStateDescriptor<>("a1", StringSerializer.INSTANCE, null));
ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE,
new ValueStateDescriptor<>("a2", StringSerializer.INSTANCE, null));
ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE,
new ValueStateDescriptor<>("a3", StringSerializer.INSTANCE, null));
LazyDbValueState<?, ?, ?> s1 = (LazyDbValueState<?, ?, ?>) s1State;
LazyDbValueState<?, ?, ?> s2 = (LazyDbValueState<?, ?, ?>) s2State;
......@@ -520,7 +524,7 @@ public class DbStateBackendTest {
backend.initializeForJob(env, "dummy_test_caching", IntSerializer.INSTANCE);
ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE,
new ValueStateDescriptor<>("state1", "a", StringSerializer.INSTANCE));
new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, "a"));
LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state;
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,82 +6,68 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import static java.util.Objects.requireNonNull;
/**
* {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
* A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
* list state using
* {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
* {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
*
* @param <T> The type of the values that can be added to the list state.
*/
public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>> {
public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
private static final long serialVersionUID = 1L;
private final TypeSerializer<T> serializer;
/**
* Creates a new {@code ListStateDescriptor} with the given name.
* Creates a new {@code ListStateDescriptor} with the given name and list element type.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
*
* @param name The (unique) name for the state.
* @param serializer {@link TypeSerializer} for the state values.
* @param typeClass The type of the values in the state.
*/
public ListStateDescriptor(String name, TypeSerializer<T> serializer) {
super(requireNonNull(name));
this.serializer = requireNonNull(serializer);
}
@Override
public ListState<T> bind(StateBackend stateBackend) throws Exception {
return stateBackend.createListState(this);
public ListStateDescriptor(String name, Class<T> typeClass) {
super(name, typeClass, null);
}
/**
* Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
* Creates a new {@code ListStateDescriptor} with the given name and list element type.
*
* @param name The (unique) name for the state.
* @param typeInfo The type of the values in the state.
*/
public TypeSerializer<T> getSerializer() {
return serializer;
public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
super(name, typeInfo, null);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
return serializer.equals(that.serializer) && name.equals(that.name);
}
@Override
public int hashCode() {
int result = serializer.hashCode();
result = 31 * result + name.hashCode();
return result;
/**
* Creates a new {@code ListStateDescriptor} with the given name and list element type.
*
* @param name The (unique) name for the state.
* @param typeSerializer The type serializer for the list values.
*/
public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, typeSerializer, null);
}
// ------------------------------------------------------------------------
@Override
public String toString() {
return "ListStateDescriptor{" +
"serializer=" + serializer +
'}';
public ListState<T> bind(StateBackend stateBackend) throws Exception {
return stateBackend.createListState(this);
}
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import static java.util.Objects.requireNonNull;
......@@ -27,80 +28,70 @@ import static java.util.Objects.requireNonNull;
/**
* {@link StateDescriptor} for {@link ReducingState}. This can be used to create partitioned
* reducing state using
* {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
* {@link org.apache.flink.api.common.functions.RuntimeContext#getReducingState(ReducingStateDescriptor)}.
*
* @param <T> The type of the values that can be added to the list state.
*/
public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>> {
public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {
private static final long serialVersionUID = 1L;
private final TypeSerializer<T> serializer;
private final ReduceFunction<T> reduceFunction;
/**
* Creates a new {@code ReducingStateDescriptor} with the given name and reduce function.
* Creates a new {@code ReducingStateDescriptor} with the given name, type, and default value.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #ReducingStateDescriptor(String, ReduceFunction, TypeInformation)} constructor.
*
* @param name The (unique) name for the state.
* @param serializer {@link TypeSerializer} for the state values.
* @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
* @param typeClass The type of the values in the state.
*/
public ReducingStateDescriptor(String name,
ReduceFunction<T> reduceFunction,
TypeSerializer<T> serializer) {
super(requireNonNull(name));
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
super(name, typeClass, null);
this.reduceFunction = requireNonNull(reduceFunction);
if (reduceFunction instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction.");
}
this.serializer = requireNonNull(serializer);
this.reduceFunction = reduceFunction;
}
@Override
public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
return stateBackend.createReducingState(this);
}
/**
* Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
* Creates a new {@code ReducingStateDescriptor} with the given name and default value.
*
* @param name The (unique) name for the state.
* @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
* @param typeInfo The type of the values in the state.
*/
public TypeSerializer<T> getSerializer() {
return serializer;
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {
super(name, typeInfo, null);
this.reduceFunction = requireNonNull(reduceFunction);
}
/**
* Returns the reduce function to be used for the reducing state.
* Creates a new {@code ValueStateDescriptor} with the given name and default value.
*
* @param name The (unique) name for the state.
* @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
* @param typeSerializer The type serializer of the values in the state.
*/
public ReduceFunction<T> getReduceFunction() {
return reduceFunction;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o;
return serializer.equals(that.serializer) && name.equals(that.name);
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeSerializer<T> typeSerializer) {
super(name, typeSerializer, null);
this.reduceFunction = requireNonNull(reduceFunction);
}
// ------------------------------------------------------------------------
@Override
public int hashCode() {
int result = serializer.hashCode();
result = 31 * result + name.hashCode();
return result;
public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
return stateBackend.createReducingState(this);
}
@Override
public String toString() {
return "ReducingStateDescriptor{" +
"serializer=" + serializer +
", reduceFunction=" + reduceFunction +
'}';
/**
* Returns the reduce function to be used for the reducing state.
*/
public ReduceFunction<T> getReduceFunction() {
return reduceFunction;
}
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
/**
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,17 +6,30 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import static java.util.Objects.requireNonNull;
......@@ -30,20 +43,79 @@ import static java.util.Objects.requireNonNull;
*
* @param <S> The type of the State objects created from this {@code StateDescriptor}.
*/
public abstract class StateDescriptor<S extends State> implements Serializable {
public abstract class StateDescriptor<S extends State, T> implements Serializable {
private static final long serialVersionUID = 1L;
/** Name that uniquely identifies state created from this StateDescriptor. */
protected final String name;
private final String name;
/** The serializer for the type. May be eagerly initialized in the constructor,
* or lazily once the type is serialized or an ExecutionConfig is provided. */
private TypeSerializer<T> serializer;
/** The default value returned by the state when no other value is bound to a key */
private transient T defaultValue;
/** The type information describing the value type. Only used to lazily create the serializer
* and dropped during serialization */
private transient TypeInformation<T> typeInfo;
// ------------------------------------------------------------------------
/**
* Create a new {@code StateDescriptor} with the given name.
* Create a new {@code StateDescriptor} with the given name and the given type serializer.
*
* @param name The name of the {@code StateDescriptor}.
* @param serializer The type serializer for the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
public StateDescriptor(String name) {
this.name = requireNonNull(name);
protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
this.name = requireNonNull(name, "name must not be null");
this.serializer = requireNonNull(serializer, "serializer must not be null");
this.defaultValue = defaultValue;
}
/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
*
* @param name The name of the {@code StateDescriptor}.
* @param typeInfo The type information for the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
this.name = requireNonNull(name, "name must not be null");
this.typeInfo = requireNonNull(typeInfo, "type information must not be null");
this.defaultValue = defaultValue;
}
/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
*
* @param name The name of the {@code StateDescriptor}.
* @param type The class of the type of values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
protected StateDescriptor(String name, Class<T> type, T defaultValue) {
this.name = requireNonNull(name, "name must not be null");
requireNonNull(type, "type class must not be null");
try {
this.typeInfo = TypeExtractor.createTypeInfo(type);
} catch (Exception e) {
throw new RuntimeException("Cannot create full type information based on the given class. If the type has generics, please", e);
}
this.defaultValue = defaultValue;
}
// ------------------------------------------------------------------------
/**
* Returns the name of this {@code StateDescriptor}.
*/
......@@ -51,16 +123,180 @@ public abstract class StateDescriptor<S extends State> implements Serializable {
return name;
}
/**
* Returns the default value.
*/
public T getDefaultValue() {
if (defaultValue != null) {
if (serializer != null) {
return serializer.copy(defaultValue);
} else {
throw new IllegalStateException("Serializer not yet initialized.");
}
} else {
return null;
}
}
/**
* Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
* Note that the serializer may initialized lazily and is only guaranteed to exist after
* calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
*/
public TypeSerializer<T> getSerializer() {
if (serializer != null) {
return serializer;
} else {
throw new IllegalStateException("Serializer not yet initialized.");
}
}
/**
* Creates a new {@link State} on the given {@link StateBackend}.
*
* @param stateBackend The {@code StateBackend} on which to create the {@link State}.
*/
public abstract S bind(StateBackend stateBackend) throws Exception ;
public abstract S bind(StateBackend stateBackend) throws Exception;
// ------------------------------------------------------------------------
// Force subclasses to implement
public abstract boolean equals(Object o);
/**
* Checks whether the serializer has been initialized. Serializer initialization is lazy,
* to allow parametrization of serializers with an {@link ExecutionConfig} via
* {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
*
* @return True if the serializers have been initialized, false otherwise.
*/
public boolean isSerializerInitialized() {
return serializer != null;
}
/**
* Initializes the serializer, unless it has been initialized before.
*
* @param executionConfig The execution config to use when creating the serializer.
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
if (serializer == null) {
if (typeInfo != null) {
serializer = typeInfo.createSerializer(executionConfig);
} else {
throw new IllegalStateException(
"Cannot initialize serializer after TypeInformation was dropped during serialization");
}
}
}
/**
* This method should be called by subclasses prior to serialization. Because the TypeInformation is
* not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
* needs to make sure that the serializer is created before the TypeInformation is dropped.
*/
private void ensureSerializerCreated() {
if (serializer == null) {
if (typeInfo != null) {
serializer = typeInfo.createSerializer(new ExecutionConfig());
} else {
throw new IllegalStateException(
"Cannot initialize serializer after TypeInformation was dropped during serialization");
}
}
}
// ------------------------------------------------------------------------
// Standard Utils
// ------------------------------------------------------------------------
@Override
public int hashCode() {
return name.hashCode() + 41;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
else if (o == null || getClass() != o.getClass()) {
return false;
}
else {
StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
return this.name.equals(that.name);
}
}
// Force subclasses to implement
public abstract int hashCode();
@Override
public String toString() {
return getClass().getSimpleName() +
"{ name=" + name +
", defaultValue=" + defaultValue +
", serializer=" + serializer +
'}';
}
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
private void writeObject(final ObjectOutputStream out) throws IOException {
// make sure we have a serializer before the type information gets lost
ensureSerializerCreated();
// write all the non-transient fields
out.defaultWriteObject();
// write the non-serializable default value field
if (defaultValue == null) {
// we don't have a default value
out.writeBoolean(false);
} else {
// we have a default value
out.writeBoolean(true);
byte[] serializedDefaultValue;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
{
serializer.serialize(defaultValue, outView);
outView.flush();
serializedDefaultValue = baos.toByteArray();
}
catch (Exception e) {
throw new IOException("Unable to serialize default value of type " +
defaultValue.getClass().getSimpleName() + ".", e);
}
out.writeInt(serializedDefaultValue.length);
out.write(serializedDefaultValue);
}
}
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
// read the non-transient fields
in.defaultReadObject();
// read the default value field
boolean hasDefaultValue = in.readBoolean();
if (hasDefaultValue) {
int size = in.readInt();
byte[] buffer = new byte[size];
int bytesRead = in.read(buffer);
if (bytesRead != size) {
throw new RuntimeException("Read size does not match expected size.");
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
{
defaultValue = serializer.deserialize(inView);
}
catch (Exception e) {
throw new IOException("Unable to deserialize default value.", e);
}
} else {
defaultValue = null;
}
}
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -6,161 +6,75 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import static java.util.Objects.requireNonNull;
/**
* {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned
* value state using
* {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
* {@link org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}.
*
* @param <T> The type of the values that the value state can hold.
*/
public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> {
public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
private static final long serialVersionUID = 1L;
private transient T defaultValue;
private final TypeSerializer<T> serializer;
/**
* Creates a new {@code ValueStateDescriptor} with the given name and default value.
*
* Creates a new {@code ValueStateDescriptor} with the given name, type, and default value.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor.
*
* @param name The (unique) name for the state.
* @param typeClass The type of the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
* @param serializer {@link TypeSerializer} for the state values.
*/
public ValueStateDescriptor(String name, T defaultValue, TypeSerializer<T> serializer) {
super(requireNonNull(name));
this.defaultValue = defaultValue;
this.serializer = requireNonNull(serializer);
}
private void writeObject(final ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
if (defaultValue == null) {
// we don't have a default value
out.writeBoolean(false);
} else {
out.writeBoolean(true);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper outView =
new DataOutputViewStreamWrapper(new DataOutputStream(baos));
try {
serializer.serialize(defaultValue, outView);
} catch (IOException ioe) {
throw new RuntimeException("Unable to serialize default value of type " +
defaultValue.getClass().getSimpleName() + ".", ioe);
}
outView.close();
out.writeInt(baos.size());
out.write(baos.toByteArray());
}
}
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
boolean hasDefaultValue = in.readBoolean();
if (hasDefaultValue) {
int size = in.readInt();
byte[] buffer = new byte[size];
int bytesRead = in.read(buffer);
if (bytesRead != size) {
throw new RuntimeException("Read size does not match expected size.");
}
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
DataInputViewStreamWrapper inView =
new DataInputViewStreamWrapper(new DataInputStream(bais));
defaultValue = serializer.deserialize(inView);
} else {
defaultValue = null;
}
}
@Override
public ValueState<T> bind(StateBackend stateBackend) throws Exception {
return stateBackend.createValueState(this);
public ValueStateDescriptor(String name, Class<T> typeClass, T defaultValue) {
super(name, typeClass, defaultValue);
}
/**
* Returns the default value.
* Creates a new {@code ValueStateDescriptor} with the given name and default value.
*
* @param name The (unique) name for the state.
* @param typeInfo The type of the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
public T getDefaultValue() {
if (defaultValue != null) {
return serializer.copy(defaultValue);
} else {
return null;
}
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
super(name, typeInfo, defaultValue);
}
/**
* Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
* Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific
* serializer.
*
* @param name The (unique) name for the state.
* @param typeSerializer The type serializer of the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
public TypeSerializer<T> getSerializer() {
return serializer;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
return serializer.equals(that.serializer) && name.equals(that.name);
}
@Override
public int hashCode() {
int result = serializer.hashCode();
result = 31 * result + name.hashCode();
return result;
public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) {
super(name, typeSerializer, defaultValue);
}
// ------------------------------------------------------------------------
@Override
public String toString() {
return "ValueStateDescriptor{" +
"name=" + name +
", defaultValue=" + defaultValue +
", serializer=" + serializer +
'}';
public ValueState<T> bind(StateBackend stateBackend) throws Exception {
return stateBackend.createValueState(this);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ListStateDescriptorTest {
@Test
public void testValueStateDescriptorEagerSerializer() throws Exception {
TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
ListStateDescriptor<String> descr =
new ListStateDescriptor<String>("testName", serializer);
assertEquals("testName", descr.getName());
assertNotNull(descr.getSerializer());
assertEquals(serializer, descr.getSerializer());
ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertNotNull(copy.getSerializer());
assertEquals(serializer, copy.getSerializer());
}
@Test
public void testValueStateDescriptorLazySerializer() throws Exception {
// some different registered value
ExecutionConfig cfg = new ExecutionConfig();
cfg.registerKryoType(TaskInfo.class);
ListStateDescriptor<Path> descr =
new ListStateDescriptor<Path>("testName", Path.class);
try {
descr.getSerializer();
fail("should cause an exception");
} catch (IllegalStateException ignored) {}
descr.initializeSerializerUnlessSet(cfg);
assertNotNull(descr.getSerializer());
assertTrue(descr.getSerializer() instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
}
@Test
public void testValueStateDescriptorAutoSerializer() throws Exception {
ListStateDescriptor<String> descr =
new ListStateDescriptor<String>("testName", String.class);
ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertNotNull(copy.getSerializer());
assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class ReducingStateDescriptorTest {
@Test
public void testValueStateDescriptorEagerSerializer() throws Exception {
@SuppressWarnings("unchecked")
ReduceFunction<String> reducer = mock(ReduceFunction.class);
TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
ReducingStateDescriptor<String> descr =
new ReducingStateDescriptor<String>("testName", reducer, serializer);
assertEquals("testName", descr.getName());
assertNotNull(descr.getSerializer());
assertEquals(serializer, descr.getSerializer());
ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertNotNull(copy.getSerializer());
assertEquals(serializer, copy.getSerializer());
}
@Test
public void testValueStateDescriptorLazySerializer() throws Exception {
@SuppressWarnings("unchecked")
ReduceFunction<Path> reducer = mock(ReduceFunction.class);
// some different registered value
ExecutionConfig cfg = new ExecutionConfig();
cfg.registerKryoType(TaskInfo.class);
ReducingStateDescriptor<Path> descr =
new ReducingStateDescriptor<Path>("testName", reducer, Path.class);
try {
descr.getSerializer();
fail("should cause an exception");
} catch (IllegalStateException ignored) {}
descr.initializeSerializerUnlessSet(cfg);
assertNotNull(descr.getSerializer());
assertTrue(descr.getSerializer() instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
}
@Test
public void testValueStateDescriptorAutoSerializer() throws Exception {
@SuppressWarnings("unchecked")
ReduceFunction<String> reducer = mock(ReduceFunction.class);
ReducingStateDescriptor<String> descr =
new ReducingStateDescriptor<String>("testName", reducer, String.class);
ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertNotNull(copy.getSerializer());
assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
import java.io.File;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ValueStateDescriptorTest {
@Test
public void testValueStateDescriptorEagerSerializer() throws Exception {
TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
String defaultValue = "le-value-default";
ValueStateDescriptor<String> descr =
new ValueStateDescriptor<String>("testName", serializer, defaultValue);
assertEquals("testName", descr.getName());
assertEquals(defaultValue, descr.getDefaultValue());
assertNotNull(descr.getSerializer());
assertEquals(serializer, descr.getSerializer());
ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertEquals(defaultValue, copy.getDefaultValue());
assertNotNull(copy.getSerializer());
assertEquals(serializer, copy.getSerializer());
}
@Test
public void testValueStateDescriptorLazySerializer() throws Exception {
// some default value that goes to the generic serializer
Path defaultValue = new Path(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI());
// some different registered value
ExecutionConfig cfg = new ExecutionConfig();
cfg.registerKryoType(TaskInfo.class);
ValueStateDescriptor<Path> descr =
new ValueStateDescriptor<Path>("testName", Path.class, defaultValue);
try {
descr.getSerializer();
fail("should cause an exception");
} catch (IllegalStateException ignored) {}
descr.initializeSerializerUnlessSet(cfg);
assertNotNull(descr.getSerializer());
assertTrue(descr.getSerializer() instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
}
@Test
public void testValueStateDescriptorAutoSerializer() throws Exception {
String defaultValue = "le-value-default";
ValueStateDescriptor<String> descr =
new ValueStateDescriptor<String>("testName", String.class, defaultValue);
ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertEquals(defaultValue, copy.getDefaultValue());
assertNotNull(copy.getSerializer());
assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
}
}
......@@ -17,10 +17,8 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -103,8 +101,8 @@ public class SessionWindowing {
private final Long sessionTimeout;
private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", -1L,
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
private final ValueStateDescriptor<Long> stateDesc =
new ValueStateDescriptor<>("last-seen", Long.class, -1L);
public SessionTrigger(Long sessionTimeout) {
......
......@@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull;
* @param <SD> The type of StateDescriptor for the State S
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend>
public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
implements KvState<K, N, S, SD, Backend>, State {
/** Map containing the actual key/value pairs */
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
......@@ -178,12 +179,16 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
* @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S> stateDescriptor) throws Exception {
public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
if (keySerializer == null) {
throw new Exception("State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
}
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
}
if (keyValueStatesByName == null) {
keyValueStatesByName = new HashMap<>();
......
......@@ -35,7 +35,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
* @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
* @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
*/
public interface KvState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> {
public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {
/**
* Sets the current key, which will be used when using the state access methods.
......
......@@ -39,7 +39,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
* @param <SD> The type of the {@link StateDescriptor}
* @param <Backend> The type of the backend that can restore the state from this snapshot.
*/
public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> extends java.io.Serializable {
public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> extends java.io.Serializable {
/**
* Loads the key/value state back from this snapshot.
......
......@@ -41,7 +41,7 @@ import java.util.Map;
* @param <S> The type of State
* @param <SD> The type of StateDescriptor for the State S
*/
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S>>
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
/** The file system state backend backing snapshots of this state */
......
......@@ -39,7 +39,7 @@ import java.util.Map;
* @param <N> The type of the namespace in the snapshot state.
* @param <SV> The type of the state value.
*/
public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
private static final long serialVersionUID = 1L;
......
......@@ -39,7 +39,7 @@ import java.util.Map;
* @param <S> The type of State
* @param <SD> The type of StateDescriptor for the State S
*/
public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S>>
public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> {
public AbstractMemState(TypeSerializer<K> keySerializer,
......
......@@ -36,7 +36,7 @@ import java.util.Map;
* @param <N> The type of the namespace in the snapshot state.
* @param <SV> The type of the value in the snapshot state.
*/
public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> {
public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> {
private static final long serialVersionUID = 1L;
......
......@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state;
import com.google.common.base.Joiner;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
......@@ -29,11 +31,10 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -67,7 +68,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", null, StringSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
......@@ -149,7 +152,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
......@@ -246,7 +249,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
return value1 + "," + value2;
}
},
StringSerializer.INSTANCE);
String.class);
ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
......@@ -336,12 +339,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
"test_op",
IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id",
null,
StringSerializer.INSTANCE);
ValueState<String> state = backend.getPartitionedState(null,
VoidSerializer.INSTANCE,
kvId);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B> kv =
......@@ -379,7 +380,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
......@@ -427,7 +428,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
return value1 + "," + value2;
}
},
StringSerializer.INSTANCE);
String.class);
ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
......@@ -468,7 +469,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", new IntValue(-1), IntValueSerializer.INSTANCE);
ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
......
......@@ -254,7 +254,7 @@ public abstract class AbstractStreamOperator<OUT>
* @throws IllegalStateException Thrown, if the key/value state was already initialized.
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
protected <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) throws Exception {
protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
return getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor);
}
......@@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT>
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
@SuppressWarnings("unchecked")
protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S> stateDescriptor) throws Exception {
protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
return getStateBackend().getPartitionedState(namespace, (TypeSerializer<Object>) namespaceSerializer,
stateDescriptor);
}
......
......@@ -64,10 +64,13 @@ public class StreamGroupedFold<IN, OUT, KEY>
"operator. Probably the setOutputType method was not called.");
}
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = outTypeSerializer.deserialize(in);
ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, null, outTypeSerializer);
try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais))
{
initialValue = outTypeSerializer.deserialize(in);
}
ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
values = getPartitionedState(stateId);
}
......
......@@ -44,7 +44,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
@Override
public void open() throws Exception {
super.open();
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, null, serializer);
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);
values = getPartitionedState(stateId);
}
......
......@@ -112,6 +112,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
requireNonNull(stateProperties, "The state properties must not be null");
try {
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
return operator.getPartitionedState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
......@@ -122,6 +123,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
requireNonNull(stateProperties, "The state properties must not be null");
try {
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
return operator.getPartitionedState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
......@@ -132,6 +134,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
requireNonNull(stateProperties, "The state properties must not be null");
try {
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
return operator.getPartitionedState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
......@@ -163,7 +166,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
requireNonNull(stateType, "The state type information must not be null");
ValueStateDescriptor<S> stateProps =
new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig()));
new ValueStateDescriptor<>(name, stateType, defaultState);
return getState(stateProps);
}
......
......@@ -18,10 +18,10 @@
package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
......@@ -38,8 +38,8 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
private final long interval;
private final ValueStateDescriptor<Boolean> stateDesc = new ValueStateDescriptor<>("first", true,
BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(new ExecutionConfig()));
private final ValueStateDescriptor<Boolean> stateDesc =
new ValueStateDescriptor<>("first", BooleanSerializer.INSTANCE, true);
private ContinuousEventTimeTrigger(long interval) {
this.interval = interval;
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
......@@ -36,8 +37,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
private final long interval;
private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("fire-timestamp", 0L,
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
private final ValueStateDescriptor<Long> stateDesc =
new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, 0L);
private ContinuousProcessingTimeTrigger(long interval) {
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;
import java.io.IOException;
......@@ -35,8 +35,8 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
private final long maxCount;
private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("count", 0L,
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
private final ValueStateDescriptor<Long> stateDesc =
new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
private CountTrigger(long maxCount) {
......
......@@ -43,7 +43,7 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
this.deltaFunction = deltaFunction;
this.threshold = threshold;
stateDesc = new ValueStateDescriptor<>("last-element", null, stateSerializer);
stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer, null);
}
......
......@@ -179,7 +179,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part os a KeyedStream).
*/
<S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
/**
* Retrieves a {@link ValueState} object that can be used to interact with
......
......@@ -55,13 +55,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
private final Evictor<? super IN, ? super W> evictor;
private final StateDescriptor<? extends ListState<StreamRecord<IN>>> windowStateDescriptor;
private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
TypeSerializer<W> windowSerializer,
KeySelector<IN, K> keySelector,
TypeSerializer<K> keySerializer,
StateDescriptor<? extends ListState<StreamRecord<IN>>> windowStateDescriptor,
StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
WindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
......@@ -161,7 +161,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
@VisibleForTesting
@SuppressWarnings("unchecked, rawtypes")
public StateDescriptor<? extends MergingState<IN, Iterable<IN>>> getStateDescriptor() {
return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>>) windowStateDescriptor;
public StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
}
}
......@@ -448,13 +448,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
requireNonNull(name, "The name of the state must not be null");
requireNonNull(stateType, "The state type information must not be null");
ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig()));
ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(
name, stateType.createSerializer(getExecutionConfig()), defaultState);
return getPartitionedState(stateDesc);
}
@Override
@SuppressWarnings("rawtypes, unchecked")
public <S extends State> S getPartitionedState(final StateDescriptor<S> stateDescriptor) {
public <S extends State> S getPartitionedState(final StateDescriptor<S, ?> stateDescriptor) {
if (!(stateDescriptor instanceof ValueStateDescriptor)) {
throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only " +
"support ValueState.");
......
......@@ -102,7 +102,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected final Trigger<? super IN, ? super W> trigger;
protected final StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor;
protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor;
/**
* If this is true. The current processing time is set as the timestamp of incoming elements.
......@@ -167,7 +167,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TypeSerializer<W> windowSerializer,
KeySelector<IN, K> keySelector,
TypeSerializer<K> keySerializer,
StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor,
StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor,
WindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {
......@@ -374,15 +374,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
requireNonNull(name, "The name of the state must not be null");
requireNonNull(stateType, "The state type information must not be null");
ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig()));
ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), defaultState);
return getPartitionedState(stateDesc);
}
@SuppressWarnings("unchecked")
public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) {
public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
try {
return WindowOperator.this.getPartitionedState(window, windowSerializer,
stateDescriptor);
return WindowOperator.this.getPartitionedState(window, windowSerializer, stateDescriptor);
} catch (Exception e) {
throw new RuntimeException("Could not retrieve state", e);
}
......@@ -608,7 +607,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@VisibleForTesting
public StateDescriptor<? extends MergingState<IN, ACC>> getStateDescriptor() {
public StateDescriptor<? extends MergingState<IN, ACC>, ?> getStateDescriptor() {
return windowStateDescriptor;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
public class StreamingRuntimeContextTest {
@Test
public void testValueStateInstantiation() throws Exception {
final ExecutionConfig config = new ExecutionConfig();
config.registerKryoType(Path.class);
final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
StreamingRuntimeContext context = new StreamingRuntimeContext(
createMockOp(descriptorCapture, config),
createMockEnvironment(),
Collections.<String, Accumulator<?, ?>>emptyMap());
ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class, null);
context.getState(descr);
StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
TypeSerializer<?> serializer = descrIntercepted.getSerializer();
// check that the Path class is really registered, i.e., the execution config was applied
assertTrue(serializer instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
}
@Test
public void testReduceingStateInstantiation() throws Exception {
final ExecutionConfig config = new ExecutionConfig();
config.registerKryoType(Path.class);
final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
StreamingRuntimeContext context = new StreamingRuntimeContext(
createMockOp(descriptorCapture, config),
createMockEnvironment(),
Collections.<String, Accumulator<?, ?>>emptyMap());
@SuppressWarnings("unchecked")
ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) mock(ReduceFunction.class);
ReducingStateDescriptor<TaskInfo> descr =
new ReducingStateDescriptor<>("name", reducer, TaskInfo.class);
context.getReducingState(descr);
StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
TypeSerializer<?> serializer = descrIntercepted.getSerializer();
// check that the Path class is really registered, i.e., the execution config was applied
assertTrue(serializer instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
}
@Test
public void testListStateInstantiation() throws Exception {
final ExecutionConfig config = new ExecutionConfig();
config.registerKryoType(Path.class);
final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
StreamingRuntimeContext context = new StreamingRuntimeContext(
createMockOp(descriptorCapture, config),
createMockEnvironment(),
Collections.<String, Accumulator<?, ?>>emptyMap());
ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<>("name", TaskInfo.class);
context.getListState(descr);
StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
TypeSerializer<?> serializer = descrIntercepted.getSerializer();
// check that the Path class is really registered, i.e., the execution config was applied
assertTrue(serializer instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
}
// ------------------------------------------------------------------------
//
// ------------------------------------------------------------------------
@SuppressWarnings("unchecked")
private static AbstractStreamOperator<?> createMockOp(
final AtomicReference<Object> ref, final ExecutionConfig config) throws Exception {
AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class);
when(operatorMock.getExecutionConfig()).thenReturn(config);
when(operatorMock.getPartitionedState(any(StateDescriptor.class))).thenAnswer(
new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
ref.set(invocationOnMock.getArguments()[0]);
return null;
}
});
return operatorMock;
}
private static Environment createMockEnvironment() {
Environment env = mock(Environment.class);
when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader());
when(env.getDistributedCacheEntries()).thenReturn(Collections.<String, Future<Path>>emptyMap());
when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 0, 1, 1));
return env;
}
}
......@@ -763,7 +763,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
public void open(Configuration parameters) {
assertNotNull(getRuntimeContext());
state = getRuntimeContext().getState(
new ValueStateDescriptor<>("totalCount", 0, IntSerializer.INSTANCE));
new ValueStateDescriptor<>("totalCount", Integer.class, 0));
}
@Override
......
......@@ -943,8 +943,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// start with one, so the final count is correct and we test that we do not
// initialize with 0 always by default
state = getRuntimeContext().getState(
new ValueStateDescriptor<>("totalCount", 1, IntSerializer.INSTANCE));
state = getRuntimeContext().getState(new ValueStateDescriptor<>("totalCount", Integer.class, 1));
}
@Override
......
......@@ -45,7 +45,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
}
override def open(c: Configuration) = {
val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], stateSerializer)
val info = new ValueStateDescriptor[S]("state", stateSerializer, null.asInstanceOf[S])
state = getRuntimeContext().getState(info)
}
}
......@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
......@@ -45,6 +44,7 @@ import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
......@@ -224,7 +224,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
open = true;
count = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", 0, IntSerializer.INSTANCE));
new ValueStateDescriptor<>("count", Integer.class, 0));
}
@Override
......
......@@ -27,14 +27,11 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -173,7 +170,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
sum = getRuntimeContext().getState(
new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE));
new ValueStateDescriptor<>("my_state", Long.class, 0L));
}
@Override
......@@ -202,11 +199,10 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
public void open(Configuration parameters) throws IOException {
aCounts = getRuntimeContext().getState(
new ValueStateDescriptor<>("a", NonSerializableLong.of(0L),
new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L)));
bCounts = getRuntimeContext().getState(
new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE));
new ValueStateDescriptor<>("b", Long.class, 0L));
}
@Override
......
......@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -255,8 +254,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
pCount = getRuntimeContext().getState(
new ValueStateDescriptor<>("pCount", 0L, LongSerializer.INSTANCE));
pCount = getRuntimeContext().getState(new ValueStateDescriptor<>("pCount", Long.class, 0L));
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册