HadoopUtils.java 23.7 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.common.utils;
L
ligang 已提交
18

张世鸣 已提交
19
import com.fasterxml.jackson.databind.node.ObjectNode;
20 21 22
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
23
import org.apache.commons.io.IOUtils;
Q
qiaozhanwei 已提交
24 25 26
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
27
import org.apache.dolphinscheduler.common.enums.ResourceType;
L
ligang 已提交
28 29
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
S
simon 已提交
30
import org.apache.hadoop.fs.*;
journey2018's avatar
journey2018 已提交
31
import org.apache.hadoop.security.UserGroupInformation;
L
ligang 已提交
32 33 34 35 36
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
37
import java.nio.file.Files;
L
add  
lgcareer 已提交
38
import java.security.PrivilegedExceptionAction;
39
import java.util.Collections;
L
ligang 已提交
40
import java.util.List;
41
import java.util.Map;
42
import java.util.concurrent.TimeUnit;
L
ligang 已提交
43 44 45
import java.util.stream.Collectors;
import java.util.stream.Stream;

46 47
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;

L
ligang 已提交
48 49 50 51 52 53 54 55
/**
 * hadoop utils
 * single instance
 */
public class HadoopUtils implements Closeable {

    private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);

L
lgcareer 已提交
56
    private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
57
    public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
D
dailidong 已提交
58 59
    public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
    public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
60
    public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
Q
test  
qiaozhanwei 已提交
61

62 63 64 65
    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";

    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
            .newBuilder()
66
            .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 2), TimeUnit.HOURS)
67 68 69 70 71 72 73
            .build(new CacheLoader<String, HadoopUtils>() {
                @Override
                public HadoopUtils load(String key) throws Exception {
                    return new HadoopUtils();
                }
            });

74
    private static volatile boolean yarnEnabled = false;
L
ligang 已提交
75

76 77
    private Configuration configuration;
    private FileSystem fs;
L
lgcareer 已提交
78

79
    private HadoopUtils() {
L
ligang 已提交
80
        init();
L
lgcareer 已提交
81
        initHdfsPath();
L
ligang 已提交
82 83
    }

84 85 86
    public static HadoopUtils getInstance() {

        return cache.getUnchecked(HADOOP_UTILS_KEY);
L
ligang 已提交
87 88
    }

L
add  
lgcareer 已提交
89
    /**
90
     * init dolphinscheduler root path in hdfs
L
add  
lgcareer 已提交
91
     */
Q
test  
qiaozhanwei 已提交
92

93
    private void initHdfsPath() {
94
        Path path = new Path(resourceUploadPath);
L
lgcareer 已提交
95

L
add  
lgcareer 已提交
96
        try {
L
lgcareer 已提交
97 98 99
            if (!fs.exists(path)) {
                fs.mkdirs(path);
            }
L
add  
lgcareer 已提交
100
        } catch (Exception e) {
101
            logger.error(e.getMessage(), e);
L
add  
lgcareer 已提交
102 103 104
        }
    }

L
lgcareer 已提交
105

L
ligang 已提交
106 107 108 109
    /**
     * init hadoop configuration
     */
    private void init() {
110 111 112
        try {
            configuration = new Configuration();

113
            String resourceStorageType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
D
dailidong 已提交
114
            ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
115

116 117
            if (resUploadType == ResUploadType.HDFS) {
                if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
118 119
                    System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
                            PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
120
                    configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
D
dailidong 已提交
121
                    hdfsUser = "";
122 123 124 125
                    UserGroupInformation.setConfiguration(configuration);
                    UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
                            PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
                }
journey2018's avatar
journey2018 已提交
126

127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
                String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
                //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
                // the default is the local file system
                if (defaultFS.startsWith("file")) {
                    String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
                    if (StringUtils.isNotBlank(defaultFSProp)) {
                        Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
                        configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
                        fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
                    } else {
                        logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
                        throw new RuntimeException(
                                String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
                        );
                    }
                } else {
                    logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
                }
L
ligang 已提交
145

146 147 148 149 150 151 152 153
                if (fs == null) {
                    if (StringUtils.isNotEmpty(hdfsUser)) {
                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
                        ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
                            @Override
                            public Boolean run() throws Exception {
                                fs = FileSystem.get(configuration);
                                return true;
L
lgcareer 已提交
154
                            }
155 156 157 158
                        });
                    } else {
                        logger.warn("hdfs.root.user is not set value!");
                        fs = FileSystem.get(configuration);
L
ligang 已提交
159 160
                    }
                }
