diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index c823f5bccf979b3b8434ea882a3bed9eef675173..49b1efacbe5767c8737c9c4b056316db7e04b2c8 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -81,12 +81,16 @@ public abstract class CassandraSinkBase extends RichSinkFunction { @Override public void close() { try { - session.close(); + if (session != null) { + session.close(); + } } catch (Exception e) { LOG.error("Error while closing session.", e); } try { - cluster.close(); + if (cluster != null) { + cluster.close(); + } } catch (Exception e) { LOG.error("Error while closing cluster.", e); } diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java index f78464754c449d08d6e05c8816c94064791c0668..8bce9d6253f63ed25bff956de264be58a225fa2e 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java @@ -94,12 +94,16 @@ public class CassandraTupleWriteAheadSink extends GenericWrite public void close() throws Exception { super.close(); try { - session.close(); + if (session != null) { + session.close(); + } } catch (Exception e) { LOG.error("Error while closing session.", e); } try { - cluster.close(); + if (cluster != null) { + cluster.close(); + } } catch (Exception e) { LOG.error("Error while closing cluster.", e); }