未验证 提交 e5336931 编写于 作者: Z zhuangchong 提交者: GitHub

[Fix-4721][worker]The shell background starts the YARN task scenario, and the...

[Fix-4721][worker]The shell background starts the YARN task scenario, and the kill function is abnormal (#4722)

* fix the shell starts the yarn task in the background

* update StringUtils code style.

* solve code smell.

* add method comment in StringUtils class.

* update AlertGroupMapper code style.

* update AlertGroupMapperTest

* update sql script test.
上级 9c3cec5b
......@@ -205,6 +205,7 @@ public class HadoopUtils implements Closeable {
* if rmHaIds is empty, single resourcemanager enabled
* if rmHaIds not empty: resourcemanager HA enabled
*/
yarnEnabled = true;
String appUrl = StringUtils.isEmpty(rmHaIds) ? appAddress : getAppAddress(appAddress, rmHaIds);
if (StringUtils.isBlank(appUrl)) {
......@@ -419,7 +420,9 @@ public class HadoopUtils implements Closeable {
String result = Constants.FAILED;
String applicationUrl = getApplicationUrl(applicationId);
logger.info("applicationUrl={}", applicationUrl);
if (logger.isDebugEnabled()) {
logger.debug("generate yarn application url, applicationUrl={}", applicationUrl);
}
String responseContent = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false) ? KerberosHttpClient.get(applicationUrl) : HttpUtils.get(applicationUrl);
if (responseContent != null) {
......@@ -432,7 +435,9 @@ public class HadoopUtils implements Closeable {
} else {
//may be in job history
String jobHistoryUrl = getJobHistoryUrl(applicationId);
logger.info("jobHistoryUrl={}", jobHistoryUrl);
if (logger.isDebugEnabled()) {
logger.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl);
}
responseContent = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false) ? KerberosHttpClient.get(jobHistoryUrl) : HttpUtils.get(jobHistoryUrl);
if (null != responseContent) {
......
......@@ -17,48 +17,169 @@
package org.apache.dolphinscheduler.common.utils;
import java.util.Collection;
import java.util.Iterator;
/**
* java.lang.String utils class
*/
public class StringUtils {
/**
* The empty String {@code ""}.
*/
public static final String EMPTY = "";
private StringUtils() {
throw new UnsupportedOperationException("Construct StringUtils");
}
/**
* <p>Checks if a CharSequence is empty ("") or null.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is empty or null
*/
public static boolean isEmpty(final CharSequence cs) {
return cs == null || cs.length() == 0;
}
/**
* <p>Checks if a CharSequence is not empty ("") and not null.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is not empty and not null
*/
public static boolean isNotEmpty(final CharSequence cs) {
return !isEmpty(cs);
}
public static boolean isBlank(String str) {
/**
* <p>Checks if a CharSequence is empty (""), null or whitespace only.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is null, empty or whitespace only
*/
public static boolean isBlank(final CharSequence cs) {
int strLen;
if (str != null && (strLen = str.length()) != 0) {
for (int i = 0; i < strLen; ++i) {
if (!Character.isWhitespace(str.charAt(i))) {
return false;
}
if (cs == null || (strLen = cs.length()) == 0) {
return true;
}
for (int i = 0; i < strLen; i++) {
if (!Character.isWhitespace(cs.charAt(i))) {
return false;
}
}
return true;
}
/**
* <p>Checks if a CharSequence is not empty (""), not null and not whitespace only.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is not empty and not null and not whitespace only
*/
public static boolean isNotBlank(final CharSequence cs) {
return !isBlank(cs);
}
public static boolean isNotBlank(String s) {
return !isBlank(s);
/**
* <p>Replace all strings matching the regular expression \t \n \r with _</p>
*
* @param src the String , may be null
* @return the string that has been replaced
*/
public static String replaceNRTtoUnderline(String src) {
return isBlank(src) ? src : src.replaceAll("[\n|\r|\t]", "_");
}
public static String trim(String str) {
/**
* <p>Removes control characters (char &lt;= 32) from both
* ends of this String, handling {@code null} by returning
* {@code null}.</p>
*
* @param str the String to be trimmed, may be null
* @return the trimmed string, {@code null} if null String input
*/
public static String trim(final String str) {
return str == null ? null : str.trim();
}
public static String defaultIfBlank(String str, String defaultStr) {
/**
* <p>Returns either the passed in CharSequence, or if the CharSequence is
* whitespace, empty ("") or {@code null}, the value of {@code defaultStr}.</p>
*
* @param <T> the specific kind of CharSequence
* @param str the CharSequence to check, may be null
* @param defaultStr the default CharSequence to return
* if the input is whitespace, empty ("") or {@code null}, may be null
* @return the passed in CharSequence, or the default
*/
public static <T extends CharSequence> T defaultIfBlank(final T str, final T defaultStr) {
return isBlank(str) ? defaultStr : str;
}
/**
* <p>Compares two String, returning {@code true} if they represent
* equal string, ignoring case.</p>
*
* @param str1 the first String, may be null
* @param str2 the second String, may be null
* @return {@code true} if the String are equal, case insensitive, or
* both {@code null}
*/
public static boolean equalsIgnoreCase(String str1, String str2) {
return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
}
/**
* <p>Joins the elements of the provided Collection into a single String
* containing the provided Collection of elements.</p>
*
* @param collection the collection, may be null
* @param separator the separator
* @return a single String
*/
public static String join(Collection<?> collection, String separator) {
return collection == null ? null : join(collection.iterator(), separator);
}
/**
* <p>Joins the elements of the provided Iterator into a single String
* containing the provided Iterator of elements.</p>
*
* @param iterator the iterator, may be null
* @param separator the separator
* @return a single String
*/
public static String join(Iterator<?> iterator, String separator) {
if (iterator == null) {
return null;
} else if (!iterator.hasNext()) {
return "";
} else {
Object first = iterator.next();
if (!iterator.hasNext()) {
return first == null ? "" : first.toString();
} else {
StringBuilder buf = new StringBuilder(256);
if (first != null) {
buf.append(first);
}
while (iterator.hasNext()) {
if (separator != null) {
buf.append(separator);
}
Object obj = iterator.next();
if (obj != null) {
buf.append(obj);
}
}
return buf.toString();
}
}
}
}
......@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.common.utils;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
......@@ -80,4 +83,14 @@ public class StringUtilsTest {
defaultStr = StringUtils.defaultIfBlank("test", "defaultStr");
Assert.assertEquals("test", defaultStr);
}
@Test
public void testJoin() {
List<String> list = new ArrayList<>();
list.add("1");
list.add("3");
list.add("4");
String join = StringUtils.join(list, ",");
Assert.assertEquals("1,3,4", join);
}
}
......@@ -14,17 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.dao.entity.AlertGroup;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* alertgroup mapper interface
*/
......
......@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -385,11 +386,13 @@ public class ProcessUtils {
return;
}
String cmd = String.format("kill -9 %s", getPidsStr(processId));
cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
String pidsStr = getPidsStr(processId);
if (StringUtils.isNotEmpty(pidsStr)) {
String cmd = String.format("kill -9 %s", pidsStr);
cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
}
} catch (Exception e) {
logger.error("kill task failed", e);
......@@ -430,10 +433,10 @@ public class ProcessUtils {
/**
* find logs and kill yarn tasks.
*
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static void killYarnJob(TaskExecutionContext taskExecutionContext) {
public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null;
......@@ -457,11 +460,13 @@ public class ProcessUtils {
}
if (CollectionUtils.isNotEmpty(appIds)) {
cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
return appIds;
}
}
} catch (Exception e) {
logger.error("kill yarn job failure", e);
}
return Collections.emptyList();
}
}
......@@ -124,11 +124,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return Pair.of(true, appIds);
}
String cmd = String.format("kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
if (StringUtils.isNotEmpty(pidsStr)) {
String cmd = String.format("kill -9 %s", pidsStr);
cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
}
OSUtils.exeCmd(cmd);
} catch (Exception e) {
processFlag = false;
logger.error("kill task error", e);
......
......@@ -408,9 +408,12 @@ public abstract class AbstractCommandExecutor {
boolean result = true;
try {
for (String appId : appIds) {
logger.info("check yarn application status, appId:{}", appId);
while (Stopper.isRunning()) {
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
logger.info("appId:{}, final state:{}", appId, applicationStatus.name());
if (logger.isDebugEnabled()) {
logger.debug("check yarn application status, appId:{}, final state:{}", appId, applicationStatus.name());
}
if (applicationStatus.equals(ExecutionStatus.FAILURE)
|| applicationStatus.equals(ExecutionStatus.KILL)) {
return false;
......@@ -423,7 +426,7 @@ public abstract class AbstractCommandExecutor {
}
}
} catch (Exception e) {
logger.error(String.format("yarn applications: %s status failed ", appIds.toString()), e);
logger.error("yarn applications: {} , query status failed, exception:{}", StringUtils.join(appIds, ","), e);
result = false;
}
return result;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册