From 284c50f66cb7ce5466bb987d7e6966683962c3a6 Mon Sep 17 00:00:00 2001 From: lijufeng2016 <920347627@qq.com> Date: Fri, 12 Jun 2020 18:23:28 +0800 Subject: [PATCH] Compatible with flink1.10 or newer (#2952) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 兼容flink1.10以上版本 * fix null point bug Co-authored-by: 李巨丰 Co-authored-by: dailidong --- .../common/task/flink/FlinkParameters.java | 13 ++++ .../server/utils/FlinkArgsUtils.java | 18 +++-- .../server/utils/FlinkArgsUtilsTest.java | 4 +- .../dag/_source/formModel/tasks/flink.vue | 67 +++++++++++++------ .../src/js/module/i18n/locale/en_US.js | 1 + .../src/js/module/i18n/locale/zh_CN.js | 1 + 6 files changed, 75 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java index 05cbb1d79..b89a92062 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java @@ -90,6 +90,11 @@ public class FlinkParameters extends AbstractParameters { */ private String others; + /** + * flink version + */ + private String flinkVersion; + /** * program type * 0 JAVA,1 SCALA,2 PYTHON @@ -200,6 +205,14 @@ public class FlinkParameters extends AbstractParameters { this.programType = programType; } + public String getFlinkVersion() { + return flinkVersion; + } + + public void setFlinkVersion(String flinkVersion) { + this.flinkVersion = flinkVersion; + } + @Override public boolean checkParameters() { return mainJar != null && programType != null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 12c7eb2d5..eaaafc956 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -28,10 +28,12 @@ import java.util.List; /** - * spark args utils + * flink args utils */ public class FlinkArgsUtils { private static final String LOCAL_DEPLOY_MODE = "local"; + private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; + /** * build args * @param param flink parameters @@ -44,7 +46,6 @@ public class FlinkArgsUtils { String tmpDeployMode = param.getDeployMode(); if (StringUtils.isNotEmpty(tmpDeployMode)) { deployMode = tmpDeployMode; - } if (!LOCAL_DEPLOY_MODE.equals(deployMode)) { args.add(Constants.FLINK_RUN_MODE); //-m @@ -63,12 +64,15 @@ public class FlinkArgsUtils { args.add(appName); } - int taskManager = param.getTaskManager(); - if (taskManager != 0) { //-yn - args.add(Constants.FLINK_TASK_MANAGE); - args.add(String.format("%d", taskManager)); + // judgy flink version,from flink1.10,the parameter -yn removed + String flinkVersion = param.getFlinkVersion(); + if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { + int taskManager = param.getTaskManager(); + if (taskManager != 0) { //-yn + args.add(Constants.FLINK_TASK_MANAGE); + args.add(String.format("%d", taskManager)); + } } - String jobManagerMemory = param.getJobManagerMemory(); if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java index 2e4861e2a..a31523279 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -49,6 +49,7 @@ public class FlinkArgsUtilsTest { public String mainArgs = "testArgs"; public String queue = "queue1"; public String others = "--input file:///home"; + public String flinkVersion = "<1.10"; @Before @@ -79,6 +80,7 @@ public class FlinkArgsUtilsTest { param.setMainArgs(mainArgs); param.setQueue(queue); param.setOthers(others); + param.setFlinkVersion(flinkVersion); //Invoke buildArgs List result = FlinkArgsUtils.buildArgs(param); @@ -128,4 +130,4 @@ public class FlinkArgsUtilsTest { assertEquals(5, result.size()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index 3f85f3699..2e8772134 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -62,56 +62,73 @@ + +
{{$t('Flink Version')}}
+
+ + + + +
+
- {{$t('slot')}} + {{$t('jobManagerMemory')}} + :disabled="isDetails" + type="input" + v-model="jobManagerMemory" + :placeholder="$t('Please enter the number of Executor')" + style="width: 200px;" + autocomplete="off"> - {{$t('taskManager')}} + {{$t('taskManagerMemory')}} + :disabled="isDetails" + type="input" + v-model="taskManagerMemory" + :placeholder="$t('Please enter the Executor memory')" + style="width: 186px;" + autocomplete="off">
- {{$t('jobManagerMemory')}} + {{$t('slot')}} - {{$t('taskManagerMemory')}} +
+ {{$t('taskManager')}} +
-
{{$t('Command-line parameters')}}
@@ -207,6 +224,11 @@ programType: 'SCALA', // Program type(List) programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }], + + flinkVersion:'<1.10', + // Flink Versions(List) + flinkVersionList: [{ code: '<1.10' }, { code: '>=1.10' }], + normalizer(node) { return { label: node.name @@ -324,6 +346,7 @@ return {id: v} }), localParams: this.localParams, + flinkVersion: this.flinkVersion, slot: this.slot, taskManager: this.taskManager, jobManagerMemory: this.jobManagerMemory, @@ -485,11 +508,13 @@ this.mainJar = o.params.mainJar.id || '' } this.deployMode = o.params.deployMode || '' + this.flinkVersion = o.params.flinkVersion || '<1.10' this.slot = o.params.slot || 1 this.taskManager = o.params.taskManager || '2' this.jobManagerMemory = o.params.jobManagerMemory || '1G' this.taskManagerMemory = o.params.taskManagerMemory || '2G' + this.mainArgs = o.params.mainArgs || '' this.others = o.params.others this.programType = o.params.programType || 'SCALA' diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index fde153491..e47b680ea 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -98,6 +98,7 @@ export default { Script: 'Script', 'Please enter script(required)': 'Please enter script(required)', 'Deploy Mode': 'Deploy Mode', + 'Flink Version':'Flink Version', 'Driver core number': 'Driver core number', 'Please enter driver core number': 'Please enter driver core number', 'Driver memory use': 'Driver memory use', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 9ad392371..c9bb0e203 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -99,6 +99,7 @@ export default { Script: '脚本', 'Please enter script(required)': '请输入脚本(必填)', 'Deploy Mode': '部署方式', + 'Flink Version': 'Flink版本', 'Driver core number': 'Driver内核数', 'Please enter driver core number': '请输入Driver内核数', 'Driver memory use': 'Driver内存数', -- GitLab