提交 ca70b5ab 编写于 作者: T Technoboy-

add lowerWeight host manager

上级 7056a734
......@@ -43,6 +43,17 @@ public class MasterConfig {
@Value("${master.reserved.memory}")
private double masterReservedMemory;
@Value("${master.host.selector:lowerWeight}")
private String hostSelector;
public String getHostSelector() {
return hostSelector;
}
public void setHostSelector(String hostSelector) {
this.hostSelector = hostSelector;
}
public int getMasterExecThreads() {
return masterExecThreads;
}
......
......@@ -20,12 +20,13 @@ package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -44,14 +45,23 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired
private NettyExecutorManager nettyExecutorManager;
@Autowired
private MasterConfig masterConfig;
/**
* round robin host manager
*/
@Autowired
private RoundRobinHostManager hostManager;
private HostManager hostManager;
/**
* executor manager
*/
private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
/**
* constructor
*/
public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* round robin host manager
*/
public abstract class CommonHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class);
/**
* zookeeperNodeManager
*/
@Autowired
protected ZookeeperNodeManager zookeeperNodeManager;
/**
* select host
* @param context context
* @return host
*/
@Override
public Host select(ExecutionContext context){
Host host = new Host();
Collection<String> nodes = null;
/**
* executor type
*/
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executorType : " + executorType);
}
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
return select(candidateHosts);
}
public abstract Host select(Collection<Host> nodes);
public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
this.zookeeperNodeManager = zookeeperNodeManager;
}
public ZookeeperNodeManager getZookeeperNodeManager() {
return zookeeperNodeManager;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* host manager config
*/
@Configuration
public class HostManagerConfig {
private AutowireCapableBeanFactory beanFactory;
@Autowired
private MasterConfig masterConfig;
@Autowired
public HostManagerConfig(AutowireCapableBeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
@Bean
public HostManager hostManager() {
String hostSelector = masterConfig.getHostSelector();
HostSelector selector = HostSelector.of(hostSelector);
HostManager hostManager;
switch (selector){
case RANDOM:
hostManager = new RandomHostManager();
break;
case ROUNDROBIN:
hostManager = new RoundRobinHostManager();
break;
case LOWERWEIGHT:
hostManager = new LowerWeightHostManager();
break;
default:
throw new IllegalArgumentException("unSupport selector " + hostSelector);
}
beanFactory.autowireBean(hostManager);
return hostManager;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
/**
* round robin host manager
*/
public class LowerWeightHostManager extends CommonHostManager {
private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class);
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter registryCenter;
/**
* round robin host manager
*/
private RoundRobinHostManager roundRobinHostManager;
/**
* selector
*/
private LowerWeightRoundRobin selector;
/**
* worker host weights
*/
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
/**
* worker group host lock
*/
private Lock lock;
/**
* executor service
*/
private ScheduledExecutorService executorService;
@PostConstruct
public void init(){
this.selector = new LowerWeightRoundRobin();
this.workerHostWeights = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
this.roundRobinHostManager = new RoundRobinHostManager();
this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
}
@PreDestroy
public void close(){
this.executorService.shutdownNow();
}
/**
* select host
* @param context context
* @return host
*/
@Override
public Host select(ExecutionContext context){
Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
if(CollectionUtils.isNotEmpty(workerHostWeights)){
return selector.select(workerHostWeights).getHost();
} else{
return roundRobinHostManager.select(context);
}
}
@Override
public Host select(Collection<Host> nodes) {
throw new UnsupportedOperationException("not support");
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
lock.lock();
try {
workerHostWeights.clear();
workerHostWeights.putAll(workerHostWeights);
} finally {
lock.unlock();
}
}
private Set<HostWeight> getWorkerHostWeights(String workerGroup){
lock.lock();
try {
return workerHostWeights.get(workerGroup);
} finally {
lock.unlock();
}
}
class RefreshResourceTask implements Runnable{
@Override
public void run() {
try {
Map<String, Set<String>> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes();
Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for(Map.Entry<String, Set<String>> entry : entries){
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){
String[] parts = heartbeat.split(COMMA);
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage);
hostWeights.add(hostWeight);
}
}
workerHostWeights.put(workerGroup, hostWeights);
}
syncWorkerHostWeight(workerHostWeights);
} catch (Throwable ex){
logger.error("RefreshResourceTask error", ex);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import java.util.Collection;
/**
* round robin host manager
*/
public class RandomHostManager extends CommonHostManager {
/**
* selector
*/
private final Selector<Host> selector;
/**
* set round robin
*/
public RandomHostManager(){
this.selector = new RandomSelector<>();
}
@Override
public Host select(Collection<Host> nodes) {
return selector.select(nodes);
}
}
......@@ -17,36 +17,17 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* round robin host manager
*/
@Service
public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
/**
* zookeeperNodeManager
*/
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
public class RoundRobinHostManager extends CommonHostManager {
/**
* selector
......@@ -60,39 +41,9 @@ public class RoundRobinHostManager implements HostManager {
this.selector = new RoundRobinSelector<>();
}
/**
* select host
* @param context context
* @return host
*/
@Override
public Host select(ExecutionContext context){
Host host = new Host();
Collection<String> nodes = null;
/**
* executor type
*/
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executorType : " + executorType);
}
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
/**
* select
*/
return selector.select(candidateHosts);
public Host select(Collection<Host> nodes) {
return selector.select(nodes);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
/**
* host selector
*/
public enum HostSelector {
RANDOM,
ROUNDROBIN,
LOWERWEIGHT;
public static HostSelector of(String selector){
for(HostSelector hs : values()){
if(hs.name().equalsIgnoreCase(selector)){
return hs;
}
}
throw new IllegalArgumentException("invalid host selector : " + selector);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* host weight
*/
public class HostWeight {
private final int CPU_FACTOR = 10;
private final int MEMORY_FACTOR = 20;
private final int LOAD_AVERAGE_FACTOR = 70;
private final Host host;
private final int weight;
private int currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = calculateWeight(cpu, memory, loadAverage);
this.host = host ;
this.currentWeight = weight ;
}
public int getCurrentWeight() {
return currentWeight;
}
public int getWeight() {
return weight;
}
public void setCurrentWeight(int currentWeight) {
this.currentWeight = currentWeight;
}
public Host getHost() {
return host;
}
@Override
public String toString() {
return "HostWeight{" +
"host=" + host +
", weight=" + weight +
", currentWeight=" + currentWeight +
'}';
}
private int calculateWeight(double cpu, double memory, double loadAverage){
return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
/**
* lower weight round robin
*/
public class LowerWeightRoundRobin implements Selector<HostWeight>{
public HostWeight select(Collection<HostWeight> sources){
int totalWeight = 0;
int lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
}
......@@ -31,6 +31,7 @@ import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
......@@ -227,6 +228,10 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
public Map<String, Set<String>> getWorkerGroupNodes(){
return Collections.unmodifiableMap(workerGroupNodes);
}
/**
* get worker group nodes
* @param workerGroup
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
public class LowerWeightRoundRobinTest {
@Test
public void testSelect(){
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
System.out.println(sources);
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
for(int i = 0; i < 100; i ++){
System.out.println(roundRobin.select(sources));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册