未验证 提交 489e7fe4 编写于 作者: H HanayoZz 提交者: GitHub

[Feature-10495][Resource Center] Resource Center Refactor (#12076)

* resource center refactor - S3 services connection
Co-authored-by: Ncaishunfeng <caishunfeng2021@gmail.com>
上级 cb70e89d
......@@ -35,7 +35,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
@JsonPropertyOrder({"id", "pid", "name", "fullName", "description", "isDirctory", "children", "type"})
public abstract class ResourceComponent {
public ResourceComponent(int id, int pid, String name, String fullName, String description, boolean isDirctory) {
public ResourceComponent(int id, String pid, String name, String fullName, String description, boolean isDirctory) {
this.id = id;
this.pid = pid;
this.name = name;
......@@ -53,7 +53,7 @@ public abstract class ResourceComponent {
/**
* parent id
*/
protected int pid;
protected String pid;
/**
* name
*/
......
......@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.Directory;
import org.apache.dolphinscheduler.api.dto.resources.FileLeaf;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* resource tree visitor
......@@ -32,7 +33,7 @@ public class ResourceTreeVisitor implements Visitor {
/**
* resource list
*/
private List<Resource> resourceList;
private List<StorageEntity> resourceList;
public ResourceTreeVisitor() {
}
......@@ -41,7 +42,7 @@ public class ResourceTreeVisitor implements Visitor {
* constructor
* @param resourceList resource list
*/
public ResourceTreeVisitor(List<Resource> resourceList) {
public ResourceTreeVisitor(List<StorageEntity> resourceList) {
this.resourceList = resourceList;
}
......@@ -50,14 +51,15 @@ public class ResourceTreeVisitor implements Visitor {
* @return resoruce component
*/
@Override
public ResourceComponent visit() {
public ResourceComponent visit(String rootPath) {
ResourceComponent rootDirectory = new Directory();
for (Resource resource : resourceList) {
for (StorageEntity resource : resourceList) {
// judge whether is root node
if (rootNode(resource)) {
if (rootNode(resource, rootPath)) {
// if it is a root node.
ResourceComponent tempResourceComponent = getResourceComponent(resource);
rootDirectory.add(tempResourceComponent);
tempResourceComponent.setChildren(setChildren(tempResourceComponent.getId(), resourceList));
tempResourceComponent.setChildren(setChildren(tempResourceComponent.getFullName(), resourceList));
}
}
return rootDirectory;
......@@ -65,20 +67,21 @@ public class ResourceTreeVisitor implements Visitor {
/**
* set children
* @param id id
* @param fullName unique path
* @param list resource list
* @return resource component list
*/
public static List<ResourceComponent> setChildren(int id, List<Resource> list) {
public static List<ResourceComponent> setChildren(String fullName, List<StorageEntity> list) {
// id is the unique value,
List<ResourceComponent> childList = new ArrayList<>();
for (Resource resource : list) {
if (id == resource.getPid()) {
for (StorageEntity resource : list) {
if (Objects.equals(fullName, resource.getPfullName())) {
ResourceComponent tempResourceComponent = getResourceComponent(resource);
childList.add(tempResourceComponent);
}
}
for (ResourceComponent resourceComponent : childList) {
resourceComponent.setChildren(setChildren(resourceComponent.getId(), list));
resourceComponent.setChildren(setChildren(resourceComponent.getFullName(), list));
}
if (childList.size() == 0) {
return new ArrayList<>();
......@@ -91,17 +94,18 @@ public class ResourceTreeVisitor implements Visitor {
* @param resource resource
* @return true if it is the root node
*/
public boolean rootNode(Resource resource) {
public boolean rootNode(StorageEntity resource, String rootPath) {
boolean isRootNode = true;
if (resource.getPid() != -1) {
for (Resource parent : resourceList) {
if (resource.getPid() == parent.getId()) {
if (!Objects.equals(resource.getPfullName(), rootPath)) {
for (StorageEntity parent : resourceList) {
if (Objects.equals(resource.getPfullName(), parent.getFullName())) {
isRootNode = false;
break;
}
}
}
return isRootNode;
}
......@@ -110,7 +114,7 @@ public class ResourceTreeVisitor implements Visitor {
* @param resource resource
* @return resource component
*/
private static ResourceComponent getResourceComponent(Resource resource) {
private static ResourceComponent getResourceComponent(StorageEntity resource) {
ResourceComponent tempResourceComponent;
if (resource.isDirectory()) {
tempResourceComponent = new Directory();
......@@ -119,9 +123,10 @@ public class ResourceTreeVisitor implements Visitor {
}
tempResourceComponent.setName(resource.getAlias());
tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/", ""));
// tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/",""));
tempResourceComponent.setFullName(resource.getFullName());
tempResourceComponent.setId(resource.getId());
tempResourceComponent.setPid(resource.getPid());
tempResourceComponent.setPid(resource.getPfullName());
tempResourceComponent.setIdValue(resource.getId(), resource.isDirectory());
tempResourceComponent.setDescription(resource.getDescription());
tempResourceComponent.setType(resource.getType());
......
......@@ -28,5 +28,5 @@ public interface Visitor {
* visit
* @return resource component
*/
ResourceComponent visit();
ResourceComponent visit(String rootPath);
}
......@@ -38,7 +38,6 @@ public interface ResourcePermissionCheckService<T> {
* @param authorizationType
* @param userId
* @param logger
* @param <T>
* @return
*/
Set<T> userOwnedResourceIdsAcquisition(Object authorizationType, Integer userId, Logger logger);
......
......@@ -57,9 +57,8 @@ public interface ResourcesService {
* @param loginUser login user
* @param name alias
* @param desc description
* @param file file
* @param type type
* @param pid parent id
* @param file file
* @param currentDir current directory
* @return create result code
*/
......@@ -68,13 +67,11 @@ public interface ResourcesService {
String desc,
ResourceType type,
MultipartFile file,
int pid,
String currentDir);
/**
* update resource
* @param loginUser login user
* @param resourceId resource id
* @param name name
* @param desc description
* @param type resource type
......@@ -82,7 +79,8 @@ public interface ResourcesService {
* @return update result code
*/
Result<Object> updateResource(User loginUser,
int resourceId,
String fullName,
String tenantCode,
String name,
String desc,
ResourceType type,
......@@ -98,8 +96,8 @@ public interface ResourcesService {
* @param pageSize page size
* @return resource list page
*/
Result queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo,
Integer pageSize);
Result queryResourceListPaging(User loginUser, String fullName, String resTenantCode,
ResourceType type, String searchVal, Integer pageNo, Integer pageSize);
/**
* query resource list
......@@ -108,7 +106,7 @@ public interface ResourcesService {
* @param type resource type
* @return resource list
*/
Map<String, Object> queryResourceList(User loginUser, ResourceType type);
Map<String, Object> queryResourceList(User loginUser, ResourceType type, String fullName);
/**
* query resource list by program type
......@@ -123,11 +121,10 @@ public interface ResourcesService {
* delete resource
*
* @param loginUser login user
* @param resourceId resource id
* @return delete result code
* @throws IOException exception
*/
Result<Object> delete(User loginUser, int resourceId) throws IOException;
Result<Object> delete(User loginUser, String fullName, String tenantCode) throws IOException;
/**
* verify resource by name and type
......@@ -139,23 +136,22 @@ public interface ResourcesService {
Result<Object> verifyResourceName(String fullName, ResourceType type, User loginUser);
/**
* verify resource by full name or pid and type
* @param fullName resource full name
* @param id resource id
* verify resource by file name
* @param fileName resource file name
* @param type resource type
* @return true if the resource full name or pid not exists, otherwise return false
* @return true if the resource file name, otherwise return false
*/
Result<Object> queryResource(User loginUser, String fullName, Integer id, ResourceType type);
Result<Object> queryResourceByFileName(User loginUser, String fileName, ResourceType type, String resTenantCode);
/**
* view resource file online
*
* @param resourceId resource id
* @param skipLineNum skip line number
* @param limit limit
* @param fullName fullName
* @return resource content
*/
Result<Object> readResource(User loginUser, int resourceId, int skipLineNum, int limit);
Result<Object> readResource(User loginUser, String fullName, String tenantCode, int skipLineNum, int limit);
/**
* create resource file online
......@@ -169,7 +165,7 @@ public interface ResourcesService {
* @return create result code
*/
Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix,
String desc, String content, int pid, String currentDirectory);
String desc, String content, String currentDirectory);
/**
* create or update resource.
......@@ -203,16 +199,16 @@ public interface ResourcesService {
* @param content content
* @return update result cod
*/
Result<Object> updateResourceContent(User loginUser, int resourceId, String content);
Result<Object> updateResourceContent(User loginUser, String fullName, String tenantCode,
String content);
/**
* download file
*
* @param resourceId resource id
* @return resource content
* @throws IOException exception
*/
org.springframework.core.io.Resource downloadResource(User loginUser, int resourceId) throws IOException;
org.springframework.core.io.Resource downloadResource(User loginUser, String fullName) throws IOException;
/**
* list all file
......@@ -270,9 +266,11 @@ public interface ResourcesService {
/**
* get resource by id
* @param resourceId resource id
* @param fullName resource full name
* @param tenantCode owner's tenant code of resource
* @return resource
*/
Result<Object> queryResourceById(User loginUser, Integer resourceId);
Result<Object> queryResourceByFullName(User loginUser, String fullName, String tenantCode,
ResourceType type) throws IOException;
}
......@@ -35,18 +35,17 @@ public interface UdfFuncService {
* @param argTypes argument types
* @param database database
* @param desc description
* @param resourceId resource id
* @param className class name
* @return create result code
*/
Result<Object> createUdfFunction(User loginUser,
String funcName,
String className,
String fullName,
String argTypes,
String database,
String desc,
UdfType type,
int resourceId);
UdfType type);
/**
* query udf function
......@@ -66,6 +65,7 @@ public interface UdfFuncService {
* @param database data base
* @param desc description
* @param resourceId resource id
* @param fullName resource full name
* @param className class name
* @return update result code
*/
......@@ -77,7 +77,7 @@ public interface UdfFuncService {
String database,
String desc,
UdfType type,
int resourceId);
String fullName);
/**
* query udf function list paging
......
......@@ -25,15 +25,16 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
......@@ -66,6 +67,9 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
@Autowired
private UDFUserMapper udfUserMapper;
@Autowired(required = false)
private StorageOperate storageOperate;
/**
* create udf function
*
......@@ -75,7 +79,6 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
* @param argTypes argument types
* @param database database
* @param desc description
* @param resourceId resource id
* @param className class name
* @return create result code
*/
......@@ -84,11 +87,11 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
public Result<Object> createUdfFunction(User loginUser,
String funcName,
String className,
String fullName,
String argTypes,
String database,
String desc,
UdfType type,
int resourceId) {
UdfType type) {
Result<Object> result = new Result<>();
boolean canOperatorPermissions = canOperatorPermissions(loginUser, null, AuthorizationType.UDF,
......@@ -117,9 +120,15 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
return result;
}
Resource resource = resourceMapper.selectById(resourceId);
if (resource == null) {
logger.error("Resource does not exist, resourceId:{}.", resourceId);
Boolean existResource = false;
try {
existResource = storageOperate.exists(fullName);
} catch (IOException e) {
logger.error("Check resource error: {}", fullName, e);
}
if (!existResource) {
logger.error("resource full name {} is not exist", fullName);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
......@@ -137,8 +146,9 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
udf.setDatabase(database);
}
udf.setDescription(desc);
udf.setResourceId(resourceId);
udf.setResourceName(resource.getFullName());
// set resourceId to -1 because we do not store resource to db anymore, instead we use fullName
udf.setResourceId(-1);
udf.setResourceName(fullName);
udf.setType(type);
udf.setCreateTime(now);
......@@ -178,7 +188,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
}
UdfFunc udfFunc = udfFuncMapper.selectById(id);
if (udfFunc == null) {
logger.error("Resource does not exist, resourceId:{}.", id);
logger.error("Resource does not exist, udf func id:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
......@@ -196,7 +206,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
* @param argTypes argument types
* @param database data base
* @param desc description
* @param resourceId resource id
* @param fullName resource full name
* @param className class name
* @return update result code
*/
......@@ -209,7 +219,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
String database,
String desc,
UdfType type,
int resourceId) {
String fullName) {
Result<Object> result = new Result<>();
boolean canOperatorPermissions = canOperatorPermissions(loginUser, new Object[]{udfFuncId},
......@@ -251,13 +261,23 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
}
}
Resource resource = resourceMapper.selectById(resourceId);
if (resource == null) {
logger.error("Resource does not exist, resourceId:{}.", resourceId);
Boolean doesResExist = false;
try {
doesResExist = storageOperate.exists(fullName);
} catch (Exception e) {
logger.error("udf resource checking error", fullName);
result.setCode(Status.RESOURCE_NOT_EXIST.getCode());
result.setMsg(Status.RESOURCE_NOT_EXIST.getMsg());
return result;
}
if (!doesResExist) {
logger.error("resource full name {} is not exist", fullName);
result.setCode(Status.RESOURCE_NOT_EXIST.getCode());
result.setMsg(Status.RESOURCE_NOT_EXIST.getMsg());
return result;
}
Date now = new Date();
udf.setFuncName(funcName);
udf.setClassName(className);
......@@ -266,8 +286,9 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
udf.setDatabase(database);
}
udf.setDescription(desc);
udf.setResourceId(resourceId);
udf.setResourceName(resource.getFullName());
// set resourceId to -1 because we do not store resource to db anymore, instead we use fullName
udf.setResourceId(-1);
udf.setResourceName(fullName);
udf.setType(type);
udf.setUpdateTime(now);
......
......@@ -1173,7 +1173,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
if (CollectionUtils.isNotEmpty(components)) {
for (ResourceComponent component : components) {
// verify whether exist
if (!storageOperate.exists(oldTenantCode,
if (!storageOperate.exists(
String.format(Constants.FORMAT_S_S, srcBasePath, component.getFullName()))) {
logger.error("Resource file: {} does not exist, copy error.", component.getFullName());
throw new ServiceException(Status.RESOURCE_NOT_EXIST);
......@@ -1188,8 +1188,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
if (CollectionUtils.isEmpty(component.getChildren())) {
// if not exist,need create it
if (!storageOperate.exists(oldTenantCode,
String.format(Constants.FORMAT_S_S, dstBasePath, component.getFullName()))) {
if (!storageOperate
.exists(String.format(Constants.FORMAT_S_S, dstBasePath, component.getFullName()))) {
storageOperate.mkdir(newTenantCode,
String.format(Constants.FORMAT_S_S, dstBasePath, component.getFullName()));
}
......
......@@ -65,11 +65,15 @@ public class ResourcesControllerTest extends AbstractControllerTest {
public void testQuerytResourceList() throws Exception {
Map<String, Object> mockResult = new HashMap<>();
mockResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(resourcesService.queryResourceList(Mockito.any(), Mockito.any())).thenReturn(mockResult);
Mockito.when(resourcesService.queryResourceList(Mockito.any(), Mockito.any(), Mockito.anyString()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
paramsMap.add("type", ResourceType.FILE.name());
MvcResult mvcResult = mockMvc.perform(get("/resources/list")
.header(SESSION_ID, sessionId)
.param("type", ResourceType.FILE.name()))
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
......@@ -85,8 +89,8 @@ public class ResourcesControllerTest extends AbstractControllerTest {
Result mockResult = new Result<>();
mockResult.setCode(Status.SUCCESS.getCode());
Mockito.when(resourcesService.queryResourceListPaging(
Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyString(), Mockito.anyInt(),
Mockito.anyInt()))
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.any(),
Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
......@@ -95,6 +99,8 @@ public class ResourcesControllerTest extends AbstractControllerTest {
paramsMap.add("pageNo", "1");
paramsMap.add("searchVal", "test");
paramsMap.add("pageSize", "1");
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
paramsMap.add("tenantCode", "123");
MvcResult mvcResult = mockMvc.perform(get("/resources")
.header(SESSION_ID, sessionId)
......@@ -137,14 +143,17 @@ public class ResourcesControllerTest extends AbstractControllerTest {
public void testViewResource() throws Exception {
Result mockResult = new Result<>();
mockResult.setCode(Status.HDFS_NOT_STARTUP.getCode());
Mockito.when(resourcesService.readResource(Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt()))
Mockito.when(resourcesService.readResource(Mockito.any(),
Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("skipLineNum", "2");
paramsMap.add("limit", "100");
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
paramsMap.add("tenantCode", "123");
MvcResult mvcResult = mockMvc.perform(get("/resources/{id}/view", "5")
MvcResult mvcResult = mockMvc.perform(get("/resources/view")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
......@@ -163,7 +172,7 @@ public class ResourcesControllerTest extends AbstractControllerTest {
mockResult.setCode(Status.TENANT_NOT_EXIST.getCode());
Mockito.when(resourcesService
.onlineCreateResource(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
......@@ -192,14 +201,17 @@ public class ResourcesControllerTest extends AbstractControllerTest {
public void testUpdateResourceContent() throws Exception {
Result mockResult = new Result<>();
mockResult.setCode(Status.TENANT_NOT_EXIST.getCode());
Mockito.when(resourcesService.updateResourceContent(Mockito.any(), Mockito.anyInt(), Mockito.anyString()))
Mockito.when(resourcesService.updateResourceContent(Mockito.any(), Mockito.anyString(),
Mockito.anyString(), Mockito.anyString()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "1");
paramsMap.add("content", "echo test_1111");
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
paramsMap.add("tenantCode", "123");
MvcResult mvcResult = mockMvc.perform(put("/resources/1/update-content")
MvcResult mvcResult = mockMvc.perform(put("/resources/update-content")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
......@@ -215,9 +227,14 @@ public class ResourcesControllerTest extends AbstractControllerTest {
@Test
public void testDownloadResource() throws Exception {
Mockito.when(resourcesService.downloadResource(Mockito.any(), Mockito.anyInt())).thenReturn(null);
Mockito.when(resourcesService.downloadResource(Mockito.any(), Mockito.anyString()))
.thenReturn(null);
MvcResult mvcResult = mockMvc.perform(get("/resources/{id}/download", 5)
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
MvcResult mvcResult = mockMvc.perform(get("/resources/download")
.params(paramsMap)
.header(SESSION_ID, sessionId))
.andExpect(status().is(HttpStatus.BAD_REQUEST.value()))
.andReturn();
......@@ -231,7 +248,7 @@ public class ResourcesControllerTest extends AbstractControllerTest {
mockResult.setCode(Status.TENANT_NOT_EXIST.getCode());
Mockito.when(udfFuncService
.createUdfFunction(Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyInt()))
Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
......@@ -242,8 +259,9 @@ public class ResourcesControllerTest extends AbstractControllerTest {
paramsMap.add("database", "database");
paramsMap.add("description", "description");
paramsMap.add("resourceId", "1");
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
MvcResult mvcResult = mockMvc.perform(post("/resources/{resourceId}/udf-func", "123")
MvcResult mvcResult = mockMvc.perform(post("/resources/udf-func")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
......@@ -282,7 +300,8 @@ public class ResourcesControllerTest extends AbstractControllerTest {
mockResult.setCode(Status.TENANT_NOT_EXIST.getCode());
Mockito.when(udfFuncService
.updateUdfFunc(Mockito.any(), Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyInt()))
Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(),
Mockito.anyString()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
......@@ -294,8 +313,9 @@ public class ResourcesControllerTest extends AbstractControllerTest {
paramsMap.add("database", "database");
paramsMap.add("description", "description");
paramsMap.add("resourceId", "1");
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
MvcResult mvcResult = mockMvc.perform(put("/resources/{resourceId}/udf-func/{id}", "123", "456")
MvcResult mvcResult = mockMvc.perform(put("/resources/udf-func/{id}", "456")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
......@@ -465,10 +485,15 @@ public class ResourcesControllerTest extends AbstractControllerTest {
public void testDeleteResource() throws Exception {
Result mockResult = new Result<>();
mockResult.setCode(Status.SUCCESS.getCode());
Mockito.when(resourcesService.delete(Mockito.any(), Mockito.anyInt())).thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(delete("/resources/{id}", "123")
.header(SESSION_ID, sessionId))
Mockito.when(resourcesService.delete(Mockito.any(), Mockito.anyString(),
Mockito.anyString()))
.thenReturn(mockResult);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("fullName", "dolphinscheduler/resourcePath");
paramsMap.add("tenantCode", "123");
MvcResult mvcResult = mockMvc.perform(delete("/resources")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
......
......@@ -27,9 +27,12 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
......@@ -41,17 +44,21 @@ public class TenantControllerTest extends AbstractControllerTest {
private static final Logger logger = LoggerFactory.getLogger(TenantControllerTest.class);
private MockedStatic<PropertyUtils> mockedStaticPropertyUtils;
@Test
public void testCreateTenant() throws Exception {
mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("tenantCode", "hayden");
paramsMap.add("queueId", "1");
paramsMap.add("description", "tenant description");
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
MvcResult mvcResult = mockMvc.perform(post("/tenants/")
MvcResult mvcResult = mockMvc.perform(post("/tenants")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
......@@ -59,6 +66,7 @@ public class TenantControllerTest extends AbstractControllerTest {
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
mockedStaticPropertyUtils.close();
}
@Test
......
......@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import java.util.ArrayList;
import java.util.List;
......@@ -31,53 +31,43 @@ import org.junit.jupiter.api.Test;
public class ResourceTreeVisitorTest {
@Test
public void visit() throws Exception {
List<Resource> resourceList = new ArrayList<>();
public void visit() {
List<StorageEntity> resourceList = new ArrayList<>();
StorageEntity resource1 = new StorageEntity();
resource1.setFullName("/default/a");
resource1.setPfullName("/default");
StorageEntity resource2 = new StorageEntity();
resource1.setFullName("/default/a/a1.txt");
resource1.setPfullName("/default/a");
Resource resource1 = new Resource(3, -1, "b", "/b", true);
Resource resource2 = new Resource(4, 2, "a1.txt", "/a/a1.txt", false);
Resource resource3 = new Resource(5, 3, "b1.txt", "/b/b1.txt", false);
Resource resource4 = new Resource(6, 3, "b2.jar", "/b/b2.jar", false);
Resource resource5 = new Resource(7, -1, "b2", "/b2", true);
Resource resource6 = new Resource(8, -1, "b2", "/b/b2", true);
Resource resource7 = new Resource(9, 8, "c2.jar", "/b/b2/c2.jar", false);
resourceList.add(resource1);
resourceList.add(resource2);
resourceList.add(resource3);
resourceList.add(resource4);
resourceList.add(resource5);
resourceList.add(resource6);
resourceList.add(resource7);
ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
ResourceComponent resourceComponent = resourceTreeVisitor.visit();
ResourceComponent resourceComponent = resourceTreeVisitor.visit("/default");
Assertions.assertNotNull(resourceComponent.getChildren());
}
@Test
public void rootNode() throws Exception {
List<Resource> resourceList = new ArrayList<>();
public void rootNode() {
List<StorageEntity> resourceList = new ArrayList<>();
StorageEntity resource1 = new StorageEntity();
resource1.setFullName("/default/a");
resource1.setPfullName("/default");
StorageEntity resource2 = new StorageEntity();
resource1.setFullName("/default/a/a1.txt");
resource1.setPfullName("/default/a");
Resource resource1 = new Resource(3, -1, "b", "/b", true);
Resource resource2 = new Resource(4, 2, "a1.txt", "/a/a1.txt", false);
Resource resource3 = new Resource(5, 3, "b1.txt", "/b/b1.txt", false);
Resource resource4 = new Resource(6, 3, "b2.jar", "/b/b2.jar", false);
Resource resource5 = new Resource(7, -1, "b2", "/b2", true);
Resource resource6 = new Resource(8, -1, "b2", "/b/b2", true);
Resource resource7 = new Resource(9, 8, "c2.jar", "/b/b2/c2.jar", false);
resourceList.add(resource1);
resourceList.add(resource2);
resourceList.add(resource3);
resourceList.add(resource4);
resourceList.add(resource5);
resourceList.add(resource6);
resourceList.add(resource7);
ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
Assertions.assertTrue(resourceTreeVisitor.rootNode(resource1));
Assertions.assertTrue(resourceTreeVisitor.rootNode(resource2));
Assertions.assertFalse(resourceTreeVisitor.rootNode(resource3));
Assertions.assertTrue(resourceTreeVisitor.rootNode(resource1, "/default"));
Assertions.assertFalse(resourceTreeVisitor.rootNode(resource2, "/default"));
}
}
......@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.commons.collections.CollectionUtils;
......@@ -96,6 +97,9 @@ public class TenantServiceTest {
@Mock
private ResourcePermissionCheckService resourcePermissionCheckService;
@Mock
private StorageOperate storageOperate;
private static final String tenantCode = "hayden";
private static final String tenantDesc = "This is the tenant desc";
private static final String queue = "queue";
......
......@@ -34,9 +34,11 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.commons.collections.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
......@@ -85,6 +87,9 @@ public class UdfFuncServiceTest {
@Mock
private UDFUserMapper udfUserMapper;
@Mock
private StorageOperate storageOperate;
@BeforeEach
public void setUp() {
mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class);
......@@ -108,21 +113,27 @@ public class UdfFuncServiceTest {
// hdfs not start
Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, Integer.MAX_VALUE);
"UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE);
logger.info(result.toString());
Assertions.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(), result.getMsg());
// resource not exist
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, Integer.MAX_VALUE);
"UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE);
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());
// success
Mockito.when(resourceMapper.selectById(1)).thenReturn(getResource());
try {
Mockito.when(storageOperate.exists("String")).thenReturn(true);
} catch (IOException e) {
logger.error("AmazonServiceException when checking resource: String");
}
result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, 1);
"UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE);
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
......@@ -154,7 +165,6 @@ public class UdfFuncServiceTest {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
Mockito.when(udfFuncMapper.selectUdfById(1)).thenReturn(getUdfFunc());
Mockito.when(resourceMapper.selectById(1)).thenReturn(getResource());
// UDF_FUNCTION_NOT_EXIST
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.UDF, null, 1,
......@@ -163,7 +173,7 @@ public class UdfFuncServiceTest {
serviceLogger)).thenReturn(true);
Result<Object> result = udfFuncService.updateUdfFunc(getLoginUser(), 12, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, 1);
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");
logger.info(result.toString());
Assertions.assertTrue(Status.UDF_FUNCTION_NOT_EXIST.getCode() == result.getCode());
......@@ -172,7 +182,7 @@ public class UdfFuncServiceTest {
serviceLogger)).thenReturn(true);
result = udfFuncService.updateUdfFunc(getLoginUser(), 1, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, 1);
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");
logger.info(result.toString());
Assertions.assertTrue(Status.HDFS_NOT_STARTUP.getCode() == result.getCode());
......@@ -185,16 +195,22 @@ public class UdfFuncServiceTest {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = udfFuncService.updateUdfFunc(getLoginUser(), 11, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, 12);
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");
logger.info(result.toString());
Assertions.assertTrue(Status.RESOURCE_NOT_EXIST.getCode() == result.getCode());
// success
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.UDF, null, 1,
ApiFuncIdentificationConstant.UDF_FUNCTION_UPDATE, serviceLogger)).thenReturn(true);
try {
Mockito.when(storageOperate.exists("")).thenReturn(true);
} catch (IOException e) {
logger.error("AmazonServiceException when checking resource: ");
}
result = udfFuncService.updateUdfFunc(getLoginUser(), 11, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, 1);
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");
logger.info(result.toString());
Assertions.assertTrue(Status.SUCCESS.getCode() == result.getCode());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import lombok.Data;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@TableName("t_ds_relation_resources_task")
public class ResourcesTask {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
private String fullName;
private int taskId;
private ResourceType type;
public ResourcesTask(int id, String fullName, int taskId, ResourceType type) {
this.id = id;
this.fullName = fullName;
this.taskId = taskId;
this.type = type;
}
public ResourcesTask(int taskId, String fullName, ResourceType type) {
this.taskId = taskId;
this.fullName = fullName;
this.type = type;
}
}
......@@ -54,7 +54,6 @@ public interface ResourceMapper extends BaseMapper<Resource> {
/**
* resource page
* @param page page
* @param userId userId
* @param id id
* @param type type
* @param searchVal searchVal
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ResourcesTask;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* resource task relation mapper interface
*/
public interface ResourceTaskMapper extends BaseMapper<ResourcesTask> {
Integer existResourceByTaskIdNFullName(@Param("taskId") int task_id, @Param("fullName") String fullName);
int deleteIds(@Param("resIds") Integer[] resIds);
int updateResource(@Param("id") int id, @Param("fullName") String fullName);
List<ResourcesTask> selectBatchFullNames(@Param("fullNameArr") String[] fullNameArr);
List<ResourcesTask> selectSubfoldersFullNames(@Param("folderPath") String folderPath);
}
......@@ -75,7 +75,7 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
List<DefinitionGroupByUser> countDefinitionGroupByUser(@Param("projectCodes") Long[] projectCodes);
/**
* list all resource ids
* list all resource ids and task_params containing resourceList
*
* @return task ids list
*/
......
......@@ -49,7 +49,7 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
/**
* udf function page
* @param page page
* @param userId userId
* @param ids userId
* @param searchVal searchVal
* @return udf function IPage
*/
......@@ -59,7 +59,7 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
/**
* query udf function by type
* @param userId userId
* @param ids userId
* @param type type
* @return udf function list
*/
......@@ -95,6 +95,13 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
List<UdfFunc> listUdfByResourceId(@Param("resourceIds") Integer[] resourceIds);
/**
* list UDF by resource fullName
* @param resourceFullNames resource fullName array
* @return UDF function list
*/
List<UdfFunc> listUdfByResourceFullName(@Param("resourceFullNames") String[] resourceFullNames);
/**
* list authorized UDF by resource id
* @param resourceIds resource id array
......
......@@ -66,4 +66,34 @@ public class ResourceProcessDefinitionUtils {
return resourceResult;
}
public static <T> Map<Integer, Set<T>> getResourceObjectMap(List<Map<String, Object>> resourceList,
String objectName, Class<T> clazz) {
// resourceId -> task ids or code depends on the objectName
Map<Integer, Set<T>> resourceResult = new HashMap<>();
if (CollectionUtils.isNotEmpty(resourceList)) {
for (Map<String, Object> resourceMap : resourceList) {
// get resName and resource_id_news, td_id
T taskId = (T) resourceMap.get(objectName);
String[] resourceIds = ((String) resourceMap.get("resource_ids"))
.split(",");
Set<Integer> resourceIdSet =
Arrays.stream(resourceIds).map(Integer::parseInt).collect(Collectors.toSet());
for (Integer resourceId : resourceIdSet) {
Set<T> codeSet;
if (resourceResult.containsKey(resourceId)) {
codeSet = resourceResult.get(resourceId);
} else {
codeSet = new HashSet<>();
}
codeSet.add(taskId);
resourceResult.put(resourceId, codeSet);
}
}
}
return resourceResult;
}
}
......@@ -168,7 +168,7 @@
</select>
<select id="listResources" resultType="java.util.HashMap">
SELECT distinct pd.code,td.resource_ids
SELECT distinct pd.code, td.id as td_id, td.resource_ids
FROM t_ds_process_task_relation ptr
join t_ds_process_definition pd
on ptr.process_definition_code=pd.code and ptr.process_definition_version = pd.version
......@@ -180,7 +180,7 @@
</select>
<select id="listResourcesByUser" resultType="java.util.HashMap">
SELECT distinct pd.code,td.resource_ids
SELECT distinct pd.code, td.id as td_id, td.resource_ids
FROM t_ds_process_task_relation ptr
join t_ds_process_definition pd
on ptr.process_definition_code=pd.code and ptr.process_definition_version = pd.version
......
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper">
<sql id="baseSqlV2">
${alias}.id, ${alias}.full_name, ${alias}.task_id, ${alias}.type
</sql>
<select id="existResourceByTaskIdNFullName" resultType="java.lang.Integer">
select
id
from t_ds_relation_resources_task
where full_name = #{fullName} and task_id = #{taskId}
</select>
<select id="selectBatchFullNames" resultType="org.apache.dolphinscheduler.dao.entity.ResourcesTask">
select
id, full_name, task_id, type
from t_ds_relation_resources_task
where full_name in
<foreach collection="fullNameArr" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</select>
<update id="updateResource" >
UPDATE t_ds_relation_resources_task SET full_name=#{fullName} WHERE id=#{id}
</update>
<delete id="deleteIds" parameterType="java.lang.Integer">
delete from t_ds_relation_resources_task where id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</delete>
<select id="selectSubfoldersFullNames" resultType="org.apache.dolphinscheduler.dao.entity.ResourcesTask">
select
id, full_name, task_id, type
from t_ds_relation_resources_task
where full_name like concat(#{folderPath}, '%')
</select>
</mapper>
\ No newline at end of file
......@@ -96,8 +96,8 @@
#{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},
#{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes},
#{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout},
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}, #{taskDefinition.taskGroupId},
#{taskDefinition.taskExecuteType})
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime},
#{taskDefinition.taskGroupId}, #{taskDefinition.taskExecuteType})
</foreach>
</insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
......
......@@ -141,6 +141,20 @@
</foreach>
</if>
</select>
<select id="listUdfByResourceFullName" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select
<include refid="baseSql">
<property name="alias" value="udf"/>
</include>
from t_ds_udfs udf
where 1=1
<if test="resourceFullNames != null and resourceFullNames.length > 0">
and udf.resource_name in
<foreach collection="resourceFullNames" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<select id="listAuthorizedUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select
<include refid="baseSql">
......
......@@ -793,6 +793,24 @@ CREATE TABLE t_ds_resources
-- Records of t_ds_resources
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_relation_resources_task
-- ----------------------------
DROP TABLE IF EXISTS t_ds_relation_resources_task CASCADE;
CREATE TABLE t_ds_relation_resources_task
(
id int(11) NOT NULL AUTO_INCREMENT,
task_id int(11) DEFAULT NULL,
full_name varchar(255) DEFAULT NULL,
type tinyint(4) DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY t_ds_relation_resources_task_un (task_id, full_name)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_ds_relation_resources_task
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_schedules
-- ----------------------------
......
......@@ -791,6 +791,23 @@ CREATE TABLE `t_ds_resources` (
-- Records of t_ds_resources
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_relation_resources_task
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_relation_resources_task`;
CREATE TABLE `t_ds_relation_resources_task` (
`id` int NOT NULL AUTO_INCREMENT COMMENT 'key',
`task_id` int(11) DEFAULT NULL COMMENT 'task id',
`full_name` varchar(255) DEFAULT NULL,
`type` tinyint DEFAULT NULL COMMENT 'resource type,0:FILE,1:UDF',
PRIMARY KEY (`id`),
UNIQUE KEY `t_ds_relation_resources_task_un` (`task_id`, `full_name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_ds_relation_resources_task
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_schedules
-- ----------------------------
......
......@@ -700,6 +700,18 @@ CREATE TABLE t_ds_resources (
CONSTRAINT t_ds_resources_un UNIQUE (full_name, type)
) ;
--
-- Table structure for table t_ds_relation_resources_task
--
DROP TABLE IF EXISTS t_ds_relation_resources_task;
CREATE TABLE t_ds_relation_resources_task (
id SERIAL NOT NULL,
task_id int DEFAULT NULL,
full_name varchar(255) DEFAULT NULL,
type int DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT t_ds_relation_resources_task_un UNIQUE (task_id, full_name)
);
--
-- Table structure for table t_ds_schedules
......
......@@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRule;
import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
......@@ -92,9 +91,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
......@@ -624,26 +620,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources =
projectResourceFiles.stream().filter(t -> t.getId() == null).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
oldVersionResources.forEach(t -> resourcesMap.put(t.getRes(),
processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
}
// get the resource id in order to get the resource names in batch
Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(ResourceInfo::getId);
Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds);
resources.forEach(t -> resourcesMap.put(t.getFullName(),
processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
}
// TODO: Modify this part to accomodate(migrate) oldversionresources in the future.
projectResourceFiles.forEach(file -> resourcesMap.put(file.getResourceName(),
processService.queryTenantCodeByResName(file.getResourceName(), ResourceType.FILE)));
}
}
......
......@@ -69,6 +69,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.ResourcesTask;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
......@@ -97,6 +98,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
......@@ -142,9 +144,11 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
......@@ -229,6 +233,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private ResourceMapper resourceMapper;
@Autowired
private ResourceTaskMapper resourceTaskMapper;
@Autowired
private ResourceUserMapper resourceUserMapper;
......@@ -1376,7 +1383,7 @@ public class ProcessServiceImpl implements ProcessService {
ResourceInfo mainJar = JSONUtils.parseObject(
JSONUtils.toJsonString(mainJarObj),
ResourceInfo.class);
ResourceInfo resourceInfo = updateResourceInfo(mainJar);
ResourceInfo resourceInfo = updateResourceInfo(taskDefinition.getId(), mainJar);
if (resourceInfo != null) {
taskParameters.put("mainJar", resourceInfo);
}
......@@ -1387,7 +1394,7 @@ public class ProcessServiceImpl implements ProcessService {
List<ResourceInfo> resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class);
List<ResourceInfo> updatedResourceInfos = resourceInfos
.stream()
.map(this::updateResourceInfo)
.map(resourceInfo -> updateResourceInfo(taskDefinition.getId(), resourceInfo))
.filter(Objects::nonNull)
.collect(Collectors.toList());
taskParameters.put("resourceList", updatedResourceInfos);
......@@ -1403,21 +1410,23 @@ public class ProcessServiceImpl implements ProcessService {
* @param res origin resource info
* @return {@link ResourceInfo}
*/
protected ResourceInfo updateResourceInfo(ResourceInfo res) {
protected ResourceInfo updateResourceInfo(int task_id, ResourceInfo res) {
ResourceInfo resourceInfo = null;
// only if mainJar is not null and does not contains "resourceName" field
// only if mainJar is not null and does not contain "resourceName" field
if (res != null) {
Integer resourceId = res.getId();
if (resourceId == null) {
logger.error("invalid resourceId, {}", resourceId);
return null;
String resourceFullName = res.getResourceName();
if (StringUtils.isBlank(resourceFullName)) {
logger.error("invalid resource full name, {}", resourceFullName);
return new ResourceInfo();
}
resourceInfo = new ResourceInfo();
// get resource from database, only one resource should be returned
Resource resource = getResourceById(resourceId);
resourceInfo.setId(resourceId);
resourceInfo.setRes(resource.getFileName());
resourceInfo.setResourceName(resource.getFullName());
Integer resultList = resourceTaskMapper.existResourceByTaskIdNFullName(task_id, resourceFullName);
if (resultList != null) {
resourceInfo.setId(resultList);
resourceInfo.setRes(res.getRes());
resourceInfo.setResourceName(resourceFullName);
}
if (logger.isInfoEnabled()) {
logger.info("updated resource info {}",
JSONUtils.toJsonString(resourceInfo));
......@@ -2033,7 +2042,6 @@ public class ProcessServiceImpl implements ProcessService {
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() == 0) {
taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
}
......@@ -2074,6 +2082,8 @@ public class ProcessServiceImpl implements ProcessService {
TaskDefinition task = taskDefinitionMap.get(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
taskDefinitionToUpdate.setId(task.getId());
}
}
}
......@@ -2091,9 +2101,42 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
for (TaskDefinitionLog newTaskDefinitionLog : newTaskDefinitionLogs) {
Set<String> resourceFullNameSet = getResourceFullNames(newTaskDefinitionLog);
for (String resourceFullName : resourceFullNameSet) {
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.selectByMap(
Collections.singletonMap("code", newTaskDefinitionLog.getCode()));
if (taskDefinitionList.size() > 0) {
createRelationTaskResourcesIfNotExist(
taskDefinitionList.get(0).getId(), resourceFullName);
}
}
}
}
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
for (TaskDefinitionLog taskDefinitionLog : updateTaskDefinitionLogs) {
Set<String> resourceFullNameSet = getResourceFullNames(taskDefinitionLog);
// remove resources that user deselected.
for (ResourcesTask resourcesTask : resourceTaskMapper.selectByMap(
Collections.singletonMap("task_id",
taskDefinitionMapper.queryByCode(taskDefinitionLog.getCode()).getId()))) {
if (!resourceFullNameSet.contains(resourcesTask.getFullName())) {
resourceTaskMapper.deleteById(resourcesTask.getId());
}
}
for (String resourceFullName : resourceFullNameSet) {
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.selectByMap(
Collections.singletonMap("code", taskDefinitionLog.getCode()));
if (taskDefinitionList.size() > 0) {
createRelationTaskResourcesIfNotExist(
taskDefinitionList.get(0).getId(), resourceFullName);
}
}
updateResult += taskDefinitionMapper.updateById(taskDefinitionLog);
}
}
......@@ -2682,4 +2725,36 @@ public class ProcessServiceImpl implements ProcessService {
return testDataSourceId;
return null;
}
private Set<String> getResourceFullNames(TaskDefinition taskDefinition) {
Set<String> resourceFullNames = null;
AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType()).taskParams(taskDefinition.getTaskParams()).build());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceFullNames = params.getResourceFilesList().stream()
.filter(t -> !StringUtils.isBlank(t.getResourceName()))
.map(ResourceInfo::getResourceName)
.collect(toSet());
}
if (CollectionUtils.isEmpty(resourceFullNames)) {
return new HashSet<String>();
}
return resourceFullNames;
}
private Integer createRelationTaskResourcesIfNotExist(int taskId, String resourceFullName) {
Integer resourceId = resourceTaskMapper.existResourceByTaskIdNFullName(taskId, resourceFullName);
if (null == resourceId) {
// create the relation if not exist
ResourcesTask resourcesTask = new ResourcesTask(taskId, resourceFullName, ResourceType.FILE);
resourceTaskMapper.insert(resourcesTask);
return resourcesTask.getId();
}
return resourceId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.dolphinscheduler.service.storage;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
import lombok.Data;
// StorageEneity is an entity representing a resource in the third-part storage service.
// It is only stored in t_ds_relation_resources_task if the resource is used by a task.
// It is not put in the model module because it has more attributes than corresponding objects stored
// in table t_ds_relation_resources_task.
@Data
public class StorageEntity {
/**
* exist only if it is stored in t_ds_relation_resources_task.
*
*/
private int id;
/**
* fullname is in a format of basepath + tenantCode + res/udf + filename
*/
private String fullName;
/**
* filename is in a format of possible parent folders + alias
*/
private String fileName;
/**
* the name of the file
*/
private String alias;
/**
* parent folder time
*/
private String pfullName;
private boolean isDirectory;
private String description;
private int userId;
private String userName;
private ResourceType type;
private long size;
private Date createTime;
private Date updateTime;
}
......@@ -68,6 +68,13 @@ public interface StorageOperate {
*/
String getResourceFileName(String tenantCode, String fullName);
/**
* get the path of the resource file excluding the base path.
* @param fullName
* @return
*/
String getResourceFileName(String fullName);
/**
* get the path of the file
* @param resourceType
......@@ -79,23 +86,23 @@ public interface StorageOperate {
/**
* predicate if the resource of tenant exists
* @param tenantCode
* @param fileName
* @param fullName
* @return
* @throws IOException
*/
boolean exists(String tenantCode, String fileName) throws IOException;
boolean exists(String fullName) throws IOException;
/**
* delete the resource of filePath
* todo if the filePath is the type of directory ,the files in the filePath need to be deleted at all
* @param tenantCode
* @param filePath
* @param recursive
* @return
* @throws IOException
*/
boolean delete(String tenantCode, String filePath, boolean recursive) throws IOException;
boolean delete(String filePath, boolean recursive) throws IOException;
boolean delete(String filePath, List<String> childrenPathArray, boolean recursive) throws IOException;
/**
* copy the file from srcPath to dstPath
......@@ -167,4 +174,21 @@ public interface StorageOperate {
*/
ResUploadType returnStorageType();
/**
* return files and folders in the current directory and subdirectories
* */
public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
ResourceType type);
/**
* return files and folders in the current directory
* */
public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception;
/**
* return a file status
* */
public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception;
}
......@@ -169,7 +169,7 @@ public class OssOperatorTest {
boolean doesExist = false;
doReturn(true).when(ossClientMock).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK);
try {
doesExist = ossOperator.exists(TENANT_CODE_MOCK, FILE_NAME_MOCK);
doesExist = ossOperator.exists(FILE_NAME_MOCK);
} catch (IOException e) {
Assertions.fail("unexpected IO exception in unit test");
}
......@@ -183,7 +183,7 @@ public class OssOperatorTest {
boolean isDeleted = false;
doReturn(null).when(ossClientMock).deleteObject(anyString(), anyString());
try {
isDeleted = ossOperator.delete(TENANT_CODE_MOCK, FILE_NAME_MOCK, true);
isDeleted = ossOperator.delete(FILE_NAME_MOCK, true);
} catch (IOException e) {
Assertions.fail("unexpected IO exception in unit test");
}
......
......@@ -572,9 +572,7 @@ public class SqlTask extends AbstractTask {
String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS;
String uploadPath = CommonUtils.getHdfsUdfDir(value.getTenantCode());
String resourceFullName = value.getResourceName();
resourceFullName =
resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName);
return String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName);
return String.format("add jar %s", resourceFullName);
}).collect(Collectors.toList());
}
......
......@@ -16,7 +16,7 @@
*/
const removeUselessChildren = (
list: { children?: []; dirctory?: boolean; disabled?: boolean }[]
list: { children?: []; directory?: boolean; disabled?: boolean; dirctory?: boolean }[]
) => {
if (!list.length) return
list.forEach((item) => {
......
......@@ -125,7 +125,6 @@ export function useModal(
params.startParams = !_.isEmpty(startParams)
? JSON.stringify(startParams)
: ''
await startProcessInstance(params, variables.projectCode)
window.$message.success(t('project.workflow.success'))
state.saving = false
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册