提交 b81872ca 编写于 作者: L lenboo

merge from dev-1.3.0

上级 73cae7f0
......@@ -522,7 +522,6 @@ public class DataSourceService extends BaseService{
separator = ";";
}
Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
parameterMap.put(TYPE, connectType);
parameterMap.put(Constants.ADDRESS, address);
parameterMap.put(Constants.DATABASE, database);
......
......@@ -16,13 +16,11 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.slf4j.Logger;
......@@ -282,12 +280,16 @@ public abstract class UpgradeDao extends AbstractBaseDao {
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
JSONObject jsonObject = JSONObject.parseObject(entry.getValue());
JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks"));
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0 ;i < tasks.size() ; i++){
JSONObject task = tasks.getJSONObject(i);
Integer workerGroupId = task.getInteger("workerGroupId");
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
Integer workerGroupId = -1;
if(workerGroupNode != null && workerGroupNode.canConvertToInt()){
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
}else {
......@@ -295,11 +297,11 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
}
jsonObject.remove(jsonObject.getString("tasks"));
jsonObject.remove("task");
jsonObject.put("tasks",tasks);
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString());
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
......
......@@ -25,7 +25,7 @@ public class OracleDataSourceTest {
@Test
public void testGetOracleJdbcUrl() {
OracleDataSource oracleDataSource = new OracleDataSource();
oracleDataSource.setType(DbConnectType.ORACLE_SERVICE_NAME);
oracleDataSource.setConnectType(DbConnectType.ORACLE_SERVICE_NAME);
oracleDataSource.setAddress("jdbc:oracle:thin:@//127.0.0.1:1521");
oracleDataSource.setDatabase("test");
oracleDataSource.setPassword("123456");
......@@ -43,7 +43,7 @@ public class OracleDataSourceTest {
oracleDataSource2.setDatabase("orcl");
oracleDataSource2.setPassword("123456");
oracleDataSource2.setUser("test");
oracleDataSource2.setType(DbConnectType.ORACLE_SID);
oracleDataSource2.setConnectType(DbConnectType.ORACLE_SID);
Assert.assertEquals("jdbc:oracle:thin:@127.0.0.1:1521:orcl", oracleDataSource2.getJdbcUrl());
//set fake principal
oracleDataSource2.setPrincipal("fake principal");
......@@ -58,7 +58,7 @@ public class OracleDataSourceTest {
OracleDataSource oracleDataSource = new OracleDataSource();
oracleDataSource.setAddress("jdbc:oracle:thin:@//127.0.0.1:1521");
oracleDataSource.setDatabase("test");
oracleDataSource.setType(DbConnectType.ORACLE_SERVICE_NAME);
oracleDataSource.setConnectType(DbConnectType.ORACLE_SERVICE_NAME);
StringBuilder jdbcUrl = new StringBuilder(oracleDataSource.getAddress());
oracleDataSource.appendDatabase(jdbcUrl);
Assert.assertEquals("jdbc:oracle:thin:@//127.0.0.1:1521/test", jdbcUrl.toString());
......@@ -66,7 +66,7 @@ public class OracleDataSourceTest {
OracleDataSource oracleDataSource2 = new OracleDataSource();
oracleDataSource2.setAddress("jdbc:oracle:thin:@127.0.0.1:1521");
oracleDataSource2.setDatabase("orcl");
oracleDataSource2.setType(DbConnectType.ORACLE_SID);
oracleDataSource2.setConnectType(DbConnectType.ORACLE_SID);
StringBuilder jdbcUrl2 = new StringBuilder(oracleDataSource2.getAddress());
oracleDataSource2.appendDatabase(jdbcUrl2);
Assert.assertEquals("jdbc:oracle:thin:@127.0.0.1:1521:orcl", jdbcUrl2.toString());
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
......@@ -56,7 +56,7 @@ public class RemoveTaskLogRequestCommand implements Serializable {
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
......@@ -28,7 +28,7 @@ import java.io.Serializable;
*/
public class RemoveTaskLogResponseCommand implements Serializable {
/**
/*TaskPriorityQueueConsumer.*
* log path
*/
private Boolean status;
......@@ -56,7 +56,7 @@ public class RemoveTaskLogResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
......@@ -33,7 +32,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
......
......@@ -141,7 +141,7 @@ public class DataxTask extends AbstractTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// run datax process
// run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
......@@ -176,7 +176,7 @@ public class DataxTask extends AbstractTask {
* @return datax json file name
* @throws Exception if error throws Exception
*/
private String buildDataxJsonFile()
private String buildDataxJsonFile(Map<String, Property> paramsMap)
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json",
......
......@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.*;
......@@ -162,7 +163,7 @@ public class LogClientService {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){
RemoveTaskLogResponseCommand taskLogResponse = FastJsonSerializer.deserialize(
RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize(
response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册