提交 9c9c940f 编写于 作者: journey2018's avatar journey2018

sql task add kerbos auth

上级 c873bb65
...@@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants; ...@@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result; import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType; import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.User;
...@@ -455,7 +456,7 @@ public class DataSourceController extends BaseController { ...@@ -455,7 +456,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName()); logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName());
try{ try{
// if upload resource is HDFS and kerberos startup is true , else false // if upload resource is HDFS and kerberos startup is true , else false
return success(Status.SUCCESS.getMsg(), CheckUtils.getKerberosStartupState()); return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState());
}catch (Exception e){ }catch (Exception e){
logger.error(KERBEROS_STARTUP_STATE.getMsg(),e); logger.error(KERBEROS_STARTUP_STATE.getMsg(),e);
return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg()); return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg());
......
...@@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType; ...@@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType; import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.enums.UserType; import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.job.db.*; import cn.escheduler.common.job.db.*;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.mapper.DataSourceMapper; import cn.escheduler.dao.mapper.DataSourceMapper;
import cn.escheduler.dao.mapper.DatasourceUserMapper; import cn.escheduler.dao.mapper.DatasourceUserMapper;
...@@ -374,7 +375,7 @@ public class DataSourceService extends BaseService{ ...@@ -374,7 +375,7 @@ public class DataSourceService extends BaseService{
break; break;
case HIVE: case HIVE:
case SPARK: case SPARK:
if (CheckUtils.getKerberosStartupState()) { if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
...@@ -470,7 +471,7 @@ public class DataSourceService extends BaseService{ ...@@ -470,7 +471,7 @@ public class DataSourceService extends BaseService{
String address = buildAddress(type, host, port); String address = buildAddress(type, host, port);
String jdbcUrl = address + "/" + database; String jdbcUrl = address + "/" + database;
if (CheckUtils.getKerberosStartupState() && if (CommonUtils.getKerberosStartupState() &&
(type == DbType.HIVE || type == DbType.SPARK)){ (type == DbType.HIVE || type == DbType.SPARK)){
jdbcUrl += ";principal=" + principal; jdbcUrl += ";principal=" + principal;
} }
......
...@@ -160,16 +160,4 @@ public class CheckUtils { ...@@ -160,16 +160,4 @@ public class CheckUtils {
return pattern.matcher(str).matches(); return pattern.matcher(str).matches();
} }
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package cn.escheduler.common.utils; package cn.escheduler.common.utils;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ResUploadType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -63,4 +64,14 @@ public class CommonUtils { ...@@ -63,4 +64,14 @@ public class CommonUtils {
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
} }
...@@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds; ...@@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds;
import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.task.sql.SqlType; import cn.escheduler.common.task.sql.SqlType;
import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.DaoFactory;
...@@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject; ...@@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.sql.*; import java.sql.*;
...@@ -51,6 +54,8 @@ import java.util.regex.Matcher; ...@@ -51,6 +54,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static cn.escheduler.common.utils.PropertyUtils.getString;
/** /**
* sql task * sql task
*/ */
...@@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask { ...@@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask {
List<String> createFuncs){ List<String> createFuncs){
Connection connection = null; Connection connection = null;
try { try {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
if (DbType.HIVE.name().equals(sqlParameters.getType())) { if (DbType.HIVE.name().equals(sqlParameters.getType())) {
Properties paramProp = new Properties(); Properties paramProp = new Properties();
paramProp.setProperty("user", baseDataSource.getUser()); paramProp.setProperty("user", baseDataSource.getUser());
...@@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask { ...@@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask {
array.add(mapOfColValues); array.add(mapOfColValues);
} }
logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
// send as an attachment // send as an attachment
if (StringUtils.isEmpty(sqlParameters.getShowType())) { if (StringUtils.isEmpty(sqlParameters.getShowType())) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册