未验证 提交 38c965dd 编写于 作者: L lgcareer 提交者: GitHub

new feature for #404 add resource tree function (#2323)

* add create resource directory

* add create resource directory

* update the resource test

* add upgrade sql in version 1.2.2

* Adding request parameter id to update queryResourceListPaging

* set isDirectory value is false default

* add full name to update updateResource

* remove request parameter isDirectory to update createResource method

* update queryResourceListPaging with change get to post

* update updateResource method with remove fullName

* File management list modification (#1976)

* add resource component

* add resource tree visitor

* return json string

* update queryResourceList

* upload file need fullName

* add method rootNode

* Shell task resources and authorization resources (#1989)

* File management list modification

* Shell task resources and authorization resources

* download resource when execute task

* download resource when execute task

* update authorization type

* download resource when execute task

* Spark task resource changes (#1990)

* File management list modification

* Shell task resources and authorization resources

* Spark task resource changes

* download resource when execute task

* update udf function service

* add resource type in ResourceComponent

* UDF resource tree and change DAG style (#2019)

* File management list modification

* Shell task resources and authorization resources

* Spark task resource changes

* UDF resource tree and change DAG style

* add deleteIds method in ResourceMapper and ResourceMapperTest

* Add comments on class and method

* add queryResourceByName method in controller

* update verify-name with change name to full name

* update queryResource with add parameter pid

* update queryResource with add parameter pid

* add resource ids in process definition and delete resource need judge whether it is used by any process definition

* Breadcrumb development (#2033)

* File management list modification

* Shell task resources and authorization resources

* Spark task resource changes

* UDF resource tree and change DAG style

* Breadcrumb development

* Breadcrumb development

* Resource tree bug fix (#2040)

* File management list modification

* Shell task resources and authorization resources

* Spark task resource changes

* UDF resource tree and change DAG style

* Breadcrumb development

* Breadcrumb development

* Resource tree bug fix

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* Fix github action rerun failed (#2067)

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* Change crumb position

* Change crumb position (#2068)

* build resource process definition map

* UDF changed to multiple choice

* UDF changed to multiple choice (#2077)

* Change crumb position

* UDF changed to multiple choice

* build resource process definition map (#2076)

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* build resource process definition map

* update resource name also need update all the children full name

* need add queryResource

* update resource name also need update all the children full name (#2096)

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* build resource process definition map

* update resource name also need update all the children full name

* need add queryResource

* Limit customization file content to no more than 3000 lines

* Limit customization file content to no more than 3000 lines(#2128)

* Limit customization file content to no more than 3000 lines(#2128) (#2140)

* Change crumb position

* UDF changed to multiple choice

* Limit customization file content to no more than 3000 lines

* Limit customization file content to no more than 3000 lines(#2128)

* add queryResourceJarList

* add queryResourceJarList

* add queryResourceJarList

* add queryResourceJarList (#2192)

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* build resource process definition map

* update resource name also need update all the children full name

* need add queryResource

* add queryResourceJarList

* add queryResourceJarList

* add queryResourceJarList

* Modify the main jar package

* Modify the main jar package (#2200)

* Change crumb position

* UDF changed to multiple choice

* Limit customization file content to no more than 3000 lines

* Limit customization file content to no more than 3000 lines(#2128)

* Modify the main jar package

* add resource filter in order to get filtered resource

* add comments of resource filter

* update list children by resource

* choose main jar with resource tree (#2220)

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* build resource process definition map

* update resource name also need update all the children full name

* need add queryResource

* add queryResourceJarList

* add queryResourceJarList

* add queryResourceJarList

* add resource filter in order to get filtered resource

* add comments of resource filter

* update list children by resource

* Return null if query resource list is empty

* update queryResource method change parameter pid to id

* getResouDelete checksum and modify parameter namerceId

* revert .env

* remove parameter pid

* Delete request interface

* go back to the last page

* jar interface call

* Fix issue #2234 and #2228

* change resource name with full name

* Fix issue #2234 and #2228 (#2246)

* update resource service test

* Fix github action rerun failed

* add status of PARENT_RESOURCE_NOT_EXIST

* build resource process definition map

* update resource name also need update all the children full name

* need add queryResource

* add queryResourceJarList

* add queryResourceJarList

* add queryResourceJarList

* add resource filter in order to get filtered resource

* add comments of resource filter

* update list children by resource

* Return null if query resource list is empty

* update queryResource method change parameter pid to id

* revert .env

* remove parameter pid

* Fix issue #2234 and #2228

* change resource name with full name

* Fix list query value error

* remove unauth-file with authorize-resource-tree

* Repair data cannot be echoed

* Repair data cannot be echoed

* execute mr and spark task need query resource name before

* Authorized resource interface replacement

* Authorized resource interface replacement

* Filter UDF resources

* Change parameters

* need query all authorized directory children when create task

* Change normalize.scss import method and animation.scss license modification

* Delete file list update processing

* It's fixed that resource not deleted in hdfs when delete it.

* add tooltips

* add tooltips (#2310)

* Echo workflow name and modify udf management name

* [new feature]add resource tree function

* revert front code in order to be same as dev branch

* revert front code in order to be same as dev branch

* revert common.properties and application.properties

* add super method

* update flink parameter test

* update flink parameter and unit test

* update resource service test

* If resource list is empty,need init it

* update flink parameter test
Co-authored-by: Nbreak60 <790061044@qq.com>
Co-authored-by: Nxingchun-chen <55787491+xingchun-chen@users.noreply.github.com>
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>
上级 48d7612c
......@@ -60,6 +60,50 @@ public class ResourcesController extends BaseController{
@Autowired
private UdfFuncService udfFuncService;
/**
* create resource
*
* @param loginUser login user
* @param alias alias
* @param description description
* @param type type
* @return create result code
*/
/**
*
* @param loginUser login user
* @param type type
* @param alias alias
* @param description description
* @param pid parent id
* @param currentDir current directory
* @return
*/
@ApiOperation(value = "createDirctory", notes= "CREATE_RESOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String"),
@ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType ="String"),
@ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile")
})
@PostMapping(value = "/directory/create")
public Result createDirectory(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "type") ResourceType type,
@RequestParam(value ="name") String alias,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value ="pid") int pid,
@RequestParam(value ="currentDir") String currentDir) {
try {
logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}",
loginUser.getUserName(),type, alias, description,pid,currentDir);
return resourceService.createDirectory(loginUser,alias, description,type ,pid,currentDir);
} catch (Exception e) {
logger.error(CREATE_RESOURCE_ERROR.getMsg(),e);
return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg());
}
}
/**
* create resource
*
......@@ -80,13 +124,15 @@ public class ResourcesController extends BaseController{
@PostMapping(value = "/create")
public Result createResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "type") ResourceType type,
@RequestParam(value ="name")String alias,
@RequestParam(value ="name") String alias,
@RequestParam(value = "description", required = false) String description,
@RequestParam("file") MultipartFile file) {
@RequestParam("file") MultipartFile file,
@RequestParam(value ="pid") int pid,
@RequestParam(value ="currentDir") String currentDir) {
try {
logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}",
loginUser.getUserName(),type, alias, description, file.getName(), file.getOriginalFilename());
return resourceService.createResource(loginUser,alias, description,type ,file);
return resourceService.createResource(loginUser,alias, description,type ,file,pid,currentDir);
} catch (Exception e) {
logger.error(CREATE_RESOURCE_ERROR.getMsg(),e);
return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg());
......@@ -120,7 +166,7 @@ public class ResourcesController extends BaseController{
try {
logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}",
loginUser.getUserName(),type, alias, description);
return resourceService.updateResource(loginUser,resourceId,alias, description,type);
return resourceService.updateResource(loginUser,resourceId,alias,description,type);
} catch (Exception e) {
logger.error(UPDATE_RESOURCE_ERROR.getMsg(),e);
return error(Status.UPDATE_RESOURCE_ERROR.getCode(), Status.UPDATE_RESOURCE_ERROR.getMsg());
......@@ -166,6 +212,7 @@ public class ResourcesController extends BaseController{
@ApiOperation(value = "queryResourceListPaging", notes= "QUERY_RESOURCE_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType ="int"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType ="String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType ="Int",example = "20")
......@@ -174,6 +221,7 @@ public class ResourcesController extends BaseController{
@ResponseStatus(HttpStatus.OK)
public Result queryResourceListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="type") ResourceType type,
@RequestParam(value ="id") int id,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize
......@@ -187,7 +235,7 @@ public class ResourcesController extends BaseController{
}
searchVal = ParameterUtils.handleEscapes(searchVal);
result = resourceService.queryResourceListPaging(loginUser,type,searchVal,pageNo, pageSize);
result = resourceService.queryResourceListPaging(loginUser,id,type,searchVal,pageNo, pageSize);
return returnDataListPaging(result);
}catch (Exception e){
logger.error(QUERY_RESOURCES_LIST_PAGING.getMsg(),e);
......@@ -227,32 +275,89 @@ public class ResourcesController extends BaseController{
* verify resource by alias and type
*
* @param loginUser login user
* @param alias resource name
* @param type resource type
* @param fullName resource full name
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
@ApiOperation(value = "verifyResourceName", notes= "VERIFY_RESOURCE_NAME_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String")
@ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String")
})
@GetMapping(value = "/verify-name")
@ResponseStatus(HttpStatus.OK)
public Result verifyResourceName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="name") String alias,
@RequestParam(value ="fullName") String fullName,
@RequestParam(value ="type") ResourceType type
) {
try {
logger.info("login user {}, verfiy resource alias: {},resource type: {}",
loginUser.getUserName(), alias,type);
loginUser.getUserName(), fullName,type);
return resourceService.verifyResourceName(alias,type,loginUser);
return resourceService.verifyResourceName(fullName,type,loginUser);
} catch (Exception e) {
logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e);
return error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg());
}
}
/**
* query resources jar list
*
* @param loginUser login user
* @param type resource type
* @return resource list
*/
@ApiOperation(value = "queryResourceJarList", notes= "QUERY_RESOURCE_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType")
})
@GetMapping(value="/list/jar")
@ResponseStatus(HttpStatus.OK)
public Result queryResourceJarList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="type") ResourceType type
){
try{
logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString());
Map<String, Object> result = resourceService.queryResourceJarList(loginUser, type);
return returnDataList(result);
}catch (Exception e){
logger.error(QUERY_RESOURCES_LIST_ERROR.getMsg(),e);
return error(Status.QUERY_RESOURCES_LIST_ERROR.getCode(), Status.QUERY_RESOURCES_LIST_ERROR.getMsg());
}
}
/**
* query resource by full name and type
*
* @param loginUser login user
* @param fullName resource full name
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
@ApiOperation(value = "queryResource", notes= "QUERY_BY_RESOURCE_NAME")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String")
})
@GetMapping(value = "/queryResource")
@ResponseStatus(HttpStatus.OK)
public Result queryResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="fullName",required = false) String fullName,
@RequestParam(value ="id",required = false) Integer id,
@RequestParam(value ="type") ResourceType type
) {
try {
logger.info("login user {}, query resource by full name: {} or id: {},resource type: {}",
loginUser.getUserName(), fullName,id,type);
return resourceService.queryResource(fullName,id,type);
} catch (Exception e) {
logger.error(RESOURCE_NOT_EXIST.getMsg(), e);
return error(Status.RESOURCE_NOT_EXIST.getCode(), Status.RESOURCE_NOT_EXIST.getMsg());
}
}
/**
* view resource file online
*
......@@ -310,16 +415,18 @@ public class ResourcesController extends BaseController{
@RequestParam(value ="fileName")String fileName,
@RequestParam(value ="suffix")String fileSuffix,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "content") String content
@RequestParam(value = "content") String content,
@RequestParam(value ="pid") int pid,
@RequestParam(value ="currentDir") String currentDir
) {
try{
logger.info("login user {}, online create resource! fileName : {}, type : {}, suffix : {},desc : {},content : {}",
loginUser.getUserName(),fileName,type,fileSuffix,description,content);
loginUser.getUserName(),fileName,type,fileSuffix,description,content,pid,currentDir);
if(StringUtils.isEmpty(content)){
logger.error("resource file contents are not allowed to be empty");
return error(Status.RESOURCE_FILE_IS_EMPTY.getCode(), RESOURCE_FILE_IS_EMPTY.getMsg());
}
return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content);
return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content,pid,currentDir);
}catch (Exception e){
logger.error(CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg(),e);
return error(Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getCode(), Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg());
......@@ -384,6 +491,9 @@ public class ResourcesController extends BaseController{
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getFilename() + "\"")
.body(file);
}catch (RuntimeException e){
logger.error(e.getMessage(),e);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
}catch (Exception e){
logger.error(DOWNLOAD_RESOURCE_FILE_ERROR.getMsg(),e);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Status.DOWNLOAD_RESOURCE_FILE_ERROR.getMsg());
......@@ -658,21 +768,21 @@ public class ResourcesController extends BaseController{
* @param userId user id
* @return unauthorized result code
*/
@ApiOperation(value = "unauthorizedFile", notes= "UNAUTHORIZED_FILE_NOTES")
@ApiOperation(value = "authorizeResourceTree", notes= "AUTHORIZE_RESOURCE_TREE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType ="Int", example = "100")
})
@GetMapping(value = "/unauth-file")
@GetMapping(value = "/authorize-resource-tree")
@ResponseStatus(HttpStatus.CREATED)
public Result unauthorizedFile(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result authorizeResourceTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("userId") Integer userId) {
try{
logger.info("resource unauthorized file, user:{}, unauthorized user id:{}", loginUser.getUserName(), userId);
Map<String, Object> result = resourceService.unauthorizedFile(loginUser, userId);
logger.info("all resource file, user:{}, user id:{}", loginUser.getUserName(), userId);
Map<String, Object> result = resourceService.authorizeResourceTree(loginUser, userId);
return returnDataList(result);
}catch (Exception e){
logger.error(UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg(),e);
return error(Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getCode(), Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg());
logger.error(AUTHORIZE_RESOURCE_TREE.getMsg(),e);
return error(Status.AUTHORIZE_RESOURCE_TREE.getCode(), Status.AUTHORIZE_RESOURCE_TREE.getMsg());
}
}
......
package org.apache.dolphinscheduler.api.dto.resources;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* directory
*/
public class Directory extends ResourceComponent{
@Override
public boolean isDirctory() {
return true;
}
}
package org.apache.dolphinscheduler.api.dto.resources;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* file leaf
*/
public class FileLeaf extends ResourceComponent{
}
package org.apache.dolphinscheduler.api.dto.resources;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.annotation.JSONType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import java.util.ArrayList;
import java.util.List;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* resource component
*/
@JSONType(orders={"id","pid","name","fullName","description","isDirctory","children","type"})
public abstract class ResourceComponent {
public ResourceComponent() {
}
public ResourceComponent(int id, int pid, String name, String fullName, String description, boolean isDirctory) {
this.id = id;
this.pid = pid;
this.name = name;
this.fullName = fullName;
this.description = description;
this.isDirctory = isDirctory;
int directoryFlag = isDirctory ? 1:0;
this.idValue = String.format("%s_%s",id,directoryFlag);
}
/**
* id
*/
@JSONField(ordinal = 1)
protected int id;
/**
* parent id
*/
@JSONField(ordinal = 2)
protected int pid;
/**
* name
*/
@JSONField(ordinal = 3)
protected String name;
/**
* current directory
*/
protected String currentDir;
/**
* full name
*/
@JSONField(ordinal = 4)
protected String fullName;
/**
* description
*/
@JSONField(ordinal = 5)
protected String description;
/**
* is directory
*/
@JSONField(ordinal = 6)
protected boolean isDirctory;
/**
* id value
*/
@JSONField(ordinal = 7)
protected String idValue;
/**
* resoruce type
*/
@JSONField(ordinal = 8)
protected ResourceType type;
/**
* children
*/
@JSONField(ordinal = 8)
protected List<ResourceComponent> children = new ArrayList<>();
/**
* add resource component
* @param resourceComponent resource component
*/
public void add(ResourceComponent resourceComponent){
children.add(resourceComponent);
}
public String getName(){
return this.name;
}
public String getDescription(){
return this.description;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public void setName(String name) {
this.name = name;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public void setDescription(String description) {
this.description = description;
}
public boolean isDirctory() {
return isDirctory;
}
public void setDirctory(boolean dirctory) {
isDirctory = dirctory;
}
public String getIdValue() {
return idValue;
}
public void setIdValue(int id,boolean isDirctory) {
int directoryFlag = isDirctory ? 1:0;
this.idValue = String.format("%s_%s",id,directoryFlag);
}
public ResourceType getType() {
return type;
}
public void setType(ResourceType type) {
this.type = type;
}
public List<ResourceComponent> getChildren() {
return children;
}
public void setChildren(List<ResourceComponent> children) {
this.children = children;
}
@Override
public String toString() {
return "ResourceComponent{" +
"id=" + id +
", pid=" + pid +
", name='" + name + '\'' +
", currentDir='" + currentDir + '\'' +
", fullName='" + fullName + '\'' +
", description='" + description + '\'' +
", isDirctory=" + isDirctory +
", idValue='" + idValue + '\'' +
", type=" + type +
", children=" + children +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import java.util.List;
/**
* interface filter
*/
public interface IFilter {
List<Resource> filter();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* resource filter
*/
public class ResourceFilter implements IFilter {
/**
* resource suffix
*/
private String suffix;
/**
* resource list
*/
private List<Resource> resourceList;
/**
* parent list
*/
//Set<Resource> parentList = new HashSet<>();
/**
* constructor
* @param suffix resource suffix
* @param resourceList resource list
*/
public ResourceFilter(String suffix, List<Resource> resourceList) {
this.suffix = suffix;
this.resourceList = resourceList;
}
/**
* file filter
* @return file filtered by suffix
*/
public Set<Resource> fileFilter(){
Set<Resource> resources = resourceList.stream().filter(t -> {
String alias = t.getAlias();
return alias.endsWith(suffix);
}).collect(Collectors.toSet());
return resources;
}
/**
* list all parent dir
* @return parent resource dir set
*/
Set<Resource> listAllParent(){
Set<Resource> parentList = new HashSet<>();
Set<Resource> filterFileList = fileFilter();
for(Resource file:filterFileList){
parentList.add(file);
setAllParent(file,parentList);
}
return parentList;
}
/**
* list all parent dir
* @param resource resource
* @return parent resource dir set
*/
private void setAllParent(Resource resource,Set<Resource> parentList){
for (Resource resourceTemp : resourceList) {
if (resourceTemp.getId() == resource.getPid()) {
parentList.add(resourceTemp);
setAllParent(resourceTemp,parentList);
}
}
}
@Override
public List<Resource> filter() {
return new ArrayList<>(listAllParent());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.Directory;
import org.apache.dolphinscheduler.api.dto.resources.FileLeaf;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.dao.entity.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* resource tree visitor
*/
public class ResourceTreeVisitor implements Visitor{
/**
* resource list
*/
private List<Resource> resourceList;
public ResourceTreeVisitor() {
}
/**
* constructor
* @param resourceList resource list
*/
public ResourceTreeVisitor(List<Resource> resourceList) {
this.resourceList = resourceList;
}
/**
* visit
* @return resoruce component
*/
public ResourceComponent visit() {
ResourceComponent rootDirectory = new Directory();
for (Resource resource : resourceList) {
// judge whether is root node
if (rootNode(resource)){
ResourceComponent tempResourceComponent = getResourceComponent(resource);
rootDirectory.add(tempResourceComponent);
tempResourceComponent.setChildren(setChildren(tempResourceComponent.getId(),resourceList));
}
}
return rootDirectory;
}
/**
* set children
* @param id id
* @param list resource list
* @return resource component list
*/
public static List<ResourceComponent> setChildren(int id, List<Resource> list ){
List<ResourceComponent> childList = new ArrayList<>();
for (Resource resource : list) {
if (id == resource.getPid()){
ResourceComponent tempResourceComponent = getResourceComponent(resource);
childList.add(tempResourceComponent);
}
}
for (ResourceComponent resourceComponent : childList) {
resourceComponent.setChildren(setChildren(resourceComponent.getId(),list));
}
if (childList.size()==0){
return new ArrayList<>();
}
return childList;
}
/**
* Determine whether it is the root node
* @param resource resource
* @return true if it is the root node
*/
public boolean rootNode(Resource resource) {
boolean isRootNode = true;
if(resource.getPid() != -1 ){
for (Resource parent : resourceList) {
if (resource.getPid() == parent.getId()) {
isRootNode = false;
break;
}
}
}
return isRootNode;
}
/**
* get resource component by resource
* @param resource resource
* @return resource component
*/
private static ResourceComponent getResourceComponent(Resource resource) {
ResourceComponent tempResourceComponent;
if(resource.isDirectory()){
tempResourceComponent = new Directory();
}else{
tempResourceComponent = new FileLeaf();
}
tempResourceComponent.setName(resource.getAlias());
tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/",""));
tempResourceComponent.setId(resource.getId());
tempResourceComponent.setPid(resource.getPid());
tempResourceComponent.setIdValue(resource.getId(),resource.isDirectory());
tempResourceComponent.setDescription(resource.getDescription());
tempResourceComponent.setType(resource.getType());
return tempResourceComponent;
}
}
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Visitor
*/
public interface Visitor {
/**
* visit
* @return resource component
*/
ResourceComponent visit();
}
......@@ -97,7 +97,7 @@ public enum Status {
VERIFY_UDF_FUNCTION_NAME_ERROR( 10070,"verify udf function name error", "UDF函数名称验证错误"),
DELETE_UDF_FUNCTION_ERROR( 10071,"delete udf function error", "删除UDF函数错误"),
AUTHORIZED_FILE_RESOURCE_ERROR( 10072,"authorized file resource error", "授权资源文件错误"),
UNAUTHORIZED_FILE_RESOURCE_ERROR( 10073,"unauthorized file resource error", "查询未授权资源错误"),
AUTHORIZE_RESOURCE_TREE( 10073,"authorize resource tree display error","授权资源目录树错误"),
UNAUTHORIZED_UDF_FUNCTION_ERROR( 10074,"unauthorized udf function error", "查询未授权UDF函数错误"),
AUTHORIZED_UDF_FUNCTION_ERROR(10075,"authorized udf function error", "授权UDF函数错误"),
CREATE_SCHEDULE_ERROR(10076,"create schedule error", "创建调度配置错误"),
......@@ -184,10 +184,12 @@ public enum Status {
RESOURCE_SIZE_EXCEED_LIMIT(20007, "upload resource file size exceeds limit", "上传资源文件大小超过限制"),
RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified", "资源文件后缀不支持修改"),
UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar", "UDF资源文件后缀名只支持[jar]"),
HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"),
HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"),
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}","udf函数绑定了资源文件[{0}]"),
RESOURCE_IS_USED(20014, "resource file is used by process definition","资源文件被上线的流程定义使用了"),
PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist","父资源文件不存在"),
USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
......
......@@ -38,11 +38,9 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
......@@ -162,6 +160,31 @@ public class ProcessDefinitionService extends BaseDAGService {
return result;
}
/**
* get resource ids
* @param processData process data
* @return resource ids
*/
private String getResourceIds(ProcessData processData) {
List<TaskNode> tasks = processData.getTasks();
Set<Integer> resourceIds = new HashSet<>();
for(TaskNode taskNode : tasks){
String taskParameter = taskNode.getParams();
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter);
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet());
resourceIds.addAll(tempSet);
}
StringBuilder sb = new StringBuilder();
for(int i : resourceIds) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(i);
}
return sb.toString();
}
/**
* query proccess definition list
......@@ -946,7 +969,9 @@ public class ProcessDefinitionService extends BaseDAGService {
return result;
}
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check
......@@ -1163,6 +1188,7 @@ public class ProcessDefinitionService extends BaseDAGService {
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//check process data
......
......@@ -118,7 +118,7 @@ public class UdfFuncService extends BaseService{
}
udf.setDescription(desc);
udf.setResourceId(resourceId);
udf.setResourceName(resource.getAlias());
udf.setResourceName(resource.getFullName());
udf.setType(type);
udf.setCreateTime(now);
......@@ -226,7 +226,7 @@ public class UdfFuncService extends BaseService{
}
udf.setDescription(desc);
udf.setResourceId(resourceId);
udf.setResourceName(resource.getAlias());
udf.setResourceName(resource.getFullName());
udf.setType(type);
udf.setUpdateTime(now);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* resource filter test
*/
public class ResourceFilterTest {
private static Logger logger = LoggerFactory.getLogger(ResourceFilterTest.class);
@Test
public void filterTest(){
List<Resource> allList = new ArrayList<>();
Resource resource1 = new Resource(3,-1,"b","/b",true);
Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
Resource resource5 = new Resource(7,-1,"b2","/b2",true);
Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
allList.add(resource1);
allList.add(resource2);
allList.add(resource3);
allList.add(resource4);
allList.add(resource5);
allList.add(resource6);
allList.add(resource7);
ResourceFilter resourceFilter = new ResourceFilter(".jar",allList);
List<Resource> resourceList = resourceFilter.filter();
Assert.assertNotNull(resourceList);
resourceList.stream().forEach(t-> logger.info(t.toString()));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* resource tree visitor test
*/
public class ResourceTreeVisitorTest {
@Test
public void visit() throws Exception {
List<Resource> resourceList = new ArrayList<>();
Resource resource1 = new Resource(3,-1,"b","/b",true);
Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
Resource resource5 = new Resource(7,-1,"b2","/b2",true);
Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
resourceList.add(resource1);
resourceList.add(resource2);
resourceList.add(resource3);
resourceList.add(resource4);
resourceList.add(resource5);
resourceList.add(resource6);
resourceList.add(resource7);
ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
ResourceComponent resourceComponent = resourceTreeVisitor.visit();
Assert.assertNotNull(resourceComponent.getChildren());
}
@Test
public void rootNode() throws Exception {
List<Resource> resourceList = new ArrayList<>();
Resource resource1 = new Resource(3,-1,"b","/b",true);
Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
Resource resource5 = new Resource(7,-1,"b2","/b2",true);
Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
resourceList.add(resource1);
resourceList.add(resource2);
resourceList.add(resource3);
resourceList.add(resource4);
resourceList.add(resource5);
resourceList.add(resource6);
resourceList.add(resource7);
ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
Assert.assertTrue(resourceTreeVisitor.rootNode(resource1));
Assert.assertTrue(resourceTreeVisitor.rootNode(resource2));
Assert.assertFalse(resourceTreeVisitor.rootNode(resource3));
}
}
\ No newline at end of file
......@@ -24,10 +24,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
......@@ -40,6 +37,7 @@ import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.omg.CORBA.Any;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
......@@ -73,6 +71,8 @@ public class ResourcesServiceTest {
private UserMapper userMapper;
@Mock
private UdfFuncMapper udfFunctionMapper;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Before
public void setUp() {
......@@ -96,14 +96,14 @@ public class ResourcesServiceTest {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_FILE_IS_EMPTY
MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf",new String().getBytes());
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile);
result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_FILE_IS_EMPTY.getMsg(),result.getMsg());
......@@ -111,31 +111,42 @@ public class ResourcesServiceTest {
mockMultipartFile = new MockMultipartFile("test.pdf","test.pdf","pdf",new String("test").getBytes());
PowerMockito.when(FileUtils.suffix("test.pdf")).thenReturn("pdf");
PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.jar")).thenReturn("jar");
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile);
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_SUFFIX_FORBID_CHANGE.getMsg(),result.getMsg());
//UDF_RESOURCE_SUFFIX_NOT_JAR
mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.pdf","ResourcesServiceTest.pdf","pdf",new String("test").getBytes());
PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.pdf")).thenReturn("pdf");
result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg(),result.getMsg());
//UDF_RESOURCE_SUFFIX_NOT_JAR
Mockito.when(tenantMapper.queryById(0)).thenReturn(getTenant());
Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(getResourceList());
mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.jar","ResourcesServiceTest.jar","pdf",new String("test").getBytes());
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
}
@Test
public void testCreateDirecotry(){
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
Result result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//SUCCESS
Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(new ArrayList<>());
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
//PARENT_RESOURCE_NOT_EXIST
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(resourcesMapper.selectById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
Assert.assertEquals(Status.PARENT_RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//RESOURCE_EXIST
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(resourcesMapper.queryResourceList("/directoryTest", 0, 0)).thenReturn(getResourceList());
result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
}
......@@ -163,41 +174,46 @@ public class ResourcesServiceTest {
//SUCCESS
user.setId(1);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest.jar",ResourceType.FILE);
Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
//RESOURCE_EXIST
Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.FILE);
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
//USER_NOT_EXIST
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
logger.info(result.toString());
Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode());
//TENANT_NOT_EXIST
Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg());
//RESOURCE_NOT_EXIST
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test1");
PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test1");
try {
Mockito.when(hadoopUtils.exists("test")).thenReturn(true);
} catch (IOException e) {
e.printStackTrace();
}
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//SUCCESS
PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test");
PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test");
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
......@@ -212,8 +228,8 @@ public class ResourcesServiceTest {
resourcePage.setTotal(1);
resourcePage.setRecords(getResourceList());
Mockito.when(resourcesMapper.queryResourcePaging(Mockito.any(Page.class),
Mockito.eq(0), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage);
Map<String, Object> result = resourcesService.queryResourceListPaging(loginUser,ResourceType.FILE,"test",1,10);
Mockito.eq(0),Mockito.eq(-1), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage);
Map<String, Object> result = resourcesService.queryResourceListPaging(loginUser,-1,ResourceType.FILE,"test",1,10);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
......@@ -263,6 +279,7 @@ public class ResourcesServiceTest {
//TENANT_NOT_EXIST
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setTenantId(2);
Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(loginUser);
result = resourcesService.delete(loginUser,1);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg());
......@@ -285,14 +302,20 @@ public class ResourcesServiceTest {
User user = new User();
user.setId(1);
Mockito.when(resourcesMapper.queryResourceList("test", 0, 0)).thenReturn(getResourceList());
Result result = resourcesService.verifyResourceName("test",ResourceType.FILE,user);
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest.jar", 0, 0)).thenReturn(getResourceList());
Result result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(), result.getMsg());
//TENANT_NOT_EXIST
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user);
String unExistFullName = "/test.jar";
try {
Mockito.when(hadoopUtils.exists(unExistFullName)).thenReturn(false);
} catch (IOException e) {
logger.error("hadoop error",e);
}
result = resourcesService.verifyResourceName("/test.jar",ResourceType.FILE,user);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg());
......@@ -304,10 +327,10 @@ public class ResourcesServiceTest {
} catch (IOException e) {
logger.error("hadoop error",e);
}
PowerMockito.when(HadoopUtils.getHdfsFilename("123", "test1")).thenReturn("test");
result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user);
PowerMockito.when(HadoopUtils.getHdfsResourceFileName("123", "test1")).thenReturn("test");
result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user);
logger.info(result.toString());
Assert.assertTrue(Status.RESOURCE_FILE_EXIST.getCode()==result.getCode());
Assert.assertTrue(Status.RESOURCE_EXIST.getCode()==result.getCode());
//SUCCESS
result = resourcesService.verifyResourceName("test2",ResourceType.FILE,user);
......@@ -389,14 +412,14 @@ public class ResourcesServiceTest {
PowerMockito.when(HadoopUtils.getHdfsUdfDir("udfDir")).thenReturn("udfDir");
User user = getUser();
//HDFS_NOT_STARTUP
Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("class");
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(),result.getMsg());
......@@ -404,7 +427,7 @@ public class ResourcesServiceTest {
try {
PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("jar");
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content");
result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content",-1,"/");
}catch (RuntimeException ex){
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), ex.getMessage());
......@@ -413,7 +436,7 @@ public class ResourcesServiceTest {
//SUCCESS
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
......@@ -584,13 +607,26 @@ public class ResourcesServiceTest {
private Resource getResource(){
Resource resource = new Resource();
resource.setPid(-1);
resource.setUserId(1);
resource.setDescription("ResourcesServiceTest.jar");
resource.setAlias("ResourcesServiceTest.jar");
resource.setFullName("/ResourcesServiceTest.jar");
resource.setType(ResourceType.FILE);
return resource;
}
private Resource getUdfResource(){
Resource resource = new Resource();
resource.setUserId(1);
resource.setDescription("udfTest");
resource.setAlias("udfTest.jar");
resource.setFullName("/udfTest.jar");
resource.setType(ResourceType.UDF);
return resource;
}
private UdfFunc getUdfFunc(){
UdfFunc udfFunc = new UdfFunc();
......
......@@ -43,6 +43,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Map;
import static org.junit.Assert.*;
......@@ -173,7 +174,11 @@ public class CheckUtilsTest {
// MapreduceParameters
MapreduceParameters mapreduceParameters = new MapreduceParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
mapreduceParameters.setMainJar(new ResourceInfo());
ResourceInfo resourceInfoMapreduce = new ResourceInfo();
resourceInfoMapreduce.setId(1);
resourceInfoMapreduce.setRes("");
mapreduceParameters.setMainJar(resourceInfoMapreduce);
mapreduceParameters.setProgramType(ProgramType.JAVA);
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
......
......@@ -23,13 +23,17 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
*/
public enum AuthorizationType {
/**
* 0 RESOURCE_FILE;
* 0 RESOURCE_FILE_ID;
* 0 RESOURCE_FILE_NAME;
* 1 UDF_FILE;
* 1 DATASOURCE;
* 2 UDF;
*/
RESOURCE_FILE(0, "resource file"),
DATASOURCE(1, "data source"),
UDF(2, "udf function");
RESOURCE_FILE_ID(0, "resource file id"),
RESOURCE_FILE_NAME(1, "resource file name"),
UDF_FILE(2, "udf file"),
DATASOURCE(3, "data source"),
UDF(4, "udf function");
AuthorizationType(int code, String descp){
this.code = code;
......
......@@ -23,6 +23,16 @@ public class ResourceInfo {
/**
* res the name of the resource that was uploaded
*/
private int id;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
private String res;
public String getRes() {
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import java.util.LinkedHashMap;
import java.util.List;
......@@ -31,7 +32,7 @@ public abstract class AbstractParameters implements IParameters {
public abstract boolean checkParameters();
@Override
public abstract List<String> getResourceFilesList();
public abstract List<ResourceInfo> getResourceFilesList();
/**
* local parameters
......
......@@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import java.util.List;
/**
......@@ -34,5 +36,5 @@ public interface IParameters {
*
* @return resource files list
*/
List<String> getResourceFilesList();
List<ResourceInfo> getResourceFilesList();
}
......@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.conditions;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List;
......@@ -41,7 +42,7 @@ public class ConditionsParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return null;
}
......
......@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
/**
......@@ -198,7 +199,7 @@ public class DataxParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
......
......@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.dependent;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.ArrayList;
......@@ -36,7 +37,7 @@ public class DependentParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
......
......@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* spark parameters
......@@ -50,35 +50,35 @@ public class FlinkParameters extends AbstractParameters {
private String mainArgs;
/**
* slot个数
* slot count
*/
private int slot;
/**
*Yarn application的名字
*Yarn application name
*/
private String appName;
/**
* taskManager 数量
* taskManager count
*/
private int taskManager;
/**
* jobManagerMemory 内存大小
* job manager memory
*/
private String jobManagerMemory ;
/**
* taskManagerMemory内存大小
* task manager memory
*/
private String taskManagerMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList;
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
......@@ -207,16 +207,11 @@ public class FlinkParameters extends AbstractParameters {
@Override
public List<String> getResourceFilesList() {
if(resourceList != null ) {
List<String> resourceFiles = resourceList.stream()
.map(ResourceInfo::getRes).collect(Collectors.toList());
if(mainJar != null) {
resourceFiles.add(mainJar.getRes());
}
return resourceFiles;
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return Collections.emptyList();
return resourceList;
}
......
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.task.http;
import org.apache.dolphinscheduler.common.enums.HttpCheckCondition;
import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.process.HttpProperty;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
......@@ -62,7 +63,7 @@ public class HttpParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
......
......@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.mr;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class MapreduceParameters extends AbstractParameters {
......@@ -54,7 +54,7 @@ public class MapreduceParameters extends AbstractParameters {
/**
* resource list
*/
private List<ResourceInfo> resourceList;
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* program type
......@@ -125,16 +125,12 @@ public class MapreduceParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
if(resourceList != null ) {
List<String> resourceFiles = resourceList.stream()
.map(ResourceInfo::getRes).collect(Collectors.toList());
if(mainJar != null) {
resourceFiles.add(mainJar.getRes());
}
return resourceFiles;
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return Collections.emptyList();
return resourceList;
}
@Override
......
......@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.task.procedure;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
......@@ -74,7 +75,7 @@ public class ProcedureParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
......
......@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List;
import java.util.stream.Collectors;
public class PythonParameters extends AbstractParameters {
/**
......@@ -56,12 +55,7 @@ public class PythonParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
if (resourceList != null) {
return resourceList.stream()
.map(p -> p.getRes()).collect(Collectors.toList());
}
return null;
public List<ResourceInfo> getResourceFilesList() {
return this.resourceList;
}
}
......@@ -59,12 +59,7 @@ public class ShellParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
if (resourceList != null) {
return resourceList.stream()
.map(p -> p.getRes()).collect(Collectors.toList());
}
return null;
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}
}
......@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.spark;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* spark parameters
......@@ -78,7 +78,7 @@ public class SparkParameters extends AbstractParameters {
/**
* resource list
*/
private List<ResourceInfo> resourceList;
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
......@@ -219,18 +219,12 @@ public class SparkParameters extends AbstractParameters {
return mainJar != null && programType != null && sparkVersion != null;
}
@Override
public List<String> getResourceFilesList() {
if(resourceList !=null ) {
List<String> resourceFilesList = resourceList.stream()
.map(ResourceInfo::getRes).collect(Collectors.toList());
if(mainJar != null){
resourceFilesList.add(mainJar.getRes());
}
return resourceFilesList;
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return Collections.emptyList();
return resourceList;
}
......
......@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.task.sql;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
......@@ -189,7 +190,7 @@ public class SqlParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
......
......@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.task.sqoop;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
......@@ -111,7 +112,7 @@ public class SqoopParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
}
......@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.subprocess;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.ArrayList;
......@@ -42,7 +43,7 @@ public class SubProcessParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
}
\ No newline at end of file
......@@ -26,6 +26,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
......@@ -415,6 +416,22 @@ public class HadoopUtils implements Closeable {
}
}
/**
* hdfs resource dir
*
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
String hdfsDir = "";
if (resourceType.equals(ResourceType.FILE)) {
hdfsDir = getHdfsResDir(tenantCode);
} else if (resourceType.equals(ResourceType.UDF)) {
hdfsDir = getHdfsUdfDir(tenantCode);
}
return hdfsDir;
}
/**
* hdfs resource dir
*
......@@ -450,22 +467,42 @@ public class HadoopUtils implements Closeable {
* get absolute path and name for file on hdfs
*
* @param tenantCode tenant code
* @param filename file name
* @param fileName file name
* @return get absolute path and name for file on hdfs
*/
/**
* get hdfs file name
*
* @param resourceType resource type
* @param tenantCode tenant code
* @param fileName file name
* @return hdfs file name
*/
public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
}
/**
* get absolute path and name for resource file on hdfs
*
* @param tenantCode tenant code
* @param fileName file name
* @return get absolute path and name for file on hdfs
*/
public static String getHdfsFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
public static String getHdfsResourceFileName(String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
}
/**
* get absolute path and name for udf file on hdfs
*
* @param tenantCode tenant code
* @param filename file name
* @param fileName file name
* @return get absolute path and name for udf file on hdfs
*/
public static String getHdfsUdfFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename);
public static String getHdfsUdfFileName(String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
}
/**
......
......@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
......@@ -28,8 +29,7 @@ public class FlinkParametersTest {
@Test
public void getResourceFilesList() {
FlinkParameters flinkParameters = new FlinkParameters();
Assert.assertNotNull(flinkParameters.getResourceFilesList());
Assert.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
Assert.assertTrue(CollectionUtils.isEmpty(flinkParameters.getResourceFilesList()));
ResourceInfo mainResource = new ResourceInfo();
mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
......@@ -41,15 +41,17 @@ public class FlinkParametersTest {
resourceInfos.add(resourceInfo1);
flinkParameters.setResourceList(resourceInfos);
Assert.assertNotNull(flinkParameters.getResourceFilesList());
Assert.assertEquals(2, flinkParameters.getResourceFilesList().size());
List<ResourceInfo> resourceFilesList = flinkParameters.getResourceFilesList();
Assert.assertNotNull(resourceFilesList);
Assert.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
resourceInfo2.setRes("testFlinkParameters2.jar");
resourceInfos.add(resourceInfo2);
flinkParameters.setResourceList(resourceInfos);
Assert.assertNotNull(flinkParameters.getResourceFilesList());
Assert.assertEquals(3, flinkParameters.getResourceFilesList().size());
resourceFilesList = flinkParameters.getResourceFilesList();
Assert.assertNotNull(resourceFilesList);
Assert.assertEquals(3, resourceFilesList.size());
}
}
......@@ -163,6 +163,11 @@ public class ProcessDefinition {
*/
private String modifyBy;
/**
* resource ids
*/
private String resourceIds;
public String getName() {
return name;
......@@ -334,6 +339,14 @@ public class ProcessDefinition {
this.scheduleReleaseState = scheduleReleaseState;
}
public String getResourceIds() {
return resourceIds;
}
public void setResourceIds(String resourceIds) {
this.resourceIds = resourceIds;
}
public int getTimeout() {
return timeout;
}
......@@ -393,6 +406,8 @@ public class ProcessDefinition {
", timeout=" + timeout +
", tenantId=" + tenantId +
", modifyBy='" + modifyBy + '\'' +
", resourceIds='" + resourceIds + '\'' +
'}';
}
}
......@@ -32,11 +32,26 @@ public class Resource {
@TableId(value="id", type=IdType.AUTO)
private int id;
/**
* parent id
*/
private int pid;
/**
* resource alias
*/
private String alias;
/**
* full name
*/
private String fullName;
/**
* is directory
*/
private boolean isDirectory=false;
/**
* description
*/
......@@ -89,7 +104,15 @@ public class Resource {
this.updateTime = updateTime;
}
public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
public Resource(int id, int pid, String alias, String fullName, boolean isDirectory) {
this.id = id;
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
}
/*public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.alias = alias;
this.fileName = fileName;
this.description = description;
......@@ -98,6 +121,20 @@ public class Resource {
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}*/
public Resource(int pid, String alias, String fullName, boolean isDirectory, String description, String fileName, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
this.description = description;
this.fileName = fileName;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}
public int getId() {
......@@ -116,6 +153,30 @@ public class Resource {
this.alias = alias;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public boolean isDirectory() {
return isDirectory;
}
public void setDirectory(boolean directory) {
isDirectory = directory;
}
public String getFileName() {
return fileName;
}
......@@ -177,9 +238,12 @@ public class Resource {
public String toString() {
return "Resource{" +
"id=" + id +
", pid=" + pid +
", alias='" + alias + '\'' +
", fileName='" + fileName + '\'' +
", fullName='" + fullName + '\'' +
", isDirectory=" + isDirectory +
", description='" + description + '\'' +
", fileName='" + fileName + '\'' +
", userId=" + userId +
", type=" + type +
", size=" + size +
......
......@@ -20,9 +20,11 @@ import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
/**
* process definition mapper interface
......@@ -83,7 +85,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
List<ProcessDefinition> queryDefinitionListByTenant(@Param("tenantId") int tenantId);
/**
* count process definition group by user
* count process definition group by user
* @param userId userId
* @param projectIds projectIds
* @param isAdmin isAdmin
......@@ -93,4 +95,11 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
@Param("userId") Integer userId,
@Param("projectIds") Integer[] projectIds,
@Param("isAdmin") boolean isAdmin);
/**
* list all resource ids
* @return resource ids list
*/
@MapKey("id")
List<Map<String, Object>> listResources();
}
......@@ -30,12 +30,12 @@ public interface ResourceMapper extends BaseMapper<Resource> {
/**
* query resource list
* @param alias alias
* @param fullName full name
* @param userId userId
* @param type type
* @return resource list
*/
List<Resource> queryResourceList(@Param("alias") String alias,
List<Resource> queryResourceList(@Param("fullName") String fullName,
@Param("userId") int userId,
@Param("type") int type);
......@@ -59,6 +59,7 @@ public interface ResourceMapper extends BaseMapper<Resource> {
*/
IPage<Resource> queryResourcePaging(IPage<Resource> page,
@Param("userId") int userId,
@Param("id") int id,
@Param("type") int type,
@Param("searchVal") String searchVal);
......@@ -76,13 +77,13 @@ public interface ResourceMapper extends BaseMapper<Resource> {
*/
List<Resource> queryResourceExceptUserId(@Param("userId") int userId);
/**
* query tenant code by name
* @param resName resource name
* @param resType resource type
* @return tenant code
*/
String queryTenantCodeByResourceName(@Param("resName") String resName);
String queryTenantCodeByResourceName(@Param("resName") String resName,@Param("resType") int resType);
/**
* list authorized resource
......@@ -91,4 +92,48 @@ public interface ResourceMapper extends BaseMapper<Resource> {
* @return resource list
*/
<T> List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames);
/**
* list authorized resource
* @param userId userId
* @param resIds resource ids
* @return resource list
*/
<T> List<Resource> listAuthorizedResourceById(@Param("userId") int userId,@Param("resIds")T[] resIds);
/**
* delete resource by id array
* @param resIds resource id array
* @return delete num
*/
int deleteIds(@Param("resIds")Integer[] resIds);
/**
* list children
* @param direcotyId directory id
* @return resource id array
*/
List<Integer> listChildren(@Param("direcotyId") int direcotyId);
/**
* query resource by full name or pid
* @param fullName full name
* @param type resource type
* @return resource
*/
List<Resource> queryResource(@Param("fullName") String fullName,@Param("type") int type);
/**
* list resource by id array
* @param resIds resource id array
* @return resource list
*/
List<Resource> listResourceByIds(@Param("resIds")Integer[] resIds);
/**
* update resource
* @param resourceList resource list
* @return update num
*/
int batchUpdateResource(@Param("resourceList") List<Resource> resourceList);
}
......@@ -86,4 +86,19 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
<T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
/**
* list UDF by resource id
* @param resourceIds resource id array
* @return UDF function list
*/
List<UdfFunc> listUdfByResourceId(@Param("resourceIds") int[] resourceIds);
/**
* list authorized UDF by resource id
* @param resourceIds resource id array
* @return UDF function list
*/
List<UdfFunc> listAuthorizedUdfByResourceId(@Param("userId") int userId,@Param("resourceIds") int[] resourceIds);
}
......@@ -87,4 +87,11 @@
pd.user_id = u.id AND pd.project_id = p.id
AND pd.id = #{processDefineId}
</select>
<select id="listResources" resultType="java.util.HashMap">
SELECT id,resource_ids
FROM t_ds_process_definition
WHERE release_state = 1 and resource_ids is not null and resource_ids != ''
</select>
</mapper>
\ No newline at end of file
......@@ -22,8 +22,8 @@
select *
from t_ds_resources
where 1= 1
<if test="alias != null and alias != ''">
and alias = #{alias}
<if test="fullName != null and fullName != ''">
and full_name = #{fullName}
</if>
<if test="type != -1">
and type = #{type}
......@@ -47,8 +47,8 @@
<select id="queryResourcePaging" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where type=#{type}
<if test="userId != 0">
where type=#{type} and pid=#{id}
<if test="userId != 0 and id == -1">
and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId}
union select id as resources_id from t_ds_resources where user_id=#{userId})
</if>
......@@ -70,7 +70,74 @@
<select id="queryTenantCodeByResourceName" resultType="java.lang.String">
select tenant_code
from t_ds_tenant t, t_ds_user u, t_ds_resources res
where t.id = u.tenant_id and u.id = res.user_id and res.type=0
and res.alias= #{resName}
where t.id = u.tenant_id and u.id = res.user_id and res.type=#{resType}
and res.full_name= #{resName}
</select>
<select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where type=0
and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId}
union select id as resources_id from t_ds_resources where user_id=#{userId})
<if test="resNames != null and resNames != ''">
and full_name in
<foreach collection="resNames" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<select id="listAuthorizedResourceById" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId}
union select id as resources_id from t_ds_resources where user_id=#{userId})
<if test="resIds != null and resIds != ''">
and id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<delete id="deleteIds" parameterType="java.lang.Integer">
delete from t_ds_resources where id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</delete>
<select id="listChildren" resultType="java.lang.Integer">
select id
from t_ds_resources
where pid = #{direcotyId}
</select>
<select id="queryResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where type = #{type}
and full_name = #{fullName}
</select>
<update id="batchUpdateResource" parameterType="java.util.List">
<foreach collection="resourceList" item="resource" index="index" open="" close="" separator =";">
update t_ds_resources
<set>
full_name=#{resource.fullName},
update_time=#{resource.updateTime}
</set>
<where>
id=#{resource.id}
</where>
</foreach>
</update>
<select id="listResourceByIds" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</select>
</mapper>
......@@ -87,4 +87,28 @@
</foreach>
</if>
</select>
<select id="listUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where 1=1
<if test="resourceIds != null and resourceIds != ''">
and resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<select id="listAuthorizedUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where
id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="resourceIds != null and resourceIds != ''">
and resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>
\ No newline at end of file
......@@ -34,6 +34,7 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
......@@ -68,7 +69,10 @@ public class ResourceMapperTest {
private Resource insertOne(){
//insertOne
Resource resource = new Resource();
resource.setAlias("ut resource");
resource.setAlias("ut-resource");
resource.setFullName("/ut-resource");
resource.setPid(-1);
resource.setDirectory(false);
resource.setType(ResourceType.FILE);
resource.setUserId(111);
resourceMapper.insert(resource);
......@@ -80,16 +84,32 @@ public class ResourceMapperTest {
* @param user user
* @return Resource
*/
private Resource createResource(User user){
private Resource createResource(User user,boolean isDirectory,ResourceType resourceType,int pid,String alias,String fullName){
//insertOne
Resource resource = new Resource();
resource.setAlias(String.format("ut resource %s",user.getUserName()));
resource.setType(ResourceType.FILE);
resource.setDirectory(isDirectory);
resource.setType(resourceType);
resource.setAlias(alias);
resource.setFullName(fullName);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user){
//insertOne
String alias = String.format("ut-resource-%s",user.getUserName());
String fullName = String.format("/%s",alias);
Resource resource = createResource(user, false, ResourceType.FILE, -1, alias, fullName);
return resource;
}
/**
* create user
* @return User
......@@ -200,13 +220,15 @@ public class ResourceMapperTest {
IPage<Resource> resourceIPage = resourceMapper.queryResourcePaging(
page,
resource.getUserId(),
0,
-1,
resource.getType().ordinal(),
""
);
IPage<Resource> resourceIPage1 = resourceMapper.queryResourcePaging(
page,
1110,
-1,
resource.getType().ordinal(),
""
);
......@@ -289,7 +311,7 @@ public class ResourceMapperTest {
resourceMapper.updateById(resource);
String resource1 = resourceMapper.queryTenantCodeByResourceName(
resource.getAlias()
resource.getFullName(),ResourceType.FILE.ordinal()
);
......@@ -305,22 +327,37 @@ public class ResourceMapperTest {
User generalUser2 = createGeneralUser("user2");
// create one resource
Resource resource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser1);
// need download resources
String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()};
String[] resNames = new String[]{resource.getFullName(), unauthorizedResource.getFullName()};
List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertEquals(generalUser2.getId(),resource.getUserId());
Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
Assert.assertFalse(resources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames)));
// authorize object unauthorizedResource to generalUser
createResourcesUser(unauthorizedResource,generalUser2);
List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
Assert.assertTrue(authorizedResources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames)));
}
@Test
public void deleteIdsTest(){
// create a general user
User generalUser1 = createGeneralUser("user1");
Resource resource = createResource(generalUser1);
Resource resource1 = createResource(generalUser1);
List<Integer> resourceList = new ArrayList<>();
resourceList.add(resource.getId());
resourceList.add(resource1.getId());
int result = resourceMapper.deleteIds(resourceList.toArray(new Integer[resourceList.size()]));
Assert.assertEquals(result,2);
}
}
\ No newline at end of file
......@@ -23,15 +23,18 @@ import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
......@@ -96,7 +99,7 @@ public class TaskScheduleThread implements Runnable {
TaskNode taskNode = JSON.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
List<ResourceInfo> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
downloadResource(
taskInstance.getExecutePath(),
......@@ -165,6 +168,7 @@ public class TaskScheduleThread implements Runnable {
new Date(),
taskInstance.getId());
}
/**
* get global paras map
* @return
......@@ -289,14 +293,16 @@ public class TaskScheduleThread implements Runnable {
/**
* create project resource files
*/
private List<String> createProjectResFiles(TaskNode taskNode) throws Exception{
private List<ResourceInfo> createProjectResFiles(TaskNode taskNode) throws Exception{
Set<String> projectFiles = new HashSet<>();
Set<ResourceInfo> projectFiles = new HashSet<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
List<String> projectResourceFiles = baseParam.getResourceFilesList();
projectFiles.addAll(projectResourceFiles);
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (projectResourceFiles != null) {
projectFiles.addAll(projectResourceFiles);
}
}
return new ArrayList<>(projectFiles);
......@@ -309,18 +315,25 @@ public class TaskScheduleThread implements Runnable {
* @param projectRes
* @param logger
*/
private void downloadResource(String execLocalPath, List<String> projectRes, Logger logger) throws Exception {
private void downloadResource(String execLocalPath, List<ResourceInfo> projectRes, Logger logger) throws Exception {
checkDownloadPermission(projectRes);
for (String res : projectRes) {
File resFile = new File(execLocalPath, res);
String resourceName;
for (ResourceInfo res : projectRes) {
if (res.getId() != 0) {
Resource resource = processService.getResourceById(res.getId());
resourceName = resource.getFullName();
}else{
resourceName = res.getRes();
}
File resFile = new File(execLocalPath, resourceName);
if (!resFile.exists()) {
try {
// query the tenant code of the resource according to the name of the resource
String tentnCode = processService.queryTenantCodeByResName(res);
String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res);
String tentnCode = processService.queryTenantCodeByResName(resourceName, ResourceType.FILE);
String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tentnCode, resourceName);
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + res, false, true);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resourceName, false, true);
}catch (Exception e){
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
......@@ -336,10 +349,17 @@ public class TaskScheduleThread implements Runnable {
* @param projectRes resource name list
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
private void checkDownloadPermission(List<ResourceInfo> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
if (projectRes.stream().allMatch(t->t.getId() == 0)) {
String[] resNames = projectRes.stream().map(t -> t.getRes()).collect(Collectors.toList()).toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_NAME,processService,resNames,userId,logger);
permissionCheck.checkPermission();
}else{
Integer[] resIds = projectRes.stream().map(t -> t.getId()).collect(Collectors.toList()).toArray(new Integer[projectRes.size()]);
PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resIds,userId,logger);
permissionCheck.checkPermission();
}
}
}
\ No newline at end of file
......@@ -94,4 +94,9 @@ public abstract class AbstractYarnTask extends AbstractTask {
* @throws Exception exception
*/
protected abstract String buildCommand() throws Exception;
/**
* set main jar name
*/
protected abstract void setMainJarName();
}
......@@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.server.worker.task.flink;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
......@@ -63,6 +65,7 @@ public class FlinkTask extends AbstractYarnTask {
if (!flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
setMainJarName();
flinkParameters.setQueue(taskProps.getQueue());
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
......@@ -111,6 +114,28 @@ public class FlinkTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;
......
......@@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -64,7 +66,7 @@ public class MapReduceTask extends AbstractYarnTask {
if (!mapreduceParameters.checkParameters()) {
throw new RuntimeException("mapreduce task params is not valid");
}
setMainJarName();
mapreduceParameters.setQueue(taskProps.getQueue());
// replace placeholder
......@@ -99,6 +101,28 @@ public class MapReduceTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = mapreduceParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(mapreduceParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
mapreduceParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return mapreduceParameters;
......
......@@ -18,11 +18,13 @@ package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
......@@ -67,8 +69,8 @@ public class SparkTask extends AbstractYarnTask {
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
setMainJarName();
sparkParameters.setQueue(taskProps.getQueue());
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
......@@ -115,6 +117,28 @@ public class SparkTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;
......
......@@ -71,6 +71,10 @@ public class SqoopTask extends AbstractYarnTask {
return null;
}
@Override
protected void setMainJarName() {
}
@Override
public AbstractParameters getParameters() {
return sqoopParameters;
......
......@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -45,6 +46,11 @@ public class PermissionCheck<T> {
*/
private T[] needChecks;
/**
* resoruce info
*/
private List<ResourceInfo> resourceList;
/**
* user id
*/
......@@ -90,6 +96,22 @@ public class PermissionCheck<T> {
this.logger = logger;
}
/**
* permission check
* @param logger
* @param authorizationType
* @param processService
* @param resourceList
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
this.authorizationType = authorizationType;
this.processService = processService;
this.resourceList = resourceList;
this.userId = userId;
this.logger = logger;
}
public AuthorizationType getAuthorizationType() {
return authorizationType;
}
......@@ -122,6 +144,14 @@ public class PermissionCheck<T> {
this.userId = userId;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
/**
* has permission
* @return true if has permission
......@@ -141,6 +171,7 @@ public class PermissionCheck<T> {
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
// get user type in order to judge whether the user is admin
User user = processService.getUserById(userId);
if (user.getUserType() != UserType.ADMIN_USER){
......
......@@ -1556,10 +1556,11 @@ public class ProcessService {
/**
* find tenant code by resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
public String queryTenantCodeByResName(String resName){
return resourceMapper.queryTenantCodeByResourceName(resName);
public String queryTenantCodeByResName(String resName,ResourceType resourceType){
return resourceMapper.queryTenantCodeByResourceName(resName,resourceType.ordinal());
}
/**
......@@ -1791,10 +1792,18 @@ public class ProcessService {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
switch (authorizationType){
case RESOURCE_FILE:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet());
case RESOURCE_FILE_ID:
Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedResourceFiles);
break;
case RESOURCE_FILE_NAME:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case UDF_FILE:
Set<Integer> authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfFiles);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedDatasources);
......@@ -1820,5 +1829,14 @@ public class ProcessService {
return userMapper.queryDetailsById(userId);
}
/**
* get resource by resoruce id
* @param resoruceId resource id
* @return Resource
*/
public Resource getResourceById(int resoruceId){
return resourceMapper.selectById(resoruceId);
}
}
......@@ -724,7 +724,7 @@ JSP.prototype.handleEvent = function () {
} else {
$(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))+1)
}
// Storage node dependency information
saveTargetarr(sourceId, targetId)
......
......@@ -116,4 +116,4 @@
},
components: { mPopup, mListBoxF }
}
</script>
\ No newline at end of file
</script>
1.2.0
\ No newline at end of file
1.2.2
\ No newline at end of file
......@@ -74,4 +74,84 @@ d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_C_app_link;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link;
\ No newline at end of file
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link;
-- ac_dolphin_T_t_ds_resources_A_pid
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_pid;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_pid()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_resources'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='pid')
THEN
ALTER TABLE t_ds_resources ADD `pid` int(11) DEFAULT -1 COMMENT 'parent id';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_resources_A_pid;
DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_pid;
-- ac_dolphin_T_t_ds_resources_A_full_name
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_full_name;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_resources'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='full_name')
THEN
ALTER TABLE t_ds_resources ADD `full_name` varchar(255) DEFAULT NULL COMMENT 'full name';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_resources_A_full_name;
DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name;
-- ac_dolphin_T_t_ds_resources_A_pid
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_is_directory;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_resources_is_directory()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_resources'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='is_directory')
THEN
ALTER TABLE t_ds_resources ADD `is_directory` tinyint(1) DEFAULT 0 COMMENT 'is directory';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_resources_is_directory;
DROP PROCEDURE ac_dolphin_T_t_ds_resources_is_directory;
-- ac_dolphin_T_t_ds_process_definition_A_resource_ids
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='resource_ids')
THEN
ALTER TABLE t_ds_process_definition ADD `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_process_definition_A_resource_ids;
DROP PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids;
\ No newline at end of file
......@@ -66,4 +66,87 @@ d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_C_app_link();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link();
\ No newline at end of file
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link();
-- ac_dolphin_T_t_ds_resources_A_pid
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_pid() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_resources'
AND COLUMN_NAME ='pid')
THEN
ALTER TABLE t_ds_resources ADD COLUMN pid int DEFAULT -1;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_resources_A_pid();
DROP FUNCTION ac_dolphin_T_t_ds_resources_A_pid();
-- ac_dolphin_T_t_ds_resources_A_full_name
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_full_name();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_full_name() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_resources'
AND COLUMN_NAME ='full_name')
THEN
ALTER TABLE t_ds_resources ADD COLUMN full_name varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_resources_A_full_name();
DROP FUNCTION ac_dolphin_T_t_ds_resources_A_full_name();
-- ac_dolphin_T_t_ds_resources_A_is_directory
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_is_directory();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_resources'
AND COLUMN_NAME ='is_directory')
THEN
ALTER TABLE t_ds_resources ADD COLUMN is_directory boolean DEFAULT false;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_resources_A_is_directory();
DROP FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory();
-- ac_dolphin_T_t_ds_process_definition_A_resource_ids
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_definition'
AND COLUMN_NAME ='resource_ids')
THEN
ALTER TABLE t_ds_process_definition ADD COLUMN resource_ids varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_process_definition_A_resource_ids();
DROP FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids();
......@@ -13,4 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
\ No newline at end of file
*/
UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册