From 5e4648715f97338bb8ee0c8cc790bdbecc5b4422 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Mon, 7 Jan 2019 15:23:40 +0100 Subject: [PATCH] [hotfix][cep] Change contract of cep TimerContext#timestamp to never return null --- .../flink/cep/context/TimerContext.java | 6 ++-- .../PatternTimeoutFlatSelectAdapter.java | 3 +- .../adaptors/PatternTimeoutSelectAdapter.java | 3 +- .../flink/cep/operator/CepOperator.java | 11 +++--- .../CepProcessFunctionContextTest.java | 35 ++++++++++--------- 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java index 23367cdfc6f..4f0eab2a348 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java @@ -32,10 +32,10 @@ public interface TimerContext { /** * Timestamp of the element currently being processed. * - *

This might be {@code null}, for example if the time characteristic of your program - * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + *

In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the + * time when event entered the cep operator. */ - Long timestamp(); + long timestamp(); /** Returns the current processing time. */ long currentProcessingTime(); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java index ab9c97dde66..13d04bdfbff 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java @@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter Map> match, Context ctx) throws Exception { sideCollector.setCtx(ctx); - long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime(); - flatTimeoutFunction.timeout(match, timestamp, sideCollector); + flatTimeoutFunction.timeout(match, ctx.timestamp(), sideCollector); } /** diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java index 29b0cf768fa..81999a8dd77 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java @@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter final Map> match, final Context ctx) throws Exception { - final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime(); - final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp); + final T timedOutPatternResult = timeoutFunction.timeout(match, ctx.timestamp()); ctx.output(timedOutPartialMatchesTag, timedOutPatternResult); } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java index 6df1fa020ee..262d89dd733 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java @@ -459,8 +459,9 @@ public class CepOperator private void setTimestamp(long timestamp) { if (!isProcessingTime) { collector.setAbsoluteTimestamp(timestamp); - context.setTimestamp(timestamp); } + + context.setTimestamp(timestamp); } /** @@ -493,12 +494,8 @@ public class CepOperator } @Override - public Long timestamp() { - if (isProcessingTime) { - return null; - } else { - return timestamp; - } + public long timestamp() { + return timestamp; } @Override diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java index 09477487880..baefe844c45 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java @@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput; */ public class CepProcessFunctionContextTest extends TestLogger { - private static final boolean PROCESSING_TIME = false; - private static final boolean EVENT_TIME = true; + private static final boolean PROCESSING_TIME = true; + private static final boolean EVENT_TIME = false; @Test public void testTimestampPassingInEventTime() throws Exception { @@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractTimestampAndNames(1), new NFAForwardingFactory(), - PROCESSING_TIME))) { + EVENT_TIME))) { harness.open(); // events out of order to test if internal sorting does not mess up the timestamps @@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractTimestampAndNames(1), new NFAForwardingFactory(), - EVENT_TIME))) { + PROCESSING_TIME))) { harness.open(); + harness.setProcessingTime(1); harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord()); + harness.setProcessingTime(2); harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord()); + harness.setProcessingTime(3); assertOutput(harness.getOutput()) - .nextElementEquals("(NO_TIMESTAMP):A") - .nextElementEquals("(NO_TIMESTAMP):B") + .nextElementEquals("1:A") + .nextElementEquals("2:B") .hasNoMoreElements(); } } @@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractCurrentProcessingTimeAndNames(1), new NFAForwardingFactory(), - EVENT_TIME))) { + PROCESSING_TIME))) { harness.open(); harness.setProcessingTime(15); @@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractCurrentProcessingTimeAndNames(1), new NFAForwardingFactory(), - PROCESSING_TIME))) { + EVENT_TIME))) { harness.open(); harness.setProcessingTime(10); @@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractTimestampAndNames(2, timedOut), new NFATimingOutFactory(), - PROCESSING_TIME))) { + EVENT_TIME))) { harness.open(); // events out of order to test if internal sorting does not mess up the timestamps @@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractTimestampAndNames(2, timedOut), new NFATimingOutFactory(), - EVENT_TIME))) { + PROCESSING_TIME))) { harness.open(); harness.setProcessingTime(3); @@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger { harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord()); assertOutput(harness.getOutput()) - .nextElementEquals("(NO_TIMESTAMP):A:C") + .nextElementEquals("5:A:C") .hasNoMoreElements(); assertOutput(harness.getSideOutput(timedOut)) - .nextElementEquals("(NO_TIMESTAMP):C") + .nextElementEquals("15:C") .hasNoMoreElements(); } } @@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractCurrentProcessingTimeAndNames(2, sideOutputTag), new NFATimingOutFactory(), - PROCESSING_TIME))) { + EVENT_TIME))) { harness.open(); // events out of order to test if internal sorting does not mess up the timestamps @@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger { createCepOperator( extractCurrentProcessingTimeAndNames(2, sideOutputTag), new NFATimingOutFactory(), - EVENT_TIME))) { + PROCESSING_TIME))) { harness.open(); harness.setProcessingTime(3); @@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger { */ private static PatternProcessFunction extractTimestampAndNames(int stateNumber) { return new AccessContextWithNames(stateNumber, - context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP); + context -> String.valueOf(context.timestamp())); } /** @@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger { OutputTag timedOutTag) { return new AccessContextWithNamesWithTimedOut(stateNumber, timedOutTag, - context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP); + context -> String.valueOf(context.timestamp())); } /** -- GitLab