提交 9487fcbf 编写于 作者: A Andrea Sella 提交者: zentol

[FLINK-4119] Refactor null checks in Cassandra IOF

This closes #2163
上级 6744b852
......@@ -31,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,12 +53,9 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
private transient ResultSet resultSet;
public CassandraInputFormat(String query, ClusterBuilder builder) {
if (Strings.isNullOrEmpty(query)) {
throw new IllegalArgumentException("Query cannot be null or empty");
}
if (builder == null) {
throw new IllegalArgumentException("Builder cannot be null.");
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
Preconditions.checkArgument(builder != null, "Builder cannot be null");
this.query = query;
this.builder = builder;
}
......@@ -115,15 +113,19 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
@Override
public void close() throws IOException {
try {
session.close();
if (session != null) {
session.close();
}
} catch (Exception e) {
LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
LOG.error("Error while closing session.", e);
}
try {
cluster.close();
if (cluster != null ) {
cluster.close();
}
} catch (Exception e) {
LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
LOG.error("Error while closing cluster.", e);
}
}
}
......@@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,12 +52,9 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
private transient Throwable exception = null;
public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
if (Strings.isNullOrEmpty(insertQuery)) {
throw new IllegalArgumentException("insertQuery cannot be null or empty");
}
if (builder == null) {
throw new IllegalArgumentException("Builder cannot be null.");
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
Preconditions.checkArgument(builder != null, "Builder cannot be null");
this.insertQuery = insertQuery;
this.builder = builder;
}
......@@ -109,15 +107,19 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
@Override
public void close() throws IOException {
try {
session.close();
if (session != null) {
session.close();
}
} catch (Exception e) {
LOG.warn("Inputformat couldn't be closed.", e);
LOG.error("Error while closing session.", e);
}
try {
cluster.close();
if (cluster != null ) {
cluster.close();
}
} catch (Exception e) {
LOG.warn("Inputformat couldn't be closed." , e);
LOG.error("Error while closing cluster.", e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册