TaskCallbackService.java 5.8 KB
Newer Older
T
Tboy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.server.worker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
23
import org.apache.dolphinscheduler.common.thread.Stopper;
24
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
T
Tboy 已提交
25
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
T
Tboy 已提交
26
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
T
Tboy 已提交
27
import org.apache.dolphinscheduler.remote.command.Command;
T
Tboy 已提交
28
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
T
Tboy 已提交
29 30
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
T
Tboy 已提交
31 32
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
T
Tboy 已提交
33
import org.springframework.beans.factory.annotation.Autowired;
T
Tboy 已提交
34
import org.springframework.stereotype.Service;
T
Tboy 已提交
35
import java.util.Set;
T
Tboy 已提交
36
import java.util.concurrent.ConcurrentHashMap;
37 38
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;

Q
qiaozhanwei 已提交
39
/**
40
 *  task callback service
Q
qiaozhanwei 已提交
41
 */
T
Tboy 已提交
42
@Service
Q
qiaozhanwei 已提交
43
public class TaskCallbackService {
T
Tboy 已提交
44

T
Tboy 已提交
45 46 47 48 49 50 51
    private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);

    /**
     *  remote channels
     */
    private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();

T
Tboy 已提交
52 53 54 55 56 57
    /**
     * zookeeper register center
     */
    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

Q
qiaozhanwei 已提交
58
    /**
T
Tboy 已提交
59
     * netty remoting client
Q
qiaozhanwei 已提交
60
     */
T
Tboy 已提交
61 62 63 64 65 66 67
    private final NettyRemotingClient nettyRemotingClient;


    public TaskCallbackService(){
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }
T
Tboy 已提交
68

Q
qiaozhanwei 已提交
69 70 71 72 73
    /**
     *  add callback channel
     * @param taskInstanceId taskInstanceId
     * @param channel  channel
     */
T
Tboy 已提交
74 75
    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
        REMOTE_CHANNELS.put(taskInstanceId, channel);
T
Tboy 已提交
76 77
    }

Q
qiaozhanwei 已提交
78 79 80 81 82
    /**
     *  get callback channel
     * @param taskInstanceId taskInstanceId
     * @return callback channel
     */
83
    private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
T
Tboy 已提交
84
        NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
85 86 87
        if(nettyRemoteChannel == null){
            throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first");
        }
T
Tboy 已提交
88 89 90 91 92
        if(nettyRemoteChannel.isActive()){
            return nettyRemoteChannel;
        }
        Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
        if(newChannel != null){
T
Tboy 已提交
93 94
            return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
        }
95
        logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost());
Q
qiaozhanwei 已提交
96
        Set<String> masterNodes = null;
97
        while (Stopper.isRunning()) {
Q
qiaozhanwei 已提交
98
            masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
99
            if (CollectionUtils.isEmpty(masterNodes)) {
100 101 102 103
                logger.error("no available master node");
                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
            }else {
                break;
104
            }
105 106 107 108 109
        }
        for(String masterNode : masterNodes){
            newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
            if(newChannel != null){
                return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
T
Tboy 已提交
110
            }
T
Tboy 已提交
111
        }
112
        throw new IllegalStateException(String.format("all available master nodes : %s are not reachable", masterNodes));
T
Tboy 已提交
113 114 115 116 117 118
    }

    private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){
        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
        addRemoteChannel(taskInstanceId, remoteChannel);
        return remoteChannel;
T
Tboy 已提交
119 120
    }

Q
qiaozhanwei 已提交
121 122 123 124
    /**
     *  remove callback channels
     * @param taskInstanceId taskInstanceId
     */
T
Tboy 已提交
125
    public void remove(int taskInstanceId){
T
Tboy 已提交
126
        REMOTE_CHANNELS.remove(taskInstanceId);
T
Tboy 已提交
127 128
    }

Q
qiaozhanwei 已提交
129 130 131
    /**
     *  send ack
     * @param taskInstanceId taskInstanceId
T
Tboy 已提交
132
     * @param command command
Q
qiaozhanwei 已提交
133
     */
T
Tboy 已提交
134
    public void sendAck(int taskInstanceId, Command command){
T
Tboy 已提交
135
        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
T
Tboy 已提交
136
        nettyRemoteChannel.writeAndFlush(command);
T
Tboy 已提交
137 138
    }

Q
qiaozhanwei 已提交
139 140 141 142
    /**
     *  send result
     *
     * @param taskInstanceId taskInstanceId
T
Tboy 已提交
143
     * @param command command
Q
qiaozhanwei 已提交
144
     */
T
Tboy 已提交
145
    public void sendResult(int taskInstanceId, Command command){
T
Tboy 已提交
146
        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
T
Tboy 已提交
147 148 149 150 151 152 153
        nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if(future.isSuccess()){
                    remove(taskInstanceId);
                    return;
T
Tboy 已提交
154
                }
T
Tboy 已提交
155 156
            }
        });
T
Tboy 已提交
157 158
    }
}