diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java index d5371e5f0a6f8e9ba148d05c92ceffb0a2b31676..58c24bddc138881ca20a8b6102d2a980d13285e7 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java @@ -38,6 +38,7 @@ import org.springframework.transaction.annotation.Transactional; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.*; /** @@ -217,6 +218,9 @@ public class DataSourceService extends BaseService{ case POSTGRESQL: separator = "&"; break; + case CLICKHOUSE: + separator = "&"; + break; default: separator = "&"; break; @@ -367,6 +371,10 @@ public class DataSourceService extends BaseService{ datasource = JSONObject.parseObject(parameter, SparkDataSource.class); Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); break; + case CLICKHOUSE: + datasource = JSONObject.parseObject(parameter, ClickHouseDataSource.class); + Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER); + break; default: break; } @@ -392,6 +400,11 @@ public class DataSourceService extends BaseService{ Connection con = getConnection(type, parameter); if (con != null) { isConnection = true; + try { + con.close(); + } catch (SQLException e) { + logger.error("close connection fail at DataSourceService::checkConnection()", e); + } } return isConnection; } @@ -428,7 +441,7 @@ public class DataSourceService extends BaseService{ String address = buildAddress(type, host, port); String jdbcUrl = address + "/" + database; String separator = ""; - if (Constants.MYSQL.equals(type.name()) || Constants.POSTGRESQL.equals(type.name())) { + if (Constants.MYSQL.equals(type.name()) || Constants.POSTGRESQL.equals(type.name()) || Constants.CLICKHOUSE.equals(type.name())) { separator = "&"; } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) { separator = ";"; @@ -479,6 +492,9 @@ public class DataSourceService extends BaseService{ } sb.deleteCharAt(sb.length() - 1); } + } else if (Constants.CLICKHOUSE.equals(type.name())) { + sb.append(Constants.JDBC_CLICKHOUSE); + sb.append(host).append(":").append(port); } return sb.toString(); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java index bdeb6d689e9117186a7fd4d1ac8ee674779270e2..aa25da8f1116430c9fceb24edd9263410f2cacb2 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java @@ -82,6 +82,7 @@ public class Constants { public static final String ORG_POSTGRESQL_DRIVER = "org.postgresql.Driver"; public static final String COM_MYSQL_JDBC_DRIVER = "com.mysql.jdbc.Driver"; public static final String ORG_APACHE_HIVE_JDBC_HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; + public static final String COM_CLICKHOUSE_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"; /** * database type @@ -90,6 +91,7 @@ public class Constants { public static final String POSTGRESQL = "POSTGRESQL"; public static final String HIVE = "HIVE"; public static final String SPARK = "SPARK"; + public static final String CLICKHOUSE = "CLICKHOUSE"; /** * jdbc url @@ -97,6 +99,7 @@ public class Constants { public static final String JDBC_MYSQL = "jdbc:mysql://"; public static final String JDBC_POSTGRESQL = "jdbc:postgresql://"; public static final String JDBC_HIVE_2 = "jdbc:hive2://"; + public static final String JDBC_CLICKHOUSE = "jdbc:clickhouse://"; public static final String ADDRESS = "address"; diff --git a/escheduler-common/pom.xml b/escheduler-common/pom.xml index e06b344c4fd1de8e8b3b1d756264e279317ab107..35c83bc2b0aff8751fd8d474e809dde79440f522 100644 --- a/escheduler-common/pom.xml +++ b/escheduler-common/pom.xml @@ -371,6 +371,21 @@ com.github.oshi oshi-core + + + ru.yandex.clickhouse + clickhouse-jdbc + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index e0e0c399e9ea32524a5bbb3f87e16c4ba6783c7e..e61bef101888e1d34b8bcefef39b542834e1a19a 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -602,15 +602,19 @@ public final class Constants { public static final String JDBC_POSTGRESQL_CLASS_NAME = "org.postgresql.Driver"; /** - * postgresql + * hive */ public static final String JDBC_HIVE_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver"; /** - * postgresql + * spark */ public static final String JDBC_SPARK_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver"; + /** + * ClickHouse + */ + public static final String JDBC_CLICKHOUSE_CLASS_NAME = "ru.yandex.clickhouse.ClickHouseDriver"; /** * spark params constant diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java index 70f767444f13abe3a64979b34c5a33c3c8735028..bcd7e71dbddf285d88d16c32bd8dd240c80fd0c2 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java @@ -25,6 +25,7 @@ public enum DbType { * 1 postgresql * 2 hive * 3 spark + * 4 clickhouse */ - MYSQL, POSTGRESQL, HIVE, SPARK + MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/ClickHouseDataSource.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/ClickHouseDataSource.java new file mode 100644 index 0000000000000000000000000000000000000000..b4df4d8f5a36830634ed0aee28ff2f451849343f --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/ClickHouseDataSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.job.db; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * data source of ClickHouse + */ +public class ClickHouseDataSource extends BaseDataSource { + private static final Logger logger = LoggerFactory.getLogger(ClickHouseDataSource.class); + + /** + * gets the JDBC url for the data source connection + * @return + */ + @Override + public String getJdbcUrl() { + String jdbcUrl = getAddress(); + if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) { + jdbcUrl += "/"; + } + + jdbcUrl += getDatabase(); + + if (StringUtils.isNotEmpty(getOther())) { + jdbcUrl += "?" + getOther(); + } + + return jdbcUrl; + } + + /** + * test whether the data source can be connected successfully + * @throws Exception + */ + @Override + public void isConnectable() throws Exception { + Connection con = null; + try { + Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); + con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); + } finally { + if (con != null) { + try { + con.close(); + } catch (SQLException e) { + logger.error("ClickHouse datasource try conn close conn error", e); + throw e; + } + } + } + + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java index 06858fade319bc24a18b90ba14be0dc734cb2dd9..c694b9c70813dfa8aad72809622b63966088d877 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java @@ -39,6 +39,8 @@ public class DataSourceFactory { return JSONUtils.parseObject(parameter, HiveDataSource.class); case SPARK: return JSONUtils.parseObject(parameter, SparkDataSource.class); + case CLICKHOUSE: + return JSONUtils.parseObject(parameter, ClickHouseDataSource.class); default: return null; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java index 2efe8cbf54accfc7846bcea386ed4424fbf5e9cd..4c3f3f63adcbcd171acd5e2ff6bb8fdfe3246734 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java @@ -22,6 +22,7 @@ import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.Direct; import cn.escheduler.common.enums.TaskTimeoutStrategy; import cn.escheduler.common.job.db.BaseDataSource; +import cn.escheduler.common.job.db.ClickHouseDataSource; import cn.escheduler.common.job.db.MySQLDataSource; import cn.escheduler.common.job.db.PostgreDataSource; import cn.escheduler.common.process.Property; @@ -111,6 +112,11 @@ public class ProcedureTask extends AbstractTask { }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){ baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class); Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME); + }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){ + // NOTE: currently, ClickHouse don't support procedure or UDF yet, + // but still load JDBC driver to keep source code sync with other DB + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class); + Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME); } // get jdbc connection diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index 36d92d71b5633d8d763d26d0b78f2bc2a910cbe4..858e7b8bfc8fec9f2c19048653864242f10f624a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -120,6 +120,9 @@ public class SqlTask extends AbstractTask { }else if (DbType.SPARK.name().equals(dataSource.getType().name())){ baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class); Class.forName(Constants.JDBC_SPARK_CLASS_NAME); + }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){ + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class); + Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME); } Map sqlParamMap = new HashMap(); diff --git a/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java b/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java index aecf3e9230a1fecf6fc790bbe177271e189d6590..5565cd6a089549e3d99fd6a87efe6ee25cb09c21 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java @@ -52,21 +52,45 @@ public class SqlExecutorTest { @Test public void test() throws Exception { + String nodeName = "mysql sql test"; + String taskAppId = "51_11282_263978"; + String tenantCode = "hdfs"; + Integer taskInstId = 263978; + sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); + } + + @Test + public void testClickhouse() throws Exception { + String nodeName = "ClickHouse sql test"; + String taskAppId = "1_11_20"; + String tenantCode = "default"; + Integer taskInstId = 20; + sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); + } + /** + * Basic test template for SQLTasks, mainly test different types of DBMS types + * @param nodeName node name for selected task + * @param taskAppId task app id + * @param tenantCode tenant code + * @param taskInstId task instance id + * @throws Exception + */ + private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, Integer taskInstId) throws Exception { TaskProps taskProps = new TaskProps(); taskProps.setTaskDir(""); // processDefineId_processInstanceId_taskInstanceId - taskProps.setTaskAppId("51_11282_263978"); + taskProps.setTaskAppId(taskAppId); // set tenant -> task execute linux user - taskProps.setTenantCode("hdfs"); + taskProps.setTenantCode(tenantCode); taskProps.setTaskStartTime(new Date()); taskProps.setTaskTimeout(360000); - taskProps.setTaskInstId(263978); - taskProps.setNodeName("mysql sql test"); + taskProps.setTaskInstId(taskInstId); + taskProps.setNodeName(nodeName); - TaskInstance taskInstance = processDao.findTaskInstanceById(263978); + TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); String taskJson = taskInstance.getTaskJson(); TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue index 44a2d8dd40a32e8b512348d407457daf851d1b79..84a2d05634d5252aad7ffc498745e33bf86027f5 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue @@ -6,7 +6,7 @@ diff --git a/escheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue b/escheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue index 4915e27b5c9a7c92c6b66a96d49923be1b7f406d..96e453c3180b17628b74c5c7c5f1a66c7c672bb4 100644 --- a/escheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue +++ b/escheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue @@ -13,6 +13,7 @@ POSTGRESQL HIVE SPARK + CLICKHOUSE diff --git a/escheduler-ui/src/js/conf/home/store/dag/state.js b/escheduler-ui/src/js/conf/home/store/dag/state.js index 63b06abef65419ba5c3f307153222a68c8596792..e51c2186ec0fb666072ae12ed3ee5b4031bfa4c6 100644 --- a/escheduler-ui/src/js/conf/home/store/dag/state.js +++ b/escheduler-ui/src/js/conf/home/store/dag/state.js @@ -66,6 +66,11 @@ export default { id: 3, code: 'SPARK', disabled: false + }, + { + id: 4, + code: 'CLICKHOUSE', + disabled: false } ], // Alarm interface diff --git a/escheduler-ui/src/js/conf/home/store/datasource/actions.js b/escheduler-ui/src/js/conf/home/store/datasource/actions.js index 0aa17266f8becc2ce3e9f865ce7520f0b8ccb78a..4e7bd13d76b8bb801b86c51f6c153b0927377e5b 100644 --- a/escheduler-ui/src/js/conf/home/store/datasource/actions.js +++ b/escheduler-ui/src/js/conf/home/store/datasource/actions.js @@ -20,7 +20,7 @@ import io from '@/module/io' export default { /** * Data source creation - * @param "type": string,//MYSQL, POSTGRESQL, HIVE + * @param "type": string,//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE * @param "name": string, * @param "desc": string, * @param "parameter":string //{"address":"jdbc:hive2://192.168.220.189:10000","autoReconnect":"true","characterEncoding":"utf8","database":"default","initialTimeout":3000,"jdbcUrl":"jdbc:hive2://192.168.220.189:10000/default","maxReconnect":10,"password":"","useUnicode":true,"user":"hive"} @@ -49,7 +49,7 @@ export default { }, /** * Query data source list - no paging - * @param "type": string//MYSQL, POSTGRESQL, HIVE + * @param "type": string//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE */ getDatasourcesList ({ state }, payload) { return new Promise((resolve, reject) => { diff --git a/pom.xml b/pom.xml index e6f845d0ed6c4a9a6a29d7b447724dd8e9065f78..2b0b585106a082986f9cfb4899e427f63efb304b 100644 --- a/pom.xml +++ b/pom.xml @@ -366,6 +366,12 @@ 3.5.0 + + ru.yandex.clickhouse + clickhouse-jdbc + 0.1.52 + +