/*
 *
 * Copyright 2013 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
package com.netflix.loadbalancer;
19

20
import java.util.Date;
21 22 23 24
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
25
import java.util.concurrent.ScheduledFuture;
26 27 28 29
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
30
import java.util.concurrent.atomic.AtomicLong;
31 32 33 34

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

A
Allen Wang 已提交
35
import com.netflix.client.ClientFactory;
36
import com.netflix.client.config.CommonClientConfigKey;
A
Allen Wang 已提交
37
import com.netflix.client.config.DefaultClientConfigImpl;
38
import com.netflix.client.config.IClientConfig;
39
import com.netflix.config.DynamicIntProperty;
40
import com.netflix.config.DynamicProperty;
41 42
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
43
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
45 46

/**
 * A LoadBalancer that has the capabilities to obtain the candidate list of
 * servers using a dynamic source, i.e. the list of servers can potentially be
 * changed at runtime. It also contains facilities wherein the list of servers
 * can be passed through a filter to remove servers that do not meet the
 * desired criteria.
 *
 * @param <T> the concrete {@link Server} type handled by this load balancer
 * @author stonse
 */
public class DynamicServerListLoadBalancer<T extends Server> extends
        BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(DynamicServerListLoadBalancer.class);

    boolean isSecure = false;
    boolean useTunnel = false;
63
    private static Thread _shutdownThread;
64 65 66 67 68 69 70 71 72 73 74

    // to keep track of modification of server lists
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(
            false);

    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
    private static long LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
                                                                         // //
                                                                         // every
                                                                         // 30
                                                                         // secs
75

76
    private static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
77 78 79 80 81 82

    private long refeshIntervalMills = LISTOFSERVERS_CACHE_REPEAT_INTERVAL;

    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;
83 84
    
    private AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
85 86
    
    protected volatile boolean serverRefreshEnabled = false;
87
    private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
88 89 90
    private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
    
    private volatile ScheduledFuture<?> scheduledFuture;
91 92

    /*
     * Creates the shared refresh executor (daemon threads, core size taken
     * from the pool-size property), keeps its core size in step with later
     * property changes, and registers a JVM shutdown hook that stops it.
     */
    static {
        ThreadFactory daemonThreads = new ThreadFactoryBuilder().setDaemon(true).build();
        _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(poolSizeProp.get(), daemonThreads);

        Runnable resizePool = new Runnable() {
            @Override
            public void run() {
                _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
            }
        };
        poolSizeProp.addCallback(resizePool);

        Runnable stopPool = new Runnable() {
            @Override
            public void run() {
                LOGGER.info("Shutting down the Executor Pool for DynamicServerListLoadBalancer");
                shutdownExecutorPool();
            }
        };
        _shutdownThread = new Thread(stopPool);
        Runtime.getRuntime().addShutdownHook(_shutdownThread);
    }
    
112 113 114 115 116 117 118
    /**
     * Creates a load balancer using only the superclass defaults. No server
     * list source or filter is set and no refresh task is scheduled here.
     */
    public DynamicServerListLoadBalancer() {
    }

    /**
     * Creates a load balancer and immediately initializes it from the given
     * client configuration.
     *
     * @param niwsClientConfig configuration handed to
     *            {@link #initWithNiwsConfig(IClientConfig)}
     * @throws RuntimeException if initialization fails
     */
    public DynamicServerListLoadBalancer(IClientConfig niwsClientConfig) {
        super();
        initWithNiwsConfig(niwsClientConfig);
    }
119 120

    @Override
121
    public void initWithNiwsConfig(IClientConfig clientConfig) {
122
        try {
123
            super.initWithNiwsConfig(clientConfig);
124 125 126 127 128 129 130 131
            String niwsServerListClassName = clientConfig.getProperty(
                    CommonClientConfigKey.NIWSServerListClassName,
                    DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS)
                    .toString();

            ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                    .instantiateInstanceWithClientConfig(
                            niwsServerListClassName, clientConfig);
132
            this.serverListImpl = niwsServerListImpl;
133

A
Allen Wang 已提交
134
            if (niwsServerListImpl instanceof AbstractServerList) {
135 136
                AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                        .getFilterImpl(clientConfig);
A
Allen Wang 已提交
137 138 139
                niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
                this.filter = niwsFilter;
            }
140 141 142 143 144

            refeshIntervalMills = Integer.valueOf(clientConfig.getProperty(
                    CommonClientConfigKey.ServerListRefreshInterval,
                    LISTOFSERVERS_CACHE_REPEAT_INTERVAL).toString());

145 146 147
            boolean primeConnection = this.isEnablePrimingConnections();
            // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
            this.setEnablePrimingConnections(false);
148
            enableAndInitLearnNewServersFeature();
149

150
            updateListOfServers();
151
            if (primeConnection && this.getPrimeConnections() != null) {
152 153
                this.getPrimeConnections()
                        .primeConnections(getServerList(true));
154
            }
155
            this.setEnablePrimingConnections(primeConnection);
156 157 158 159

        } catch (Exception e) {
            throw new RuntimeException(
                    "Exception while initializing NIWSDiscoveryLoadBalancer:"
160 161
                            + clientConfig.getClientName()
                            + ", niwsClientConfig:" + clientConfig, e);
162
        }
163 164
    }

165 166
    @Override
    public void setServersList(List lsrv) {
167 168
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
169
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
170 171 172
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

188 189
    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
190 191 192
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }
193

194 195
    public ServerList<T> getServerListImpl() {
        return serverListImpl;
196 197
    }

