未验证 提交 d6a32ac6 编写于 作者: C CalvinKirs 提交者: GitHub

[Feature#3234][cluster]enhanced load balancing (#3235)

* [Future#3234][cluster]enhanced load balancing
weight-based load balancing algorithm
this close # 3234

* remove useless parameter

* code smell

* load balancing according to work group

* add smooth weight round robin

* remove unused constants

* perfect unit test

* code smell

* code smell

* add work weight config

* fix config error

* add weight docs to readme.md
上级 5584f0cb
......@@ -238,6 +238,10 @@ This environment variable sets max cpu load avg for `worker-server`. The default
This environment variable sets reserved memory for `worker-server`. The default value is `0.1`.
**`WORKER_WEIGHT`**
This environment variable sets port for `worker-server`. The default value is `100`.
**`WORKER_LISTEN_PORT`**
This environment variable sets port for `worker-server`. The default value is `1234`.
......
......@@ -238,6 +238,10 @@ Dolphin Scheduler映像使用了几个容易遗漏的环境变量。虽然这些
配置`worker-server`的保留内存,默认值 `0.1`
**`WORKER_WEIGHT`**
配置`worker-server`的权重,默认之`100`
**`WORKER_LISTEN_PORT`**
配置`worker-server`的端口,默认值 `1234`
......
......@@ -34,4 +34,7 @@ worker.reserved.memory=${WORKER_RESERVED_MEMORY}
#worker.listen.port=${WORKER_LISTEN_PORT}
# default worker group
#worker.group=${WORKER_GROUP}
\ No newline at end of file
#worker.groups=${WORKER_GROUP}
# default worker weight
#worker.weight=${WORKER_WEIGHT}
\ No newline at end of file
......@@ -74,6 +74,7 @@ export WORKER_MAX_CPULOAD_AVG=${WORKER_MAX_CPULOAD_AVG:-"100"}
export WORKER_RESERVED_MEMORY=${WORKER_RESERVED_MEMORY:-"0.1"}
export WORKER_LISTEN_PORT=${WORKER_LISTEN_PORT:-"1234"}
export WORKER_GROUP=${WORKER_GROUP:-"default"}
export WORKER_WEIGHT=${WORKER_WEIGHT:-"100"}
#============================================================================
# Alert Server
......
......@@ -187,6 +187,7 @@ services:
WORKER_MAX_CPULOAD_AVG: "100"
WORKER_RESERVED_MEMORY: "0.1"
WORKER_GROUP: "default"
WORKER_WEIGHT: "100"
DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler"
DATABASE_HOST: dolphinscheduler-postgresql
DATABASE_PORT: 5432
......
......@@ -187,6 +187,7 @@ services:
WORKER_MAX_CPULOAD_AVG: "100"
WORKER_RESERVED_MEMORY: "0.1"
WORKER_GROUP: "default"
WORKER_WEIGHT: "100"
DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler"
DATABASE_HOST: dolphinscheduler-postgresql
DATABASE_PORT: 5432
......
......@@ -31,6 +31,7 @@ data:
WORKER_RESERVED_MEMORY: {{ .Values.worker.configmap.WORKER_RESERVED_MEMORY | quote }}
WORKER_LISTEN_PORT: {{ .Values.worker.configmap.WORKER_LISTEN_PORT | quote }}
WORKER_GROUP: {{ .Values.worker.configmap.WORKER_GROUP | quote }}
WORKER_WEIGHT: {{ .Values.worker.configmap.WORKER_WEIGHT | quote }}
DOLPHINSCHEDULER_DATA_BASEDIR_PATH: {{ include "dolphinscheduler.worker.base.dir" . | quote }}
dolphinscheduler_env.sh: |-
{{- range .Values.worker.configmap.DOLPHINSCHEDULER_ENV }}
......
......@@ -162,6 +162,11 @@ spec:
configMapKeyRef:
name: {{ include "dolphinscheduler.fullname" . }}-worker
key: WORKER_GROUP
- name: WORKER_WEUGHT
valueFrom:
configMapKeyRef:
name: {{ include "dolphinscheduler.fullname" . }}-worker
key: WORKER_WEIGHT
- name: DOLPHINSCHEDULER_DATA_BASEDIR_PATH
valueFrom:
configMapKeyRef:
......
......@@ -201,6 +201,7 @@ worker:
WORKER_RESERVED_MEMORY: "0.1"
WORKER_LISTEN_PORT: "1234"
WORKER_GROUP: "default"
WORKER_WEIGHT: "100"
DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler"
DOLPHINSCHEDULER_ENV:
- "export HADOOP_HOME=/opt/soft/hadoop"
......
......@@ -20,7 +20,7 @@ import java.io.Serializable;
import java.util.Objects;
/**
* server address
* server address
*/
public class Host implements Serializable {
......@@ -39,6 +39,16 @@ public class Host implements Serializable {
*/
private int port;
/**
* weight
*/
private int weight;
/**
* workGroup
*/
private String workGroup;
public Host() {
}
......@@ -48,6 +58,21 @@ public class Host implements Serializable {
this.address = ip + ":" + port;
}
public Host(String ip, int port, int weight) {
this.ip = ip;
this.port = port;
this.address = ip + ":" + port;
this.weight = weight;
}
public Host(String ip, int port, int weight,String workGroup) {
this.ip = ip;
this.port = port;
this.address = ip + ":" + port;
this.weight = weight;
this.workGroup=workGroup;
}
public String getAddress() {
return address;
}
......@@ -65,6 +90,14 @@ public class Host implements Serializable {
this.address = ip + ":" + port;
}
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
public int getPort() {
return port;
}
......@@ -74,31 +107,47 @@ public class Host implements Serializable {
this.address = ip + ":" + port;
}
public String getWorkGroup() {
return workGroup;
}
public void setWorkGroup(String workGroup) {
this.workGroup = workGroup;
}
/**
* address convert host
*
* @param address address
* @return host
*/
public static Host of(String address){
if(address == null) {
public static Host of(String address) {
if (address == null) {
throw new IllegalArgumentException("Host : address is null.");
}
String[] parts = address.split(":");
if (parts.length != 2) {
if (parts.length < 2) {
throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
}
Host host = new Host(parts[0], Integer.parseInt(parts[1]));
Host host = null;
if (parts.length == 2) {
host = new Host(parts[0], Integer.parseInt(parts[1]));
}
if (parts.length == 3) {
host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
}
return host;
}
/**
* whether old version
*
* @param address address
* @return old version is true , otherwise is false
*/
public static Boolean isOldVersion(String address){
public static Boolean isOldVersion(String address) {
String[] parts = address.split(":");
return parts.length != 2 ? true : false;
return parts.length != 2 && parts.length != 3;
}
@Override
......
......@@ -71,7 +71,12 @@ public abstract class CommonHostManager implements HostManager {
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
nodes.forEach(node -> {
Host nodeHost=Host.of(node);
nodeHost.setWorkGroup(context.getWorkerGroup());
candidateHosts.add(nodeHost);
});
return select(candidateHosts);
}
......
......@@ -38,7 +38,7 @@ public class RandomHostManager extends CommonHostManager {
* set round robin
*/
public RandomHostManager(){
this.selector = new RandomSelector<>();
this.selector = new RandomSelector();
}
@Override
......
......@@ -38,7 +38,7 @@ public class RoundRobinHostManager extends CommonHostManager {
* set round robin
*/
public RoundRobinHostManager(){
this.selector = new RoundRobinSelector<>();
this.selector = new RoundRobinSelector();
}
@Override
......
......@@ -17,27 +17,44 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* random selector
* @param <T> T
*/
public class RandomSelector<T> extends AbstractSelector<T> {
private final Random random = new Random();
public class RandomSelector extends AbstractSelector<Host> {
@Override
public T doSelect(final Collection<T> source) {
int size = source.size();
/**
* random select
*/
int randomIndex = random.nextInt(size);
return (T) source.toArray()[randomIndex];
public Host doSelect(final Collection<Host> source) {
List<Host> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
for (Host host : hosts) {
totalWeight += host.getWeight();
weights[index] = host.getWeight();
index++;
}
if (totalWeight > 0) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < size; i++) {
offset -= weights[i];
if (offset < 0) {
return hosts.get(i);
}
}
}
return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
}
......@@ -16,27 +16,123 @@
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* round robin selector
* @param <T> T
* Smooth Weight Round Robin
*/
@Service
public class RoundRobinSelector<T> extends AbstractSelector<T> {
public class RoundRobinSelector extends AbstractSelector<Host> {
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> workGroupWeightMap = new ConcurrentHashMap<>();
private static final int RECYCLE_PERIOD = 100000;
private AtomicBoolean updateLock = new AtomicBoolean();
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
int getWeight() {
return weight;
}
void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
long increaseCurrent() {
return current.addAndGet(weight);
}
void sel(int total) {
current.addAndGet(-1L * total);
}
long getLastUpdate() {
return lastUpdate;
}
void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
private final AtomicInteger index = new AtomicInteger(0);
@Override
public T doSelect(Collection<T> source) {
public Host doSelect(Collection<Host> source) {
List<Host> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkGroup();
ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
map = workGroupWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Host selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
for (Host host : hosts) {
String workGroupHost = host.getWorkGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
int weight = host.getWeight();
if (weight < 0) {
weight = 0;
}
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// set weight
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(workGroupHost, weightedRoundRobin);
weightedRoundRobin = map.get(workGroupHost);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedHost = host;
selectWeightRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
workGroupWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedHost != null) {
selectWeightRoundRobin.sel(totalWeight);
return selectedHost;
}
int size = source.size();
/**
* round robin
*/
return (T) source.toArray()[index.getAndIncrement() % size];
return hosts.get(0);
}
}
......@@ -49,6 +49,9 @@ public class WorkerConfig {
@Value("${worker.listen.port: 1234}")
private int listenPort;
@Value("${worker.weight:100}")
private int weight;
public int getListenPort() {
return listenPort;
}
......@@ -107,4 +110,13 @@ public class WorkerConfig {
public void setWorkerMaxCpuloadAvg(int workerMaxCpuloadAvg) {
this.workerMaxCpuloadAvg = workerMaxCpuloadAvg;
}
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
}
\ No newline at end of file
......@@ -16,9 +16,6 @@
*/
package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
......@@ -44,9 +41,11 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* worker registry
* worker registry
*/
@Service
public class WorkerRegistry {
......@@ -54,13 +53,13 @@ public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
/**
* zookeeper registry center
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* worker config
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
......@@ -86,7 +85,7 @@ public class WorkerRegistry {
}
/**
* registry
* registry
*/
public void registry() {
String address = NetUtils.getHost();
......@@ -122,7 +121,7 @@ public class WorkerRegistry {
}
/**
* remove registry info
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
......@@ -135,13 +134,14 @@ public class WorkerRegistry {
}
/**
* get worker path
* get worker path
*/
private Set<String> getWorkerZkPaths() {
Set<String> workerZkPaths = Sets.newHashSet();
String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
String weight = getWorkerWeight();
for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100);
......@@ -152,15 +152,23 @@ public class WorkerRegistry {
// trim and lower case is need
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
workerZkPathBuilder.append(address);
workerZkPathBuilder.append(weight).append(SLASH);
workerZkPaths.add(workerZkPathBuilder.toString());
}
return workerZkPaths;
}
/**
* get local address
* get local address
*/
private String getLocalAddress() {
return NetUtils.getHost() + ":" + workerConfig.getListenPort();
}
/**
* get Worker Weight
*/
private String getWorkerWeight() {
return ":" + workerConfig.getWeight();
}
}
......@@ -32,3 +32,6 @@
# default worker group
#worker.groups=default
# default worker weight
#work.weight=100
......@@ -16,7 +16,9 @@
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.commons.lang.ObjectUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
......@@ -36,16 +38,16 @@ public class RandomSelectorTest {
@Test
public void testSelect1(){
RandomSelector<String> selector = new RandomSelector();
String result = selector.select(Arrays.asList("1"));
Assert.assertTrue(StringUtils.isNotEmpty(result));
Assert.assertTrue(result.equalsIgnoreCase("1"));
RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20)));
Assert.assertNotNull(result);
}
@Test
public void testSelect(){
RandomSelector<Integer> selector = new RandomSelector();
int result = selector.select(Arrays.asList(1,2,3,4,5,6,7));
Assert.assertTrue(result >= 1 && result <= 7);
RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20)));
Assert.assertNotNull(result);
}
}
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
......@@ -30,26 +31,46 @@ import java.util.List;
public class RoundRobinSelectorTest {
@Test(expected = IllegalArgumentException.class)
public void testSelectWithIllegalArgumentException(){
public void testSelectWithIllegalArgumentException() {
RoundRobinSelector selector = new RoundRobinSelector();
selector.select(Collections.EMPTY_LIST);
}
@Test
public void testSelect1(){
RoundRobinSelector<String> selector = new RoundRobinSelector();
String result = selector.select(Arrays.asList("1"));
Assert.assertTrue(StringUtils.isNotEmpty(result));
Assert.assertTrue(result.equalsIgnoreCase("1"));
}
public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector();
Host result = null;
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
// add new host
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
Assert.assertEquals("192.168.1.3",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
Assert.assertEquals("192.168.1.2",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
Assert.assertEquals("192.168.1.3",result.getIp());
// remove host3
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.2",result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
Assert.assertEquals("192.168.1.1",result.getIp());
@Test
public void testSelect(){
RoundRobinSelector<Integer> selector = new RoundRobinSelector();
List<Integer> sources = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
int result = selector.select(sources);
Assert.assertTrue(result == 1);
int result2 = selector.select(Arrays.asList(1,2,3,4,5,6,7));
Assert.assertTrue(result2 == 2);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册