提交 b777fed7 编写于 作者: N Nikita Koksharov

Improvement - reduced memory consumption by ClusterConnectionManager. #2105

上级 18a6cea3
...@@ -19,6 +19,7 @@ import java.net.InetSocketAddress; ...@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
...@@ -399,7 +400,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ...@@ -399,7 +400,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue; continue;
} }
MasterSlaveEntry entry = getEntry(currentPart.getSlots().iterator().next()); MasterSlaveEntry entry = getEntry(currentPart.slots().nextSetBit(0));
// should be invoked first in order to remove stale failedSlaveAddresses // should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart); Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive? // Do some slaves have changed state from failed to alive?
...@@ -464,7 +465,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ...@@ -464,7 +465,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private int slotsAmount(Collection<ClusterPartition> partitions) { private int slotsAmount(Collection<ClusterPartition> partitions) {
int result = 0; int result = 0;
for (ClusterPartition clusterPartition : partitions) { for (ClusterPartition clusterPartition : partitions) {
result += clusterPartition.getSlots().size(); result += clusterPartition.getSlotsAmount();
} }
return result; return result;
} }
...@@ -558,7 +559,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ...@@ -558,7 +559,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Integer slot : lastPartitions.keySet()) { for (Integer slot : lastPartitions.keySet()) {
boolean found = false; boolean found = false;
for (ClusterPartition clusterPartition : newPartitions) { for (ClusterPartition clusterPartition : newPartitions) {
if (clusterPartition.getSlots().contains(slot)) { if (clusterPartition.hasSlot(slot)) {
found = true; found = true;
break; break;
} }
...@@ -580,34 +581,34 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ...@@ -580,34 +581,34 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
} }
Set<Integer> addedSlots = new HashSet<Integer>(); BitSet addedSlots = new BitSet();
for (ClusterPartition clusterPartition : newPartitions) { for (ClusterPartition clusterPartition : newPartitions) {
for (Integer slot : clusterPartition.getSlots()) { for (Integer slot : clusterPartition.getSlots()) {
if (!lastPartitions.containsKey(slot)) { if (!lastPartitions.containsKey(slot)) {
addedSlots.add(slot); addedSlots.set(slot);
} }
} }
} }
if (!addedSlots.isEmpty()) { if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots.size()); log.info("{} slots found to add", addedSlots.size());
} }
for (Integer slot : addedSlots) { for (Integer slot : (Iterable<Integer>) addedSlots.stream()::iterator) {
ClusterPartition partition = find(newPartitions, slot); ClusterPartition partition = find(newPartitions, slot);
Set<Integer> oldSlots = new HashSet<Integer>(partition.getSlots()); BitSet oldSlots = partition.copySlots();
oldSlots.removeAll(addedSlots); oldSlots.andNot(addedSlots);
if (oldSlots.isEmpty()) { if (oldSlots.isEmpty()) {
continue; continue;
} }
MasterSlaveEntry entry = getEntry(oldSlots.iterator().next()); MasterSlaveEntry entry = getEntry(oldSlots.nextSetBit(0));
if (entry != null) { if (entry != null) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, partition); lastPartitions.put(slot, partition);
} }
} }
} }
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) { private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition currentPartition : getLastPartitions()) { for (ClusterPartition currentPartition : getLastPartitions()) {
for (ClusterPartition newPartition : newPartitions) { for (ClusterPartition newPartition : newPartitions) {
...@@ -615,13 +616,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ...@@ -615,13 +616,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue; continue;
} }
MasterSlaveEntry entry = getEntry(currentPartition.getSlots().iterator().next()); MasterSlaveEntry entry = getEntry(currentPartition.slots().nextSetBit(0));
Set<Integer> addedSlots = new HashSet<Integer>(newPartition.getSlots()); BitSet addedSlots = newPartition.copySlots();
addedSlots.removeAll(currentPartition.getSlots()); addedSlots.andNot(currentPartition.slots());
currentPartition.addSlots(addedSlots); currentPartition.addSlots(addedSlots);
for (Integer slot : addedSlots) { for (Integer slot : (Iterable<Integer>) addedSlots.stream()::iterator) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, currentPartition); lastPartitions.put(slot, currentPartition);
} }
...@@ -629,9 +630,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ...@@ -629,9 +630,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress()); log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress());
} }
Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots()); BitSet removedSlots = currentPartition.copySlots();
removedSlots.removeAll(newPartition.getSlots()); removedSlots.andNot(newPartition.slots());
for (Integer removeSlot : removedSlots) { for (Integer removeSlot : (Iterable<Integer>) removedSlots.stream()::iterator) {
if (lastPartitions.remove(removeSlot, currentPartition)) { if (lastPartitions.remove(removeSlot, currentPartition)) {
removeEntry(removeSlot); removeEntry(removeSlot);
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package org.redisson.cluster; package org.redisson.cluster;
import java.net.URI; import java.net.URI;
import java.util.BitSet;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
...@@ -37,7 +38,7 @@ public class ClusterPartition { ...@@ -37,7 +38,7 @@ public class ClusterPartition {
private final Set<URI> slaveAddresses = new HashSet<URI>(); private final Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> failedSlaves = new HashSet<URI>(); private final Set<URI> failedSlaves = new HashSet<URI>();
private final Set<Integer> slots = new HashSet<Integer>(); private final BitSet slots = new BitSet();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>(); private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
private ClusterPartition parent; private ClusterPartition parent;
...@@ -74,18 +75,18 @@ public class ClusterPartition { ...@@ -74,18 +75,18 @@ public class ClusterPartition {
return masterFail; return masterFail;
} }
public void addSlots(Set<Integer> slots) { public void addSlots(BitSet slots) {
this.slots.addAll(slots); this.slots.or(slots);
} }
public void removeSlots(Set<Integer> slots) { public void removeSlots(BitSet slots) {
this.slots.removeAll(slots); this.slots.andNot(slots);
} }
public void addSlotRanges(Set<ClusterSlotRange> ranges) { public void addSlotRanges(Set<ClusterSlotRange> ranges) {
for (ClusterSlotRange clusterSlotRange : ranges) { for (ClusterSlotRange clusterSlotRange : ranges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.add(i); slots.set(i);
} }
} }
slotRanges.addAll(ranges); slotRanges.addAll(ranges);
...@@ -93,7 +94,7 @@ public class ClusterPartition { ...@@ -93,7 +94,7 @@ public class ClusterPartition {
public void removeSlotRanges(Set<ClusterSlotRange> ranges) { public void removeSlotRanges(Set<ClusterSlotRange> ranges) {
for (ClusterSlotRange clusterSlotRange : ranges) { for (ClusterSlotRange clusterSlotRange : ranges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.remove(i); slots.clear(i);
} }
} }
slotRanges.removeAll(ranges); slotRanges.removeAll(ranges);
...@@ -101,9 +102,26 @@ public class ClusterPartition { ...@@ -101,9 +102,26 @@ public class ClusterPartition {
public Set<ClusterSlotRange> getSlotRanges() { public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges; return slotRanges;
} }
public Set<Integer> getSlots() {
public Iterable<Integer> getSlots() {
return (Iterable<Integer>) slots.stream()::iterator;
}
public BitSet slots() {
return slots; return slots;
} }
public BitSet copySlots() {
return (BitSet) slots.clone();
}
public boolean hasSlot(int slot) {
return slots.get(slot);
}
public int getSlotsAmount() {
return slots.cardinality();
}
public URI getMasterAddress() { public URI getMasterAddress() {
return masterAddress; return masterAddress;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册