提交 65841a57 编写于 作者: S simon824

1. change windows new line 'CR' to linux new line 'LF'

2. solve conflict
上级 0b112758
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; /** * kill task request command */public class TaskKillRequestCommand implements Serializable { /** * task id */ private int taskInstanceId; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskInstanceId=" + taskInstanceId + '}'; }}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public class TaskKillRequestCommand implements Serializable {
/**
* task id
*/
private int taskInstanceId;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_KILL_REQUEST);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskKillRequestCommand{" +
"taskInstanceId=" + taskInstanceId +
'}';
}
}
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable;import java.util.List; /** * kill task response command */public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; }}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
import java.util.List;
/**
* kill task response command
*/
public class TaskKillResponseCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* host
*/
private String host;
/**
* status
*/
private int status;
/**
* processId
*/
private int processId;
/**
* other resource manager appId , for example : YARN etc
*/
protected List<String> appIds;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public List<String> getAppIds() {
return appIds;
}
public void setAppIds(List<String> appIds) {
this.appIds = appIds;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_KILL_RESPONSE);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskKillResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", host='" + host + '\'' +
", status=" + status +
", processId=" + processId +
", appIds=" + appIds +
'}';
}
}
......@@ -16,8 +16,8 @@
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
......@@ -86,7 +86,7 @@ public class SqoopTaskTest {
//import mysql to HDFS with hadoo
String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," +
"\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters mysqlToHdfsParams = JSON.parseObject(mysqlToHdfs,SqoopParameters.class);
SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs,SqoopParameters.class);
SqoopJobGenerator generator = new SqoopJobGenerator();
String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams,mysqlTaskExecutionContext);
String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
......@@ -96,28 +96,28 @@ public class SqoopTaskTest {
String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," +
"\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," +
"\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters hdfsToMysqlParams = JSON.parseObject(hdfsToMysql,SqoopParameters.class);
SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql,SqoopParameters.class);
String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams,mysqlTaskExecutionContext);
String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
//export hive to mysql
String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters hiveToMysqlParams = JSON.parseObject(hiveToMysql,SqoopParameters.class);
SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql,SqoopParameters.class);
String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams,mysqlTaskExecutionContext);
String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
//import mysql to hive
String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
SqoopParameters mysqlToHiveParams = JSON.parseObject(mysqlToHive,SqoopParameters.class);
SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive,SqoopParameters.class);
String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams,mysqlTaskExecutionContext);
String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
//sqoop CUSTOM job
String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}";
SqoopParameters sqoopCustomParams = JSON.parseObject(sqoopCustomString, SqoopParameters.class);
SqoopParameters sqoopCustomParams = JSONUtils.parseObject(sqoopCustomString, SqoopParameters.class);
String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext());
String sqoopCustomExpected = "sqoop import";
Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript);
......
......@@ -815,6 +815,7 @@
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册