未验证 提交 22cd9595 编写于 作者: E escheduler 提交者: GitHub

Merge pull request #36 from Baoqi/bwu_clickhouse

add clickhouse && fix db connection leak issue
...@@ -38,6 +38,7 @@ import org.springframework.transaction.annotation.Transactional; ...@@ -38,6 +38,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.*; import java.util.*;
/** /**
...@@ -217,6 +218,9 @@ public class DataSourceService extends BaseService{ ...@@ -217,6 +218,9 @@ public class DataSourceService extends BaseService{
case POSTGRESQL: case POSTGRESQL:
separator = "&"; separator = "&";
break; break;
case CLICKHOUSE:
separator = "&";
break;
default: default:
separator = "&"; separator = "&";
break; break;
...@@ -367,6 +371,10 @@ public class DataSourceService extends BaseService{ ...@@ -367,6 +371,10 @@ public class DataSourceService extends BaseService{
datasource = JSONObject.parseObject(parameter, SparkDataSource.class); datasource = JSONObject.parseObject(parameter, SparkDataSource.class);
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
break; break;
case CLICKHOUSE:
datasource = JSONObject.parseObject(parameter, ClickHouseDataSource.class);
Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER);
break;
default: default:
break; break;
} }
...@@ -392,6 +400,11 @@ public class DataSourceService extends BaseService{ ...@@ -392,6 +400,11 @@ public class DataSourceService extends BaseService{
Connection con = getConnection(type, parameter); Connection con = getConnection(type, parameter);
if (con != null) { if (con != null) {
isConnection = true; isConnection = true;
try {
con.close();
} catch (SQLException e) {
logger.error("close connection fail at DataSourceService::checkConnection()", e);
}
} }
return isConnection; return isConnection;
} }
...@@ -428,7 +441,7 @@ public class DataSourceService extends BaseService{ ...@@ -428,7 +441,7 @@ public class DataSourceService extends BaseService{
String address = buildAddress(type, host, port); String address = buildAddress(type, host, port);
String jdbcUrl = address + "/" + database; String jdbcUrl = address + "/" + database;
String separator = ""; String separator = "";
if (Constants.MYSQL.equals(type.name()) || Constants.POSTGRESQL.equals(type.name())) { if (Constants.MYSQL.equals(type.name()) || Constants.POSTGRESQL.equals(type.name()) || Constants.CLICKHOUSE.equals(type.name())) {
separator = "&"; separator = "&";
} else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) { } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) {
separator = ";"; separator = ";";
...@@ -479,6 +492,9 @@ public class DataSourceService extends BaseService{ ...@@ -479,6 +492,9 @@ public class DataSourceService extends BaseService{
} }
sb.deleteCharAt(sb.length() - 1); sb.deleteCharAt(sb.length() - 1);
} }
} else if (Constants.CLICKHOUSE.equals(type.name())) {
sb.append(Constants.JDBC_CLICKHOUSE);
sb.append(host).append(":").append(port);
} }
return sb.toString(); return sb.toString();
......
...@@ -82,6 +82,7 @@ public class Constants { ...@@ -82,6 +82,7 @@ public class Constants {
public static final String ORG_POSTGRESQL_DRIVER = "org.postgresql.Driver"; public static final String ORG_POSTGRESQL_DRIVER = "org.postgresql.Driver";
public static final String COM_MYSQL_JDBC_DRIVER = "com.mysql.jdbc.Driver"; public static final String COM_MYSQL_JDBC_DRIVER = "com.mysql.jdbc.Driver";
public static final String ORG_APACHE_HIVE_JDBC_HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; public static final String ORG_APACHE_HIVE_JDBC_HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
public static final String COM_CLICKHOUSE_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
/** /**
* database type * database type
...@@ -90,6 +91,7 @@ public class Constants { ...@@ -90,6 +91,7 @@ public class Constants {
public static final String POSTGRESQL = "POSTGRESQL"; public static final String POSTGRESQL = "POSTGRESQL";
public static final String HIVE = "HIVE"; public static final String HIVE = "HIVE";
public static final String SPARK = "SPARK"; public static final String SPARK = "SPARK";
public static final String CLICKHOUSE = "CLICKHOUSE";
/** /**
* jdbc url * jdbc url
...@@ -97,6 +99,7 @@ public class Constants { ...@@ -97,6 +99,7 @@ public class Constants {
public static final String JDBC_MYSQL = "jdbc:mysql://"; public static final String JDBC_MYSQL = "jdbc:mysql://";
public static final String JDBC_POSTGRESQL = "jdbc:postgresql://"; public static final String JDBC_POSTGRESQL = "jdbc:postgresql://";
public static final String JDBC_HIVE_2 = "jdbc:hive2://"; public static final String JDBC_HIVE_2 = "jdbc:hive2://";
public static final String JDBC_CLICKHOUSE = "jdbc:clickhouse://";
public static final String ADDRESS = "address"; public static final String ADDRESS = "address";
......
...@@ -371,6 +371,21 @@ ...@@ -371,6 +371,21 @@
<groupId>com.github.oshi</groupId> <groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId> <artifactId>oshi-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
......
...@@ -602,15 +602,19 @@ public final class Constants { ...@@ -602,15 +602,19 @@ public final class Constants {
public static final String JDBC_POSTGRESQL_CLASS_NAME = "org.postgresql.Driver"; public static final String JDBC_POSTGRESQL_CLASS_NAME = "org.postgresql.Driver";
/** /**
* postgresql * hive
*/ */
public static final String JDBC_HIVE_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver"; public static final String JDBC_HIVE_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver";
/** /**
* postgresql * spark
*/ */
public static final String JDBC_SPARK_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver"; public static final String JDBC_SPARK_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver";
/**
* ClickHouse
*/
public static final String JDBC_CLICKHOUSE_CLASS_NAME = "ru.yandex.clickhouse.ClickHouseDriver";
/** /**
* spark params constant * spark params constant
......
...@@ -25,6 +25,7 @@ public enum DbType { ...@@ -25,6 +25,7 @@ public enum DbType {
* 1 postgresql * 1 postgresql
* 2 hive * 2 hive
* 3 spark * 3 spark
* 4 clickhouse
*/ */
MYSQL, POSTGRESQL, HIVE, SPARK MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* data source of ClickHouse
*/
/**
 * Data source descriptor for ClickHouse.
 *
 * <p>Builds the JDBC URL from the address/database/extra-parameter fields
 * inherited from {@link BaseDataSource} and provides a connectivity probe.
 */
