提交 ca7667c8 编写于 作者: L lgcareer

Merge remote-tracking branch 'remotes/upstream/dev-1.1.0' into dev-1.1.0

...@@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants; ...@@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result; import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType; import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.User;
...@@ -455,7 +456,7 @@ public class DataSourceController extends BaseController { ...@@ -455,7 +456,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName()); logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName());
try{ try{
// if upload resource is HDFS and kerberos startup is true , else false // if upload resource is HDFS and kerberos startup is true , else false
return success(Status.SUCCESS.getMsg(), CheckUtils.getKerberosStartupState()); return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState());
}catch (Exception e){ }catch (Exception e){
logger.error(KERBEROS_STARTUP_STATE.getMsg(),e); logger.error(KERBEROS_STARTUP_STATE.getMsg(),e);
return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg()); return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg());
......
...@@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType; ...@@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType; import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.enums.UserType; import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.job.db.*; import cn.escheduler.common.job.db.*;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.mapper.DataSourceMapper; import cn.escheduler.dao.mapper.DataSourceMapper;
import cn.escheduler.dao.mapper.DatasourceUserMapper; import cn.escheduler.dao.mapper.DatasourceUserMapper;
...@@ -381,7 +382,7 @@ public class DataSourceService extends BaseService{ ...@@ -381,7 +382,7 @@ public class DataSourceService extends BaseService{
break; break;
case HIVE: case HIVE:
case SPARK: case SPARK:
if (CheckUtils.getKerberosStartupState()) { if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
...@@ -477,7 +478,7 @@ public class DataSourceService extends BaseService{ ...@@ -477,7 +478,7 @@ public class DataSourceService extends BaseService{
String address = buildAddress(type, host, port); String address = buildAddress(type, host, port);
String jdbcUrl = address + "/" + database; String jdbcUrl = address + "/" + database;
if (CheckUtils.getKerberosStartupState() && if (CommonUtils.getKerberosStartupState() &&
(type == DbType.HIVE || type == DbType.SPARK)){ (type == DbType.HIVE || type == DbType.SPARK)){
jdbcUrl += ";principal=" + principal; jdbcUrl += ";principal=" + principal;
} }
......
...@@ -160,16 +160,4 @@ public class CheckUtils { ...@@ -160,16 +160,4 @@ public class CheckUtils {
return pattern.matcher(str).matches(); return pattern.matcher(str).matches();
} }
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
} }
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/>
<property name="log.base" value="logs" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="APISERVERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Log level filter -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<file>${log.base}/escheduler-api-server.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/escheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package cn.escheduler.common.utils; package cn.escheduler.common.utils;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ResUploadType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -63,4 +64,14 @@ public class CommonUtils { ...@@ -63,4 +64,14 @@ public class CommonUtils {
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
} }
...@@ -26,10 +26,10 @@ hadoop.security.authentication.startup.state=false ...@@ -26,10 +26,10 @@ hadoop.security.authentication.startup.state=false
java.security.krb5.conf.path=/opt/krb5.conf java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user # loginUserFromKeytab user
login.user.keytab.username="hdfs-mycluster@ESZ.COM" login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path # loginUserFromKeytab path
login.user.keytab.path="/opt/hdfs.headless.keytab" login.user.keytab.path=/opt/hdfs.headless.keytab
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions # system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
escheduler.env.path=/opt/.escheduler_env.sh escheduler.env.path=/opt/.escheduler_env.sh
......
...@@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao { ...@@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao {
// find pause tasks and init task's state // find pause tasks and init task's state
cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for(Integer taskId : suspendedNodeList){ for(Integer taskId : suspendedNodeList){
// 把暂停状态初始化 // 把暂停状态初始化
initTaskInstance(this.findTaskInstanceById(taskId)); initTaskInstance(this.findTaskInstanceById(taskId));
...@@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao { ...@@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao {
* @param taskInstance * @param taskInstance
*/ */
private void initTaskInstance(TaskInstance taskInstance){ private void initTaskInstance(TaskInstance taskInstance){
if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){
taskInstance.setFlag(Flag.NO); if(!taskInstance.isSubProcess()){
updateTaskInstance(taskInstance); if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){
}else{ taskInstance.setFlag(Flag.NO);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); updateTaskInstance(taskInstance);
updateTaskInstance(taskInstance); return;
}
} }
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
} }
/** /**
......
...@@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider { ...@@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider {
public String queryDetailById(Map<String, Object> parameter) { public String queryDetailById(Map<String, Object> parameter) {
return new SQL() { return new SQL() {
{ {
SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); SELECT("inst.*,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q"); FROM(TABLE_NAME + " inst");
WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}"); WHERE("inst.id = #{processId}");
} }
}.toString(); }.toString();
} }
......
...@@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{ ...@@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{
// get process define // get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant != null){
processInstance.setTenantCode(tenant.getTenantCode());
}
taskInstance.setProcessInstance(processInstance); taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine); taskInstance.setProcessDefine(processDefine);
...@@ -217,9 +211,12 @@ public class FetchTaskThread implements Runnable{ ...@@ -217,9 +211,12 @@ public class FetchTaskThread implements Runnable{
// set task execute path // set task execute path
taskInstance.setExecutePath(execLocalPath); taskInstance.setExecutePath(execLocalPath);
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
// check and create Linux users // check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger); tenant.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskId); logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task // submit task
......
...@@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters; ...@@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.utils.*; import cn.escheduler.common.utils.*;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.TaskRecordDao;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.log.TaskLogger;
...@@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable<Boolean> { ...@@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable<Boolean> {
// set task params // set task params
taskProps.setTaskParams(taskNode.getParams()); taskProps.setTaskParams(taskNode.getParams());
// set tenant code , execute task linux user // set tenant code , execute task linux user
taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setScheduleTime(processInstance.getScheduleTime());
taskProps.setNodeName(taskInstance.getName()); taskProps.setNodeName(taskInstance.getName());
taskProps.setTaskInstId(taskInstance.getId()); taskProps.setTaskInstId(taskInstance.getId());
taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
// set queue
if (StringUtils.isEmpty(queue)){ ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else { Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
taskProps.setQueue(queue); processDefine.getUserId());
}
taskProps.setTaskStartTime(taskInstance.getStartTime()); if(tenant == null){
taskProps.setDefinedParams(allParamMap); processInstance.setTenantCode(tenant.getTenantCode());
logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}",
// set task timeout processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId()
setTaskTimeout(taskProps, taskNode); );
status = ExecutionStatus.FAILURE;
taskProps.setDependence(taskInstance.getDependency()); }else{
taskProps.setTenantCode(tenant.getTenantCode());
taskProps.setTaskAppId(String.format("%s_%s_%s", String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
taskInstance.getProcessDefine().getId(), // set queue
taskInstance.getProcessInstance().getId(), if (StringUtils.isEmpty(queue)){
taskInstance.getId())); taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else {
// custom logger taskProps.setQueue(tenant.getQueueName());
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, }
taskInstance.getProcessDefine().getId(), taskProps.setTaskStartTime(taskInstance.getStartTime());
taskInstance.getProcessInstance().getId(), taskProps.setDefinedParams(allParamMap);
taskInstance.getId()));
// set task timeout
task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); setTaskTimeout(taskProps, taskNode);
// job init taskProps.setDependence(taskInstance.getDependency());
task.init();
taskProps.setTaskAppId(String.format("%s_%s_%s",
// job handle taskInstance.getProcessDefine().getId(),
task.handle(); taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); // custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ taskInstance.getProcessDefine().getId(),
status = ExecutionStatus.SUCCESS; taskInstance.getProcessInstance().getId(),
// task recor flat : if true , start up qianfan taskInstance.getId()));
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){ task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); // job init
task.init();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), // job handle
taskProps.getDefinedParams(), task.handle();
params.getLocalParametersMap(), logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime()); if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
if (paramsMap != null && !paramsMap.isEmpty() status = ExecutionStatus.SUCCESS;
&& paramsMap.containsKey("v_proc_date")){ // task recor flat : if true , start up qianfan
String vProcDate = paramsMap.get("v_proc_date").getValue(); if (TaskRecordDao.getTaskRecordFlag()
if (!StringUtils.isEmpty(vProcDate)){ && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState); AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE; // replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
}
} }
} }
} }
}
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL; status = ExecutionStatus.KILL;
}else { }else {
status = ExecutionStatus.FAILURE; status = ExecutionStatus.FAILURE;
}
} }
}catch (Exception e){ }catch (Exception e){
logger.error("task escheduler failure : " + e.getMessage(),e); logger.error("task escheduler failure : " + e.getMessage(),e);
......
...@@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds; ...@@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds;
import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.task.sql.SqlType; import cn.escheduler.common.task.sql.SqlType;
import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.DaoFactory;
...@@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject; ...@@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.sql.*; import java.sql.*;
...@@ -51,6 +54,8 @@ import java.util.regex.Matcher; ...@@ -51,6 +54,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static cn.escheduler.common.utils.PropertyUtils.getString;
/** /**
* sql task * sql task
*/ */
...@@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask { ...@@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask {
List<String> createFuncs){ List<String> createFuncs){
Connection connection = null; Connection connection = null;
try { try {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
if (DbType.HIVE.name().equals(sqlParameters.getType())) { if (DbType.HIVE.name().equals(sqlParameters.getType())) {
Properties paramProp = new Properties(); Properties paramProp = new Properties();
paramProp.setProperty("user", baseDataSource.getUser()); paramProp.setProperty("user", baseDataSource.getUser());
...@@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask { ...@@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask {
array.add(mapOfColValues); array.add(mapOfColValues);
} }
logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
// send as an attachment // send as an attachment
if (StringUtils.isEmpty(sqlParameters.getShowType())) { if (StringUtils.isEmpty(sqlParameters.getShowType())) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册