FlinkArgsUtils.java 5.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.dolphinscheduler.server.utils;
19

Q
qiaozhanwei 已提交
20 21
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
22
import org.apache.dolphinscheduler.common.process.ResourceInfo;
Q
qiaozhanwei 已提交
23
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
24
import org.apache.dolphinscheduler.common.utils.StringUtils;
25 26 27 28 29

import java.util.ArrayList;
import java.util.List;

/**
30
 * flink args utils
31 32
 */
public class FlinkArgsUtils {
33

34
    private static final String LOCAL_DEPLOY_MODE = "local";
35

36 37
    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";

38 39 40 41
    private FlinkArgsUtils() {
        throw new IllegalStateException("Utility class");
    }

42 43 44 45 46
    /**
     * build args
     * @param param flink parameters
     * @return argument list
     */
47 48
    public static List<String> buildArgs(FlinkParameters param) {
        List<String> args = new ArrayList<>();
49

50
        String deployMode = "cluster";
51 52 53
        String tmpDeployMode = param.getDeployMode();
        if (StringUtils.isNotEmpty(tmpDeployMode)) {
            deployMode = tmpDeployMode;
54
        }
55
        String others = param.getOthers();
56
        if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
57
            args.add(Constants.FLINK_RUN_MODE);  //-m
58

59
            args.add(Constants.FLINK_YARN_CLUSTER);   //yarn-cluster
60

61
            int slot = param.getSlot();
62
            if (slot > 0) {
63
                args.add(Constants.FLINK_YARN_SLOT);
64
                args.add(String.format("%d", slot));   //-ys
65
            }
66

67 68
            String appName = param.getAppName();
            if (StringUtils.isNotEmpty(appName)) { //-ynm
69
                args.add(Constants.FLINK_APP_NAME);
70
                args.add(ArgsUtils.escape(appName));
71
            }
72

73
            // judge flink version, the parameter -yn has removed from flink 1.10
74
            String flinkVersion = param.getFlinkVersion();
75
            if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
76
                int taskManager = param.getTaskManager();
77
                if (taskManager > 0) {                        //-yn
78 79 80
                    args.add(Constants.FLINK_TASK_MANAGE);
                    args.add(String.format("%d", taskManager));
                }
81
            }
82 83
            String jobManagerMemory = param.getJobManagerMemory();
            if (StringUtils.isNotEmpty(jobManagerMemory)) {
84
                args.add(Constants.FLINK_JOB_MANAGE_MEM);
85
                args.add(jobManagerMemory); //-yjm
86
            }
87

88 89
            String taskManagerMemory = param.getTaskManagerMemory();
            if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
90
                args.add(Constants.FLINK_TASK_MANAGE_MEM);
91
                args.add(taskManagerMemory);
92 93
            }

94 95 96 97 98 99 100
            if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
                String queue = param.getQueue();
                if (StringUtils.isNotEmpty(queue)) { // -yqu
                    args.add(Constants.FLINK_QUEUE);
                    args.add(queue);
                }
            }
101
        }
102

103 104 105 106
        int parallelism = param.getParallelism();
        if (parallelism > 0) {
            args.add(Constants.FLINK_PARALLELISM);
            args.add(String.format("%d", parallelism));   // -p
107
        }
108

109 110 111 112 113
        // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
        // The task status will be synchronized with the cluster job status
        args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae

        // -s -yqu -yat -yD -D
114 115 116 117
        if (StringUtils.isNotEmpty(others)) {
            args.add(others);
        }

118 119 120 121 122
        ProgramType programType = param.getProgramType();
        String mainClass = param.getMainClass();
        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
            args.add(Constants.FLINK_MAIN_CLASS);    //-c
            args.add(param.getMainClass());          //main class
123 124
        }

125 126 127
        ResourceInfo mainJar = param.getMainJar();
        if (mainJar != null) {
            args.add(mainJar.getRes());
128 129
        }

130 131 132
        String mainArgs = param.getMainArgs();
        if (StringUtils.isNotEmpty(mainArgs)) {
            args.add(mainArgs);
133
        }
134 135 136 137 138

        return args;
    }

}