未验证 提交 88a07f7b 编写于 作者: Z zhuangchong 提交者: GitHub

[Feature-4093][server] Support for stored procedures and stored function calls...

[Feature-4093][server] Support for stored procedures and stored function calls and data source supports DB2 (#4094)

* Support for stored procedures and stored function calls and data source supports DB2.
Co-authored-by: Nzhuangchong <zhuangchong8@163.com>
上级 2a59ed09
......@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask;
import org.apache.dolphinscheduler.server.worker.task.processdure.ProcedureTask;
import org.apache.dolphinscheduler.server.worker.task.procedure.ProcedureTask;
import org.apache.dolphinscheduler.server.worker.task.python.PythonTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.processdure;
package org.apache.dolphinscheduler.server.worker.task.procedure;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.ConnectionParam;
......@@ -77,7 +77,6 @@ public class ProcedureTask extends AbstractTask {
this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
// check parameters
if (!procedureParameters.checkParameters()) {
throw new RuntimeException("procedure task params is not valid");
......@@ -108,8 +107,6 @@ public class ProcedureTask extends AbstractTask {
// get jdbc connection
connection = DatasourceUtil.getConnection(dbType, connectionParam);
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
......@@ -117,30 +114,18 @@ public class ProcedureTask extends AbstractTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
Collection<Property> userDefParamsList = null;
if (procedureParameters.getLocalParametersMap() != null) {
userDefParamsList = procedureParameters.getLocalParametersMap().values();
}
String method = getCallMethod(userDefParamsList);
logger.info("call method : {}",method);
// call method
stmt = connection.prepareCall(method);
stmt = connection.prepareCall(procedureParameters.getMethod());
// set timeout
setTimeout(stmt);
// outParameterMap
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap, userDefParamsList);
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap);
stmt.executeUpdate();
/**
* print the output parameters to the log
*/
// print the output parameters to the log
printOutParameter(stmt, outParameterMap);
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
......@@ -149,37 +134,15 @@ public class ProcedureTask extends AbstractTask {
logger.error("procedure task error", e);
throw e;
} finally {
close(stmt, connection);
close(stmt,connection);
}
}
/**
* get call method
* @param userDefParamsList userDefParamsList
* @return method
*/
private String getCallMethod(Collection<Property> userDefParamsList) {
String method;// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)) {
method = "{call " + procedureParameters.getMethod() + "}";
} else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0; i < size - 1; i++) {
parameter.append("?,");
}
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString() + "}";
}
return method;
}
/**
* print outParameter
* @param stmt CallableStatement
* @param outParameterMap outParameterMap
* @throws SQLException
* @throws SQLException SQLException
*/
private void printOutParameter(CallableStatement stmt,
Map<Integer, Property> outParameterMap) throws SQLException {
......@@ -201,80 +164,85 @@ public class ProcedureTask extends AbstractTask {
*
* @param stmt CallableStatement
* @param paramsMap paramsMap
* @param userDefParamsList userDefParamsList
* @return outParameterMap
* @throws Exception
* @throws Exception Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt,
Map<String, Property> paramsMap,
Collection<Property> userDefParamsList) throws Exception {
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<String, Property> paramsMap) throws Exception {
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0) {
int index = 1;
for (Property property : userDefParamsList) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
, property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index, property);
}
index++;
if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap;
}
Collection<Property> userDefParamsList = procedureParameters.getLocalParametersMap().values();
if (CollectionUtils.isEmpty(userDefParamsList)) {
return outParameterMap;
}
int index = 1;
for (Property property : userDefParamsList) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
}
index++;
}
return outParameterMap;
}
/**
* set timtou
* @param stmt CallableStatement
* @throws SQLException
* @throws SQLException SQLException
*/
private void setTimeout(CallableStatement stmt) throws SQLException {
Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED;
Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
if (failed || warnfailed) {
Boolean warnFailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
if (failed || warnFailed) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
}
/**
* close jdbc resource
*
* @param stmt
* @param connection
*/
private void close(PreparedStatement stmt,
Connection connection) {
* close jdbc resource
*
* @param stmt stmt
* @param connection connection
*/
private void close(PreparedStatement stmt, Connection connection) {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
logger.error("close prepared statement error : {}",e.getMessage(),e);
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.error("close connection error : {}",e.getMessage(),e);
}
}
}
/**
* get output parameter
* @param stmt
* @param index
* @param prop
* @param dataType
* @throws SQLException
* @param stmt stmt
* @param index index
* @param prop prop
* @param dataType dataType
* @throws SQLException SQLException
*/
private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
switch (dataType) {
......@@ -324,67 +292,43 @@ public class ProcedureTask extends AbstractTask {
* @param value value
* @throws Exception exception
*/
private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception {
if (dataType.equals(DataType.VARCHAR)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.VARCHAR);
} else {
stmt.registerOutParameter(index, Types.VARCHAR, value);
}
} else if (dataType.equals(DataType.INTEGER)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.INTEGER);
} else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}
} else if (dataType.equals(DataType.LONG)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.INTEGER);
} else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}
} else if (dataType.equals(DataType.FLOAT)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.FLOAT);
} else {
stmt.registerOutParameter(index, Types.FLOAT, value);
}
} else if (dataType.equals(DataType.DOUBLE)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.DOUBLE);
} else {
stmt.registerOutParameter(index, Types.DOUBLE, value);
}
} else if (dataType.equals(DataType.DATE)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.DATE);
} else {
stmt.registerOutParameter(index, Types.DATE, value);
}
} else if (dataType.equals(DataType.TIME)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.TIME);
} else {
stmt.registerOutParameter(index, Types.TIME, value);
}
} else if (dataType.equals(DataType.TIMESTAMP)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.TIMESTAMP);
} else {
stmt.registerOutParameter(index, Types.TIMESTAMP, value);
}
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception {
int sqlType;
switch (dataType) {
case VARCHAR:
sqlType = Types.VARCHAR;
break;
case INTEGER:
case LONG:
sqlType = Types.INTEGER;
break;
case FLOAT:
sqlType = Types.FLOAT;
break;
case DOUBLE:
sqlType = Types.DOUBLE;
break;
case DATE:
sqlType = Types.DATE;
break;
case TIME:
sqlType = Types.TIME;
break;
case TIMESTAMP:
sqlType = Types.TIMESTAMP;
break;
case BOOLEAN:
sqlType = Types.BOOLEAN;
break;
default:
throw new IllegalStateException("Unexpected value: " + dataType);
}
} else if (dataType.equals(DataType.BOOLEAN)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.BOOLEAN);
} else {
stmt.registerOutParameter(index, Types.BOOLEAN, value);
}
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, sqlType);
} else {
stmt.registerOutParameter(index, sqlType, value);
}
}
}
\ No newline at end of file
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.procedure;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ProcedureTask.class,DriverManager.class})
public class ProcedureTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ProcedureTaskTest.class);
private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
+ "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
private ProcedureTask procedureTask;
private ProcessService processService;
private ApplicationContext applicationContext;
private TaskExecutionContext taskExecutionContext;
@Before
public void before() throws Exception {
taskExecutionContext = new TaskExecutionContext();
processService = PowerMockito.mock(ProcessService.class);
applicationContext = PowerMockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstanceId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams(
"{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"method\":\"add\"}");
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
PowerMockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
PowerMockito.when(taskExecutionContext.getTaskAppId()).thenReturn("1");
PowerMockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
procedureTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
PowerMockito.when(taskExecutionContext.getProcedureTaskExecutionContext()).thenReturn(procedureTaskExecutionContext);
procedureTask = new ProcedureTask(taskExecutionContext, logger);
procedureTask.init();
}
@Test
public void testGetParameters() {
Assert.assertNotNull(procedureTask.getParameters());
}
@Test
public void testHandle() throws SQLException {
Connection connection = PowerMockito.mock(Connection.class);
PowerMockito.mockStatic(DriverManager.class);
PowerMockito.when(DriverManager.getConnection(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(connection);
CallableStatement callableStatement = PowerMockito.mock(CallableStatement.class);
PowerMockito.when(connection.prepareCall(Mockito.any())).thenReturn(callableStatement);
try {
procedureTask.handle();
Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,procedureTask.getExitStatusCode());
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}
......@@ -22,20 +22,23 @@
<m-datasource
ref="refDs"
@on-dsData="_onDsData"
:supportType="['MYSQL','POSTGRESQL','CLICKHOUSE', 'ORACLE', 'SQLSERVER']"
:data="{ type:type,datasource:datasource }">
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('methods')}}</div>
<div slot="text">
<el-tooltip :content="$t('The procedure method script example')" placement="top">
<span>{{$t('SQL Statement')}}<em class="el-icon-question" /></span>
</el-tooltip>
</div>
<div slot="content">
<el-input
type="input"
size="small"
:disabled="isDetails"
v-model="method"
:placeholder="$t('Please enter method(optional)')">
:autosize="{minRows:5}"
type="textarea"
:disabled="isDetails"
v-model="method"
:placeholder="$t('Please enter the procedure method')">
</el-input>
</div>
</m-list-box>
......@@ -103,7 +106,7 @@
// Verification function
if (!this.method) {
this.$message.warning(`${i18n.$t('Please enter method')}`)
this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`)
return false
}
......@@ -111,6 +114,7 @@
if (!this.$refs.refLocalParams._verifProp()) {
return false
}
// storage
this.$emit('on-params', {
type: this.type,
......
......@@ -99,7 +99,8 @@ export default {
'Custom template': 'Custom template',
Datasource: 'Datasource',
methods: 'methods',
'Please enter method(optional)': 'Please enter method(optional)',
'Please enter the procedure method': 'Please enter the procedure script \n\ncall procedure:{call <procedure-name>[(<arg1>,<arg2>, ...)]}\n\ncall function:{?= call <procedure-name>[(<arg1>,<arg2>, ...)]} ',
'The procedure method script example': 'example:{call <procedure-name>[(?,?, ...)]} or {?= call <procedure-name>[(?,?, ...)]}',
Script: 'Script',
'Please enter script(required)': 'Please enter script(required)',
'Deploy Mode': 'Deploy Mode',
......@@ -439,7 +440,6 @@ export default {
'Process instance details': 'Process instance details',
'Create Resource': 'Create Resource',
'User Center': 'User Center',
'Please enter method': 'Please enter method',
None: 'None',
Name: 'Name',
'Process priority': 'Process priority',
......
......@@ -99,7 +99,8 @@ export default {
'Custom template': '自定义模版',
Datasource: '数据源',
methods: '方法',
'Please enter method(optional)': '请输入方法(选填)',
'Please enter the procedure method': '请输入存储脚本 \n\n调用存储过程:{call <procedure-name>[(<arg1>,<arg2>, ...)]}\n\n调用存储函数:{?= call <procedure-name>[(<arg1>,<arg2>, ...)]} ',
'The procedure method script example': '示例:{call <procedure-name>[(?,?, ...)]} 或 {?= call <procedure-name>[(?,?, ...)]}',
Script: '脚本',
'Please enter script(required)': '请输入脚本(必填)',
'Deploy Mode': '部署方式',
......@@ -439,7 +440,6 @@ export default {
'Process instance details': '流程实例详情',
'Create Resource': '创建资源',
'User Center': '用户中心',
'Please enter method': '请输入方法',
None: '',
Name: '名称',
'Process priority': '流程优先级',
......
......@@ -969,6 +969,7 @@
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/processdure/ProcedureTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册