提交 b3f46ef1 编写于 作者: B baoliang

add worker group

上级 8591fde3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.api.controller;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.service.WorkerGroupService;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.dao.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* worker group controller
*/
@RestController
@RequestMapping("/worker-group")
public class WorkerGroupController extends BaseController{
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupController.class);
@Autowired
WorkerGroupService workerGroupService;
@PostMapping(value = "/save")
@ResponseStatus(HttpStatus.OK)
public Result saveWorkerGroup(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "ipList") String ipList
) {
logger.info("save worker group: login user {}, id:{}, name: {}, ipList: {} ",
loginUser.getUserName(), id, name, ipList);
try {
Map<String, Object> result = workerGroupService.saveWorkerGroup(id, name, ipList);
return returnDataList(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
@PostMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
public Result queryAllWorkerGroupsPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize
) {
logger.info("query all worker group: login user {}, pageNo:{}, pageSize:{}, searchVal:{}",
loginUser.getUserName() , pageNo, pageSize, searchVal);
try {
Map<String, Object> result = workerGroupService.queryAllGroupPaging(pageNo, pageSize, searchVal);
return returnDataListPaging(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
@PostMapping(value = "/delete-by-id")
@ResponseStatus(HttpStatus.OK)
public Result deleteById(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("id") Integer id
) {
logger.info("delete worker group: login user {}, id:{} ",
loginUser.getUserName() , id);
try {
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(id);
return returnDataList(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
}
......@@ -156,6 +156,10 @@ public enum Status {
UPDATE_QUEUE_ERROR(10131, "update queue error"),
NEED_NOT_UPDATE_QUEUE(10132, "no content changes, no updates are required"),
VERIFY_QUEUE_ERROR(10133,"verify queue error"),
NAME_NULL(10134,"name must be not null"),
NAME_EXIST(10135, "name {0} already exists"),
SAVE_ERROR(10136, "save error"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"),
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.api.service;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.PageInfo;
import cn.escheduler.dao.mapper.WorkerGroupMapper;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.WorkerGroup;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* work group service
*/
@Service
public class WorkerGroupService extends BaseService {
@Autowired
WorkerGroupMapper workerGroupMapper;
public Map<String, Object> saveWorkerGroup(int id, String name, String ipList){
Map<String, Object> result = new HashMap<>(5);
if(StringUtils.isEmpty(name)){
putMsg(result, Status.NAME_NULL);
return result;
}
Date now = new Date();
WorkerGroup workerGroup = new WorkerGroup();
if(id != 0){
workerGroup = workerGroupMapper.queryById(id);
}else{
workerGroup.setCreateTime(now);
}
workerGroup.setName(name);
workerGroup.setIpList(ipList);
workerGroup.setUpdateTime(now);
if(checkWorkerGroupNameExists(workerGroup)){
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
if(workerGroup.getId() != 0 ){
workerGroupMapper.update(workerGroup);
}else{
workerGroupMapper.insert(workerGroup);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check worker group name exists
* @param workerGroup
* @return
*/
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
if(workerGroupList.size() > 0 ){
// new group has same name..
if(workerGroup.getId() == 0){
return true;
}
// update group...
for(WorkerGroup group : workerGroupList){
if(group.getId() != workerGroup.getId()){
return true;
}
}
}
return false;
}
public Map<String,Object> queryAllGroupPaging(Integer pageNo, Integer pageSize, String searchVal) {
Map<String, Object> result = new HashMap<>(5);
int count = workerGroupMapper.countPaging(searchVal);
PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
List<WorkerGroup> workerGroupList = workerGroupMapper.queryListPaging(pageInfo.getStart(), pageSize, searchVal);
pageInfo.setTotalCount(count);
pageInfo.setLists(workerGroupList);
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
public Map<String,Object> deleteWorkerGroupById(Integer id) {
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS);
return result;
}
}
......@@ -12,7 +12,7 @@ CREATE TABLE `t_escheduler_access_token` (
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
CREATE TABLE `escheduler`.`t_escheduler_error_command` (
CREATE TABLE `t_escheduler_error_command` (
`id` int(11) NOT NULL COMMENT '主键',
`command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程 4 从失败节点开始执行',
`executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者',
......@@ -30,3 +30,13 @@ CREATE TABLE `escheduler`.`t_escheduler_error_command` (
`message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
CREATE TABLE `t_escheduler_worker_group` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称',
`ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表',
`create_time` date NULL DEFAULT NULL COMMENT '创建时间',
`update_time` date NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.mapper;
import cn.escheduler.dao.model.WorkerGroup;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.JdbcType;
import java.util.Date;
import java.util.List;
public interface WorkerGroupMapper {
/**
* query all worker group list
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryAllWorkerGroup")
List<WorkerGroup> queryAllWorkerGroup();
/**
* query worker group by name
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryWorkerGroupByName")
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
/**
* query worker group paging by search value
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryListPaging")
List<WorkerGroup> queryListPaging(@Param("offset") int offset,
@Param("pageSize") int pageSize,
@Param("searchVal") String searchVal);
/**
* count worker group by search value
* @param searchVal
* @return
*/
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "countPaging")
int countPaging(@Param("searchVal") String searchVal);
/**
* insert worker server
*
* @param workerGroup
* @return
*/
@InsertProvider(type = WorkerGroupMapperProvider.class, method = "insert")
@Options(useGeneratedKeys = true,keyProperty = "workerGroup.id")
@SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "workerGroup.id", before = false, resultType = int.class)
int insert(@Param("workerGroup") WorkerGroup workerGroup);
/**
* update worker
*
* @param workerGroup
* @return
*/
@UpdateProvider(type = WorkerGroupMapperProvider.class, method = "update")
int update(@Param("workerGroup") WorkerGroup workerGroup);
/**
* delete work group by id
* @param id
* @return
*/
@DeleteProvider(type = WorkerGroupMapperProvider.class, method = "deleteById")
int deleteById(@Param("id") int id);
/**
* query work group by id
* @param id
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@DeleteProvider(type = WorkerGroupMapperProvider.class, method = "queryById")
WorkerGroup queryById(@Param("id") int id);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.mapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
import java.util.Map;
public class WorkerGroupMapperProvider {
private static final String TABLE_NAME = "t_escheduler_worker_group";
/**
* query worker list
* @return
*/
public String queryAllWorkerGroup() {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
}}.toString();
}
/**
* insert worker server
* @param parameter
* @return
*/
public String insert(Map<String, Object> parameter) {
return new SQL() {{
INSERT_INTO(TABLE_NAME);
VALUES("id", "#{workerGroup.id}");
VALUES("name", "#{workerGroup.name}");
VALUES("ip_list", "#{workerGroup.ipList}");
VALUES("create_time", "#{workerGroup.createTime}");
VALUES("update_time", "#{workerGroup.updateTime}");
}}.toString();
}
/**
* update worker group
*
* @param parameter
* @return
*/
public String update(Map<String, Object> parameter) {
return new SQL() {{
UPDATE(TABLE_NAME);
SET("name = #{workerGroup.name}");
SET("ip_list = #{workerGroup.ipList}");
SET("create_time = #{workerGroup.createTime}");
SET("update_time = #{workerGroup.updateTime}");
WHERE("id = #{workerGroup.id}");
}}.toString();
}
/**
* delete worker group by id
* @param parameter
* @return
*/
public String deleteById(Map<String, Object> parameter) {
return new SQL() {{
DELETE_FROM(TABLE_NAME);
WHERE("id = #{id}");
}}.toString();
}
/**
* query worker group by name
* @param parameter
* @return
*/
public String queryWorkerGroupByName(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("name = #{name}");
}}.toString();
}
/**
* query worker group by id
* @param parameter
* @return
*/
public String queryById(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("id = #{id}");
}}.toString();
}
/**
* query worker group by id
* @param parameter
* @return
*/
public String queryListPaging(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
Object searchVal = parameter.get("searchVal");
if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){
WHERE( " name like concat('%', #{searchVal}, '%') ");
}
ORDER_BY(" update_time desc limit #{offset},#{pageSize} ");
}}.toString();
}
/**
* count worker group number by search value
* @param parameter
* @return
*/
public String countPaging(Map<String, Object> parameter) {
return new SQL() {{
SELECT("count(0)");
FROM(TABLE_NAME);
Object searchVal = parameter.get("searchVal");
if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){
WHERE( " name like concat('%', #{searchVal}, '%') ");
}
}}.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.model;
import java.util.Date;
/**
* worker group for task running
*/
public class WorkerGroup {
private int id;
private String name;
private String ipList;
private Date createTime;
private Date updateTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getIpList() {
return ipList;
}
public void setIpList(String ipList) {
this.ipList = ipList;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "Worker group model{" +
"id= " + id +
",name= " + name +
",ipList= " + ipList +
",createTime= " + createTime +
",updateTime= " + updateTime +
"}";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
# base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8
spring.datasource.username=xx
spring.datasource.password=xx
spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root@123
# connection configuration
spring.datasource.initialSize=5
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.mapper;
import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.model.WorkerGroup;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Date;
import java.util.List;
/**
* worker group mapper test
*/
public class WorkerGroupMapperTest {
WorkerGroupMapper workerGroupMapper;
@Before
public void before() {
workerGroupMapper = ConnectionFactory.getSqlSession().getMapper(WorkerGroupMapper.class);
}
@Test
public void test() {
WorkerGroup workerGroup = new WorkerGroup();
String name = "workerGroup3";
workerGroup.setName(name);
workerGroup.setIpList("192.168.220.154,192.168.220.188");
workerGroup.setCreateTime(new Date());
workerGroup.setUpdateTime(new Date());
workerGroupMapper.insert(workerGroup);
Assert.assertNotEquals(workerGroup.getId(), 0);
List<WorkerGroup> workerGroups2 = workerGroupMapper.queryWorkerGroupByName(name);
Assert.assertEquals(workerGroups2.size(), 1);
workerGroup.setName("workerGroup11");
workerGroupMapper.update(workerGroup);
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
Assert.assertNotEquals(workerGroups.size(), 0);
workerGroupMapper.deleteById(workerGroup.getId());
workerGroups = workerGroupMapper.queryAllWorkerGroup();
Assert.assertEquals(workerGroups.size(), 0);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册