161
            } else if (resUploadType == ResUploadType.S3) {
162
                System.setProperty(Constants.AWS_S3_V4, Constants.STRING_TRUE);
163 164 165 166 167
                configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
                configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
                configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
                configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
                fs = FileSystem.get(configuration);
L
ligang 已提交
168
            }
169 170 171 172


        } catch (Exception e) {
            logger.error(e.getMessage(), e);
L
ligang 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185
        }
    }

    /**
     * @return Configuration
     */
    public Configuration getConfiguration() {
        return configuration;
    }

    /**
     * get application url
     *
D
dailidong 已提交
186 187
     * @param applicationId application id
     * @return url of application
L
ligang 已提交
188
     */
189
    public String getApplicationUrl(String applicationId) throws Exception {
D
dailidong 已提交
190 191 192 193 194 195 196 197
        /**
         * if rmHaIds contains xx, it signs not use resourcemanager
         * otherwise:
         *  if rmHaIds is empty, single resourcemanager enabled
         *  if rmHaIds not empty: resourcemanager HA enabled
         */
        String appUrl = "";

198
        if (StringUtils.isEmpty(rmHaIds)) {
199 200 201 202
            //single resourcemanager enabled
            appUrl = appAddress;
            yarnEnabled = true;
        } else {
D
dailidong 已提交
203 204 205 206 207 208
            //resourcemanager HA enabled
            appUrl = getAppAddress(appAddress, rmHaIds);
            yarnEnabled = true;
            logger.info("application url : {}", appUrl);
        }

209
        if (StringUtils.isBlank(appUrl)) {
210 211
            throw new Exception("application url is blank");
        }
D
dailidong 已提交
212
        return String.format(appUrl, applicationId);
L
ligang 已提交
213 214
    }

215 216 217 218 219 220
    public String getJobHistoryUrl(String applicationId) {
        //eg:application_1587475402360_712719 -> job_1587475402360_712719
        String jobId = applicationId.replace("application", "job");
        return String.format(jobHistoryAddress, jobId);
    }

L
ligang 已提交
221 222 223
    /**
     * cat file on hdfs
     *
224
     * @param hdfsFilePath hdfs file path
D
dailidong 已提交
225 226
     * @return byte[] byte array
     * @throws IOException errors
L
ligang 已提交
227 228 229
     */
    public byte[] catFile(String hdfsFilePath) throws IOException {

230 231
        if (StringUtils.isBlank(hdfsFilePath)) {
            logger.error("hdfs file path:{} is blank", hdfsFilePath);
232
            return new byte[0];
L
ligang 已提交
233 234 235 236 237 238 239 240 241 242
        }

        FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsFilePath));
        return IOUtils.toByteArray(fsDataInputStream);
    }


    /**
     * cat file on hdfs
     *
243 244 245
     * @param hdfsFilePath hdfs file path
     * @param skipLineNums skip line numbers
     * @param limit        read how many lines
D
dailidong 已提交
246 247
     * @return content of file
     * @throws IOException errors
L
ligang 已提交
248 249 250
     */
    public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {

251 252
        if (StringUtils.isBlank(hdfsFilePath)) {
            logger.error("hdfs file path:{} is blank", hdfsFilePath);
253
            return Collections.emptyList();
L
ligang 已提交
254 255
        }

256
        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
257 258 259 260
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
            return stream.collect(Collectors.toList());
        }
