From 2afe2fa1e2444edb7b17a599a454a3e702728e7e Mon Sep 17 00:00:00 2001 From: insist777 <84278047+insist777@users.noreply.github.com> Date: Tue, 23 Aug 2022 10:54:17 +0800 Subject: [PATCH] [improvement] Add two parameters in workergroup, and support the application of description display and other parameters --- .../api/controller/WorkerGroupController.java | 49 +++++++------- .../api/service/WorkerGroupService.java | 4 +- .../service/impl/WorkerGroupServiceImpl.java | 27 ++++++-- .../controller/WorkerGroupControllerTest.java | 2 + .../dao/entity/WorkerGroup.java | 64 ++----------------- .../resources/sql/dolphinscheduler_h2.sql | 2 + .../resources/sql/dolphinscheduler_mysql.sql | 2 + .../sql/dolphinscheduler_postgresql.sql | 2 + .../mysql/dolphinscheduler_ddl.sql | 20 ++++++ .../mysql/dolphinscheduler_dml.sql | 16 +++++ .../postgresql/dolphinscheduler_ddl.sql | 54 ++++++++++++++++ .../postgresql/dolphinscheduler_dml.sql | 30 +++++++++ .../master/registry/ServerNodeManager.java | 31 +++++---- 13 files changed, 203 insertions(+), 100 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index 3966236f6..69e1a57d0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -17,10 +17,11 @@ package org.apache.dolphinscheduler.api.controller; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_ADDRESS_LIST_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR; + import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.WorkerGroupService; @@ -28,6 +29,11 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.User; + +import springfox.documentation.annotations.ApiIgnore; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.DeleteMapping; @@ -39,14 +45,11 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; -import springfox.documentation.annotations.ApiIgnore; - -import java.util.Map; -import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_ADDRESS_LIST_FAIL; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL; -import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; /** * worker group controller @@ -72,7 +75,9 @@ public class WorkerGroupController extends BaseController { @ApiImplicitParams({ @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"), @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"), - @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String") + @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String"), + @ApiImplicitParam(name = "description", value = "WORKER_DESC", required = false, dataType = "String"), + @ApiImplicitParam(name = "otherParamsJson", value = "WORKER_PARMS_JSON", required = false, dataType = "String"), }) @PostMapping() @ResponseStatus(HttpStatus.OK) @@ -81,9 +86,11 @@ public class WorkerGroupController extends BaseController { public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "id", required = false, defaultValue = "0") int id, @RequestParam(value = "name") String name, - @RequestParam(value = "addrList") String addrList + @RequestParam(value = "addrList") String addrList, + @RequestParam(value = "description",required = false, defaultValue = "") String description, + @RequestParam(value = "otherParamsJson",required = false, defaultValue = "") String otherParamsJson ) { - Map result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList); + Map result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson); return returnDataList(result); } @@ -98,9 +105,9 @@ public class WorkerGroupController extends BaseController { */ @ApiOperation(value = "queryAllWorkerGroupsPaging", notes = "QUERY_WORKER_GROUP_PAGING_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), - @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"), - @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String") + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataTypeClass = int.class, example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataTypeClass = int.class, example = "20"), + @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataTypeClass = String.class) }) @GetMapping() @ResponseStatus(HttpStatus.OK) @@ -109,8 +116,7 @@ public class WorkerGroupController extends BaseController { public Result queryAllWorkerGroupsPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize, - @RequestParam(value = "searchVal", required = false) String searchVal - ) { + @RequestParam(value = "searchVal", required = false) String searchVal) { Result result = checkPageParams(pageNo, pageSize); if (!result.checkResult()) { return result; @@ -146,15 +152,14 @@ public class WorkerGroupController extends BaseController { */ @ApiOperation(value = "deleteWorkerGroupById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"), + @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataTypeClass = int.class, example = "10"), }) @DeleteMapping(value = "/{id}") @ResponseStatus(HttpStatus.OK) @ApiException(DELETE_WORKER_GROUP_FAIL) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result deleteWorkerGroupById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable("id") Integer id - ) { + @PathVariable("id") Integer id) { Map result = workerGroupService.deleteWorkerGroupById(loginUser, id); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 2de4134db..4c474b75c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -34,9 +34,11 @@ public interface WorkerGroupService { * @param id worker group id * @param name worker group name * @param addrList addr list + * @param description description + * @param otherParamsJson otherParamsJson * @return create or update result code */ - Map saveWorkerGroup(User loginUser, int id, String name, String addrList); + Map saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson); /** * query worker group paging diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 693fe9d9b..d566d7c62 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -82,7 +82,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro */ @Override @Transactional - public Map saveWorkerGroup(User loginUser, int id, String name, String addrList) { + public Map saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson) { Map result = new HashMap<>(); if (!canOperatorPermissions(loginUser,null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) { putMsg(result, Status.USER_NO_OPERATION_PERM); @@ -108,6 +108,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro workerGroup.setName(name); workerGroup.setAddrList(addrList); workerGroup.setUpdateTime(now); + workerGroup.setDescription(description); if (checkWorkerGroupNameExists(workerGroup)) { putMsg(result, Status.NAME_EXIST, workerGroup.getName()); @@ -118,14 +119,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr); return result; } + handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson); + putMsg(result, Status.SUCCESS); + return result; + } + + protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser, String otherParamsJson) { if (workerGroup.getId() != 0) { workerGroupMapper.updateById(workerGroup); } else { workerGroupMapper.insert(workerGroup); permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger); } - putMsg(result, Status.SUCCESS); - return result; } /** @@ -272,6 +277,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } else { workerGroups = workerGroupMapper.queryAllWorkerGroup(); } + // worker groups from zookeeper String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; Collection workerGroupList = null; @@ -289,7 +295,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } return workerGroups; } - + Map workerGroupsMap = null; + if (workerGroups.size() != 0) { + workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName, workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup)); + } for (String workerGroup : workerGroupList) { String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup; Collection childrenNodes = null; @@ -302,19 +311,27 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro continue; } WorkerGroup wg = new WorkerGroup(); + handleAddrList(wg, workerGroup, childrenNodes); wg.setName(workerGroup); if (isPaging) { - wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next()); String[] rv = registeredValue.split(Constants.COMMA); wg.setCreateTime(new Date(Long.parseLong(rv[6]))); wg.setUpdateTime(new Date(Long.parseLong(rv[7]))); wg.setSystemDefault(true); + if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) { + wg.setDescription(workerGroupsMap.get(workerGroup).getDescription()); + workerGroups.remove(workerGroupsMap.get(workerGroup)); + } } workerGroups.add(wg); } return workerGroups; } + + protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection childrenNodes) { + wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); + } /** * delete worker group by id diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 9077bcc1e..f4ab73588 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -71,6 +71,8 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("name","cxc_work_group"); paramsMap.add("addrList","192.168.0.1,192.168.0.2"); + paramsMap.add("description",""); + paramsMap.add("otherParamsJson",""); MvcResult mvcResult = mockMvc.perform(post("/worker-groups") .header("sessionId", sessionId) .params(paramsMap)) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java index 204ee2eb4..b84e3ff9f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java @@ -21,6 +21,7 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; import java.util.Date; @@ -28,6 +29,7 @@ import java.util.Date; * worker group */ @TableName("t_ds_worker_group") +@Data public class WorkerGroup { @TableId(value = "id", type = IdType.AUTO) @@ -41,67 +43,11 @@ public class WorkerGroup { private Date updateTime; + private String description; + @TableField(exist = false) private boolean systemDefault; - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getAddrList() { - return addrList; - } - - public void setAddrList(String addrList) { - this.addrList = addrList; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getUpdateTime() { - return updateTime; - } - - public void setUpdateTime(Date updateTime) { - this.updateTime = updateTime; - } - - public boolean getSystemDefault() { - return systemDefault; - } - - public void setSystemDefault(boolean systemDefault) { - this.systemDefault = systemDefault; - } - - @Override - public String toString() { - return "WorkerGroup{" - + "id= " + id - + ", name= " + name - + ", addrList= " + addrList - + ", createTime= " + createTime - + ", updateTime= " + updateTime - + ", systemDefault= " + systemDefault - + "}"; - } + private String otherParamsJson; } diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 92271be52..b46bc632a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -954,6 +954,8 @@ CREATE TABLE t_ds_worker_group addr_list text NULL DEFAULT NULL, create_time datetime NULL DEFAULT NULL, update_time datetime NULL DEFAULT NULL, + description text NULL DEFAULT NULL, + other_params_json text NULL DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY name_unique (name) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 8a2b74bca..bda7b10b8 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -945,6 +945,8 @@ CREATE TABLE `t_ds_worker_group` ( `addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]', `create_time` datetime NULL DEFAULT NULL COMMENT 'create time', `update_time` datetime NULL DEFAULT NULL COMMENT 'update time', + `description` text NULL DEFAULT NULL COMMENT 'description', + `other_params_json` text NULL DEFAULT NULL COMMENT 'other params json', PRIMARY KEY (`id`), UNIQUE KEY `name_unique` (`name`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 935bf47a1..0d9cab408 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -852,6 +852,8 @@ CREATE TABLE t_ds_worker_group ( addr_list text DEFAULT NULL , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , + description text DEFAULT NULL, + other_params_json text DEFAULT NULL, PRIMARY KEY (id) , CONSTRAINT name_unique UNIQUE (name) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000..3d0ad0197 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + + +ALTER TABLE `t_ds_worker_group` ADD COLUMN `other_params_json` text DEFAULT NULL COMMENT 'other params json'; + diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 000000000..4a14f326b --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000..372b99be4 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +delimiter d// + + + +CREATE OR REPLACE FUNCTION public.dolphin_update_metadata( + ) + RETURNS character varying + LANGUAGE 'plpgsql' + COST 100 + VOLATILE PARALLEL UNSAFE +AS $BODY$ +DECLARE +v_schema varchar; +BEGIN + ---get schema name + v_schema =current_schema(); + + + +--- add column +EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json int DEFAULT NULL '; + + + +return 'Success!'; +exception when others then + ---Raise EXCEPTION '(%)',SQLERRM; + + return SQLERRM; +END; +$BODY$; + +select dolphin_update_metadata(); + + +d// + diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 000000000..4d7327f76 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +delimiter d// + +return 'Success!'; +exception when others then + ---Raise EXCEPTION '(%)',SQLERRM; + return SQLERRM; +END; +$BODY$; + +select dolphin_insert_dq_initial_data(); + +d// + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index a6599b603..8112bd82d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -197,23 +197,16 @@ public class ServerNodeManager implements InitializingBean { public void run() { try { // sync worker node info - Map newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true); - syncAllWorkerNodeInfo(newWorkerNodeInfo); - + Map registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true); + syncAllWorkerNodeInfo(registryWorkerNodeMap); // sync worker group nodes from database List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); if (CollectionUtils.isNotEmpty(workerGroupList)) { for (WorkerGroup wg : workerGroupList) { - String workerGroup = wg.getName(); - Set nodes = new HashSet<>(); - String[] addrs = wg.getAddrList().split(Constants.COMMA); - for (String addr : addrs) { - if (newWorkerNodeInfo.containsKey(addr)) { - nodes.add(addr); - } - } - if (!nodes.isEmpty()) { - syncWorkerGroupNodes(workerGroup, nodes); + String workerGroupName = wg.getName(); + Set workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg); + if (!workerAddress.isEmpty()) { + syncWorkerGroupNodes(workerGroupName, workerAddress); } } } @@ -223,6 +216,18 @@ public class ServerNodeManager implements InitializingBean { } } + + protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo, WorkerGroup wg) { + Set nodes = new HashSet<>(); + String[] addrs = wg.getAddrList().split(Constants.COMMA); + for (String addr : addrs) { + if (newWorkerNodeInfo.containsKey(addr)) { + nodes.add(addr); + } + } + return nodes; + } + /** * worker group node listener */ -- GitLab