提交 7ecea109 编写于 作者: G gongzijian

Merge remote-tracking branch 'upstream/dev-20190415' into dev-20190415

......@@ -21,6 +21,7 @@ Lodash 高性能的 JavaScript 实用工具库
- #### Node安装
Node包下载 (注意版本 8.9.4) `https://nodejs.org/download/release/v8.9.4/`
- #### 前端项目构建
用命令行模式 `cd` 进入 `escheduler-ui`项目目录并执行 `npm install` 拉取项目依赖包
......@@ -31,6 +32,16 @@ Node包下载 (注意版本 8.9.4) `https://nodejs.org/download/release/v8.9.4/`
> 运行 `cnpm install`
- 新建一个`.env`文件,用于跟后端交互的接口
`escheduler-ui`目录下新建一个`.env`文件,在文件里添加后端服务的ip地址和端口,用于跟后端交互,`.env`文件内容如下:
```
# 代理的接口地址(自行修改)
API_BASE = http://192.168.xx.xx:12345
# 如果您需要用ip访问项目可以把 "#" 号去掉(例)
#DEV_HOST = 192.168.xx.xx
```
> ##### !!!这里特别注意 项目如果在拉取依赖包的过程中报 " node-sass error " 错误,请在执行完后再次执行以下命令
```
......@@ -49,6 +60,7 @@ npm install node-sass --unsafe-perm //单独安装node-sass依赖
再拷贝到服务器对应的目录下(前端服务静态页面存放目录)
访问地址 `http://localhost:8888/#/`
......
......@@ -3,23 +3,14 @@
前端有3种部署方式,分别为自动化部署,手动部署和编译源码部署
## 1、准备工作
#### 准备一:下载安装包
#### 下载安装包
目前最新安装包版本是1.0.1,下载地址: [码云下载](https://gitee.com/easyscheduler/EasyScheduler/attach_files/)
下载escheduler-ui-1.0.1.tar.gz后,解压后会产生dist目录,进入dist目录
> cd dist
#### 准备二:新建一个`.env`文件
在dist目录下新建一个`.env`文件,在文件里添加后端服务的ip地址和端口,用于跟后端交互,`.env`文件内容如下:
```
# 代理的接口地址(自行修改)
API_BASE = http://192.168.xx.xx:12345
# 如果您需要用ip访问项目可以把 "#" 号去掉(例)
#DEV_HOST = 192.168.xx.xx
```
## 2、部署
以下两种方式任选其一部署即可,推荐自动化部署
......
......@@ -57,7 +57,7 @@ escheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL
flush privileges;
```
* 创建表和导入基础数据
* 1.0.0和1.0.1版本创建表和导入基础数据
说明:在escheduler-backend/sql/escheduler.sql和quartz.sql
```sql
......@@ -66,6 +66,12 @@ escheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL
mysql -h {host} -u {user} -p{password} -D {db} < quartz.sql
```
* 1.0.2版本创建表和导入基础数据
```
sh ./script/create_escheduler.sh
```
#### 准备五: 修改部署目录权限及运行参数
我们先来大体了解下解压后escheduler-backend目录下的文件(夹)的作用
......@@ -89,7 +95,7 @@ install.sh : 一键部署脚本
- 修改 **install.sh**中的各参数,替换成自身业务所需的值
- 如果使用hdfs相关功能,需要拷贝**hdfs-site.xml****core-site.xml**到conf目录下
- 如果使用hdfs相关功能,需要拷贝**hdfs-site.xml****core-site.xml**到conf目录下
## 2、部署
......@@ -148,11 +154,11 @@ install.sh : 一键部署脚本
* 一键停止集群所有服务
` sh ./script/stop_all.sh`
` sh ./bin/stop_all.sh`
* 一键开启集群所有服务
` sh ./script/start_all.sh`
` sh ./bin/start_all.sh`
* 启停Master
......@@ -185,4 +191,10 @@ sh ./bin/escheduler-daemon.sh stop logger-server
```
sh ./bin/escheduler-daemon.sh start alert-server
sh ./bin/escheduler-daemon.sh stop alert-server
```
## 3、数据库升级
数据库升级是在1.0.2版本增加的功能,执行以下命令即可自动升级数据库
```
sh ./script/upgrade_escheduler.sh
```
\ No newline at end of file
......@@ -125,6 +125,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setDesc(desc);
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
......@@ -288,6 +289,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setDesc(desc);
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
......
......@@ -346,7 +346,8 @@ public class ProcessInstanceService extends BaseDAGService {
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, "update");
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
Date schedule = null;
......@@ -355,8 +356,12 @@ public class ProcessInstanceService extends BaseDAGService {
} else {
schedule = processInstance.getScheduleTime();
}
processInstance.setScheduleTime(schedule);
processInstance.setLocations(locations);
processInstance.setConnects(connects);
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
......@@ -370,9 +375,14 @@ public class ProcessInstanceService extends BaseDAGService {
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
}
int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
globalParams, schedule, flag, locations, connects);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
// globalParams, schedule, flag, locations, connects);
int update = processDao.updateProcessInstance(processInstance);
int updateDefine = 1;
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
......@@ -380,6 +390,7 @@ public class ProcessInstanceService extends BaseDAGService {
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(timeout);
updateDefine = processDefineMapper.update(processDefinition);
}
if (update > 0 && updateDefine > 0) {
......
......@@ -331,6 +331,11 @@ public final class Constants {
*/
public static final int MAX_TASK_TIMEOUT = 24 * 3600;
/**
* max task timeout
*/
public static final int MAX_PROCESS_TIMEOUT = Integer.MAX_VALUE;
/**
* heartbeat threads number
......
......@@ -23,6 +23,8 @@ import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.mapper.AlertMapper;
import cn.escheduler.dao.mapper.UserAlertGroupMapper;
import cn.escheduler.dao.model.Alert;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -83,8 +85,9 @@ public class AlertDao extends AbstractBaseDao {
*/
public void sendServerStopedAlert(int alertgroupId,String host,String serverType){
Alert alert = new Alert();
String content = String.format("[{'type':'%s','host':'%s','event':'服务挂掉','警告级别':'严重'}]",serverType,host);
alert.setTitle("容错告警");
String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]",
serverType, host);
alert.setTitle("Fault tolerance warning");
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
......@@ -94,6 +97,34 @@ public class AlertDao extends AbstractBaseDao {
alertMapper.insert(alert);
}
/**
* process time out alert
* @param processInstance
* @param processDefinition
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){
int alertgroupId = processInstance.getWarningGroupId();
String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc();
Alert alert = new Alert();
String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
processInstance.getId(), processInstance.getName());
alert.setTitle("Process Timeout Warn");
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(alertgroupId);
if (StringUtils.isNotEmpty(receivers)) {
alert.setReceivers(receivers);
}
if (StringUtils.isNotEmpty(receiversCc)) {
alert.setReceiversCc(receiversCc);
}
alert.setCreateTime(new Date());
alert.setUpdateTime(new Date());
alertMapper.insert(alert);
}
/**
* task timeout warn
*/
......
......@@ -482,6 +482,7 @@ public class ProcessDao extends AbstractBaseDao {
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
processInstance.setWorkerGroupId(command.getWorkerGroupId());
processInstance.setTimeout(processDefinition.getTimeout());
return processInstance;
}
......
......@@ -94,6 +94,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "receivers", column = "receivers", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "receiversCc", column = "receivers_cc", javaType = String.class, jdbcType = JdbcType.VARCHAR)
......@@ -121,6 +122,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryByDefineName")
......@@ -157,6 +159,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryAllDefinitionList")
......@@ -183,6 +186,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "scheduleReleaseState", column = "schedule_release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefineListPaging")
......@@ -211,6 +215,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefinitionListByIdList")
......
......@@ -55,6 +55,7 @@ public class ProcessDefinitionMapperProvider {
VALUES("`connects`", "#{processDefinition.connects}");
VALUES("`create_time`", "#{processDefinition.createTime}");
VALUES("`update_time`", "#{processDefinition.updateTime}");
VALUES("`timeout`", "#{processDefinition.timeout}");
VALUES("`flag`", EnumFieldUtil.genFieldStr("processDefinition.flag", ReleaseState.class));
VALUES("`user_id`", "#{processDefinition.userId}");
......@@ -100,6 +101,7 @@ public class ProcessDefinitionMapperProvider {
SET("`global_params`=#{processDefinition.globalParams}");
SET("`create_time`=#{processDefinition.createTime}");
SET("`update_time`=#{processDefinition.updateTime}");
SET("`timeout`=#{processDefinition.timeout}");
SET("`flag`="+EnumFieldUtil.genFieldStr("processDefinition.flag", Flag.class));
SET("`user_id`=#{processDefinition.userId}");
......@@ -173,7 +175,7 @@ public class ProcessDefinitionMapperProvider {
*/
public String queryDefineListPaging(Map<String, Object> parameter) {
return new SQL() {{
SELECT("td.id,td.name,td.version,td.release_state,td.project_id,td.user_id,td.`desc`,td.create_time,td.update_time,td.flag,td.global_params,td.receivers,td.receivers_cc,sc.schedule_release_state");
SELECT("td.*,sc.schedule_release_state");
FROM(TABLE_NAME + " td");
LEFT_OUTER_JOIN(" (select process_definition_id,release_state as schedule_release_state from `t_escheduler_schedules` " +
"group by `process_definition_id`,`release_state`) sc on sc.process_definition_id = td.id");
......
......@@ -95,6 +95,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById")
......@@ -133,6 +134,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById")
......@@ -171,6 +173,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -209,6 +212,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -256,6 +260,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -352,6 +357,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -444,6 +450,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -488,6 +495,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -532,6 +540,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
......@@ -574,6 +583,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess")
......@@ -616,6 +626,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess")
......
......@@ -68,6 +68,7 @@ public class ProcessInstanceMapperProvider {
VALUES("`is_sub_process`", EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
VALUES("`executor_id`", "#{processInstance.executorId}");
VALUES("`worker_group_id`", "#{processInstance.workerGroupId}");
VALUES("`timeout`", "#{processInstance.timeout}");
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class));
}
}.toString();
......@@ -141,6 +142,7 @@ public class ProcessInstanceMapperProvider {
SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
SET("`executor_id`=#{processInstance.executorId}");
SET("`worker_group_id`=#{processInstance.workerGroupId}");
SET("`timeout`=#{processInstance.timeout}");
WHERE("`id`=#{processInstance.id}");
......
......@@ -37,6 +37,9 @@ public class ProcessData {
private List<Property> globalParams;
private int timeout;
public ProcessData() {
}
......@@ -82,4 +85,11 @@ public class ProcessData {
this.globalParams = globalParams;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
}
......@@ -136,6 +136,11 @@ public class ProcessDefinition {
*/
private ReleaseState scheduleReleaseState;
/**
* process warning time out. unit: minute
*/
private int timeout;
public String getName() {
return name;
......@@ -316,6 +321,14 @@ public class ProcessDefinition {
this.scheduleReleaseState = scheduleReleaseState;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public String toString() {
return "ProcessDefinition{" +
......@@ -340,6 +353,8 @@ public class ProcessDefinition {
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
", scheduleReleaseState=" + scheduleReleaseState +
", timeout=" + timeout +
'}';
}
}
......@@ -183,6 +183,11 @@ public class ProcessInstance {
*/
private int workerGroupId;
/**
* process timeout for warning
*/
private int timeout;
public ProcessInstance(){
}
......@@ -495,6 +500,14 @@ public class ProcessInstance {
this.workerGroupId = workerGroupId;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public String toString() {
return "ProcessInstance{" +
......@@ -528,7 +541,9 @@ public class ProcessInstance {
", historyCmd='" + historyCmd + '\'' +
", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' +
", duration=" + duration +
", timeout=" + timeout +
", processInstancePriority=" + processInstancePriority +
'}';
}
}
......@@ -30,12 +30,17 @@ public class EschedulerManager {
UpgradeDao upgradeDao = UpgradeDao.getInstance();
public void initEscheduler() {
// Determines whether the escheduler table structure has been init
if(upgradeDao.isExistsTable("t_escheduler_version") || upgradeDao.isExistsTable("t_escheduler_queue")) {
logger.info("The database has been initialized. Skip the initialization step");
return;
}
this.initEschedulerSchema();
}
public void initEschedulerSchema() {
logger.info("Start initializing the ark manager mysql table structure");
logger.info("Start initializing the escheduler manager mysql table structure");
upgradeDao.initEschedulerSchema();
}
......@@ -52,15 +57,21 @@ public class EschedulerManager {
}else {
String version = "";
// Gets the version of the current system
if (upgradeDao.isExistsTable("t_escheduler_version")) {
version = upgradeDao.getCurrentVersion();
}else if(upgradeDao.isExistsColumn("t_escheduler_queue","create_time")){
version = "1.0.1";
}else if(upgradeDao.isExistsTable("t_escheduler_queue")){
version = "1.0.0";
}else{
logger.error("Unable to determine current software version, so cannot upgrade");
throw new RuntimeException("Unable to determine current software version, so cannot upgrade");
}
// The target version of the upgrade
String schemaVersion = "";
for(String schemaDir : schemaList) {
// Gets the version of the current system
if (upgradeDao.isExistsTable("t_escheduler_version")) {
version = upgradeDao.getCurrentVersion();
}else {
version = "1.0.0";
}
schemaVersion = schemaDir.split("_")[0];
if(SchemaUtils.isAGreatVersion(schemaVersion , version)) {
......@@ -70,7 +81,11 @@ public class EschedulerManager {
logger.info("Begin upgrading escheduler's mysql table structure");
upgradeDao.upgradeEscheduler(schemaDir);
if(SchemaUtils.isAGreatVersion(version,"1.0.1")){
version = upgradeDao.getCurrentVersion();
}else {
version = schemaVersion;
}
}
}
......
......@@ -20,6 +20,7 @@ import cn.escheduler.common.utils.MysqlUtil;
import cn.escheduler.common.utils.ScriptRunner;
import cn.escheduler.dao.AbstractBaseDao;
import cn.escheduler.dao.datasource.ConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,6 +34,7 @@ public class UpgradeDao extends AbstractBaseDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
private static final String T_VERSION_NAME = "t_escheduler_version";
private static final String rootDir = System.getProperty("user.dir");
@Override
protected void init() {
......@@ -64,6 +66,10 @@ public class UpgradeDao extends AbstractBaseDao {
private void runInitEschedulerDML() {
Connection conn = null;
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String mysqlSQLFilePath = rootDir + "/sql/create/release-1.0.0_schema/mysql/escheduler_dml.sql";
try {
conn = ConnectionFactory.getDataSource().getConnection();
conn.setAutoCommit(false);
......@@ -71,7 +77,7 @@ public class UpgradeDao extends AbstractBaseDao {
// Execute the ark_manager_dml.sql script to import the data related to escheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, false, true);
Reader initSqlReader = new FileReader(new File("sql/create/release-1.0.0_schema/mysql/escheduler_dml.sql"));
Reader initSqlReader = new FileReader(new File(mysqlSQLFilePath));
initScriptRunner.runScript(initSqlReader);
conn.commit();
......@@ -100,11 +106,15 @@ public class UpgradeDao extends AbstractBaseDao {
private void runInitEschedulerDDL() {
Connection conn = null;
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String mysqlSQLFilePath = rootDir + "/sql/create/release-1.0.0_schema/mysql/escheduler_ddl.sql";
try {
conn = ConnectionFactory.getDataSource().getConnection();
// Execute the escheduler_ddl.sql script to create the table structure of escheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
Reader initSqlReader = new FileReader(new File("sql/create/release-1.0.0_schema/mysql/escheduler_ddl.sql"));
Reader initSqlReader = new FileReader(new File(mysqlSQLFilePath));
initScriptRunner.runScript(initSqlReader);
} catch (IOException e) {
......@@ -122,7 +132,11 @@ public class UpgradeDao extends AbstractBaseDao {
}
/**
* Determines whether a table exists
* @param tableName
* @return
*/
public boolean isExistsTable(String tableName) {
Connection conn = null;
try {
......@@ -144,6 +158,33 @@ public class UpgradeDao extends AbstractBaseDao {
}
/**
* Determines whether a field exists in the specified table
* @param tableName
* @param columnName
* @return
*/
public boolean isExistsColumn(String tableName,String columnName) {
Connection conn = null;
try {
conn = ConnectionFactory.getDataSource().getConnection();
ResultSet rs = conn.getMetaData().getColumns(null,null,tableName,columnName);
if (rs.next()) {
return true;
} else {
return false;
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
MysqlUtil.realeaseResource(null, null, conn);
}
}
public String getCurrentVersion() {
String sql = String.format("select version from %s",T_VERSION_NAME);
......@@ -182,7 +223,10 @@ public class UpgradeDao extends AbstractBaseDao {
private void upgradeEschedulerDML(String schemaDir) {
String schemaVersion = schemaDir.split("_")[0];
String mysqlSQLFilePath = "sql/upgrade/" + schemaDir + "/mysql/escheduler_dml.sql";
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String mysqlSQLFilePath = rootDir + "/sql/upgrade/" + schemaDir + "/mysql/escheduler_dml.sql";
Connection conn = null;
PreparedStatement pstmt = null;
try {
......@@ -239,7 +283,10 @@ public class UpgradeDao extends AbstractBaseDao {
}
private void upgradeEschedulerDDL(String schemaDir) {
String mysqlSQLFilePath = "sql/upgrade/" + schemaDir + "/mysql/escheduler_ddl.sql";
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String mysqlSQLFilePath = rootDir + "/sql/upgrade/" + schemaDir + "/mysql/escheduler_ddl.sql";
Connection conn = null;
PreparedStatement pstmt = null;
try {
......
......@@ -29,7 +29,6 @@ public class CreateEscheduler {
private static final Logger logger = LoggerFactory.getLogger(CreateEscheduler.class);
public static void main(String[] args) {
Thread.currentThread().setName("manager-CreateEscheduler");
EschedulerManager eschedulerManager = new EschedulerManager();
eschedulerManager.initEscheduler();
logger.info("init escheduler finished");
......
......@@ -27,8 +27,6 @@ public class UpgradeEscheduler {
private static final Logger logger = LoggerFactory.getLogger(UpgradeEscheduler.class);
public static void main(String[] args) {
Thread.currentThread().setName("manager-UpgradeEscheduler");
EschedulerManager eschedulerManager = new EschedulerManager();
try {
eschedulerManager.upgradeEscheduler();
......
# base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root@123
spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8
spring.datasource.username=xx
spring.datasource.password=xx
# connection configuration
spring.datasource.initialSize=5
......
......@@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable {
processDao.createRecoveryWaitingThreadCommand(null, processInstance);
}
List<TaskInstance> taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId());
alertManager.sendWarnningOfProcessInstance(processInstance, taskInstances);
alertManager.sendAlertProcessInstance(processInstance, taskInstances);
}
......@@ -775,8 +775,15 @@ public class MasterExecThread implements Runnable {
private void runProcess(){
// submit start node
submitPostNode(null);
// submitStandByTask();
boolean sendTimeWarning = false;
while(!processInstance.IsProcessInstanceStop()){
// send warning email if process time out.
if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
alertManager.sendProcessTimeoutAlert(processInstance,
processDao.findProcessDefineById(processInstance.getProcessDefinitionId()));
sendTimeWarning = true;
}
Set<MasterBaseTaskExecThread> keys = activeTaskNode.keySet();
for (MasterBaseTaskExecThread taskExecThread : keys) {
Future<Boolean> future = activeTaskNode.get(taskExecThread);
......@@ -821,7 +828,7 @@ public class MasterExecThread implements Runnable {
}
// send alert
if(this.recoverToleranceFaultTaskList.size() > 0){
alertManager.sendWarnningWorkerleranceFault(processInstance, recoverToleranceFaultTaskList);
alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
this.recoverToleranceFaultTaskList.clear();
}
// updateProcessInstance completed task status
......@@ -851,6 +858,25 @@ public class MasterExecThread implements Runnable {
logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}
/**
* check process time out
* @param processInstance
* @return
*/
private boolean checkProcessTimeOut(ProcessInstance processInstance) {
if(processInstance.getTimeout() == 0 ){
return false;
}
Date now = new Date();
long runningTime = DateUtils.differMs(now, processInstance.getStartTime());
if(runningTime > processInstance.getTimeout()){
return true;
}
return false;
}
private boolean canSubmitTaskToQueue() {
return OSUtils.checkResource(conf, true);
}
......
......@@ -26,6 +26,7 @@ import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.model.Alert;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import org.slf4j.Logger;
......@@ -54,27 +55,27 @@ public class AlertManager {
private String getCommandCnName(CommandType commandType) {
switch (commandType) {
case RECOVER_TOLERANCE_FAULT_PROCESS:
return "恢复容错";
return "recover tolerance fault process";
case RECOVER_SUSPENDED_PROCESS:
return "恢复暂停流程";
return "recover suspended process";
case START_CURRENT_TASK_PROCESS:
return "从当前节点开始执行";
return "start current task process";
case START_FAILURE_TASK_PROCESS:
return "从失败节点开始执行";
return "start failure task process";
case START_PROCESS:
return "启动工作流";
return "start process";
case REPEAT_RUNNING:
return "重跑";
return "repeat running";
case SCHEDULER:
return "定时执行";
return "scheduler";
case COMPLEMENT_DATA:
return "补数";
return "complement data";
case PAUSE:
return "暂停工作流";
return "pause";
case STOP:
return "停止工作流";
return "stop";
default:
return "未知的命令类型";
return "unknown type";
}
}
......@@ -124,14 +125,14 @@ public class AlertManager {
continue;
}
LinkedHashMap<String, String> failedTaskMap = new LinkedHashMap();
failedTaskMap.put("任务id", String.valueOf(task.getId()));
failedTaskMap.put("任务名称", task.getName());
failedTaskMap.put("任务类型", task.getTaskType());
failedTaskMap.put("任务状态", task.getState().toString());
failedTaskMap.put("任务开始时间", DateUtils.dateToString(task.getStartTime()));
failedTaskMap.put("任务结束时间", DateUtils.dateToString(task.getEndTime()));
failedTaskMap.put("task id", String.valueOf(task.getId()));
failedTaskMap.put("task name", task.getName());
failedTaskMap.put("task type", task.getTaskType());
failedTaskMap.put("task state", task.getState().toString());
failedTaskMap.put("task start time", DateUtils.dateToString(task.getStartTime()));
failedTaskMap.put("task end time", DateUtils.dateToString(task.getEndTime()));
failedTaskMap.put("host", task.getHost());
failedTaskMap.put("日志路径", task.getLogPath());
failedTaskMap.put("log path", task.getLogPath());
failedTaskList.add(failedTaskMap);
}
res = JSONUtils.toJson(failedTaskList);
......@@ -152,10 +153,10 @@ public class AlertManager {
for(TaskInstance taskInstance: toleranceTaskList){
LinkedHashMap<String, String> toleranceWorkerContentMap = new LinkedHashMap();
toleranceWorkerContentMap.put("工作流程名称", processInstance.getName());
toleranceWorkerContentMap.put("容错任务名称", taskInstance.getName());
toleranceWorkerContentMap.put("容错机器IP", taskInstance.getHost());
toleranceWorkerContentMap.put("任务失败次数", String.valueOf(taskInstance.getRetryTimes()));
toleranceWorkerContentMap.put("process name", processInstance.getName());
toleranceWorkerContentMap.put("task name", taskInstance.getName());
toleranceWorkerContentMap.put("host", taskInstance.getHost());
toleranceWorkerContentMap.put("task retry times", String.valueOf(taskInstance.getRetryTimes()));
toleranceTaskInstanceList.add(toleranceWorkerContentMap);
}
return JSONUtils.toJson(toleranceTaskInstanceList);
......@@ -166,9 +167,9 @@ public class AlertManager {
* @param processInstance
* @param toleranceTaskList
*/
public void sendWarnningWorkerleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
Alert alert = new Alert();
alert.setTitle("worker容错报警");
alert.setTitle("worker fault tolerance");
alert.setShowType(ShowType.TABLE);
String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
alert.setContent(content);
......@@ -187,8 +188,8 @@ public class AlertManager {
* send process instance alert
* @param processInstance
*/
public void sendWarnningOfProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances){
public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances){
boolean sendWarnning = false;
WarningType warningType = processInstance.getWarningType();
......@@ -217,7 +218,7 @@ public class AlertManager {
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "成功" :"失败";
String success = processInstance.getState().typeIsSuccess() ? "success" :"failed";
alert.setTitle(cmdName + success);
ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE;
alert.setShowType(showType);
......@@ -233,4 +234,7 @@ public class AlertManager {
logger.info("add alert to db , alert: {}", alert.toString());
}
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
alertDao.sendProcessTimeoutAlert(processInstance, processDefinition);
}
}
......@@ -9,7 +9,7 @@ worker.fetch.task.num = 3
# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
worker.max.cpuload.avg=10
#worker.max.cpuload.avg=10
# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
worker.reserved.memory=1
\ No newline at end of file
......@@ -76,7 +76,7 @@ public class AlertManagerTest {
toleranceTaskList.add(toleranceTask1);
toleranceTaskList.add(toleranceTask2);
alertManager.sendWarnningWorkerleranceFault(processInstance, toleranceTaskList);
alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList);
}
......@@ -103,7 +103,7 @@ public class AlertManagerTest {
toleranceTaskList.add(toleranceTask1);
toleranceTaskList.add(toleranceTask2);
alertManager.sendWarnningOfProcessInstance(processInstance, toleranceTaskList);
alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList);
}
}
......@@ -187,7 +187,7 @@ masterTaskCommitRetryTimes="5"
masterTaskCommitInterval="100"
# master最大cpu平均负载,用来判断master是否还有执行能力
masterMaxCupLoadAvg="10"
masterMaxCpuLoadAvg="10"
# master预留内存,用来判断master是否还有执行能力
masterReservedMemory="1"
......@@ -201,10 +201,10 @@ workerExecThreads="100"
workerHeartbeatInterval="10"
# worker一次抓取任务数
workerFetchTaskNum="10"
workerFetchTaskNum="3"
# worker最大cpu平均负载,用来判断master是否还有执行能力
workerMaxCupLoadAvg="10"
# worker最大cpu平均负载,用来判断worker是否还有执行能力,保持系统默认,默认为cpu核数的2倍,当负载达到2倍时,
#workerMaxCupLoadAvg="10"
# worker预留内存,用来判断master是否还有执行能力
workerReservedMemory="1"
......@@ -272,14 +272,14 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCupLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties
sed -i ${txt} "s#worker.exec.threads.*#worker.exec.threads=${workerExecThreads}#g" conf/worker.properties
sed -i ${txt} "s#worker.heartbeat.interval.*#worker.heartbeat.interval=${workerHeartbeatInterval}#g" conf/worker.properties
sed -i ${txt} "s#worker.fetch.task.num.*#worker.fetch.task.num=${workerFetchTaskNum}#g" conf/worker.properties
sed -i ${txt} "s#worker.max.cpuload.avg.*#worker.max.cpuload.avg=${workerMaxCupLoadAvg}#g" conf/worker.properties
#sed -i ${txt} "s#worker.max.cpuload.avg.*#worker.max.cpuload.avg=${workerMaxCupLoadAvg}#g" conf/worker.properties
sed -i ${txt} "s#worker.reserved.memory.*#worker.reserved.memory=${workerReservedMemory}#g" conf/worker.properties
......
......@@ -59,6 +59,16 @@
<outputDirectory>./conf</outputDirectory>
</fileSet>
<fileSet>
<directory>script</directory>
<includes>
<include>start_all.sh</include>
<include>stop_all.sh</include>
<include>escheduler-daemon.sh</include>
</includes>
<outputDirectory>./bin</outputDirectory>
</fileSet>
<fileSet>
<directory>./</directory>
<includes>
......
#!/bin/bash
workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
echo "$workDir/lib"
java -Xmx1G -cp "$workDir/../lib/*" cn.escheduler.dao.upgrade.shell.CreateEscheduler
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
ESCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export ESCHEDULER_CONF_DIR=$ESCHEDULER_HOME/conf
export ESCHEDULER_LIB_JARS=$ESCHEDULER_HOME/lib/*
export ESCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=cn.escheduler.dao.upgrade.shell.CreateEscheduler
exec_command="$ESCHEDULER_OPTS -classpath $ESCHEDULER_CONF_DIR:$ESCHEDULER_LIB_JARS $CLASS"
cd $ESCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
#!/bin/bash
workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
echo "$workDir/lib"
java -Xmx1G -cp "$workDir/../lib/*" cn.escheduler.dao.upgrade.shell.UpgradeEscheduler
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
ESCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export ESCHEDULER_CONF_DIR=$ESCHEDULER_HOME/conf
export ESCHEDULER_LIB_JARS=$ESCHEDULER_HOME/lib/*
export ESCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=cn.escheduler.dao.upgrade.shell.UpgradeEscheduler
exec_command="$ESCHEDULER_OPTS -classpath $ESCHEDULER_CONF_DIR:$ESCHEDULER_LIB_JARS $CLASS"
cd $ESCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
......@@ -180,4 +180,66 @@ d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_schedules_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_schedules_C_worker_group_id;
\ No newline at end of file
DROP PROCEDURE ac_escheduler_T_t_escheduler_schedules_C_worker_group_id;
-- ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='worker_group_id')
THEN
ALTER TABLE t_escheduler_process_instance ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `process_instance_priority`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id;
-- ac_escheduler_T_t_escheduler_process_instance_C_timeout
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_instance_C_timeout;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_timeout()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='timeout')
THEN
ALTER TABLE `t_escheduler_process_instance` ADD COLUMN `timeout` int(11) NULL DEFAULT 0 COMMENT '超时时间' AFTER `worker_group_id`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_instance_C_timeout;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_timeout;
-- ac_escheduler_T_t_escheduler_process_definition_C_timeout
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_definition_C_timeout;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_timeout()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='timeout')
THEN
ALTER TABLE `t_escheduler_process_definition` ADD COLUMN `timeout` int(11) NULL DEFAULT 0 COMMENT '超时时间' AFTER `create_time`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_definition_C_timeout;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_timeout;
\ No newline at end of file
INSERT INTO `t_escheduler_version` (`version`) VALUES ('1.0.0');
\ No newline at end of file
INSERT INTO `t_escheduler_version` (`version`) VALUES ('1.0.2');
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册