261

L
ligang 已提交
262 263 264 265 266 267 268 269
    }

    /**
     * make the given file and all non-existent parents into
     * directories. Has the semantics of Unix 'mkdir -p'.
     * Existence of the directory hierarchy is not an error.
     *
     * @param hdfsPath path to create
D
dailidong 已提交
270 271
     * @return mkdir result
     * @throws IOException errors
L
ligang 已提交
272 273 274 275 276 277 278 279 280 281 282 283
     */
    public boolean mkdir(String hdfsPath) throws IOException {
        return fs.mkdirs(new Path(hdfsPath));
    }

    /**
     * copy files between FileSystems
     *
     * @param srcPath      source hdfs path
     * @param dstPath      destination hdfs path
     * @param deleteSource whether to delete the src
     * @param overwrite    whether to overwrite an existing file
D
dailidong 已提交
284 285
     * @return if success or not
     * @throws IOException errors
L
ligang 已提交
286 287 288 289 290 291 292 293
     */
    public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
        return FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, overwrite, fs.getConf());
    }

    /**
     * the src file is on the local disk.  Add it to FS at
     * the given dst name.
294 295 296 297 298
     *
     * @param srcFile      local file
     * @param dstHdfsPath  destination hdfs path
     * @param deleteSource whether to delete the src
     * @param overwrite    whether to overwrite an existing file
D
dailidong 已提交
299 300
     * @return if success or not
     * @throws IOException errors
L
ligang 已提交
301 302 303
     */
    public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
        Path srcPath = new Path(srcFile);
304
        Path dstPath = new Path(dstHdfsPath);
L
ligang 已提交
305 306 307 308 309 310 311 312 313

        fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);

        return true;
    }

    /**
     * copy hdfs file to local
     *
314 315 316 317
     * @param srcHdfsFilePath source hdfs file path
     * @param dstFile         destination file
     * @param deleteSource    delete source
     * @param overwrite       overwrite
D
dailidong 已提交
318 319
     * @return result of copy hdfs file to local
     * @throws IOException errors
L
ligang 已提交
320 321 322 323 324 325 326 327
     */
    public boolean copyHdfsToLocal(String srcHdfsFilePath, String dstFile, boolean deleteSource, boolean overwrite) throws IOException {
        Path srcPath = new Path(srcHdfsFilePath);
        File dstPath = new File(dstFile);

        if (dstPath.exists()) {
            if (dstPath.isFile()) {
                if (overwrite) {
328
                    Files.delete(dstPath.toPath());
L
ligang 已提交
329 330 331 332 333 334
                }
            } else {
                logger.error("destination file must be a file");
            }
        }

335
        if (!dstPath.getParentFile().exists()) {
L
ligang 已提交
336 337 338 339 340 341 342 343 344 345
            dstPath.getParentFile().mkdirs();
        }

        return FileUtil.copy(fs, srcPath, dstPath, deleteSource, fs.getConf());
    }

    /**
     * delete a file
     *
     * @param hdfsFilePath the path to delete.
346 347 348 349
     * @param recursive    if path is a directory and set to
     *                     true, the directory is deleted else throws an exception. In
     *                     case of a file the recursive can be set to either true or false.
     * @return true if delete is successful else false.
D
dailidong 已提交
350
     * @throws IOException errors
L
ligang 已提交
351 352 353 354 355 356 357 358 359
     */
    public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
        return fs.delete(new Path(hdfsFilePath), recursive);
    }

    /**
     * check if exists
     *
     * @param hdfsFilePath source file path
D
dailidong 已提交
360 361
     * @return result of exists or not
     * @throws IOException errors
L
ligang 已提交
362 363 364 365 366
     */
    public boolean exists(String hdfsFilePath) throws IOException {
        return fs.exists(new Path(hdfsFilePath));
    }

