UdfFuncService.java 11.0 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.api.service;
L
ligang 已提交
18

Q
qiaozhanwei 已提交
19 20 21
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
22
import org.apache.dolphinscheduler.common.Constants;
Q
qiaozhanwei 已提交
23 24
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
T
Tboy 已提交
25
import org.apache.dolphinscheduler.common.utils.StringUtils;
Q
qiaozhanwei 已提交
26 27 28 29 30 31
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
B
bao liang 已提交
32 33
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
L
ligang 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * udf function service
 */
@Service
public class UdfFuncService extends BaseService{

    private static final Logger logger = LoggerFactory.getLogger(UdfFuncService.class);

    @Autowired
    private ResourceMapper resourceMapper;

    @Autowired
    private UdfFuncMapper udfFuncMapper;

    @Autowired
    private UDFUserMapper udfUserMapper;


    /**
     * create udf function
     *
B
bao liang 已提交
66 67 68 69 70 71 72 73 74
     * @param loginUser login user
     * @param type udf type
     * @param funcName function name
     * @param argTypes argument types
     * @param database database
     * @param desc description
     * @param resourceId resource id
     * @param className class name
     * @return create result code
L
ligang 已提交
75 76 77 78 79 80 81 82 83 84 85
     */
    public Result createUdfFunction(User loginUser,
                                    String funcName,
                                    String className,
                                    String argTypes,
                                    String database,
                                    String desc,
                                    UdfType type,
                                    int resourceId) {
        Result result = new Result();

journey2018's avatar
journey2018 已提交
86 87 88
        // if resource upload startup
        if (!PropertyUtils.getResUploadStartupState()){
            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
L
ligang 已提交
89 90 91 92 93
            putMsg(result, Status.HDFS_NOT_STARTUP);
            return result;
        }

        // verify udf func name exist
B
bao liang 已提交
94
        if (checkUdfFuncNameExists(funcName)) {
L
ligang 已提交
95 96 97 98 99
            logger.error("udf func {} has exist, can't recreate", funcName);
            putMsg(result, Status.UDF_FUNCTION_EXISTS);
            return result;
        }

B
bao liang 已提交
100
        Resource resource = resourceMapper.selectById(resourceId);
L
ligang 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
        if (resource == null) {
            logger.error("resourceId {} is not exist", resourceId);
            putMsg(result, Status.RESOURCE_NOT_EXIST);
            return result;
        }

        //save data
        UdfFunc udf = new UdfFunc();
        Date now = new Date();
        udf.setUserId(loginUser.getId());
        udf.setFuncName(funcName);
        udf.setClassName(className);
        if (StringUtils.isNotEmpty(argTypes)) {
            udf.setArgTypes(argTypes);
        }
116
        if (StringUtils.isNotEmpty(database)) {
L
ligang 已提交
117 118
            udf.setDatabase(database);
        }
119
        udf.setDescription(desc);
L
ligang 已提交
120
        udf.setResourceId(resourceId);
121
        udf.setResourceName(resource.getFullName());
L
ligang 已提交
122 123 124 125 126 127 128 129 130 131
        udf.setType(type);

        udf.setCreateTime(now);
        udf.setUpdateTime(now);

        udfFuncMapper.insert(udf);
        putMsg(result, Status.SUCCESS);
        return result;
    }

B
bao liang 已提交
132 133
    /**
     *
B
bao liang 已提交
134 135
     * @param name name
     * @return check result code
B
bao liang 已提交
136 137 138
     */
    private boolean checkUdfFuncNameExists(String name){
        List<UdfFunc> resource = udfFuncMapper.queryUdfByIdStr(null, name);
139
        return resource != null && resource.size() > 0;
B
bao liang 已提交
140 141
    }

L
ligang 已提交
142 143 144

    /**
     * query udf function
B
bao liang 已提交
145 146 147
     *
     * @param id  udf function id
     * @return udf function detail
L
ligang 已提交
148 149 150
     */
    public Map<String, Object> queryUdfFuncDetail(int id) {

_和's avatar
_和 已提交
151
        Map<String, Object> result = new HashMap<>();
B
bao liang 已提交
152
        UdfFunc udfFunc = udfFuncMapper.selectById(id);
L
ligang 已提交
153 154 155 156 157 158 159 160 161 162 163 164
        if (udfFunc == null) {
            putMsg(result, Status.RESOURCE_NOT_EXIST);
            return result;
        }
        result.put(Constants.DATA_LIST, udfFunc);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * updateProcessInstance udf function
     *
B
bao liang 已提交
165 166 167 168 169 170 171 172 173
     * @param udfFuncId udf function id
     * @param type  resource type
     * @param funcName function name
     * @param argTypes argument types
     * @param database data base
     * @param desc description
     * @param resourceId resource id
     * @param className class name
     * @return update result code
L
ligang 已提交
174 175 176 177 178 179 180 181 182 183 184
     */
    public Map<String, Object> updateUdfFunc(int udfFuncId,
                                             String funcName,
                                             String className,
                                             String argTypes,
                                             String database,
                                             String desc,
                                             UdfType type,
                                             int resourceId) {
        Map<String, Object> result = new HashMap<>();
        // verify udfFunc is exist
B
bao liang 已提交
185
        UdfFunc udf = udfFuncMapper.selectUdfById(udfFuncId);
L
ligang 已提交
186

一子三木's avatar
一子三木 已提交
187 188 189 190 191 192
        if (udf == null) {
            result.put(Constants.STATUS, Status.UDF_FUNCTION_NOT_EXIST);
            result.put(Constants.MSG, Status.UDF_FUNCTION_NOT_EXIST.getMsg());
            return result;
        }

journey2018's avatar
journey2018 已提交
193 194 195
        // if resource upload startup
        if (!PropertyUtils.getResUploadStartupState()){
            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
L
ligang 已提交
196 197 198 199 200 201
            putMsg(result, Status.HDFS_NOT_STARTUP);
            return result;
        }

        // verify udfFuncName is exist
        if (!funcName.equals(udf.getFuncName())) {
B
bao liang 已提交
202
            if (checkUdfFuncNameExists(funcName)) {
L
ligang 已提交
203 204 205 206 207 208 209
                logger.error("UdfFunc {} has exist, can't create again.", funcName);
                result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS);
                result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg());
                return result;
            }
        }

B
bao liang 已提交
210
        Resource resource = resourceMapper.selectById(resourceId);
L
ligang 已提交
211 212 213 214 215 216 217 218 219
        if (resource == null) {
            logger.error("resourceId {} is not exist", resourceId);
            result.put(Constants.STATUS, Status.RESOURCE_NOT_EXIST);
            result.put(Constants.MSG, Status.RESOURCE_NOT_EXIST.getMsg());
            return result;
        }
        Date now = new Date();
        udf.setFuncName(funcName);
        udf.setClassName(className);
220
        udf.setArgTypes(argTypes);
B
bao liang 已提交
221 222 223
        if (StringUtils.isNotEmpty(database)) {
            udf.setDatabase(database);
        }
224
        udf.setDescription(desc);
L
ligang 已提交
225
        udf.setResourceId(resourceId);
226
        udf.setResourceName(resource.getFullName());
L
ligang 已提交
227 228 229 230
        udf.setType(type);

