提交 9f9ee230 编写于 作者: 偏锋书生's avatar 偏锋书生

最终准备调试代码

上级 215be8dd
......@@ -46,13 +46,13 @@ public class RtdwFlinktask {
* Database Column Remarks: 用户执行程序保存的路径 This field was generated by MyBatis Generator. This field corresponds to the database column rtdw_flinktask.user_jar_path
* @mbg.generated
*/
@TamsCol(value = "执行程序", save = 1, query = -1, showWidth = 100, orderby = 9, pageElement = PageElement.Upload, pageElementOpt = "{}")
@TamsCol(value = "执行程序", save = 1, query = -1, showWidth = 300, orderby = 9, pageElement = PageElement.Upload, pageElementOpt = "{}")
private String userJarPath;
/**
* Database Column Remarks: 执行主类 This field was generated by MyBatis Generator. This field corresponds to the database column rtdw_flinktask.entry_class
* @mbg.generated
*/
@TamsCol(value = "执行主类", save = 1, query = -1, showWidth = 100, orderby = 8)
@TamsCol(value = "执行主类", save = 1, query = -1, showWidth = 200, orderby = 8)
private String entryClass;
/**
* Database Column Remarks: 主参数 This field was generated by MyBatis Generator. This field corresponds to the database column rtdw_flinktask.main_args
......
......@@ -3,23 +3,34 @@ package net.wicp.tams.duckula.ops.pages.rtdw;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tapestry5.ioc.annotations.Inject;
import org.apache.tapestry5.util.TextStreamResponse;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.google.common.base.Predicate;
import common.kubernetes.apiserver.KubeClient;
import common.kubernetes.constant.CrdVesion;
import lombok.extern.slf4j.Slf4j;
import net.wicp.tams.app.duckula.controller.bean.models.RtdwFlinkbaseimage;
import net.wicp.tams.app.duckula.controller.bean.models.RtdwFlinktask;
import net.wicp.tams.app.duckula.controller.dao.RtdwFlinkbaseimageMapper;
import net.wicp.tams.app.duckula.controller.dao.RtdwFlinktaskMapper;
import net.wicp.tams.app.duckula.controller.service.K8sService;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.apiext.CollectionUtil;
import net.wicp.tams.common.apiext.FreemarkUtil;
import net.wicp.tams.common.apiext.IOUtil;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.exception.ProjectExceptionRuntime;
import net.wicp.tams.component.tools.TapestryAssist;
import net.wicp.tams.duckula.ops.pages.ParentPageBean;
......@@ -36,10 +47,13 @@ public class FlinkTask extends ParentPageBean<RtdwFlinktask> {
@Inject
private RtdwFlinktaskMapper rtdwFlinktaskMapper;
@Inject
private RtdwFlinkbaseimageMapper rtdwFlinkbaseimageMapper;
@Override
public void doSave(RtdwFlinktask t, boolean isInsert) {
if (isInsert) {
//t.setImageRep("registry.cn-hangzhou.aliyuncs.com/linkedcare/bigdata");
// t.setImageRep("registry.cn-hangzhou.aliyuncs.com/linkedcare/bigdata");
this.rtdwFlinktaskMapper.insert(t);
} else {
this.rtdwFlinktaskMapper.updateByPrimaryKeySelective(t);
......@@ -65,25 +79,58 @@ public class FlinkTask extends ParentPageBean<RtdwFlinktask> {
public TextStreamResponse onSubmitTask() {
final RtdwFlinktask t = TapestryAssist.getBeanFromPage(getTClass(), requestGlobals);
try {
String context = IOUtil.slurp(IOUtil.fileToInputStream("/deploy/k8s/cr.yaml", K8sService.class));
String result = FreemarkUtil.getInst().doProcessByTemp(context, t);
Map<String, Object> deployFlinkApplication = deployFlinkApplication(t.getNamespace(), t);
Map<String, Object> deployFlinkApplication = deployFlinkApplication(t);
System.out.println("deployFlinkApplication=" + deployFlinkApplication);
return TapestryAssist.getTextStreamResponse(Result.getError(result));
return TapestryAssist.getTextStreamResponse(Result.getError(""));
} catch (IOException e) {
log.error("布署失败", e);
return TapestryAssist.getTextStreamResponse(Result.getError("cr deploy error:" + e.getMessage()));
}catch (ProjectExceptionRuntime e) {
} catch (ProjectExceptionRuntime e) {
return TapestryAssist.getTextStreamResponse(new Result(e.getExcept()));
}
}
private Map<String, Object> deployFlinkApplication(String namespace, RtdwFlinktask params) throws IOException {
Pair<Double, Integer> baseHost = Pair.of(4.0, 8096);// 一个标准的task是4核8G
Pair<Double, Integer> singleSlot = Pair.of(1.0, 2048);// 一个slot占用1C2U,要不要留一些给操作系统?后面再说
private Map<String, Object> deployFlinkApplication(RtdwFlinktask rtdwFlinktask) throws IOException {
RtdwFlinkbaseimage rtdwFlinkbaseimage = rtdwFlinkbaseimageMapper.selectByPrimaryKey(rtdwFlinktask.getBaseImg());
Map<String, Object> params = new HashMap<String, Object>();
params.put("namespace", rtdwFlinktask.getNamespace());
params.put("name", rtdwFlinktask.getName());
params.put("imageTag", rtdwFlinkbaseimage.getImageTag());
params.put("userJarPath", rtdwFlinktask.getUserJarPath());
Integer slots = rtdwFlinktask.getSlot();
int taskNum = slots / 4 + 1;
int hasSlots = slots % 4;
if (taskNum > 1) {
params.put("cpu", baseHost.getLeft());
params.put("mem", baseHost.getRight());
} else {
params.put("cpu", singleSlot.getLeft() * hasSlots);
params.put("mem", singleSlot.getRight() * hasSlots);
}
List<String> configs = Arrays.asList(rtdwFlinktask.getFlinkConfig().split("\n"));
List<String> allConfig = new ArrayList<String>();
allConfig.addAll(configs);// 直接使用configs做为allConfig会报错,configs不能添加元素
CollectionUtils.filter(allConfig, new org.apache.commons.collections.Predicate() {
@Override
public boolean evaluate(Object arg0) {
String ele = StringUtil.trimSpace((String) arg0);
return StringUtil.isNotNull(ele) && !ele.startsWith("taskmanager.numberOfTaskSlots:")
&& !ele.startsWith("kubernetes.jobmanager.service-account:");
}
});
allConfig.add(String.format("taskmanager.numberOfTaskSlots: %s", taskNum > 1 ? 4 : hasSlots));
allConfig.add("kubernetes.jobmanager.service-account: flink-native-k8s-operator");
String configstr = CollectionUtil.listJoin(allConfig, "\n ");// 后面带4个空格
params.put("flinkConfig", " "+configstr);// 前面带4个空格
String context = IOUtil.slurp(IOUtil.fileToInputStream("/deploy/k8s/cr.yaml", K8sService.class));
String result = FreemarkUtil.getInst().doProcessByTemp(context, params);
InputStream is = new ByteArrayInputStream(result.getBytes());
Map<String, Object> map = KubeClient.createCusObject(CrdVesion.flink, namespace, is);
Map<String, Object> map = KubeClient.createCusObject(CrdVesion.flink, rtdwFlinktask.getNamespace(), is);
return map;
}
......
......@@ -8,14 +8,14 @@ spec:
jarURI: local://${userJarPath}
parallelism: 5
jobManagerResource:
mem: 4096m
cpu: 1.5
mem: 512m
cpu:
taskManagerResource:
mem: ${mem}m
cpu: ${cpu}
savepointsDir: file:///tmp/savepoints
savepointGeneration: 0
<#if !flinkConfig??>
<#if flinkConfig??>
flinkConfig:
${flinkConfig}
</#if>
\ No newline at end of file
<html t:type="rjzjh/layoutQuery" selectHandle="selectGrid" initSaveHandle="initUpdate" queryButs="[{'id':'checkBut','iconCls':'icon-attach','text':'提交任务','url':'submitTask','confirm':'它是异步任务,需要一些时间查看你的任务状态,你确认要提交此任务吗?'}]"
<html t:type="rjzjh/layoutQuery" selectHandle="selectGrid" initSaveHandle="initUpdate" saveCheckHandle="saveCheck" queryButs="[{'id':'checkBut','iconCls':'icon-attach','text':'提交任务','url':'submitTask','confirm':'它是异步任务,需要一些时间查看你的任务状态,你确认要提交此任务吗?'}]"
xmlns:t="http://tapestry.apache.org/schema/tapestry_5_4.xsd"
xmlns:r="tapestry-library:rjzjh"
xmlns:s="tapestry-library:tams"
......@@ -34,12 +34,22 @@
}
}
//主是要判断文件组件
function saveCheck(){
alert("新增或修改前的检查,跟据业务需求写前端较验:");
//var deployComValue= $.rjzjh.getcombogrid('deployId_save','deploy');
//var middlewareIdSel= $('#middlewareId_save').combobox('getValue');
//var rules= $('#ruleEdit').val();
return false;
}
var retValue=true;
jQuery("input[filetag='rjzjhuploadfield']").each(function(i,n){
if(n.value==''){
retValue=false;
$.rjzjh.alert('上传文件不能为空');
return;
}
if(!n.value.endWith('.jar')){
retValue=false;
$.rjzjh.alert('需要选择jar包上传');
return retValue;
}
});
return retValue;
};
</script>
</html>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册