diff --git a/.gitignore b/.gitignore index 0afd000c4b30abe521b1827eedcee7eb6832360d..b519d406e859e804bad0ef563f8262bafb04d351 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,3 @@ tmp .DS_Store _site docs/api -atlassian-ide-plugin.xml diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java index 78a6683083fc88072843deef5cfa81f0ab081777..0b394e3f381665ca30311e0766a2a0c44d0f133d 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java @@ -44,7 +44,7 @@ public class AvroExternalJarProgramITCase { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - testMiniCluster = new ForkableFlinkMiniCluster(config); + testMiniCluster = new ForkableFlinkMiniCluster(config, false); String jarFile = JAR_FILE; String testData = getClass().getResource(TEST_DATA_FILE).toString(); diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java index f75db68f2bd087c27fb18913947a1a6972245e7f..8fb6554e9df122d4c031bcdd69e61a4006b2b3bd 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -57,7 +57,7 @@ public class ClusterUtil { } try { - exec = new LocalFlinkMiniCluster(configuration); + exec = new LocalFlinkMiniCluster(configuration, true); Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()), configuration, ClusterUtil.class.getClassLoader()); diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index 58ce6cf55911a070b60ce3095341e2b20bdaebbb..4a6e8cbbb16b6f0bb11cba147df6a4febb8060d6 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -85,7 +85,8 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient, writeYarnProperties(address) - jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout)) + jobManager = Some(AkkaUtils.getReference(JobManager.getRemoteAkkaURL(address))(system, + timeout)) jobManager.get ! RegisterMessageListener pollingTimer foreach { diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala index d71273abf36946a821e08473ddc60a9f3152bec0..86b06e1335eb588e17c23404dbeba8342d3e9f15 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala @@ -32,7 +32,7 @@ object YarnUtils { AkkaUtils.createActorSystem(akkaConfig) } - def createActorSystem: ActorSystem = { + def createActorSystem(): ActorSystem = { val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString + getConfigString) @@ -40,12 +40,31 @@ object YarnUtils { } def getConfigString: String = { - s"""akka.loglevel = "INFO" - |akka.stdout-loglevel = "INFO" - |akka.log-dead-letters-during-shutdown = off - |akka.log-dead-letters = off - |akka.remote.log-remote-lifecycle-events=off - |""".stripMargin + """ + |akka{ + | loglevel = "INFO" + | stdout-loglevel = "INFO" + | log-dead-letters-during-shutdown = off + | log-dead-letters = off + | + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } + | + | remote{ + | log-remote-lifecycle-events = off + | + | netty{ + | tcp{ + | port = 0 + | transport-class = "akka.remote.transport.netty.NettyTransport" + | tcp-nodelay = on + | maximum-frame-size = 1MB + | execution-pool-size = 4 + | } + | } + | } + |}""".stripMargin } def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = { diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 55fda89433cc49b33dea3be14452fddc40433f05..6691cdfe2473c77bf1bee82a14377da1d913737c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -100,7 +100,7 @@ public class LocalExecutor extends PlanExecutor { configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots()); configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles()); // start it up - this.flink = new LocalFlinkMiniCluster(configuration); + this.flink = new LocalFlinkMiniCluster(configuration, true); } else { throw new IllegalStateException("The local executor was already started."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java index cd07cc3025bbd97bfa05f760f90b841017ba08cd..c8418729e9bfad827b5f8a9181710527c20415ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java @@ -380,15 +380,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker while (true) { ConnectionInfoLookupResponse lookupResponse; - synchronized (this.channelLookup) { - try{ - lookupResponse = AkkaUtils.ask(channelLookup, - new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID, - sourceChannelID), timeout).response(); - }catch(IOException ioe) { - throw ioe; - } - } + lookupResponse = AkkaUtils.ask(channelLookup, + new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID, + sourceChannelID), timeout).response(); if (lookupResponse.receiverReady()) { receiverList = new EnvelopeReceiverList(lookupResponse); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index d52dde29dd0d89c2c8b8584af99df8beb6350dcc..229c2295ed7a528cca1858afd94e99f7d86de3a4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -44,6 +44,10 @@ object AkkaUtils { createActorSystem(getDefaultActorSystemConfig) } + def createLocalActorSystem(): ActorSystem = { + createActorSystem(getDefaultLocalActorSystemConfig) + } + def createActorSystem(akkaConfig: Config): ActorSystem = { ActorSystem.create("flink", akkaConfig) } @@ -133,20 +137,47 @@ object AkkaUtils { getDefaultActorSystemConfigString + configString } + def getLocalConfigString(configuration: Configuration): String = { + val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT, + ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT) + val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, + ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + + val logLifecycleEvents = if (lifecycleEvents) "on" else "off" + + val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL, + ConfigConstants.DEFAULT_AKKA_LOG_LEVEL) + + val configString = + s""" + |akka { + | loglevel = $logLevel + | stdout-loglevel = $logLevel + | + | log-dead-letters = $logLifecycleEvents + | log-dead-letters-during-shutdown = $logLifecycleEvents + | + | actor{ + | default-dispatcher{ + | executor = "default-executor" + | + | throughput = ${akkaThroughput} + | + | fork-join-executor { + | parallelism-factor = 2.0 + | } + | } + | } + | + |} + """.stripMargin + + getDefaultLocalActorSystemConfigString + configString + } + def getDefaultActorSystemConfigString: String = { - """ + val config = """ |akka { - | daemonic = on - | - | loggers = ["akka.event.slf4j.Slf4jLogger"] - | logger-startup-timeout = 30s - | loglevel = "WARNING" - | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - | stdout-loglevel = "WARNING" - | jvm-exit-on-fatal-error = off - | log-config-on-start = on - | serialize-messages = on - | | actor { | provider = "akka.remote.RemoteActorRefProvider" | } @@ -164,7 +195,26 @@ object AkkaUtils { | } |} """.stripMargin - } + + getDefaultLocalActorSystemConfigString + config + } + + def getDefaultLocalActorSystemConfigString: String = { + """ + |akka { + | daemonic = on + | + | loggers = ["akka.event.slf4j.Slf4jLogger"] + | logger-startup-timeout = 30s + | loglevel = "DEBUG" + | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + | stdout-loglevel = "DEBUG" + | jvm-exit-on-fatal-error = off + | log-config-on-start = off + | serialize-messages = on + |} + """.stripMargin + } // scalastyle:off line.size.limit @@ -347,6 +397,10 @@ object AkkaUtils { ConfigFactory.parseString(getDefaultActorSystemConfigString) } + def getDefaultLocalActorSystemConfig = { + ConfigFactory.parseString(getDefaultLocalActorSystemConfigString) + } + def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef = { Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 53f0daa79df89e453f982842a880c63deb531ad0..937706922957666008a3f77dfc722115efd9045d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -107,7 +107,7 @@ object JobClient{ "configuration.") } - JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) + JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 6dbcb67dbb923d8207b626e394fede04f9679a3d..8752befa7b447b5640b79dbccd2b989fe5f058df 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -58,7 +58,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { Execution.timeout = timeout; - log.info("Starting job manager.") + log.info(s"Starting job manager at ${self.path}.") val (archiveCount, profiling, @@ -520,7 +520,7 @@ object JobManager { actorSystem.actorOf(props, JOB_MANAGER_NAME) } - def getAkkaURL(address: String): String = { + def getRemoteAkkaURL(address: String): String = { s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}" } @@ -541,6 +541,6 @@ object JobManager { def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef = { - AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort)) + AkkaUtils.getReference(getRemoteAkkaURL(address.getHostName + ":" + address.getPort)) } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index bfb9ae25206a4426ed6ca8c08568d590ca29c1b0..1b7d45238dc097a4f40bb45926ab6f660a97a2ef 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} -import com.typesafe.config.{ConfigFactory, Config} +import com.typesafe.config.{ConfigFactory} import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager @@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Future, Await} -abstract class FlinkMiniCluster(userConfiguration: Configuration) { +abstract class FlinkMiniCluster(userConfiguration: Configuration, + val singleActorSystem: Boolean) { import FlinkMiniCluster._ val HOSTNAME = "localhost" @@ -41,6 +42,10 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { val configuration = generateConfiguration(userConfiguration) + if(singleActorSystem){ + configuration.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager") + } + val jobManagerActorSystem = startJobManagerActorSystem() val jobManagerActor = startJobManager(jobManagerActorSystem) @@ -48,7 +53,13 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield { - val actorSystem = startTaskManagerActorSystem(i) + val actorSystem = if(singleActorSystem) { + jobManagerActorSystem + } + else{ + startTaskManagerActorSystem(i) + } + (actorSystem, startTaskManager(i)(actorSystem)) } @@ -66,7 +77,12 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants .DEFAULT_JOB_MANAGER_IPC_PORT) - AkkaUtils.getConfigString(HOSTNAME, port, configuration) + if(singleActorSystem){ + AkkaUtils.getLocalConfigString(configuration) + }else{ + AkkaUtils.getConfigString(HOSTNAME, port, configuration) + } + } def startJobManagerActorSystem(): ActorSystem = { @@ -111,13 +127,23 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { } def shutdown(): Unit = { - taskManagerActorSystems foreach { _.shutdown() } + if(!singleActorSystem){ + taskManagerActorSystems foreach { + _.shutdown() + } + } + jobManagerActorSystem.shutdown() } def awaitTermination(): Unit = { jobManagerActorSystem.awaitTermination() - taskManagerActorSystems foreach { _.awaitTermination()} + + if(!singleActorSystem) { + taskManagerActorSystems foreach { + _.awaitTermination() + } + } } def waitForTaskManagersToBeRegistered(): Unit = { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index c1169d3628aa0eb02dc1934c2e49bbd940759a58..67053b239474b83ef9469c1907b402fef3c33fed 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -28,11 +28,16 @@ import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.EnvironmentInformation import org.slf4j.LoggerFactory -class LocalFlinkMiniCluster(userConfiguration: Configuration) extends -FlinkMiniCluster(userConfiguration){ +class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) + extends FlinkMiniCluster(userConfiguration, singleActorSystem){ import LocalFlinkMiniCluster._ - val jobClientActorSystem = AkkaUtils.createActorSystem() + val jobClientActorSystem = if(singleActorSystem){ + jobManagerActorSystem + }else{ + AkkaUtils.createActorSystem() + } + var jobClient: Option[ActorRef] = None override def generateConfiguration(userConfiguration: Configuration): Configuration = { @@ -70,7 +75,7 @@ FlinkMiniCluster(userConfiguration){ } val localExecution = if(numTaskManagers == 1){ - false + true }else{ false } @@ -87,6 +92,10 @@ FlinkMiniCluster(userConfiguration){ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME) config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort) + if(singleActorSystem){ + config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager") + } + val jc = JobClient.startActorWithConfiguration(config)(jobClientActorSystem) jobClient = Some(jc) jc @@ -101,11 +110,16 @@ FlinkMiniCluster(userConfiguration){ override def shutdown(): Unit = { super.shutdown() - jobClientActorSystem.shutdown() + + if(!singleActorSystem) { + jobClientActorSystem.shutdown() + } } override def awaitTermination(): Unit = { - jobClientActorSystem.awaitTermination() + if(!singleActorSystem) { + jobClientActorSystem.awaitTermination() + } super.awaitTermination() } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 253984c3050765758a2002be077e96e9abdc44fa..a4040fa9c37044254350ab5511ed1a23c1b2cc90 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -73,7 +73,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka import taskManagerConfig.{timeout => tmTimeout, _} implicit val timeout = tmTimeout - log.info(s"Starting task manager at ${self.path}.") val REGISTRATION_DELAY = 0 seconds @@ -558,7 +557,7 @@ object TaskManager { "configuration.") } - JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) + JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) } val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) @@ -667,10 +666,6 @@ object TaskManager { system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, reportInterval), PROFILER_NAME) } - def getAkkaURL(address: String): String = { - s"akka.tcp://flink@${address}/user/taskmanager" - } - def checkTempDirs(tmpDirs: Array[String]): Unit = { tmpDirs.zipWithIndex.foreach { case (dir: String, _) => diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 5a512650067f70e6c0b94291e0e8247dd9b7ec41..806d9b436f3737e310c472fffa0f34facf415229 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -26,7 +26,9 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.net.NetUtils import org.apache.flink.runtime.taskmanager.TaskManager -class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration) { +class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration, + true) { + override def generateConfiguration(userConfig: Configuration): Configuration = { val cfg = new Configuration() cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost") diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index ce54a74d9d719cf1f61cb7073f1c4ea89519ef13..ce068c950f023afb15a81800b6c2bde5ef5bd073 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -100,14 +100,14 @@ object TestingUtils { networkConnectionConfig) with TestingTaskManager)) } - def startTestingCluster(numSlots: Int, numTaskManagers: Int = 1): FlinkMiniCluster = { + def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT): + FlinkMiniCluster = { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) - config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers) + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs) config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000) - config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT) - val cluster = new TestingCluster(config) - cluster + config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) + new TestingCluster(config) } def setGlobalExecutionContext(): Unit = { diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 38b52cab31124aafec23006875a9d13ae94746de..b24b3ee7185e6e898eb450b9b3fa44cb8539b092 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -58,7 +58,7 @@ public abstract class AbstractTestBase extends TestBaseUtils { // Local Test Cluster Life Cycle // -------------------------------------------------------------------------------------------- - public void startCluster(){ + public void startCluster() throws Exception{ this.executor = startCluster(numTaskManagers, taskManagerNumSlots); } diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java index 225936c7d046d0cd0463e1ce98218b8b577612d4..c86ccaa302ce34d3b7b72ca6428019d8092189ec 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java @@ -46,7 +46,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils { @BeforeClass public static void setup() throws Exception{ cluster = TestBaseUtils.startCluster(1, 4); - } @AfterClass diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 7939a3cf6d7c9acd794602dab004d4cf194a2953..fe2d7fc0321ccefc4a74bbcc38dd465fa01fa0fe 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; public class TestBaseUtils { + protected static final int MINIMUM_HEAP_SIZE_MB = 192; protected static final long TASK_MANAGER_MEMORY_SIZE = 80; @@ -78,7 +79,7 @@ public class TestBaseUtils { } protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int - taskManagerNumSlots) { + taskManagerNumSlots) throws Exception { Configuration config = new Configuration(); config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true); config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true); diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 1ebe9ebf6cd2c78c4ce5145f27c9bcea8bc637f9..649e09417e729b77371d698635649326923c72a4 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -24,8 +24,10 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager} -class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends -LocalFlinkMiniCluster(userConfiguration) { +class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean) + extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) { + + def this(userConfiguration: Configuration) = this(userConfiguration, true) override def generateConfiguration(userConfiguration: Configuration): Configuration = { val forNumberString = System.getProperty("forkNumber") @@ -68,7 +70,7 @@ LocalFlinkMiniCluster(userConfiguration) { } val localExecution = if(numTaskManagers == 1){ - false + true }else{ false } diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java index 9046d2def6d9fcb359e0e0ee034e4c9ac9ce9ac8..6928963b5ebf871aed9a951c7e10ca99cae4f23e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java @@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - cluster = new ForkableFlinkMiniCluster(config); + cluster = new ForkableFlinkMiniCluster(config, false); RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort()); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala index 104a4409724206af0691de724bfbfe7c3a5aae43..8e0f9c8b48cb6cd70fcd990d99b49ec08a5409af 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala @@ -45,7 +45,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB case 2 => Right(20) }) - val resultPath = tempFolder.newFile().toPath.toUri.toString + val resultPath = tempFolder.newFile().toURI.toString val result = eithers.map{ _ match { @@ -68,7 +68,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB case 2 => Left(20) }) - val resultPath = tempFolder.newFile().toPath.toUri.toString + val resultPath = tempFolder.newFile().toURI.toString val result = eithers.map(_ match { case Left(i) => i @@ -89,7 +89,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB case 2 => Right(20) }) - val resultPath = tempFolder.newFile().toPath.toUri.toString + val resultPath = tempFolder.newFile().toURI.toString val result = eithers.map(_ match { case Right(i) => i @@ -110,7 +110,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB case 2 => None }) - val resultPath = tempFolder.newFile().toPath.toUri.toString + val resultPath = tempFolder.newFile().toURI.toString val result = eithers.map(_ match { @@ -133,7 +133,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB case 2 => Some(20) }) - val resultPath = tempFolder.newFile().toPath.toUri.toString + val resultPath = tempFolder.newFile().toURI.toString val result = eithers.map(_ match { case Some(i) => i @@ -154,7 +154,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB case 2 => None }) - val resultPath = tempFolder.newFile().toPath.toUri.toString + val resultPath = tempFolder.newFile().toURI.toString val result = eithers.map(_ match { case None => 20 diff --git a/pom.xml b/pom.xml index d5a0c0a948b6e5ed47a84357313933c98f9e3b51..3cbfaa73190bf9b1820063a1cb8e7f7152ac8813 100644 --- a/pom.xml +++ b/pom.xml @@ -662,7 +662,6 @@ under the License. **/*.iml flink-quickstart/**/testArtifact/goal.txt - atlassian-ide-plugin.xml **/target/** docs/_site/**