Commit bd7db688 authored by L lgcareer

Merge remote-tracking branch 'remotes/upstream/1.3.4-prepare' into 1.3.4-prepare-feature-3878

......@@ -41,6 +41,11 @@ public class TargetHiveParameter {
* hive overwrite
*/
private boolean hiveOverWrite;
/**
* hive target dir
*/
private String hiveTargetDir;
/**
* replace delimiter
*/
......@@ -117,4 +122,12 @@ public class TargetHiveParameter {
public void setHivePartitionValue(String hivePartitionValue) {
this.hivePartitionValue = hivePartitionValue;
}
public String getHiveTargetDir() {
return hiveTargetDir;
}
public void setHiveTargetDir(String hiveTargetDir) {
this.hiveTargetDir = hiveTargetDir;
}
}
......@@ -61,29 +61,23 @@ public class ParameterUtils {
* @return convert parameters place holders
*/
public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) {
if (StringUtils.isEmpty(parameterString) || parameterMap == null) {
if (StringUtils.isEmpty(parameterString)) {
return parameterString;
}
//Get current time, schedule execute time
String cronTimeStr = parameterMap.get(Constants.PARAMETER_DATETIME);
Date cronTime;
if (StringUtils.isNotEmpty(cronTimeStr)) {
if (parameterMap != null && !parameterMap.isEmpty()) {
//Get current time, schedule execute time
String cronTimeStr = parameterMap.get(Constants.PARAMETER_DATETIME);
cronTime = DateUtils.parse(cronTimeStr, Constants.PARAMETER_FORMAT_TIME);
// replace variable ${} form, i.e. replacement of system variables and custom variables
parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true);
} else {
cronTime = new Date();
}
// replace variable ${} form, i.e. replacement of system variables and custom variables
parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true);
// replace time $[...] form, e.g. $[yyyyMMdd]
if (cronTime != null) {
return dateTemplateParse(parameterString, cronTime);
}
return parameterString;
}
......
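For reference, a minimal usage sketch of the reworked placeholder conversion (assuming the dolphinscheduler-common classes above are on the classpath; the "dt" key and the timestamp are illustrative, and Constants.PARAMETER_FORMAT_TIME is assumed to be yyyyMMddHHmmss):

import java.util.HashMap;
import java.util.Map;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;

public class PlaceholderDemo {
    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        // custom variable consumed by the ${dt} placeholder
        params.put("dt", "2020-11-01");
        // schedule execute time consumed by the $[...] date template
        params.put(Constants.PARAMETER_DATETIME, "20201101000000");
        String out = ParameterUtils.convertParameterPlaceholders(
                "load_date=${dt} partition=$[yyyyMMdd]", params);
        System.out.println(out); // expected: load_date=2020-11-01 partition=20201101
    }
}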
......@@ -119,6 +119,8 @@ public class DolphinSchedulerManager {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceList();
} else if ("1.3.4".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerSqoopTaskParams();
}
version = schemaVersion;
}
......
......@@ -530,4 +530,50 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
/**
* upgrade DolphinScheduler sqoop task params
* ds-1.3.4 modify the sqoop task params for process definition json
*/
public void upgradeDolphinSchedulerSqoopTaskParams() {
upgradeProcessDefinitionJsonSqoopTaskParams();
}
/**
* upgradeProcessDefinitionJsonSqoopTaskParams
*/
protected void upgradeProcessDefinitionJsonSqoopTaskParams() {
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()) {
JSONObject jsonObject = JSONObject.parseObject(entry.getValue());
JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks"));
for (int i = 0; i < tasks.size(); i++) {
JSONObject task = tasks.getJSONObject(i);
String taskType = task.getString("type");
if ("SQOOP".equals(taskType) && !task.getString("params").contains("jobType")) {
JSONObject taskParams = JSONObject.parseObject(task.getString("params"));
taskParams.put("jobType","TEMPLATE");
taskParams.put("jobName","sqoop-job");
taskParams.put("hadoopCustomParams", new JSONArray());
taskParams.put("sqoopAdvancedParams", new JSONArray());
task.remove(task.getString("params"));
task.put("params",taskParams);
}
jsonObject.remove(jsonObject.getString("tasks"));
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toJSONString());
}
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json sqoop task params error: {}", e.getMessage());
}
}
}
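A stand-alone sketch of the per-task rewrite above, using the same fastjson calls; the sample task JSON is illustrative only:

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class SqoopParamsUpgradeDemo {
    public static void main(String[] args) {
        // a pre-1.3.4 SQOOP task node whose params do not yet carry "jobType"
        JSONObject task = JSONObject.parseObject(
                "{\"type\":\"SQOOP\",\"params\":{\"modelType\":\"import\",\"concurrency\":1}}");
        if ("SQOOP".equals(task.getString("type")) && !task.getString("params").contains("jobType")) {
            JSONObject taskParams = JSONObject.parseObject(task.getString("params"));
            // defaults introduced for the 1.3.4 sqoop task params
            taskParams.put("jobType", "TEMPLATE");
            taskParams.put("jobName", "sqoop-job");
            taskParams.put("hadoopCustomParams", new JSONArray());
            taskParams.put("sqoopAdvancedParams", new JSONArray());
            task.put("params", taskParams);
        }
        // prints the task with jobType/jobName and the two empty param arrays filled in
        System.out.println(task.toJSONString());
    }
}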
......@@ -374,11 +374,14 @@ public class DagHelper {
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
Map<String, TaskNode> skipTaskNodeList){
if (!dag.containsNode(skipNodeName)) {
return;
}
skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeName);
for(String post : postNodeList){
for (String post : postNodeList) {
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode, skipTaskNodeList)){
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
}
}
......
......@@ -19,11 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
......@@ -34,7 +30,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
......@@ -53,6 +48,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -118,8 +114,15 @@ public class TaskPriorityQueueConsumer extends Thread{
failedDispatchTasks.add(taskPriorityInfo);
}
}
for(String dispatchFailedTask : failedDispatchTasks){
taskPriorityQueue.put(dispatchFailedTask);
if (!failedDispatchTasks.isEmpty()) {
for (String dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If all the tasks in this dispatch cycle failed to find a worker group,
// sleep for 1 second before retrying
if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}catch (Exception e){
logger.error("dispatcher task error",e);
......@@ -134,7 +137,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskInstanceId taskInstanceId
* @return result
*/
private boolean dispatch(int taskInstanceId){
protected boolean dispatch(int taskInstanceId) {
boolean result = false;
try {
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
......@@ -255,8 +258,8 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskNode taskNode
*/
private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
......@@ -371,8 +374,8 @@ public class TaskPriorityQueueConsumer extends Thread{
/**
* get resource map key is full name and value is tenantCode
*/
private Map<String,String> getResourceFullNames(TaskNode taskNode) {
Map<String,String> resourceMap = new HashMap<>();
protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
Map<String, String> resourceMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
......
......@@ -42,7 +42,16 @@ public interface TaskExecutionContextCacheManager {
/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
void removeByTaskInstanceId(Integer taskInstanceId);
/**
* If the value for the specified key is present and non-null, perform the update; otherwise return false
*
* @param taskExecutionContext taskExecutionContext
* @return status
*/
boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext);
}
......@@ -59,10 +59,17 @@ public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContex
/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
@Override
public void removeByTaskInstanceId(Integer taskInstanceId) {
taskExecutionContextCache.remove(taskInstanceId);
}
@Override
public boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
taskExecutionContextCache.computeIfPresent(taskExecutionContext.getTaskInstanceId(), (k, v) -> taskExecutionContext);
return taskExecutionContextCache.containsKey(taskExecutionContext.getTaskInstanceId());
}
}
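The update-only-if-present semantics above are what let a kill win the race: once removeByTaskInstanceId has run, a later update no longer re-inserts the context and reports false. A minimal stand-alone sketch of the same idiom on a plain ConcurrentHashMap (the key and value types are illustrative):

import java.util.concurrent.ConcurrentHashMap;

public class UpdateIfPresentDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<Integer, String> cache = new ConcurrentHashMap<>();
        cache.put(1, "queued");

        // update succeeds while the entry is still cached
        cache.computeIfPresent(1, (k, v) -> "running");
        System.out.println(cache.containsKey(1)); // true

        // after a kill has removed the entry, the update is a no-op and reports false
        cache.remove(1);
        cache.computeIfPresent(1, (k, v) -> "running");
        System.out.println(cache.containsKey(1)); // false
    }
}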
......@@ -39,9 +39,12 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -62,28 +65,45 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final ExecutorService workerExecService;
/**
* worker config
* worker config
*/
private final WorkerConfig workerConfig;
/**
* task callback service
* task callback service
*/
private final TaskCallbackService taskCallbackService;
public TaskExecuteProcessor(){
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
/**
* Pre-cache the task to avoid the edge case where a kill request arrives before the task has been cached
*
* @param taskExecutionContext task
*/
private void setTaskCache(TaskExecutionContext taskExecutionContext) {
TaskExecutionContext preTaskCache = new TaskExecutionContext();
preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache);
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
command.getBody(), TaskExecuteRequestCommand.class);
logger.info("received command : {}", taskRequestCommand);
......@@ -99,6 +119,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null");
return;
}
setTaskCache(taskExecutionContext);
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
......@@ -120,6 +141,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
FileUtils.taskLoggerThreadLocal.remove();
......
......@@ -102,15 +102,17 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
List<String> appIds = Collections.EMPTY_LIST;
List<String> appIds = Collections.emptyList();
try {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)){
logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
return Pair.of(false, appIds);
if (processId.equals(0)) {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return Pair.of(true, appIds);
}
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
......
......@@ -47,6 +47,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
/**
......@@ -160,12 +161,18 @@ public abstract class AbstractCommandExecutor {
* @return CommandExecuteResult
* @throws Exception if error throws Exception
*/
public CommandExecuteResult run(String execCommand) throws Exception{
public CommandExecuteResult run(String execCommand) throws Exception {
CommandExecuteResult result = new CommandExecuteResult();
int taskInstanceId = taskExecutionContext.getTaskInstanceId();
// If the task has been killed, then the task in the cache is null
if (null == taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
......@@ -187,7 +194,12 @@ public abstract class AbstractCommandExecutor {
// cache processId
taskExecutionContext.setProcessId(processId);
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
boolean updateTaskExecutionContextStatus = taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
// print process id
logger.info("process start, process id is: {}", processId);
......
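Together with the pre-caching in TaskExecuteProcessor and the early-return branch in TaskKillProcessor above, these two checks close the kill-before-run race. A condensed sketch of the guarded startup path; the class and the tryStart method are illustrative placeholders, only the cache-manager calls come from this diff:

import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;

public class GuardedRunSketch {

    private final TaskExecutionContextCacheManager cacheManager;
    private final TaskExecutionContext taskExecutionContext;

    GuardedRunSketch(TaskExecutionContextCacheManager cacheManager, TaskExecutionContext context) {
        this.cacheManager = cacheManager;
        this.taskExecutionContext = context;
    }

    boolean tryStart(int processId) {
        int taskInstanceId = taskExecutionContext.getTaskInstanceId();
        // killed before the command was built: the pre-cached entry is already gone
        if (cacheManager.getByTaskInstanceId(taskInstanceId) == null) {
            return false;
        }
        taskExecutionContext.setProcessId(processId);
        // killed between the check above and here: the update-if-present fails,
        // so the caller must kill the freshly started process and report EXIT_CODE_KILL
        return cacheManager.updateTaskExecutionContext(taskExecutionContext);
    }
}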
......@@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
......@@ -135,21 +134,8 @@ public class ShellTask extends AbstractTask {
shellParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null){
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
// new
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (paramsMap != null) {
if (taskExecutionContext.getScheduleTime() != null) {
String dateTime = DateUtils.format(taskExecutionContext.getScheduleTime(), Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_SHECDULE_TIME);
paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p);
}
script = ParameterUtils.convertParameterPlaceholders2(script, ParamUtils.convert(paramsMap));
}
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
shellParameters.setRawScript(script);
......
......@@ -53,13 +53,14 @@ public class CommonGenerator {
//hadoop custom param
List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
StringBuilder hadoopCustomParamStr = new StringBuilder();
for (Property hadoopCustomParam : hadoopCustomParams) {
String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(),
Constants.EQUAL_SIGN, hadoopCustomParam.getValue());
commonSb.append(Constants.SPACE).append(Constants.D)
.append(Constants.SPACE).append(hadoopCustomParamStr);
hadoopCustomParamStr.append(Constants.D)
.append(Constants.SPACE).append(hadoopCustomParam.getProp())
.append(Constants.EQUAL_SIGN).append(hadoopCustomParam.getValue())
.append(Constants.SPACE);
}
commonSb.append(Constants.SPACE).append(hadoopCustomParamStr.substring(0, hadoopCustomParamStr.length() - 1));
}
//sqoop custom params
......
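A stand-alone sketch of what the rebuilt loop emits, with literal strings standing in for the Constants fields (assumed values: SPACE=" ", D="-D", EQUAL_SIGN="="); the sample properties mirror the test case further down:

import java.util.LinkedHashMap;
import java.util.Map;

public class HadoopCustomParamDemo {
    public static void main(String[] args) {
        Map<String, String> hadoopCustomParams = new LinkedHashMap<>();
        hadoopCustomParams.put("mapreduce.map.memory.mb", "2048");
        hadoopCustomParams.put("mapreduce.reduce.memory.mb", "2048");

        StringBuilder hadoopCustomParamStr = new StringBuilder();
        for (Map.Entry<String, String> param : hadoopCustomParams.entrySet()) {
            hadoopCustomParamStr.append("-D").append(' ')
                    .append(param.getKey()).append('=').append(param.getValue()).append(' ');
        }

        StringBuilder commonSb = new StringBuilder("sqoop import -D mapred.job.name=sqoop_import");
        // the trailing space is trimmed, as in the generator above
        commonSb.append(' ').append(hadoopCustomParamStr.substring(0, hadoopCustomParamStr.length() - 1));
        System.out.println(commonSb);
        // sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048
    }
}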
......@@ -97,7 +97,10 @@ public class MysqlSourceGenerator implements ISourceGenerator {
if (null != mapColumnHive && !mapColumnHive.isEmpty()) {
StringBuilder columnMap = new StringBuilder();
for (Property item : mapColumnHive) {
columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
if (!item.getProp().isEmpty()) {
columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN)
.append(item.getValue()).append(Constants.COMMA);
}
}
if (StringUtils.isNotEmpty(columnMap.toString())) {
......@@ -110,14 +113,17 @@ public class MysqlSourceGenerator implements ISourceGenerator {
List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
if (null != mapColumnJava && !mapColumnJava.isEmpty()) {
StringBuilder columnMap = new StringBuilder();
StringBuilder columnJavaMap = new StringBuilder();
for (Property item : mapColumnJava) {
columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
if (!item.getProp().isEmpty()) {
columnJavaMap.append(item.getProp()).append(Constants.EQUAL_SIGN)
.append(item.getValue()).append(Constants.COMMA);
}
}
if (StringUtils.isNotEmpty(columnMap.toString())) {
if (StringUtils.isNotEmpty(columnJavaMap.toString())) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA)
.append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
.append(Constants.SPACE).append(columnJavaMap.substring(0, columnJavaMap.length() - 1));
}
}
}
......
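Similarly, a minimal sketch of the comma-joined --map-column-java rendering, with literal strings in place of the Constants/SqoopConstants fields; entries with an empty prop are skipped and the trailing comma is trimmed:

import java.util.LinkedHashMap;
import java.util.Map;

public class MapColumnJavaDemo {
    public static void main(String[] args) {
        Map<String, String> mapColumnJava = new LinkedHashMap<>();
        mapColumnJava.put("create_time", "java.sql.Date");
        mapColumnJava.put("update_time", "java.sql.Date");
        mapColumnJava.put("", "ignored"); // empty prop entries are skipped

        StringBuilder columnJavaMap = new StringBuilder();
        for (Map.Entry<String, String> item : mapColumnJava.entrySet()) {
            if (!item.getKey().isEmpty()) {
                columnJavaMap.append(item.getKey()).append('=').append(item.getValue()).append(',');
            }
        }

        StringBuilder mysqlSourceSb = new StringBuilder();
        if (columnJavaMap.length() > 0) {
            mysqlSourceSb.append(' ').append("--map-column-java").append(' ')
                    .append(columnJavaMap.substring(0, columnJavaMap.length() - 1));
        }
        System.out.println(mysqlSourceSb);
        // --map-column-java create_time=java.sql.Date,update_time=java.sql.Date
    }
}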
......@@ -68,6 +68,11 @@ public class HiveTargetGenerator implements ITargetGenerator {
.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
}
if (StringUtils.isNotEmpty(targetHiveParameter.getHiveTargetDir())) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.TARGET_DIR)
.append(Constants.SPACE).append(targetHiveParameter.getHiveTargetDir());
}
if (StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DELIMS_REPLACEMENT)
.append(Constants.SPACE).append(targetHiveParameter.getReplaceDelimiter());
......
......@@ -134,17 +134,22 @@ public class SqoopTaskTest {
//import mysql to hive
String mysqlToHive =
"{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+ "\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"},{\"prop\":\"mapreduce.reduce.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"}],"
+ "\"sqoopAdvancedParams\":[{\"prop\":\"--delete-target-dir\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"},{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
+ "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
+ "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
+ "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+ "\\\"mapColumnHive\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"}],"
+ "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"}]}\","
+ "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
+ "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+ "\\\"hiveOverWrite\\\":true,\\\"hiveTargetDir\\\":\\\"/tmp/sqoop_import_hive\\\",\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
String mysqlToHiveExpected =
"sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" "
+ "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 "
+ "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
"sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --delete-target-dir --direct -m 1 "
+ "--connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" "
+ "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-hive create_time=string,update_time=string --map-column-java create_time=java.sql.Date,update_time=java.sql.Date "
+ "--hive-import --hive-database stg --hive-table person_internal_2 "
+ "--create-hive-table --hive-overwrite --delete-target-dir --target-dir /tmp/sqoop_import_hive --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
//sqoop CUSTOM job
......
......@@ -341,6 +341,17 @@
<x-switch v-model="targetHiveParams.hiveOverWrite"></x-switch>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Hive Target Dir')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHiveParams.hiveTargetDir"
:placeholder="$t('Please enter hive target dir')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('ReplaceDelimiter')}}</div>
<div slot="content">
......@@ -690,9 +701,9 @@
dropDelimiter:false,
hiveOverWrite:true,
replaceDelimiter:"",
hiveTargetDir:"",
hivePartitionKey:"",
hivePartitionValue:""
}
}
},
......@@ -773,21 +784,14 @@
_getTargetTypeList(data){
switch(data){
case 'MYSQL':
if (this.srcQueryType === "1") {
this.targetTypeList = [
{
code: "HDFS"
}]
} else {
this.targetTypeList = [
{
code: "HIVE"
},
{
code: "HDFS"
}
]
}
this.targetTypeList = [
{
code: "HIVE"
},
{
code: "HDFS"
}
]
break;
case 'HDFS':
this.targetTypeList = [
......
......@@ -556,6 +556,7 @@ export default {
'Please enter Target Dir(required)': 'Please enter Target Dir(required)',
'Please enter Export Dir(required)': 'Please enter Export Dir(required)',
'Please enter Hive Database(required)': 'Please enter Hive Database(required)',
'Please enter hive target dir': 'Please enter hive target dir',
'Please enter Hive Table(required)': 'Please enter Hive Table(required)',
'Please enter Hive Partition Keys': 'Please enter Hive Partition Key',
'Please enter Hive Partition Values': 'Please enter Partition Value',
......@@ -589,6 +590,7 @@ export default {
CreateHiveTable: 'CreateHiveTable',
DropDelimiter: 'DropDelimiter',
OverWriteSrc: 'OverWriteSrc',
'Hive Target Dir': 'Hive Target Dir',
ReplaceDelimiter: 'ReplaceDelimiter',
Concurrency: 'Concurrency',
Form: 'Form',
......
......@@ -555,6 +555,7 @@ export default {
'Please enter Export Dir(required)': '请输入数据源路径(必填)',
'Please enter Hive Database(required)': '请输入Hive数据库(必填)',
'Please enter Hive Table(required)': '请输入Hive表名(必填)',
'Please enter hive target dir': '请输入Hive临时目录',
'Please enter Hive Partition Keys': '请输入分区键',
'Please enter Hive Partition Values': '请输入分区值',
'Please enter Replace Delimiter': '请输入替换分隔符',
......@@ -587,6 +588,7 @@ export default {
CreateHiveTable: '是否创建新表',
DropDelimiter: '是否删除分隔符',
OverWriteSrc: '是否覆盖数据源',
'Hive Target Dir': 'Hive临时目录',
ReplaceDelimiter: '替换分隔符',
Concurrency: '并发度',
Form: '表单',
......