提交 abe7ae23 编写于 作者: D Dawid Wysakowicz 提交者: Dawid Wysakowicz

[hotfix][cep] Introduced nfa test harness

上级 5e464871
......@@ -22,10 +22,10 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.aftermatch.SkipPastLastStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkRuntimeException;
......@@ -40,9 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.junit.Assert.assertThat;
/**
......@@ -77,9 +75,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(3);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3),
......@@ -141,9 +139,11 @@ public class AfterMatchSkipITCase extends TestLogger{
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
.withAfterMatchSkipStrategy(skipStrategy)
.build();
return feedNFA(streamEvents, nfa, skipStrategy);
return nfaTestHarness.feedRecords(streamEvents);
}
}
......@@ -198,9 +198,11 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).oneOrMore();
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
.withAfterMatchSkipStrategy(skipStrategy)
.build();
return feedNFA(streamEvents, nfa, skipStrategy);
return nfaTestHarness.feedRecords(streamEvents);
}
}
......@@ -231,9 +233,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(3);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3),
......@@ -275,9 +277,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(2);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, ab2, ab3, ab4),
......@@ -318,9 +320,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
}).times(2);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, ab2, ab3, ab4),
......@@ -376,9 +378,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("d");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Collections.singletonList(
Lists.newArrayList(a1, b1, c1, d1)
......@@ -415,9 +417,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}
);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a2, b2)
......@@ -459,9 +461,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("c");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, c1),
......@@ -498,9 +500,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("c");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, c1),
......@@ -543,9 +545,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
}).oneOrMore().consecutive();
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, b1),
......@@ -573,9 +575,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}
);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
//skip to first element of a match should throw exception if they are enabled,
//this mode is used in MATCH RECOGNIZE which assumes that skipping to first element
......@@ -645,9 +647,11 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("c");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
.withAfterMatchSkipStrategy(skipStrategy)
.build();
return feedNFA(streamEvents, nfa, skipStrategy);
return nfaTestHarness.feedRecords(streamEvents);
}
}
......@@ -686,9 +690,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
}).oneOrMore().consecutive();
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, b1),
......@@ -728,9 +732,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Collections.singletonList(
Lists.newArrayList(a1, a2, a3, b1)
......@@ -768,9 +772,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
......@@ -809,9 +813,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
......@@ -851,9 +855,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
......@@ -913,9 +917,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("d");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a, b, c1, c2, c3, d),
......@@ -962,9 +966,9 @@ public class AfterMatchSkipITCase extends TestLogger{
ctx.getEventsForPattern("a").iterator().next().getPrice() == value.getPrice();
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, c1, b2),
......@@ -991,22 +995,10 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(2);
NFA<Event> nfa = compile(pattern, false);
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
NFAState nfaState = nfa.createInitialNFAState();
for (StreamRecord<Event> inputEvent : inputEvents) {
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
nfa.process(
sharedBufferAccessor,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
matchSkipStrategy);
}
}
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
nfaTestHarness.feedRecords(inputEvents);
assertThat(sharedBuffer.isEmpty(), Matchers.is(true));
}
......
......@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
......
......@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.GroupPattern;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
......@@ -32,8 +33,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
......@@ -1077,7 +1078,8 @@ public class GroupITCase extends TestLogger {
NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(c, a1, b1, d),
......
......@@ -33,8 +33,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
......
......@@ -21,11 +21,13 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -48,8 +50,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
......@@ -414,7 +416,11 @@ public class NFAITCase extends TestLogger {
Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
Collection<Map<String, List<Event>>> matchedPatterns =
nfa.process(sharedBufferAccessor, nfaState, event.getValue(), event.getTimestamp());
nfa.process(sharedBufferAccessor,
nfaState,
event.getValue(),
event.getTimestamp(),
AfterMatchSkipStrategy.noSkip());
resultingPatterns.addAll(matchedPatterns);
resultingTimeoutPatterns.addAll(timeoutPatterns);
......@@ -2342,11 +2348,13 @@ public class NFAITCase extends TestLogger {
NFAState nfaState = nfa.createInitialNFAState();
nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 2);
nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 3);
nfa.process(sharedBufferAccessor, nfaState, middleEvent3, 4);
nfa.process(sharedBufferAccessor, nfaState, end1, 6);
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent1, 2));
nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent2, 3));
nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent3, 4));
nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
......@@ -2387,10 +2395,11 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
nfa.process(sharedBufferAccessor, nfaState, middleEvent, 5);
nfa.process(sharedBufferAccessor, nfaState, end1, 6);
nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent, 5));
nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
......@@ -2432,11 +2441,12 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
nfa.process(sharedBufferAccessor, nfaState, end1, 6);
nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
......@@ -2478,11 +2488,12 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
nfa.process(sharedBufferAccessor, nfaState, end1, 6);
nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
......@@ -2734,25 +2745,12 @@ public class NFAITCase extends TestLogger {
}
}).times(3).consecutive();
NFA<Event> nfa = compile(pattern, false);
List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
NFAState nfaState = nfa.createInitialNFAState();
for (StreamRecord<Event> inputEvent : inputEvents) {
Collection<Map<String, List<Event>>> patterns = nfa.process(
sharedBufferAccessor,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp());
resultingPatterns.addAll(patterns);
}
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
Assert.assertEquals(1L, resultingPatterns.size());
Map<String, List<Event>> match = resultingPatterns.get(0);
Map<String, List<Event>> match = resultingPatterns.iterator().next();
Assert.assertArrayEquals(
match.get("start").toArray(),
Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
......@@ -2809,25 +2807,12 @@ public class NFAITCase extends TestLogger {
}
});
NFA<Event> nfa = compile(pattern, false);
List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
NFAState nfaState = nfa.createInitialNFAState();
for (StreamRecord<Event> inputEvent : inputEvents) {
Collection<Map<String, List<Event>>> patterns = nfa.process(
sharedBufferAccessor,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp());
resultingPatterns.addAll(patterns);
}
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
Assert.assertEquals(1L, resultingPatterns.size());
Map<String, List<Event>> match = resultingPatterns.get(0);
Map<String, List<Event>> match = resultingPatterns.iterator().next();
List<String> expectedOrder = Lists.newArrayList("a", "b", "aa", "bb", "ab");
List<String> resultOrder = new ArrayList<>();
......@@ -2847,8 +2832,8 @@ public class NFAITCase extends TestLogger {
try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
NFA<Event> nfa = compile(pattern, false);
nfa.process(accessor, nfa.createInitialNFAState(), a, 1);
nfa.process(accessor, nfa.createInitialNFAState(), b, 2);
nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
......
......@@ -20,10 +20,10 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -32,7 +32,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
/**
......@@ -99,22 +98,13 @@ public class NFAStateAccessTest {
}
});
NFA<Event> nfa = compile(pattern, false);
TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
for (StreamRecord<Event> inputEvent : inputEvents) {
try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
nfa.process(
accessor,
nfa.createInitialNFAState(),
inputEvent.getValue(),
inputEvent.getTimestamp());
}
}
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
nfaTestHarness.consumeRecords(inputEvents);
assertEquals(2, sharedBuffer.getStateReads());
assertEquals(3, sharedBuffer.getStateWrites());
assertEquals(5, sharedBuffer.getStateAccesses());
assertEquals(58, sharedBuffer.getStateReads());
assertEquals(33, sharedBuffer.getStateWrites());
assertEquals(91, sharedBuffer.getStateAccesses());
}
@Test
......@@ -182,21 +172,12 @@ public class NFAStateAccessTest {
}
});
NFA<Event> nfa = compile(pattern, false);
TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
for (StreamRecord<Event> inputEvent : inputEvents) {
try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
nfa.process(
accessor,
nfa.createInitialNFAState(),
inputEvent.getValue(),
inputEvent.getTimestamp());
}
}
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
nfaTestHarness.consumeRecords(inputEvents);
assertEquals(8, sharedBuffer.getStateReads());
assertEquals(12, sharedBuffer.getStateWrites());
assertEquals(20, sharedBuffer.getStateAccesses());
assertEquals(90, sharedBuffer.getStateReads());
assertEquals(31, sharedBuffer.getStateWrites());
assertEquals(121, sharedBuffer.getStateAccesses());
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
......@@ -47,6 +48,7 @@ public class NFAStatusChangeITCase {
private SharedBuffer<Event> sharedBuffer;
private SharedBufferAccessor<Event> sharedBufferAccessor;
private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
@Before
public void init() {
......@@ -95,32 +97,32 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L);
nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L);
nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
// the status of the queue of ComputationStatus changed,
// more than one ComputationStatus is generated by the event from some ComputationStatus
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L);
nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L);
nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L);
nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L);
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
......@@ -164,10 +166,10 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
nfaState.resetStateChanged();
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L);
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
assertTrue(nfaState.isStateChanged());
}
......@@ -193,7 +195,7 @@ public class NFAStatusChangeITCase {
nfaState.resetStateChanged();
nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
nfaState.resetStateChanged();
nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
......
......@@ -19,13 +19,11 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -39,10 +37,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
......@@ -91,9 +87,7 @@ public class NFATest extends TestLogger {
states.add(endState);
states.add(endingState);
NFA<Event> nfa = new NFA<>(states, 0, false);
Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
Map<String, List<Event>> firstPattern = new HashMap<>();
firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
......@@ -106,14 +100,16 @@ public class NFATest extends TestLogger {
expectedPatterns.add(firstPattern);
expectedPatterns.add(secondPattern);
Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
NFA<Event> nfa = new NFA<>(states, 0, false);
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@Test
public void testTimeoutWindowPruning() throws Exception {
NFA<Event> nfa = createStartEndNFA();
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
......@@ -121,7 +117,7 @@ public class NFATest extends TestLogger {
streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
Map<String, List<Event>> secondPattern = new HashMap<>();
secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0)));
......@@ -129,7 +125,10 @@ public class NFATest extends TestLogger {
expectedPatterns.add(secondPattern);
Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
NFA<Event> nfa = createStartEndNFA();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
......@@ -140,15 +139,17 @@ public class NFATest extends TestLogger {
*/
@Test
public void testWindowBorders() throws Exception {
NFA<Event> nfa = createStartEndNFA();
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L));
Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
List<Map<String, List<Event>>> expectedPatterns = Collections.emptyList();
NFA<Event> nfa = createStartEndNFA();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
......@@ -159,7 +160,6 @@ public class NFATest extends TestLogger {
*/
@Test
public void testTimeoutWindowPruningWindowBorders() throws Exception {
NFA<Event> nfa = createStartEndNFA();
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
......@@ -167,7 +167,7 @@ public class NFATest extends TestLogger {
streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L));
streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L));
Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
Map<String, List<Event>> secondPattern = new HashMap<>();
secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0)));
......@@ -175,30 +175,12 @@ public class NFATest extends TestLogger {
expectedPatterns.add(secondPattern);
Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
NFA<Event> nfa = createStartEndNFA();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
assertEquals(expectedPatterns, actualPatterns);
}
Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
public Collection<Map<String, List<Event>>> runNFA(
NFA<Event> nfa, NFAState nfaState, List<StreamRecord<Event>> inputs) throws Exception {
Set<Map<String, List<Event>>> actualPatterns = new HashSet<>();
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
for (StreamRecord<Event> streamEvent : inputs) {
nfa.advanceTime(sharedBufferAccessor, nfaState, streamEvent.getTimestamp());
Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
sharedBufferAccessor,
nfaState,
streamEvent.getValue(),
streamEvent.getTimestamp());
actualPatterns.addAll(matchedPatterns);
}
}
return actualPatterns;
assertEquals(expectedPatterns, actualPatterns);
}
@Test
......@@ -289,51 +271,49 @@ public class NFATest extends TestLogger {
patterns.add(pattern2);
patterns.add(pattern3);
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
for (Pattern<Event, ?> p : patterns) {
NFA<Event> nfa = compile(p, false);
Event a = new Event(40, "a", 1.0);
Event b = new Event(41, "b", 2.0);
Event c = new Event(42, "c", 3.0);
Event b1 = new Event(41, "b", 3.0);
Event b2 = new Event(41, "b", 4.0);
Event b3 = new Event(41, "b", 5.0);
Event d = new Event(43, "d", 4.0);
NFAState nfaState = nfa.createInitialNFAState();
nfa.process(sharedBufferAccessor, nfaState, a, 1);
nfa.process(sharedBufferAccessor, nfaState, b, 2);
nfa.process(sharedBufferAccessor, nfaState, c, 3);
nfa.process(sharedBufferAccessor, nfaState, b1, 4);
nfa.process(sharedBufferAccessor, nfaState, b2, 5);
nfa.process(sharedBufferAccessor, nfaState, b3, 6);
nfa.process(sharedBufferAccessor, nfaState, d, 7);
nfa.process(sharedBufferAccessor, nfaState, a, 8);
NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
//serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
baos.close();
// copy
ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
in.close();
out.close();
// deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
bais.close();
assertEquals(nfaState, copy);
}
for (Pattern<Event, ?> p : patterns) {
NFA<Event> nfa = compile(p, false);
Event a = new Event(40, "a", 1.0);
Event b = new Event(41, "b", 2.0);
Event c = new Event(42, "c", 3.0);
Event b1 = new Event(41, "b", 3.0);
Event b2 = new Event(41, "b", 4.0);
Event b3 = new Event(41, "b", 5.0);
Event d = new Event(43, "d", 4.0);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
nfaTestHarness.consumeRecord(new StreamRecord<>(a, 1));
nfaTestHarness.consumeRecord(new StreamRecord<>(b, 2));
nfaTestHarness.consumeRecord(new StreamRecord<>(c, 3));
nfaTestHarness.consumeRecord(new StreamRecord<>(b1, 4));
nfaTestHarness.consumeRecord(new StreamRecord<>(b2, 5));
nfaTestHarness.consumeRecord(new StreamRecord<>(b3, 6));
nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7));
nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8));
NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
//serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
baos.close();
// copy
ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
in.close();
out.close();
// deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
bais.close();
assertEquals(nfaState, copy);
}
}
......
......@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
......
......@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
......@@ -33,8 +34,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
......@@ -141,8 +142,9 @@ public void testClearingBuffer() throws Exception {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, b1, c1, d)
));
......@@ -186,8 +188,9 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, d1, d2, d3),
Lists.newArrayList(a1, d1, d2),
......
......@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
......
......@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
......
......@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
......@@ -31,8 +32,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
......@@ -92,8 +93,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
......@@ -142,8 +144,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
......@@ -193,8 +196,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
......@@ -244,8 +248,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, breaking)
......@@ -292,8 +297,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
......@@ -342,8 +348,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
......@@ -394,8 +401,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
......@@ -525,8 +533,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
......@@ -579,8 +588,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
......@@ -631,10 +641,10 @@ public class UntilConditionITCase {
});
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.utils;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* Test harness for setting up {@link NFA}.
*/
public final class NFATestHarness {
private final SharedBuffer<Event> sharedBuffer;
private final NFA<Event> nfa;
private final NFAState nfaState;
private final AfterMatchSkipStrategy afterMatchSkipStrategy;
private NFATestHarness(
SharedBuffer<Event> sharedBuffer,
NFA<Event> nfa,
NFAState nfaState,
AfterMatchSkipStrategy afterMatchSkipStrategy) {
this.sharedBuffer = sharedBuffer;
this.nfa = nfa;
this.nfaState = nfaState;
this.afterMatchSkipStrategy = afterMatchSkipStrategy;
}
/**
* Constructs a test harness starting from a given {@link Pattern}.
*/
public static NFATestHarnessBuilderPattern forPattern(Pattern<Event, ?> pattern) {
return new NFATestHarnessBuilderPattern(pattern);
}
/**
* Constructs a test harness starting from a given {@link NFA}.
*/
public static NFATestHarnessBuilderNFA forNFA(NFA<Event> nfa) {
return new NFATestHarnessBuilderNFA(nfa);
}
public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
List<List<Event>> resultingPatterns = new ArrayList<>();
for (StreamRecord<Event> inputEvent : inputEvents) {
resultingPatterns.addAll(feedRecord(inputEvent));
}
return resultingPatterns;
}
public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
List<List<Event>> resultingPatterns = new ArrayList<>();
Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
for (Map<String, List<Event>> p : matches) {
List<Event> res = new ArrayList<>();
for (List<Event> le : p.values()) {
res.addAll(le);
}
resultingPatterns.add(res);
}
return resultingPatterns;
}
public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
for (StreamRecord<Event> inputEvent : inputEvents) {
resultingPatterns.addAll(consumeRecord(inputEvent));
}
return resultingPatterns;
}
public Collection<Map<String, List<Event>>> consumeRecord(StreamRecord<Event> inputEvent) throws Exception {
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
return nfa.process(
sharedBufferAccessor,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
afterMatchSkipStrategy);
}
}
/**
* Builder for {@link NFATestHarness} that encapsulates {@link Pattern}.
*/
public static class NFATestHarnessBuilderPattern extends NFATestHarnessBuilderBase {
private final Pattern<Event, ?> pattern;
private boolean timeoutHandling = false;
NFATestHarnessBuilderPattern(Pattern<Event, ?> pattern) {
super(pattern.getAfterMatchSkipStrategy());
this.pattern = pattern;
}
public NFATestHarnessBuilderBase withTimeoutHandling() {
this.timeoutHandling = true;
return this;
}
@Override
public NFATestHarness build() {
NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
return new NFATestHarness(
sharedBuffer,
nfa,
nfa.createInitialNFAState(),
afterMatchSkipStrategy);
}
}
/**
* Builder for {@link NFATestHarness} that encapsulates {@link NFA}.
*/
public static class NFATestHarnessBuilderNFA extends NFATestHarnessBuilderBase {
private final NFA<Event> nfa;
private NFAState nfaState;
NFATestHarnessBuilderNFA(NFA<Event> nfa) {
super(AfterMatchSkipStrategy.noSkip());
this.nfa = nfa;
this.nfaState = nfa.createInitialNFAState();
}
public NFATestHarnessBuilderBase withNFAState(NFAState nfaState) {
this.nfaState = nfaState;
return this;
}
@Override
public NFATestHarness build() {
return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
}
}
/**
* Common builder, which can be used independent if we start with {@link Pattern} or {@link NFA}.
* Enables to provide custom services like {@link SharedBuffer} etc.
*/
public abstract static class NFATestHarnessBuilderBase {
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
AfterMatchSkipStrategy afterMatchSkipStrategy;
NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
this.afterMatchSkipStrategy = skipStrategy;
}
public NFATestHarnessBuilderBase withSharedBuffer(SharedBuffer<Event> sharedBuffer) {
this.sharedBuffer = sharedBuffer;
return this;
}
public NFATestHarnessBuilderBase withAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
this.afterMatchSkipStrategy = afterMatchSkipStrategy;
return this;
}
public abstract NFATestHarness build();
}
}
......@@ -16,77 +16,29 @@
* limitations under the License.
*/
package org.apache.flink.cep.nfa;
package org.apache.flink.cep.utils;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* Base method for IT tests of {@link NFA}. It provides utility methods.
*/
public class NFATestUtilities {
public static List<List<Event>> feedNFA(
List<StreamRecord<Event>> inputEvents,
NFA<Event> nfa) throws Exception {
return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), AfterMatchSkipStrategy.noSkip());
}
public static List<List<Event>> feedNFA(
List<StreamRecord<Event>> inputEvents,
NFA<Event> nfa,
NFAState nfaState) throws Exception {
return feedNFA(inputEvents, nfa, nfaState, AfterMatchSkipStrategy.noSkip());
}
public static List<List<Event>> feedNFA(
List<StreamRecord<Event>> inputEvents,
NFA<Event> nfa,
AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), afterMatchSkipStrategy);
}
@Deprecated
public static List<List<Event>> feedNFA(
List<StreamRecord<Event>> inputEvents,
NFA<Event> nfa,
NFAState nfaState,
AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
List<List<Event>> resultingPatterns = new ArrayList<>();
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
for (StreamRecord<Event> inputEvent : inputEvents) {
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
Collection<Map<String, List<Event>>> patterns = nfa.process(
sharedBufferAccessor,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
afterMatchSkipStrategy);
for (Map<String, List<Event>> p: patterns) {
List<Event> res = new ArrayList<>();
for (List<Event> le: p.values()) {
res.addAll(le);
}
resultingPatterns.add(res);
}
}
}
return resultingPatterns;
NFA<Event> nfa) throws Exception {
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
return nfaTestHarness.feedRecords(inputEvents);
}
public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册