From 6070738acbb89e0a7ed49b487fc1225888e29019 Mon Sep 17 00:00:00 2001 From: dailidong Date: Thu, 5 Mar 2020 23:34:37 +0800 Subject: [PATCH] Support worker server to run bat script (#2023) * Support worker server to run bat script 1. Reimplement ProcessImpl.java, ProcessEnvironment.java and ProcessBuilder.java for Windows 2. Modify shell task code for windows 3. Add ASF License * Add Unit Test --- .../dolphinscheduler/common/Constants.java | 2 +- .../utils/process/ProcessBuilderForWin32.java | 1065 +++++++++++++++++ .../process/ProcessEnvironmentForWin32.java | 286 +++++ .../utils/process/ProcessImplForWin32.java | 752 ++++++++++++ .../common/ConstantsTest.java | 40 + .../common/utils/OSUtilsTest.java | 24 +- .../process/ProcessBuilderForWin32Test.java | 210 ++++ .../ProcessEnvironmentForWin32Test.java | 124 ++ .../process/ProcessImplForWin32Test.java | 70 ++ .../worker/task/AbstractCommandExecutor.java | 92 +- .../worker/task/ShellCommandExecutor.java | 39 +- .../server/worker/task/datax/DataxTask.java | 11 +- .../server/worker/task/shell/ShellTask.java | 9 +- .../server/utils/ProcessUtilsTest.java | 17 + .../worker/task/shell/ShellTaskTest.java | 198 +++ pom.xml | 4 + 16 files changed, 2887 insertions(+), 56 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 73125f492..6af0e6445 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -746,7 +746,7 @@ public final class Constants { * application regex */ public static final String APPLICATION_REGEX = "application_\\d+_\\d+"; - public static final String PID = "pid"; + public static final String PID = OSUtils.isWindows() ? "handle" : "pid"; /** * month_begin */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java new file mode 100644 index 000000000..4fb5f9461 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java @@ -0,0 +1,1065 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils.process; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * This class is used to create operating system processes. + * + *

Each {@code ProcessBuilderForWindows} instance manages a collection + * of process attributes. The {@link #start()} method creates a new + * {@link Process} instance with those attributes. The {@link + * #start()} method can be invoked repeatedly from the same instance + * to create new subprocesses with identical or related attributes. + * + *

Each process builder manages these process attributes: + * + *

+ * + *

Modifying a process builder's attributes will affect processes + * subsequently started by that object's {@link #start()} method, but + * will never affect previously started processes or the Java process + * itself. + * + *

Most error checking is performed by the {@link #start()} method. + * It is possible to modify the state of an object so that {@link + * #start()} will fail. For example, setting the command attribute to + * an empty list will not throw an exception unless {@link #start()} + * is invoked. + * + *

Note that this class is not synchronized. + * If multiple threads access a {@code ProcessBuilderForWindows} instance + * concurrently, and at least one of the threads modifies one of the + * attributes structurally, it must be synchronized externally. + * + *

Starting a new process which uses the default working directory + * and environment is easy: + * + *

 {@code
+ * Process p = new ProcessBuilderForWindows("myCommand", "myArg").start();
+ * }
+ * + *

Here is an example that starts a process with a modified working + * directory and environment, and redirects standard output and error + * to be appended to a log file: + * + *

 {@code
+ * ProcessBuilderForWindows pb =
+ *   new ProcessBuilderForWindows("myCommand", "myArg1", "myArg2");
+ * Map env = pb.environment();
+ * env.put("VAR1", "myValue");
+ * env.remove("OTHERVAR");
+ * env.put("VAR2", env.get("VAR1") + "suffix");
+ * pb.directory(new File("myDir"));
+ * File log = new File("log");
+ * pb.redirectErrorStream(true);
+ * pb.redirectOutput(Redirect.appendTo(log));
+ * Process p = pb.start();
+ * assert pb.redirectInput() == Redirect.PIPE;
+ * assert pb.redirectOutput().file() == log;
+ * assert p.getInputStream().read() == -1;
+ * }
+ * + *

To start a process with an explicit set of environment + * variables, first call {@link Map#clear() Map.clear()} + * before adding environment variables. + * + * @author Martin Buchholz + * @since 1.5 + */ + +public class ProcessBuilderForWin32 { + + private String username; + private String password; + private List command; + private File directory; + private Map environment; + private boolean redirectErrorStream; + private ProcessBuilderForWin32.Redirect[] redirects; + + /** + * Constructs a process builder with the specified operating + * system program and arguments. This constructor does not + * make a copy of the {@code command} list. Subsequent + * updates to the list will be reflected in the state of the + * process builder. It is not checked whether + * {@code command} corresponds to a valid operating system + * command. + * + * @param command the list containing the program and its arguments + * @throws NullPointerException if the argument is null + */ + public ProcessBuilderForWin32(List command) { + if (command == null) + throw new NullPointerException(); + this.command = command; + } + + /** + * Constructs a process builder with the specified operating + * system program and arguments. This is a convenience + * constructor that sets the process builder's command to a string + * list containing the same strings as the {@code command} + * array, in the same order. It is not checked whether + * {@code command} corresponds to a valid operating system + * command. + * + * @param command a string array containing the program and its arguments + */ + public ProcessBuilderForWin32(String... command) { + this.command = new ArrayList<>(command.length); + for (String arg : command) + this.command.add(arg); + } + + /** + * set username and password for process + * + * @param username username + * @param password password + * @return this process builder + */ + public ProcessBuilderForWin32 user(String username, String password) { + this.username = username; + this.password = password; + return this; + } + + /** + * Sets this process builder's operating system program and + * arguments. This method does not make a copy of the + * {@code command} list. Subsequent updates to the list will + * be reflected in the state of the process builder. It is not + * checked whether {@code command} corresponds to a valid + * operating system command. + * + * @param command the list containing the program and its arguments + * @return this process builder + * + * @throws NullPointerException if the argument is null + */ + public ProcessBuilderForWin32 command(List command) { + if (command == null) + throw new NullPointerException(); + this.command = command; + return this; + } + + /** + * Sets this process builder's operating system program and + * arguments. This is a convenience method that sets the command + * to a string list containing the same strings as the + * {@code command} array, in the same order. It is not + * checked whether {@code command} corresponds to a valid + * operating system command. + * + * @param command a string array containing the program and its arguments + * @return this process builder + */ + public ProcessBuilderForWin32 command(String... command) { + this.command = new ArrayList<>(command.length); + for (String arg : command) + this.command.add(arg); + return this; + } + + /** + * Returns this process builder's operating system program and + * arguments. The returned list is not a copy. Subsequent + * updates to the list will be reflected in the state of this + * process builder. + * + * @return this process builder's program and its arguments + */ + public List command() { + return command; + } + + /** + * Returns a string map view of this process builder's environment. + * + * Whenever a process builder is created, the environment is + * initialized to a copy of the current process environment (see + * {@link System#getenv()}). Subprocesses subsequently started by + * this object's {@link #start()} method will use this map as + * their environment. + * + *

The returned object may be modified using ordinary {@link + * Map Map} operations. These modifications will be + * visible to subprocesses started via the {@link #start()} + * method. Two {@code ProcessBuilderForWindows} instances always + * contain independent process environments, so changes to the + * returned map will never be reflected in any other + * {@code ProcessBuilderForWindows} instance or the values returned by + * {@link System#getenv System.getenv}. + * + *

If the system does not support environment variables, an + * empty map is returned. + * + *

The returned map does not permit null keys or values. + * Attempting to insert or query the presence of a null key or + * value will throw a {@link NullPointerException}. + * Attempting to query the presence of a key or value which is not + * of type {@link String} will throw a {@link ClassCastException}. + * + *

The behavior of the returned map is system-dependent. A + * system may not allow modifications to environment variables or + * may forbid certain variable names or values. For this reason, + * attempts to modify the map may fail with + * {@link UnsupportedOperationException} or + * {@link IllegalArgumentException} + * if the modification is not permitted by the operating system. + * + *

Since the external format of environment variable names and + * values is system-dependent, there may not be a one-to-one + * mapping between them and Java's Unicode strings. Nevertheless, + * the map is implemented in such a way that environment variables + * which are not modified by Java code will have an unmodified + * native representation in the subprocess. + * + *

The returned map and its collection views may not obey the + * general contract of the {@link Object#equals} and + * {@link Object#hashCode} methods. + * + *

The returned map is typically case-sensitive on all platforms. + * + *

If a security manager exists, its + * {@link SecurityManager#checkPermission checkPermission} method + * is called with a + * {@link RuntimePermission}{@code ("getenv.*")} permission. + * This may result in a {@link SecurityException} being thrown. + * + *

When passing information to a Java subprocess, + * system properties + * are generally preferred over environment variables. + * + * @return this process builder's environment + * + * @throws SecurityException + * if a security manager exists and its + * {@link SecurityManager#checkPermission checkPermission} + * method doesn't allow access to the process environment + * + * @see Runtime#exec(String[],String[], File) + * @see System#getenv() + */ + public Map environment() { + SecurityManager security = System.getSecurityManager(); + if (security != null) + security.checkPermission(new RuntimePermission("getenv.*")); + + if (environment == null) + environment = ProcessEnvironmentForWin32.environment(); + + assert environment != null; + + return environment; + } + + // Only for use by Runtime.exec(...envp...) + ProcessBuilderForWin32 environment(String[] envp) { + assert environment == null; + if (envp != null) { + environment = ProcessEnvironmentForWin32.emptyEnvironment(envp.length); + assert environment != null; + + for (String envstring : envp) { + // Before 1.5, we blindly passed invalid envstrings + // to the child process. + // We would like to throw an exception, but do not, + // for compatibility with old broken code. + + // Silently discard any trailing junk. + if (envstring.indexOf((int) '\u0000') != -1) + envstring = envstring.replaceFirst("\u0000.*", ""); + + int eqlsign = + envstring.indexOf('=', ProcessEnvironmentForWin32.MIN_NAME_LENGTH); + // Silently ignore envstrings lacking the required `='. + if (eqlsign != -1) + environment.put(envstring.substring(0,eqlsign), + envstring.substring(eqlsign+1)); + } + } + return this; + } + + /** + * Returns this process builder's working directory. + * + * Subprocesses subsequently started by this object's {@link + * #start()} method will use this as their working directory. + * The returned value may be {@code null} -- this means to use + * the working directory of the current Java process, usually the + * directory named by the system property {@code user.dir}, + * as the working directory of the child process. + * + * @return this process builder's working directory + */ + public File directory() { + return directory; + } + + /** + * Sets this process builder's working directory. + * + * Subprocesses subsequently started by this object's {@link + * #start()} method will use this as their working directory. + * The argument may be {@code null} -- this means to use the + * working directory of the current Java process, usually the + * directory named by the system property {@code user.dir}, + * as the working directory of the child process. + * + * @param directory the new working directory + * @return this process builder + */ + public ProcessBuilderForWin32 directory(File directory) { + this.directory = directory; + return this; + } + + // ---------------- I/O Redirection ---------------- + + /** + * Implements a null input stream. + */ + static class NullInputStream extends InputStream { + static final ProcessBuilderForWin32.NullInputStream INSTANCE = new ProcessBuilderForWin32.NullInputStream(); + private NullInputStream() {} + public int read() { return -1; } + public int available() { return 0; } + } + + /** + * Implements a null output stream. + */ + static class NullOutputStream extends OutputStream { + static final ProcessBuilderForWin32.NullOutputStream INSTANCE = new ProcessBuilderForWin32.NullOutputStream(); + private NullOutputStream() {} + public void write(int b) throws IOException { + throw new IOException("Stream closed"); + } + } + + /** + * Represents a source of subprocess input or a destination of + * subprocess output. + * + * Each {@code Redirect} instance is one of the following: + * + *

+ * + *

Each of the above categories has an associated unique + * {@link ProcessBuilderForWin32.Redirect.Type Type}. + * + * @since 1.7 + */ + public static abstract class Redirect { + /** + * The type of a {@link ProcessBuilderForWin32.Redirect}. + */ + public enum Type { + /** + * The type of {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}. + */ + PIPE, + + /** + * The type of {@link ProcessBuilderForWin32.Redirect#INHERIT Redirect.INHERIT}. + */ + INHERIT, + + /** + * The type of redirects returned from + * {@link ProcessBuilderForWin32.Redirect#from Redirect.from(File)}. + */ + READ, + + /** + * The type of redirects returned from + * {@link ProcessBuilderForWin32.Redirect#to Redirect.to(File)}. + */ + WRITE, + + /** + * The type of redirects returned from + * {@link ProcessBuilderForWin32.Redirect#appendTo Redirect.appendTo(File)}. + */ + APPEND + }; + + /** + * Returns the type of this {@code Redirect}. + * @return the type of this {@code Redirect} + */ + public abstract ProcessBuilderForWin32.Redirect.Type type(); + + /** + * Indicates that subprocess I/O will be connected to the + * current Java process over a pipe. + * + * This is the default handling of subprocess standard I/O. + * + *

It will always be true that + *

 {@code
+         * Redirect.PIPE.file() == null &&
+         * Redirect.PIPE.type() == Redirect.Type.PIPE
+         * }
+ */ + public static final ProcessBuilderForWin32.Redirect PIPE = new ProcessBuilderForWin32.Redirect() { + public Type type() { return Type.PIPE; } + public String toString() { return type().toString(); }}; + + /** + * Indicates that subprocess I/O source or destination will be the + * same as those of the current process. This is the normal + * behavior of most operating system command interpreters (shells). + * + *

It will always be true that + *

 {@code
+         * Redirect.INHERIT.file() == null &&
+         * Redirect.INHERIT.type() == Redirect.Type.INHERIT
+         * }
+ */ + public static final ProcessBuilderForWin32.Redirect INHERIT = new ProcessBuilderForWin32.Redirect() { + public Type type() { return Type.INHERIT; } + public String toString() { return type().toString(); }}; + + /** + * Returns the {@link File} source or destination associated + * with this redirect, or {@code null} if there is no such file. + * + * @return the file associated with this redirect, + * or {@code null} if there is no such file + */ + public File file() { return null; } + + /** + * When redirected to a destination file, indicates if the output + * is to be written to the end of the file. + */ + boolean append() { + throw new UnsupportedOperationException(); + } + + /** + * Returns a redirect to read from the specified file. + * + *

It will always be true that + *

 {@code
+         * Redirect.from(file).file() == file &&
+         * Redirect.from(file).type() == Redirect.Type.READ
+         * }
+ * + * @param file The {@code File} for the {@code Redirect}. + * @throws NullPointerException if the specified file is null + * @return a redirect to read from the specified file + */ + public static ProcessBuilderForWin32.Redirect from(final File file) { + if (file == null) + throw new NullPointerException(); + return new ProcessBuilderForWin32.Redirect() { + public Type type() { return Type.READ; } + public File file() { return file; } + public String toString() { + return "redirect to read from file \"" + file + "\""; + } + }; + } + + /** + * Returns a redirect to write to the specified file. + * If the specified file exists when the subprocess is started, + * its previous contents will be discarded. + * + *

It will always be true that + *

 {@code
+         * Redirect.to(file).file() == file &&
+         * Redirect.to(file).type() == Redirect.Type.WRITE
+         * }
+ * + * @param file The {@code File} for the {@code Redirect}. + * @throws NullPointerException if the specified file is null + * @return a redirect to write to the specified file + */ + public static ProcessBuilderForWin32.Redirect to(final File file) { + if (file == null) + throw new NullPointerException(); + return new ProcessBuilderForWin32.Redirect() { + public Type type() { return Type.WRITE; } + public File file() { return file; } + public String toString() { + return "redirect to write to file \"" + file + "\""; + } + boolean append() { return false; } + }; + } + + /** + * Returns a redirect to append to the specified file. + * Each write operation first advances the position to the + * end of the file and then writes the requested data. + * Whether the advancement of the position and the writing + * of the data are done in a single atomic operation is + * system-dependent and therefore unspecified. + * + *

It will always be true that + *

 {@code
+         * Redirect.appendTo(file).file() == file &&
+         * Redirect.appendTo(file).type() == Redirect.Type.APPEND
+         * }
+ * + * @param file The {@code File} for the {@code Redirect}. + * @throws NullPointerException if the specified file is null + * @return a redirect to append to the specified file + */ + public static ProcessBuilderForWin32.Redirect appendTo(final File file) { + if (file == null) + throw new NullPointerException(); + return new ProcessBuilderForWin32.Redirect() { + public Type type() { return Type.APPEND; } + public File file() { return file; } + public String toString() { + return "redirect to append to file \"" + file + "\""; + } + boolean append() { return true; } + }; + } + + /** + * Compares the specified object with this {@code Redirect} for + * equality. Returns {@code true} if and only if the two + * objects are identical or both objects are {@code Redirect} + * instances of the same type associated with non-null equal + * {@code File} instances. + */ + public boolean equals(Object obj) { + if (obj == this) + return true; + if (! (obj instanceof ProcessBuilderForWin32.Redirect)) + return false; + ProcessBuilderForWin32.Redirect r = (ProcessBuilderForWin32.Redirect) obj; + if (r.type() != this.type()) + return false; + assert this.file() != null; + return this.file().equals(r.file()); + } + + /** + * Returns a hash code value for this {@code Redirect}. + * @return a hash code value for this {@code Redirect} + */ + public int hashCode() { + File file = file(); + if (file == null) + return super.hashCode(); + else + return file.hashCode(); + } + + /** + * No public constructors. Clients must use predefined + * static {@code Redirect} instances or factory methods. + */ + private Redirect() {} + } + + private ProcessBuilderForWin32.Redirect[] redirects() { + if (redirects == null) + redirects = new ProcessBuilderForWin32.Redirect[] { + ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE + }; + return redirects; + } + + /** + * Sets this process builder's standard input source. + * + * Subprocesses subsequently started by this object's {@link #start()} + * method obtain their standard input from this source. + * + *

If the source is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE} + * (the initial value), then the standard input of a + * subprocess can be written to using the output stream + * returned by {@link Process#getOutputStream()}. + * If the source is set to any other value, then + * {@link Process#getOutputStream()} will return a + * null output stream. + * + * @param source the new standard input source + * @return this process builder + * @throws IllegalArgumentException + * if the redirect does not correspond to a valid source + * of data, that is, has type + * {@link ProcessBuilderForWin32.Redirect.Type#WRITE WRITE} or + * {@link ProcessBuilderForWin32.Redirect.Type#APPEND APPEND} + * @since 1.7 + */ + public ProcessBuilderForWin32 redirectInput(ProcessBuilderForWin32.Redirect source) { + if (source.type() == ProcessBuilderForWin32.Redirect.Type.WRITE || + source.type() == ProcessBuilderForWin32.Redirect.Type.APPEND) + throw new IllegalArgumentException( + "Redirect invalid for reading: " + source); + redirects()[0] = source; + return this; + } + + /** + * Sets this process builder's standard output destination. + * + * Subprocesses subsequently started by this object's {@link #start()} + * method send their standard output to this destination. + * + *

If the destination is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE} + * (the initial value), then the standard output of a subprocess + * can be read using the input stream returned by {@link + * Process#getInputStream()}. + * If the destination is set to any other value, then + * {@link Process#getInputStream()} will return a + * null input stream. + * + * @param destination the new standard output destination + * @return this process builder + * @throws IllegalArgumentException + * if the redirect does not correspond to a valid + * destination of data, that is, has type + * {@link ProcessBuilderForWin32.Redirect.Type#READ READ} + * @since 1.7 + */ + public ProcessBuilderForWin32 redirectOutput(ProcessBuilderForWin32.Redirect destination) { + if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ) + throw new IllegalArgumentException( + "Redirect invalid for writing: " + destination); + redirects()[1] = destination; + return this; + } + + /** + * Sets this process builder's standard error destination. + * + * Subprocesses subsequently started by this object's {@link #start()} + * method send their standard error to this destination. + * + *

If the destination is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE} + * (the initial value), then the error output of a subprocess + * can be read using the input stream returned by {@link + * Process#getErrorStream()}. + * If the destination is set to any other value, then + * {@link Process#getErrorStream()} will return a + * null input stream. + * + *

If the {@link #redirectErrorStream redirectErrorStream} + * attribute has been set {@code true}, then the redirection set + * by this method has no effect. + * + * @param destination the new standard error destination + * @return this process builder + * @throws IllegalArgumentException + * if the redirect does not correspond to a valid + * destination of data, that is, has type + * {@link ProcessBuilderForWin32.Redirect.Type#READ READ} + * @since 1.7 + */ + public ProcessBuilderForWin32 redirectError(ProcessBuilderForWin32.Redirect destination) { + if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ) + throw new IllegalArgumentException( + "Redirect invalid for writing: " + destination); + redirects()[2] = destination; + return this; + } + + /** + * Sets this process builder's standard input source to a file. + * + *

This is a convenience method. An invocation of the form + * {@code redirectInput(file)} + * behaves in exactly the same way as the invocation + * {@link #redirectInput(ProcessBuilderForWin32.Redirect) redirectInput} + * {@code (Redirect.from(file))}. + * + * @param file the new standard input source + * @return this process builder + * @since 1.7 + */ + public ProcessBuilderForWin32 redirectInput(File file) { + return redirectInput(ProcessBuilderForWin32.Redirect.from(file)); + } + + /** + * Sets this process builder's standard output destination to a file. + * + *

This is a convenience method. An invocation of the form + * {@code redirectOutput(file)} + * behaves in exactly the same way as the invocation + * {@link #redirectOutput(ProcessBuilderForWin32.Redirect) redirectOutput} + * {@code (Redirect.to(file))}. + * + * @param file the new standard output destination + * @return this process builder + * @since 1.7 + */ + public ProcessBuilderForWin32 redirectOutput(File file) { + return redirectOutput(ProcessBuilderForWin32.Redirect.to(file)); + } + + /** + * Sets this process builder's standard error destination to a file. + * + *

This is a convenience method. An invocation of the form + * {@code redirectError(file)} + * behaves in exactly the same way as the invocation + * {@link #redirectError(ProcessBuilderForWin32.Redirect) redirectError} + * {@code (Redirect.to(file))}. + * + * @param file the new standard error destination + * @return this process builder + * @since 1.7 + */ + public ProcessBuilderForWin32 redirectError(File file) { + return redirectError(ProcessBuilderForWin32.Redirect.to(file)); + } + + /** + * Returns this process builder's standard input source. + * + * Subprocesses subsequently started by this object's {@link #start()} + * method obtain their standard input from this source. + * The initial value is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}. + * + * @return this process builder's standard input source + * @since 1.7 + */ + public ProcessBuilderForWin32.Redirect redirectInput() { + return (redirects == null) ? ProcessBuilderForWin32.Redirect.PIPE : redirects[0]; + } + + /** + * Returns this process builder's standard output destination. + * + * Subprocesses subsequently started by this object's {@link #start()} + * method redirect their standard output to this destination. + * The initial value is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}. + * + * @return this process builder's standard output destination + * @since 1.7 + */ + public ProcessBuilderForWin32.Redirect redirectOutput() { + return (redirects == null) ? ProcessBuilderForWin32.Redirect.PIPE : redirects[1]; + } + + /** + * Returns this process builder's standard error destination. + * + * Subprocesses subsequently started by this object's {@link #start()} + * method redirect their standard error to this destination. + * The initial value is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}. + * + * @return this process builder's standard error destination + * @since 1.7 + */ + public ProcessBuilderForWin32.Redirect redirectError() { + return (redirects == null) ? ProcessBuilderForWin32.Redirect.PIPE : redirects[2]; + } + + /** + * Sets the source and destination for subprocess standard I/O + * to be the same as those of the current Java process. + * + *

This is a convenience method. An invocation of the form + *

 {@code
+     * pb.inheritIO()
+     * }
+ * behaves in exactly the same way as the invocation + *
 {@code
+     * pb.redirectInput(Redirect.INHERIT)
+     *   .redirectOutput(Redirect.INHERIT)
+     *   .redirectError(Redirect.INHERIT)
+     * }
+ * + * This gives behavior equivalent to most operating system + * command interpreters, or the standard C library function + * {@code system()}. + * + * @return this process builder + * @since 1.7 + */ + public ProcessBuilderForWin32 inheritIO() { + Arrays.fill(redirects(), ProcessBuilderForWin32.Redirect.INHERIT); + return this; + } + + /** + * Tells whether this process builder merges standard error and + * standard output. + * + *

If this property is {@code true}, then any error output + * generated by subprocesses subsequently started by this object's + * {@link #start()} method will be merged with the standard + * output, so that both can be read using the + * {@link Process#getInputStream()} method. This makes it easier + * to correlate error messages with the corresponding output. + * The initial value is {@code false}. + * + * @return this process builder's {@code redirectErrorStream} property + */ + public boolean redirectErrorStream() { + return redirectErrorStream; + } + + /** + * Sets this process builder's {@code redirectErrorStream} property. + * + *

If this property is {@code true}, then any error output + * generated by subprocesses subsequently started by this object's + * {@link #start()} method will be merged with the standard + * output, so that both can be read using the + * {@link Process#getInputStream()} method. This makes it easier + * to correlate error messages with the corresponding output. + * The initial value is {@code false}. + * + * @param redirectErrorStream the new property value + * @return this process builder + */ + public ProcessBuilderForWin32 redirectErrorStream(boolean redirectErrorStream) { + this.redirectErrorStream = redirectErrorStream; + return this; + } + + /** + * Starts a new process using the attributes of this process builder. + * + *

The new process will + * invoke the command and arguments given by {@link #command()}, + * in a working directory as given by {@link #directory()}, + * with a process environment as given by {@link #environment()}. + * + *

This method checks that the command is a valid operating + * system command. Which commands are valid is system-dependent, + * but at the very least the command must be a non-empty list of + * non-null strings. + * + *

A minimal set of system dependent environment variables may + * be required to start a process on some operating systems. + * As a result, the subprocess may inherit additional environment variable + * settings beyond those in the process builder's {@link #environment()}. + * + *

If there is a security manager, its + * {@link SecurityManager#checkExec checkExec} + * method is called with the first component of this object's + * {@code command} array as its argument. This may result in + * a {@link SecurityException} being thrown. + * + *

Starting an operating system process is highly system-dependent. + * Among the many things that can go wrong are: + *

+ * + *

In such cases an exception will be thrown. The exact nature + * of the exception is system-dependent, but it will always be a + * subclass of {@link IOException}. + * + *

Subsequent modifications to this process builder will not + * affect the returned {@link Process}. + * + * @return a new {@link Process} object for managing the subprocess + * + * @throws NullPointerException + * if an element of the command list is null + * + * @throws IndexOutOfBoundsException + * if the command is an empty list (has size {@code 0}) + * + * @throws SecurityException + * if a security manager exists and + *

+ * + * @throws IOException if an I/O error occurs + * + * @see Runtime#exec(String[], String[], File) + */ + public Process start() throws IOException { + // Must convert to array first -- a malicious user-supplied + // list might try to circumvent the security check. + String[] cmdarray = command.toArray(new String[command.size()]); + cmdarray = cmdarray.clone(); + + for (String arg : cmdarray) + if (arg == null) + throw new NullPointerException(); + // Throws IndexOutOfBoundsException if command is empty + String prog = cmdarray[0]; + + SecurityManager security = System.getSecurityManager(); + if (security != null) + security.checkExec(prog); + + String dir = directory == null ? null : directory.toString(); + + for (int i = 1; i < cmdarray.length; i++) { + if (cmdarray[i].indexOf('\u0000') >= 0) { + throw new IOException("invalid null character in command"); + } + } + + try { + return ProcessImplForWin32.start( + username, + password, + cmdarray, + environment, + dir, + redirects, + redirectErrorStream); + } catch (IOException | IllegalArgumentException e) { + String exceptionInfo = ": " + e.getMessage(); + Throwable cause = e; + if ((e instanceof IOException) && security != null) { + // Can not disclose the fail reason for read-protected files. + try { + security.checkRead(prog); + } catch (SecurityException se) { + exceptionInfo = ""; + cause = se; + } + } + // It's much easier for us to create a high-quality error + // message than the low-level C code which found the problem. + throw new IOException( + "Cannot run program \"" + prog + "\"" + + (dir == null ? "" : " (in directory \"" + dir + "\")") + + exceptionInfo, + cause); + } + } + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java new file mode 100644 index 000000000..3dbe7cb50 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils.process; + +import com.sun.jna.platform.win32.Kernel32Util; + +import java.util.*; + +final class ProcessEnvironmentForWin32 extends HashMap { + + private static final long serialVersionUID = -8017839552603542824L; + + private static String validateName(String name) { + // An initial `=' indicates a magic Windows variable name -- OK + if (name.indexOf('=', 1) != -1 || + name.indexOf('\u0000') != -1) + throw new IllegalArgumentException + ("Invalid environment variable name: \"" + name + "\""); + return name; + } + + private static String validateValue(String value) { + if (value.indexOf('\u0000') != -1) + throw new IllegalArgumentException + ("Invalid environment variable value: \"" + value + "\""); + return value; + } + + private static String nonNullString(Object o) { + if (o == null) + throw new NullPointerException(); + return (String) o; + } + + public String put(String key, String value) { + return super.put(validateName(key), validateValue(value)); + } + + public String get(Object key) { + return super.get(nonNullString(key)); + } + + public boolean containsKey(Object key) { + return super.containsKey(nonNullString(key)); + } + + public boolean containsValue(Object value) { + return super.containsValue(nonNullString(value)); + } + + public String remove(Object key) { + return super.remove(nonNullString(key)); + } + + private static class CheckedEntry implements Entry { + private final Entry e; + public CheckedEntry(Entry e) {this.e = e;} + public String getKey() { return e.getKey();} + public String getValue() { return e.getValue();} + public String setValue(String value) { + return e.setValue(validateValue(value)); + } + public String toString() { return getKey() + "=" + getValue();} + public boolean equals(Object o) {return e.equals(o);} + public int hashCode() {return e.hashCode();} + } + + private static class CheckedEntrySet extends AbstractSet> { + private final Set> s; + public CheckedEntrySet(Set> s) {this.s = s;} + public int size() {return s.size();} + public boolean isEmpty() {return s.isEmpty();} + public void clear() { s.clear();} + public Iterator> iterator() { + return new Iterator>() { + Iterator> i = s.iterator(); + public boolean hasNext() { return i.hasNext();} + public Entry next() { + return new CheckedEntry(i.next()); + } + public void remove() { i.remove();} + }; + } + private static Entry checkedEntry(Object o) { + @SuppressWarnings("unchecked") + Entry e = (Entry) o; + nonNullString(e.getKey()); + nonNullString(e.getValue()); + return e; + } + public boolean contains(Object o) {return s.contains(checkedEntry(o));} + public boolean remove(Object o) {return s.remove(checkedEntry(o));} + } + + private static class CheckedValues extends AbstractCollection { + private final Collection c; + public CheckedValues(Collection c) {this.c = c;} + public int size() {return c.size();} + public boolean isEmpty() {return c.isEmpty();} + public void clear() { c.clear();} + public Iterator iterator() {return c.iterator();} + public boolean contains(Object o) {return c.contains(nonNullString(o));} + public boolean remove(Object o) {return c.remove(nonNullString(o));} + } + + private static class CheckedKeySet extends AbstractSet { + private final Set s; + public CheckedKeySet(Set s) {this.s = s;} + public int size() {return s.size();} + public boolean isEmpty() {return s.isEmpty();} + public void clear() { s.clear();} + public Iterator iterator() {return s.iterator();} + public boolean contains(Object o) {return s.contains(nonNullString(o));} + public boolean remove(Object o) {return s.remove(nonNullString(o));} + } + + public Set keySet() { + return new CheckedKeySet(super.keySet()); + } + + public Collection values() { + return new CheckedValues(super.values()); + } + + public Set> entrySet() { + return new CheckedEntrySet(super.entrySet()); + } + + private static final class NameComparator implements Comparator { + public int compare(String s1, String s2) { + // We can't use String.compareToIgnoreCase since it + // canonicalizes to lower case, while Windows + // canonicalizes to upper case! For example, "_" should + // sort *after* "Z", not before. + int n1 = s1.length(); + int n2 = s2.length(); + int min = Math.min(n1, n2); + for (int i = 0; i < min; i++) { + char c1 = s1.charAt(i); + char c2 = s2.charAt(i); + if (c1 != c2) { + c1 = Character.toUpperCase(c1); + c2 = Character.toUpperCase(c2); + if (c1 != c2) + // No overflow because of numeric promotion + return c1 - c2; + } + } + return n1 - n2; + } + } + + private static final class EntryComparator implements Comparator> { + public int compare(Entry e1, + Entry e2) { + return nameComparator.compare(e1.getKey(), e2.getKey()); + } + } + + // Allow `=' as first char in name, e.g. =C:=C:\DIR + static final int MIN_NAME_LENGTH = 1; + + private static final NameComparator nameComparator; + private static final EntryComparator entryComparator; + private static final ProcessEnvironmentForWin32 theEnvironment; + private static final Map theUnmodifiableEnvironment; + private static final Map theCaseInsensitiveEnvironment; + + static { + nameComparator = new NameComparator(); + entryComparator = new EntryComparator(); + theEnvironment = new ProcessEnvironmentForWin32(); + theUnmodifiableEnvironment = Collections.unmodifiableMap(theEnvironment); + + theEnvironment.putAll(environmentBlock()); + + theCaseInsensitiveEnvironment = new TreeMap<>(nameComparator); + theCaseInsensitiveEnvironment.putAll(theEnvironment); + } + + private ProcessEnvironmentForWin32() { + super(); + } + + private ProcessEnvironmentForWin32(int capacity) { + super(capacity); + } + + // Only for use by System.getenv(String) + static String getenv(String name) { + // The original implementation used a native call to _wgetenv, + // but it turns out that _wgetenv is only consistent with + // GetEnvironmentStringsW (for non-ASCII) if `wmain' is used + // instead of `main', even in a process created using + // CREATE_UNICODE_ENVIRONMENT. Instead we perform the + // case-insensitive comparison ourselves. At least this + // guarantees that System.getenv().get(String) will be + // consistent with System.getenv(String). + return theCaseInsensitiveEnvironment.get(name); + } + + // Only for use by System.getenv() + static Map getenv() { + return theUnmodifiableEnvironment; + } + + // Only for use by ProcessBuilder.environment() + @SuppressWarnings("unchecked") + static Map environment() { + return (Map) theEnvironment.clone(); + } + + // Only for use by ProcessBuilder.environment(String[] envp) + static Map emptyEnvironment(int capacity) { + return new ProcessEnvironmentForWin32(capacity); + } + + private static Map environmentBlock() { + return Kernel32Util.getEnvironmentVariables(); + } + + // Only for use by ProcessImpl.start() + String toEnvironmentBlock() { + // Sort Unicode-case-insensitively by name + List> list = new ArrayList<>(entrySet()); + Collections.sort(list, entryComparator); + + StringBuilder sb = new StringBuilder(size()*30); + int cmp = -1; + + // Some versions of MSVCRT.DLL require SystemRoot to be set. + // So, we make sure that it is always set, even if not provided + // by the caller. + final String SYSTEMROOT = "SystemRoot"; + + for (Entry e : list) { + String key = e.getKey(); + String value = e.getValue(); + if (cmp < 0 && (cmp = nameComparator.compare(key, SYSTEMROOT)) > 0) { + // Not set, so add it here + addToEnvIfSet(sb, SYSTEMROOT); + } + addToEnv(sb, key, value); + } + if (cmp < 0) { + // Got to end of list and still not found + addToEnvIfSet(sb, SYSTEMROOT); + } + if (sb.length() == 0) { + // Environment was empty and SystemRoot not set in parent + sb.append('\u0000'); + } + // Block is double NUL terminated + sb.append('\u0000'); + return sb.toString(); + } + + // add the environment variable to the child, if it exists in parent + private static void addToEnvIfSet(StringBuilder sb, String name) { + String s = getenv(name); + if (s != null) + addToEnv(sb, name, s); + } + + private static void addToEnv(StringBuilder sb, String name, String val) { + sb.append(name).append('=').append(val).append('\u0000'); + } + + static String toEnvironmentBlock(Map map) { + return map == null ? null : ((ProcessEnvironmentForWin32)map).toEnvironmentBlock(); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java new file mode 100644 index 000000000..4583be8af --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java @@ -0,0 +1,752 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils.process; + +import com.sun.jna.Pointer; +import com.sun.jna.platform.win32.*; +import com.sun.jna.ptr.IntByReference; +import sun.security.action.GetPropertyAction; + +import java.io.*; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.sun.jna.platform.win32.WinBase.STILL_ACTIVE; + +public class ProcessImplForWin32 extends Process { + private static final sun.misc.JavaIOFileDescriptorAccess fdAccess + = sun.misc.SharedSecrets.getJavaIOFileDescriptorAccess(); + + private static final int PIPE_SIZE = 4096 + 24; + + private static final int HANDLE_STORAGE_SIZE = 6; + + private static final int OFFSET_READ = 0; + + private static final int OFFSET_WRITE = 1; + + private static final WinNT.HANDLE JAVA_INVALID_HANDLE_VALUE = new WinNT.HANDLE(Pointer.createConstant(-1)); + + /** + * Open a file for writing. If {@code append} is {@code true} then the file + * is opened for atomic append directly and a FileOutputStream constructed + * with the resulting handle. This is because a FileOutputStream created + * to append to a file does not open the file in a manner that guarantees + * that writes by the child process will be atomic. + */ + private static FileOutputStream newFileOutputStream(File f, boolean append) + throws IOException + { + if (append) { + String path = f.getPath(); + SecurityManager sm = System.getSecurityManager(); + if (sm != null) + sm.checkWrite(path); + long handle = openForAtomicAppend(path); + final FileDescriptor fd = new FileDescriptor(); + fdAccess.setHandle(fd, handle); + return AccessController.doPrivileged( + new PrivilegedAction() { + public FileOutputStream run() { + return new FileOutputStream(fd); + } + } + ); + } else { + return new FileOutputStream(f); + } + } + + // System-dependent portion of ProcessBuilderForWindows.start() + static Process start(String username, + String password, + String cmdarray[], + java.util.Map environment, + String dir, + ProcessBuilderForWin32.Redirect[] redirects, + boolean redirectErrorStream) + throws IOException + { + String envblock = ProcessEnvironmentForWin32.toEnvironmentBlock(environment); + + FileInputStream f0 = null; + FileOutputStream f1 = null; + FileOutputStream f2 = null; + + try { + long[] stdHandles; + if (redirects == null) { + stdHandles = new long[] { -1L, -1L, -1L }; + } else { + stdHandles = new long[3]; + + if (redirects[0] == ProcessBuilderForWin32.Redirect.PIPE) + stdHandles[0] = -1L; + else if (redirects[0] == ProcessBuilderForWin32.Redirect.INHERIT) + stdHandles[0] = fdAccess.getHandle(FileDescriptor.in); + else { + f0 = new FileInputStream(redirects[0].file()); + stdHandles[0] = fdAccess.getHandle(f0.getFD()); + } + + if (redirects[1] == ProcessBuilderForWin32.Redirect.PIPE) + stdHandles[1] = -1L; + else if (redirects[1] == ProcessBuilderForWin32.Redirect.INHERIT) + stdHandles[1] = fdAccess.getHandle(FileDescriptor.out); + else { + f1 = newFileOutputStream(redirects[1].file(), + redirects[1].append()); + stdHandles[1] = fdAccess.getHandle(f1.getFD()); + } + + if (redirects[2] == ProcessBuilderForWin32.Redirect.PIPE) + stdHandles[2] = -1L; + else if (redirects[2] == ProcessBuilderForWin32.Redirect.INHERIT) + stdHandles[2] = fdAccess.getHandle(FileDescriptor.err); + else { + f2 = newFileOutputStream(redirects[2].file(), + redirects[2].append()); + stdHandles[2] = fdAccess.getHandle(f2.getFD()); + } + } + + return new ProcessImplForWin32(username, password, cmdarray, envblock, dir, stdHandles, redirectErrorStream); + } finally { + // In theory, close() can throw IOException + // (although it is rather unlikely to happen here) + try { if (f0 != null) f0.close(); } + finally { + try { if (f1 != null) f1.close(); } + finally { if (f2 != null) f2.close(); } + } + } + + } + + private static class LazyPattern { + // Escape-support version: + // "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)"; + private static final Pattern PATTERN = + Pattern.compile("[^\\s\"]+|\"[^\"]*\""); + }; + + /* Parses the command string parameter into the executable name and + * program arguments. + * + * The command string is broken into tokens. The token separator is a space + * or quota character. The space inside quotation is not a token separator. + * There are no escape sequences. + */ + private static String[] getTokensFromCommand(String command) { + ArrayList matchList = new ArrayList<>(8); + Matcher regexMatcher = ProcessImplForWin32.LazyPattern.PATTERN.matcher(command); + while (regexMatcher.find()) + matchList.add(regexMatcher.group()); + return matchList.toArray(new String[matchList.size()]); + } + + private static final int VERIFICATION_CMD_BAT = 0; + private static final int VERIFICATION_WIN32 = 1; + private static final int VERIFICATION_WIN32_SAFE = 2; // inside quotes not allowed + private static final int VERIFICATION_LEGACY = 3; + // See Command shell overview for documentation of special characters. + // https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb490954(v=technet.10) + private static final char ESCAPE_VERIFICATION[][] = { + // We guarantee the only command file execution for implicit [cmd.exe] run. + // http://technet.microsoft.com/en-us/library/bb490954.aspx + {' ', '\t', '<', '>', '&', '|', '^'}, + {' ', '\t', '<', '>'}, + {' ', '\t', '<', '>'}, + {' ', '\t'} + }; + + private static String createCommandLine(int verificationType, + final String executablePath, + final String cmd[]) + { + StringBuilder cmdbuf = new StringBuilder(80); + + cmdbuf.append(executablePath); + + for (int i = 1; i < cmd.length; ++i) { + cmdbuf.append(' '); + String s = cmd[i]; + if (needsEscaping(verificationType, s)) { + cmdbuf.append('"'); + + if (verificationType == VERIFICATION_WIN32_SAFE) { + // Insert the argument, adding '\' to quote any interior quotes + int length = s.length(); + for (int j = 0; j < length; j++) { + char c = s.charAt(j); + if (c == DOUBLEQUOTE) { + int count = countLeadingBackslash(verificationType, s, j); + while (count-- > 0) { + cmdbuf.append(BACKSLASH); // double the number of backslashes + } + cmdbuf.append(BACKSLASH); // backslash to quote the quote + } + cmdbuf.append(c); + } + } else { + cmdbuf.append(s); + } + // The code protects the [java.exe] and console command line + // parser, that interprets the [\"] combination as an escape + // sequence for the ["] char. + // http://msdn.microsoft.com/en-us/library/17w5ykft.aspx + // + // If the argument is an FS path, doubling of the tail [\] + // char is not a problem for non-console applications. + // + // The [\"] sequence is not an escape sequence for the [cmd.exe] + // command line parser. The case of the [""] tail escape + // sequence could not be realized due to the argument validation + // procedure. + int count = countLeadingBackslash(verificationType, s, s.length()); + while (count-- > 0) { + cmdbuf.append(BACKSLASH); // double the number of backslashes + } + cmdbuf.append('"'); + } else { + cmdbuf.append(s); + } + } + return cmdbuf.toString(); + } + + /** + * Return the argument without quotes (1st and last) if present, else the arg. + * @param str a string + * @return the string without 1st and last quotes + */ + private static String unQuote(String str) { + int len = str.length(); + return (len >= 2 && str.charAt(0) == DOUBLEQUOTE && str.charAt(len - 1) == DOUBLEQUOTE) + ? str.substring(1, len - 1) + : str; + } + + private static boolean needsEscaping(int verificationType, String arg) { + // Switch off MS heuristic for internal ["]. + // Please, use the explicit [cmd.exe] call + // if you need the internal ["]. + // Example: "cmd.exe", "/C", "Extended_MS_Syntax" + + // For [.exe] or [.com] file the unpaired/internal ["] + // in the argument is not a problem. + String unquotedArg = unQuote(arg); + boolean argIsQuoted = !arg.equals(unquotedArg); + boolean embeddedQuote = unquotedArg.indexOf(DOUBLEQUOTE) >= 0; + + switch (verificationType) { + case VERIFICATION_CMD_BAT: + if (embeddedQuote) { + throw new IllegalArgumentException("Argument has embedded quote, " + + "use the explicit CMD.EXE call."); + } + break; // break determine whether to quote + case VERIFICATION_WIN32_SAFE: + if (argIsQuoted && embeddedQuote) { + throw new IllegalArgumentException("Malformed argument has embedded quote: " + + unquotedArg); + } + break; + default: + break; + } + + if (!argIsQuoted) { + char testEscape[] = ESCAPE_VERIFICATION[verificationType]; + for (int i = 0; i < testEscape.length; ++i) { + if (arg.indexOf(testEscape[i]) >= 0) { + return true; + } + } + } + return false; + } + + private static String getExecutablePath(String path) + throws IOException + { + String name = unQuote(path); + if (name.indexOf(DOUBLEQUOTE) >= 0) { + throw new IllegalArgumentException("Executable name has embedded quote, " + + "split the arguments: " + name); + } + // Win32 CreateProcess requires path to be normalized + File fileToRun = new File(name); + + // From the [CreateProcess] function documentation: + // + // "If the file name does not contain an extension, .exe is appended. + // Therefore, if the file name extension is .com, this parameter + // must include the .com extension. If the file name ends in + // a period (.) with no extension, or if the file name contains a path, + // .exe is not appended." + // + // "If the file name !does not contain a directory path!, + // the system searches for the executable file in the following + // sequence:..." + // + // In practice ANY non-existent path is extended by [.exe] extension + // in the [CreateProcess] function with the only exception: + // the path ends by (.) + + return fileToRun.getPath(); + } + + /** + * An executable is any program that is an EXE or does not have an extension + * and the Windows createProcess will be looking for .exe. + * The comparison is case insensitive based on the name. + * @param executablePath the executable file + * @return true if the path ends in .exe or does not have an extension. + */ + private boolean isExe(String executablePath) { + File file = new File(executablePath); + String upName = file.getName().toUpperCase(Locale.ROOT); + return (upName.endsWith(".EXE") || upName.indexOf('.') < 0); + } + + // Old version that can be bypassed + private boolean isShellFile(String executablePath) { + String upPath = executablePath.toUpperCase(); + return (upPath.endsWith(".CMD") || upPath.endsWith(".BAT")); + } + + private String quoteString(String arg) { + StringBuilder argbuf = new StringBuilder(arg.length() + 2); + return argbuf.append('"').append(arg).append('"').toString(); + } + + // Count backslashes before start index of string. + // .bat files don't include backslashes as part of the quote + private static int countLeadingBackslash(int verificationType, + CharSequence input, int start) { + if (verificationType == VERIFICATION_CMD_BAT) + return 0; + int j; + for (j = start - 1; j >= 0 && input.charAt(j) == BACKSLASH; j--) { + // just scanning backwards + } + return (start - 1) - j; // number of BACKSLASHES + } + + private static final char DOUBLEQUOTE = '\"'; + private static final char BACKSLASH = '\\'; + + private WinNT.HANDLE handle; + private OutputStream stdin_stream; + private InputStream stdout_stream; + private InputStream stderr_stream; + + private ProcessImplForWin32( + String username, + String password, + String cmd[], + final String envblock, + final String path, + final long[] stdHandles, + final boolean redirectErrorStream) + throws IOException + { + String cmdstr; + final SecurityManager security = System.getSecurityManager(); + GetPropertyAction action = new GetPropertyAction("jdk.lang.Process.allowAmbiguousCommands", + (security == null) ? "true" : "false"); + final boolean allowAmbiguousCommands = !"false".equalsIgnoreCase(action.run()); + if (allowAmbiguousCommands && security == null) { + // Legacy mode. + + // Normalize path if possible. + String executablePath = new File(cmd[0]).getPath(); + + // No worry about internal, unpaired ["], and redirection/piping. + if (needsEscaping(VERIFICATION_LEGACY, executablePath) ) + executablePath = quoteString(executablePath); + + cmdstr = createCommandLine( + //legacy mode doesn't worry about extended verification + VERIFICATION_LEGACY, + executablePath, + cmd); + } else { + String executablePath; + try { + executablePath = getExecutablePath(cmd[0]); + } catch (IllegalArgumentException e) { + // Workaround for the calls like + // Runtime.getRuntime().exec("\"C:\\Program Files\\foo\" bar") + + // No chance to avoid CMD/BAT injection, except to do the work + // right from the beginning. Otherwise we have too many corner + // cases from + // Runtime.getRuntime().exec(String[] cmd [, ...]) + // calls with internal ["] and escape sequences. + + // Restore original command line. + StringBuilder join = new StringBuilder(); + // terminal space in command line is ok + for (String s : cmd) + join.append(s).append(' '); + + // Parse the command line again. + cmd = getTokensFromCommand(join.toString()); + executablePath = getExecutablePath(cmd[0]); + + // Check new executable name once more + if (security != null) + security.checkExec(executablePath); + } + + // Quotation protects from interpretation of the [path] argument as + // start of longer path with spaces. Quotation has no influence to + // [.exe] extension heuristic. + boolean isShell = allowAmbiguousCommands ? isShellFile(executablePath) + : !isExe(executablePath); + cmdstr = createCommandLine( + // We need the extended verification procedures + isShell ? VERIFICATION_CMD_BAT + : (allowAmbiguousCommands ? VERIFICATION_WIN32 : VERIFICATION_WIN32_SAFE), + quoteString(executablePath), + cmd); + } + + handle = create(username, password, cmdstr, envblock, path, stdHandles, redirectErrorStream); + + AccessController.doPrivileged( + new PrivilegedAction() { + public Void run() { + if (stdHandles[0] == -1L) + stdin_stream = ProcessBuilderForWin32.NullOutputStream.INSTANCE; + else { + FileDescriptor stdin_fd = new FileDescriptor(); + fdAccess.setHandle(stdin_fd, stdHandles[0]); + stdin_stream = new BufferedOutputStream( + new FileOutputStream(stdin_fd)); + } + + if (stdHandles[1] == -1L) + stdout_stream = ProcessBuilderForWin32.NullInputStream.INSTANCE; + else { + FileDescriptor stdout_fd = new FileDescriptor(); + fdAccess.setHandle(stdout_fd, stdHandles[1]); + stdout_stream = new BufferedInputStream( + new FileInputStream(stdout_fd)); + } + + if (stdHandles[2] == -1L) + stderr_stream = ProcessBuilderForWin32.NullInputStream.INSTANCE; + else { + FileDescriptor stderr_fd = new FileDescriptor(); + fdAccess.setHandle(stderr_fd, stdHandles[2]); + stderr_stream = new FileInputStream(stderr_fd); + } + + return null; }}); + } + + public OutputStream getOutputStream() { + return stdin_stream; + } + + public InputStream getInputStream() { + return stdout_stream; + } + + public InputStream getErrorStream() { + return stderr_stream; + } + + protected void finalize() { + closeHandle(handle); + } + + public int exitValue() { + int exitCode = getExitCodeProcess(handle); + if (exitCode == STILL_ACTIVE) + throw new IllegalThreadStateException("process has not exited"); + return exitCode; + } + + public int waitFor() throws InterruptedException { + waitForInterruptibly(handle); + if (Thread.interrupted()) + throw new InterruptedException(); + return exitValue(); + } + + @Override + public boolean waitFor(long timeout, TimeUnit unit) + throws InterruptedException + { + if (getExitCodeProcess(handle) != STILL_ACTIVE) return true; + if (timeout <= 0) return false; + + long remainingNanos = unit.toNanos(timeout); + long deadline = System.nanoTime() + remainingNanos ; + + do { + // Round up to next millisecond + long msTimeout = TimeUnit.NANOSECONDS.toMillis(remainingNanos + 999_999L); + waitForTimeoutInterruptibly(handle, msTimeout); + if (Thread.interrupted()) + throw new InterruptedException(); + if (getExitCodeProcess(handle) != STILL_ACTIVE) { + return true; + } + remainingNanos = deadline - System.nanoTime(); + } while (remainingNanos > 0); + + return (getExitCodeProcess(handle) != STILL_ACTIVE); + } + + public void destroy() { terminateProcess(handle); } + + public Process destroyForcibly() { + destroy(); + return this; + } + + public boolean isAlive() { + return isProcessAlive(handle); + } + + private static boolean initHolder(WinNT.HANDLEByReference pjhandles, + WinNT.HANDLEByReference[] pipe, + int offset, + WinNT.HANDLEByReference phStd) { + if (!pjhandles.getValue().equals(JAVA_INVALID_HANDLE_VALUE)) { + phStd.setValue(pjhandles.getValue()); + pjhandles.setValue(JAVA_INVALID_HANDLE_VALUE); + } else { + if (!Kernel32.INSTANCE.CreatePipe(pipe[0], pipe[1], null, PIPE_SIZE)) { + throw new Win32Exception(Kernel32.INSTANCE.GetLastError()); + } else { + WinNT.HANDLE thisProcessEnd = offset == OFFSET_READ ? pipe[1].getValue() : pipe[0].getValue(); + phStd.setValue(pipe[offset].getValue()); + pjhandles.setValue(thisProcessEnd); + } + } + Kernel32.INSTANCE.SetHandleInformation(phStd.getValue(), Kernel32.HANDLE_FLAG_INHERIT, Kernel32.HANDLE_FLAG_INHERIT); + return true; + } + + private static void releaseHolder(boolean complete, WinNT.HANDLEByReference[] pipe, int offset) { + closeHandle(pipe[offset].getValue()); + if (complete) { + closeHandle(pipe[offset == OFFSET_READ ? OFFSET_WRITE : OFFSET_READ].getValue()); + } + } + + private static void prepareIOEHandleState(WinNT.HANDLE[] stdIOE, Boolean[] inherit) { + for(int i = 0; i < HANDLE_STORAGE_SIZE; ++i) { + WinNT.HANDLE hstd = stdIOE[i]; + if (!Kernel32.INVALID_HANDLE_VALUE.equals(hstd)) { + inherit[i] = Boolean.TRUE; + Kernel32.INSTANCE.SetHandleInformation(hstd, Kernel32.HANDLE_FLAG_INHERIT, 0); + } + } + } + + private static void restoreIOEHandleState(WinNT.HANDLE[] stdIOE, Boolean[] inherit) { + for (int i = HANDLE_STORAGE_SIZE - 1; i >= 0; --i) { + if (!Kernel32.INVALID_HANDLE_VALUE.equals(stdIOE[i])) { + Kernel32.INSTANCE.SetHandleInformation(stdIOE[i], Kernel32.HANDLE_FLAG_INHERIT, inherit[i] ? Kernel32.HANDLE_FLAG_INHERIT : 0); + } + } + } + + private static WinNT.HANDLE processCreate(String username, + String password, + String cmd, + final String envblock, + final String path, + final WinNT.HANDLEByReference[] stdHandles, + final boolean redirectErrorStream) { + WinNT.HANDLE ret = new WinNT.HANDLE(Pointer.createConstant(0)); + + WinNT.HANDLE[] stdIOE = new WinNT.HANDLE[] { + Kernel32.INVALID_HANDLE_VALUE, Kernel32.INVALID_HANDLE_VALUE, Kernel32.INVALID_HANDLE_VALUE, + stdHandles[0].getValue(), stdHandles[1].getValue(), stdHandles[2].getValue() + }; + stdIOE[0] = Kernel32.INSTANCE.GetStdHandle(Kernel32.STD_INPUT_HANDLE); + stdIOE[1] = Kernel32.INSTANCE.GetStdHandle(Kernel32.STD_OUTPUT_HANDLE); + stdIOE[2] = Kernel32.INSTANCE.GetStdHandle(Kernel32.STD_ERROR_HANDLE); + + Boolean[] inherit = new Boolean[] { + Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE, Boolean.FALSE, Boolean.FALSE + }; + + prepareIOEHandleState(stdIOE, inherit); + + // input + WinNT.HANDLEByReference hStdInput = new WinNT.HANDLEByReference(); + WinNT.HANDLEByReference[] pipeIn = new WinNT.HANDLEByReference[] { + new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE), new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE) }; + + // output + WinNT.HANDLEByReference hStdOutput = new WinNT.HANDLEByReference(); + WinNT.HANDLEByReference[] pipeOut = new WinNT.HANDLEByReference[] { + new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE), new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE) }; + + // error + WinNT.HANDLEByReference hStdError = new WinNT.HANDLEByReference(); + WinNT.HANDLEByReference[] pipeError = new WinNT.HANDLEByReference[] { + new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE), new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE) }; + + boolean success; + if (initHolder(stdHandles[0], pipeIn, OFFSET_READ, hStdInput)) { + if (initHolder(stdHandles[1], pipeOut, OFFSET_WRITE, hStdOutput)) { + WinBase.STARTUPINFO si = new WinBase.STARTUPINFO(); + si.hStdInput = hStdInput.getValue(); + si.hStdOutput = hStdOutput.getValue(); + + if (redirectErrorStream) { + si.hStdError = si.hStdOutput; + stdHandles[2].setValue(JAVA_INVALID_HANDLE_VALUE); + success = true; + } else { + success = initHolder(stdHandles[2], pipeError, OFFSET_WRITE, hStdError); + si.hStdError = hStdError.getValue(); + } + + if (success) { + WTypes.LPSTR lpEnvironment = envblock == null ? new WTypes.LPSTR() : new WTypes.LPSTR(envblock); + Kernel32.PROCESS_INFORMATION pi = new WinBase.PROCESS_INFORMATION(); + si.dwFlags = Kernel32.STARTF_USESTDHANDLES; + if (!Advapi32.INSTANCE.CreateProcessWithLogonW( + username + , null + , password + , Advapi32.LOGON_WITH_PROFILE + , null + , cmd + , Kernel32.CREATE_NO_WINDOW + , lpEnvironment.getPointer() + , path + , si + , pi)) { + throw new Win32Exception(Kernel32.INSTANCE.GetLastError()); + } else { + closeHandle(pi.hThread); + ret = pi.hProcess; + } + } + releaseHolder(ret.getPointer().equals(Pointer.createConstant(0)), pipeError, OFFSET_WRITE); + releaseHolder(ret.getPointer().equals(Pointer.createConstant(0)), pipeOut, OFFSET_WRITE); + } + releaseHolder(ret.getPointer().equals(Pointer.createConstant(0)), pipeIn, OFFSET_READ); + } + restoreIOEHandleState(stdIOE, inherit); + return ret; + } + + private static synchronized WinNT.HANDLE create(String username, + String password, + String cmd, + final String envblock, + final String path, + final long[] stdHandles, + final boolean redirectErrorStream) { + WinNT.HANDLE ret = new WinNT.HANDLE(Pointer.createConstant(0)); + WinNT.HANDLEByReference[] handles = new WinNT.HANDLEByReference[stdHandles.length]; + for (int i = 0; i < stdHandles.length; i++) { + handles[i] = new WinNT.HANDLEByReference(new WinNT.HANDLE(Pointer.createConstant(stdHandles[i]))); + } + + if (cmd != null) { + if (username != null && password != null) { + ret = processCreate(username, password, cmd, envblock, path, handles, redirectErrorStream); + } + } + + for (int i = 0; i < stdHandles.length; i++) { + stdHandles[i] = handles[i].getPointer().getLong(0); + } + + return ret; + } + + private static int getExitCodeProcess(WinNT.HANDLE handle) { + IntByReference exitStatus = new IntByReference(); + if (!Kernel32.INSTANCE.GetExitCodeProcess(handle, exitStatus)) { + throw new Win32Exception(Kernel32.INSTANCE.GetLastError()); + } + return exitStatus.getValue(); + } + + private static void terminateProcess(WinNT.HANDLE handle) { + Kernel32.INSTANCE.TerminateProcess(handle, 1); + } + + private static boolean isProcessAlive(WinNT.HANDLE handle) { + IntByReference exitStatus = new IntByReference(); + Kernel32.INSTANCE.GetExitCodeProcess(handle, exitStatus); + return exitStatus.getValue() == STILL_ACTIVE; + } + + private static void closeHandle(WinNT.HANDLE handle) { + Kernel32Util.closeHandle(handle); + } + + /** + * Opens a file for atomic append. The file is created if it doesn't + * already exist. + * + * @param path the file to open or create + * @return the native HANDLE + */ + private static long openForAtomicAppend(String path) throws IOException { + int access = Kernel32.GENERIC_READ | Kernel32.GENERIC_WRITE; + int sharing = Kernel32.FILE_SHARE_READ | Kernel32.FILE_SHARE_WRITE; + int disposition = Kernel32.OPEN_ALWAYS; + int flagsAndAttributes = Kernel32.FILE_ATTRIBUTE_NORMAL; + if (path == null || path.isEmpty()) { + return -1; + } else { + WinNT.HANDLE handle = Kernel32.INSTANCE.CreateFile(path, access, sharing, null, disposition, flagsAndAttributes, null); + if (handle == Kernel32.INVALID_HANDLE_VALUE) { + throw new Win32Exception(Kernel32.INSTANCE.GetLastError()); + } + return handle.getPointer().getLong(0); + } + } + + private static void waitForInterruptibly(WinNT.HANDLE handle) { + int result = Kernel32.INSTANCE.WaitForMultipleObjects(1, new WinNT.HANDLE[]{handle}, false, Kernel32.INFINITE); + if (result == Kernel32.WAIT_FAILED) { + throw new Win32Exception(Kernel32.INSTANCE.GetLastError()); + } + } + + private static void waitForTimeoutInterruptibly(WinNT.HANDLE handle, long timeout) { + int result = Kernel32.INSTANCE.WaitForMultipleObjects(1, new WinNT.HANDLE[]{handle}, false, (int) timeout); + if (result == Kernel32.WAIT_FAILED) { + throw new Win32Exception(Kernel32.INSTANCE.GetLastError()); + } + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java new file mode 100644 index 000000000..3280a9629 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common; + +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Constants Test + */ +public class ConstantsTest { + + /** + * Test PID via env + */ + @Test + public void testPID() { + if (OSUtils.isWindows()) { + Assert.assertEquals(Constants.PID, "handle"); + } else { + Assert.assertEquals(Constants.PID, "pid"); + } + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index 7106804aa..b955787c6 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -39,16 +39,20 @@ public class OSUtilsTest { @Test public void testOSMetric(){ - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); - Assert.assertTrue(availablePhysicalMemorySize > 0.0f); - double totalMemorySize = OSUtils.totalMemorySize(); - Assert.assertTrue(totalMemorySize > 0.0f); - double loadAverage = OSUtils.loadAverage(); - logger.info("loadAverage {}", loadAverage); - double memoryUsage = OSUtils.memoryUsage(); - Assert.assertTrue(memoryUsage > 0.0f); - double cpuUsage = OSUtils.cpuUsage(); - Assert.assertTrue(cpuUsage > 0.0f); + if (!OSUtils.isWindows()) { + double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); + Assert.assertTrue(availablePhysicalMemorySize > 0.0f); + double totalMemorySize = OSUtils.totalMemorySize(); + Assert.assertTrue(totalMemorySize > 0.0f); + double loadAverage = OSUtils.loadAverage(); + logger.info("loadAverage {}", loadAverage); + double memoryUsage = OSUtils.memoryUsage(); + Assert.assertTrue(memoryUsage > 0.0f); + double cpuUsage = OSUtils.cpuUsage(); + Assert.assertTrue(cpuUsage > 0.0f); + } else { + // TODO window ut + } } @Test diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java new file mode 100644 index 000000000..ce0434674 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils.process; + +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(OSUtils.class) +public class ProcessBuilderForWin32Test { + + private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class); + + @Before + public void before() { + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.when(OSUtils.isWindows()).thenReturn(true); + } + + @Test + public void testCreateProcessBuilderForWin32() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + Assert.assertNotNull(builder); + + builder = new ProcessBuilderForWin32("net"); + Assert.assertNotNull(builder); + + builder = new ProcessBuilderForWin32(Collections.singletonList("net")); + Assert.assertNotNull(builder); + + builder = new ProcessBuilderForWin32((List) null); + Assert.assertNotNull(builder); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testBuildUser() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + builder.user("test", StringUtils.EMPTY); + Assert.assertNotNull(builder); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testBuildCommand() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + builder.command(Collections.singletonList("net")); + Assert.assertNotEquals(0, builder.command().size()); + + builder = new ProcessBuilderForWin32(); + builder.command("net"); + Assert.assertNotEquals(0, builder.command().size()); + + builder = new ProcessBuilderForWin32(); + builder.command((List) null); + Assert.assertNotEquals(0, builder.command().size()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testEnvironment() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + Assert.assertNotNull(builder.environment()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + builder.environment(new String[]{ "a=123" }); + Assert.assertNotEquals(0, builder.environment().size()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testDirectory() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + builder.directory(new File("/tmp")); + Assert.assertNotNull(builder.directory()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testStream() { + try { + InputStream in = ProcessBuilderForWin32.NullInputStream.INSTANCE; + Assert.assertNotNull(in); + Assert.assertEquals(-1, in.read()); + Assert.assertEquals(0, in.available()); + + OutputStream out = ProcessBuilderForWin32.NullOutputStream.INSTANCE; + Assert.assertNotNull(out); + out.write(new byte[] {1}); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testRedirect() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + + builder.redirectInput(new File("/tmp")); + Assert.assertNotNull(builder.redirectInput()); + Assert.assertNotNull(builder.redirectInput().file()); + + builder.redirectOutput(new File("/tmp")); + Assert.assertNotNull(builder.redirectOutput()); + Assert.assertNotNull(builder.redirectOutput().file()); + + builder.redirectError(new File("/tmp")); + Assert.assertNotNull(builder.redirectError()); + Assert.assertNotNull(builder.redirectError().file()); + + builder.redirectInput(builder.redirectOutput()); + builder.redirectOutput(builder.redirectInput()); + builder.redirectError(builder.redirectInput()); + + Assert.assertNotNull(ProcessBuilderForWin32.Redirect.PIPE.type()); + Assert.assertNotNull(ProcessBuilderForWin32.Redirect.PIPE.toString()); + Assert.assertNotNull(ProcessBuilderForWin32.Redirect.INHERIT.type()); + Assert.assertNotNull(ProcessBuilderForWin32.Redirect.INHERIT.toString()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testRedirectErrorStream() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + builder.redirectErrorStream(true); + Assert.assertTrue(builder.redirectErrorStream()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void runCmdViaUser() { + try { + ProcessBuilderForWin32 builder = new ProcessBuilderForWin32(); + builder.user("test123", StringUtils.EMPTY); + + List commands = new ArrayList<>(); + commands.add("cmd.exe"); + commands.add("/c"); + commands.add("net user"); + builder.command(commands); + + Process process = builder.start(); + BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("GBK"))); + String line; + StringBuilder sb = new StringBuilder(); + while ((line = inReader.readLine()) != null) { + sb.append(line); + } + logger.info("net user: {}", sb.toString()); + Assert.assertNotEquals(StringUtils.EMPTY, sb.toString()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java new file mode 100644 index 000000000..00c54c016 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils.process; + +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({OSUtils.class, ProcessEnvironmentForWin32.class}) +public class ProcessEnvironmentForWin32Test { + + private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class); + + @Before + public void before() { + try { + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.when(OSUtils.isWindows()).thenReturn(true); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testPutAndGet() { + try { + ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0); + processEnvironmentForWin32.put("a", "123"); + Assert.assertEquals("123", processEnvironmentForWin32.get("a")); + Assert.assertTrue(processEnvironmentForWin32.containsKey("a")); + Assert.assertTrue(processEnvironmentForWin32.containsValue("123")); + Assert.assertEquals("123", processEnvironmentForWin32.remove("a")); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + + try { + ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0); + processEnvironmentForWin32.put("b=", "123"); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + + try { + ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0); + processEnvironmentForWin32.put("b", "\u0000"); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + + try { + ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0); + processEnvironmentForWin32.get(null); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testEntrySet() { + try { + ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0); + processEnvironmentForWin32.clear(); + processEnvironmentForWin32.put("a", "123"); + Assert.assertEquals(0, processEnvironmentForWin32.entrySet().size()); + Assert.assertTrue(processEnvironmentForWin32.entrySet().isEmpty()); + for (Map.Entry entry : processEnvironmentForWin32.entrySet()) { + Assert.assertNotNull(entry); + Assert.assertNotNull(entry.getKey()); + Assert.assertNotNull(entry.getValue()); + Assert.assertNotNull(entry.setValue("123")); + } + + processEnvironmentForWin32.clear(); + Set keys = processEnvironmentForWin32.keySet(); + Assert.assertEquals(0, keys.size()); + Assert.assertTrue(keys.isEmpty()); + + processEnvironmentForWin32.clear(); + Collection values = processEnvironmentForWin32.values(); + Assert.assertEquals(0, keys.size()); + Assert.assertTrue(keys.isEmpty()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testToEnvironmentBlock() { + try { + ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0); + Assert.assertNotNull(processEnvironmentForWin32.toEnvironmentBlock()); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java new file mode 100644 index 000000000..3f8bcbfb6 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils.process; + +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.security.action.GetPropertyAction; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({OSUtils.class, GetPropertyAction.class}) +public class ProcessImplForWin32Test { + + private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class); + + @Before + public void before() { + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.mockStatic(GetPropertyAction.class); + PowerMockito.when(OSUtils.isWindows()).thenReturn(true); + } + + @Test + public void testStart() { + try { + Process process = ProcessImplForWin32.start( + "test123", StringUtils.EMPTY, new String[]{"net"}, + null, null, null, false); + Assert.assertNotNull(process); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + + try { + Process process = ProcessImplForWin32.start( + "test123", StringUtils.EMPTY, new String[]{"net"}, + null, null, new ProcessBuilderForWin32.Redirect[]{ + ProcessBuilderForWin32.Redirect.PIPE, + ProcessBuilderForWin32.Redirect.PIPE, + ProcessBuilderForWin32.Redirect.PIPE + }, false); + Assert.assertNotNull(process); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index bac498c15..8e0ccee16 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -16,21 +16,29 @@ */ package org.apache.dolphinscheduler.server.worker.task; +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.WinNT; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.process.ProcessBuilderForWin32; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import java.io.*; import java.lang.reflect.Field; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -194,26 +202,49 @@ public abstract class AbstractCommandExecutor { * @throws IOException IO Exception */ private void buildProcess(String commandFile) throws IOException { + // command list + List command = new ArrayList<>(); + //init process builder - ProcessBuilder processBuilder = new ProcessBuilder(); - // setting up a working directory - processBuilder.directory(new File(taskDir)); - // merge error information to standard output stream - processBuilder.redirectErrorStream(true); - // setting up user to run commands - List command = new LinkedList<>(); - command.add("sudo"); - command.add("-u"); - command.add(tenantCode); - command.add(commandInterpreter()); - command.addAll(commandOptions()); - command.add(commandFile); - processBuilder.command(command); - - process = processBuilder.start(); + if (OSUtils.isWindows()) { + ProcessBuilderForWin32 processBuilder = new ProcessBuilderForWin32(); + // setting up a working directory + processBuilder.directory(new File(taskDir)); + processBuilder.user(tenantCode, StringUtils.EMPTY); + // merge error information to standard output stream + processBuilder.redirectErrorStream(true); + + // setting up user to run commands + command.add(commandInterpreter()); + command.add("/c"); + command.addAll(commandOptions()); + command.add(commandFile); + + // setting commands + processBuilder.command(command); + process = processBuilder.start(); + } else { + ProcessBuilder processBuilder = new ProcessBuilder(); + // setting up a working directory + processBuilder.directory(new File(taskDir)); + // merge error information to standard output stream + processBuilder.redirectErrorStream(true); + + // setting up user to run commands + command.add("sudo"); + command.add("-u"); + command.add(tenantCode); + command.add(commandInterpreter()); + command.addAll(commandOptions()); + command.add(commandFile); + + // setting commands + processBuilder.command(command); + process = processBuilder.start(); + } // print command - printCommand(processBuilder); + printCommand(command); } /** @@ -320,13 +351,13 @@ public abstract class AbstractCommandExecutor { /** * print command - * @param processBuilder process builder + * @param command command */ - private void printCommand(ProcessBuilder processBuilder) { + private void printCommand(List command) { String cmdStr; try { - cmdStr = ProcessUtils.buildCommandStr(processBuilder.command()); + cmdStr = ProcessUtils.buildCommandStr(command); logger.info("task run command:\n{}", cmdStr); } catch (IOException e) { logger.error(e.getMessage(), e); @@ -358,7 +389,11 @@ public abstract class AbstractCommandExecutor { BufferedReader inReader = null; try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + if (OSUtils.isWindows()) { + inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("GBK"))); + } else { + inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + } String line; long lastFlushTime = System.currentTimeMillis(); @@ -406,7 +441,7 @@ public abstract class AbstractCommandExecutor { } Thread.sleep(Constants.SLEEP_TIME_MILLIS); } - } + } } catch (Exception e) { logger.error("yarn applications: {} status failed ", appIds,e); result = false; @@ -510,12 +545,15 @@ public abstract class AbstractCommandExecutor { */ private int getProcessId(Process process) { int processId = 0; - try { Field f = process.getClass().getDeclaredField(Constants.PID); f.setAccessible(true); - - processId = f.getInt(process); + if (OSUtils.isWindows()) { + WinNT.HANDLE handle = (WinNT.HANDLE) f.get(process); + processId = Kernel32.INSTANCE.GetProcessId(handle); + } else { + processId = f.getInt(process); + } } catch (Throwable e) { logger.error(e.getMessage(), e); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index db46d0d85..5d14e6b2a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -17,11 +17,12 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.commons.io.FileUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.slf4j.Logger; import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Date; @@ -34,10 +35,15 @@ import java.util.function.Consumer; public class ShellCommandExecutor extends AbstractCommandExecutor { /** - * sh + * For Unix-like, using sh */ public static final String SH = "sh"; + /** + * For Windows, using cmd.exe + */ + public static final String CMD = "cmd.exe"; + /** * constructor * @param logHandler log handler @@ -66,7 +72,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { @Override protected String buildCommandFilePath() { // command file - return String.format("%s/%s.command", taskDir, taskAppId); + return String.format("%s/%s.%s", taskDir, taskAppId, OSUtils.isWindows() ? "bat" : "command"); } /** @@ -75,7 +81,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { */ @Override protected String commandInterpreter() { - return SH; + return OSUtils.isWindows() ? CMD : SH; } /** @@ -103,21 +109,26 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { logger.info("create command file:{}", commandFile); StringBuilder sb = new StringBuilder(); - sb.append("#!/bin/sh\n"); - sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); - sb.append("cd $BASEDIR\n"); - - if (envFile != null) { - sb.append("source " + envFile + "\n"); + if (OSUtils.isWindows()) { + sb.append("@echo off\n"); + sb.append("cd /d %~dp0\n"); + if (envFile != null) { + sb.append("call ").append(envFile).append("\n"); + } + } else { + sb.append("#!/bin/sh\n"); + sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); + sb.append("cd $BASEDIR\n"); + if (envFile != null) { + sb.append("source ").append(envFile).append("\n"); + } } - sb.append("\n\n"); sb.append(execCommand); - logger.info("command : {}",sb.toString()); + logger.info("command : {}", sb.toString()); // write data to file - FileUtils.writeStringToFile(new File(commandFile), sb.toString(), - Charset.forName("UTF-8")); + FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index ef941cd06..7537ca2ed 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; @@ -338,7 +339,7 @@ public class DataxTask extends AbstractTask { private String buildShellCommandFile(String jobConfigFilePath) throws Exception { // generate scripts - String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId()); + String fileName = String.format("%s/%s_node.%s", taskDir, taskProps.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); Path path = new File(fileName).toPath(); if (Files.exists(path)) { @@ -370,7 +371,13 @@ public class DataxTask extends AbstractTask { // create shell command file Set perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X); FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); - Files.createFile(path, attr); + + if (OSUtils.isWindows()) { + Files.createFile(path); + } else { + Files.createFile(path, attr); + } + Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND); return fileName; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 5704c8052..90661a690 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -123,7 +124,7 @@ public class ShellTask extends AbstractTask { */ private String buildCommand() throws Exception { // generate scripts - String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId()); + String fileName = String.format("%s/%s_node.%s", taskDir, taskProps.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); Path path = new File(fileName).toPath(); if (Files.exists(path)) { @@ -154,7 +155,11 @@ public class ShellTask extends AbstractTask { Set perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X); FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); - Files.createFile(path, attr); + if (OSUtils.isWindows()) { + Files.createFile(path); + } else { + Files.createFile(path, attr); + } Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java index 77fc39870..1e0adaad9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java @@ -21,7 +21,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + public class ProcessUtilsTest { + private static final Logger logger = LoggerFactory.getLogger(ProcessUtilsTest.class); @Test @@ -30,4 +35,16 @@ public class ProcessUtilsTest { Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, ""); logger.info("Sub process list : {}", pidList); } + + @Test + public void testBuildCommandStr() { + List commands = new ArrayList<>(); + commands.add("sudo"); + try { + Assert.assertEquals(ProcessUtils.buildCommandStr(commands), "sudo"); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java new file mode 100644 index 000000000..5536665e2 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.shell; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +import java.util.Date; + +/** + * shell task test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(OSUtils.class) +public class ShellTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); + + private ShellTask shellTask; + + private ProcessService processService; + + private ShellCommandExecutor shellCommandExecutor; + + private ApplicationContext applicationContext; + + @Before + public void before() throws Exception { + PowerMockito.mockStatic(OSUtils.class); + processService = PowerMockito.mock(ProcessService.class); + shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); + + applicationContext = PowerMockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}"); + shellTask = new ShellTask(props, logger); + shellTask.init(); + + PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); + PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); + PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); + + String fileName = String.format("%s/%s_node.%s", props.getTaskDir(), props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); + PowerMockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0); + } + + private DataSource getDataSource() { + DataSource dataSource = new DataSource(); + dataSource.setType(DbType.MYSQL); + dataSource.setConnectionParams( + "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"); + dataSource.setUserId(1); + return dataSource; + } + + private ProcessInstance getProcessInstance() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setCommandType(CommandType.START_PROCESS); + processInstance.setScheduleTime(new Date()); + return processInstance; + } + + @After + public void after() {} + + /** + * Method: ShellTask() + */ + @Test + public void testShellTask() + throws Exception { + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + ShellTask shellTaskTest = new ShellTask(props, logger); + Assert.assertNotNull(shellTaskTest); + } + + /** + * Method: init for Unix-like + */ + @Test + public void testInitForUnix() { + try { + PowerMockito.when(OSUtils.isWindows()).thenReturn(false); + shellTask.init(); + Assert.assertTrue(true); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + /** + * Method: init for Windows + */ + @Test + public void testInitForWindows() { + try { + PowerMockito.when(OSUtils.isWindows()).thenReturn(true); + shellTask.init(); + Assert.assertTrue(true); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + + /** + * Method: handle() for Unix-like + */ + @Test + public void testHandleForUnix() throws Exception { + try { + PowerMockito.when(OSUtils.isWindows()).thenReturn(false); + shellTask.handle(); + Assert.assertTrue(true); + } catch (Error | Exception e) { + if (!e.getMessage().contains("process error . exitCode is : -1") + && !System.getProperty("os.name").startsWith("Windows")) { + logger.error(e.getMessage()); + } + } + } + + /** + * Method: handle() for Windows + */ + @Test + public void testHandleForWindows() throws Exception { + try { + PowerMockito.when(OSUtils.isWindows()).thenReturn(true); + shellTask.handle(); + Assert.assertTrue(true); + } catch (Error | Exception e) { + if (!e.getMessage().contains("process error . exitCode is : -1")) { + logger.error(e.getMessage()); + } + } + } + + /** + * Method: cancelApplication() + */ + @Test + public void testCancelApplication() throws Exception { + try { + shellTask.cancelApplication(true); + Assert.assertTrue(true); + } catch (Error | Exception e) { + logger.error(e.getMessage()); + } + } + +} diff --git a/pom.xml b/pom.xml index dee1dce8b..a7feec0e1 100644 --- a/pom.xml +++ b/pom.xml @@ -684,6 +684,9 @@ **/common/utils/*.java + **/common/utils/process/ProcessBuilderForWin32Test.java + **/common/utils/process/ProcessEnvironmentForWin32Test.java + **/common/utils/process/ProcessImplForWin32Test.java **/common/log/*.java **/common/threadutils/*.java **/common/graph/*.java @@ -732,6 +735,7 @@ **/alert/template/AlertTemplateFactoryTest.java **/alert/template/impl/DefaultHTMLTemplateTest.java **/server/worker/task/datax/DataxTaskTest.java + **/server/worker/task/shell/ShellTaskTest.java **/server/worker/task/sqoop/SqoopTaskTest.java **/server/utils/DataxUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java -- GitLab