提交 1198664c 编写于 作者: S Stephan Ewen

[FLINK-3201] Adjust CEP code for changes state interfaces

上级 180cd3f6
...@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; ...@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
...@@ -93,12 +93,12 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> { ...@@ -93,12 +93,12 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> {
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
final StateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream( final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
checkpointId, checkpointId,
timestamp); timestamp);
final ObjectOutputStream oos = new ObjectOutputStream(os); final ObjectOutputStream oos = new ObjectOutputStream(os);
final StateBackend.CheckpointStateOutputView ov = new StateBackend.CheckpointStateOutputView(os); final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
oos.writeObject(nfa); oos.writeObject(nfa);
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
package org.apache.flink.cep.operator; package org.apache.flink.cep.operator;
import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
...@@ -26,7 +27,7 @@ import org.apache.flink.cep.nfa.NFA; ...@@ -26,7 +27,7 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
...@@ -70,8 +71,8 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator ...@@ -70,8 +71,8 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator
// TODO: fix once the state refactoring is completed // TODO: fix once the state refactoring is completed
private transient Set<KEY> keys; private transient Set<KEY> keys;
private transient OperatorState<NFA<IN>> nfaOperatorState; private transient ValueState<NFA<IN>> nfaOperatorState;
private transient OperatorState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState; private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
public KeyedCEPPatternOperator( public KeyedCEPPatternOperator(
TypeSerializer<IN> inputSerializer, TypeSerializer<IN> inputSerializer,
...@@ -95,19 +96,21 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator ...@@ -95,19 +96,21 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator
} }
if (nfaOperatorState == null) { if (nfaOperatorState == null) {
nfaOperatorState = this.createKeyValueState( nfaOperatorState = getPartitionedState(
NFA_OPERATOR_STATE_NAME, new ValueStateDescriptor<NFA<IN>>(
new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()), NFA_OPERATOR_STATE_NAME,
null); new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()),
null));
} }
if (priorityQueueOperatorState == null) { if (priorityQueueOperatorState == null) {
priorityQueueOperatorState = this.createKeyValueState( priorityQueueOperatorState = getPartitionedState(
PRIORIRY_QUEUE_STATE_NAME, new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
new PriorityQueueSerializer<StreamRecord<IN>>( PRIORIRY_QUEUE_STATE_NAME,
new StreamRecordSerializer<IN>(getInputSerializer()), new PriorityQueueSerializer<StreamRecord<IN>>(
new PriorityQueueStreamRecordFactory<IN>()), new StreamRecordSerializer<IN>(getInputSerializer()),
null); new PriorityQueueStreamRecordFactory<IN>()),
null));
} }
} }
...@@ -166,7 +169,7 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator ...@@ -166,7 +169,7 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
StateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); AbstractStateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
ov.writeInt(keys.size()); ov.writeInt(keys.size());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册