NettyExecutorManager.java 6.2 KB
Newer Older
T
Tboy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.server.master.dispatch.executor;

20 21
import com.github.rholder.retry.RetryException;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
T
Tboy 已提交
22 23
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
T
Tboy 已提交
24
import org.apache.dolphinscheduler.remote.command.CommandType;
T
Tboy 已提交
25 26 27 28 29
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
T
Tboy 已提交
30
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
T
Tboy 已提交
31
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
T
Tboy 已提交
32
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
T
Tboy 已提交
33 34 35 36 37 38
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

T
Tboy 已提交
39
import javax.annotation.PostConstruct;
40
import java.util.*;
41
import java.util.concurrent.ExecutionException;
T
Tboy 已提交
42

Q
qiaozhanwei 已提交
43 44 45
/**
 *  netty executor manager
 */
T
Tboy 已提交
46
@Service
47
public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
T
Tboy 已提交
48 49 50

    private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);

Q
qiaozhanwei 已提交
51 52 53
    /**
     * zookeeper node manager
     */
T
Tboy 已提交
54 55 56
    @Autowired
    private ZookeeperNodeManager zookeeperNodeManager;

Q
qiaozhanwei 已提交
57 58 59
    /**
     * netty remote client
     */
T
Tboy 已提交
60 61
    private final NettyRemotingClient nettyRemotingClient;

T
Tboy 已提交
62 63 64
    /**
     * constructor
     */
T
Tboy 已提交
65 66 67
    public NettyExecutorManager(){
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
T
Tboy 已提交
68 69 70 71
    }

    @PostConstruct
    public void init(){
72 73 74 75
        /**
         * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
         * register EXECUTE_TASK_ACK command type TaskAckProcessor
         */
T
Tboy 已提交
76 77
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
Q
qiaozhanwei 已提交
78
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
T
Tboy 已提交
79 80
    }

Q
qiaozhanwei 已提交
81
    /**
Q
qiaozhanwei 已提交
82
     * execute logic
Q
qiaozhanwei 已提交
83
     * @param context context
Q
qiaozhanwei 已提交
84
     * @return result
85
     * @throws ExecuteException if error throws ExecuteException
Q
qiaozhanwei 已提交
86
     */
T
Tboy 已提交
87
    @Override
88
    public Boolean execute(ExecutionContext context) throws ExecuteException {
89 90 91 92 93
        LinkedList<String> allNodes = new LinkedList<>();
        Set<String> nodes = getAllNodes(context);
        if (nodes != null) {
            allNodes.addAll(nodes);
        }
Q
qiaozhanwei 已提交
94 95 96
        /**
         *  build command accord executeContext
         */
T
Tboy 已提交
97
        Command command = context.getCommand();
Q
qiaozhanwei 已提交
98 99 100 101

        /**
         * execute task host
         */
102 103 104 105 106
        String startHostAddress = context.getHost().getAddress();
        // remove start host address and add it to head
        allNodes.remove(startHostAddress);
        allNodes.addFirst(startHostAddress);
 
T
Tboy 已提交
107
        boolean success = false;
108
        for (String address : allNodes) {
T
Tboy 已提交
109
            try {
110 111
                Host host = Host.of(address);
                doExecute(host, command);
T
Tboy 已提交
112
                success = true;
Q
qiaozhanwei 已提交
113
                context.setHost(host);
114
                break;
T
Tboy 已提交
115
            } catch (ExecuteException ex) {
116
                logger.error("retry execute command : {} host : {}", command, address);
T
Tboy 已提交
117 118
            }
        }
119 120 121 122
        if (!success) {
            throw new ExecuteException("fail after try all nodes");
        }
        
123
        return success;
T
Tboy 已提交
124 125
    }

Q
qiaozhanwei 已提交
126
    @Override
T
Tboy 已提交
127 128
    public void executeDirectly(ExecutionContext context) throws ExecuteException {
        Host host = context.getHost();
T
Tboy 已提交
129
        doExecute(host, context.getCommand());
T
Tboy 已提交
130 131
    }

Q
qiaozhanwei 已提交
132 133 134 135
    /**
     *  execute logic
     * @param host host
     * @param command command
136
     * @throws ExecuteException if error throws ExecuteException
Q
qiaozhanwei 已提交
137
     */
T
Tboy 已提交
138
    private void doExecute(final Host host, final Command command) throws ExecuteException {
139 140
        try {
            RetryerUtils.retryCall(() -> {
T
Tboy 已提交
141
                nettyRemotingClient.send(host, command);
142 143 144 145
                return Boolean.TRUE;
            });
        } catch (ExecutionException | RetryException e) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host), e);
T
Tboy 已提交
146 147 148
        }
    }

Q
qiaozhanwei 已提交
149 150 151 152 153
    /**
     *  get all nodes
     * @param context context
     * @return nodes
     */
T
Tboy 已提交
154 155
    private Set<String> getAllNodes(ExecutionContext context){
        Set<String> nodes = Collections.EMPTY_SET;
Q
qiaozhanwei 已提交
156 157 158
        /**
         * executor type
         */
T
Tboy 已提交
159 160 161
        ExecutorType executorType = context.getExecutorType();
        switch (executorType){
            case WORKER:
T
Tboy 已提交
162
                nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
T
Tboy 已提交
163 164 165
                break;
            case CLIENT:
                break;
166
            default:
T
Tboy 已提交
167 168 169 170 171
                throw new IllegalArgumentException("invalid executor type : " + executorType);

        }
        return nodes;
    }
T
Tboy 已提交
172 173 174 175

    public NettyRemotingClient getNettyRemotingClient() {
        return nettyRemotingClient;
    }
T
Tboy 已提交
176
}