diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
index 23367cdfc6f0f33d92f16b79198d9f9cea40718f..4f0eab2a348392885336b4fb55bed899e8b902aa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
@@ -32,10 +32,10 @@ public interface TimerContext {
/**
* Timestamp of the element currently being processed.
*
- *
This might be {@code null}, for example if the time characteristic of your program
- * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ *
In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
+ * time when event entered the cep operator.
*/
- Long timestamp();
+ long timestamp();
/** Returns the current processing time. */
long currentProcessingTime();
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
index ab9c97dde6665ecf86e0d65fa5f2189c27b74fa4..13d04bdfbff2d4c1f4006a9b8979480e6c235ecd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
@@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter
Map> match,
Context ctx) throws Exception {
sideCollector.setCtx(ctx);
- long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
- flatTimeoutFunction.timeout(match, timestamp, sideCollector);
+ flatTimeoutFunction.timeout(match, ctx.timestamp(), sideCollector);
}
/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
index 29b0cf768fa30c84001219ae462eb0e56d405419..81999a8dd77cd41a6cb98c43d677fb1ce252a369 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
@@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter
final Map> match,
final Context ctx) throws Exception {
- final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
- final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
+ final T timedOutPatternResult = timeoutFunction.timeout(match, ctx.timestamp());
ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 6df1fa020eed0a95ffe7a429458d3abbeff4dc43..262d89dd7338a7eb8e653512f6ffa3fb1c2d6727 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -459,8 +459,9 @@ public class CepOperator
private void setTimestamp(long timestamp) {
if (!isProcessingTime) {
collector.setAbsoluteTimestamp(timestamp);
- context.setTimestamp(timestamp);
}
+
+ context.setTimestamp(timestamp);
}
/**
@@ -493,12 +494,8 @@ public class CepOperator
}
@Override
- public Long timestamp() {
- if (isProcessingTime) {
- return null;
- } else {
- return timestamp;
- }
+ public long timestamp() {
+ return timestamp;
}
@Override
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
index 09477487880da327f5171bec3b40f14964fa8532..baefe844c45f409dc710ace2045bf59d349450e5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
*/
public class CepProcessFunctionContextTest extends TestLogger {
- private static final boolean PROCESSING_TIME = false;
- private static final boolean EVENT_TIME = true;
+ private static final boolean PROCESSING_TIME = true;
+ private static final boolean EVENT_TIME = false;
@Test
public void testTimestampPassingInEventTime() throws Exception {
@@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(1),
new NFAForwardingFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
// events out of order to test if internal sorting does not mess up the timestamps
@@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(1),
new NFAForwardingFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
+ harness.setProcessingTime(1);
harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+ harness.setProcessingTime(2);
harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+ harness.setProcessingTime(3);
assertOutput(harness.getOutput())
- .nextElementEquals("(NO_TIMESTAMP):A")
- .nextElementEquals("(NO_TIMESTAMP):B")
+ .nextElementEquals("1:A")
+ .nextElementEquals("2:B")
.hasNoMoreElements();
}
}
@@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(1),
new NFAForwardingFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
harness.setProcessingTime(15);
@@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(1),
new NFAForwardingFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
harness.setProcessingTime(10);
@@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(2, timedOut),
new NFATimingOutFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
// events out of order to test if internal sorting does not mess up the timestamps
@@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(2, timedOut),
new NFATimingOutFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
harness.setProcessingTime(3);
@@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger {
harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
assertOutput(harness.getOutput())
- .nextElementEquals("(NO_TIMESTAMP):A:C")
+ .nextElementEquals("5:A:C")
.hasNoMoreElements();
assertOutput(harness.getSideOutput(timedOut))
- .nextElementEquals("(NO_TIMESTAMP):C")
+ .nextElementEquals("15:C")
.hasNoMoreElements();
}
}
@@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(2, sideOutputTag),
new NFATimingOutFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
// events out of order to test if internal sorting does not mess up the timestamps
@@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(2, sideOutputTag),
new NFATimingOutFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
harness.setProcessingTime(3);
@@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
*/
private static PatternProcessFunction extractTimestampAndNames(int stateNumber) {
return new AccessContextWithNames(stateNumber,
- context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+ context -> String.valueOf(context.timestamp()));
}
/**
@@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
OutputTag timedOutTag) {
return new AccessContextWithNamesWithTimedOut(stateNumber,
timedOutTag,
- context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+ context -> String.valueOf(context.timestamp()));
}
/**