[FLINK-20866][yarn] Set high-availability.cluster-id to application id if not configured

In order to easily connect to an HA enabled Yarn cluster, this commit sets the
high-availability.cluster-id to the application id if not configured. This is
symmetric to how we deploy HA enabled clusters and makes it unnecessary to
explicitly set the high-availability.cluster-id if one tries to reconnect
to the cluster.

This closes #14577.
上级 03ca3993
......@@ -194,7 +194,11 @@ Once a HA service is configured, it will persist JobManager metadata and perform
YARN takes care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First, Flink's [yarn.application-attempts]({% link deployment/config.md %}#yarn-application-attempts) configuration defaults to 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.
Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN.
By default, Flink sets it to the YARN application ID.
**You should not overwrite this parameter when deploying an HA cluster on YARN**.
The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper).
Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
#### Container Shutdown Behaviour
......
......@@ -194,7 +194,11 @@ Once a HA service is configured, it will persist JobManager metadata and perform
YARN takes care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First, Flink's [yarn.application-attempts]({% link deployment/config.zh.md %}#yarn-application-attempts) configuration defaults to 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.
Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN.
By default, Flink sets it to the YARN application ID.
**You should not overwrite this parameter when deploying an HA cluster on YARN**.
The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper).
Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
#### Container Shutdown Behaviour
......
......@@ -87,6 +87,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
......@@ -209,6 +210,41 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
});
}
/**
 * Tests that we can retrieve an HA enabled cluster by only specifying the application id if no
 * other high-availability.cluster-id has been configured. See FLINK-20866.
 *
 * <p>A session cluster is deployed through one descriptor. A second descriptor, obtained from
 * another call to {@code setupYarnClusterDescriptor()}, then retrieves the cluster using only
 * the application id, checks that no jobs are listed and shuts the cluster down.
 *
 * @throws Exception if the deployment, the retrieval or any of the assertions fail
 */
@Test
public void testClusterClientRetrieval() throws Exception {
    runTest(
            () -> {
                final YarnClusterDescriptor yarnClusterDescriptor =
                        setupYarnClusterDescriptor();
                final RestClusterClient<ApplicationId> restClusterClient =
                        deploySessionCluster(yarnClusterDescriptor);

                ClusterClient<ApplicationId> newClusterClient = null;
                try {
                    final ApplicationId clusterId = restClusterClient.getClusterId();

                    // A separately set up descriptor only receives the application id.
                    final YarnClusterDescriptor newClusterDescriptor =
                            setupYarnClusterDescriptor();
                    newClusterClient =
                            newClusterDescriptor.retrieve(clusterId).getClusterClient();

                    assertThat(newClusterClient.listJobs().join(), is(empty()));

                    newClusterClient.shutDownCluster();
                } finally {
                    // Nested try/finally: the retrieved client must still be closed when
                    // closing the deploying client throws. The close order is unchanged.
                    try {
                        restClusterClient.close();
                    } finally {
                        if (newClusterClient != null) {
                            newClusterClient.close();
                        }
                    }
                }
            });
}
private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId)
throws Exception {
final YarnClient yarnClient = getYarnClient();
......
......@@ -1767,11 +1767,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
/**
 * Writes the connection details of the given YARN application into {@code flinkConfiguration}.
 *
 * <p>The report's host and RPC port are stored as the JobManager and REST address/port, and the
 * application id is stored under {@code YarnConfigOptions.APPLICATION_ID}. If the configuration
 * does not contain {@code HighAvailabilityOptions.HA_CLUSTER_ID}, that option is set to the
 * application id as well; a value that is already present is left untouched.
 *
 * @param report application report to read host, RPC port and application id from; must not
 *     be null
 */
private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
checkNotNull(report);
final ApplicationId clusterId = report.getApplicationId();
final ApplicationId appId = report.getApplicationId();
final String host = report.getHost();
final int port = report.getRpcPort();
LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, clusterId);
LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId);
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
......@@ -1779,8 +1779,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
flinkConfiguration.set(
YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(clusterId));
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));
// set cluster-id to app id if not specified
if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
flinkConfiguration.set(
HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
}
}
public static void logDetachedClusterInformation(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册