HadoopUtils.java 21.8 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.common.utils;
L
ligang 已提交
18

19 20 21
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Q
qiaozhanwei 已提交
22 23 24
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
L
ligang 已提交
25 26 27 28
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
29
import org.apache.dolphinscheduler.common.enums.ResourceType;
L
ligang 已提交
30
import org.apache.hadoop.conf.Configuration;
L
lgcareer 已提交
31
import org.apache.hadoop.fs.*;
L
ligang 已提交
32
import org.apache.hadoop.fs.FileSystem;
journey2018's avatar
journey2018 已提交
33
import org.apache.hadoop.security.UserGroupInformation;
L
ligang 已提交
34 35 36 37 38
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
39
import java.nio.file.Files;
L
add  
lgcareer 已提交
40
import java.security.PrivilegedExceptionAction;
41
import java.util.Collections;
L
ligang 已提交
42
import java.util.List;
43
import java.util.Map;
44
import java.util.concurrent.TimeUnit;
L
ligang 已提交
45 46 47
import java.util.stream.Collectors;
import java.util.stream.Stream;

48 49
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;

L
ligang 已提交
50 51 52 53 54 55 56 57
/**
 * hadoop utils
 * single instance
 */
public class HadoopUtils implements Closeable {

    private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);

L
lgcareer 已提交
58
    private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
59
    public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
D
dailidong 已提交
60 61
    public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
    public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
Q
test  
qiaozhanwei 已提交
62

63 64 65 66 67 68 69 70 71 72 73 74
    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";

    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
            .newBuilder()
            .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
            .build(new CacheLoader<String, HadoopUtils>() {
                @Override
                public HadoopUtils load(String key) throws Exception {
                    return new HadoopUtils();
                }
            });

75
    private static volatile boolean yarnEnabled = false;
L
ligang 已提交
76

77 78
    private Configuration configuration;
    private FileSystem fs;
L
lgcareer 已提交
79

80
    private HadoopUtils() {
L
ligang 已提交
81
        init();
L
lgcareer 已提交
82
        initHdfsPath();
L
ligang 已提交
83 84
    }

85 86 87
    public static HadoopUtils getInstance() {

        return cache.getUnchecked(HADOOP_UTILS_KEY);
L
ligang 已提交
88 89
    }

L
add  
lgcareer 已提交
90
    /**
91
     * init dolphinscheduler root path in hdfs
L
add  
lgcareer 已提交
92
     */
Q
test  
qiaozhanwei 已提交
93

94
    private void initHdfsPath() {
95
        Path path = new Path(resourceUploadPath);
L
lgcareer 已提交
96

L
add  
lgcareer 已提交
97
        try {
L
lgcareer 已提交
98 99 100
            if (!fs.exists(path)) {
                fs.mkdirs(path);
            }
L
add  
lgcareer 已提交
101
        } catch (Exception e) {
102
            logger.error(e.getMessage(), e);
L
add  
lgcareer 已提交
103 104 105
        }
    }

L
lgcareer 已提交
106

L
ligang 已提交
107 108 109 110
    /**
     * init hadoop configuration
     */
    private void init() {
111 112 113
        try {
            configuration = new Configuration();

D
dailidong 已提交
114 115
            String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
            ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
116

D
dailidong 已提交
117 118
            if (resUploadType == ResUploadType.HDFS){
                if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
119 120
                    System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
                            PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
D
dailidong 已提交
121 122
                    configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
                    hdfsUser = "";
123 124 125 126
                    UserGroupInformation.setConfiguration(configuration);
                    UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
                            PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
                }
journey2018's avatar
journey2018 已提交
127

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
                String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
                //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
                // the default is the local file system
                if (defaultFS.startsWith("file")) {
                    String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
                    if (StringUtils.isNotBlank(defaultFSProp)) {
                        Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
                        configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
                        fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
                    } else {
                        logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
                        throw new RuntimeException(
                                String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
                        );
                    }
                } else {
                    logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
                }
L
ligang 已提交
146

147 148 149 150 151 152 153 154
                if (fs == null) {
                    if (StringUtils.isNotEmpty(hdfsUser)) {
                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
                        ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
                            @Override
                            public Boolean run() throws Exception {
                                fs = FileSystem.get(configuration);
                                return true;
L
lgcareer 已提交
155
                            }
156 157 158 159
                        });
                    } else {
                        logger.warn("hdfs.root.user is not set value!");
                        fs = FileSystem.get(configuration);
L
ligang 已提交
160 161
                    }
                }