L
ligang 已提交
367 368 369
    /**
     * Gets a list of files in the directory
     *
D
dailidong 已提交
370 371 372
     * @param filePath file path
     * @return {@link FileStatus} file status
     * @throws Exception errors
L
ligang 已提交
373
     */
374
    public FileStatus[] listFileStatus(String filePath) throws Exception {
L
ligang 已提交
375 376 377 378 379 380 381 382
        try {
            return fs.listStatus(new Path(filePath));
        } catch (IOException e) {
            logger.error("Get file list exception", e);
            throw new Exception("Get file list exception", e);
        }
    }

L
ligang 已提交
383 384 385
    /**
     * Renames Path src to Path dst.  Can take place on local fs
     * or remote DFS.
386
     *
L
ligang 已提交
387 388 389
     * @param src path to be renamed
     * @param dst new path after rename
     * @return true if rename is successful
390
     * @throws IOException on failure
L
ligang 已提交
391 392 393 394 395
     */
    public boolean rename(String src, String dst) throws IOException {
        return fs.rename(new Path(src), new Path(dst));
    }

396
    /**
397
     * hadoop resourcemanager enabled or not
398
     *
399
     * @return result
400
     */
401
    public boolean isYarnEnabled() {
402 403
        return yarnEnabled;
    }
L
ligang 已提交
404 405 406 407

    /**
     * get the state of an application
     *
D
dailidong 已提交
408
     * @param applicationId application id
L
ligang 已提交
409 410
     * @return the return may be null or there may be other parse exceptions
     */
B
bao liang 已提交
411
    public ExecutionStatus getApplicationStatus(String applicationId) throws Exception {
L
ligang 已提交
412 413 414 415
        if (StringUtils.isEmpty(applicationId)) {
            return null;
        }

416
        String result = Constants.FAILED;
L
ligang 已提交
417
        String applicationUrl = getApplicationUrl(applicationId);
418
        logger.info("applicationUrl={}", applicationUrl);
L
ligang 已提交
419

420 421 422 423 424 425
        String responseContent;
        if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
            responseContent = KerberosHttpClient.get(applicationUrl);
        } else {
            responseContent = HttpUtils.get(applicationUrl);
        }
426
        if (responseContent != null) {
张世鸣 已提交
427
            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
428 429 430
            if (!jsonObject.has("app")) {
                return ExecutionStatus.FAILURE;
            }
张世鸣 已提交
431
            result = jsonObject.path("app").path("finalStatus").asText();
432

433 434 435 436 437
        } else {
            //may be in job history
            String jobHistoryUrl = getJobHistoryUrl(applicationId);
            logger.info("jobHistoryUrl={}", jobHistoryUrl);
            responseContent = HttpUtils.get(jobHistoryUrl);
438 439 440 441 442 443 444
            if (null != responseContent) {
                ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
                if (!jsonObject.has("job")) {
                    return ExecutionStatus.FAILURE;
                }
                result = jsonObject.path("job").path("state").asText();
            } else {
S
simon824 已提交
445
                return ExecutionStatus.FAILURE;
S
simon824 已提交
446
            }
447
        }
