From fe3026627fc2d38da08ae396724cc61bc922374a Mon Sep 17 00:00:00 2001 From: BoYiZhang <39816903+BoYiZhang@users.noreply.github.com> Date: Mon, 16 Nov 2020 10:55:20 +0800 Subject: [PATCH] [Feature-3985][Datax] Datax supports setting up running memory (#3986) * Datax supports setting up running memory * Datax supports setting up running memory * Datax supports setting up running memory * When running a task, the resource file is lost, which results in an error * add unit test * add unit test * add unit test * add test unit * add test unit * add test unit * fix code smell * add test unit * add test unit --- .../common/task/datax/DataxParameters.java | 65 +++++++--- .../common/task/DataxParametersTest.java | 93 ++++++++++++++ .../server/worker/task/datax/DataxTask.java | 116 ++++++++++-------- .../worker/task/datax/DataxTaskTest.java | 88 ++++++++++--- .../dag/_source/formModel/tasks/datax.vue | 39 +++++- .../src/js/module/i18n/locale/en_US.js | 3 + .../src/js/module/i18n/locale/zh_CN.js | 3 + pom.xml | 1 + 8 files changed, 314 insertions(+), 94 deletions(-) create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java index f54e10799..c1f5f1d81 100755 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -14,15 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.common.task.datax; -import java.util.ArrayList; -import java.util.List; +package org.apache.dolphinscheduler.common.task.datax; -import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; /** * DataX parameter @@ -89,6 +90,16 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; + /** + * Xms memory + */ + private int xms; + + /** + * Xmx memory + */ + private int xmx; + public int getCustomConfig() { return customConfig; } @@ -185,6 +196,22 @@ public class DataxParameters extends AbstractParameters { this.jobSpeedRecord = jobSpeedRecord; } + public int getXms() { + return xms; + } + + public void setXms(int xms) { + this.xms = xms; + } + + public int getXmx() { + return xmx; + } + + public void setXmx(int xmx) { + this.xmx = xmx; + } + @Override public boolean checkParameters() { if (customConfig == Flag.NO.ordinal()) { @@ -204,19 +231,21 @@ public class DataxParameters extends AbstractParameters { @Override public String toString() { - return "DataxParameters{" + - "customConfig=" + customConfig + - ", json='" + json + '\'' + - ", dsType='" + dsType + '\'' + - ", dataSource=" + dataSource + - ", dtType='" + dtType + '\'' + - ", dataTarget=" + dataTarget + - ", sql='" + sql + '\'' + - ", targetTable='" + targetTable + '\'' + - ", preStatements=" + preStatements + - ", postStatements=" + postStatements + - ", jobSpeedByte=" + jobSpeedByte + - ", jobSpeedRecord=" + jobSpeedRecord + - '}'; + return "DataxParameters{" + + "customConfig=" + customConfig + + ", json='" + json + '\'' + + ", dsType='" + dsType + '\'' + + ", dataSource=" + dataSource + + ", dtType='" + dtType + '\'' + + ", dataTarget=" + dataTarget + + ", sql='" + sql + '\'' + + ", targetTable='" + targetTable + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + ", jobSpeedByte=" + jobSpeedByte + + ", jobSpeedRecord=" + jobSpeedRecord + + ", xms=" + xms + + ", xmx=" + xmx + + '}'; } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java new file mode 100644 index 000000000..d6e2f6988 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task; + +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; + +import org.junit.Assert; +import org.junit.Test; + +public class DataxParametersTest { + + /** + * jvm parameters + */ + public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" "; + + @Test + public void testLoadJvmEnv() { + + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setXms(0); + dataxParameters.setXmx(-100); + + String actual = loadJvmEnvTest(dataxParameters); + + String except = " --jvm=\"-Xms1G -Xmx1G\" "; + Assert.assertEquals(except,actual); + + dataxParameters.setXms(13); + dataxParameters.setXmx(14); + actual = loadJvmEnvTest(dataxParameters); + except = " --jvm=\"-Xms13G -Xmx14G\" "; + Assert.assertEquals(except,actual); + + } + + @Test + public void testToString() { + + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setCustomConfig(0); + dataxParameters.setXms(0); + dataxParameters.setXmx(-100); + dataxParameters.setDataSource(1); + dataxParameters.setDataTarget(1); + dataxParameters.setDsType("MYSQL"); + dataxParameters.setDtType("MYSQL"); + dataxParameters.setJobSpeedByte(1); + dataxParameters.setJobSpeedRecord(1); + dataxParameters.setJson("json"); + + String expected = "DataxParameters" + + "{" + + "customConfig=0, " + + "json='json', " + + "dsType='MYSQL', " + + "dataSource=1, " + + "dtType='MYSQL', " + + "dataTarget=1, " + + "sql='null', " + + "targetTable='null', " + + "preStatements=null, " + + "postStatements=null, " + + "jobSpeedByte=1, " + + "jobSpeedRecord=1, " + + "xms=0, " + + "xmx=-100" + + "}"; + + Assert.assertEquals(expected,dataxParameters.toString()); + } + + public String loadJvmEnvTest(DataxParameters dataXParameters) { + int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); + int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); + return String.format(JVM_EVN, xms, xmx); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 6d701a00a..caf487947 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -14,27 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.datax; +package org.apache.dolphinscheduler.server.worker.task.datax; -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; -import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; -import com.alibaba.druid.sql.ast.statement.*; -import com.alibaba.druid.sql.parser.SQLStatementParser; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; @@ -46,7 +37,8 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; -import org.slf4j.Logger; + +import org.apache.commons.io.FileUtils; import java.io.File; import java.nio.charset.StandardCharsets; @@ -56,25 +48,48 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; -import java.sql.*; -import java.util.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.slf4j.Logger; + +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * DataX task */ public class DataxTask extends AbstractTask { + /** + * jvm parameters + */ + public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" "; /** * python process(datax only supports version 2.7 by default) */ private static final String DATAX_PYTHON = "python2.7"; - /** * datax home path */ private static final String DATAX_HOME_EVN = "${DATAX_HOME}"; - /** * datax channel count */ @@ -97,6 +112,7 @@ public class DataxTask extends AbstractTask { /** * constructor + * * @param taskExecutionContext taskExecutionContext * @param logger logger */ @@ -104,9 +120,8 @@ public class DataxTask extends AbstractTask { super(taskExecutionContext, logger); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext,logger); + taskExecutionContext, logger); } /** @@ -149,9 +164,7 @@ public class DataxTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); - } - catch (Exception e) { - logger.error("datax task failure", e); + } catch (Exception e) { setExitStatusCode(Constants.EXIT_CODE_FAILURE); throw e; } @@ -189,9 +202,9 @@ public class DataxTask extends AbstractTask { return fileName; } - if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){ + if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()) { json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); - }else { + } else { ObjectNode job = JSONUtils.createObjectNode(); job.putArray("content").addAll(buildDataxJobContentJson()); job.set("setting", buildDataxJobSettingJson()); @@ -248,7 +261,6 @@ public class DataxTask extends AbstractTask { readerParam.put("password", dataSourceCfg.getPassword()); readerParam.putArray("connection").addAll(readerConnArr); - ObjectNode reader = JSONUtils.createObjectNode(); reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype()))); reader.set("parameter", readerParam); @@ -277,7 +289,6 @@ public class DataxTask extends AbstractTask { } writerParam.putArray("connection").addAll(writerConnArr); - if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) { ArrayNode preSqlArr = writerParam.putArray("preSql"); for (String preSql : dataXParameters.getPreStatements()) { @@ -368,7 +379,7 @@ public class DataxTask extends AbstractTask { * @throws Exception if error throws Exception */ private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap) - throws Exception { + throws Exception { // generate scripts String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(), @@ -387,6 +398,7 @@ public class DataxTask extends AbstractTask { sbr.append(" "); sbr.append(DATAX_HOME_EVN); sbr.append(" "); + sbr.append(loadJvmEnv(dataXParameters)); sbr.append(jobConfigFilePath); // replace placeholder @@ -409,17 +421,19 @@ public class DataxTask extends AbstractTask { return fileName; } + public String loadJvmEnv(DataxParameters dataXParameters) { + int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); + int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); + return String.format(JVM_EVN, xms, xmx); + } + /** * parsing synchronized column names in SQL statements * - * @param dsType - * the database type of the data source - * @param dtType - * the database type of the data target - * @param dataSourceCfg - * the database connection parameters of the data source - * @param sql - * sql for data synchronization + * @param dsType the database type of the data source + * @param dtType the database type of the data target + * @param dataSourceCfg the database connection parameters of the data source + * @param sql sql for data synchronization * @return Keyword converted column names */ private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) { @@ -438,10 +452,8 @@ public class DataxTask extends AbstractTask { /** * try grammatical parsing column * - * @param dbType - * database type - * @param sql - * sql for data synchronization + * @param dbType database type + * @param sql sql for data synchronization * @return column name array * @throws RuntimeException if error throws RuntimeException */ @@ -453,16 +465,16 @@ public class DataxTask extends AbstractTask { notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); SQLStatement sqlStatement = parser.parseStatement(); - SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement; + SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement; SQLSelect sqlSelect = sqlSelectStatement.getSelect(); List selectItemList = null; if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) { - SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock) sqlSelect.getQuery(); selectItemList = block.getSelectList(); } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) { - SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery(); - SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight(); + SQLUnionQuery unionQuery = (SQLUnionQuery) sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock) unionQuery.getRight(); selectItemList = block.getSelectList(); } @@ -470,7 +482,7 @@ public class DataxTask extends AbstractTask { String.format("select query type [%s] is not support", sqlSelect.getQuery().toString())); columnNames = new String[selectItemList.size()]; - for (int i = 0; i < selectItemList.size(); i++ ) { + for (int i = 0; i < selectItemList.size(); i++) { SQLSelectItem item = selectItemList.get(i); String columnName = null; @@ -479,10 +491,10 @@ public class DataxTask extends AbstractTask { columnName = item.getAlias(); } else if (item.getExpr() != null) { if (item.getExpr() instanceof SQLPropertyExpr) { - SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr(); + SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr(); columnName = expr.getName(); } else if (item.getExpr() instanceof SQLIdentifierExpr) { - SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr(); + SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr(); columnName = expr.getName(); } } else { @@ -497,8 +509,7 @@ public class DataxTask extends AbstractTask { columnNames[i] = columnName; } - } - catch (Exception e) { + } catch (Exception e) { logger.warn(e.getMessage(), e); return null; } @@ -509,10 +520,8 @@ public class DataxTask extends AbstractTask { /** * try to execute sql to resolve column names * - * @param baseDataSource - * the database connection parameters - * @param sql - * sql for data synchronization + * @param baseDataSource the database connection parameters + * @param sql sql for data synchronization * @return column name array */ public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) { @@ -529,11 +538,10 @@ public class DataxTask extends AbstractTask { ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); columnNames = new String[num]; - for (int i = 1; i <= num; i++ ) { + for (int i = 1; i <= num; i++) { columnNames[i - 1] = md.getColumnName(i); } - } - catch (SQLException e) { + } catch (SQLException e) { logger.warn(e.getMessage(), e); return null; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java index e5177aa78..8d03c1460 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -14,19 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.datax; +package org.apache.dolphinscheduler.server.worker.task.datax; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; @@ -39,6 +33,13 @@ import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.UUID; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,7 +50,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * DataxTask Tester. @@ -58,7 +60,13 @@ public class DataxTaskTest { private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class); - private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"; + private static final String CONNECTION_PARAMS = " {\n" + + " \"user\":\"root\",\n" + + " \"password\":\"123456\",\n" + + " \"address\":\"jdbc:mysql://127.0.0.1:3306\",\n" + + " \"database\":\"test\",\n" + + " \"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"\n" + + "}"; private DataxTask dataxTask; @@ -69,7 +77,7 @@ public class DataxTaskTest { private ApplicationContext applicationContext; private TaskExecutionContext taskExecutionContext; - private TaskProps props = new TaskProps(); + private final TaskProps props = new TaskProps(); @Before public void before() @@ -97,12 +105,40 @@ public class DataxTaskTest { props.setTaskTimeout(0); if (customConfig == 1) { props.setTaskParams( - "{\"customConfig\":1, \"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}"); + "{\n" + + " \"customConfig\":1,\n" + + " \"localParams\":[\n" + + " {\n" + + " \"prop\":\"test\",\n" + + " \"value\":\"38294729\"\n" + + " }\n" + + " ],\n" + + " \"json\":\"" + + "{\"job\":{\"setting\":{\"speed\":{\"byte\":1048576},\"errorLimit\":{\"record\":0,\"percentage\":0.02}},\"content\":[" + + "{\"reader\":{\"name\":\"rdbmsreader\",\"parameter\":{\"username\":\"xxx\",\"password\":\"${test}\",\"column\":[\"id\",\"name\"],\"splitPk\":\"pk\",\"" + + "connection\":[{\"querySql\":[\"SELECT * from dual\"],\"jdbcUrl\":[\"jdbc:dm://ip:port/database\"]}],\"fetchSize\":1024,\"where\":\"1 = 1\"}},\"" + + "writer\":{\"name\":\"streamwriter\",\"parameter\":{\"print\":true}}}]}}\"\n" + + "}"); -// "{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}"); } else { props.setTaskParams( - "{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"dataSource\":1,\"dsType\":\"MYSQL\",\"dataTarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); + "{\n" + + " \"customConfig\":0,\n" + + " \"targetTable\":\"test\",\n" + + " \"postStatements\":[\n" + + " \"delete from test\"\n" + + " ],\n" + + " \"jobSpeedByte\":0,\n" + + " \"jobSpeedRecord\":1000,\n" + + " \"dtType\":\"MYSQL\",\n" + + " \"dataSource\":1,\n" + + " \"dsType\":\"MYSQL\",\n" + + " \"dataTarget\":2,\n" + + " \"sql\":\"select 1 as test from dual\",\n" + + " \"preStatements\":[\n" + + " \"delete from test\"\n" + + " ]\n" + + "}"); } taskExecutionContext = Mockito.mock(TaskExecutionContext.class); @@ -114,7 +150,6 @@ public class DataxTaskTest { Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); - DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); dataxTaskExecutionContext.setSourcetype(0); dataxTaskExecutionContext.setTargetType(0); @@ -126,7 +161,6 @@ public class DataxTaskTest { dataxTask.init(); props.setCmdTypeIfComplement(START_PROCESS); - Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); @@ -138,7 +172,6 @@ public class DataxTaskTest { e.printStackTrace(); } - dataxTask = PowerMockito.spy(new DataxTask(taskExecutionContext, logger)); dataxTask.init(); } @@ -405,4 +438,23 @@ public class DataxTaskTest { } } + @Test + public void testLoadJvmEnv() { + DataxTask dataxTask = new DataxTask(null,null); + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setXms(0); + dataxParameters.setXmx(-100); + + String actual = dataxTask.loadJvmEnv(dataxParameters); + + String except = " --jvm=\"-Xms1G -Xmx1G\" "; + Assert.assertEquals(except,actual); + + dataxParameters.setXms(13); + dataxParameters.setXmx(14); + actual = dataxTask.loadJvmEnv(dataxParameters); + except = " --jvm=\"-Xms13G -Xmx14G\" "; + Assert.assertEquals(except,actual); + + } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue index 6ced1ad0e..42bd2341f 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -144,6 +144,22 @@ +
+
+ {{$t('Running Memory')}} +
+
+ {{$t('Min Memory')}} + + +    G    + {{$t('Max Memory')}} + + +    G +
+
+