198
    public void setServerListImpl(ServerList<T> niwsServerList) {
199
        this.serverListImpl = niwsServerList;
200
    }
201

202
    @Override
203 204
    public void setPing(IPing ping) {
        this.ping = ping;
205
    }
206 207 208 209

    /**
     * @return the filter applied to refreshed server lists, or {@code null}
     *         if none is set
     */
    public ServerListFilter<T> getFilter() {
        return this.filter;
    }
210

211 212 213
    /**
     * Replaces the filter applied to refreshed server lists.
     *
     * @param filter the new filter; {@code null} disables filtering
     */
    public void setFilter(ServerListFilter<T> filter) {
        this.filter = filter;
    }
214

215
    @Override
216 217 218 219
    /**
     * Makes no sense to ping an inmemory disc client
     * 
     */
220 221 222 223
    public void forceQuickPing() {
        // no-op
    }

224
    /**
225 226 227 228 229 230
     * Feature that lets us add new instances (from AMIs) to the list of
     * existing servers that the LB will use Call this method if you want this
     * feature enabled
     */
    public void enableAndInitLearnNewServersFeature() {
        keepServerListUpdated();
231
        serverRefreshEnabled = true;
232 233 234
    }

    private String getIdentifier() {
235
        return this.getClientConfig().getClientName();
236
    }
237 238

    private void keepServerListUpdated() {
239
        scheduledFuture = _serverListRefreshExecutor.scheduleAtFixedRate(
240 241 242 243 244
                new ServerListRefreshExecutorThread(),
                LISTOFSERVERS_CACHE_UPDATE_DELAY, refeshIntervalMills,
                TimeUnit.MILLISECONDS);
    }

245
    private static void shutdownExecutorPool() {
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
        if (_serverListRefreshExecutor != null) {
            _serverListRefreshExecutor.shutdown();

            if (_shutdownThread != null) {
                try {
                    Runtime.getRuntime().removeShutdownHook(_shutdownThread);
                } catch (IllegalStateException ise) { // NOPMD
                    // this can happen if we're in the middle of a real
                    // shutdown,
                    // and that's 'ok'
                }
            }

        }
    }

262 263 264 265 266 267 268
    /**
     * Disables periodic refreshing and cancels the scheduled refresh task,
     * interrupting it if it is currently running.
     */
    public void stopServerListRefreshing() {
        serverRefreshEnabled = false;
        if (scheduledFuture == null) {
            return;
        }
        scheduledFuture.cancel(true);
    }
    
269 270 271 272 273 274 275 276 277 278 279
    /**
     * Class that updates the list of Servers This is based on the method used
     * by the client * Appropriate Filters are applied before coming up with the
     * right set of servers
     * 
     * @author stonse
     * 
     */
    class ServerListRefreshExecutorThread implements Runnable {

        public void run() {
280
            if (!serverRefreshEnabled) {
A
Allen Wang 已提交
281 282 283
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
284 285
                return;
            }
286 287 288 289 290 291 292 293 294 295 296 297 298
            try {
                updateListOfServers();

            } catch (Throwable e) {
                LOGGER.error(
                        "Exception while updating List of Servers obtained from Discovery client",
                        e);
                // e.printStackTrace();
            }
        }

    }

299
    @VisibleForTesting
300
    public void updateListOfServers() {
301 302 303
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
304 305
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
306 307

            if (filter != null) {
308
                servers = filter.getFilteredListOfServers(servers);
309 310
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
311 312
            }
        }
313
        lastUpdated.set(System.currentTimeMillis());
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
        updateAllServerList(servers);
    }

    /**
     * Installs the given servers as the load balancer's server list unless
     * another thread is already doing so, in which case this call returns
     * without applying {@code ls}.
     * <p>
     * Every server is marked alive before installation so that clients can
     * use it right away instead of waiting for the next ping cycle; a quick
     * ping is then forced through the superclass.
     *
     * @param ls the servers to install
     */
    protected void updateAllServerList(List<T> ls) {
        // Claim the flag atomically: a separate get() followed by set(true)
        // would let two threads pass the check at the same time.
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // usable immediately, before the first ping
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                // Always release the flag; otherwise one failed update would
                // block every later one.
                serverListUpdateInProgress.set(false);
            }
        }
    }

337 338
    @Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
    public int getNumberMissedCycles() {
339 340 341
        if (!serverRefreshEnabled) {
            return 0;
        }
342 343 344 345 346 347 348 349
        return (int) ((int) (System.currentTimeMillis() - lastUpdated.get()) / refeshIntervalMills);
    }
    
    /**
     * @return the time of the most recent refresh, formatted with
     *         {@link Date#toString()}
     */
    @Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
    public String getLastUpdate() {
        Date refreshedAt = new Date(lastUpdated.get());
        return refreshedAt.toString();
    }
    
350 351 352 353 354 355 356 357 358
    /**
     * @return the core pool size of the shared refresh executor, or
     *         {@code 0} if the executor does not exist
     */
    @Monitor(name="NumThreads", type=DataSourceType.GAUGE)
    public int getCoreThreads() {
        return (_serverListRefreshExecutor == null)
                ? 0
                : _serverListRefreshExecutor.getCorePoolSize();
    }
    
A
Allen Wang 已提交
359
    @Override
360 361 362 363 364 365
    public String toString() {
        StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
        sb.append(super.toString());
        sb.append("ServerList:" + String.valueOf(serverListImpl));
        return sb.toString();
    }
A
Allen Wang 已提交
366 367 368 369 370 371
    
    /**
     * Shuts down the superclass first and then stops this instance's
     * periodic server list refresh.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        stopServerListRefreshing();
    }
372
}