未验证 提交 3fc96cd1 编写于 作者: D Dian Fu 提交者: Dawid Wysakowicz

[FLINK-7061] [cep] Fix quantifier range starting from 0

This closes #4242
上级 3096bd03
......@@ -368,6 +368,7 @@ public class Pattern<T, F extends T> {
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
if (from == 0) {
this.quantifier.optional();
from = 1;
}
this.times = Times.of(from, to);
return this;
......
......@@ -153,9 +153,8 @@ public class Quantifier {
private final int to;
private Times(int from, int to) {
Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0.");
this.from = from;
this.to = to;
}
......
......@@ -91,6 +91,57 @@ public class TimesRangeITCase extends TestLogger {
));
}
@Test
public void testTimesRangeFromZero() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent2 = new Event(42, "a", 3.0);
Event middleEvent3 = new Event(43, "a", 4.0);
Event end1 = new Event(44, "b", 5.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 2));
inputEvents.add(new StreamRecord<>(middleEvent2, 3));
inputEvents.add(new StreamRecord<>(middleEvent3, 4));
inputEvents.add(new StreamRecord<>(end1, 6));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).next("middle").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).times(0, 2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
Lists.newArrayList(startEvent, middleEvent1, end1),
Lists.newArrayList(startEvent, end1)
));
}
@Test
public void testTimesRangeNonStrict() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
......
......@@ -195,6 +195,16 @@ public class PatternTest extends TestLogger {
assertEquals(previous2.getName(), "start");
}
@Test(expected = IllegalArgumentException.class)
public void testPatternTimesNegativeTimes() throws Exception {
Pattern.begin("start").where(dummyCondition()).times(-1);
}
@Test(expected = IllegalArgumentException.class)
public void testPatternTimesNegativeFrom() throws Exception {
Pattern.begin("start").where(dummyCondition()).times(-1, 2);
}
@Test(expected = MalformedPatternException.class)
public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册