From c9edd9a88ab20bc314470fb3eb7da4c643dd7d57 Mon Sep 17 00:00:00 2001 From: Chiwan Park Date: Mon, 7 Sep 2015 23:03:56 +0900 Subject: [PATCH] [FLINK-2619] [tests] Fix for some unexecuted Scala tests This closes #1103 --- .../ExecutionGraphRestartTest.scala | 5 +- .../TaskManagerLossFailsTasksTest.scala | 5 +- .../JobManagerRegistrationTest.scala | 4 +- .../operations/GraphOperationsITCase.scala | 1 + .../flink/api/scala/ScalaShellITSuite.scala | 85 ++++++++++--------- .../MassiveCaseClassSortingITCase.scala | 5 +- 6 files changed, 59 insertions(+), 46 deletions(-) rename flink-tests/src/test/scala/org/apache/flink/api/scala/{misc => manual}/MassiveCaseClassSortingITCase.scala (98%) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala index 6060bc313ba..434a8cb993e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -26,8 +26,11 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner import org.scalatest.{Matchers, WordSpecLike} +@RunWith(classOf[JUnitRunner]) class ExecutionGraphRestartTest extends WordSpecLike with Matchers { val NUM_TASKS = 31 @@ -118,7 +121,7 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { } eg.getState should equal(JobStatus.FINISHED) - }catch{ + } catch { case t: Throwable => t.printStackTrace() fail(t.getMessage) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 3d0d3a56f30..177dc851ec1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -26,8 +26,11 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner import org.scalatest.{Matchers, WordSpecLike} +@RunWith(classOf[JUnitRunner]) class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { "A task manager loss" must { @@ -64,7 +67,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { instance1.markDead() eg.getState should equal(JobStatus.FAILING) - }catch{ + } catch { case t:Throwable => t.printStackTrace() fail(t.getMessage) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 4368309371c..7487670c2dc 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -27,10 +27,11 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID} -import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} import org.junit.Assert.{assertNotEquals, assertNotNull} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import scala.concurrent.duration._ @@ -41,6 +42,7 @@ import scala.language.postfixOps * It also tests the JobManager's response to heartbeats from TaskManagers it does * not know. */ +@RunWith(classOf[JUnitRunner]) class JobManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala index d49e56559c7..713eb8d3ea1 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.graph.scala.test.operations import org.apache.flink.api.common.functions.FilterFunction import org.apache.flink.api.scala._ diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala index 9717ae7bb6a..e932cd2e11a 100644 --- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala +++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala @@ -22,17 +22,20 @@ import java.io._ import java.util.concurrent.TimeUnit import org.apache.flink.runtime.StreamingMode -import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers} +import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import scala.concurrent.duration.FiniteDuration import scala.tools.nsc.Settings +@RunWith(classOf[JUnitRunner]) class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { test("Iteration test with iterative Pi example") { - val input : String = + val input: String = """ val initial = env.fromElements(0) @@ -46,9 +49,9 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { } val result = count map { c => c / 10000.0 * 4 } result.collect() - """.stripMargin + """.stripMargin - val output : String = processInShell(input) + val output: String = processInShell(input) output should not include "failed" output should not include "error" @@ -56,7 +59,8 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { } test("WordCount in Shell") { - val input = """ + val input = + """ val text = env.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", @@ -64,7 +68,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1) val result = counts.print() - """.stripMargin + """.stripMargin val output = processInShell(input) @@ -72,7 +76,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { output should not include "error" output should not include "Exception" -// some of the words that should be included + // some of the words that should be included output should include("(a,1)") output should include("(whether,1)") output should include("(to,4)") @@ -80,14 +84,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { } test("Sum 1..10, should be 55") { - val input : String = + val input = """ val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10) val reduced = input.reduce(_+_) reduced.print """.stripMargin - val output : String = processInShell(input) + val output = processInShell(input) output should not include "failed" output should not include "error" @@ -97,7 +101,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { } test("WordCount in Shell with custom case class") { - val input : String = + val input = """ case class WC(word: String, count: Int) @@ -111,7 +115,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { reduced.print() """.stripMargin - val output : String = processInShell(input) + val output = processInShell(input) output should not include "failed" output should not include "error" @@ -120,11 +124,9 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { output should include("WC(hello,1)") output should include("WC(world,10)") } - - + test("Submit external library") { - - val input : String = + val input = """ import org.apache.flink.ml.math._ val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0)) @@ -132,12 +134,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { """.stripMargin // find jar file that contains the ml code - var externalJar : String = "" - var folder : File = new File("../flink-ml/target/"); - var listOfFiles : Array[File] = folder.listFiles(); - for(i <- 0 to listOfFiles.length - 1){ - var filename : String = listOfFiles(i).getName(); - if(!filename.contains("test") && !filename.contains("original") && filename.contains(".jar")){ + var externalJar = "" + val folder = new File("../flink-ml/target/") + val listOfFiles = folder.listFiles() + + for (i <- listOfFiles.indices) { + val filename: String = listOfFiles(i).getName + if (!filename.contains("test") && !filename.contains("original") && filename.contains( + ".jar")) { println("ive found file:" + listOfFiles(i).getAbsolutePath) externalJar = listOfFiles(i).getAbsolutePath } @@ -145,13 +149,13 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { assert(externalJar != "") - val output : String = processInShell(input,Option(externalJar)) + val output: String = processInShell(input, Option(externalJar)) output should not include "failed" output should not include "error" output should not include "Exception" - output should include( "\nDenseVector(1.0, 2.0, 3.0)") + output should include("\nDenseVector(1.0, 2.0, 3.0)") } /** @@ -159,8 +163,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { * @param input commands to be processed in the shell * @return output of shell */ - def processInShell(input : String, externalJars : Option[String] = None): String ={ - + def processInShell(input: String, externalJars: Option[String] = None): String = { val in = new BufferedReader(new StringReader(input + "\n")) val out = new StringWriter() val baos = new ByteArrayOutputStream() @@ -174,28 +177,26 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { case Some(c) => c.getLeaderRPCPort case _ => throw new RuntimeException("Test cluster not initialized.") } - - var repl : FlinkILoop= null - - externalJars match { - case Some(ej) => repl = new FlinkILoop( - host, port, - Option(Array(ej)), + + val repl = externalJars match { + case Some(ej) => new FlinkILoop( + host, port, + Option(Array(ej)), + in, new PrintWriter(out)) + + case None => new FlinkILoop( + host, port, in, new PrintWriter(out)) - - case None => repl = new FlinkILoop( - host,port, - in,new PrintWriter(out)) } - + repl.settings = new Settings() // enable this line to use scala in intellij repl.settings.usejavacp.value = true - + externalJars match { case Some(ej) => repl.settings.classpath.value = ej - case None => + case None => } repl.process(repl.settings) @@ -205,7 +206,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { System.setOut(oldOut) baos.flush() - + val stdout = baos.toString out.toString + stdout @@ -230,6 +231,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { } override def afterAll(): Unit = { - cluster.map(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS))) + cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS))) } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala similarity index 98% rename from flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala rename to flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala index dd27eb579c7..7385fa26443 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.api.scala.misc +package org.apache.flink.api.scala.manual import java.io.File import java.util.Random @@ -36,6 +36,9 @@ import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory import org.junit.Assert._ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable +/** + * This test is wrote as manual test. + */ class MassiveCaseClassSortingITCase { val SEED : Long = 347569784659278346L -- GitLab