public class ClickHouseDataSource extends BaseDataSource {
  private static final Logger logger = LoggerFactory.getLogger(ClickHouseDataSource.class);

  /**
   * Builds the JDBC url for this data source.
   *
   * <p>Result shape: {@code <address>/<database>[?<other>]}; a "/" separator is
   * inserted only when the configured address does not already end with one.
   *
   * @return the ClickHouse JDBC connection url
   */
  @Override
  public String getJdbcUrl() {
    StringBuilder jdbcUrl = new StringBuilder(getAddress());
    // Append the path separator only if the address does not already end with "/".
    if (!getAddress().endsWith("/")) {
      jdbcUrl.append("/");
    }
    jdbcUrl.append(getDatabase());
    // Extra connection parameters (e.g. "socket_timeout=60000") go in the query string.
    if (StringUtils.isNotEmpty(getOther())) {
      jdbcUrl.append("?").append(getOther());
    }
    return jdbcUrl.toString();
  }

  /**
   * Tests whether the data source can be connected successfully.
   *
   * <p>Loads the ClickHouse JDBC driver and opens (then immediately closes) a
   * connection. try-with-resources guarantees the connection is released even
   * when an exception is thrown, so no connection can leak from this probe.
   *
   * @throws Exception if the driver class is missing or the connection attempt fails
   */
  @Override
  public void isConnectable() throws Exception {
    // Driver registration must happen before DriverManager can resolve the url.
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
    try (Connection con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword())) {
      // Connection established successfully; closed automatically on exit.
      logger.debug("ClickHouse connectivity check succeeded: {}", con.getMetaData().getURL());
    }
  }
}
...@@ -39,6 +39,8 @@ public class DataSourceFactory { ...@@ -39,6 +39,8 @@ public class DataSourceFactory {
return JSONUtils.parseObject(parameter, HiveDataSource.class); return JSONUtils.parseObject(parameter, HiveDataSource.class);
case SPARK: case SPARK:
return JSONUtils.parseObject(parameter, SparkDataSource.class); return JSONUtils.parseObject(parameter, SparkDataSource.class);
case CLICKHOUSE:
return JSONUtils.parseObject(parameter, ClickHouseDataSource.class);
default: default:
return null; return null;
} }
......
...@@ -22,6 +22,7 @@ import cn.escheduler.common.enums.DbType; ...@@ -22,6 +22,7 @@ import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.Direct; import cn.escheduler.common.enums.Direct;
import cn.escheduler.common.enums.TaskTimeoutStrategy; import cn.escheduler.common.enums.TaskTimeoutStrategy;
import cn.escheduler.common.job.db.BaseDataSource; import cn.escheduler.common.job.db.BaseDataSource;
import cn.escheduler.common.job.db.ClickHouseDataSource;
import cn.escheduler.common.job.db.MySQLDataSource; import cn.escheduler.common.job.db.MySQLDataSource;
import cn.escheduler.common.job.db.PostgreDataSource; import cn.escheduler.common.job.db.PostgreDataSource;
import cn.escheduler.common.process.Property; import cn.escheduler.common.process.Property;
...@@ -111,6 +112,11 @@ public class ProcedureTask extends AbstractTask { ...@@ -111,6 +112,11 @@ public class ProcedureTask extends AbstractTask {
}else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){ }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class); baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME); Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
}else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
// NOTE: currently, ClickHouse don't support procedure or UDF yet,
// but still load JDBC driver to keep source code sync with other DB
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
} }
// get jdbc connection // get jdbc connection
......
...@@ -120,6 +120,9 @@ public class SqlTask extends AbstractTask { ...@@ -120,6 +120,9 @@ public class SqlTask extends AbstractTask {
}else if (DbType.SPARK.name().equals(dataSource.getType().name())){ }else if (DbType.SPARK.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class); baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class);
Class.forName(Constants.JDBC_SPARK_CLASS_NAME); Class.forName(Constants.JDBC_SPARK_CLASS_NAME);
}else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
} }
Map<Integer,Property> sqlParamMap = new HashMap<Integer,Property>(); Map<Integer,Property> sqlParamMap = new HashMap<Integer,Property>();
......
...@@ -52,21 +52,45 @@ public class SqlExecutorTest { ...@@ -52,21 +52,45 @@ public class SqlExecutorTest {
@Test @Test
public void test() throws Exception { public void test() throws Exception {
String nodeName = "mysql sql test";
String taskAppId = "51_11282_263978";
String tenantCode = "hdfs";
Integer taskInstId = 263978;
sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
}
@Test
public void testClickhouse() throws Exception {
String nodeName = "ClickHouse sql test";
String taskAppId = "1_11_20";
String tenantCode = "default";
Integer taskInstId = 20;
sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
}
/**
* Basic test template for SQLTasks, mainly test different types of DBMS types
* @param nodeName node name for selected task
* @param taskAppId task app id
* @param tenantCode tenant code
* @param taskInstId task instance id
* @throws Exception
*/
private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, Integer taskInstId) throws Exception {
TaskProps taskProps = new TaskProps(); TaskProps taskProps = new TaskProps();
taskProps.setTaskDir(""); taskProps.setTaskDir("");
// processDefineId_processInstanceId_taskInstanceId // processDefineId_processInstanceId_taskInstanceId
taskProps.setTaskAppId("51_11282_263978"); taskProps.setTaskAppId(taskAppId);
// set tenant -> task execute linux user // set tenant -> task execute linux user
taskProps.setTenantCode("hdfs"); taskProps.setTenantCode(tenantCode);
taskProps.setTaskStartTime(new Date()); taskProps.setTaskStartTime(new Date());
taskProps.setTaskTimeout(360000); taskProps.setTaskTimeout(360000);
taskProps.setTaskInstId(263978); taskProps.setTaskInstId(taskInstId);
taskProps.setNodeName("mysql sql test"); taskProps.setNodeName(nodeName);
TaskInstance taskInstance = processDao.findTaskInstanceById(263978); TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
String taskJson = taskInstance.getTaskJson(); String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
<m-datasource <m-datasource
ref="refDs" ref="refDs"
@on-dsData="_onDsData" @on-dsData="_onDsData"
:supportType="['MYSQL','POSTGRESQL']" :supportType="['MYSQL','POSTGRESQL','CLICKHOUSE']"
:data="{ type:type,datasource:datasource }"> :data="{ type:type,datasource:datasource }">
</m-datasource> </m-datasource>
</div> </div>
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
<x-radio :label="'POSTGRESQL'">POSTGRESQL</x-radio> <x-radio :label="'POSTGRESQL'">POSTGRESQL</x-radio>
<x-radio :label="'HIVE'">HIVE</x-radio> <x-radio :label="'HIVE'">HIVE</x-radio>
<x-radio :label="'SPARK'">SPARK</x-radio> <x-radio :label="'SPARK'">SPARK</x-radio>
<x-radio :label="'CLICKHOUSE'">CLICKHOUSE</x-radio>
</x-radio-group> </x-radio-group>
</template> </template>
</m-list-box-f> </m-list-box-f>
......
...@@ -66,6 +66,11 @@ export default { ...@@ -66,6 +66,11 @@ export default {
id: 3, id: 3,
code: 'SPARK', code: 'SPARK',
disabled: false disabled: false
},
{
id: 4,
code: 'CLICKHOUSE',
disabled: false
} }
], ],
// Alarm interface // Alarm interface
......
...@@ -20,7 +20,7 @@ import io from '@/module/io' ...@@ -20,7 +20,7 @@ import io from '@/module/io'
export default { export default {
/** /**
* Data source creation * Data source creation
* @param "type": string,//MYSQL, POSTGRESQL, HIVE * @param "type": string,//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE
* @param "name": string, * @param "name": string,
* @param "desc": string, * @param "desc": string,
* @param "parameter":string //{"address":"jdbc:hive2://192.168.220.189:10000","autoReconnect":"true","characterEncoding":"utf8","database":"default","initialTimeout":3000,"jdbcUrl":"jdbc:hive2://192.168.220.189:10000/default","maxReconnect":10,"password":"","useUnicode":true,"user":"hive"} * @param "parameter":string //{"address":"jdbc:hive2://192.168.220.189:10000","autoReconnect":"true","characterEncoding":"utf8","database":"default","initialTimeout":3000,"jdbcUrl":"jdbc:hive2://192.168.220.189:10000/default","maxReconnect":10,"password":"","useUnicode":true,"user":"hive"}
...@@ -49,7 +49,7 @@ export default { ...@@ -49,7 +49,7 @@ export default {
}, },
/** /**
* Query data source list - no paging * Query data source list - no paging
* @param "type": string//MYSQL, POSTGRESQL, HIVE * @param "type": string//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE
*/ */
getDatasourcesList ({ state }, payload) { getDatasourcesList ({ state }, payload) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
......
...@@ -366,6 +366,12 @@ ...@@ -366,6 +366,12 @@
<version>3.5.0</version> <version>3.5.0</version>
</dependency> </dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.52</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册