提交 873d6cd1 编写于 作者: S Stephan Ewen

[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds

Cassandra needs Java 8 to run reliably.
上级 0ea2596e
......@@ -51,7 +51,7 @@ public class CommonTestUtils {
try {
Thread.sleep(remaining);
}
catch (InterruptedException e) {}
catch (InterruptedException ignored) {}
now = System.currentTimeMillis();
}
......@@ -137,8 +137,7 @@ public class CommonTestUtils {
}
public static void printLog4jDebugConfig(File file) throws IOException {
FileWriter fw = new FileWriter(file);
try {
try (FileWriter fw = new FileWriter(file)) {
PrintWriter writer = new PrintWriter(fw);
writer.println("log4j.rootLogger=DEBUG, console");
......@@ -152,9 +151,6 @@ public class CommonTestUtils {
writer.flush();
writer.close();
}
finally {
fw.close();
}
}
public static File createTempDirectory() throws IOException {
......@@ -165,7 +161,6 @@ public class CommonTestUtils {
if (!dir.exists() && dir.mkdirs()) {
return dir;
}
System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
}
throw new IOException("Could not create temporary file directory");
......
......@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.core.Cluster;
......@@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
......@@ -42,6 +46,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
......@@ -49,10 +54,12 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -64,16 +71,21 @@ import java.util.ArrayList;
import java.util.Scanner;
import java.util.UUID;
import static org.junit.Assert.*;
@SuppressWarnings("serial")
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static File tmpDir;
private static final boolean EMBEDDED = true;
private static EmbeddedCassandraService cassandra;
private transient static ClusterBuilder builder = new ClusterBuilder() {
private static ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
......@@ -83,6 +95,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
.withoutMetrics().build();
}
};
private static Cluster cluster;
private static Session session;
......@@ -97,7 +110,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
static {
for (int i = 0; i < 20; i++) {
collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 0));
collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
}
}
......@@ -115,15 +128,36 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
}
}
//=====Setup========================================================================================================
// ------------------------------------------------------------------------
// Cassandra Cluster Setup
// ------------------------------------------------------------------------
@BeforeClass
public static void startCassandra() throws IOException {
//generate temporary files
// check if we should run this test, current Cassandra version requires Java >= 1.8
try {
String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
float javaVersion = Float.parseFloat(javaVersionString);
Assume.assumeTrue(javaVersion >= 1.8f);
}
catch (AssumptionViolatedException e) {
System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
throw e;
}
catch (Exception e) {
LOG.error("Cannot determine Java version", e);
e.printStackTrace();
fail("Cannot determine Java version");
}
// generate temporary files
tmpDir = CommonTestUtils.createTempDirectory();
ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
File file = new File(classLoader.getResource("cassandra.yaml").getFile());
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
tmp.createNewFile();
assertTrue(tmp.createNewFile());
BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
//copy cassandra.yaml; inject absolute paths into cassandra.yaml
......@@ -139,7 +173,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
// Tell cassandra where the configuration files are.
// Use the test configuration file.
System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
if (EMBEDDED) {
cassandra = new EmbeddedCassandraService();
......@@ -160,13 +194,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@Before
public void checkIfIgnore() {
String runtime = System.getProperty("java.runtime.name");
String version = System.getProperty("java.runtime.version");
LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
// The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
// Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
// OpenJDK is "OpenJDK Runtime Environment"
Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
}
@After
......@@ -176,13 +204,23 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@AfterClass
public static void closeCassandra() {
session.executeAsync(DROP_KEYSPACE_QUERY);
session.close();
cluster.close();
if (EMBEDDED) {
if (session != null) {
session.executeAsync(DROP_KEYSPACE_QUERY);
session.close();
}
if (cluster != null) {
cluster.close();
}
if (cassandra != null) {
cassandra.stop();
}
tmpDir.delete();
if (tmpDir != null) {
//noinspection ResultOfMethodCallIgnored
tmpDir.delete();
}
}
//=====Exactly-Once=================================================================================================
......@@ -202,7 +240,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@Override
protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
}
@Override
......@@ -379,7 +417,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
new TupleTypeInfo(Tuple3.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
long count = inputDS.count();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册