HadoopUtils.java 23.0 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.common.utils;
L
ligang 已提交
18

19 20 21
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
22 23 24
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
25
import org.apache.commons.io.IOUtils;
Q
qiaozhanwei 已提交
26 27 28
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
29
import org.apache.dolphinscheduler.common.enums.ResourceType;
L
ligang 已提交
30
import org.apache.hadoop.conf.Configuration;
L
lgcareer 已提交
31
import org.apache.hadoop.fs.*;
L
ligang 已提交
32
import org.apache.hadoop.fs.FileSystem;
journey2018's avatar
journey2018 已提交
33
import org.apache.hadoop.security.UserGroupInformation;
L
ligang 已提交
34 35 36 37 38
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
39
import java.nio.file.Files;
L
add  
lgcareer 已提交
40
import java.security.PrivilegedExceptionAction;
41
import java.util.Collections;
L
ligang 已提交
42
import java.util.List;
43
import java.util.Map;
44
import java.util.concurrent.TimeUnit;
L
ligang 已提交
45 46 47
import java.util.stream.Collectors;
import java.util.stream.Stream;

48 49
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;

L
ligang 已提交
50 51 52 53 54 55 56 57
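/**
 * Hadoop utilities: file operations on the configured resource storage (HDFS or S3)
 * and YARN application status lookup.
 * A single cached instance is obtained through {@link #getInstance()}.
 */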
public class HadoopUtils implements Closeable {

    private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);

L
lgcareer 已提交
58
    private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
59
    public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
D
dailidong 已提交
60 61
    public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
    public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
62
    public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
Q
test  
qiaozhanwei 已提交
63

64 65 66 67 68 69 70 71 72 73 74 75
    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";

    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
            .newBuilder()
            .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
            .build(new CacheLoader<String, HadoopUtils>() {
                @Override
                public HadoopUtils load(String key) throws Exception {
                    return new HadoopUtils();
                }
            });

76
    private static volatile boolean yarnEnabled = false;
L
ligang 已提交
77

78 79
    private Configuration configuration;
    private FileSystem fs;
L
lgcareer 已提交
80

81
    /**
     * Prepares the hadoop configuration and file system, then makes sure the
     * resource upload root path exists. Instances are handed out by {@link #getInstance()}.
     */
    private HadoopUtils() {
        this.init();
        this.initHdfsPath();
    }

86 87 88
    public static HadoopUtils getInstance() {

        return cache.getUnchecked(HADOOP_UTILS_KEY);
L
ligang 已提交
89 90
    }

L
add  
lgcareer 已提交
91
    /**
92
     * init dolphinscheduler root path in hdfs
L
add  
lgcareer 已提交
93
     */
Q
test  
qiaozhanwei 已提交
94

95
    private void initHdfsPath() {
96
        Path path = new Path(resourceUploadPath);
L
lgcareer 已提交
97

L
add  
lgcareer 已提交
98
        try {
L
lgcareer 已提交
99 100 101
            if (!fs.exists(path)) {
                fs.mkdirs(path);
            }
L
add  
lgcareer 已提交
102
        } catch (Exception e) {
103
            logger.error(e.getMessage(), e);
L
add  
lgcareer 已提交
104 105 106
        }
    }

L
lgcareer 已提交
107

L
ligang 已提交
108 109 110 111
    /**
     * init hadoop configuration
     */
    private void init() {
112 113 114
        try {
            configuration = new Configuration();

D
dailidong 已提交
115 116
            String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
            ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
117

118 119
            if (resUploadType == ResUploadType.HDFS) {
                if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
120 121
                    System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
                            PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
122
                    configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
D
dailidong 已提交
123
                    hdfsUser = "";
124 125 126 127
                    UserGroupInformation.setConfiguration(configuration);
                    UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
                            PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
                }
journey2018's avatar
journey2018 已提交
128

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
                String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
                //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
                // the default is the local file system
                if (defaultFS.startsWith("file")) {
                    String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
                    if (StringUtils.isNotBlank(defaultFSProp)) {
                        Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
                        configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
                        fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
                    } else {
                        logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
                        throw new RuntimeException(
                                String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
                        );
                    }
                } else {
                    logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
                }
L
ligang 已提交
147

148 149 150 151 152 153 154 155
                if (fs == null) {
                    if (StringUtils.isNotEmpty(hdfsUser)) {
                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
                        ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
                            @Override
                            public Boolean run() throws Exception {
                                fs = FileSystem.get(configuration);
                                return true;
L
lgcareer 已提交
156
                            }
157 158 159 160
                        });
                    } else {
                        logger.warn("hdfs.root.user is not set value!");
                        fs = FileSystem.get(configuration);
L
ligang 已提交
161 162
                    }
                }
163 164 165 166 167 168
            } else if (resUploadType == ResUploadType.S3) {
                configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
                configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
                configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
                configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
                fs = FileSystem.get(configuration);
L
ligang 已提交
169
            }
170 171 172 173


        } catch (Exception e) {
            logger.error(e.getMessage(), e);
L
ligang 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186
        }
    }

    /**
     * get the hadoop configuration held by this instance
     *
     * @return Configuration
     */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * get application url
     *
D
dailidong 已提交
187 188
     * @param applicationId application id
     * @return url of application
L
ligang 已提交
189 190
     */
    public String getApplicationUrl(String applicationId) {
D
dailidong 已提交
191 192 193 194 195 196 197 198
        /**
         * if rmHaIds contains xx, it signs not use resourcemanager
         * otherwise:
         *  if rmHaIds is empty, single resourcemanager enabled
         *  if rmHaIds not empty: resourcemanager HA enabled
         */
        String appUrl = "";
        //not use resourcemanager
199
        if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)) {
D
dailidong 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213

            yarnEnabled = false;
            logger.warn("should not step here");
        } else if (!StringUtils.isEmpty(rmHaIds)) {
            //resourcemanager HA enabled
            appUrl = getAppAddress(appAddress, rmHaIds);
            yarnEnabled = true;
            logger.info("application url : {}", appUrl);
        } else {
            //single resourcemanager enabled
            yarnEnabled = true;
        }

        return String.format(appUrl, applicationId);
L
ligang 已提交
214 215
    }

216 217 218 219 220 221
    /**
     * get job history url
     *
     * @param applicationId application id, eg: application_1587475402360_712719
     * @return job history address formatted with the job id, eg: job_1587475402360_712719
     */
    public String getJobHistoryUrl(String applicationId) {
        return String.format(jobHistoryAddress, applicationId.replace("application", "job"));
    }

L
ligang 已提交
222 223 224
    /**
     * cat file on hdfs
     *
225
     * @param hdfsFilePath hdfs file path
D
dailidong 已提交
226 227
     * @return byte[] byte array
     * @throws IOException errors
L
ligang 已提交
228 229 230
     */
    public byte[] catFile(String hdfsFilePath) throws IOException {

231 232
        if (StringUtils.isBlank(hdfsFilePath)) {
            logger.error("hdfs file path:{} is blank", hdfsFilePath);
233
            return new byte[0];
L
ligang 已提交
234 235 236 237 238 239 240 241 242 243
        }

        FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsFilePath));
        return IOUtils.toByteArray(fsDataInputStream);
    }


    /**
     * cat file on hdfs
     *
244 245 246
     * @param hdfsFilePath hdfs file path
     * @param skipLineNums skip line numbers
     * @param limit        read how many lines
D
dailidong 已提交
247 248
     * @return content of file
     * @throws IOException errors
L
ligang 已提交
249 250 251
     */
    public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {

252 253
        if (StringUtils.isBlank(hdfsFilePath)) {
            logger.error("hdfs file path:{} is blank", hdfsFilePath);
254
            return Collections.emptyList();
L
ligang 已提交
255 256
        }

257
        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
258 259 260 261
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
            return stream.collect(Collectors.toList());
        }
262

L
ligang 已提交
263 264 265 266 267 268 269 270
    }

    /**
     * make the given path and all non-existent parents into directories,
     * delegates to the file system's mkdirs
     *
     * @param hdfsPath path to create
     * @return mkdir result
     * @throws IOException errors
     */
    public boolean mkdir(String hdfsPath) throws IOException {
        final Path target = new Path(hdfsPath);
        return fs.mkdirs(target);
    }

    /**
     * copy a path to another path, both on the file system of this instance
     *
     * @param srcPath      source hdfs path
     * @param dstPath      destination hdfs path
     * @param deleteSource whether to delete the src
     * @param overwrite    whether to overwrite an existing file
     * @return if success or not
     * @throws IOException errors
     */
    public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
        final Path source = new Path(srcPath);
        final Path target = new Path(dstPath);
        return FileUtil.copy(fs, source, fs, target, deleteSource, overwrite, fs.getConf());
    }

    /**
     * the src file is on the local disk.  Add it to FS at
     * the given dst name.
295 296 297 298 299
     *
     * @param srcFile      local file
     * @param dstHdfsPath  destination hdfs path
     * @param deleteSource whether to delete the src
     * @param overwrite    whether to overwrite an existing file
D
dailidong 已提交
300 301
     * @return if success or not
     * @throws IOException errors
L
ligang 已提交
302 303 304
     */
    public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
        Path srcPath = new Path(srcFile);
305
        Path dstPath = new Path(dstHdfsPath);
L
ligang 已提交
306 307 308 309 310 311 312 313 314

        fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);

        return true;
    }

    /**
     * copy hdfs file to local
     *
315 316 317 318
     * @param srcHdfsFilePath source hdfs file path
     * @param dstFile         destination file
     * @param deleteSource    delete source
     * @param overwrite       overwrite
D
dailidong 已提交
319 320
     * @return result of copy hdfs file to local
     * @throws IOException errors
L
ligang 已提交
321 322 323 324 325 326 327 328
     */
    public boolean copyHdfsToLocal(String srcHdfsFilePath, String dstFile, boolean deleteSource, boolean overwrite) throws IOException {
        Path srcPath = new Path(srcHdfsFilePath);
        File dstPath = new File(dstFile);

        if (dstPath.exists()) {
            if (dstPath.isFile()) {
                if (overwrite) {
329
                    Files.delete(dstPath.toPath());
L
ligang 已提交
330 331 332 333 334 335
                }
            } else {
                logger.error("destination file must be a file");
            }
        }

336
        if (!dstPath.getParentFile().exists()) {
L
ligang 已提交
337 338 339 340 341 342 343 344 345 346
            dstPath.getParentFile().mkdirs();
        }

        return FileUtil.copy(fs, srcPath, dstPath, deleteSource, fs.getConf());
    }

    /**
     * delete a file or directory, delegates to the file system's delete
     *
     * @param hdfsFilePath the path to delete.
     * @param recursive    passed through to the file system; per the original contract a
     *                     directory requires true, for a file either value is accepted
     * @return true if delete is successful else false.
     * @throws IOException errors
     */
    public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
        final Path target = new Path(hdfsFilePath);
        return fs.delete(target, recursive);
    }

    /**
     * check if the given path exists on the file system
     *
     * @param hdfsFilePath source file path
     * @return result of exists or not
     * @throws IOException errors
     */
    public boolean exists(String hdfsFilePath) throws IOException {
        final Path target = new Path(hdfsFilePath);
        return fs.exists(target);
    }

L
ligang 已提交
368 369 370
    /**
     * Gets a list of files in the directory
     *
D
dailidong 已提交
371 372 373
     * @param filePath file path
     * @return {@link FileStatus} file status
     * @throws Exception errors
L
ligang 已提交
374
     */
375
    public FileStatus[] listFileStatus(String filePath) throws Exception {
L
ligang 已提交
376 377 378 379 380 381 382 383
        try {
            return fs.listStatus(new Path(filePath));
        } catch (IOException e) {
            logger.error("Get file list exception", e);
            throw new Exception("Get file list exception", e);
        }
    }

L
ligang 已提交
384 385 386
    /**
     * Renames Path src to Path dst, delegates to the file system's rename
     *
     * @param src path to be renamed
     * @param dst new path after rename
     * @return true if rename is successful
     * @throws IOException on failure
     */
    public boolean rename(String src, String dst) throws IOException {
        final Path from = new Path(src);
        final Path to = new Path(dst);
        return fs.rename(from, to);
    }

397
    /**
398
     * hadoop resourcemanager enabled or not
399
     *
400
     * @return result
401
     */
402
    public boolean isYarnEnabled() {
403 404
        return yarnEnabled;
    }
L
ligang 已提交
405 406 407 408

    /**
     * get the state of an application
     *
D
dailidong 已提交
409
     * @param applicationId application id
L
ligang 已提交
410
     * @return the return may be null or there may be other parse exceptions
D
dailidong 已提交
411
     * @throws JSONException json exception
L
ligang 已提交
412 413 414 415 416 417
     */
    public ExecutionStatus getApplicationStatus(String applicationId) throws JSONException {
        if (StringUtils.isEmpty(applicationId)) {
            return null;
        }

418
        String result = Constants.FAILED;
L
ligang 已提交
419
        String applicationUrl = getApplicationUrl(applicationId);
420
        logger.info("applicationUrl={}", applicationUrl);
L
ligang 已提交
421 422

        String responseContent = HttpUtils.get(applicationUrl);
423 424 425 426 427 428 429 430 431 432 433
        if (responseContent != null) {
            JSONObject jsonObject = JSON.parseObject(responseContent);
            result = jsonObject.getJSONObject("app").getString("finalStatus");
        } else {
            //may be in job history
            String jobHistoryUrl = getJobHistoryUrl(applicationId);
            logger.info("jobHistoryUrl={}", jobHistoryUrl);
            responseContent = HttpUtils.get(jobHistoryUrl);
            JSONObject jsonObject = JSONObject.parseObject(responseContent);
            result = jsonObject.getJSONObject("job").getString("state");
        }
L
ligang 已提交
434 435

        switch (result) {
Q
qiaozhanwei 已提交
436
            case Constants.ACCEPTED:
L
ligang 已提交
437
                return ExecutionStatus.SUBMITTED_SUCCESS;
Q
qiaozhanwei 已提交
438
            case Constants.SUCCEEDED:
L
ligang 已提交
439
                return ExecutionStatus.SUCCESS;
Q
qiaozhanwei 已提交
440 441 442 443
            case Constants.NEW:
            case Constants.NEW_SAVING:
            case Constants.SUBMITTED:
            case Constants.FAILED:
L
ligang 已提交
444
                return ExecutionStatus.FAILURE;
Q
qiaozhanwei 已提交
445
            case Constants.KILLED:
L
ligang 已提交
446 447
                return ExecutionStatus.KILL;

Q
qiaozhanwei 已提交
448
            case Constants.RUNNING:
L
ligang 已提交
449 450 451 452 453 454
            default:
                return ExecutionStatus.RUNNING_EXEUTION;
        }
    }

    /**
455
     * get data hdfs path
456
     *
L
ligang 已提交
457 458 459
     * @return data hdfs path
     */
    public static String getHdfsDataBasePath() {
460
        if ("/".equals(resourceUploadPath)) {
461 462 463
            // if basepath is configured to /,  the generated url may be  //default/resources (with extra leading /)
            return "";
        } else {
464
            return resourceUploadPath;
465
        }
L
ligang 已提交
466 467
    }

468 469 470 471 472 473
    /**
     * hdfs resource dir
     *
     * @param tenantCode tenant code
     * @return hdfs resource dir
     */
474
    public static String getHdfsDir(ResourceType resourceType, String tenantCode) {
475 476 477 478 479 480 481 482 483
        String hdfsDir = "";
        if (resourceType.equals(ResourceType.FILE)) {
            hdfsDir = getHdfsResDir(tenantCode);
        } else if (resourceType.equals(ResourceType.UDF)) {
            hdfsDir = getHdfsUdfDir(tenantCode);
        }
        return hdfsDir;
    }

L
ligang 已提交
484 485 486 487 488 489
    /**
     * hdfs resource dir
     *
     * @param tenantCode tenant code
     * @return hdfs resource dir
     */
490
    public static String getHdfsResDir(String tenantCode) {
L
ligang 已提交
491 492 493 494
        return String.format("%s/resources", getHdfsTenantDir(tenantCode));
    }

    /**
495 496 497
     * hdfs user dir
     *
     * @param tenantCode tenant code
498
     * @param userId     user id
499 500
     * @return hdfs resource dir
     */
501 502
    public static String getHdfsUserDir(String tenantCode, int userId) {
        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
503 504 505 506
    }

    /**
     * hdfs udf dir, the "udfs" folder below the tenant dir
     *
     * @param tenantCode tenant code
     * @return get udf dir on hdfs
     */
    public static String getHdfsUdfDir(String tenantCode) {
        final String tenantDir = getHdfsTenantDir(tenantCode);
        return String.format("%s/udfs", tenantDir);
    }

515 516 517 518

    /**
     * get hdfs file name
     *
519 520 521
     * @param resourceType resource type
     * @param tenantCode   tenant code
     * @param fileName     file name
522 523 524
     * @return hdfs file name
     */
    public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
525
        if (fileName.startsWith("/")) {
526
            fileName = fileName.replaceFirst("/", "");
527
        }
528
        return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName);
529 530 531 532 533 534 535
    }

    /**
     * get absolute path and name for resource file on hdfs
     *
     * @param tenantCode tenant code
     * @param fileName   file name
L
ligang 已提交
536 537
     * @return get absolute path and name for file on hdfs
     */
538
    public static String getHdfsResourceFileName(String tenantCode, String fileName) {
539
        if (fileName.startsWith("/")) {
540
            fileName = fileName.replaceFirst("/", "");
541
        }
542
        return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
L
ligang 已提交
543 544 545 546 547 548
    }

    /**
     * get absolute path and name for udf file on hdfs
     *
     * @param tenantCode tenant code
549
     * @param fileName   file name
L
ligang 已提交
550 551
     * @return get absolute path and name for udf file on hdfs
     */
552
    public static String getHdfsUdfFileName(String tenantCode, String fileName) {
553
        if (fileName.startsWith("/")) {
554
            fileName = fileName.replaceFirst("/", "");
555
        }
556
        return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
L
ligang 已提交
557 558 559
    }

    /**
D
dailidong 已提交
560
     * @param tenantCode tenant code
L
ligang 已提交
561 562
     * @return file directory of tenants on hdfs
     */
563
    public static String getHdfsTenantDir(String tenantCode) {
564
        return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
L
ligang 已提交
565 566 567 568 569 570
    }


    /**
     * getAppAddress
     *
D
dailidong 已提交
571
     * @param appAddress app address
572
     * @param rmHa       resource manager ha
D
dailidong 已提交
573
     * @return app address
L
ligang 已提交
574 575 576 577 578 579
     */
    public static String getAppAddress(String appAddress, String rmHa) {

        //get active ResourceManager
        String activeRM = YarnHAAdminUtils.getAcitveRMName(rmHa);

Q
qiaozhanwei 已提交
580
        String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
L
ligang 已提交
581 582 583 584 585

        if (split1.length != 2) {
            return null;
        }

Q
qiaozhanwei 已提交
586 587
        String start = split1[0] + Constants.DOUBLE_SLASH;
        String[] split2 = split1[1].split(Constants.COLON);
L
ligang 已提交
588 589 590 591 592

        if (split2.length != 2) {
            return null;
        }

Q
qiaozhanwei 已提交
593
        String end = Constants.COLON + split2[1];
L
ligang 已提交
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624

        return start + activeRM + end;
    }


    /**
     * close the file system held by this instance, nothing to do when it was never opened
     *
     * @throws IOException when closing the file system fails
     */
    @Override
    public void close() throws IOException {
        if (fs == null) {
            return;
        }
        try {
            fs.close();
        } catch (IOException cause) {
            logger.error("Close HadoopUtils instance failed", cause);
            throw new IOException("Close HadoopUtils instance failed", cause);
        }
    }


    /**
     * yarn ha admin utils
     */
    private static final class YarnHAAdminUtils extends RMAdminCLI {

        /**
         * get active resourcemanager
         *
         * @param rmIds
         * @return
         */
        public static String getAcitveRMName(String rmIds) {

Q
qiaozhanwei 已提交
625
            String[] rmIdArr = rmIds.split(Constants.COMMA);
L
ligang 已提交
626

Q
qiaozhanwei 已提交
627
            int activeResourceManagerPort = PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
L
ligang 已提交
628 629 630 631 632 633 634 635 636 637

            String yarnUrl = "http://%s:" + activeResourceManagerPort + "/ws/v1/cluster/info";

            String state = null;
            try {
                /**
                 * send http get request to rm1
                 */
                state = getRMState(String.format(yarnUrl, rmIdArr[0]));

Q
qiaozhanwei 已提交
638
                if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
639
                    return rmIdArr[0];
Q
qiaozhanwei 已提交
640
                } else if (Constants.HADOOP_RM_STATE_STANDBY.equals(state)) {
L
ligang 已提交
641
                    state = getRMState(String.format(yarnUrl, rmIdArr[1]));
Q
qiaozhanwei 已提交
642
                    if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
643 644 645 646 647 648 649
                        return rmIdArr[1];
                    }
                } else {
                    return null;
                }
            } catch (Exception e) {
                state = getRMState(String.format(yarnUrl, rmIdArr[1]));
Q
qiaozhanwei 已提交
650
                if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
                    return rmIdArr[0];
                }
            }
            return null;
        }


        /**
         * get ResourceManager state
         *
         * @param url
         * @return
         */
        public static String getRMState(String url) {

            String retStr = HttpUtils.get(url);

            if (StringUtils.isEmpty(retStr)) {
                return null;
            }
            //to json
            JSONObject jsonObject = JSON.parseObject(retStr);

            //get ResourceManager state
675
            return jsonObject.getJSONObject("clusterInfo").getString("haState");
L
ligang 已提交
676 677 678 679
        }

    }
}