提交 a997dd61 编写于 作者: R Robert Metzger

[FLINK-3081] Properly stop periodic Kafka committer

This closes #1410
上级 e9a2bc9d
......@@ -414,14 +414,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
// same here.
long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
offsetCommitter.setDaemon(true);
offsetCommitter.start();
LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
}
fetcher.run(sourceContext, deserializer, lastOffsets);
if (offsetCommitter != null) {
offsetCommitter.close();
try {
fetcher.run(sourceContext, deserializer, lastOffsets);
} finally {
if (offsetCommitter != null) {
offsetCommitter.close();
offsetCommitter.join();
}
}
}
else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册