提交 a0249d99 编写于 作者: Z zhangminglei 提交者: Tzu-Li (Gordon) Tai

[FLINK-6311] [kinesis] NPE in FlinkKinesisConsumer if source was closed before run

This closes #3738.
上级 42328bd9
...@@ -149,7 +149,7 @@ public class KinesisDataFetcher<T> { ...@@ -149,7 +149,7 @@ public class KinesisDataFetcher<T> {
private final KinesisProxyInterface kinesis; private final KinesisProxyInterface kinesis;
/** Thread that executed runFetcher() */ /** Thread that executed runFetcher() */
private Thread mainThread; private volatile Thread mainThread;
/** /**
* The current number of shards that are actively read by this fetcher. * The current number of shards that are actively read by this fetcher.
...@@ -408,7 +408,10 @@ public class KinesisDataFetcher<T> { ...@@ -408,7 +408,10 @@ public class KinesisDataFetcher<T> {
*/ */
public void shutdownFetcher() { public void shutdownFetcher() {
running = false; running = false;
mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
if (mainThread != null) {
mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
}
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册