Commit 254379b7 authored by Aljoscha Krettek

[FLINK-4162] Fix Event Queue Serialization in Abstract(Keyed)CEPPatternOperator

Before, these operators used StreamRecordSerializer, which does not serialize
timestamps. Now they use MultiplexingStreamRecordSerializer, which does.

This also extends the tests in CEPOperatorTest to test that timestamps
are correctly checkpointed/restored.
Parent ac061467
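
The core of the fix is the serializer swap: MultiplexingStreamRecordSerializer writes a type tag and the record's timestamp along with the value, whereas the previously used StreamRecordSerializer writes only the value, so element timestamps were dropped on checkpoint. The following is a minimal sketch (not part of the commit) of the timestamp round trip using the classes that appear in this diff; the class name TimestampRoundTripSketch and the use of StringSerializer are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class TimestampRoundTripSketch {

    public static void main(String[] args) throws Exception {
        // the multiplexing serializer wraps the element serializer and also writes
        // a type tag plus the timestamp of the record
        MultiplexingStreamRecordSerializer<String> serializer =
                new MultiplexingStreamRecordSerializer<>(StringSerializer.INSTANCE);

        StreamRecord<String> original = new StreamRecord<>("event", 42L);

        // serialize the record, including its timestamp
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        serializer.serialize(original, new DataOutputViewStreamWrapper(buffer));

        // deserialize yields a StreamElement; asRecord() returns the typed record,
        // mirroring how the operator refills its priority queue on restore
        StreamElement restored = serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray())));
        StreamRecord<String> restoredRecord = restored.<String>asRecord();

        // with the old StreamRecordSerializer the timestamp would not survive this round trip
        System.out.println(restoredRecord.getTimestamp()); // expected: 42
    }
}
```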
@@ -25,8 +25,9 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import java.io.IOException;
@@ -46,7 +47,7 @@ import java.util.PriorityQueue;
abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
private static final long serialVersionUID = 7487334510746595640L;
private final StreamRecordSerializer<IN> streamRecordSerializer;
private final MultiplexingStreamRecordSerializer<IN> streamRecordSerializer;
// global nfa for all elements
private NFA<IN> nfa;
@@ -60,7 +61,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
NFACompiler.NFAFactory<IN> nfaFactory) {
super(inputSerializer, isProcessingTime);
this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
this.nfa = nfaFactory.createNFA();
}
@@ -134,7 +135,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
for (int i = 0; i < numberPriorityQueueEntries; i++) {
priorityQueue.offer(streamRecordSerializer.deserialize(div));
StreamElement streamElement = streamRecordSerializer.deserialize(div);
priorityQueue.offer(streamElement.<IN>asRecord());
}
div.close();
......
@@ -29,8 +29,8 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import java.io.IOException;
@@ -103,13 +103,17 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
null));
}
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
(TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer());
if (priorityQueueOperatorState == null) {
priorityQueueOperatorState = getPartitionedState(
new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
new ValueStateDescriptor<>(
PRIORIRY_QUEUE_STATE_NAME,
new PriorityQueueSerializer<StreamRecord<IN>>(
new StreamRecordSerializer<IN>(getInputSerializer()),
new PriorityQueueStreamRecordFactory<IN>()),
new PriorityQueueSerializer<>(
streamRecordSerializer,
new PriorityQueueStreamRecordFactory<IN>()),
null));
}
}
......
@@ -28,6 +28,7 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -99,6 +100,94 @@ public class CEPOperatorTest extends TestLogger {
harness.close();
}
@Test
public void testCEPOperatorCheckpointing() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
private static final long serialVersionUID = -4873366487571254798L;
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
};
OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
new CEPPatternOperator<>(
Event.createTypeSerializer(),
false,
new NFAFactory()));
harness.open();
Event startEvent = new Event(42, "start", 1.0);
SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
Event endEvent = new Event(42, "end", 1.0);
harness.processElement(new StreamRecord<Event>(startEvent, 1));
harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
// simulate snapshot/restore with some elements in internal sorting queue
StreamTaskState snapshot = harness.snapshot(0, 0);
harness = new OneInputStreamOperatorTestHarness<>(
new CEPPatternOperator<>(
Event.createTypeSerializer(),
false,
new NFAFactory()));
harness.setup();
harness.restore(snapshot, 1);
harness.open();
harness.processWatermark(new Watermark(Long.MIN_VALUE));
harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
// if element timestamps are not correctly checkpointed/restored this will lead to
// a pruning time underflow exception in NFA
harness.processWatermark(new Watermark(2));
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot2 = harness.snapshot(1, 1);
harness = new OneInputStreamOperatorTestHarness<>(
new CEPPatternOperator<>(
Event.createTypeSerializer(),
false,
new NFAFactory()));
harness.setup();
harness.restore(snapshot2, 2);
harness.open();
harness.processElement(new StreamRecord<Event>(middleEvent, 3));
harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
harness.processElement(new StreamRecord<Event>(endEvent, 5));
harness.processWatermark(new Watermark(Long.MAX_VALUE));
ConcurrentLinkedQueue<Object> result = harness.getOutput();
// watermark and the result
assertEquals(2, result.size());
Object resultObject = result.poll();
assertTrue(resultObject instanceof StreamRecord);
StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
assertTrue(resultRecord.getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
assertEquals(startEvent, patternMap.get("start"));
assertEquals(middleEvent, patternMap.get("middle"));
assertEquals(endEvent, patternMap.get("end"));
harness.close();
}
@Test
public void testKeyedCEPOperatorCheckpointing() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -128,12 +217,33 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<Event>(startEvent, 1));
harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
// simulate snapshot/restore with some elements in internal sorting queue
StreamTaskState snapshot = harness.snapshot(0, 0);
harness = new OneInputStreamOperatorTestHarness<>(
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
keySelector,
IntSerializer.INSTANCE,
new NFAFactory()));
harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
harness.setup();
harness.restore(snapshot, 1);
harness.open();
harness.processWatermark(new Watermark(Long.MIN_VALUE));
harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
// if element timestamps are not correctly checkpointed/restored this will lead to
// a pruning time underflow exception in NFA
harness.processWatermark(new Watermark(2));
// simulate snapshot/restore
StreamTaskState snapshot = harness.snapshot(0, 0);
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot2 = harness.snapshot(1, 1);
harness = new OneInputStreamOperatorTestHarness<>(
new KeyedCEPPatternOperator<>(
@@ -145,7 +255,7 @@ public class CEPOperatorTest extends TestLogger {
harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
harness.setup();
harness.restore(snapshot, 1);
harness.restore(snapshot2, 2);
harness.open();
harness.processElement(new StreamRecord<Event>(middleEvent, 3));
@@ -156,6 +266,7 @@ public class CEPOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> result = harness.getOutput();
// watermark and the result
assertEquals(2, result.size());
Object resultObject = result.poll();
@@ -203,7 +314,10 @@ public class CEPOperatorTest extends TestLogger {
public boolean filter(Event value) throws Exception {
return value.getName().equals("end");
}
});
})
// add a window timeout to test whether timestamps of elements in the
// priority queue in CEP operator are correctly checkpointed/restored
.within(Time.milliseconds(10));
return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
}
......
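
As background for the last hunk: the within() timeout added to the test pattern makes the NFA prune partial matches by element timestamp, which is what turns a lost timestamp into a pruning time underflow after restore. Below is a hedged sketch of a pattern with such a window timeout; the event type (String), the filter conditions, and the class name are placeholders, not the test's actual NFAFactory.

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WithinTimeoutSketch {

    public static void main(String[] args) {
        // pattern with a 10 ms window: partial matches older than the window are
        // pruned based on the timestamps of the elements that started them
        Pattern<String, ?> pattern = Pattern.<String>begin("start")
                .where(new FilterFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("start");
                    }
                })
                .followedBy("end")
                .where(new FilterFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("end");
                    }
                })
                .within(Time.milliseconds(10));

        System.out.println(pattern.getWindowTime()); // prints the configured window time
    }
}
```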