diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java index a83cd64ccac442f1983e5e29c5fd7240451e6cb1..86989e1227cdc41e810529790365b266306f6053 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java @@ -94,6 +94,16 @@ public class SqlParameters extends AbstractParameters { */ private String title; + private int limit; + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + public String getType() { return type; } @@ -217,6 +227,7 @@ public class SqlParameters extends AbstractParameters { + ", sqlType=" + sqlType + ", sendEmail=" + sendEmail + ", displayRows=" + displayRows + + ", limit=" + limit + ", udfs='" + udfs + '\'' + ", showType='" + showType + '\'' + ", connParams='" + connParams + '\'' diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java index 6fc4d6c9c9d8dd369444ec6be2b9cfd18291182d..57b0d6097ebb877e83dde4dc2b4d4b7d046c8dcb 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java @@ -35,6 +35,7 @@ public class SqlParametersTest { private final String showType = "TABLE"; private final String title = "sql test"; private final int groupId = 0; + private final int limit = 0; @Test public void testSqlParameters() { @@ -51,6 +52,7 @@ public class SqlParametersTest { sqlParameters.setShowType(showType); sqlParameters.setTitle(title); sqlParameters.setGroupId(groupId); + sqlParameters.setLimit(limit); Assert.assertEquals(type, sqlParameters.getType()); Assert.assertEquals(sql, sqlParameters.getSql()); @@ -62,6 +64,7 @@ public class SqlParametersTest { Assert.assertEquals(showType, sqlParameters.getShowType()); Assert.assertEquals(title, sqlParameters.getTitle()); Assert.assertEquals(groupId, sqlParameters.getGroupId()); + Assert.assertEquals(limit, sqlParameters.getLimit()); Assert.assertTrue(sqlParameters.checkParameters()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 100f344614c01cc05be3e1f0ddc62dae4b967d75..75194a36f5e8b3b9841b4867fb48c2826713ebdc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -86,12 +86,6 @@ public class SqlTask extends AbstractTask { */ private TaskExecutionContext taskExecutionContext; - /** - * default query sql limit - */ - private static final int LIMIT = 10000; - - private AlertClientService alertClientService; public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) { @@ -117,14 +111,16 @@ public class SqlTask extends AbstractTask { Thread.currentThread().setName(threadLoggerInfoName); logger.info("Full sql parameters: {}", sqlParameters); - logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", + logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {}, udfs : {},showType : {}, " + + "connParams : {}, query max result limit : {}", sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(), sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), - sqlParameters.getConnParams()); + sqlParameters.getConnParams(), + sqlParameters.getLimit()); try { SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); @@ -309,7 +305,7 @@ public class SqlTask extends AbstractTask { int rowCount = 0; - while (rowCount < LIMIT && resultSet.next()) { + while (rowCount < sqlParameters.getLimit() && resultSet.next()) { ObjectNode mapOfColValues = JSONUtils.createObjectNode(); for (int i = 1; i <= num; i++) { mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i))); @@ -326,12 +322,11 @@ public class SqlTask extends AbstractTask { logger.info("row {} : {}", i + 1, row); } } - String result = JSONUtils.toJsonString(resultJSONArray); if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) { sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) - ? sqlParameters.getTitle() - : taskExecutionContext.getTaskName() + " query result sets", result); + ? sqlParameters.getTitle() + : taskExecutionContext.getTaskName() + " query result sets", result); } logger.debug("execute sql result : {}", result); return result; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java index 4dbcb2b0486a958721bce724069fc5750611ddfc..7746d98d1c1b0e8621ba0c9d4b523c448f38c5ce 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; @@ -148,4 +149,60 @@ public class SqlTaskTest { String result = Whitebox.invokeMethod(sqlTask, "resultProcess", resultSet); Assert.assertNotNull(result); } + + @Test + public void testQueryBySQLUsingLimit() throws Exception { + TaskExecutionContext localTaskExecutionContext; + TaskProps props = new TaskProps(); + props.setExecutePath("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstanceId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams( + "{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}]," + + "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"SELECT * FROM tb_1\"," + + "\"sqlType\":0, \"limit\":1, \"sendEmail\":\"false\"}"); + + localTaskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + PowerMockito.when(localTaskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); + PowerMockito.when(localTaskExecutionContext.getExecutePath()).thenReturn("/tmp"); + PowerMockito.when(localTaskExecutionContext.getTaskAppId()).thenReturn("1"); + PowerMockito.when(localTaskExecutionContext.getTenantCode()).thenReturn("root"); + PowerMockito.when(localTaskExecutionContext.getStartTime()).thenReturn(new Date()); + PowerMockito.when(localTaskExecutionContext.getTaskTimeout()).thenReturn(10000); + PowerMockito.when(localTaskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); + + SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); + sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); + PowerMockito.when(localTaskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext); + + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao()); + AlertClientService localAlertClientService = PowerMockito.mock(AlertClientService.class); + SqlTask localSqlTask = new SqlTask(localTaskExecutionContext, logger, localAlertClientService); + localSqlTask.init(); + + ResultSet resultSet = PowerMockito.mock(ResultSet.class); + ResultSetMetaData mockResultMetaData = PowerMockito.mock(ResultSetMetaData.class); + PowerMockito.when(resultSet.getMetaData()).thenReturn(mockResultMetaData); + PowerMockito.when(mockResultMetaData.getColumnCount()).thenReturn(2); + PowerMockito.when(resultSet.next()).thenReturn(true); + PowerMockito.when(resultSet.getObject(Mockito.anyInt())).thenReturn(1); + PowerMockito.when(mockResultMetaData.getColumnLabel(Mockito.anyInt())).thenReturn("a"); + + AlertSendResponseCommand mockResponseCommand = PowerMockito.mock(AlertSendResponseCommand.class); + PowerMockito.when(mockResponseCommand.getResStatus()).thenReturn(true); + PowerMockito.when(localAlertClientService.sendAlert(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(mockResponseCommand); + + String result = Whitebox.invokeMethod(localSqlTask, "resultProcess", resultSet); + Assert.assertEquals(1, ((SqlParameters) localSqlTask.getParameters()).getLimit()); + + // In fact, the target table has 2 rows, as we set the limit to 1, if the limit works, the `resultProcess` method + // should return [{"a":1}] rather then [{"a":1},{"a":1}] + Assert.assertEquals("[{\"a\":1}]", result); + } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue index 3df9ee6c788b6965bab2d335dced1f4c85fa9076..6b238450267a15c32e76f5f10204a758e8eedf95 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue @@ -43,6 +43,13 @@ + +
*{{$t('Max Numbers Return')}}
+
+ + +
+