[FLINK-17669] Use new WatermarkGenerator for per-partition watermarking in KafkaConsumer

上级 12197b3b
......@@ -18,11 +18,10 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
......@@ -245,8 +244,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
......@@ -264,8 +262,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
return new Kafka010Fetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
......
......@@ -19,10 +19,9 @@
package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
......@@ -81,8 +80,7 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
public Kafka010Fetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
......@@ -97,8 +95,7 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
super(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
......@@ -141,7 +138,7 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
......@@ -204,12 +201,11 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
@SuppressWarnings("unchecked")
List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());
for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
if (lastProcessedOffset != null) {
checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
......
......@@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* to the KafkaConsumer calls that change signature.
*/
@Internal
public class KafkaConsumerThread extends Thread {
public class KafkaConsumerThread<T> extends Thread {
/** Logger for this consumer. */
private final Logger log;
......@@ -82,7 +82,7 @@ public class KafkaConsumerThread extends Thread {
private final Properties kafkaProperties;
/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
private final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue;
/** The maximum number of milliseconds to wait for a fetch batch. */
private final long pollTimeout;
......@@ -128,7 +128,7 @@ public class KafkaConsumerThread extends Thread {
Logger log,
Handover handover,
Properties kafkaProperties,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue,
String threadName,
long pollTimeout,
boolean useMetrics,
......@@ -214,7 +214,7 @@ public class KafkaConsumerThread extends Thread {
// reused variable to hold found unassigned new partitions.
// found partitions are not carried across loops using this variable;
// they are carried across via re-adding them to the unassigned partitions queue
List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions;
// main fetch loop
while (running) {
......@@ -391,7 +391,7 @@ public class KafkaConsumerThread extends Thread {
* <p>This method is exposed for testing purposes.
*/
@VisibleForTesting
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions) throws Exception {
if (newPartitions.size() == 0) {
return;
}
......@@ -433,7 +433,7 @@ public class KafkaConsumerThread extends Thread {
// been replaced with actual offset values yet, or
// (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
for (KafkaTopicPartitionState<T, TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
......@@ -472,7 +472,7 @@ public class KafkaConsumerThread extends Thread {
hasBufferedWakeup = false;
// re-add all new partitions back to the unassigned partitions queue to be picked up again
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<T, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -545,9 +545,9 @@ public class KafkaConsumerThread extends Thread {
// Utilities
// ------------------------------------------------------------------------
private static List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<TopicPartition>> partitions) {
private static <T> List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> partitions) {
ArrayList<TopicPartition> result = new ArrayList<>(partitions.size());
for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
for (KafkaTopicPartitionState<T, TopicPartition> p : partitions) {
result.add(p.getKafkaPartitionHandle());
}
return result;
......
......@@ -120,8 +120,7 @@ public class Kafka010FetcherTest {
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
null, /* watermark strategy */
new TestProcessingTimeService(),
10,
getClass().getClassLoader(),
......@@ -257,8 +256,7 @@ public class Kafka010FetcherTest {
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
null, /* watermark strategy */
new TestProcessingTimeService(),
10,
getClass().getClassLoader(),
......@@ -372,8 +370,7 @@ public class Kafka010FetcherTest {
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
partitionsWithInitialOffsets,
null, /* periodic watermark extractor */
null, /* punctuated watermark extractor */
null, /* watermark strategy */
new TestProcessingTimeService(),
10, /* watermark interval */
this.getClass().getClassLoader(),
......
......@@ -96,10 +96,10 @@ public class KafkaConsumerThreadTest {
// setup latch so the test waits until testThread is blocked on getBatchBlocking method
final MultiShotLatch getBatchBlockingInvoked = new MultiShotLatch();
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>() {
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>>() {
@Override
public List<KafkaTopicPartitionState<TopicPartition>> getBatchBlocking() throws InterruptedException {
public List<KafkaTopicPartitionState<Object, TopicPartition>> getBatchBlocking() throws InterruptedException {
getBatchBlockingInvoked.trigger();
return super.getBatchBlocking();
}
......@@ -129,15 +129,15 @@ public class KafkaConsumerThreadTest {
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(31L);
final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
final List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
......@@ -155,10 +155,10 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -175,7 +175,7 @@ public class KafkaConsumerThreadTest {
assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
......@@ -202,15 +202,15 @@ public class KafkaConsumerThreadTest {
// -------- new partitions with undefined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
final List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
......@@ -233,10 +233,10 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -257,7 +257,7 @@ public class KafkaConsumerThreadTest {
assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
......@@ -284,25 +284,25 @@ public class KafkaConsumerThreadTest {
// -------- old partitions --------
KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
oldPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
oldPartition2.setOffset(32L);
List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
List<KafkaTopicPartitionState<Object, TopicPartition>> oldPartitions = new ArrayList<>(2);
oldPartitions.add(oldPartition1);
oldPartitions.add(oldPartition2);
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
newPartition.setOffset(29L);
List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
List<KafkaTopicPartitionState<Object, TopicPartition>> totalPartitions = new ArrayList<>(3);
totalPartitions.add(oldPartition1);
totalPartitions.add(oldPartition2);
totalPartitions.add(newPartition);
......@@ -311,7 +311,7 @@ public class KafkaConsumerThreadTest {
// has initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
}
......@@ -324,7 +324,7 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
unassignedPartitionsQueue.add(newPartition);
......@@ -343,7 +343,7 @@ public class KafkaConsumerThreadTest {
assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
// old partitions should be re-seeked to their previous positions
for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> partition : totalPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
......@@ -370,25 +370,25 @@ public class KafkaConsumerThreadTest {
// -------- old partitions --------
KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
oldPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
oldPartition2.setOffset(32L);
List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
List<KafkaTopicPartitionState<Object, TopicPartition>> oldPartitions = new ArrayList<>(2);
oldPartitions.add(oldPartition1);
oldPartitions.add(oldPartition2);
// -------- new partitions with undefined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
List<KafkaTopicPartitionState<Object, TopicPartition>> totalPartitions = new ArrayList<>(3);
totalPartitions.add(oldPartition1);
totalPartitions.add(oldPartition2);
totalPartitions.add(newPartition);
......@@ -397,7 +397,7 @@ public class KafkaConsumerThreadTest {
// has initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
}
......@@ -414,7 +414,7 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
unassignedPartitionsQueue.add(newPartition);
......@@ -436,7 +436,7 @@ public class KafkaConsumerThreadTest {
assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
// old partitions should be re-seeked to their previous positions
for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> partition : totalPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
......@@ -466,21 +466,21 @@ public class KafkaConsumerThreadTest {
// -------- old partitions --------
KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
oldPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
oldPartition2.setOffset(32L);
List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
List<KafkaTopicPartitionState<Object, TopicPartition>> oldPartitions = new ArrayList<>(2);
oldPartitions.add(oldPartition1);
oldPartitions.add(oldPartition2);
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
newPartition.setOffset(29L);
......@@ -488,7 +488,7 @@ public class KafkaConsumerThreadTest {
// initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
}
......@@ -501,7 +501,7 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
unassignedPartitionsQueue.add(newPartition);
......@@ -525,7 +525,7 @@ public class KafkaConsumerThreadTest {
assertEquals(oldPartitions.size(), mockConsumerAssignmentsToPositions.size());
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
assertEquals(
oldPartition.getOffset() + 1,
......@@ -554,15 +554,15 @@ public class KafkaConsumerThreadTest {
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
......@@ -585,10 +585,10 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -634,15 +634,15 @@ public class KafkaConsumerThreadTest {
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
......@@ -669,10 +669,10 @@ public class KafkaConsumerThreadTest {
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -700,7 +700,7 @@ public class KafkaConsumerThreadTest {
assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
......@@ -742,17 +742,17 @@ public class KafkaConsumerThreadTest {
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(1);
List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(1);
newPartitions.add(newPartition1);
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -808,7 +808,7 @@ public class KafkaConsumerThreadTest {
* partition reassignment, so that tests are eligible to setup various conditions before the reassignment happens
* and inspect reassignment results after it is completed.
*/
private static class TestKafkaConsumerThread extends KafkaConsumerThread {
private static class TestKafkaConsumerThread extends KafkaConsumerThread<Object> {
private final Consumer<byte[], byte[]> mockConsumer;
private final MultiShotLatch preReassignmentLatch = new MultiShotLatch();
......@@ -818,7 +818,7 @@ public class KafkaConsumerThreadTest {
public TestKafkaConsumerThread(
Consumer<byte[], byte[]> mockConsumer,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue,
Handover handover) {
super(
......@@ -858,7 +858,7 @@ public class KafkaConsumerThreadTest {
}
@Override
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
void reassignPartitions(List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions) throws Exception {
// triggers blocking calls on waitPartitionReassignmentInvoked()
preReassignmentLatch.trigger();
......@@ -1103,7 +1103,7 @@ public class KafkaConsumerThreadTest {
public TestKafkaConsumerThreadRateLimit(Logger log,
Handover handover, Properties kafkaProperties,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue,
String threadName, long pollTimeout,
boolean useMetrics, MetricGroup consumerMetricGroup,
MetricGroup subtaskMetricGroup,
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
......@@ -53,6 +54,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
......@@ -127,15 +130,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
* The assigner is kept in serialized form, to deserialize it into multiple copies. */
private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
* The assigner is kept in serialized form, to deserialize it into multiple copies. */
private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
/**
* Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
* timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
* it into multiple copies.
*/
private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
/**
* User-set flag determining whether or not to commit on checkpoints.
......@@ -273,6 +273,39 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// Configuration
// ------------------------------------------------------------------------
/**
* Sets the given {@link WatermarkStrategy} on this consumer. These will be used to assign
* timestamps to records and generates watermarks to signal event time progress.
*
* <p>Running timestamp extractors / watermark generators directly inside the Kafka source
* (which you can do by using this method), per Kafka partition, allows users to let them
* exploit the per-partition characteristics.
*
* <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
* the streams from the partitions are unioned in a "first come first serve" fashion.
* Per-partition characteristics are usually lost that way. For example, if the timestamps are
* strictly ascending per Kafka partition, they will not be strictly ascending in the resulting
* Flink DataStream, if the parallel source subtask reads more than one partition.
*
* <p>Common watermark generation patterns can be found in the
* {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} class.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy) {
checkNotNull(watermarkStrategy);
try {
ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
} catch (Exception e) {
throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
}
return this;
}
/**
* Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
* The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
......@@ -290,19 +323,29 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
*
* <p>This method uses the deprecated watermark generator interfaces. Please switch to
* {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the
* new interfaces instead. The new interfaces support watermark idleness and no longer need
* to differentiate between "periodic" and "punctuated" watermarks.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
*
* @param assigner The timestamp assigner / watermark generator to use.
* @return The consumer object, to allow function chaining.
*/
@Deprecated
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.periodicWatermarkAssigner != null) {
throw new IllegalStateException("A periodic watermark emitter has already been set.");
if (this.watermarkStrategy != null) {
throw new IllegalStateException("Some watermark strategy has already been set.");
}
try {
ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
return this;
final WatermarkStrategy<T> wms = new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner);
return assignTimestampsAndWatermarks(wms);
} catch (Exception e) {
throw new IllegalArgumentException("The given assigner is not serializable", e);
}
......@@ -325,19 +368,29 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
*
* <p>This method uses the deprecated watermark generator interfaces. Please switch to
* {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the
* new interfaces instead. The new interfaces support watermark idleness and no longer need
* to differentiate between "periodic" and "punctuated" watermarks.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
*
* @param assigner The timestamp assigner / watermark generator to use.
* @return The consumer object, to allow function chaining.
*/
@Deprecated
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.punctuatedWatermarkAssigner != null) {
throw new IllegalStateException("A punctuated watermark emitter has already been set.");
if (this.watermarkStrategy != null) {
throw new IllegalStateException("Some watermark strategy has already been set.");
}
try {
ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
return this;
final WatermarkStrategy<T> wms = new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner);
return assignTimestampsAndWatermarks(wms);
} catch (Exception e) {
throw new IllegalArgumentException("The given assigner is not serializable", e);
}
......@@ -696,8 +749,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
watermarkStrategy,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
......@@ -993,8 +1045,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
*
* @param sourceContext The source context to emit data to.
* @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets.
* @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
* @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
* @param watermarkStrategy Optional, a serialized WatermarkStrategy.
* @param runtimeContext The task's runtime context.
*
* @return The instantiated fetcher
......@@ -1004,8 +1055,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
protected abstract AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup kafkaMetricGroup,
......
......@@ -19,12 +19,12 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
......@@ -64,20 +64,30 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public abstract class AbstractFetcher<T, KPH> {
private static final int NO_TIMESTAMPS_WATERMARKS = 0;
private static final int PERIODIC_WATERMARKS = 1;
private static final int PUNCTUATED_WATERMARKS = 2;
private static final int WITH_WATERMARK_GENERATOR = 1;
// ------------------------------------------------------------------------
/** The source context to emit records and watermarks to. */
protected final SourceContext<T> sourceContext;
/**
* Wrapper around our SourceContext for allowing the {@link org.apache.flink.api.common.eventtime.WatermarkGenerator}
* to emit watermarks and mark idleness.
*/
protected final WatermarkOutput watermarkOutput;
/**
* {@link WatermarkOutputMultiplexer} for supporting per-partition watermark generation.
*/
private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
/** The lock that guarantees that record emission and state updates are atomic,
* from the view of taking a checkpoint. */
private final Object checkpointLock;
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
/**
* Queue of partitions that are not yet assigned to any Kafka clients for consuming.
......@@ -88,31 +98,21 @@ public abstract class AbstractFetcher<T, KPH> {
* <p>All partitions added to this queue are guaranteed to have been added
* to {@link #subscribedPartitionStates} already.
*/
protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue;
protected final ClosableBlockingQueue<KafkaTopicPartitionState<T, KPH>> unassignedPartitionsQueue;
/** The mode describing whether the fetcher also generates timestamps and watermarks. */
private final int timestampWatermarkMode;
/**
* Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
* The assigner is kept in serialized form, to deserialize it into multiple copies.
*/
private final SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic;
/**
* Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
* The assigner is kept in serialized form, to deserialize it into multiple copies.
* Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
* timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
* it into multiple copies.
*/
private final SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated;
private final SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
/** User class loader used to deserialize watermark assigners. */
private final ClassLoader userCodeClassLoader;
/** Only relevant for punctuated watermarks: The current cross partition watermark. */
private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
......@@ -141,14 +141,15 @@ public abstract class AbstractFetcher<T, KPH> {
protected AbstractFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
this.sourceContext = checkNotNull(sourceContext);
this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
this.checkpointLock = sourceContext.getCheckpointLock();
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
......@@ -157,23 +158,12 @@ public abstract class AbstractFetcher<T, KPH> {
this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
// figure out what we watermark mode we will be using
this.watermarksPeriodic = watermarksPeriodic;
this.watermarksPunctuated = watermarksPunctuated;
this.watermarkStrategy = watermarkStrategy;
if (watermarksPeriodic == null) {
if (watermarksPunctuated == null) {
// simple case, no watermarks involved
timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
} else {
timestampWatermarkMode = PUNCTUATED_WATERMARKS;
}
if (watermarkStrategy == null) {
timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
} else {
if (watermarksPunctuated == null) {
timestampWatermarkMode = PERIODIC_WATERMARKS;
} else {
throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
}
timestampWatermarkMode = WITH_WATERMARK_GENERATOR;
}
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
......@@ -182,19 +172,18 @@ public abstract class AbstractFetcher<T, KPH> {
this.subscribedPartitionStates = createPartitionStateHolders(
seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
userCodeClassLoader);
// check that all seed partition states have a defined offset
for (KafkaTopicPartitionState<?> partitionState : subscribedPartitionStates) {
for (KafkaTopicPartitionState<?, ?> partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
unassignedPartitionsQueue.add(partition);
}
......@@ -204,11 +193,10 @@ public abstract class AbstractFetcher<T, KPH> {
}
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@SuppressWarnings("unchecked")
PeriodicWatermarkEmitter<KPH> periodicEmitter = new PeriodicWatermarkEmitter(
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
PeriodicWatermarkEmitter<T, KPH> periodicEmitter = new PeriodicWatermarkEmitter<>(
subscribedPartitionStates,
sourceContext,
watermarkOutputMultiplexer,
processingTimeProvider,
autoWatermarkInterval);
......@@ -230,19 +218,18 @@ public abstract class AbstractFetcher<T, KPH> {
* @param newPartitions discovered partitions to add
*/
public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException {
List<KafkaTopicPartitionState<KPH>> newPartitionStates = createPartitionStateHolders(
List<KafkaTopicPartitionState<T, KPH>> newPartitionStates = createPartitionStateHolders(
newPartitions,
KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
userCodeClassLoader);
if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, newPartitionStates);
}
for (KafkaTopicPartitionState<KPH> newPartitionState : newPartitionStates) {
for (KafkaTopicPartitionState<T, KPH> newPartitionState : newPartitionStates) {
// the ordering is crucial here; first register the state holder, then
// push it to the partitions queue to be read
subscribedPartitionStates.add(newPartitionState);
......@@ -259,7 +246,7 @@ public abstract class AbstractFetcher<T, KPH> {
*
* @return All subscribed partitions.
*/
protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates() {
protected final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates() {
return subscribedPartitionStates;
}
......@@ -333,7 +320,7 @@ public abstract class AbstractFetcher<T, KPH> {
assert Thread.holdsLock(checkpointLock);
HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.size());
for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
state.put(partition.getKafkaTopicPartition(), partition.getOffset());
}
return state;
......@@ -352,7 +339,7 @@ public abstract class AbstractFetcher<T, KPH> {
*/
protected void emitRecordsWithTimestamps(
Queue<T> records,
KafkaTopicPartitionState<KPH> partitionState,
KafkaTopicPartitionState<T, KPH> partitionState,
long offset,
long kafkaEventTimestamp) {
// emit the records, using the checkpoint lock to guarantee
......@@ -360,84 +347,29 @@ public abstract class AbstractFetcher<T, KPH> {
synchronized (checkpointLock) {
T record;
while ((record = records.poll()) != null) {
// timestamps will be of the same size as records.
long timestamp = getTimestampForRecord(record, partitionState, kafkaEventTimestamp);
sourceContext.collectWithTimestamp(record, timestamp);
if (timestampWatermarkMode == PUNCTUATED_WATERMARKS) {
emitPunctuatedWatermark(record, timestamp, partitionState);
}
}
partitionState.setOffset(offset);
}
}
private void emitPunctuatedWatermark(
T record,
long timestamp,
KafkaTopicPartitionState<KPH> partitionState) {
final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
// if we also have a new per-partition watermark, check if that is also a
// new cross-partition watermark
if (newWatermark != null) {
updateMinPunctuatedWatermark(newWatermark);
}
}
protected long getTimestampForRecord(
T record,
KafkaTopicPartitionState<KPH> partitionState,
long kafkaEventTimestamp) {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
return kafkaEventTimestamp;
} else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
// extract timestamp - this accesses/modifies the per-partition state inside the
// watermark generator instance, so we need to lock the access on the
// partition state. concurrent access can happen from the periodic emitter
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (withWatermarksState) {
return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
}
} else {
final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
// only one thread ever works on accessing timestamps and watermarks
// from the punctuated extractor
return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
}
}
long timestamp;
/**
*Checks whether a new per-partition watermark is also a new cross-partition watermark.
*/
private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
long newMin = Long.MAX_VALUE;
for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) {
@SuppressWarnings("unchecked")
final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
}
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (partitionState) {
// You would expect that we don't have to do this under lock. You would be wrong:
// A WatermarkStrategy can wrap an old-style combined
// timestamp extractor/watermark assigner, in which case the TimestampAssigner and
// WatermarkGenerator wrap one and the same object, where extracting the timestamp
// updates the internal state of the assigner.
timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
}
sourceContext.collectWithTimestamp(record, timestamp);
// double-check locking pattern
if (newMin > maxWatermarkSoFar) {
synchronized (checkpointLock) {
if (newMin > maxWatermarkSoFar) {
maxWatermarkSoFar = newMin;
sourceContext.emitWatermark(new Watermark(newMin));
}
// TODO: not sure it's a good idea to split it into two Synchronized blocks, but
// we have to move the onEvent after the collect call, otherwise the WM would
// be emitted before the record
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (partitionState) {
partitionState.onEvent(record, timestamp);
}
}
partitionState.setOffset(offset);
}
}
......@@ -449,16 +381,15 @@ public abstract class AbstractFetcher<T, KPH> {
* Utility method that takes the topic partitions and creates the topic partition state
* holders, depending on the timestamp / watermark mode.
*/
private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(
Map<KafkaTopicPartition, Long> partitionsToInitialOffsets,
int timestampWatermarkMode,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
// CopyOnWrite as adding discovered partitions could happen in parallel
// while different threads iterate the partitions list
List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
List<KafkaTopicPartitionState<T, KPH>> partitionStates = new CopyOnWriteArrayList<>();
switch (timestampWatermarkMode) {
case NO_TIMESTAMPS_WATERMARKS: {
......@@ -466,7 +397,7 @@ public abstract class AbstractFetcher<T, KPH> {
// create the kafka version specific partition handle
KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
KafkaTopicPartitionState<KPH> partitionState =
KafkaTopicPartitionState<T, KPH> partitionState =
new KafkaTopicPartitionState<>(partitionEntry.getKey(), kafkaHandle);
partitionState.setOffset(partitionEntry.getValue());
......@@ -476,18 +407,26 @@ public abstract class AbstractFetcher<T, KPH> {
return partitionStates;
}
case PERIODIC_WATERMARKS: {
case WITH_WATERMARK_GENERATOR: {
for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
userCodeClassLoader);
AssignerWithPeriodicWatermarks<T> assignerInstance =
watermarksPeriodic.deserializeValue(userCodeClassLoader);
int outputId = watermarkOutputMultiplexer.registerNewOutput();
WatermarkOutput immediateOutput =
watermarkOutputMultiplexer.getImmediateOutput(outputId);
WatermarkOutput deferredOutput =
watermarkOutputMultiplexer.getDeferredOutput(outputId);
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> partitionState =
new KafkaTopicPartitionStateWithWatermarkGenerator<>(
partitionEntry.getKey(),
kafkaHandle,
assignerInstance);
deserializedWatermarkStrategy.createTimestampAssigner(() -> consumerMetricGroup),
deserializedWatermarkStrategy.createWatermarkGenerator(() -> consumerMetricGroup),
immediateOutput,
deferredOutput);
partitionState.setOffset(partitionEntry.getValue());
......@@ -497,26 +436,6 @@ public abstract class AbstractFetcher<T, KPH> {
return partitionStates;
}
case PUNCTUATED_WATERMARKS: {
for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
AssignerWithPunctuatedWatermarks<T> assignerInstance =
watermarksPunctuated.deserializeValue(userCodeClassLoader);
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partitionState =
new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
partitionEntry.getKey(),
kafkaHandle,
assignerInstance);
partitionState.setOffset(partitionEntry.getValue());
partitionStates.add(partitionState);
}
return partitionStates;
}
default:
// cannot happen, add this as a guard for the future
throw new RuntimeException();
......@@ -524,15 +443,14 @@ public abstract class AbstractFetcher<T, KPH> {
}
/**
* Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, SerializedValue, ClassLoader)}
* Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, ClassLoader)}
* that uses the same offset for all partitions when creating their state holders.
*/
private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(
List<KafkaTopicPartition> partitions,
long initialOffset,
int timestampWatermarkMode,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
Map<KafkaTopicPartition, Long> partitionsToInitialOffset = new HashMap<>(partitions.size());
......@@ -543,8 +461,7 @@ public abstract class AbstractFetcher<T, KPH> {
return createPartitionStateHolders(
partitionsToInitialOffset,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
userCodeClassLoader);
}
......@@ -562,9 +479,9 @@ public abstract class AbstractFetcher<T, KPH> {
*/
private void registerOffsetMetrics(
MetricGroup consumerMetricGroup,
List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {
List<KafkaTopicPartitionState<T, KPH>> partitionOffsetStates) {
for (KafkaTopicPartitionState<KPH> ktp : partitionOffsetStates) {
for (KafkaTopicPartitionState<T, KPH> ktp : partitionOffsetStates) {
MetricGroup topicPartitionGroup = consumerMetricGroup
.addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, ktp.getTopic())
.addGroup(OFFSETS_BY_PARTITION_METRICS_GROUP, Integer.toString(ktp.getPartition()));
......@@ -577,7 +494,7 @@ public abstract class AbstractFetcher<T, KPH> {
}
}
private static String getLegacyOffsetsMetricsGaugeName(KafkaTopicPartitionState<?> ktp) {
private static String getLegacyOffsetsMetricsGaugeName(KafkaTopicPartitionState<?, ?> ktp) {
return ktp.getTopic() + "-" + ktp.getPartition();
}
......@@ -594,10 +511,10 @@ public abstract class AbstractFetcher<T, KPH> {
*/
private static class OffsetGauge implements Gauge<Long> {
private final KafkaTopicPartitionState<?> ktp;
private final KafkaTopicPartitionState<?, ?> ktp;
private final OffsetGaugeType gaugeType;
OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
OffsetGauge(KafkaTopicPartitionState<?, ?> ktp, OffsetGaugeType gaugeType) {
this.ktp = ktp;
this.gaugeType = gaugeType;
}
......@@ -620,30 +537,27 @@ public abstract class AbstractFetcher<T, KPH> {
* The periodic watermark emitter. In its given interval, it checks all partitions for
* the current event time watermark, and possibly emits the next watermark.
*/
private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback {
private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeCallback {
private final List<KafkaTopicPartitionState<KPH>> allPartitions;
private final List<KafkaTopicPartitionState<T, KPH>> allPartitions;
private final SourceContext<?> emitter;
private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
private final ProcessingTimeService timerService;
private final long interval;
private long lastWatermarkTimestamp;
//-------------------------------------------------
PeriodicWatermarkEmitter(
List<KafkaTopicPartitionState<KPH>> allPartitions,
SourceContext<?> emitter,
List<KafkaTopicPartitionState<T, KPH>> allPartitions,
WatermarkOutputMultiplexer watermarkOutputMultiplexer,
ProcessingTimeService timerService,
long autoWatermarkInterval) {
this.allPartitions = checkNotNull(allPartitions);
this.emitter = checkNotNull(emitter);
this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
this.timerService = checkNotNull(timerService);
this.interval = autoWatermarkInterval;
this.lastWatermarkTimestamp = Long.MIN_VALUE;
}
//-------------------------------------------------
......@@ -653,29 +567,20 @@ public abstract class AbstractFetcher<T, KPH> {
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
public void onProcessingTime(long timestamp) {
long minAcrossAll = Long.MAX_VALUE;
boolean isEffectiveMinAggregation = false;
for (KafkaTopicPartitionState<?> state : allPartitions) {
for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
// we access the current watermark for the periodic assigners under the state
// lock, to prevent concurrent modification to any internal variables
final long curr;
// we do this under the state lock, to prevent concurrent modification to any
// internal variables
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (state) {
curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();
state.onPeriodicEmit();
}
minAcrossAll = Math.min(minAcrossAll, curr);
isEffectiveMinAggregation = true;
}
// emit next watermark, if there is one
if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
lastWatermarkTimestamp = minAcrossAll;
emitter.emitWatermark(new Watermark(minAcrossAll));
}
watermarkOutputMultiplexer.onPeriodicEmit();
// schedule the next watermark
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
......
......@@ -30,7 +30,7 @@ import org.apache.flink.annotation.Internal;
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
*/
@Internal
public class KafkaTopicPartitionState<KPH> {
public class KafkaTopicPartitionState<T, KPH> {
// ------------------------------------------------------------------------
......@@ -106,6 +106,19 @@ public class KafkaTopicPartitionState<KPH> {
return committedOffset;
}
public long extractTimestamp(T record, long kafkaEventTimestamp) {
return kafkaEventTimestamp;
}
public void onEvent(T event, long timestamp) {
// do nothing
}
public void onPeriodicEmit() {
// do nothing
}
// ------------------------------------------------------------------------
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
/**
* A special version of the per-kafka-partition-state that additionally holds
* a punctuated watermark generator (and timestamp extractor) per partition.
*
* <p>This class is not thread safe, but it gives volatile access to the current
* partition watermark ({@link #getCurrentPartitionWatermark()}).
*
* @param <T> The type of records handled by the watermark generator
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
*/
@Internal
public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
/** The timestamp assigner and watermark generator for the partition. */
private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
/** The last watermark timestamp generated by this partition. */
private volatile long partitionWatermark;
// ------------------------------------------------------------------------
public KafkaTopicPartitionStateWithPunctuatedWatermarks(
KafkaTopicPartition partition, KPH kafkaPartitionHandle,
AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks) {
super(partition, kafkaPartitionHandle);
this.timestampsAndWatermarks = timestampsAndWatermarks;
this.partitionWatermark = Long.MIN_VALUE;
}
// ------------------------------------------------------------------------
public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
}
@Nullable
public Watermark checkAndGetNewWatermark(T record, long timestamp) {
Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
if (mark != null && mark.getTimestamp() > partitionWatermark) {
partitionWatermark = mark.getTimestamp();
return mark;
}
else {
return null;
}
}
public long getCurrentPartitionWatermark() {
return partitionWatermark;
}
// ------------------------------------------------------------------------
@Override
public String toString() {
return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
}
}
......@@ -18,48 +18,72 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
/**
* A special version of the per-kafka-partition-state that additionally holds
* a periodic watermark generator (and timestamp extractor) per partition.
* A special version of the per-kafka-partition-state that additionally holds a {@link
* TimestampAssigner}, {@link WatermarkGenerator}, an immediate {@link WatermarkOutput}, and a
* deferred {@link WatermarkOutput} for this partition.
*
* <p>See {@link org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer} for an
* explanation
* of immediate and deferred {@link WatermarkOutput WatermarkOutputs.}.
*
* @param <T> The type of records handled by the watermark generator
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
*/
@Internal
public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
public final class KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> extends KafkaTopicPartitionState<T, KPH> {
private final TimestampAssigner<T> timestampAssigner;
/** The timestamp assigner and watermark generator for the partition. */
private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
private final WatermarkGenerator<T> watermarkGenerator;
/** The last watermark timestamp generated by this partition. */
private long partitionWatermark;
/**
* Refer to {@link org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer} for
* a description of immediate/deferred output.
*/
private final WatermarkOutput immediateOutput;
/**
* Refer to {@link org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer} for
* a description of immediate/deferred output.
*/
private final WatermarkOutput deferredOutput;
// ------------------------------------------------------------------------
public KafkaTopicPartitionStateWithPeriodicWatermarks(
public KafkaTopicPartitionStateWithWatermarkGenerator(
KafkaTopicPartition partition, KPH kafkaPartitionHandle,
AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks) {
TimestampAssigner<T> timestampAssigner,
WatermarkGenerator<T> watermarkGenerator,
WatermarkOutput immediateOutput,
WatermarkOutput deferredOutput) {
super(partition, kafkaPartitionHandle);
this.timestampsAndWatermarks = timestampsAndWatermarks;
this.partitionWatermark = Long.MIN_VALUE;
this.timestampAssigner = timestampAssigner;
this.watermarkGenerator = watermarkGenerator;
this.immediateOutput = immediateOutput;
this.deferredOutput = deferredOutput;
}
// ------------------------------------------------------------------------
public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
@Override
public long extractTimestamp(T record, long kafkaEventTimestamp) {
return timestampAssigner.extractTimestamp(record, kafkaEventTimestamp);
}
@Override
public void onEvent(T event, long timestamp) {
watermarkGenerator.onEvent(event, timestamp, immediateOutput);
}
public long getCurrentWatermarkTimestamp() {
Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
if (wm != null) {
partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
}
return partitionWatermark;
@Override
public void onPeriodicEmit() {
watermarkGenerator.onPeriodicEmit(deferredOutput);
}
// ------------------------------------------------------------------------
......@@ -67,6 +91,6 @@ public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extend
@Override
public String toString() {
return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+ ", offset=" + getOffset();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
/**
* A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link
* org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
*/
public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput {
private final SourceContext<T> sourceContext;
public SourceContextWatermarkOutputAdapter(SourceContext<T> sourceContext) {
this.sourceContext = sourceContext;
}
@Override
public void emitWatermark(Watermark watermark) {
sourceContext.emitWatermark(
new org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp()));
}
@Override
public void markIdle() {
sourceContext.markAsTemporarilyIdle();
}
}
......@@ -18,12 +18,11 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
......@@ -395,8 +394,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
......
......@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
......@@ -990,8 +991,24 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
private volatile boolean isRunning = true;
protected TestingFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
super(sourceContext, seedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, consumerMetricGroup, useMetrics);
protected TestingFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
super(
sourceContext,
seedPartitionsWithInitialOffsets,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
consumerMetricGroup,
useMetrics);
}
@Override
......@@ -1067,8 +1084,15 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
}
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(SupplierWithException<AbstractFetcher<T, ?>, Exception> abstractFetcherSupplier, AbstractPartitionDiscoverer abstractPartitionDiscoverer, long discoveryIntervalMillis) {
this(abstractFetcherSupplier, abstractPartitionDiscoverer, false, discoveryIntervalMillis);
DummyFlinkKafkaConsumer(
SupplierWithException<AbstractFetcher<T, ?>, Exception> abstractFetcherSupplier,
AbstractPartitionDiscoverer abstractPartitionDiscoverer,
long discoveryIntervalMillis) {
this(
abstractFetcherSupplier,
abstractPartitionDiscoverer,
false,
discoveryIntervalMillis);
}
@SuppressWarnings("unchecked")
......@@ -1134,12 +1158,10 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
}
@Override
@SuppressWarnings("unchecked")
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
......@@ -1184,9 +1206,23 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
}
@Override
protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
return new TestingFetcher<T, String>(sourceContext, thisSubtaskPartitionsWithStartOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), 0L, getClass().getClassLoader(), consumerMetricGroup, useMetrics);
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
return new TestingFetcher<T, String>(
sourceContext,
thisSubtaskPartitionsWithStartOffsets,
watermarkStrategy,
runtimeContext.getProcessingTimeService(),
0L,
getClass().getClassLoader(),
consumerMetricGroup,
useMetrics);
}
@Override
......@@ -1282,8 +1318,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
super(
new TestSourceContext<>(),
new HashMap<>(),
null,
null,
null /* watermark strategy */,
new TestProcessingTimeService(),
0,
MockFetcher.class.getClassLoader(),
......
......@@ -18,13 +18,11 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
......@@ -33,19 +31,15 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
......@@ -65,12 +59,11 @@ public class AbstractFetcherTest {
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
null,
null,
new TestProcessingTimeService(),
0);
sourceContext,
originalPartitions,
null, /* watermark strategy */
new TestProcessingTimeService(),
0);
synchronized (sourceContext.getCheckpointLock()) {
HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState();
......@@ -103,14 +96,13 @@ public class AbstractFetcherTest {
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
null, /* periodic watermark assigner */
null, /* punctuated watermark assigner */
new TestProcessingTimeService(),
0);
sourceContext,
originalPartitions,
null, /* watermark strategy */
new TestProcessingTimeService(),
0);
final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0);
final KafkaTopicPartitionState<Long, Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0);
emitRecord(fetcher, 1L, partitionStateHolder, 1L);
emitRecord(fetcher, 2L, partitionStateHolder, 2L);
......@@ -123,278 +115,6 @@ public class AbstractFetcherTest {
assertEquals(3L, partitionStateHolder.getOffset()); // the offset in state still should have advanced
}
@Test
public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
null, /* periodic watermark assigner */
new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), /* punctuated watermark assigner */
processingTimeProvider,
0);
final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0);
// elements generate a watermark if the timestamp is a multiple of three
emitRecord(fetcher, 1L, partitionStateHolder, 1L);
emitRecord(fetcher, 2L, partitionStateHolder, 2L);
emitRecord(fetcher, 3L, partitionStateHolder, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
assertEquals(3L, partitionStateHolder.getOffset());
// emit no records
fetcher.emitRecordsWithTimestamps(emptyQueue(), partitionStateHolder, 4L, -1L);
// no elements or watermarks should have been collected
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// the offset in state still should have advanced
assertEquals(4L, partitionStateHolder.getOffset());
}
@Test
public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), /* periodic watermark assigner */
null, /* punctuated watermark assigner */
processingTimeProvider,
10);
final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0);
// elements generate a watermark if the timestamp is a multiple of three
emitRecord(fetcher, 1L, partitionStateHolder, 1L);
emitRecord(fetcher, 2L, partitionStateHolder, 2L);
emitRecord(fetcher, 3L, partitionStateHolder, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertEquals(3L, partitionStateHolder.getOffset());
// advance timer for watermark emitting
processingTimeProvider.setCurrentTime(10L);
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// emit no records
fetcher.emitRecordsWithTimestamps(emptyQueue(), partitionStateHolder, 4L, Long.MIN_VALUE);
// no elements should have been collected
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
// the offset in state still should have advanced
assertEquals(4L, partitionStateHolder.getOffset());
// no watermarks should be collected
processingTimeProvider.setCurrentTime(20L);
assertFalse(sourceContext.hasWatermark());
}
// ------------------------------------------------------------------------
// Timestamps & watermarks tests
// ------------------------------------------------------------------------
@Test
public void testPunctuatedWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
null, /* periodic watermark assigner */
new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
processingTimeProvider,
0);
final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates().get(0);
final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates().get(1);
final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates().get(2);
// elements generate a watermark if the timestamp is a multiple of three
// elements for partition 1
emitRecords(fetcher, Arrays.asList(1L, 2L), part1, 1L);
emitRecord(fetcher, 2L, part1, 2L);
emitRecords(fetcher, Arrays.asList(2L, 3L), part1, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// elements for partition 2
emitRecord(fetcher, 12L, part2, 1L);
assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// elements for partition 3
emitRecord(fetcher, 101L, part3, 1L);
emitRecord(fetcher, 102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
// now, we should have a watermark
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 3
emitRecord(fetcher, 1003L, part3, 3L);
emitRecord(fetcher, 1004L, part3, 4L);
emitRecord(fetcher, 1005L, part3, 5L);
assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
// advance partition 1 beyond partition 2 - this bumps the watermark
emitRecord(fetcher, 30L, part1, 4L);
assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
assertTrue(sourceContext.hasWatermark());
assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 2 again - this bumps the watermark
emitRecord(fetcher, 13L, part2, 2L);
assertFalse(sourceContext.hasWatermark());
emitRecord(fetcher, 14L, part2, 3L);
assertFalse(sourceContext.hasWatermark());
emitRecord(fetcher, 15L, part2, 3L);
assertTrue(sourceContext.hasWatermark());
assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
}
@Test
public void testPeriodicWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
null, /* punctuated watermarks assigner*/
processingTimeService,
10);
final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates().get(0);
final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates().get(1);
final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates().get(2);
// elements generate a watermark if the timestamp is a multiple of three
// elements for partition 1
emitRecord(fetcher, 1L, part1, 1L);
emitRecord(fetcher, 2L, part1, 2L);
emitRecord(fetcher, 3L, part1, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
// elements for partition 2
emitRecord(fetcher, 12L, part2, 1L);
assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
// elements for partition 3
emitRecord(fetcher, 101L, part3, 1L);
emitRecord(fetcher, 102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
processingTimeService.setCurrentTime(10);
// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 3
emitRecord(fetcher, 1003L, part3, 3L);
emitRecord(fetcher, 1004L, part3, 4L);
emitRecord(fetcher, 1005L, part3, 5L);
assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
// advance partition 1 beyond partition 2 - this bumps the watermark
emitRecord(fetcher, 30L, part1, 4L);
assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
processingTimeService.setCurrentTime(20);
// this blocks until the periodic thread emitted the watermark
assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 2 again - this bumps the watermark
emitRecord(fetcher, 13L, part2, 2L);
emitRecord(fetcher, 14L, part2, 3L);
emitRecord(fetcher, 15L, part2, 3L);
processingTimeService.setCurrentTime(30);
// this blocks until the periodic thread emitted the watermark
long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
}
@Test
public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
null, /* punctuated watermarks assigner*/
processingTimeProvider,
10);
processingTimeProvider.setCurrentTime(10);
// no partitions; when the periodic watermark emitter fires, no watermark should be emitted
assertFalse(sourceContext.hasWatermark());
// counter-test that when the fetcher does actually have partitions,
// when the periodic watermark emitter fires again, a watermark really is emitted
fetcher.addDiscoveredPartitions(Collections.singletonList(new KafkaTopicPartition(testTopic, 0)));
emitRecord(fetcher, 100L, fetcher.subscribedPartitionStates().get(0), 3L);
processingTimeProvider.setCurrentTime(20);
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
@Test
public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
// test data
......@@ -402,7 +122,6 @@ public class AbstractFetcherTest {
// ----- create the test fetcher -----
@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = new TestSourceContext<>();
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
......@@ -413,8 +132,7 @@ public class AbstractFetcherTest {
final TestFetcher<String> fetcher = new TestFetcher<>(
sourceContext,
partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
null, /* watermark strategy */
new TestProcessingTimeService(),
10,
fetchLoopWaitLatch,
......@@ -451,16 +169,14 @@ public class AbstractFetcherTest {
TestFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval) throws Exception {
this(
sourceContext,
assignedPartitionsWithStartOffsets,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
null,
......@@ -470,8 +186,7 @@ public class AbstractFetcherTest {
TestFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
OneShotLatch fetchLoopWaitLatch,
......@@ -480,8 +195,7 @@ public class AbstractFetcherTest {
super(
sourceContext,
assignedPartitionsWithStartOffsets,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
TestFetcher.class.getClassLoader(),
......@@ -499,7 +213,7 @@ public class AbstractFetcherTest {
@Override
public void runFetchLoop() throws Exception {
if (fetchLoopWaitLatch != null) {
for (KafkaTopicPartitionState<?> ignored : subscribedPartitionStates()) {
for (KafkaTopicPartitionState<?, ?> ignored : subscribedPartitionStates()) {
fetchLoopWaitLatch.trigger();
stateIterationBlockLatch.await();
}
......@@ -536,7 +250,7 @@ public class AbstractFetcherTest {
private static <T, KPH> void emitRecord(
AbstractFetcher<T, KPH> fetcher,
T record,
KafkaTopicPartitionState<KPH> partitionState,
KafkaTopicPartitionState<T, KPH> partitionState,
long offset) {
ArrayDeque<T> recordQueue = new ArrayDeque<>();
recordQueue.add(record);
......@@ -548,55 +262,7 @@ public class AbstractFetcherTest {
Long.MIN_VALUE);
}
private static <T, KPH> void emitRecords(
AbstractFetcher<T, KPH> fetcher,
List<T> records,
KafkaTopicPartitionState<KPH> partitionState,
long offset) {
ArrayDeque<T> recordQueue = new ArrayDeque<>(records);
fetcher.emitRecordsWithTimestamps(
recordQueue,
partitionState,
offset,
Long.MIN_VALUE);
}
private static <T> Queue<T> emptyQueue() {
return new ArrayDeque<>();
}
@SuppressWarnings("deprecation")
private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
private volatile long maxTimestamp = Long.MIN_VALUE;
@Override
public long extractTimestamp(Long element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element);
return element;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTimestamp);
}
}
@SuppressWarnings("deprecation")
private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
@Override
public long extractTimestamp(Long element, long previousElementTimestamp) {
return element;
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for the watermarking behaviour of {@link AbstractFetcher}.
*/
@SuppressWarnings("serial")
@RunWith(Enclosed.class)
public class AbstractFetcherWatermarksTest {
/**
* Tests with watermark generators that have a periodic nature.
*/
@RunWith(Parameterized.class)
public static class PeriodicWatermarksSuite {
@Parameterized.Parameters
public static Collection<WatermarkStrategy<Long>> getParams() {
return Arrays.asList(
new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor()),
WatermarkStrategies
.forGenerator((ctx) -> new PeriodicTestWatermarkGenerator())
.withTimestampAssigner((event, previousTimestamp) -> event)
.build()
);
}
@Parameterized.Parameter
public WatermarkStrategy<Long> testWmStrategy;
@Test
public void testPeriodicWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(
new KafkaTopicPartition(testTopic, 7),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(
new KafkaTopicPartition(testTopic, 13),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(
new KafkaTopicPartition(testTopic, 21),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<>(testWmStrategy),
processingTimeService,
10);
final KafkaTopicPartitionState<Long, Object> part1 =
fetcher.subscribedPartitionStates().get(0);
final KafkaTopicPartitionState<Long, Object> part2 =
fetcher.subscribedPartitionStates().get(1);
final KafkaTopicPartitionState<Long, Object> part3 =
fetcher.subscribedPartitionStates().get(2);
// elements generate a watermark if the timestamp is a multiple of three
// elements for partition 1
emitRecord(fetcher, 1L, part1, 1L);
emitRecord(fetcher, 1L, part1, 1L);
emitRecord(fetcher, 2L, part1, 2L);
emitRecord(fetcher, 3L, part1, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
// elements for partition 2
emitRecord(fetcher, 12L, part2, 1L);
assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
// elements for partition 3
emitRecord(fetcher, 101L, part3, 1L);
emitRecord(fetcher, 102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
processingTimeService.setCurrentTime(10);
// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 3
emitRecord(fetcher, 1003L, part3, 3L);
emitRecord(fetcher, 1004L, part3, 4L);
emitRecord(fetcher, 1005L, part3, 5L);
assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
// advance partition 1 beyond partition 2 - this bumps the watermark
emitRecord(fetcher, 30L, part1, 4L);
assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
processingTimeService.setCurrentTime(20);
// this blocks until the periodic thread emitted the watermark
assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 2 again - this bumps the watermark
emitRecord(fetcher, 13L, part2, 2L);
emitRecord(fetcher, 14L, part2, 3L);
emitRecord(fetcher, 15L, part2, 3L);
processingTimeService.setCurrentTime(30);
// this blocks until the periodic thread emitted the watermark
long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
}
@Test
public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(
new KafkaTopicPartition(testTopic, 1),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<>(testWmStrategy),
processingTimeProvider,
10);
final KafkaTopicPartitionState<Long, Object> partitionStateHolder =
fetcher.subscribedPartitionStates().get(0);
// elements generate a watermark if the timestamp is a multiple of three
emitRecord(fetcher, 1L, partitionStateHolder, 1L);
emitRecord(fetcher, 2L, partitionStateHolder, 2L);
emitRecord(fetcher, 3L, partitionStateHolder, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertEquals(3L, partitionStateHolder.getOffset());
// advance timer for watermark emitting
processingTimeProvider.setCurrentTime(10L);
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// emit no records
fetcher.emitRecordsWithTimestamps(
emptyQueue(),
partitionStateHolder,
4L,
Long.MIN_VALUE);
// no elements should have been collected
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
// the offset in state still should have advanced
assertEquals(4L, partitionStateHolder.getOffset());
// no watermarks should be collected
processingTimeProvider.setCurrentTime(20L);
assertFalse(sourceContext.hasWatermark());
}
@Test
public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<>(testWmStrategy),
processingTimeProvider,
10);
processingTimeProvider.setCurrentTime(10);
// no partitions; when the periodic watermark emitter fires, no watermark should be emitted
assertFalse(sourceContext.hasWatermark());
// counter-test that when the fetcher does actually have partitions,
// when the periodic watermark emitter fires again, a watermark really is emitted
fetcher.addDiscoveredPartitions(Collections.singletonList(
new KafkaTopicPartition(testTopic, 0)));
emitRecord(fetcher, 100L, fetcher.subscribedPartitionStates().get(0), 3L);
processingTimeProvider.setCurrentTime(20);
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
}
/**
* Tests with watermark generators that have a punctuated nature.
*/
public static class PunctuatedWatermarksSuite {
@Test
public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(
new KafkaTopicPartition(testTopic, 1),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
AssignerWithPunctuatedWatermarksAdapter.Strategy<Long> testWmStrategy =
new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(new PunctuatedTestExtractor());
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<>(testWmStrategy),
processingTimeProvider,
0);
final KafkaTopicPartitionState<Long, Object> partitionStateHolder =
fetcher.subscribedPartitionStates().get(0);
// elements generate a watermark if the timestamp is a multiple of three
emitRecord(fetcher, 1L, partitionStateHolder, 1L);
emitRecord(fetcher, 2L, partitionStateHolder, 2L);
emitRecord(fetcher, 3L, partitionStateHolder, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
assertEquals(3L, partitionStateHolder.getOffset());
// emit no records
fetcher.emitRecordsWithTimestamps(emptyQueue(), partitionStateHolder, 4L, -1L);
// no elements or watermarks should have been collected
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// the offset in state still should have advanced
assertEquals(4L, partitionStateHolder.getOffset());
}
@Test
public void testPunctuatedWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
originalPartitions.put(
new KafkaTopicPartition(testTopic, 7),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(
new KafkaTopicPartition(testTopic, 13),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
originalPartitions.put(
new KafkaTopicPartition(testTopic, 21),
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
AssignerWithPunctuatedWatermarksAdapter.Strategy<Long> testWmStrategy =
new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(new PunctuatedTestExtractor());
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<>(testWmStrategy),
processingTimeProvider,
0);
final KafkaTopicPartitionState<Long, Object> part1 =
fetcher.subscribedPartitionStates().get(0);
final KafkaTopicPartitionState<Long, Object> part2 =
fetcher.subscribedPartitionStates().get(1);
final KafkaTopicPartitionState<Long, Object> part3 =
fetcher.subscribedPartitionStates().get(2);
// elements generate a watermark if the timestamp is a multiple of three
// elements for partition 1
emitRecords(fetcher, Arrays.asList(1L, 2L), part1, 1L);
emitRecord(fetcher, 2L, part1, 2L);
emitRecords(fetcher, Arrays.asList(2L, 3L), part1, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// elements for partition 2
emitRecord(fetcher, 12L, part2, 1L);
assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// elements for partition 3
emitRecord(fetcher, 101L, part3, 1L);
emitRecord(fetcher, 102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
// now, we should have a watermark
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 3
emitRecord(fetcher, 1003L, part3, 3L);
emitRecord(fetcher, 1004L, part3, 4L);
emitRecord(fetcher, 1005L, part3, 5L);
assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
// advance partition 1 beyond partition 2 - this bumps the watermark
emitRecord(fetcher, 30L, part1, 4L);
assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
assertTrue(sourceContext.hasWatermark());
assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 2 again - this bumps the watermark
emitRecord(fetcher, 13L, part2, 2L);
assertFalse(sourceContext.hasWatermark());
emitRecord(fetcher, 14L, part2, 3L);
assertFalse(sourceContext.hasWatermark());
emitRecord(fetcher, 15L, part2, 3L);
assertTrue(sourceContext.hasWatermark());
assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
}
}
private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
TestFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval) throws Exception {
super(
sourceContext,
assignedPartitionsWithStartOffsets,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
TestFetcher.class.getClassLoader(),
new UnregisteredMetricsGroup(),
false);
}
public void runFetchLoop() {
throw new UnsupportedOperationException();
}
@Override
public void cancel() {
throw new UnsupportedOperationException();
}
@Override
protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) {
throw new UnsupportedOperationException();
}
@Override
protected Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
return new Object();
}
}
private static <T, KPH> void emitRecord(
AbstractFetcher<T, KPH> fetcher,
T record,
KafkaTopicPartitionState<T, KPH> partitionState,
long offset) {
ArrayDeque<T> recordQueue = new ArrayDeque<>();
recordQueue.add(record);
fetcher.emitRecordsWithTimestamps(
recordQueue,
partitionState,
offset,
Long.MIN_VALUE);
}
private static <T, KPH> void emitRecords(
AbstractFetcher<T, KPH> fetcher,
List<T> records,
KafkaTopicPartitionState<T, KPH> partitionState,
long offset) {
ArrayDeque<T> recordQueue = new ArrayDeque<>(records);
fetcher.emitRecordsWithTimestamps(
recordQueue,
partitionState,
offset,
Long.MIN_VALUE);
}
private static <T> Queue<T> emptyQueue() {
return new ArrayDeque<>();
}
@SuppressWarnings("deprecation")
private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
private volatile long maxTimestamp = Long.MIN_VALUE;
@Override
public long extractTimestamp(Long element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element);
return element;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTimestamp);
}
}
@SuppressWarnings("deprecation")
private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
@Override
public long extractTimestamp(Long element, long previousElementTimestamp) {
return element;
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
}
}
private static class PeriodicTestWatermarkGenerator implements WatermarkGenerator<Long> {
private volatile long maxTimestamp = Long.MIN_VALUE;
@Override
public void onEvent(
Long event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, event);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(maxTimestamp));
}
}
}
......@@ -18,10 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher;
......@@ -211,8 +210,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
......@@ -225,8 +223,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
return new KafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
......
......@@ -64,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* to the KafkaConsumer calls that change signature.
*/
@Internal
public class KafkaConsumerThread extends Thread {
public class KafkaConsumerThread<T> extends Thread {
/** Logger for this consumer. */
private final Logger log;
......@@ -79,7 +79,7 @@ public class KafkaConsumerThread extends Thread {
private final Properties kafkaProperties;
/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
private final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue;
/** The maximum number of milliseconds to wait for a fetch batch. */
private final long pollTimeout;
......@@ -122,7 +122,7 @@ public class KafkaConsumerThread extends Thread {
Logger log,
Handover handover,
Properties kafkaProperties,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue,
String threadName,
long pollTimeout,
boolean useMetrics,
......@@ -203,7 +203,7 @@ public class KafkaConsumerThread extends Thread {
// reused variable to hold found unassigned new partitions.
// found partitions are not carried across loops using this variable;
// they are carried across via re-adding them to the unassigned partitions queue
List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions;
// main fetch loop
while (running) {
......@@ -370,7 +370,7 @@ public class KafkaConsumerThread extends Thread {
* <p>This method is exposed for testing purposes.
*/
@VisibleForTesting
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions) throws Exception {
if (newPartitions.size() == 0) {
return;
}
......@@ -412,7 +412,7 @@ public class KafkaConsumerThread extends Thread {
// been replaced with actual offset values yet, or
// (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
for (KafkaTopicPartitionState<T, TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
......@@ -451,7 +451,7 @@ public class KafkaConsumerThread extends Thread {
hasBufferedWakeup = false;
// re-add all new partitions back to the unassigned partitions queue to be picked up again
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
for (KafkaTopicPartitionState<T, TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
......@@ -481,9 +481,9 @@ public class KafkaConsumerThread extends Thread {
// Utilities
// ------------------------------------------------------------------------
private static List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<TopicPartition>> partitions) {
private static <T> List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> partitions) {
ArrayList<TopicPartition> result = new ArrayList<>(partitions.size());
for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
for (KafkaTopicPartitionState<T, TopicPartition> p : partitions) {
result.add(p.getKafkaPartitionHandle());
}
return result;
......
......@@ -18,9 +18,8 @@
package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
......@@ -78,8 +77,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
public KafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
......@@ -93,8 +91,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
super(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
......@@ -136,7 +133,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
......@@ -207,11 +204,11 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
@SuppressWarnings("unchecked")
List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());
for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
if (lastProcessedOffset != null) {
checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册