未验证 提交 fb4286f3 编写于 作者: B blublinsky 提交者: Xintong Song

[FLINK-20664][k8s] Support setting service account for TaskManager pod.

上级 3b8998d5
......@@ -112,9 +112,9 @@
</tr>
<tr>
<td><h5>kubernetes.jobmanager.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server.</td>
<td>Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server. If not explicitly configured, config option 'kubernetes.service-account' will be used.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.tolerations</h5></td>
......@@ -146,6 +146,12 @@
<td>Map</td>
<td>The user-specified secrets that will be mounted into Flink container. The value should be in the form of <span markdown="span">`foo:/opt/secrets-foo,bar:/opt/secrets-bar`</span>.</td>
</tr>
<tr>
<td><h5>kubernetes.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
<td>String</td>
<td>Service account that is used by jobmanager and taskmanager within kubernetes cluster. Notice that this can be overwritten by config options 'kubernetes.jobmanager.service-account' and 'kubernetes.taskmanager.service-account' for jobmanager and taskmanager respectively.</td>
</tr>
<tr>
<td><h5>kubernetes.taskmanager.annotations</h5></td>
<td style="word-wrap: break-word;">(none)</td>
......@@ -170,6 +176,12 @@
<td>Map</td>
<td>The node selector to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.</td>
</tr>
<tr>
<td><h5>kubernetes.taskmanager.service-account</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Service account that is used by taskmanager within kubernetes cluster. The task manager uses this service account when watching config maps on the API server to retrieve leader address of jobmanager and resourcemanager. If not explicitly configured, config option 'kubernetes.service-account' will be used.</td>
</tr>
<tr>
<td><h5>kubernetes.taskmanager.tolerations</h5></td>
<td style="word-wrap: break-word;">(none)</td>
......
......@@ -38,6 +38,8 @@ import static org.apache.flink.configuration.description.TextElement.code;
@PublicEvolving
public class KubernetesConfigOptions {
private static final String KUBERNETES_SERVICE_ACCOUNT_KEY = "kubernetes.service-account";
public static final ConfigOption<String> CONTEXT =
key("kubernetes.context")
.stringType()
......@@ -56,9 +58,27 @@ public class KubernetesConfigOptions {
public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.jobmanager.service-account")
.stringType()
.defaultValue("default")
.noDefaultValue()
.withDescription("Service account that is used by jobmanager within kubernetes cluster. " +
"The job manager uses this service account when requesting taskmanager pods from the API server.");
"The job manager uses this service account when requesting taskmanager pods from the API server. " +
"If not explicitly configured, config option '" + KUBERNETES_SERVICE_ACCOUNT_KEY + "' will be used.");
public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.taskmanager.service-account")
.stringType()
.noDefaultValue()
.withDescription("Service account that is used by taskmanager within kubernetes cluster. " +
"The task manager uses this service account when watching config maps on the API server to retrieve " +
"leader address of jobmanager and resourcemanager. If not explicitly configured, config option '" +
KUBERNETES_SERVICE_ACCOUNT_KEY + "' will be used.");
public static final ConfigOption<String> KUBERNETES_SERVICE_ACCOUNT =
key(KUBERNETES_SERVICE_ACCOUNT_KEY)
.stringType()
.defaultValue("default")
.withDescription("Service account that is used by jobmanager and taskmanager within kubernetes cluster. " +
"Notice that this can be overwritten by config options '" + JOB_MANAGER_SERVICE_ACCOUNT.key() +
"' and '" + TASK_MANAGER_SERVICE_ACCOUNT.key() + "' for jobmanager and taskmanager respectively.");
public static final ConfigOption<Double> JOB_MANAGER_CPU =
key("kubernetes.jobmanager.cpu")
......
......@@ -58,6 +58,7 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
.withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
.endMetadata()
.editOrNewSpec()
.withServiceAccountName(kubernetesTaskManagerParameters.getServiceAccount())
.withRestartPolicy(Constants.RESTART_POLICY_OF_NEVER)
.withImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
.withNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
......
......@@ -120,7 +120,8 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
}
public String getServiceAccount() {
return flinkConfig.getString(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT);
return flinkConfig.getOptional(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT)
.orElse(flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
}
public String getEntrypointClass() {
......
......@@ -106,6 +106,12 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter
return containeredTaskManagerParameters.getTaskExecutorProcessSpec().getCpuCores().getValue().doubleValue();
}
public String getServiceAccount() {
return flinkConfig.getOptional(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT)
.orElse(flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
}
public Map<String, Long> getTaskManagerExternalResources() {
return taskManagerExternalResources;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
import io.fabric8.kubernetes.api.model.Pod;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* Tests for {@link InitJobManagerDecorator} decorating service account.
*/
public class InitJobManagerDecoratorAccountTest extends KubernetesJobManagerTestBase {
private static final String SERVICE_ACCOUNT_NAME = "service-test";
private static final String JOB_MANGER_SERVICE_ACCOUNT_NAME = "jm-service-test";
private Pod resultPod;
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, JOB_MANGER_SERVICE_ACCOUNT_NAME);
}
@Override
protected void onSetup() throws Exception {
super.onSetup();
final InitJobManagerDecorator initJobManagerDecorator =
new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
final FlinkPod resultFlinkPod = initJobManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
this.resultPod = resultFlinkPod.getPod();
}
@Test
public void testPodServiceAccountName() {
assertThat(this.resultPod.getSpec().getServiceAccountName(), is(JOB_MANGER_SERVICE_ACCOUNT_NAME));
}
}
......@@ -74,7 +74,7 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase;
import io.fabric8.kubernetes.api.model.Pod;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* Tests for {@link InitTaskManagerDecorator} decorating service account.
*/
public class InitTaskManagerDecoratorAccountTest extends KubernetesTaskManagerTestBase {
private static final String SERVICE_ACCOUNT_NAME = "service-test";
private static final String TASK_MANAGER_SERVICE_ACCOUNT_NAME = "tm-service-test";
private Pod resultPod;
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, TASK_MANAGER_SERVICE_ACCOUNT_NAME);
}
@Override
protected void onSetup() throws Exception {
super.onSetup();
final InitTaskManagerDecorator initTaskManagerDecorator =
new InitTaskManagerDecorator(kubernetesTaskManagerParameters);
final FlinkPod resultFlinkPod = initTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
this.resultPod = resultFlinkPod.getPod();
}
@Test
public void testPodServiceAccountName() {
assertThat(this.resultPod.getSpec().getServiceAccountName(), is(TASK_MANAGER_SERVICE_ACCOUNT_NAME));
}
}
......@@ -53,6 +53,7 @@ import static org.junit.Assert.assertThat;
*/
public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase {
private static final String SERVICE_ACCOUNT_NAME = "service-test";
private static final List<String> IMAGE_PULL_SECRETS = Arrays.asList("s1", "s2", "s3");
private static final Map<String, String> ANNOTATIONS = new HashMap<String, String>() {
{
......@@ -77,6 +78,7 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
......@@ -187,6 +189,11 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
assertThat(resultAnnotations, is(equalTo(ANNOTATIONS)));
}
@Test
public void testPodServiceAccountName() {
assertThat(this.resultPod.getSpec().getServiceAccountName(), is(SERVICE_ACCOUNT_NAME));
}
@Test
public void testRestartPolicy() {
final String resultRestartPolicy = this.resultPod.getSpec().getRestartPolicy();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册