L
ligang 已提交
448 449

        switch (result) {
Q
qiaozhanwei 已提交
450
            case Constants.ACCEPTED:
L
ligang 已提交
451
                return ExecutionStatus.SUBMITTED_SUCCESS;
Q
qiaozhanwei 已提交
452
            case Constants.SUCCEEDED:
L
ligang 已提交
453
                return ExecutionStatus.SUCCESS;
Q
qiaozhanwei 已提交
454 455 456 457
            case Constants.NEW:
            case Constants.NEW_SAVING:
            case Constants.SUBMITTED:
            case Constants.FAILED:
L
ligang 已提交
458
                return ExecutionStatus.FAILURE;
Q
qiaozhanwei 已提交
459
            case Constants.KILLED:
L
ligang 已提交
460 461
                return ExecutionStatus.KILL;

Q
qiaozhanwei 已提交
462
            case Constants.RUNNING:
L
ligang 已提交
463
            default:
_和's avatar
_和 已提交
464
                return ExecutionStatus.RUNNING_EXECUTION;
L
ligang 已提交
465 466 467 468
        }
    }

    /**
469
     * get data hdfs path
470
     *
L
ligang 已提交
471 472 473
     * @return data hdfs path
     */
    public static String getHdfsDataBasePath() {
474
        if ("/".equals(resourceUploadPath)) {
475 476 477
            // if basepath is configured to /,  the generated url may be  //default/resources (with extra leading /)
            return "";
        } else {
478
            return resourceUploadPath;
479
        }
L
ligang 已提交
480 481
    }

482 483 484
    /**
     * hdfs resource dir
     *
485
     * @param tenantCode   tenant code
S
simon 已提交
486
     * @param resourceType resource type
487 488
     * @return hdfs resource dir
     */
489
    public static String getHdfsDir(ResourceType resourceType, String tenantCode) {
490 491 492 493 494 495 496 497 498
        String hdfsDir = "";
        if (resourceType.equals(ResourceType.FILE)) {
            hdfsDir = getHdfsResDir(tenantCode);
        } else if (resourceType.equals(ResourceType.UDF)) {
            hdfsDir = getHdfsUdfDir(tenantCode);
        }
        return hdfsDir;
    }

L
ligang 已提交
499 500 501 502 503 504
    /**
     * hdfs resource dir
     *
     * @param tenantCode tenant code
     * @return hdfs resource dir
     */
505
    public static String getHdfsResDir(String tenantCode) {
L
ligang 已提交
506 507 508 509
        return String.format("%s/resources", getHdfsTenantDir(tenantCode));
    }

    /**
510 511 512
     * hdfs user dir
     *
     * @param tenantCode tenant code
513
     * @param userId     user id
514 515
     * @return hdfs resource dir
     */
516 517
    public static String getHdfsUserDir(String tenantCode, int userId) {
        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
518 519 520 521
    }

    /**
     * hdfs udf dir
L
ligang 已提交
522 523 524 525 526 527 528 529
     *
     * @param tenantCode tenant code
     * @return get udf dir on hdfs
     */
    public static String getHdfsUdfDir(String tenantCode) {
        return String.format("%s/udfs", getHdfsTenantDir(tenantCode));
    }

530 531 532 533

    /**
     * get hdfs file name
     *
534 535 536
     * @param resourceType resource type
     * @param tenantCode   tenant code
     * @param fileName     file name
537 538 539
     * @return hdfs file name
     */
    public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
540
        if (fileName.startsWith("/")) {
541
            fileName = fileName.replaceFirst("/", "");
542
        }
543
        return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName);
544 545 546 547 548 549 550
    }

    /**
     * get absolute path and name for resource file on hdfs
     *
     * @param tenantCode tenant code
     * @param fileName   file name
L
ligang 已提交
551 552
     * @return get absolute path and name for file on hdfs
     */
553
    public static String getHdfsResourceFileName(String tenantCode, String fileName) {
554
        if (fileName.startsWith("/")) {
555
            fileName = fileName.replaceFirst("/", "");
556
        }
557
        return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
L
ligang 已提交
558 559 560 561 562 563
    }

    /**
     * get absolute path and name for udf file on hdfs
     *
     * @param tenantCode tenant code
564
     * @param fileName   file name
L
ligang 已提交
565 566
     * @return get absolute path and name for udf file on hdfs
     */
567
    public static String getHdfsUdfFileName(String tenantCode, String fileName) {
568
        if (fileName.startsWith("/")) {
569
            fileName = fileName.replaceFirst("/", "");
570
        }
571
        return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
L
ligang 已提交
572 573 574
    }

    /**
D
dailidong 已提交
575
     * @param tenantCode tenant code
L
ligang 已提交
576 577
     * @return file directory of tenants on hdfs
     */
