diff --git a/src/main/java/net/wicp/tams/app/duckula/controller/bean/models/RtdwFlinktask.java b/src/main/java/net/wicp/tams/app/duckula/controller/bean/models/RtdwFlinktask.java index 6d007f2381e988821be3d3ca0ae7549c86f00fee..ea355243dfe5e32f47d3a474354ccce3ea1e4a24 100644 --- a/src/main/java/net/wicp/tams/app/duckula/controller/bean/models/RtdwFlinktask.java +++ b/src/main/java/net/wicp/tams/app/duckula/controller/bean/models/RtdwFlinktask.java @@ -46,13 +46,13 @@ public class RtdwFlinktask { * Database Column Remarks: 用户执行程序保存的路径 This field was generated by MyBatis Generator. This field corresponds to the database column rtdw_flinktask.user_jar_path * @mbg.generated */ - @TamsCol(value = "执行程序", save = 1, query = -1, showWidth = 100, orderby = 9, pageElement = PageElement.Upload, pageElementOpt = "{}") + @TamsCol(value = "执行程序", save = 1, query = -1, showWidth = 300, orderby = 9, pageElement = PageElement.Upload, pageElementOpt = "{}") private String userJarPath; /** * Database Column Remarks: 执行主类 This field was generated by MyBatis Generator. This field corresponds to the database column rtdw_flinktask.entry_class * @mbg.generated */ - @TamsCol(value = "执行主类", save = 1, query = -1, showWidth = 100, orderby = 8) + @TamsCol(value = "执行主类", save = 1, query = -1, showWidth = 200, orderby = 8) private String entryClass; /** * Database Column Remarks: 主参数 This field was generated by MyBatis Generator. This field corresponds to the database column rtdw_flinktask.main_args diff --git a/src/main/java/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.java b/src/main/java/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.java index 569de8a9bc94c2ce8a41bee99bdbd17a2ddf36e8..4e1a7c4d778544448ba08c3a2c3eefec22df8895 100644 --- a/src/main/java/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.java +++ b/src/main/java/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.java @@ -3,23 +3,34 @@ package net.wicp.tams.duckula.ops.pages.rtdw; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.tapestry5.ioc.annotations.Inject; import org.apache.tapestry5.util.TextStreamResponse; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.google.common.base.Predicate; import common.kubernetes.apiserver.KubeClient; import common.kubernetes.constant.CrdVesion; import lombok.extern.slf4j.Slf4j; +import net.wicp.tams.app.duckula.controller.bean.models.RtdwFlinkbaseimage; import net.wicp.tams.app.duckula.controller.bean.models.RtdwFlinktask; +import net.wicp.tams.app.duckula.controller.dao.RtdwFlinkbaseimageMapper; import net.wicp.tams.app.duckula.controller.dao.RtdwFlinktaskMapper; import net.wicp.tams.app.duckula.controller.service.K8sService; import net.wicp.tams.common.Result; +import net.wicp.tams.common.apiext.CollectionUtil; import net.wicp.tams.common.apiext.FreemarkUtil; import net.wicp.tams.common.apiext.IOUtil; +import net.wicp.tams.common.apiext.StringUtil; import net.wicp.tams.common.exception.ProjectExceptionRuntime; import net.wicp.tams.component.tools.TapestryAssist; import net.wicp.tams.duckula.ops.pages.ParentPageBean; @@ -36,10 +47,13 @@ public class FlinkTask extends ParentPageBean { @Inject private RtdwFlinktaskMapper rtdwFlinktaskMapper; + @Inject + private RtdwFlinkbaseimageMapper rtdwFlinkbaseimageMapper; + @Override public void doSave(RtdwFlinktask t, boolean isInsert) { if (isInsert) { - //t.setImageRep("registry.cn-hangzhou.aliyuncs.com/linkedcare/bigdata"); + // t.setImageRep("registry.cn-hangzhou.aliyuncs.com/linkedcare/bigdata"); this.rtdwFlinktaskMapper.insert(t); } else { this.rtdwFlinktaskMapper.updateByPrimaryKeySelective(t); @@ -65,25 +79,58 @@ public class FlinkTask extends ParentPageBean { public TextStreamResponse onSubmitTask() { final RtdwFlinktask t = TapestryAssist.getBeanFromPage(getTClass(), requestGlobals); try { - String context = IOUtil.slurp(IOUtil.fileToInputStream("/deploy/k8s/cr.yaml", K8sService.class)); - String result = FreemarkUtil.getInst().doProcessByTemp(context, t); - Map deployFlinkApplication = deployFlinkApplication(t.getNamespace(), t); + Map deployFlinkApplication = deployFlinkApplication(t); System.out.println("deployFlinkApplication=" + deployFlinkApplication); - return TapestryAssist.getTextStreamResponse(Result.getError(result)); + return TapestryAssist.getTextStreamResponse(Result.getError("")); } catch (IOException e) { log.error("布署失败", e); return TapestryAssist.getTextStreamResponse(Result.getError("cr deploy error:" + e.getMessage())); - }catch (ProjectExceptionRuntime e) { + } catch (ProjectExceptionRuntime e) { return TapestryAssist.getTextStreamResponse(new Result(e.getExcept())); } } - private Map deployFlinkApplication(String namespace, RtdwFlinktask params) throws IOException { + Pair baseHost = Pair.of(4.0, 8096);// 一个标准的task是4核8G + Pair singleSlot = Pair.of(1.0, 2048);// 一个slot占用1C2U,要不要留一些给操作系统?后面再说 + + private Map deployFlinkApplication(RtdwFlinktask rtdwFlinktask) throws IOException { + RtdwFlinkbaseimage rtdwFlinkbaseimage = rtdwFlinkbaseimageMapper.selectByPrimaryKey(rtdwFlinktask.getBaseImg()); + Map params = new HashMap(); + params.put("namespace", rtdwFlinktask.getNamespace()); + params.put("name", rtdwFlinktask.getName()); + params.put("imageTag", rtdwFlinkbaseimage.getImageTag()); + params.put("userJarPath", rtdwFlinktask.getUserJarPath()); + Integer slots = rtdwFlinktask.getSlot(); + int taskNum = slots / 4 + 1; + int hasSlots = slots % 4; + if (taskNum > 1) { + params.put("cpu", baseHost.getLeft()); + params.put("mem", baseHost.getRight()); + } else { + params.put("cpu", singleSlot.getLeft() * hasSlots); + params.put("mem", singleSlot.getRight() * hasSlots); + } + + List configs = Arrays.asList(rtdwFlinktask.getFlinkConfig().split("\n")); + List allConfig = new ArrayList(); + allConfig.addAll(configs);// 直接使用configs做为allConfig会报错,configs不能添加元素 + CollectionUtils.filter(allConfig, new org.apache.commons.collections.Predicate() { + @Override + public boolean evaluate(Object arg0) { + String ele = StringUtil.trimSpace((String) arg0); + return StringUtil.isNotNull(ele) && !ele.startsWith("taskmanager.numberOfTaskSlots:") + && !ele.startsWith("kubernetes.jobmanager.service-account:"); + } + }); + allConfig.add(String.format("taskmanager.numberOfTaskSlots: %s", taskNum > 1 ? 4 : hasSlots)); + allConfig.add("kubernetes.jobmanager.service-account: flink-native-k8s-operator"); + String configstr = CollectionUtil.listJoin(allConfig, "\n ");// 后面带4个空格 + params.put("flinkConfig", " "+configstr);// 前面带4个空格 String context = IOUtil.slurp(IOUtil.fileToInputStream("/deploy/k8s/cr.yaml", K8sService.class)); String result = FreemarkUtil.getInst().doProcessByTemp(context, params); InputStream is = new ByteArrayInputStream(result.getBytes()); - Map map = KubeClient.createCusObject(CrdVesion.flink, namespace, is); + Map map = KubeClient.createCusObject(CrdVesion.flink, rtdwFlinktask.getNamespace(), is); return map; } diff --git a/src/main/resources/deploy/k8s/cr.yaml b/src/main/resources/deploy/k8s/cr.yaml index 68531758c70786f1cc9f74210a0f500270aed1a9..2421410eb787b712fd2179bca09e1351c9f18415 100644 --- a/src/main/resources/deploy/k8s/cr.yaml +++ b/src/main/resources/deploy/k8s/cr.yaml @@ -8,14 +8,14 @@ spec: jarURI: local://${userJarPath} parallelism: 5 jobManagerResource: - mem: 4096m - cpu: 1.5 + mem: 512m + cpu: taskManagerResource: mem: ${mem}m cpu: ${cpu} savepointsDir: file:///tmp/savepoints savepointGeneration: 0 - <#if !flinkConfig??> + <#if flinkConfig??> flinkConfig: ${flinkConfig} \ No newline at end of file diff --git a/src/main/resources/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.tml b/src/main/resources/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.tml index ceacff1fdd34bd468c10115b04e8801e2e87cf9e..20edbe46fa10f155f2ff7519639f3937e0c3a3c8 100644 --- a/src/main/resources/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.tml +++ b/src/main/resources/net/wicp/tams/duckula/ops/pages/rtdw/FlinkTask.tml @@ -1,4 +1,4 @@ -