提交 fb48dc2f 编写于 作者: Z zjureel 提交者: zentol

[FLINK-7099] Replace usages of deprecated JOB_MANAGER_IPC_PORT_KEY and JOB_MANAGER_IPC_ADDRESS_KEY

This closes #4278.
上级 d50076f5
......@@ -44,6 +44,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
......@@ -1146,8 +1147,8 @@ public class CliFrontend {
* @param config The config to write to
*/
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
config.setString(JobManagerOptions.ADDRESS, address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
}
// --------------------------------------------------------------------------------------------
......
......@@ -25,8 +25,8 @@ import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
......@@ -108,8 +108,8 @@ public class RemoteExecutor extends PlanExecutor {
this.jarFiles = jarFiles;
this.globalClasspaths = globalClasspaths;
clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
}
// ------------------------------------------------------------------------
......
......@@ -19,8 +19,8 @@
package org.apache.flink.client.deployment;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
/**
* A deployment descriptor for an existing cluster.
......@@ -35,8 +35,8 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
@Override
public String getClusterDescription() {
String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "");
int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
String host = config.getString(JobManagerOptions.ADDRESS, "");
int port = config.getInteger(JobManagerOptions.PORT, -1);
return "Standalone cluster at " + host + ":" + port;
}
......
......@@ -18,8 +18,8 @@
package org.apache.flink.client;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import java.io.File;
import java.io.FileNotFoundException;
......@@ -75,8 +75,8 @@ public class CliFrontendTestUtils {
}
public static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
assertEquals(expectedAddress, jobManagerAddress);
assertEquals(expectedPort, jobManagerPort);
......
......@@ -19,8 +19,8 @@
package org.apache.flink.client.program;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClientActorTest;
......@@ -98,8 +98,8 @@ public class ClientConnectionTest extends TestLogger {
final Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms");
config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
config.setString(JobManagerOptions.ADDRESS, unreachableEndpoint.getHostName());
config.setInteger(JobManagerOptions.PORT, unreachableEndpoint.getPort());
ClusterClient client = new StandaloneClusterClient(config);
......
......@@ -31,8 +31,8 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
......@@ -96,8 +96,8 @@ public class ClientTest extends TestLogger {
final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, freePort);
config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
try {
......
......@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
......@@ -56,8 +56,8 @@ public class ExecutionPlanCreationTest {
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, mockJmAddress.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());
config.setString(JobManagerOptions.ADDRESS, mockJmAddress.getHostName());
config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort());
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
......
......@@ -18,8 +18,8 @@
package org.apache.flink.client.program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.util.StandaloneUtils;
import org.apache.flink.util.ConfigurationException;
......@@ -54,8 +54,8 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
......@@ -72,8 +72,8 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
......
......@@ -22,6 +22,7 @@ import org.apache.flink.api.avro.testjar.AvroExternalJarProgram;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
......@@ -66,8 +67,8 @@ public class AvroExternalJarProgramITCase extends TestLogger {
Collections.singleton(new Path(jarFile)),
Collections.<URL>emptyList());
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
program.invokeInteractiveModeForExecution();
}
......
......@@ -25,9 +25,9 @@ import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
......@@ -202,8 +202,8 @@ public class FlinkClient {
jobGraph.addJar(new Path(uploadedJarUri));
final Configuration configuration = jobGraph.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
configuration.setString(JobManagerOptions.ADDRESS, jobManagerHost);
configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
final ClusterClient client;
try {
......@@ -242,8 +242,8 @@ public class FlinkClient {
}
final Configuration configuration = GlobalConfiguration.loadConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
configuration.setString(JobManagerOptions.ADDRESS, this.jobManagerHost);
configuration.setInteger(JobManagerOptions.PORT, this.jobManagerPort);
final ClusterClient client;
try {
......
......@@ -19,9 +19,9 @@ package org.apache.flink.storm.api;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
......@@ -91,12 +91,11 @@ public class FlinkSubmitter {
final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
stormConf.put(Config.NIMBUS_HOST,
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
flinkConfig.getString(JobManagerOptions.ADDRESS, "localhost"));
}
if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
stormConf.put(Config.NIMBUS_THRIFT_PORT,
new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
6123)));
new Integer(flinkConfig.getInteger(JobManagerOptions.PORT)));
}
final String serConf = JSONValue.toJSONString(stormConf);
......
......@@ -246,10 +246,9 @@ public class MesosApplicationMasterRunner {
taskManagerParameters.cpus());
// JM endpoint, which should be explicitly configured based on acquired net resources
final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
final int listeningPort = config.getInteger(JobManagerOptions.PORT);
checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" +
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and 65536");
JobManagerOptions.PORT.key() + "\" is invalid, it must be between 0 and 65536");
// ----------------- (2) start the actor system -------------------
......
......@@ -185,8 +185,8 @@ public class BootstrapTools {
// this ensures correct values are present in the web frontend
final Address address = AkkaUtils.getAddress(actorSystem);
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host().get());
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port().get().toString());
config.setString(JobManagerOptions.ADDRESS, address.host().get());
config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString()));
if (config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
logger.info("Starting JobManager Web Frontend");
......@@ -228,8 +228,8 @@ public class BootstrapTools {
Configuration cfg = baseConfig.clone();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHostname);
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, registrationTimeout.toString());
if (numSlots != -1){
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.minicluster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
......@@ -133,7 +134,7 @@ public class MiniClusterConfiguration {
public String getJobManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.getString(JobManagerOptions.ADDRESS, "localhost");
}
public String getTaskManagerBindAddress() {
......@@ -145,7 +146,7 @@ public class MiniClusterConfiguration {
public String getResourceManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
config.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
}
public Time getRpcTimeout() {
......
......@@ -152,7 +152,7 @@ class JobManager(
protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
case Some(registry) =>
val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS)
Option(new JobManagerMetricGroup(
registry, NetUtils.unresolvedHostToNormalizedString(host)))
case None =>
......@@ -1956,7 +1956,7 @@ object JobManager {
// if it is not in there, the actor system will bind to the loopback interface's
// address and will not be reachable from anyone remote
if (externalHostName == null) {
val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
val message = "Config parameter '" + JobManagerOptions.ADDRESS.key() +
"' is missing (hostname/address to bind JobManager to)."
LOG.error(message)
System.exit(STARTUP_FAILURE_RETURN_CODE)
......@@ -1970,7 +1970,7 @@ object JobManager {
System.exit(STARTUP_FAILURE_RETURN_CODE)
}
else {
val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
val message = s"Config parameter '" + JobManagerOptions.ADDRESS.key() +
"' does not specify a valid port."
LOG.error(message)
System.exit(STARTUP_FAILURE_RETURN_CODE)
......@@ -2181,8 +2181,8 @@ object JobManager {
val address = AkkaUtils.getAddress(jobManagerSystem)
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
configuration.setString(JobManagerOptions.ADDRESS, address.host.get)
configuration.setInteger(JobManagerOptions.PORT, address.port.get)
jobManagerSystem
}
......@@ -2399,17 +2399,17 @@ object JobManager {
}
if (cliOptions.getHost() != null) {
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, cliOptions.getHost())
configuration.setString(JobManagerOptions.ADDRESS, cliOptions.getHost())
}
val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
val host = configuration.getString(JobManagerOptions.ADDRESS)
val portRange =
// high availability mode
if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
LOG.info("Starting JobManager with high-availability")
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
configuration.setInteger(JobManagerOptions.PORT, 0)
// The port range of allowed job manager ports or 0 for random
configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE)
......@@ -2418,12 +2418,10 @@ object JobManager {
LOG.info("Starting JobManager without high-availability")
// In standalone mode, we don't allow port ranges
val listeningPort = configuration.getInteger(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val listeningPort = configuration.getInteger(JobManagerOptions.PORT)
if (listeningPort <= 0 || listeningPort >= 65536) {
val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
val message = "Config parameter '" + JobManagerOptions.PORT.key() +
"' is invalid, it must be greater than 0 and less than 65536."
LOG.error(message)
System.exit(STARTUP_FAILURE_RETURN_CODE)
......
......@@ -72,7 +72,7 @@ abstract class FlinkMiniCluster(
// NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
val hostname = userConfiguration.getString(
ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
JobManagerOptions.ADDRESS,
"localhost")
protected val originalConfiguration = generateConfiguration(userConfiguration)
......@@ -129,14 +129,12 @@ abstract class FlinkMiniCluster(
}
def configuration: Configuration = {
if (originalConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
if (originalConfiguration.getInteger(JobManagerOptions.PORT) == 0) {
val leaderConfiguration = new Configuration(originalConfiguration)
val leaderPort = getLeaderRPCPort
leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
leaderConfiguration.setInteger(JobManagerOptions.PORT, leaderPort)
leaderConfiguration
} else {
......@@ -241,8 +239,7 @@ abstract class FlinkMiniCluster(
AkkaUtils.getAkkaConfig(originalConfiguration, None)
}
else {
val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val port = originalConfiguration.getInteger(JobManagerOptions.PORT)
val resolvedPort = if(port != 0) port + index else port
......
......@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions, TaskManagerOptions}
import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
......@@ -125,12 +125,10 @@ class LocalFlinkMiniCluster(
val jobManagerName = getJobManagerName(index)
val archiveName = getArchiveName(index)
val jobManagerPort = config.getInteger(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val jobManagerPort = config.getInteger(JobManagerOptions.PORT)
if(jobManagerPort > 0) {
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
config.setInteger(JobManagerOptions.PORT, jobManagerPort + index)
}
val (instanceManager,
......@@ -389,8 +387,8 @@ class LocalFlinkMiniCluster(
def getDefaultConfig: Configuration = {
val config: Configuration = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
config.setString(JobManagerOptions.ADDRESS, hostname)
config.setInteger(JobManagerOptions.PORT, 0)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
......
......@@ -19,8 +19,8 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
......@@ -78,8 +78,8 @@ public class JobSubmitTest {
int port = NetUtils.getAvailablePort();
jmConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
jmConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
jmConfig.setInteger(JobManagerOptions.PORT, port);
scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, listeningAddress);
......
......@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.testutils.CommonTestUtils;
......@@ -54,8 +55,8 @@ public class TaskManagerConfigurationTest {
Configuration config = new Configuration();
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
......@@ -82,8 +83,8 @@ public class TaskManagerConfigurationTest {
// config with pre-configured hostname to speed up tests (no interface selection)
Configuration config = new Configuration();
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
......@@ -184,8 +185,8 @@ public class TaskManagerConfigurationTest {
// open a server port to allow the system to connect
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
config.setString(JobManagerOptions.ADDRESS, hostname);
config.setInteger(JobManagerOptions.PORT, server.getLocalPort());
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
......
......@@ -240,8 +240,8 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
int taskManagerPort = Integer.parseInt(args[1]);
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
......
......@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
......@@ -142,8 +143,8 @@ public class TaskManagerStartupTest {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, 21656);
try {
TaskManager.runTaskManager(
......@@ -184,8 +185,8 @@ public class TaskManagerStartupTest {
public void testMemoryConfigWrong() {
try {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, 21656);
cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
// something invalid
......
......@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, SecurityOptions}
import org.apache.flink.configuration._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
......@@ -49,7 +49,7 @@ class AkkaSslITCase(_system: ActorSystem)
"start with akka ssl enabled" in {
val config = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
......@@ -76,7 +76,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {
val config = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
......
......@@ -26,7 +26,7 @@ import akka.pattern.Patterns._
import akka.pattern.ask
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
......@@ -211,11 +211,11 @@ class TestingCluster(
// restart the leading job manager with the same port
val port = getLeaderRPCPort
val oldPort = originalConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
JobManagerOptions.PORT,
0)
// we have to set the old port in the configuration file because this is used for startup
originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
originalConfiguration.setInteger(JobManagerOptions.PORT, port)
clearLeader()
......@@ -234,7 +234,7 @@ class TestingCluster(
}
// reset the original configuration
originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
originalConfiguration.setInteger(JobManagerOptions.PORT, oldPort)
val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
......
......@@ -26,7 +26,7 @@ import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.client.CliFrontend
import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, JobManagerOptions}
import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
import scala.collection.mutable.ArrayBuffer
......@@ -145,7 +145,7 @@ object FlinkShell {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
val config = GlobalConfiguration.loadConfiguration()
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
config.setInteger(JobManagerOptions.PORT, 0)
val miniCluster = new StandaloneMiniCluster(config)
......
......@@ -25,8 +25,8 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
......@@ -196,8 +196,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
Configuration configuration = new Configuration();
configuration.addAll(this.clientConfiguration);
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
configuration.setString(JobManagerOptions.ADDRESS, host);
configuration.setInteger(JobManagerOptions.PORT, port);
ClusterClient client;
try {
......
......@@ -405,8 +405,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
int jobManagerPort = Integer.parseInt(args[0]);
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
......
......@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
......@@ -70,7 +71,7 @@ public class IPv6HostnamesITCase extends TestLogger {
log.info("Test will use IPv6 address " + addressString + " for connection tests");
Configuration conf = new Configuration();
conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
conf.setString(JobManagerOptions.ADDRESS, addressString);
conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
......
......@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
......@@ -428,8 +429,8 @@ public class CliFrontendYarnAddressConfigurationTest {
}
private static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
assertEquals(expectedAddress, jobManagerAddress);
assertEquals(expectedPort, jobManagerPort);
......
......@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
......@@ -202,9 +203,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Extracted hostname:port: {} {}", hostname, port);
Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
parsedConfig.get(JobManagerOptions.ADDRESS.key()));
Assert.assertEquals("unable to find port in " + jsonConfig, port,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
parsedConfig.get(JobManagerOptions.PORT.key()));
// test logfile access
String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
......
......@@ -416,8 +416,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
appReport.getHost(), appReport.getRpcPort(), applicationID);
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
} catch (Exception e) {
......@@ -591,8 +591,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
int port = report.getRpcPort();
// Correctly initialize the Flink config
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册