        udf.setUpdateTime(now);

B
bao liang 已提交
231
        udfFuncMapper.updateById(udf);
L
ligang 已提交
232 233 234 235 236 237 238 239
        putMsg(result, Status.SUCCESS);
        return result;
    }


    /**
     * query udf function list paging
     *
B
bao liang 已提交
240 241 242 243 244
     * @param loginUser login user
     * @param pageNo page number
     * @param pageSize page size
     * @param searchVal search value
     * @return udf function list page
L
ligang 已提交
245 246
     */
    public Map<String, Object> queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
_和's avatar
_和 已提交
247
        Map<String, Object> result = new HashMap<>();
L
ligang 已提交
248 249 250


        PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
B
bao liang 已提交
251 252 253
        IPage<UdfFunc> udfFuncList = getUdfFuncsPage(loginUser, searchVal, pageSize, pageNo);
        pageInfo.setTotalCount((int)udfFuncList.getTotal());
        pageInfo.setLists(udfFuncList.getRecords());
L
ligang 已提交
254 255 256 257 258 259 260 261
        result.put(Constants.DATA_LIST, pageInfo);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * get udf functions
     *
B
bao liang 已提交
262 263 264 265 266
     * @param loginUser login user
     * @param searchVal search value
     * @param pageSize page size
     * @param pageNo page number
     * @return udf function list page
L
ligang 已提交
267
     */
B
bao liang 已提交
268
    private IPage<UdfFunc> getUdfFuncsPage(User loginUser, String searchVal, Integer pageSize, int pageNo) {
L
ligang 已提交
269

B
bao liang 已提交
270
        int userId = loginUser.getId();
L
ligang 已提交
271
        if (isAdmin(loginUser)) {
B
bao liang 已提交
272
            userId = 0;
L
ligang 已提交
273
        }
B
bao liang 已提交
274 275
        Page<UdfFunc> page = new Page(pageNo, pageSize);
        return udfFuncMapper.queryUdfFuncPaging(page, userId, searchVal);
L
ligang 已提交
276 277 278 279 280
    }

    /**
     * query data resource by type
     *
B
bao liang 已提交
281 282 283
     * @param loginUser login user
     * @param type  resource type
     * @return resource list
L
ligang 已提交
284 285
     */
    public Map<String, Object> queryResourceList(User loginUser, Integer type) {
_和's avatar
_和 已提交
286
        Map<String, Object> result = new HashMap<>();
L
ligang 已提交
287 288 289 290 291 292 293 294 295 296
        List<UdfFunc> udfFuncList = udfFuncMapper.getUdfFuncByType(loginUser.getId(), type);

        result.put(Constants.DATA_LIST, udfFuncList);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * delete udf function
     *
B
bao liang 已提交
297 298
     * @param id udf function id
     * @return delete result code
L
ligang 已提交
299
     */
300
    @Transactional(rollbackFor = RuntimeException.class)
L
ligang 已提交
301 302
    public Result delete(int id) {
        Result result = new Result();
一子三木's avatar
一子三木 已提交
303
        
B
bao liang 已提交
304
        udfFuncMapper.deleteById(id);
L
ligang 已提交
305 306 307 308 309 310 311 312
        udfUserMapper.deleteByUdfFuncId(id);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * verify udf function by name
     *
B
bao liang 已提交
313 314
     * @param name name
     * @return true if the name can user, otherwise return false
L
ligang 已提交
315 316 317
     */
    public Result verifyUdfFuncByName(String name) {
        Result result = new Result();
B
bao liang 已提交
318
        if (checkUdfFuncNameExists(name)) {
L
ligang 已提交
319 320 321 322 323 324 325 326 327 328
            logger.error("UDF function name:{} has exist, can't create again.", name);
            putMsg(result, Status.UDF_FUNCTION_EXISTS);
        } else {
            putMsg(result, Status.SUCCESS);
        }

        return result;
    }

}