MasterRegistry.java 5.5 KB
Newer Older
T
Tboy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.master.registry;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
T
Tboy 已提交
22 23 24
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
T
Tboy 已提交
25
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
T
Tboy 已提交
26 27 28
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
T
Tboy 已提交
29 30
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
T
Tboy 已提交
31

T
Tboy 已提交
32
import javax.annotation.PostConstruct;
T
Tboy 已提交
33 34 35 36 37 38 39
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;

T
Tboy 已提交
40 41 42
/**
 *  master registry
 */
T
Tboy 已提交
43
@Service
T
Tboy 已提交
44 45 46 47 48 49 50
public class MasterRegistry {

    /** Class-level logger; static so a single instance is shared instead of one per object. */
    private static final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);

    /**
     *  zookeeper registry center
     */
T
Tboy 已提交
51 52
    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;
T
Tboy 已提交
53 54

    /**
T
Tboy 已提交
55
     * master config
T
Tboy 已提交
56
     */
T
Tboy 已提交
57 58
    @Autowired
    private MasterConfig masterConfig;
T
Tboy 已提交
59 60 61 62

    /**
     * heartbeat executor
     */
T
Tboy 已提交
63
    private ScheduledExecutorService heartBeatExecutor;
T
Tboy 已提交
64 65 66 67

    /**
     * master start time
     */
T
Tboy 已提交
68
    private String startTime;
T
Tboy 已提交
69

T
Tboy 已提交
70 71 72

    @PostConstruct
    public void init(){
T
Tboy 已提交
73 74
        this.startTime = DateUtils.dateToString(new Date());
        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
T
Tboy 已提交
75 76 77 78 79 80
    }

    /**
     *  registry
     */
    public void registry() {
81
        String address = OSUtils.getHost();
T
Tboy 已提交
82 83
        String localNodePath = getMasterPath();
        zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
T
Tboy 已提交
84 85 86 87 88 89 90
        zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if(newState == ConnectionState.LOST){
                    logger.error("master : {} connection lost from zookeeper", address);
                } else if(newState == ConnectionState.RECONNECTED){
                    logger.info("master : {} reconnected to zookeeper", address);
T
Tboy 已提交
91
                    zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
T
Tboy 已提交
92 93 94 95 96
                } else if(newState == ConnectionState.SUSPENDED){
                    logger.warn("master : {} connection SUSPENDED ", address);
                }
            }
        });
T
Tboy 已提交
97
        int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
98
        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
T
Tboy 已提交
99
        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
T
Tboy 已提交
100 101 102 103 104 105 106
    }

    /**
     *  remove registry info
     */
    public void unRegistry() {
        String address = getLocalAddress();
T
Tboy 已提交
107
        String localNodePath = getMasterPath();
T
Tboy 已提交
108
        zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
T
Tboy 已提交
109
        logger.info("master node : {} unRegistry to ZK.", address);
T
Tboy 已提交
110 111 112
    }

    /**
T
Tboy 已提交
113
     *  get master path
T
Tboy 已提交
114 115
     * @return
     */
T
Tboy 已提交
116
    private String getMasterPath() {
T
Tboy 已提交
117
        String address = getLocalAddress();
T
Tboy 已提交
118
        String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
T
Tboy 已提交
119 120 121 122 123 124 125 126
        return localNodePath;
    }

    /**
     *  get local address
     * @return
     */
    private String getLocalAddress(){
127
        return OSUtils.getHost() + ":" + masterConfig.getListenPort();
T
Tboy 已提交
128
    }
T
Tboy 已提交
129

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
    /**
     * Heartbeat task.
     *
     * <p>On each run it writes a comma-separated record to this master's
     * ZooKeeper node, in this order: CPU usage, memory usage, load average,
     * master start time, current time.
     */
    class HeartBeatTask implements Runnable {

        @Override
        public void run() {
            try {
                final String heartBeatInfo = new StringBuilder(100)
                        .append(OSUtils.cpuUsage()).append(COMMA)
                        .append(OSUtils.memoryUsage()).append(COMMA)
                        .append(OSUtils.loadAverage()).append(COMMA)
                        .append(startTime).append(COMMA)
                        .append(DateUtils.dateToString(new Date()))
                        .toString();
                final String nodePath = getMasterPath();
                zookeeperRegistryCenter.getZookeeperCachedOperator().update(nodePath, heartBeatInfo);
            } catch (Throwable ex) {
                // Catch everything: an exception escaping a task scheduled with
                // scheduleAtFixedRate suppresses all subsequent executions.
                logger.error("error write master heartbeat info", ex);
            }
        }
    }
T
Tboy 已提交
151
}