提交 e4458fbe 编写于 作者: K Kohsuke Kawaguchi

Merge pull request #1660 from deadmoose/really_sync

Fix concurrency issues in ConsistentHash
......@@ -64,6 +64,7 @@ public class ConsistentHash<T> {
* All the items in the hash, to their replication factors.
*/
private final Map<T,Point[]> items = new HashMap<T,Point[]>();
private int numPoints;
private final int defaultReplication;
private final Hash<T> hash;
......@@ -100,8 +101,13 @@ public class ConsistentHash<T> {
private final Object[] owner; // really T[]
private Table() {
int r=0;
for (Point[] v : items.values())
r+=v.length;
numPoints = r;
// merge all points from all nodes and sort them into a single array
Point[] allPoints = new Point[countAllPoints()];
Point[] allPoints = new Point[numPoints];
int p=0;
for (Point[] v : items.values()) {
System.arraycopy(v,0,allPoints,p,v.length);
......@@ -186,18 +192,18 @@ public class ConsistentHash<T> {
String hash(T t);
}
static final Hash DEFAULT_HASH = new Hash() {
static final Hash<?> DEFAULT_HASH = new Hash<Object>() {
public String hash(Object o) {
return o.toString();
}
};
public ConsistentHash() {
this(DEFAULT_HASH);
this((Hash<T>) DEFAULT_HASH);
}
public ConsistentHash(int defaultReplication) {
this(DEFAULT_HASH,defaultReplication);
this((Hash<T>) DEFAULT_HASH,defaultReplication);
}
public ConsistentHash(Hash<T> hash) {
......@@ -211,23 +217,20 @@ public class ConsistentHash<T> {
}
public int countAllPoints() {
int r=0;
for (Point[] v : items.values())
r+=v.length;
return r;
return numPoints;
}
/**
* Adds a new node with the default number of replica.
*/
public void add(T node) {
public synchronized void add(T node) {
add(node,defaultReplication);
}
/**
* Calls {@link #add(Object)} with all the arguments.
*/
public void addAll(T... nodes) {
public synchronized void addAll(T... nodes) {
for (T node : nodes)
addInternal(node,defaultReplication);
refreshTable();
......@@ -236,7 +239,7 @@ public class ConsistentHash<T> {
/**
* Calls {@link #add(Object)} with all the arguments.
*/
public void addAll(Collection<? extends T> nodes) {
public synchronized void addAll(Collection<? extends T> nodes) {
for (T node : nodes)
addInternal(node,defaultReplication);
refreshTable();
......@@ -245,7 +248,7 @@ public class ConsistentHash<T> {
/**
* Calls {@link #add(Object,int)} with all the arguments.
*/
public void addAll(Map<? extends T,Integer> nodes) {
public synchronized void addAll(Map<? extends T,Integer> nodes) {
for (Map.Entry<? extends T,Integer> node : nodes.entrySet())
addInternal(node.getKey(),node.getValue());
refreshTable();
......@@ -254,23 +257,20 @@ public class ConsistentHash<T> {
/**
* Removes the node entirely. This is the same as {@code add(node,0)}
*/
public void remove(T node) {
public synchronized void remove(T node) {
add(node, 0);
}
/**
* Adds a new node with the given number of replica.
*
* <p>
* This is the only function that manipulates {@link #items}.
*/
public synchronized void add(T node, int replica) {
addInternal(node, replica);
refreshTable();
}
private void addInternal(T node, int replica) {
if(replica==0) {
private synchronized void addInternal(T node, int replica) {
if (replica==0) {
items.remove(node);
} else {
Point[] points = new Point[replica];
......@@ -281,11 +281,10 @@ public class ConsistentHash<T> {
}
}
private void refreshTable() {
private synchronized void refreshTable() {
table = new Table();
}
/**
* Compresses a string into an integer with MD5.
*/
......
......@@ -122,11 +122,58 @@ public class ConsistentHashTest {
@Test
public void emptyBehavior() {
ConsistentHash<String> hash = new ConsistentHash<String>();
assertEquals(0, hash.countAllPoints());
assertFalse(hash.list(0).iterator().hasNext());
assertNull(hash.lookup(0));
assertNull(hash.lookup(999));
}
@Test
public void countAllPoints() {
ConsistentHash<String> hash = new ConsistentHash<String>();
assertEquals(0, hash.countAllPoints());
hash.add("foo", 10);
assertEquals(10, hash.countAllPoints());
hash.add("bar", 5);
assertEquals(15, hash.countAllPoints());
hash.remove("foo");
assertEquals(5, hash.countAllPoints());
}
@Test
public void defaultReplicationIsOneHundred() {
ConsistentHash<String> hash = new ConsistentHash<String>();
assertEquals(0, hash.countAllPoints());
hash.add("foo");
assertEquals(100, hash.countAllPoints());
}
@Test
public void setCustomDefaultReplication() {
ConsistentHash<String> hash = new ConsistentHash<String>((ConsistentHash.Hash<String>) ConsistentHash.DEFAULT_HASH, 7);
assertEquals(0, hash.countAllPoints());
hash.add("foo");
assertEquals(7, hash.countAllPoints());
}
@Test
public void usesCustomHash() {
final RuntimeException exception = new RuntimeException();
ConsistentHash.Hash<String> hashFunction = new ConsistentHash.Hash<String>() {
public String hash(String str) {
throw exception;
}
};
try {
ConsistentHash<String> hash = new ConsistentHash<String>(hashFunction);
hash.add("foo");
fail("Didn't use custom hash function");
} catch (RuntimeException e) {
assertSame(exception, e);
}
}
/**
* This test doesn't fail but it's written to measure the performance of the consistent hash function with large data set.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册