diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
index 97afb4f6d9abdd13f02e4cca993c2ea82f07b2c0..210db5c4c41fda097ce313ef304e62aecb668f16 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.entity;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 
 import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
 
 /**
  *  SQL Task ExecutionContext
@@ -38,9 +38,9 @@ public class SQLTaskExecutionContext implements Serializable {
      */
     private String connectionParams;
     /**
-     * udf function list
+     * udf function tenant code map
      */
-    private List<UdfFunc> udfFuncList;
+    private Map<UdfFunc,String> udfFuncTenantCodeMap;
 
 
     public int getWarningGroupId() {
@@ -51,12 +51,12 @@ public class SQLTaskExecutionContext implements Serializable {
         this.warningGroupId = warningGroupId;
     }
 
-    public List<UdfFunc> getUdfFuncList() {
-        return udfFuncList;
+    public Map<UdfFunc,String> getUdfFuncTenantCodeMap() {
+        return udfFuncTenantCodeMap;
     }
 
-    public void setUdfFuncList(List<UdfFunc> udfFuncList) {
-        this.udfFuncList = udfFuncList;
+    public void setUdfFuncTenantCodeMap(Map<UdfFunc,String> udfFuncTenantCodeMap) {
+        this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
     }
 
     public String getConnectionParams() {
@@ -72,7 +72,7 @@ public class SQLTaskExecutionContext implements Serializable {
         return "SQLTaskExecutionContext{" +
                 "warningGroupId=" + warningGroupId +
                 ", connectionParams='" + connectionParams + '\'' +
-                ", udfFuncList=" + udfFuncList +
+                ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap +
                 '}';
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index ee48ca0f8a450b6988cf8e60b662b146bd5ac21f..b1681bc3f35c9f0a417931a3f8daa357d0e5a944 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -324,7 +324,13 @@ public class TaskPriorityQueueConsumer extends Thread{
             }
             List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
-            sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+            Map<UdfFunc,String> udfFuncMap = new HashMap<>();
+            for(UdfFunc udfFunc : udfFuncList) {
+                String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
+                udfFuncMap.put(udfFunc,tenantCode);
+            }
+
+            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
         }
     }
 
 
@@ -366,7 +372,7 @@ public class TaskPriorityQueueConsumer extends Thread{
 
         if (baseParam != null) {
             List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
-            if (projectResourceFiles != null) {
+            if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
 
                 // filter the resources that the resource id equals 0
                 Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
index 63efb24a3efd3110611f94a70a724c85a9681c0b..3a8c8fe7d67a652edfd0e03db71dea1be6541f72 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.server.utils;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
@@ -24,10 +25,8 @@
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.slf4j.Logger;
 
 import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
@@ -43,53 +42,44 @@
 
     /**
      * create function list
-     * @param udfFuncs   udf functions
-     * @param tenantCode tenant code
-     * @param logger     logger
+     * @param udfFuncTenantCodeMap  key is udf function,value is tenant code
+     * @param logger                logger
      * @return create function list
      */
-    public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){
+    public static List<String> createFuncs(Map<UdfFunc,String> udfFuncTenantCodeMap, Logger logger){
 
-        if (CollectionUtils.isEmpty(udfFuncs)){
+        if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
             logger.info("can't find udf function resource");
             return null;
         }
-        // get hive udf jar path
-        String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
-        logger.info("hive udf jar path : {}" , hiveUdfJarPath);
-
-        // is the root directory of udf defined
-        if (StringUtils.isEmpty(hiveUdfJarPath)) {
-            logger.error("not define hive udf jar path");
-            throw new RuntimeException("hive udf jar base path not defined ");
-        }
-        Set<String> resources = getFuncResouces(udfFuncs);
         List<String> funcList = new ArrayList<>();
 
         // build jar sql
-        buildJarSql(funcList, resources, hiveUdfJarPath);
+        buildJarSql(funcList, udfFuncTenantCodeMap);
 
         // build temp function sql
-        buildTempFuncSql(funcList, udfFuncs);
+        buildTempFuncSql(funcList, udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
 
         return funcList;
     }
 
     /**
      * build jar sql
-     * @param sqls          sql list
-     * @param resources     resource set
-     * @param uploadPath    upload path
+     * @param sqls                  sql list
+     * @param udfFuncTenantCodeMap  key is udf function,value is tenant code
      */
-    private static void buildJarSql(List<String> sqls, Set<String> resources, String uploadPath) {
+    private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> udfFuncTenantCodeMap) {
         String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
-        if (!uploadPath.startsWith("hdfs:")) {
-            uploadPath = defaultFS + uploadPath;
-        }
 
-        for (String resource : resources) {
-            sqls.add(String.format("add jar %s/%s", uploadPath, resource));
+        Set<Map.Entry<UdfFunc,String>> entries = udfFuncTenantCodeMap.entrySet();
+        for (Map.Entry<UdfFunc,String> entry:entries){
+            String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
+            if (!uploadPath.startsWith("hdfs:")) {
+                uploadPath = defaultFS + uploadPath;
+            }
+            sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName()));
         }
+
     }
 
     /**
@@ -106,20 +96,5 @@
         }
     }
 
 
-    /**
-     * get the resource names of all functions
-     * @param udfFuncs udf function list
-     * @return
-     */
-    private static Set<String> getFuncResouces(List<UdfFunc> udfFuncs) {
-        Set<String> resources = new HashSet<>();
-
-        for (UdfFunc udfFunc : udfFuncs) {
-            resources.add(udfFunc.getResourceName());
-        }
-
-        return resources;
-    }
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index c62364e18a007dea1dfc7b2e4014a402f6db5dd4..a6eb26668ed7e1ef56bab8b84a2cabbb12a9c82a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -132,8 +132,7 @@
                     .map(this::getSqlAndSqlParamsMap)
                     .collect(Collectors.toList());
 
-            List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
-                    taskExecutionContext.getTenantCode(),
+            List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
                     logger);
 
             // execute sql task
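
Taken together, these hunks change UDF jar resolution from "one upload path derived from the tenant running the task" to "one upload path per function, derived from the tenant that owns the function's resource". Below is a minimal, self-contained sketch of that per-entry path logic; SimpleUdfFunc, hdfsUdfDir, and the hard-coded defaultFS value are hypothetical stand-ins for DolphinScheduler's UdfFunc entity, HadoopUtils.getHdfsUdfDir, and the fs.defaultFS value read from the Hadoop configuration, and the directory layout shown is assumed rather than taken from the real deployment.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UdfJarSqlSketch {

    // Hypothetical stand-in for org.apache.dolphinscheduler.dao.entity.UdfFunc.
    static class SimpleUdfFunc {
        final String resourceName; // full resource name, assumed to start with "/"

        SimpleUdfFunc(String resourceName) {
            this.resourceName = resourceName;
        }
    }

    // Stand-in for HadoopUtils.getHdfsUdfDir(tenantCode); the real layout may differ.
    static String hdfsUdfDir(String tenantCode) {
        return "/dolphinscheduler/" + tenantCode + "/udfs";
    }

    // Mirrors the reworked buildJarSql: each jar is resolved against the tenant
    // that owns the UDF resource, not the tenant executing the task.
    static List<String> buildJarSql(Map<SimpleUdfFunc, String> udfFuncTenantCodeMap, String defaultFS) {
        List<String> sqls = new ArrayList<>();
        for (Map.Entry<SimpleUdfFunc, String> entry : udfFuncTenantCodeMap.entrySet()) {
            String uploadPath = hdfsUdfDir(entry.getValue());
            if (!uploadPath.startsWith("hdfs:")) {
                uploadPath = defaultFS + uploadPath; // qualify relative paths with fs.defaultFS
            }
            // No separator between path and resource name: the name carries its own "/".
            sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().resourceName));
        }
        return sqls;
    }

    public static void main(String[] args) {
        Map<SimpleUdfFunc, String> map = new LinkedHashMap<>();
        map.put(new SimpleUdfFunc("/str_len.jar"), "tenant_a");
        map.put(new SimpleUdfFunc("/to_upper.jar"), "tenant_b");
        // Prints:
        // add jar hdfs://nameservice1/dolphinscheduler/tenant_a/udfs/str_len.jar
        // add jar hdfs://nameservice1/dolphinscheduler/tenant_b/udfs/to_upper.jar
        buildJarSql(map, "hdfs://nameservice1").forEach(System.out::println);
    }
}

One consequence of keying the map by UdfFunc: two functions that share a jar now emit duplicate "add jar" statements, whereas the removed getFuncResouces set used to deduplicate resource names; Hive generally tolerates re-adding a jar, so this trades a little redundancy for the per-tenant resolution.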