162 163 164 165 166 167
            } else if (resUploadType == ResUploadType.S3) {
                configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
                configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
                configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
                configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
                fs = FileSystem.get(configuration);
L
ligang 已提交
168
            }
169 170 171 172


        } catch (Exception e) {
            logger.error(e.getMessage(), e);
L
ligang 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185
        }
    }

    /**
     * @return Configuration
     */
    public Configuration getConfiguration() {
        return configuration;
    }

    /**
     * get application url
     *
D
dailidong 已提交
186 187
     * @param applicationId application id
     * @return url of application
L
ligang 已提交
188 189
     */
    public String getApplicationUrl(String applicationId) {
D
dailidong 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
        /**
         * if rmHaIds contains xx, it signs not use resourcemanager
         * otherwise:
         *  if rmHaIds is empty, single resourcemanager enabled
         *  if rmHaIds not empty: resourcemanager HA enabled
         */
        String appUrl = "";
        //not use resourcemanager
        if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){

            yarnEnabled = false;
            logger.warn("should not step here");
        } else if (!StringUtils.isEmpty(rmHaIds)) {
            //resourcemanager HA enabled
            appUrl = getAppAddress(appAddress, rmHaIds);
            yarnEnabled = true;
            logger.info("application url : {}", appUrl);
        } else {
            //single resourcemanager enabled
            yarnEnabled = true;
        }

        return String.format(appUrl, applicationId);
L
ligang 已提交
213 214 215 216 217
    }

    /**
     * cat file on hdfs
     *
218
     * @param hdfsFilePath hdfs file path
D
dailidong 已提交
219 220
     * @return byte[] byte array
     * @throws IOException errors
L
ligang 已提交
221 222 223
     */
    public byte[] catFile(String hdfsFilePath) throws IOException {

224 225
        if (StringUtils.isBlank(hdfsFilePath)) {
            logger.error("hdfs file path:{} is blank", hdfsFilePath);
226
            return new byte[0];
L
ligang 已提交
227 228 229 230 231 232 233 234 235 236
        }

        FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsFilePath));
        return IOUtils.toByteArray(fsDataInputStream);
    }


    /**
     * cat file on hdfs
     *
237 238 239
     * @param hdfsFilePath hdfs file path
     * @param skipLineNums skip line numbers
     * @param limit        read how many lines
D
dailidong 已提交
240 241
     * @return content of file
     * @throws IOException errors
L
ligang 已提交
242 243 244
     */
    public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {

245 246
        if (StringUtils.isBlank(hdfsFilePath)) {
            logger.error("hdfs file path:{} is blank", hdfsFilePath);
247
            return Collections.emptyList();
L
ligang 已提交
248 249
        }

250
        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
251 252 253 254
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
            return stream.collect(Collectors.toList());
        }
255

L
ligang 已提交
256 257 258 259 260 261 262 263
    }

    /**
     * make the given file and all non-existent parents into
     * directories. Has the semantics of Unix 'mkdir -p'.
     * Existence of the directory hierarchy is not an error.
     *
     * @param hdfsPath path to create
D
dailidong 已提交
264 265
     * @return mkdir result
     * @throws IOException errors
L
ligang 已提交
266 267 268 269 270 271 272 273 274 275 276 277
     */
    public boolean mkdir(String hdfsPath) throws IOException {
        return fs.mkdirs(new Path(hdfsPath));
    }

    /**
     * copy files between FileSystems
     *
     * @param srcPath      source hdfs path
     * @param dstPath      destination hdfs path
     * @param deleteSource whether to delete the src
     * @param overwrite    whether to overwrite an existing file
D
dailidong 已提交
278 279
     * @return if success or not
     * @throws IOException errors
L
ligang 已提交
280 281 282 283 284 285 286 287
     */
    public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
        return FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, overwrite, fs.getConf());
    }

    /**
     * the src file is on the local disk.  Add it to FS at
     * the given dst name.
288 289 290 291 292
     *
     * @param srcFile      local file
     * @param dstHdfsPath  destination hdfs path
     * @param deleteSource whether to delete the src
     * @param overwrite    whether to overwrite an existing file
D
dailidong 已提交
293 294
     * @return if success or not
     * @throws IOException errors
L
ligang 已提交
295 296 297
     */
    public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
        Path srcPath = new Path(srcFile);
