提交 c5e896d4 编写于 作者: N Nikita

RedissonCountDownLatch improvements

上级 f44997a8
......@@ -125,21 +125,21 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
</dependency>
</dependencies>
......
......@@ -17,7 +17,6 @@ package org.redisson;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RCountDownLatch;
......@@ -140,7 +139,7 @@ public class Redisson {
RedissonTopic<M> topic = topicsMap.get(name);
if (topic == null) {
RedisConnection<Object, Object> connection = connect();
RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub();
RedisPubSubConnection<String, M> pubSubConnection = connectPubSub();
topic = new RedissonTopic<M>(this, pubSubConnection, connection, name);
RedissonTopic<M> oldTopic = topicsMap.putIfAbsent(name, topic);
......@@ -210,8 +209,8 @@ public class Redisson {
return latch;
}
private RedisPubSubConnection<Object, Object> connectPubSub() {
return redisClient.connectPubSub(codec);
private <K, V> RedisPubSubConnection<K, V> connectPubSub() {
return (RedisPubSubConnection<K, V>) redisClient.connectPubSub(codec);
}
// TODO implement
......
......@@ -36,7 +36,8 @@ public class RedissonCountDownLatch implements RCountDownLatch {
private final String groupName = "redisson_countdownlatch_";
private final String name;
private static final Integer unlockMessage = 0;
private static final Integer zeroCountMessage = 0;
private static final Integer newCountMessage = 1;
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
......@@ -56,14 +57,22 @@ public class RedissonCountDownLatch implements RCountDownLatch {
@Override
public void subscribed(Object channel, long count) {
subscribeLatch.countDown();
if (getChannelName().equals(channel)) {
subscribeLatch.countDown();
}
}
@Override
public void message(Object channel, Object message) {
if (message.equals(unlockMessage)) {
if (!getChannelName().equals(channel)) {
return;
}
if (message.equals(zeroCountMessage)) {
msg.open();
}
if (message.equals(newCountMessage)) {
msg.close();
}
}
};
......@@ -111,8 +120,15 @@ public class RedissonCountDownLatch implements RCountDownLatch {
Long val = connection.decr(name);
if (val == 0) {
connection.publish(getChannelName(), unlockMessage);
connection.del(name);
RedisConnection<Object, Object> conn = redisson.connect();
try {
conn.multi();
conn.publish(getChannelName(), zeroCountMessage);
conn.del(name);
conn.exec();
} finally {
conn.close();
}
} else if (val < 0) {
connection.del(name);
}
......@@ -133,11 +149,21 @@ public class RedissonCountDownLatch implements RCountDownLatch {
@Override
public boolean trySetCount(long count) {
Boolean res = connection.setnx(name, count);
if (res) {
msg.close();
RedisConnection<Object, Object> conn = redisson.connect();
try {
conn.watch(name);
Long oldValue = (Long) conn.get(name);
if (oldValue != null) {
conn.discard();
return false;
}
conn.multi();
conn.set(name, count);
conn.publish(getChannelName(), newCountMessage);
return conn.exec().size() == 2;
} finally {
conn.close();
}
return res;
}
@Override
......
......@@ -103,7 +103,7 @@ public class RedissonLock implements RLock {
private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final RedisConnection<Object, Object> connection;
// TODO move it Redisson as ID
// TODO move it to Redisson as ID
private final UUID id = UUID.randomUUID();
private final String groupName = "redisson_lock";
private final String name;
......@@ -130,12 +130,14 @@ public class RedissonLock implements RLock {
@Override
public void subscribed(Object channel, long count) {
subscribeLatch.countDown();
if (getChannelName().equals(channel)) {
subscribeLatch.countDown();
}
}
@Override
public void message(Object channel, Object message) {
if (message.equals(unlockMessage)) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
msg.release();
}
}
......@@ -245,9 +247,10 @@ public class RedissonLock implements RLock {
@Override
public void destroy() {
pubSubConnection.unsubscribe(getChannelName());
pubSubConnection.close();
connection.del(getKeyName());
connection.close();
pubSubConnection.close();
redisson.remove(this);
}
......
......@@ -33,13 +33,13 @@ public class RedissonTopic<M> implements RTopic<M> {
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
private final Map<Integer, RedisPubSubTopicListener> listeners = new ConcurrentHashMap<Integer, RedisPubSubTopicListener>();
private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final Map<Integer, RedisPubSubTopicListener<String, M>> listeners = new ConcurrentHashMap<Integer, RedisPubSubTopicListener<String, M>>();
private final RedisPubSubConnection<String, M> pubSubConnection;
private final RedisConnection<Object, Object> connection;
private final String name;
private final Redisson redisson;
RedissonTopic(Redisson redisson, RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, final String name) {
RedissonTopic(Redisson redisson, RedisPubSubConnection<String, M> pubSubConnection, RedisConnection<Object, Object> connection, final String name) {
this.pubSubConnection = pubSubConnection;
this.name = name;
this.connection = connection;
......@@ -48,10 +48,10 @@ public class RedissonTopic<M> implements RTopic<M> {
public void subscribe() {
if (subscribeOnce.compareAndSet(false, true)) {
RedisPubSubAdapter<Object, Object> listener = new RedisPubSubAdapter<Object, Object>() {
RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {
@Override
public void subscribed(Object channel, long count) {
public void subscribed(String channel, long count) {
if (channel.equals(name)) {
subscribeLatch.countDown();
}
......@@ -76,7 +76,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public int addListener(MessageListener<M> listener) {
RedisPubSubTopicListener list = new RedisPubSubTopicListener(listener);
RedisPubSubTopicListener<String, M> list = new RedisPubSubTopicListener<String, M>(listener, name);
listeners.put(list.hashCode(), list);
pubSubConnection.addListener(list);
return list.hashCode();
......@@ -84,7 +84,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeListener(int listenerId) {
RedisPubSubTopicListener list = listeners.remove(listenerId);
RedisPubSubTopicListener<String, M> list = listeners.remove(listenerId);
pubSubConnection.removeListener(list);
}
......
......@@ -20,15 +20,20 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
public class RedisPubSubTopicListener<K, V> extends RedisPubSubAdapter<K, V> {
private final MessageListener<V> listener;
private final K name;
public RedisPubSubTopicListener(MessageListener<V> listener) {
public RedisPubSubTopicListener(MessageListener<V> listener, K name) {
super();
this.listener = listener;
this.name = name;
}
@Override
public void message(K channel, V message) {
listener.onMessage(message);
// could be subscribed to multiple channels
if (name.equals(channel)) {
listener.onMessage(message);
}
}
@Override
......
/*
* JBoss, Home of Professional Open Source
* Copyright 2009 Red Hat Inc. and/or its affiliates and other
* contributors as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a full listing of
* individual contributors.
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
* http://www.apache.org/licenses/LICENSE-2.0
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc.internal;
......
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc.internal;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
public class ThreadLocalSemaphore {
private final ThreadLocal<Semaphore> semaphore;
private final Set<Semaphore> allValues = Collections.newSetFromMap(new ConcurrentHashMap<Semaphore, Boolean>());
public ThreadLocalSemaphore() {
semaphore = new ThreadLocal<Semaphore>() {
@Override protected Semaphore initialValue() {
Semaphore value = new Semaphore(1);
value.acquireUninterruptibly();
allValues.add(value);
return value;
}
};
}
public Semaphore get() {
return semaphore.get();
}
public void remove() {
allValues.remove(get());
semaphore.remove();
}
public Collection<Semaphore> getAll() {
return allValues;
}
}
......@@ -17,7 +17,7 @@ public class RedissonCountDownLatchTest {
ExecutorService executor = Executors.newFixedThreadPool(2);
final RCountDownLatch latch = redisson.getCountDownLatch("latch1");
latch.trySetCount(1);
Assert.assertTrue(latch.trySetCount(1));
executor.execute(new Runnable() {
@Override
......@@ -31,6 +31,7 @@ public class RedissonCountDownLatchTest {
}
});
executor.execute(new Runnable() {
@Override
public void run() {
......@@ -57,7 +58,7 @@ public class RedissonCountDownLatchTest {
ExecutorService executor = Executors.newFixedThreadPool(2);
final RCountDownLatch latch = redisson.getCountDownLatch("latch1");
latch.trySetCount(1);
Assert.assertTrue(latch.trySetCount(1));
executor.execute(new Runnable() {
@Override
......@@ -71,6 +72,7 @@ public class RedissonCountDownLatchTest {
}
});
executor.execute(new Runnable() {
@Override
public void run() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册