diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 7a87d552de6bba0260af489f9e6d306782deaa07..398c77f913d2b76f8625469d17fa9a59bd40f30f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -180,9 +180,10 @@ public enum Status { RESOURCE_SIZE_EXCEED_LIMIT(20007, "upload resource file size exceeds limit"), RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified"), UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar"), - HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail"), - RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!"), - RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!"), + HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail"), + RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!"), + RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} not exists in hdfs!"), + UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index 66bf2146086795026594afb9dd41515d2b25f758..bf36c30f3f4658979d90f0e072dc69fac6097c96 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -16,12 +16,16 @@ */ package org.apache.dolphinscheduler.api.service; +import com.baomidou.mybatisplus.core.metadata.IPage; +import 
com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.commons.collections.BeanMap; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ResourceType; -import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; @@ -29,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import org.apache.commons.collections.BeanMap; -import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.dao.mapper.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -277,15 +277,9 @@ public class ResourcesService extends BaseService { String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode(); // get file hdfs path // delete hdfs file by type - String originHdfsFileName = ""; - String destHdfsFileName = ""; - if (resource.getType().equals(ResourceType.FILE)) { - originHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, originResourceName); - destHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, name); - } else if (resource.getType().equals(ResourceType.UDF)) { - originHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, originResourceName); - destHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, name); - } + String 
originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originResourceName); + String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,name); + try { if (HadoopUtils.getInstance().exists(originHdfsFileName)) { logger.info("hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName); @@ -354,15 +348,8 @@ public class ResourcesService extends BaseService { // save file to hdfs, and delete original file - String hdfsFilename = ""; - String resourcePath = ""; - if (type.equals(ResourceType.FILE)) { - hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name); - resourcePath = HadoopUtils.getHdfsResDir(tenantCode); - } else if (type.equals(ResourceType.UDF)) { - hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name); - resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode); - } + String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,name); + String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode); try { // if tenant dir not exists if (!HadoopUtils.getInstance().exists(resourcePath)) { @@ -429,12 +416,19 @@ public class ResourcesService extends BaseService { putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } + //if resource type is UDF, need to check whether it is bound by a UDF function + if (resource.getType() == (ResourceType.UDF)) { + List udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId}); + if (CollectionUtils.isNotEmpty(udfFuncs)) { + logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString()); + putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName()); + return result; + } + } - String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); - String hdfsFilename = ""; - + String tenantCode = userMapper.queryTenantCodeByUserId(resource.getUserId()).getTenantCode(); // delete hdfs file by type - hdfsFilename = getHdfsFileName(resource, tenantCode, hdfsFilename); + String hdfsFilename = 
HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getAlias()); //delete data in database resourcesMapper.deleteById(resourceId); @@ -466,7 +460,7 @@ public class ResourcesService extends BaseService { String tenantCode = tenant.getTenantCode(); try { - String hdfsFilename = getHdfsFileName(type,tenantCode,name); + String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,name); if(HadoopUtils.getInstance().exists(hdfsFilename)){ logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name,hdfsFilename); putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename); @@ -525,7 +519,7 @@ public class ResourcesService extends BaseService { User user = userMapper.queryDetailsById(resource.getUserId()); String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode(); // hdfs path - String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias()); + String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getAlias()); logger.info("resource hdfs path is {} ", hdfsFileName); try { if(HadoopUtils.getInstance().exists(hdfsFileName)){ @@ -684,8 +678,8 @@ public class ResourcesService extends BaseService { return result; } - // get file hdfs path - hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName); + // get resource file hdfs path + hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName); String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); logger.info("resource hdfs path is {} ", hdfsFileName); @@ -732,8 +726,7 @@ public class ResourcesService extends BaseService { User user = userMapper.queryDetailsById(resource.getUserId()); String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode(); - String hdfsFileName = ""; - hdfsFileName = getHdfsFileName(resource, tenantCode, hdfsFileName); + String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getAlias()); String 
localFileName = FileUtils.getDownloadFilename(resource.getAlias()); logger.info("resource hdfs path is {} ", hdfsFileName); @@ -848,40 +841,6 @@ public class ResourcesService extends BaseService { return result; } - /** - * get hdfs file name - * - * @param resource resource - * @param tenantCode tenant code - * @param hdfsFileName hdfs file name - * @return hdfs file name - */ - private String getHdfsFileName(Resource resource, String tenantCode, String hdfsFileName) { - if (resource.getType().equals(ResourceType.FILE)) { - hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias()); - } else if (resource.getType().equals(ResourceType.UDF)) { - hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, resource.getAlias()); - } - return hdfsFileName; - } - - /** - * get hdfs file name - * - * @param resourceType resource type - * @param tenantCode tenant code - * @param hdfsFileName hdfs file name - * @return hdfs file name - */ - private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) { - if (resourceType.equals(ResourceType.FILE)) { - hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName); - } else if (resourceType.equals(ResourceType.UDF)) { - hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName); - } - return hdfsFileName; - } - /** * get authorized resource list * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java index a4664b6c3e5d820f720db2708e65daa9021bd814..dd047008bc795d396474bae363224e5e2622f8eb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.api.service; +import org.apache.commons.lang3.ArrayUtils; import 
org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -39,6 +40,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.*; +import java.util.stream.Collectors; /** * user service @@ -72,6 +74,9 @@ public class UsersService extends BaseService { @Autowired private AlertGroupMapper alertGroupMapper; + @Autowired + private UdfFuncMapper udfFuncMapper; + /** * create user, only system admin have permission @@ -413,15 +418,14 @@ public class UsersService extends BaseService { return result; } - /** * grant resource - * - * @param loginUser login user - * @param userId user id - * @param resourceIds resource id array - * @return grant result code + * @param loginUser login user + * @param userId user id + * @param resourceIds resource id array + * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantResources(User loginUser, int userId, String resourceIds) { Map result = new HashMap<>(5); //only admin can operate @@ -433,15 +437,28 @@ public class UsersService extends BaseService { putMsg(result, Status.USER_NOT_EXIST, userId); return result; } + String[] resourcesIdArr = resourceIds.split(","); + //if resource type is UDF, need to check whether it is bound by a UDF function + Set needAuthorizedIds = new HashSet<>(); + if (StringUtils.isNotEmpty(resourceIds)) { + needAuthorizedIds = Arrays.stream(resourcesIdArr).map(t->Integer.parseInt(t)).collect(Collectors.toSet()); + } + List udfResourceList = resourceMapper.queryResourceList("", 0, ResourceType.UDF.ordinal()); + Set allUdfResIds = udfResourceList.stream().map(t -> t.getId()).collect(Collectors.toSet()); + allUdfResIds.removeAll(needAuthorizedIds); + List udfFuncs = udfFuncMapper.listUdfByResourceId(ArrayUtils.toPrimitive(allUdfResIds.toArray(new Integer[allUdfResIds.size()]))); + if 
(CollectionUtils.isNotEmpty(udfFuncs)) { + logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString()); + putMsg(result, Status.UDF_RESOURCE_IS_BOUND, udfFuncs.get(0).getFuncName()); + return result; + } + resourcesUserMapper.deleteResourceUser(userId, 0); if (check(result, StringUtils.isEmpty(resourceIds), Status.SUCCESS)) { return result; } - - String[] resourcesIdArr = resourceIds.split(","); - for (String resourceId : resourcesIdArr) { Date now = new Date(); ResourcesUser resourcesUser = new ResourcesUser(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 8e94ccac52a1aab71ea980994e9e4e1d1c1d0314..3c0c1cf03ef05a0449857589824649fc920dc69b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -18,10 +18,17 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -29,17 +36,30 @@ 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.mock.web.MockMultipartFile; +import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; +import java.util.Date; +import java.util.List; import java.util.Map; @RunWith(SpringRunner.class) @SpringBootTest(classes = ApiApplicationServer.class) +@Transactional +@Rollback(true) public class ResourcesServiceTest { private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class); @Autowired private ResourcesService resourcesService; + @Autowired + private ResourceMapper resourceMapper; + @Autowired + private UdfFuncMapper udfFuncMapper; + @Autowired + private UserMapper userMapper; @Test public void querytResourceList(){ @@ -50,4 +70,81 @@ public class ResourcesServiceTest { Map map = resourcesService.queryResourceList(loginUser, ResourceType.FILE); Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS)); } + + @Test + public void testCreateResource(){ + //create user + User loginUser = createGeneralUser("user1"); + String resourceName = "udf-resource-1.jar"; + String errorResourceName = "udf-resource-1"; + MockMultipartFile udfResource = new MockMultipartFile(resourceName, resourceName, "multipart/form-data", "some content".getBytes()); + Result result = resourcesService.createResource(loginUser, errorResourceName, "", ResourceType.UDF, udfResource); + Assert.assertEquals(result.getCode().intValue(),Status.RESOURCE_SUFFIX_FORBID_CHANGE.getCode()); + List resourceList = resourceMapper.queryResourceList(resourceName, loginUser.getId(), ResourceType.UDF.ordinal()); + Assert.assertTrue(resourceList.size() == 0); + + } + + @Test + public void testDelete() throws Exception{ + //create user + User loginUser = 
createGeneralUser("user1"); + //create resource + Resource resource = createResource(loginUser,ResourceType.UDF,"udf-resource-1"); + //create UDF function + UdfFunc udfFunc = createUdfFunc(loginUser, resource); + //delete resource + Result result = resourcesService.delete(loginUser, resource.getId()); + Assert.assertEquals(result.getCode().intValue(),Status.UDF_RESOURCE_IS_BOUND.getCode()); + } + + /** + * create general user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + + /** + * create resource by user + * @param user user + * @return Resource + */ + private Resource createResource(User user,ResourceType type,String name){ + //insertOne + Resource resource = new Resource(); + resource.setAlias(String.format("%s-%s",name,user.getUserName())); + resource.setType(type); + resource.setUserId(user.getId()); + resourceMapper.insert(resource); + return resource; + } + + /** + * insert one udf + * @return + */ + private UdfFunc createUdfFunc(User user, Resource resource){ + UdfFunc udfFunc = new UdfFunc(); + udfFunc.setUserId(user.getId()); + udfFunc.setFuncName("dolphin_udf_func"); + udfFunc.setClassName("org.apache.dolphinscheduler.test.mr"); + udfFunc.setType(UdfType.HIVE); + udfFunc.setResourceId(resource.getId()); + udfFunc.setResourceName(resource.getAlias()); + udfFunc.setCreateTime(new Date()); + udfFunc.setUpdateTime(new Date()); + udfFuncMapper.insert(udfFunc); + return udfFunc; + } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java index 
30aabe93f2db2968667f85463deb131459bb600b..fd51f91780eb52fa5039aadb3a67e26fd0bbc75a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java @@ -22,10 +22,14 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.EncryptionUtils; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.*; import org.junit.After; @@ -41,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; @@ -68,6 +73,10 @@ public class UsersServiceTest { private DataSourceUserMapper datasourceUserMapper; @Mock private AlertGroupMapper alertGroupMapper; + @Mock + private ResourceMapper resourceMapper; + @Mock + private UdfFuncMapper udfFuncMapper; private String queueName ="UsersServiceTestQueue"; @@ -203,7 +212,7 @@ public class UsersServiceTest { logger.info(result.toString()); //success - when(userMapper.selectById(1)).thenReturn(getUser()); + when(userMapper.selectById(1)).thenReturn(getAdminUser()); result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue"); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, 
result.get(Constants.STATUS)); @@ -218,8 +227,8 @@ public class UsersServiceTest { User loginUser = new User(); try { - when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser()); - when(userMapper.selectById(1)).thenReturn(getUser()); + when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getAdminUser()); + when(userMapper.selectById(1)).thenReturn(getAdminUser()); //no operate Map result = usersService.deleteUserById(loginUser,3); @@ -247,7 +256,7 @@ public class UsersServiceTest { @Test public void testGrantProject(){ - when(userMapper.selectById(1)).thenReturn(getUser()); + when(userMapper.selectById(1)).thenReturn(getAdminUser()); User loginUser = new User(); String projectIds= "100000,120000"; Map result = usersService.grantProject(loginUser, 1, projectIds); @@ -268,20 +277,46 @@ public class UsersServiceTest { public void testGrantResources(){ String resourceIds = "100000,120000"; - when(userMapper.selectById(1)).thenReturn(getUser()); - User loginUser = new User(); - Map result = usersService.grantResources(loginUser, 1, resourceIds); + User needAuthorizedUser = new User(); + needAuthorizedUser.setUserType(UserType.GENERAL_USER); + needAuthorizedUser.setId(100); + + User generalUser = getGeneralUser(); + User adminUser = getAdminUser(); + when(userMapper.selectById(needAuthorizedUser.getId())).thenReturn(generalUser); + + Map result = usersService.grantResources(generalUser, needAuthorizedUser.getId(), resourceIds); logger.info(result.toString()); Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS)); //user not exist - loginUser.setUserType(UserType.ADMIN_USER); - result = usersService.grantResources(loginUser, 2, resourceIds); + result = usersService.grantResources(adminUser, 2, resourceIds); logger.info(result.toString()); Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS)); //success - result = usersService.grantResources(loginUser, 1, resourceIds); + result = 
usersService.grantResources(adminUser, needAuthorizedUser.getId(), resourceIds); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + + List udfResourceList = new ArrayList() {{ + add(createResource(getAdminUser(), ResourceType.UDF, 100000)); + add(createResource(getAdminUser(), ResourceType.UDF, 120000)); + }}; + when(resourceMapper.queryResourceList("", 0, ResourceType.UDF.ordinal())).thenReturn(udfResourceList); + + //mock udf function list + UdfFunc udfFunc = createUdfFunc(getAdminUser(), 100000); + List udfFuncs = new ArrayList<>(); + udfFuncs.add(udfFunc); + + when(udfFuncMapper.listUdfByResourceId(new int[]{100000})).thenReturn(udfFuncs); + + //fail if udf resource is already bound by the udf function + result = usersService.grantResources(adminUser, needAuthorizedUser.getId(), "120000"); + Assert.assertEquals(Status.UDF_RESOURCE_IS_BOUND, result.get(Constants.STATUS)); + + result = usersService.grantResources(adminUser, needAuthorizedUser.getId(), "100000"); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } @@ -289,7 +324,7 @@ public class UsersServiceTest { public void testGrantUDFFunction(){ String udfIds = "100000,120000"; - when(userMapper.selectById(1)).thenReturn(getUser()); + when(userMapper.selectById(1)).thenReturn(getAdminUser()); User loginUser = new User(); Map result = usersService.grantUDFFunction(loginUser, 1, udfIds); logger.info(result.toString()); @@ -309,7 +344,7 @@ public class UsersServiceTest { public void testGrantDataSource(){ String datasourceIds = "100000,120000"; - when(userMapper.selectById(1)).thenReturn(getUser()); + when(userMapper.selectById(1)).thenReturn(getAdminUser()); User loginUser = new User(); Map result = usersService.grantDataSource(loginUser, 1, datasourceIds); logger.info(result.toString()); @@ -350,7 +385,7 @@ public class UsersServiceTest { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); tempUser = (User) 
result.get(Constants.DATA_LIST); //check userName - Assert.assertEquals("userTest0001",tempUser.getUserName()); + Assert.assertEquals("general-user-0001",tempUser.getUserName()); } @@ -380,7 +415,7 @@ public class UsersServiceTest { logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); //exist user - when(userMapper.queryByUserNameAccurately("userTest0001")).thenReturn(getUser()); + when(userMapper.queryByUserNameAccurately("userTest0001")).thenReturn(getAdminUser()); result = usersService.verifyUserName("userTest0001"); logger.info(result.toString()); Assert.assertEquals(Status.USER_NAME_EXIST.getMsg(), result.getMsg()); @@ -430,8 +465,8 @@ public class UsersServiceTest { User user = new User(); user.setUserType(UserType.GENERAL_USER); - user.setUserName("userTest0001"); - user.setUserPassword("userTest0001"); + user.setUserName("general-user-0001"); + user.setUserPassword("general-user-0001"); return user; } @@ -445,7 +480,7 @@ public class UsersServiceTest { /** * get user */ - private User getUser(){ + private User getAdminUser(){ User user = new User(); user.setUserType(UserType.ADMIN_USER); @@ -461,4 +496,37 @@ public class UsersServiceTest { return tenant; } + /** + * create resource by user + * @param user user + * @return Resource + */ + private Resource createResource(User user, ResourceType type,int id){ + //insertOne + Resource resource = new Resource(); + resource.setId(id); + resource.setType(type); + resource.setUserId(user.getId()); + resourceMapper.insert(resource); + return resource; + } + + /** + * create udf function + * @return udf function + */ + private UdfFunc createUdfFunc(User user, int resourceId){ + UdfFunc udfFunc = new UdfFunc(); + udfFunc.setUserId(user.getId()); + udfFunc.setFuncName("dolphin_udf_func"); + udfFunc.setClassName("org.apache.dolphinscheduler.test.mr"); + udfFunc.setType(UdfType.HIVE); + udfFunc.setResourceId(resourceId); + udfFunc.setCreateTime(new Date()); + 
udfFunc.setUpdateTime(new Date()); + udfFuncMapper.insert(udfFunc); + return udfFunc; + } + + } \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 6a10a87b513c925ef869f92f63034173de884d5a..60a529e6b1dce6f7c5125f93401f18e8bb79a127 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -24,6 +24,7 @@ import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; @@ -412,6 +413,22 @@ public class HadoopUtils implements Closeable { } } + /** + * hdfs resource dir + * + * @param tenantCode tenant code + * @return hdfs resource dir + */ + public static String getHdfsDir(ResourceType resourceType,String tenantCode) { + String hdfsDir = ""; + if (resourceType.equals(ResourceType.FILE)) { + hdfsDir = getHdfsResDir(tenantCode); + } else if (resourceType.equals(ResourceType.UDF)) { + hdfsDir = getHdfsUdfDir(tenantCode); + } + return hdfsDir; + } + /** * hdfs resource dir * @@ -447,22 +464,42 @@ public class HadoopUtils implements Closeable { * get absolute path and name for file on hdfs * * @param tenantCode tenant code - * @param filename file name + * @param fileName file name + * @return get absolute path and name for file on hdfs + */ + + /** + * get hdfs file name + * + * @param resourceType resource type + * @param tenantCode tenant code + * @param fileName file name + * @return hdfs file name + */ + public static String getHdfsFileName(ResourceType 
resourceType, String tenantCode, String fileName) { + return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName); + } + + /** + * get absolute path and name for resource file on hdfs + * + * @param tenantCode tenant code + * @param fileName file name * @return get absolute path and name for file on hdfs */ - public static String getHdfsFilename(String tenantCode, String filename) { - return String.format("%s/%s", getHdfsResDir(tenantCode), filename); + public static String getHdfsResourceFileName(String tenantCode, String fileName) { + return String.format("%s/%s", getHdfsResDir(tenantCode), fileName); } /** * get absolute path and name for udf file on hdfs * * @param tenantCode tenant code - * @param filename file name + * @param fileName file name * @return get absolute path and name for udf file on hdfs */ - public static String getHdfsUdfFilename(String tenantCode, String filename) { - return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename); + public static String getHdfsUdfFileName(String tenantCode, String fileName) { + return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName); } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index 1e4a179e19795c68a80f974ebc18e20811fe754d..aabd6b4d54b7c8498bbda17a2565c41ac2588f51 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -1549,10 +1549,11 @@ public class ProcessDao { /** * find tenant code by resource name * @param resName resource name + * @param resourceType resource type * @return tenant code */ - public String queryTenantCodeByResName(String resName){ - return resourceMapper.queryTenantCodeByResourceName(resName); + public String queryTenantCodeByResName(String resName,ResourceType resourceType){ + return 
resourceMapper.queryTenantCodeByResourceName(resName,resourceType.ordinal()); } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java index cf65e5d08a24a69686a0b2853853c3b92e7d1139..c40781188f24e7d70ce3ea7028e4166b1bb2124a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java @@ -77,12 +77,20 @@ public interface ResourceMapper extends BaseMapper { List queryResourceExceptUserId(@Param("userId") int userId); + /* *//** + * query tenant code by name + * @param resName resource name + * @return tenant code + *//* + String queryTenantCodeByResourceName(@Param("resName") String resName);*/ + /** * query tenant code by name * @param resName resource name + * @param resType resource type * @return tenant code */ - String queryTenantCodeByResourceName(@Param("resName") String resName); + String queryTenantCodeByResourceName(@Param("resName") String resName,@Param("resType") int resType); /** * list authorized resource diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java index 5a8734233c55248e9e885c3e103952128aea3370..7e7850c5002153523eff6e3c608916a8029fca5f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java @@ -86,4 +86,12 @@ public interface UdfFuncMapper extends BaseMapper { */ List listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds); + /** + * list UDF by resource id + * @param resourceIds resource id array + * @return UDF function list + */ + 
List listUdfByResourceId(@Param("resourceIds") int[] resourceIds); + + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml index d78ecf2b3b7b1bc43103b35697bd7904cad028b3..b451d6215038536204f55b8e1614ae9cdba41929 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml @@ -70,7 +70,7 @@ + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java index aaf5129c02b25269d8de644d3d6c6911cab32a19..25bcfff7f4d5917bb49f65f469abc791677f0431 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java @@ -288,12 +288,12 @@ public class ResourceMapperTest { resource.setUserId(user.getId()); resourceMapper.updateById(resource); - String resource1 = resourceMapper.queryTenantCodeByResourceName( - resource.getAlias() + String tenantCode = resourceMapper.queryTenantCodeByResourceName( + resource.getAlias(),resource.getType().ordinal() ); - Assert.assertEquals(resource1, "ut tenant code for resource"); + Assert.assertEquals(tenantCode, "ut tenant code for resource"); resourceMapper.deleteById(resource.getId()); } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java index ef2bf752365afe9c2681dc14fb7defc9eda4452c..c4d29270a091afc322ba1da1fb65c9ea46fc77a7 100644 --- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java @@ -20,8 +20,10 @@ package org.apache.dolphinscheduler.dao.mapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.commons.lang3.ArrayUtils; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.UDFUser; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; @@ -55,6 +57,9 @@ public class UdfFuncMapperTest { @Autowired UDFUserMapper udfUserMapper; + @Autowired + ResourceMapper resourceMapper; + /** * insert one udf * @return UdfFunc @@ -91,6 +96,24 @@ public class UdfFuncMapperTest { return udfFunc; } + /** + * insert one udf + * @return + */ + private UdfFunc insertOne(User user,Resource resource){ + UdfFunc udfFunc = new UdfFunc(); + udfFunc.setUserId(user.getId()); + udfFunc.setFuncName("dolphin_udf_func"); + udfFunc.setClassName("org.apache.dolphinscheduler.test.mr"); + udfFunc.setType(UdfType.HIVE); + udfFunc.setResourceId(resource.getId()); + udfFunc.setResourceName(resource.getAlias()); + udfFunc.setCreateTime(new Date()); + udfFunc.setUpdateTime(new Date()); + udfFuncMapper.insert(udfFunc); + return udfFunc; + } + /** * insert one user * @return User @@ -141,6 +164,20 @@ public class UdfFuncMapperTest { return udfUser; } + /** + * create resource by user + * @param user user + * @return Resource + */ + private Resource createUdfResource(User user,String resourceName){ + Resource resource = new Resource(); + resource.setAlias(resourceName); + resource.setType(ResourceType.UDF); + 
resource.setUserId(user.getId()); + resourceMapper.insert(resource); + return resource; + } + /** * create general user * @return User @@ -319,4 +356,25 @@ public class UdfFuncMapperTest { authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), ArrayUtils.toObject(udfFuncIds)); Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(ArrayUtils.toObject(udfFuncIds)))); } + + @Test + public void testListUdfByResourceId(){ + //create general user + User generalUser = createGeneralUser("user1"); + //create udf resource + Resource udfResource1 = createUdfResource(generalUser,"udf-resource-1"); + //create udf function + UdfFunc udfFunc1 = insertOne(generalUser,udfResource1); + + List udfFuncList = udfFuncMapper.listUdfByResourceId(new int[]{udfResource1.getId()}); + Assert.assertTrue(udfFuncList.size() == 1); + + //create udf resource + Resource udfResource2 = createUdfResource(generalUser,"udf-resource-2"); + //create udf function + UdfFunc udfFunc2 = insertOne(generalUser,udfResource2); + udfFuncList = udfFuncMapper.listUdfByResourceId(new int[]{udfResource1.getId(),udfResource2.getId()}); + Assert.assertTrue(udfFuncList.size() == 2); + } + } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java index 937e35454da6b954ee5f3add9ff8f794cdd862c5..c24ff185c79270166fafce02f978e930f4c3b474 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java @@ -23,10 +23,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; 
+import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty; @@ -42,29 +40,18 @@ public class UDFUtils { /** * create function list - * @param udfFuncs udf functions - * @param tenantCode tenant code + * @param udfTenantCodeMap key is tenant code,value is udf function * @param logger logger * @return create function list */ - public static List createFuncs(List udfFuncs, String tenantCode,Logger logger){ - // get hive udf jar path - String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode); - logger.info("hive udf jar path : {}" , hiveUdfJarPath); - - // is the root directory of udf defined - if (StringUtils.isEmpty(hiveUdfJarPath)) { - logger.error("not define hive udf jar path"); - throw new RuntimeException("hive udf jar base path not defined "); - } - Set resources = getFuncResouces(udfFuncs); + public static List createFuncs(Map udfTenantCodeMap, Logger logger){ List funcList = new ArrayList<>(); // build jar sql - buildJarSql(funcList, resources, hiveUdfJarPath); + buildJarSql(funcList, udfTenantCodeMap); // build temp function sql - buildTempFuncSql(funcList, udfFuncs); + buildTempFuncSql(funcList, udfTenantCodeMap.values().stream().collect(Collectors.toList())); return funcList; } @@ -72,18 +59,20 @@ public class UDFUtils { /** * build jar sql * @param sqls sql list - * @param resources resource set - * @param uploadPath upload path + * @param udfTenantCodeMap key is tenant code,value is udf function */ - private static void buildJarSql(List sqls, Set resources, String uploadPath) { + private static void buildJarSql(List sqls, Map udfTenantCodeMap) { String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS); - if (!uploadPath.startsWith("hdfs:")) { - uploadPath = defaultFS + uploadPath; - } - for (String resource : resources) { - sqls.add(String.format("add jar %s/%s", uploadPath, resource)); + Set> entries = udfTenantCodeMap.entrySet(); + for (Map.Entry 
entry:udfTenantCodeMap.entrySet()){ + String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getKey()); + if (!uploadPath.startsWith("hdfs:")) { + uploadPath = defaultFS + uploadPath; + } + sqls.add(String.format("add jar %s/%s", uploadPath, entry.getValue().getResourceName())); } + } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 5f66c3477d0995ae3d9659b7aae96739ab9aaad0..4b3a0273f8dc149de35b16d79114ff48064be858 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -23,6 +23,7 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; @@ -311,8 +312,8 @@ public class TaskScheduleThread implements Runnable { if (!resFile.exists()) { try { // query the tenant code of the resource according to the name of the resource - String tentnCode = processDao.queryTenantCodeByResName(res); - String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res); + String tentnCode = processDao.queryTenantCodeByResName(res, ResourceType.FILE); + String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tentnCode, res); logger.info("get resource file from hdfs :{}", resHdfsPath); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + res, false, true); diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 62c1dd5f79b6f9dcc9503c444611d8dba5ba66a4..9702e66b0011f7c5549e818e60b087bd074540da 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -24,10 +24,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.EnumUtils; import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.AuthorizationType; -import org.apache.dolphinscheduler.common.enums.ShowType; -import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; -import org.apache.dolphinscheduler.common.enums.UdfType; +import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.job.db.BaseDataSource; import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; import org.apache.dolphinscheduler.common.process.Property; @@ -176,7 +173,14 @@ public class SqlTask extends AbstractTask { // check udf permission checkUdfPermission(ArrayUtils.toObject(idsArray)); List udfFuncList = processDao.queryUdfFunListByids(idsArray); - createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger); + Map udfFuncMap = new HashMap(); + for(UdfFunc udfFunc : udfFuncList) { + String tenantCode = processDao.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF); + udfFuncMap.put(tenantCode,udfFunc); + } + + + createFuncs = UDFUtils.createFuncs(udfFuncMap, logger); } // execute sql task