298
        Path dstPath = new Path(dstHdfsPath);
L
ligang 已提交
299 300 301 302 303 304 305 306 307

        fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);

        return true;
    }

    /**
     * copy hdfs file to local
     *
308 309 310 311
     * @param srcHdfsFilePath source hdfs file path
     * @param dstFile         destination file
     * @param deleteSource    delete source
     * @param overwrite       overwrite
D
dailidong 已提交
312 313
     * @return result of copy hdfs file to local
     * @throws IOException errors
L
ligang 已提交
314 315 316 317 318 319 320 321
     */
    public boolean copyHdfsToLocal(String srcHdfsFilePath, String dstFile, boolean deleteSource, boolean overwrite) throws IOException {
        Path srcPath = new Path(srcHdfsFilePath);
        File dstPath = new File(dstFile);

        if (dstPath.exists()) {
            if (dstPath.isFile()) {
                if (overwrite) {
322
                    Files.delete(dstPath.toPath());
L
ligang 已提交
323 324 325 326 327 328
                }
            } else {
                logger.error("destination file must be a file");
            }
        }

329
        if (!dstPath.getParentFile().exists()) {
L
ligang 已提交
330 331 332 333 334 335 336 337 338 339
            dstPath.getParentFile().mkdirs();
        }

        return FileUtil.copy(fs, srcPath, dstPath, deleteSource, fs.getConf());
    }

    /**
     * delete a file
     *
     * @param hdfsFilePath the path to delete.
340 341 342 343
     * @param recursive    if path is a directory and set to
     *                     true, the directory is deleted else throws an exception. In
     *                     case of a file the recursive can be set to either true or false.
     * @return true if delete is successful else false.
D
dailidong 已提交
344
     * @throws IOException errors
L
ligang 已提交
345 346 347 348 349 350 351 352 353
     */
    public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
        return fs.delete(new Path(hdfsFilePath), recursive);
    }

    /**
     * check if exists
     *
     * @param hdfsFilePath source file path
D
dailidong 已提交
354 355
     * @return result of exists or not
     * @throws IOException errors
L
ligang 已提交
356 357 358 359 360
     */
    public boolean exists(String hdfsFilePath) throws IOException {
        return fs.exists(new Path(hdfsFilePath));
    }

L
ligang 已提交
361 362 363
    /**
     * Gets a list of files in the directory
     *
D
dailidong 已提交
364 365 366
     * @param filePath file path
     * @return {@link FileStatus} file status
     * @throws Exception errors
L
ligang 已提交
367
     */
368
    public FileStatus[] listFileStatus(String filePath) throws Exception {
L
ligang 已提交
369 370 371 372 373 374 375 376
        try {
            return fs.listStatus(new Path(filePath));
        } catch (IOException e) {
            logger.error("Get file list exception", e);
            throw new Exception("Get file list exception", e);
        }
    }

L
ligang 已提交
377 378 379
    /**
     * Renames Path src to Path dst.  Can take place on local fs
     * or remote DFS.
380
     *
L
ligang 已提交
381 382 383
     * @param src path to be renamed
     * @param dst new path after rename
     * @return true if rename is successful
384
     * @throws IOException on failure
L
ligang 已提交
385 386 387 388 389
     */
    public boolean rename(String src, String dst) throws IOException {
        return fs.rename(new Path(src), new Path(dst));
    }

390
    /**
391 392
     * hadoop resourcemanager enabled or not
     * @return result
393 394 395 396
     */
    public boolean isYarnEnabled()  {
        return yarnEnabled;
    }
