提交 2afe2fa1 编写于 作者: I insist777 提交者: 薛 之 谦

[improvement] Add two parameters in workergroup, and support the application...

[improvement] Add two parameters in workergroup, and support the application of description display and other parameters
上级 00bb86f7
......@@ -17,10 +17,11 @@
package org.apache.dolphinscheduler.api.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_ADDRESS_LIST_FAIL;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL;
import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
......@@ -28,6 +29,11 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
......@@ -39,14 +45,11 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_ADDRESS_LIST_FAIL;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL;
import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
/**
* worker group controller
......@@ -72,7 +75,9 @@ public class WorkerGroupController extends BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
@ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String")
@ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String"),
@ApiImplicitParam(name = "description", value = "WORKER_DESC", required = false, dataType = "String"),
@ApiImplicitParam(name = "otherParamsJson", value = "WORKER_PARMS_JSON", required = false, dataType = "String"),
})
@PostMapping()
@ResponseStatus(HttpStatus.OK)
......@@ -81,9 +86,11 @@ public class WorkerGroupController extends BaseController {
public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "addrList") String addrList
@RequestParam(value = "addrList") String addrList,
@RequestParam(value = "description",required = false, defaultValue = "") String description,
@RequestParam(value = "otherParamsJson",required = false, defaultValue = "") String otherParamsJson
) {
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList);
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson);
return returnDataList(result);
}
......@@ -98,9 +105,9 @@ public class WorkerGroupController extends BaseController {
*/
@ApiOperation(value = "queryAllWorkerGroupsPaging", notes = "QUERY_WORKER_GROUP_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataTypeClass = int.class, example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataTypeClass = int.class, example = "20"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataTypeClass = String.class)
})
@GetMapping()
@ResponseStatus(HttpStatus.OK)
......@@ -109,8 +116,7 @@ public class WorkerGroupController extends BaseController {
public Result queryAllWorkerGroupsPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize,
@RequestParam(value = "searchVal", required = false) String searchVal
) {
@RequestParam(value = "searchVal", required = false) String searchVal) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
......@@ -146,15 +152,14 @@ public class WorkerGroupController extends BaseController {
*/
@ApiOperation(value = "deleteWorkerGroupById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataTypeClass = int.class, example = "10"),
})
@DeleteMapping(value = "/{id}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_WORKER_GROUP_FAIL)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteWorkerGroupById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("id") Integer id
) {
@PathVariable("id") Integer id) {
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(loginUser, id);
return returnDataList(result);
}
......
......@@ -34,9 +34,11 @@ public interface WorkerGroupService {
* @param id worker group id
* @param name worker group name
* @param addrList addr list
* @param description description
* @param otherParamsJson otherParamsJson
* @return create or update result code
*/
Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList);
Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson);
/**
* query worker group paging
......
......@@ -82,7 +82,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
*/
@Override
@Transactional
public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList) {
public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser,null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
......@@ -108,6 +108,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
if (checkWorkerGroupNameExists(workerGroup)) {
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
......@@ -118,14 +119,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}
handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
putMsg(result, Status.SUCCESS);
return result;
}
protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser, String otherParamsJson) {
if (workerGroup.getId() != 0) {
workerGroupMapper.updateById(workerGroup);
} else {
workerGroupMapper.insert(workerGroup);
permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
......@@ -272,6 +277,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
} else {
workerGroups = workerGroupMapper.queryAllWorkerGroup();
}
// worker groups from zookeeper
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
Collection<String> workerGroupList = null;
......@@ -289,7 +295,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
return workerGroups;
}
Map<String, WorkerGroup> workerGroupsMap = null;
if (workerGroups.size() != 0) {
workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName, workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup));
}
for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
Collection<String> childrenNodes = null;
......@@ -302,19 +311,27 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
continue;
}
WorkerGroup wg = new WorkerGroup();
handleAddrList(wg, workerGroup, childrenNodes);
wg.setName(workerGroup);
if (isPaging) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
String[] rv = registeredValue.split(Constants.COMMA);
wg.setCreateTime(new Date(Long.parseLong(rv[6])));
wg.setUpdateTime(new Date(Long.parseLong(rv[7])));
wg.setSystemDefault(true);
if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
workerGroups.remove(workerGroupsMap.get(workerGroup));
}
}
workerGroups.add(wg);
}
return workerGroups;
}
protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection<String> childrenNodes) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
}
/**
* delete worker group by id
......
......@@ -71,6 +71,8 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");
paramsMap.add("addrList","192.168.0.1,192.168.0.2");
paramsMap.add("description","");
paramsMap.add("otherParamsJson","");
MvcResult mvcResult = mockMvc.perform(post("/worker-groups")
.header("sessionId", sessionId)
.params(paramsMap))
......
......@@ -21,6 +21,7 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
......@@ -28,6 +29,7 @@ import java.util.Date;
* worker group
*/
@TableName("t_ds_worker_group")
@Data
public class WorkerGroup {
@TableId(value = "id", type = IdType.AUTO)
......@@ -41,67 +43,11 @@ public class WorkerGroup {
private Date updateTime;
private String description;
@TableField(exist = false)
private boolean systemDefault;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddrList() {
return addrList;
}
public void setAddrList(String addrList) {
this.addrList = addrList;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public boolean getSystemDefault() {
return systemDefault;
}
public void setSystemDefault(boolean systemDefault) {
this.systemDefault = systemDefault;
}
@Override
public String toString() {
return "WorkerGroup{"
+ "id= " + id
+ ", name= " + name
+ ", addrList= " + addrList
+ ", createTime= " + createTime
+ ", updateTime= " + updateTime
+ ", systemDefault= " + systemDefault
+ "}";
}
private String otherParamsJson;
}
......@@ -954,6 +954,8 @@ CREATE TABLE t_ds_worker_group
addr_list text NULL DEFAULT NULL,
create_time datetime NULL DEFAULT NULL,
update_time datetime NULL DEFAULT NULL,
description text NULL DEFAULT NULL,
other_params_json text NULL DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY name_unique (name)
);
......
......@@ -945,6 +945,8 @@ CREATE TABLE `t_ds_worker_group` (
`addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]',
`create_time` datetime NULL DEFAULT NULL COMMENT 'create time',
`update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
`description` text NULL DEFAULT NULL COMMENT 'description',
`other_params_json` text NULL DEFAULT NULL COMMENT 'other params json',
PRIMARY KEY (`id`),
UNIQUE KEY `name_unique` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
......
......@@ -852,6 +852,8 @@ CREATE TABLE t_ds_worker_group (
addr_list text DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
description text DEFAULT NULL,
other_params_json text DEFAULT NULL,
PRIMARY KEY (id) ,
CONSTRAINT name_unique UNIQUE (name)
) ;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE `t_ds_worker_group` ADD COLUMN `other_params_json` text DEFAULT NULL COMMENT 'other params json';
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
)
RETURNS character varying
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_schema varchar;
BEGIN
---get schema name
v_schema =current_schema();
--- add column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json int DEFAULT NULL ';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_update_metadata();
d//
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_insert_dq_initial_data();
d//
......@@ -197,23 +197,16 @@ public class ServerNodeManager implements InitializingBean {
public void run() {
try {
// sync worker node info
Map<String, String> newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(newWorkerNodeInfo);
Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) {
String workerGroup = wg.getName();
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
if (newWorkerNodeInfo.containsKey(addr)) {
nodes.add(addr);
}
}
if (!nodes.isEmpty()) {
syncWorkerGroupNodes(workerGroup, nodes);
String workerGroupName = wg.getName();
Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
if (!workerAddress.isEmpty()) {
syncWorkerGroupNodes(workerGroupName, workerAddress);
}
}
}
......@@ -223,6 +216,18 @@ public class ServerNodeManager implements InitializingBean {
}
}
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
if (newWorkerNodeInfo.containsKey(addr)) {
nodes.add(addr);
}
}
return nodes;
}
/**
* worker group node listener
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册