578
    public static String getHdfsTenantDir(String tenantCode) {
579
        return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
L
ligang 已提交
580 581 582 583 584 585
    }


    /**
     * getAppAddress
     *
D
dailidong 已提交
586
     * @param appAddress app address
587
     * @param rmHa       resource manager ha
D
dailidong 已提交
588
     * @return app address
L
ligang 已提交
589 590 591 592 593 594
     */
    public static String getAppAddress(String appAddress, String rmHa) {

        //get active ResourceManager
        String activeRM = YarnHAAdminUtils.getAcitveRMName(rmHa);

Q
qiaozhanwei 已提交
595
        String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
L
ligang 已提交
596 597 598 599 600

        if (split1.length != 2) {
            return null;
        }

Q
qiaozhanwei 已提交
601 602
        String start = split1[0] + Constants.DOUBLE_SLASH;
        String[] split2 = split1[1].split(Constants.COLON);
L
ligang 已提交
603 604 605 606 607

        if (split2.length != 2) {
            return null;
        }

Q
qiaozhanwei 已提交
608
        String end = Constants.COLON + split2[1];
L
ligang 已提交
609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639

        return start + activeRM + end;
    }


    @Override
    public void close() throws IOException {
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException e) {
                logger.error("Close HadoopUtils instance failed", e);
                throw new IOException("Close HadoopUtils instance failed", e);
            }
        }
    }


    /**
     * yarn ha admin utils
     */
    private static final class YarnHAAdminUtils extends RMAdminCLI {

        /**
         * get active resourcemanager
         *
         * @param rmIds
         * @return
         */
        public static String getAcitveRMName(String rmIds) {

Q
qiaozhanwei 已提交
640
            String[] rmIdArr = rmIds.split(Constants.COMMA);
L
ligang 已提交
641

Q
qiaozhanwei 已提交
642
            int activeResourceManagerPort = PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
L
ligang 已提交
643 644 645 646 647 648 649 650 651 652

            String yarnUrl = "http://%s:" + activeResourceManagerPort + "/ws/v1/cluster/info";

            String state = null;
            try {
                /**
                 * send http get request to rm1
                 */
                state = getRMState(String.format(yarnUrl, rmIdArr[0]));

Q
qiaozhanwei 已提交
653
                if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
654
                    return rmIdArr[0];
Q
qiaozhanwei 已提交
655
                } else if (Constants.HADOOP_RM_STATE_STANDBY.equals(state)) {
L
ligang 已提交
656
                    state = getRMState(String.format(yarnUrl, rmIdArr[1]));
Q
qiaozhanwei 已提交
657
                    if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
658 659 660 661 662 663 664
                        return rmIdArr[1];
                    }
                } else {
                    return null;
                }
            } catch (Exception e) {
                state = getRMState(String.format(yarnUrl, rmIdArr[1]));
Q
qiaozhanwei 已提交
665
                if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
L
ligang 已提交
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686
                    return rmIdArr[0];
                }
            }
            return null;
        }


        /**
         * get ResourceManager state
         *
         * @param url
         * @return
         */
        public static String getRMState(String url) {

            String retStr = HttpUtils.get(url);

            if (StringUtils.isEmpty(retStr)) {
                return null;
            }
            //to json
S
simon 已提交
687
            ObjectNode jsonObject = JSONUtils.parseObject(retStr);
L
ligang 已提交
688 689

            //get ResourceManager state
690
            if (!jsonObject.has("clusterInfo")) {
S
simon824 已提交
691 692
                return null;
            }
S
simon 已提交
693
            return jsonObject.get("clusterInfo").path("haState").asText();
L
ligang 已提交
694 695 696 697
        }

    }
}