提交 5e464871 编写于 作者: D Dawid Wysakowicz 提交者: Dawid Wysakowicz

[hotfix][cep] Change contract of cep TimerContext#timestamp to never

return null
上级 9742ef7a
...@@ -32,10 +32,10 @@ public interface TimerContext { ...@@ -32,10 +32,10 @@ public interface TimerContext {
/** /**
* Timestamp of the element currently being processed. * Timestamp of the element currently being processed.
* *
* <p>This might be {@code null}, for example if the time characteristic of your program * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. * time when event entered the cep operator.
*/ */
Long timestamp(); long timestamp();
/** Returns the current processing time. */ /** Returns the current processing time. */
long currentProcessingTime(); long currentProcessingTime();
......
...@@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter<IN, OUT, T> ...@@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter<IN, OUT, T>
Map<String, List<IN>> match, Map<String, List<IN>> match,
Context ctx) throws Exception { Context ctx) throws Exception {
sideCollector.setCtx(ctx); sideCollector.setCtx(ctx);
long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime(); flatTimeoutFunction.timeout(match, ctx.timestamp(), sideCollector);
flatTimeoutFunction.timeout(match, timestamp, sideCollector);
} }
/** /**
......
...@@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter<IN, OUT, T> ...@@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter<IN, OUT, T>
final Map<String, List<IN>> match, final Map<String, List<IN>> match,
final Context ctx) throws Exception { final Context ctx) throws Exception {
final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime(); final T timedOutPatternResult = timeoutFunction.timeout(match, ctx.timestamp());
final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
ctx.output(timedOutPartialMatchesTag, timedOutPatternResult); ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
} }
......
...@@ -459,8 +459,9 @@ public class CepOperator<IN, KEY, OUT> ...@@ -459,8 +459,9 @@ public class CepOperator<IN, KEY, OUT>
private void setTimestamp(long timestamp) { private void setTimestamp(long timestamp) {
if (!isProcessingTime) { if (!isProcessingTime) {
collector.setAbsoluteTimestamp(timestamp); collector.setAbsoluteTimestamp(timestamp);
context.setTimestamp(timestamp);
} }
context.setTimestamp(timestamp);
} }
/** /**
...@@ -493,12 +494,8 @@ public class CepOperator<IN, KEY, OUT> ...@@ -493,12 +494,8 @@ public class CepOperator<IN, KEY, OUT>
} }
@Override @Override
public Long timestamp() { public long timestamp() {
if (isProcessingTime) { return timestamp;
return null;
} else {
return timestamp;
}
} }
@Override @Override
......
...@@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput; ...@@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
*/ */
public class CepProcessFunctionContextTest extends TestLogger { public class CepProcessFunctionContextTest extends TestLogger {
private static final boolean PROCESSING_TIME = false; private static final boolean PROCESSING_TIME = true;
private static final boolean EVENT_TIME = true; private static final boolean EVENT_TIME = false;
@Test @Test
public void testTimestampPassingInEventTime() throws Exception { public void testTimestampPassingInEventTime() throws Exception {
...@@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractTimestampAndNames(1), extractTimestampAndNames(1),
new NFAForwardingFactory(), new NFAForwardingFactory(),
PROCESSING_TIME))) { EVENT_TIME))) {
harness.open(); harness.open();
// events out of order to test if internal sorting does not mess up the timestamps // events out of order to test if internal sorting does not mess up the timestamps
...@@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractTimestampAndNames(1), extractTimestampAndNames(1),
new NFAForwardingFactory(), new NFAForwardingFactory(),
EVENT_TIME))) { PROCESSING_TIME))) {
harness.open(); harness.open();
harness.setProcessingTime(1);
harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord()); harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
harness.setProcessingTime(2);
harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord()); harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
harness.setProcessingTime(3);
assertOutput(harness.getOutput()) assertOutput(harness.getOutput())
.nextElementEquals("(NO_TIMESTAMP):A") .nextElementEquals("1:A")
.nextElementEquals("(NO_TIMESTAMP):B") .nextElementEquals("2:B")
.hasNoMoreElements(); .hasNoMoreElements();
} }
} }
...@@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractCurrentProcessingTimeAndNames(1), extractCurrentProcessingTimeAndNames(1),
new NFAForwardingFactory(), new NFAForwardingFactory(),
EVENT_TIME))) { PROCESSING_TIME))) {
harness.open(); harness.open();
harness.setProcessingTime(15); harness.setProcessingTime(15);
...@@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractCurrentProcessingTimeAndNames(1), extractCurrentProcessingTimeAndNames(1),
new NFAForwardingFactory(), new NFAForwardingFactory(),
PROCESSING_TIME))) { EVENT_TIME))) {
harness.open(); harness.open();
harness.setProcessingTime(10); harness.setProcessingTime(10);
...@@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractTimestampAndNames(2, timedOut), extractTimestampAndNames(2, timedOut),
new NFATimingOutFactory(), new NFATimingOutFactory(),
PROCESSING_TIME))) { EVENT_TIME))) {
harness.open(); harness.open();
// events out of order to test if internal sorting does not mess up the timestamps // events out of order to test if internal sorting does not mess up the timestamps
...@@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractTimestampAndNames(2, timedOut), extractTimestampAndNames(2, timedOut),
new NFATimingOutFactory(), new NFATimingOutFactory(),
EVENT_TIME))) { PROCESSING_TIME))) {
harness.open(); harness.open();
harness.setProcessingTime(3); harness.setProcessingTime(3);
...@@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger {
harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord()); harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
assertOutput(harness.getOutput()) assertOutput(harness.getOutput())
.nextElementEquals("(NO_TIMESTAMP):A:C") .nextElementEquals("5:A:C")
.hasNoMoreElements(); .hasNoMoreElements();
assertOutput(harness.getSideOutput(timedOut)) assertOutput(harness.getSideOutput(timedOut))
.nextElementEquals("(NO_TIMESTAMP):C") .nextElementEquals("15:C")
.hasNoMoreElements(); .hasNoMoreElements();
} }
} }
...@@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractCurrentProcessingTimeAndNames(2, sideOutputTag), extractCurrentProcessingTimeAndNames(2, sideOutputTag),
new NFATimingOutFactory(), new NFATimingOutFactory(),
PROCESSING_TIME))) { EVENT_TIME))) {
harness.open(); harness.open();
// events out of order to test if internal sorting does not mess up the timestamps // events out of order to test if internal sorting does not mess up the timestamps
...@@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator( createCepOperator(
extractCurrentProcessingTimeAndNames(2, sideOutputTag), extractCurrentProcessingTimeAndNames(2, sideOutputTag),
new NFATimingOutFactory(), new NFATimingOutFactory(),
EVENT_TIME))) { PROCESSING_TIME))) {
harness.open(); harness.open();
harness.setProcessingTime(3); harness.setProcessingTime(3);
...@@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
*/ */
private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) { private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
return new AccessContextWithNames(stateNumber, return new AccessContextWithNames(stateNumber,
context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP); context -> String.valueOf(context.timestamp()));
} }
/** /**
...@@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger { ...@@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
OutputTag<String> timedOutTag) { OutputTag<String> timedOutTag) {
return new AccessContextWithNamesWithTimedOut(stateNumber, return new AccessContextWithNamesWithTimedOut(stateNumber,
timedOutTag, timedOutTag,
context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP); context -> String.valueOf(context.timestamp()));
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册