L
ligang 已提交
397 398 399 400

    /**
     * get the state of an application
     *
D
dailidong 已提交
401
     * @param applicationId application id
L
ligang 已提交
402
     * @return the return may be null or there may be other parse exceptions
D
dailidong 已提交
403
     * @throws JSONException json exception
L
ligang 已提交
404 405 406 407 408 409 410 411 412 413
     */
    public ExecutionStatus getApplicationStatus(String applicationId) throws JSONException {
        if (StringUtils.isEmpty(applicationId)) {
            return null;
        }

        String applicationUrl = getApplicationUrl(applicationId);

        String responseContent = HttpUtils.get(applicationUrl);

414
        JSONObject jsonObject = JSON.parseObject(responseContent);
L
ligang 已提交
415 416 417
        String result = jsonObject.getJSONObject("app").getString("finalStatus");

        switch (result) {
Q
qiaozhanwei 已提交
418
            case Constants.ACCEPTED:
L
ligang 已提交
419
                return ExecutionStatus.SUBMITTED_SUCCESS;
Q
qiaozhanwei 已提交
420
            case Constants.SUCCEEDED:
L
ligang 已提交
421
                return ExecutionStatus.SUCCESS;
Q
qiaozhanwei 已提交
422 423 424 425
            case Constants.NEW:
            case Constants.NEW_SAVING:
            case Constants.SUBMITTED:
            case Constants.FAILED:
L
ligang 已提交
426
                return ExecutionStatus.FAILURE;
Q
qiaozhanwei 已提交
427
            case Constants.KILLED:
L
ligang 已提交
428 429
                return ExecutionStatus.KILL;

Q
qiaozhanwei 已提交
430
            case Constants.RUNNING:
L
ligang 已提交
431 432 433 434 435 436
            default:
                return ExecutionStatus.RUNNING_EXEUTION;
        }
    }

    /**
437
     * get data hdfs path
L
ligang 已提交
438 439 440
     * @return data hdfs path
     */
    public static String getHdfsDataBasePath() {
441
        if ("/".equals(resourceUploadPath)) {
442 443 444
            // if basepath is configured to /,  the generated url may be  //default/resources (with extra leading /)
            return "";
        } else {
445
            return resourceUploadPath;
446
        }
L
ligang 已提交
447 448
    }

449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
    /**
     * hdfs resource dir
     *
     * @param tenantCode tenant code
     * @return hdfs resource dir
     */
    public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
        String hdfsDir = "";
        if (resourceType.equals(ResourceType.FILE)) {
            hdfsDir = getHdfsResDir(tenantCode);
        } else if (resourceType.equals(ResourceType.UDF)) {
            hdfsDir = getHdfsUdfDir(tenantCode);
        }
        return hdfsDir;
    }

L
ligang 已提交
465 466 467 468 469 470
    /**
     * hdfs resource dir
     *
     * @param tenantCode tenant code
     * @return hdfs resource dir
     */
471
    public static String getHdfsResDir(String tenantCode) {
L
ligang 已提交
472 473 474 475
        return String.format("%s/resources", getHdfsTenantDir(tenantCode));
    }

    /**
476 477 478
     * hdfs user dir
     *
     * @param tenantCode tenant code
479
     * @param userId     user id
480 481
     * @return hdfs resource dir
     */
482 483
    public static String getHdfsUserDir(String tenantCode, int userId) {
        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
484 485 486 487
    }

    /**
     * hdfs udf dir
L
ligang 已提交
488 489 490 491 492 493 494 495
     *
     * @param tenantCode tenant code
     * @return get udf dir on hdfs
     */
    public static String getHdfsUdfDir(String tenantCode) {
        return String.format("%s/udfs", getHdfsTenantDir(tenantCode));
    }

496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513

    /**
     * get hdfs file name
     *
     * @param resourceType  resource type
     * @param tenantCode    tenant code
     * @param fileName      file name
     * @return hdfs file name
     */
    public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
        return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
    }

    /**
     * get absolute path and name for resource file on hdfs
     *
     * @param tenantCode tenant code
     * @param fileName   file name
L
ligang 已提交
514 515
     * @return get absolute path and name for file on hdfs
     */
