FlinkArgsUtils.java 4.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.dolphinscheduler.server.utils;
19

20
import org.apache.commons.lang.StringUtils;
Q
qiaozhanwei 已提交
21 22
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
23
import org.apache.dolphinscheduler.common.process.ResourceInfo;
Q
qiaozhanwei 已提交
24
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
25 26 27 28 29

import java.util.ArrayList;
import java.util.List;

/**
30
 * flink args utils
31 32
 */
public class FlinkArgsUtils {
33
    private static final String LOCAL_DEPLOY_MODE = "local";
34 35
    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";

36 37 38 39 40
    /**
     * build args
     * @param param flink parameters
     * @return argument list
     */
41 42
    public static List<String> buildArgs(FlinkParameters param) {
        List<String> args = new ArrayList<>();
43

44
        String deployMode = "cluster";
45 46 47
        String tmpDeployMode = param.getDeployMode();
        if (StringUtils.isNotEmpty(tmpDeployMode)) {
            deployMode = tmpDeployMode;
48
        }
49
        String others = param.getOthers();
50
        if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
51
            args.add(Constants.FLINK_RUN_MODE);  //-m
52

53
            args.add(Constants.FLINK_YARN_CLUSTER);   //yarn-cluster
54

55 56
            int slot = param.getSlot();
            if (slot != 0) {
57
                args.add(Constants.FLINK_YARN_SLOT);
58
                args.add(String.format("%d", slot));   //-ys
59
            }
60

61 62
            String appName = param.getAppName();
            if (StringUtils.isNotEmpty(appName)) { //-ynm
63
                args.add(Constants.FLINK_APP_NAME);
64
                args.add(ArgsUtils.escape(appName));
65
            }
66

67
            // judge flink version, the parameter -yn has removed from flink 1.10
68
            String flinkVersion = param.getFlinkVersion();
69
            if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
70 71 72 73 74
                int taskManager = param.getTaskManager();
                if (taskManager != 0) {                        //-yn
                    args.add(Constants.FLINK_TASK_MANAGE);
                    args.add(String.format("%d", taskManager));
                }
75
            }
76 77
            String jobManagerMemory = param.getJobManagerMemory();
            if (StringUtils.isNotEmpty(jobManagerMemory)) {
78
                args.add(Constants.FLINK_JOB_MANAGE_MEM);
79
                args.add(jobManagerMemory); //-yjm
80
            }
81

82 83
            String taskManagerMemory = param.getTaskManagerMemory();
            if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
84
                args.add(Constants.FLINK_TASK_MANAGE_MEM);
85
                args.add(taskManagerMemory);
86 87
            }

88 89 90 91 92 93 94 95
            if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
                String queue = param.getQueue();
                if (StringUtils.isNotEmpty(queue)) { // -yqu
                    args.add(Constants.FLINK_QUEUE);
                    args.add(queue);
                }
            }

96
            args.add(Constants.FLINK_DETACH); //-d
97 98

        }
99

100 101 102 103 104
        // -p -s -yqu -yat -sae -yD -D
        if (StringUtils.isNotEmpty(others)) {
            args.add(others);
        }

105 106 107 108 109
        ProgramType programType = param.getProgramType();
        String mainClass = param.getMainClass();
        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
            args.add(Constants.FLINK_MAIN_CLASS);    //-c
            args.add(param.getMainClass());          //main class
110 111
        }

112 113 114
        ResourceInfo mainJar = param.getMainJar();
        if (mainJar != null) {
            args.add(mainJar.getRes());
115 116
        }

117 118 119
        String mainArgs = param.getMainArgs();
        if (StringUtils.isNotEmpty(mainArgs)) {
            args.add(mainArgs);
120
        }
121 122 123 124

        return args;
    }

125

126
}