Commit c93d9eaf authored by Till Rohrmann

Add option to use single actor system for local execution. Use local...

Add option to use single actor system for local execution. Use local connection manager if a single task manager is used for local execution. Remove the synchronized block in getReceiverList of ChannelManager, which effectively serialized the connection lookup calls of a single task manager.

Fix Java 6 problem: File has no toPath method (the method was added in Java 7)
Parent a0f77857
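Editor's note: the Java 6 fix swaps `File#toPath` (introduced in Java 7) for `File#toURI`, which has been available since Java 1.2 and yields an equivalent `file:` URI. A minimal sketch of the replacement applied in the test sources below (the path is illustrative):

```scala
import java.io.File

val f = new File("/tmp/result")    // illustrative path
// f.toPath.toUri.toString         // Java 7+ only: File#toPath does not exist on Java 6
val uri = f.toURI.toString         // available since Java 1.2, equivalent file: URI
```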
@@ -18,4 +18,3 @@ tmp
.DS_Store
_site
docs/api
atlassian-ide-plugin.xml
@@ -44,7 +44,7 @@ public class AvroExternalJarProgramITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
testMiniCluster = new ForkableFlinkMiniCluster(config);
testMiniCluster = new ForkableFlinkMiniCluster(config, false);
String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
......
@@ -57,7 +57,7 @@ public class ClusterUtil {
}
try {
exec = new LocalFlinkMiniCluster(configuration);
exec = new LocalFlinkMiniCluster(configuration, true);
Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
configuration, ClusterUtil.class.getClassLoader());
......
@@ -85,7 +85,8 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
writeYarnProperties(address)
jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout))
jobManager = Some(AkkaUtils.getReference(JobManager.getRemoteAkkaURL(address))(system,
timeout))
jobManager.get ! RegisterMessageListener
pollingTimer foreach {
......
@@ -32,7 +32,7 @@ object YarnUtils {
AkkaUtils.createActorSystem(akkaConfig)
}
def createActorSystem: ActorSystem = {
def createActorSystem(): ActorSystem = {
val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
getConfigString)
@@ -40,12 +40,31 @@ object YarnUtils {
}
def getConfigString: String = {
s"""akka.loglevel = "INFO"
|akka.stdout-loglevel = "INFO"
|akka.log-dead-letters-during-shutdown = off
|akka.log-dead-letters = off
|akka.remote.log-remote-lifecycle-events=off
|""".stripMargin
"""
|akka{
| loglevel = "INFO"
| stdout-loglevel = "INFO"
| log-dead-letters-during-shutdown = off
| log-dead-letters = off
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
|
| remote{
| log-remote-lifecycle-events = off
|
| netty{
| tcp{
| port = 0
| transport-class = "akka.remote.transport.netty.NettyTransport"
| tcp-nodelay = on
| maximum-frame-size = 1MB
| execution-pool-size = 4
| }
| }
| }
|}""".stripMargin
}
def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = {
......
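Editor's note: the rewritten `getConfigString` now carries a full remote-actor section (RemoteActorRefProvider, Netty TCP transport on a random port). A hypothetical sanity check, assuming `YarnUtils` is on the classpath, that the string parses as HOCON and selects the remote provider:

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical check: the multi-line string above must parse as valid HOCON.
val cfg = ConfigFactory.parseString(YarnUtils.getConfigString)
assert(cfg.getString("akka.actor.provider") == "akka.remote.RemoteActorRefProvider")
assert(cfg.getInt("akka.remote.netty.tcp.port") == 0) // 0 = bind a random free port
```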
@@ -100,7 +100,7 @@ public class LocalExecutor extends PlanExecutor {
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
// start it up
this.flink = new LocalFlinkMiniCluster(configuration);
this.flink = new LocalFlinkMiniCluster(configuration, true);
} else {
throw new IllegalStateException("The local executor was already started.");
}
......
@@ -380,15 +380,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
while (true) {
ConnectionInfoLookupResponse lookupResponse;
synchronized (this.channelLookup) {
try{
lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
sourceChannelID), timeout).response();
}catch(IOException ioe) {
throw ioe;
}
}
lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
sourceChannelID), timeout).response();
if (lookupResponse.receiverReady()) {
receiverList = new EnvelopeReceiverList(lookupResponse);
......
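Editor's note: dropping the `synchronized (this.channelLookup)` block is safe because Akka's ask pattern is thread-safe; each caller gets its own future, so lookups from different tasks proceed in parallel instead of being serialized on the ChannelManager lock. A minimal Scala sketch of a lock-free ask (names and timeout are illustrative):

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(10.seconds) // illustrative

// Each caller blocks only on its own Future; no shared lock serializes the asks.
def lookupConnectionInfo(channelLookup: ActorRef, request: Any): Any =
  Await.result(channelLookup ? request, timeout.duration)
```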
@@ -44,6 +44,10 @@ object AkkaUtils {
createActorSystem(getDefaultActorSystemConfig)
}
def createLocalActorSystem(): ActorSystem = {
createActorSystem(getDefaultLocalActorSystemConfig)
}
def createActorSystem(akkaConfig: Config): ActorSystem = {
ActorSystem.create("flink", akkaConfig)
}
@@ -133,20 +137,47 @@ object AkkaUtils {
getDefaultActorSystemConfigString + configString
}
def getLocalConfigString(configuration: Configuration): String = {
val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
val configString =
s"""
|akka {
| loglevel = $logLevel
| stdout-loglevel = $logLevel
|
| log-dead-letters = $logLifecycleEvents
| log-dead-letters-during-shutdown = $logLifecycleEvents
|
| actor{
| default-dispatcher{
| executor = "default-executor"
|
| throughput = ${akkaThroughput}
|
| fork-join-executor {
| parallelism-factor = 2.0
| }
| }
| }
|
|}
""".stripMargin
getDefaultLocalActorSystemConfigString + configString
}
def getDefaultActorSystemConfigString: String = {
"""
val config = """
|akka {
| daemonic = on
|
| loggers = ["akka.event.slf4j.Slf4jLogger"]
| logger-startup-timeout = 30s
| loglevel = "WARNING"
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| stdout-loglevel = "WARNING"
| jvm-exit-on-fatal-error = off
| log-config-on-start = on
| serialize-messages = on
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
@@ -164,7 +195,26 @@ object AkkaUtils {
| }
|}
""".stripMargin
}
getDefaultLocalActorSystemConfigString + config
}
def getDefaultLocalActorSystemConfigString: String = {
"""
|akka {
| daemonic = on
|
| loggers = ["akka.event.slf4j.Slf4jLogger"]
| logger-startup-timeout = 30s
| loglevel = "DEBUG"
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| stdout-loglevel = "DEBUG"
| jvm-exit-on-fatal-error = off
| log-config-on-start = off
| serialize-messages = on
|}
""".stripMargin
}
// scalastyle:off line.size.limit
@@ -347,6 +397,10 @@ object AkkaUtils {
ConfigFactory.parseString(getDefaultActorSystemConfigString)
}
def getDefaultLocalActorSystemConfig = {
ConfigFactory.parseString(getDefaultLocalActorSystemConfigString)
}
def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout:
FiniteDuration): ActorRef = {
Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout)
......
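Editor's note: the new `createLocalActorSystem()` / `getDefaultLocalActorSystemConfig` pair builds an actor system without the `RemoteActorRefProvider`, so all actor references stay in-JVM and no network transport is bound. A reduced sketch of what the helper amounts to (config abbreviated from the defaults above):

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Abbreviated from getDefaultLocalActorSystemConfigString above: without a
// remote provider the default LocalActorRefProvider is used and no port is bound.
val localConfig = ConfigFactory.parseString(
  """
    |akka {
    |  daemonic = on
    |  stdout-loglevel = "DEBUG"
    |}
  """.stripMargin)

val system = ActorSystem.create("flink", localConfig) // same call as createActorSystem
```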
@@ -107,7 +107,7 @@ object JobClient{
"configuration.")
}
JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
}
}
......
@@ -58,7 +58,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
Execution.timeout = timeout;
log.info("Starting job manager.")
log.info(s"Starting job manager at ${self.path}.")
val (archiveCount,
profiling,
@@ -520,7 +520,7 @@ object JobManager {
actorSystem.actorOf(props, JOB_MANAGER_NAME)
}
def getAkkaURL(address: String): String = {
def getRemoteAkkaURL(address: String): String = {
s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
}
@@ -541,6 +541,6 @@ object JobManager {
def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout:
FiniteDuration): ActorRef = {
AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort))
AkkaUtils.getReference(getRemoteAkkaURL(address.getHostName + ":" + address.getPort))
}
}
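Editor's note: renaming `getAkkaURL` to `getRemoteAkkaURL` makes explicit that the returned address resolves over the `akka.tcp` transport; with a single shared actor system the job manager is instead reached via the in-JVM path that the mini cluster writes into the configuration (see `FlinkMiniCluster` below). The two address forms side by side, with illustrative host and port:

```scala
// Remote form, resolved over the akka.tcp transport (what getRemoteAkkaURL builds):
val remoteUrl = "akka.tcp://flink@localhost:6123/user/jobmanager"
// Local form, valid only inside the same ActorSystem (what the mini cluster sets
// as JOB_MANAGER_AKKA_URL when singleActorSystem is enabled):
val localUrl = "akka://flink/user/jobmanager"
```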
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.{ConfigFactory, Config}
import com.typesafe.config.{ConfigFactory}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
@@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, Await}
abstract class FlinkMiniCluster(userConfiguration: Configuration) {
abstract class FlinkMiniCluster(userConfiguration: Configuration,
val singleActorSystem: Boolean) {
import FlinkMiniCluster._
val HOSTNAME = "localhost"
@@ -41,6 +42,10 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
val configuration = generateConfiguration(userConfiguration)
if(singleActorSystem){
configuration.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
}
val jobManagerActorSystem = startJobManagerActorSystem()
val jobManagerActor = startJobManager(jobManagerActorSystem)
@@ -48,7 +53,13 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield {
val actorSystem = startTaskManagerActorSystem(i)
val actorSystem = if(singleActorSystem) {
jobManagerActorSystem
}
else{
startTaskManagerActorSystem(i)
}
(actorSystem, startTaskManager(i)(actorSystem))
}
@@ -66,7 +77,12 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_JOB_MANAGER_IPC_PORT)
AkkaUtils.getConfigString(HOSTNAME, port, configuration)
if(singleActorSystem){
AkkaUtils.getLocalConfigString(configuration)
}else{
AkkaUtils.getConfigString(HOSTNAME, port, configuration)
}
}
def startJobManagerActorSystem(): ActorSystem = {
@@ -111,13 +127,23 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
}
def shutdown(): Unit = {
taskManagerActorSystems foreach { _.shutdown() }
if(!singleActorSystem){
taskManagerActorSystems foreach {
_.shutdown()
}
}
jobManagerActorSystem.shutdown()
}
def awaitTermination(): Unit = {
jobManagerActorSystem.awaitTermination()
taskManagerActorSystems foreach { _.awaitTermination()}
if(!singleActorSystem) {
taskManagerActorSystems foreach {
_.awaitTermination()
}
}
}
def waitForTaskManagersToBeRegistered(): Unit = {
......
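Editor's note: this is the core of the change. When `singleActorSystem` is set, every task manager actor is created inside the job manager's actor system, so there is exactly one system to shut down and await. A condensed sketch of that wiring, using the names from the hunks above:

```scala
// Condensed from the constructor logic above: either reuse the job manager's
// actor system for every task manager, or start a dedicated one per task manager.
val taskManagerSystems =
  for (i <- 0 until numTaskManagers) yield
    if (singleActorSystem) jobManagerActorSystem else startTaskManagerActorSystem(i)

// On shutdown the shared system must only be terminated once:
if (!singleActorSystem) taskManagerSystems foreach { _.shutdown() }
jobManagerActorSystem.shutdown()
```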
@@ -28,11 +28,16 @@ import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
FlinkMiniCluster(userConfiguration){
class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
extends FlinkMiniCluster(userConfiguration, singleActorSystem){
import LocalFlinkMiniCluster._
val jobClientActorSystem = AkkaUtils.createActorSystem()
val jobClientActorSystem = if(singleActorSystem){
jobManagerActorSystem
}else{
AkkaUtils.createActorSystem()
}
var jobClient: Option[ActorRef] = None
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
@@ -70,7 +75,7 @@ FlinkMiniCluster(userConfiguration){
}
val localExecution = if(numTaskManagers == 1){
false
true
}else{
false
}
@@ -87,6 +92,10 @@ FlinkMiniCluster(userConfiguration){
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
if(singleActorSystem){
config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
}
val jc = JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
jobClient = Some(jc)
jc
@@ -101,11 +110,16 @@ FlinkMiniCluster(userConfiguration){
override def shutdown(): Unit = {
super.shutdown()
jobClientActorSystem.shutdown()
if(!singleActorSystem) {
jobClientActorSystem.shutdown()
}
}
override def awaitTermination(): Unit = {
jobClientActorSystem.awaitTermination()
if(!singleActorSystem) {
jobClientActorSystem.awaitTermination()
}
super.awaitTermination()
}
......
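Editor's note: with the new default (`singleActorSystem: Boolean = true`), existing single-argument call sites keep compiling while the job manager, task manager, and job client share one actor system. Hypothetical call sites:

```scala
import org.apache.flink.configuration.Configuration

// Default keeps one shared actor system; passing false restores the
// previous one-system-per-component behavior.
val sharedCluster   = new LocalFlinkMiniCluster(new Configuration())
val separateCluster = new LocalFlinkMiniCluster(new Configuration(), false)

sharedCluster.shutdown()         // terminates the single shared system exactly once
sharedCluster.awaitTermination()
```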
@@ -73,7 +73,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
import taskManagerConfig.{timeout => tmTimeout, _}
implicit val timeout = tmTimeout
log.info(s"Starting task manager at ${self.path}.")
val REGISTRATION_DELAY = 0 seconds
@@ -558,7 +557,7 @@ object TaskManager {
"configuration.")
}
JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
}
val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
@@ -667,10 +666,6 @@ object TaskManager {
system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, reportInterval), PROFILER_NAME)
}
def getAkkaURL(address: String): String = {
s"akka.tcp://flink@${address}/user/taskmanager"
}
def checkTempDirs(tmpDirs: Array[String]): Unit = {
tmpDirs.zipWithIndex.foreach {
case (dir: String, _) =>
......
@@ -26,7 +26,9 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration) {
class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration,
true) {
override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
......
@@ -100,14 +100,14 @@ object TestingUtils {
networkConnectionConfig) with TestingTaskManager))
}
def startTestingCluster(numSlots: Int, numTaskManagers: Int = 1): FlinkMiniCluster = {
def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT):
FlinkMiniCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs)
config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000)
config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT)
val cluster = new TestingCluster(config)
cluster
config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
new TestingCluster(config)
}
def setGlobalExecutionContext(): Unit = {
......
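Editor's note: `startTestingCluster` gains an overridable ask timeout while keeping its old behavior through default arguments, and now returns the cluster directly instead of via an intermediate val. Hypothetical call sites:

```scala
// The old two-argument form still compiles thanks to the defaults;
// the new timeout parameter feeds ConfigConstants.AKKA_ASK_TIMEOUT.
val smallCluster   = TestingUtils.startTestingCluster(numSlots = 2)
val patientCluster = TestingUtils.startTestingCluster(numSlots = 2, numTMs = 2, timeout = 100)
```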
@@ -58,7 +58,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
// Local Test Cluster Life Cycle
// --------------------------------------------------------------------------------------------
public void startCluster(){
public void startCluster() throws Exception{
this.executor = startCluster(numTaskManagers, taskManagerNumSlots);
}
......
@@ -46,7 +46,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
@BeforeClass
public static void setup() throws Exception{
cluster = TestBaseUtils.startCluster(1, 4);
}
@AfterClass
......
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
public class TestBaseUtils {
protected static final int MINIMUM_HEAP_SIZE_MB = 192;
protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
@@ -78,7 +79,7 @@ public class TestBaseUtils {
}
protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
taskManagerNumSlots) {
taskManagerNumSlots) throws Exception {
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
......
@@ -24,8 +24,10 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
LocalFlinkMiniCluster(userConfiguration) {
class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
def this(userConfiguration: Configuration) = this(userConfiguration, true)
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val forNumberString = System.getProperty("forkNumber")
@@ -68,7 +70,7 @@ LocalFlinkMiniCluster(userConfiguration) {
}
val localExecution = if(numTaskManagers == 1){
false
true
}else{
false
}
......
@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
cluster = new ForkableFlinkMiniCluster(config);
cluster = new ForkableFlinkMiniCluster(config, false);
RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
......
@@ -45,7 +45,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
case 2 => Right(20)
})
val resultPath = tempFolder.newFile().toPath.toUri.toString
val resultPath = tempFolder.newFile().toURI.toString
val result = eithers.map{
_ match {
@@ -68,7 +68,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
case 2 => Left(20)
})
val resultPath = tempFolder.newFile().toPath.toUri.toString
val resultPath = tempFolder.newFile().toURI.toString
val result = eithers.map(_ match {
case Left(i) => i
@@ -89,7 +89,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
case 2 => Right(20)
})
val resultPath = tempFolder.newFile().toPath.toUri.toString
val resultPath = tempFolder.newFile().toURI.toString
val result = eithers.map(_ match {
case Right(i) => i
@@ -110,7 +110,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
case 2 => None
})
val resultPath = tempFolder.newFile().toPath.toUri.toString
val resultPath = tempFolder.newFile().toURI.toString
val result = eithers.map(_ match {
@@ -133,7 +133,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
case 2 => Some(20)
})
val resultPath = tempFolder.newFile().toPath.toUri.toString
val resultPath = tempFolder.newFile().toURI.toString
val result = eithers.map(_ match {
case Some(i) => i
@@ -154,7 +154,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
case 2 => None
})
val resultPath = tempFolder.newFile().toPath.toUri.toString
val resultPath = tempFolder.newFile().toURI.toString
val result = eithers.map(_ match {
case None => 20
......
@@ -662,7 +662,6 @@ under the License.
<!-- Build files -->
<exclude>**/*.iml</exclude>
<exclude>flink-quickstart/**/testArtifact/goal.txt</exclude>
<exclude>atlassian-ide-plugin.xml</exclude>
<!-- Generated content -->
<exclude>**/target/**</exclude>
<exclude>docs/_site/**</exclude>
......