516 517
    public static String getHdfsResourceFileName(String tenantCode, String fileName) {
        return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
L
ligang 已提交
518 519 520 521 522 523
    }

    /**
     * get absolute path and name for udf file on hdfs
     *
     * @param tenantCode tenant code
524
     * @param fileName   file name
L
ligang 已提交
525 526
     * @return get absolute path and name for udf file on hdfs
     */
527 528
    public static String getHdfsUdfFileName(String tenantCode, String fileName) {
        return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
L
ligang 已提交
529 530 531
    }

    /**
D
dailidong 已提交
532
     * @param tenantCode tenant code
L
ligang 已提交
533 534
     * @return file directory of tenants on hdfs
     */
535
    public static String getHdfsTenantDir(String tenantCode) {
536
        return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
L
ligang 已提交
537 538 539 540 541 542
    }


    /**
     * getAppAddress
     *
D
dailidong 已提交
543
     * @param appAddress app address
544
     * @param rmHa       resource manager ha
D
dailidong 已提交
545
     * @return app address
L
ligang 已提交
546 547 548 549 550 551
     */
    public static String getAppAddress(String appAddress, String rmHa) {

        //get active ResourceManager
        String activeRM = YarnHAAdminUtils.getAcitveRMName(rmHa);

Q
qiaozhanwei 已提交
552
        String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
L
ligang 已提交
553 554 555 556 557

        if (split1.length != 2) {
            return null;
        }

Q
qiaozhanwei 已提交
558 559
        String start = split1[0] + Constants.DOUBLE_SLASH;
        String[] split2 = split1[1].split(Constants.COLON);
L
ligang 已提交
560 561 562 563 564

        if (split2.length != 2) {
            return null;
        }

Q
qiaozhanwei 已提交
565
        String end = Constants.COLON + split2[1];
L
ligang 已提交
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596

        return start + activeRM + end;
    }


    @Override
    public void close() throws IOException {
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException e) {
                logger.error("Close HadoopUtils instance failed", e);
                throw new IOException("Close HadoopUtils instance failed", e);
            }
        }
    }


    /**
     * yarn ha admin utils
     */
    private static final class YarnHAAdminUtils extends RMAdminCLI {

        /**
         * get active resourcemanager
         *
         * @param rmIds
         * @return
         */
        public static String getAcitveRMName(String rmIds) {

Q
qiaozhanwei 已提交
597
            String[] rmIdArr = rmIds.split(Constants.COMMA);
L
ligang 已提交
598

Q
qiaozhanwei 已提交
599
            int activeResourceManagerPort = PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
L
ligang 已提交
600 601 602 603 604 605 606 607 608 609

            String yarnUrl = "http://%s:" + activeResourceManagerPort + "/ws/v1/cluster/info";

            String state = null;
            try {
                /**
                 * send http get request to rm1
                 */
                state = getRMState(String.format(yarnUrl, rmIdArr[0]));

Q
qiaozhanwei 已提交
610
                if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
611
                    return rmIdArr[0];
Q
qiaozhanwei 已提交
612
                } else if (Constants.HADOOP_RM_STATE_STANDBY.equals(state)) {
L
ligang 已提交
613
                    state = getRMState(String.format(yarnUrl, rmIdArr[1]));
Q
qiaozhanwei 已提交
614
                    if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
615 616 617 618 619 620 621
                        return rmIdArr[1];
                    }
                } else {
                    return null;
                }
            } catch (Exception e) {
                state = getRMState(String.format(yarnUrl, rmIdArr[1]));
Q
qiaozhanwei 已提交
622
                if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
                    return rmIdArr[0];
                }
            }
            return null;
        }


        /**
         * get ResourceManager state
         *
         * @param url
         * @return
         */
        public static String getRMState(String url) {

            String retStr = HttpUtils.get(url);

            if (StringUtils.isEmpty(retStr)) {
                return null;
            }
            //to json
            JSONObject jsonObject = JSON.parseObject(retStr);

            //get ResourceManager state
647
            return jsonObject.getJSONObject("clusterInfo").getString("haState");
L
ligang 已提交
648 649 650 651
        }

    }
}