From 3fc96cd1f9564a60ba5ec7f06a1fec4ab173b200 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Sun, 2 Jul 2017 13:11:05 +0800 Subject: [PATCH] [FLINK-7061] [cep] Fix quantifier range starting from 0 This closes #4242 --- .../org/apache/flink/cep/pattern/Pattern.java | 1 + .../apache/flink/cep/pattern/Quantifier.java | 3 +- .../flink/cep/nfa/TimesRangeITCase.java | 51 +++++++++++++++++++ .../apache/flink/cep/pattern/PatternTest.java | 10 ++++ 4 files changed, 63 insertions(+), 2 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index f4d34047cef..2ffbc41a600 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -368,6 +368,7 @@ public class Pattern { this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); if (from == 0) { this.quantifier.optional(); + from = 1; } this.times = Times.of(from, to); return this; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index c1893b4a110..9192a133dfb 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -153,9 +153,8 @@ public class Quantifier { private final int to; private Times(int from, int to) { - Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); + Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0."); Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "."); - Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0."); this.from = from; this.to = to; } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java index 4305fa2f4c5..37a953407cf 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java @@ -91,6 +91,57 @@ public class TimesRangeITCase extends TestLogger { )); } + @Test + public void testTimesRangeFromZero() { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(0, 2).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, end1) + )); + } + @Test public void testTimesRangeNonStrict() { List> inputEvents = new ArrayList<>(); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 999e5f3c44a..6d93ff3a3b4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -195,6 +195,16 @@ public class PatternTest extends TestLogger { assertEquals(previous2.getName(), "start"); } + @Test(expected = IllegalArgumentException.class) + public void testPatternTimesNegativeTimes() throws Exception { + Pattern.begin("start").where(dummyCondition()).times(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testPatternTimesNegativeFrom() throws Exception { + Pattern.begin("start").where(dummyCondition()).times(-1, 2); + } + @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception { -- GitLab