diff --git a/pom.xml b/pom.xml index eaf489f5f3123bd2d1f5ee551874216fab26f765..6a5f6b529c905860eec38317db6e32cfeb4f031e 100644 --- a/pom.xml +++ b/pom.xml @@ -125,21 +125,21 @@ test - org.slf4j slf4j-api 1.7.5 + com.fasterxml.jackson.core jackson-core - 2.3.0 + 2.3.1 com.fasterxml.jackson.core jackson-databind - 2.3.0 + 2.3.1 diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index b9763e59f4f7a9a556c11ee9046ba7ad86d67921..369e89d6a4f4d855fc17631e766bb5e2afcde71b 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -17,7 +17,6 @@ package org.redisson; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; import org.redisson.core.RAtomicLong; import org.redisson.core.RCountDownLatch; @@ -140,7 +139,7 @@ public class Redisson { RedissonTopic topic = topicsMap.get(name); if (topic == null) { RedisConnection connection = connect(); - RedisPubSubConnection pubSubConnection = connectPubSub(); + RedisPubSubConnection pubSubConnection = connectPubSub(); topic = new RedissonTopic(this, pubSubConnection, connection, name); RedissonTopic oldTopic = topicsMap.putIfAbsent(name, topic); @@ -210,8 +209,8 @@ public class Redisson { return latch; } - private RedisPubSubConnection connectPubSub() { - return redisClient.connectPubSub(codec); + private RedisPubSubConnection connectPubSub() { + return (RedisPubSubConnection) redisClient.connectPubSub(codec); } // TODO implement diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 7b7f7bb42c2b3664e95df09a87b491d3d826de31..21c02c58df105c4d883ab779a732325faf4425d9 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -36,7 +36,8 @@ public class RedissonCountDownLatch implements RCountDownLatch { private final String groupName = "redisson_countdownlatch_"; private final String name; - private static final Integer unlockMessage = 0; + private static final Integer zeroCountMessage = 0; + private static final Integer newCountMessage = 1; private final AtomicBoolean subscribeOnce = new AtomicBoolean(); @@ -56,14 +57,22 @@ public class RedissonCountDownLatch implements RCountDownLatch { @Override public void subscribed(Object channel, long count) { - subscribeLatch.countDown(); + if (getChannelName().equals(channel)) { + subscribeLatch.countDown(); + } } @Override public void message(Object channel, Object message) { - if (message.equals(unlockMessage)) { + if (!getChannelName().equals(channel)) { + return; + } + if (message.equals(zeroCountMessage)) { msg.open(); } + if (message.equals(newCountMessage)) { + msg.close(); + } } }; @@ -111,8 +120,15 @@ public class RedissonCountDownLatch implements RCountDownLatch { Long val = connection.decr(name); if (val == 0) { - connection.publish(getChannelName(), unlockMessage); - connection.del(name); + RedisConnection conn = redisson.connect(); + try { + conn.multi(); + conn.publish(getChannelName(), zeroCountMessage); + conn.del(name); + conn.exec(); + } finally { + conn.close(); + } } else if (val < 0) { connection.del(name); } @@ -133,11 +149,21 @@ public class RedissonCountDownLatch implements RCountDownLatch { @Override public boolean trySetCount(long count) { - Boolean res = connection.setnx(name, count); - if (res) { - msg.close(); + RedisConnection conn = redisson.connect(); + try { + conn.watch(name); + Long oldValue = (Long) conn.get(name); + if (oldValue != null) { + conn.discard(); + return false; + } + conn.multi(); + conn.set(name, count); + conn.publish(getChannelName(), newCountMessage); + return conn.exec().size() == 2; + } finally { + conn.close(); } - return res; } @Override diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 9efa2a2882edd1b1eef119b30cb23bff1571accb..2816d0b6c4f64e6bf91200c956ce87dcbf25ee21 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -103,7 +103,7 @@ public class RedissonLock implements RLock { private final RedisPubSubConnection pubSubConnection; private final RedisConnection connection; - // TODO move it Redisson as ID + // TODO move it to Redisson as ID private final UUID id = UUID.randomUUID(); private final String groupName = "redisson_lock"; private final String name; @@ -130,12 +130,14 @@ public class RedissonLock implements RLock { @Override public void subscribed(Object channel, long count) { - subscribeLatch.countDown(); + if (getChannelName().equals(channel)) { + subscribeLatch.countDown(); + } } @Override public void message(Object channel, Object message) { - if (message.equals(unlockMessage)) { + if (message.equals(unlockMessage) && getChannelName().equals(channel)) { msg.release(); } } @@ -245,9 +247,10 @@ public class RedissonLock implements RLock { @Override public void destroy() { pubSubConnection.unsubscribe(getChannelName()); + pubSubConnection.close(); + connection.del(getKeyName()); connection.close(); - pubSubConnection.close(); redisson.remove(this); } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 60ac70c7f299c4acf89d6103a380cff6c6b37f3f..1b1631c7e120903f23dd7e826dce8c1caf4078b5 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -33,13 +33,13 @@ public class RedissonTopic implements RTopic { private final CountDownLatch subscribeLatch = new CountDownLatch(1); private final AtomicBoolean subscribeOnce = new AtomicBoolean(); - private final Map listeners = new ConcurrentHashMap(); - private final RedisPubSubConnection pubSubConnection; + private final Map> listeners = new ConcurrentHashMap>(); + private final RedisPubSubConnection pubSubConnection; private final RedisConnection connection; private final String name; private final Redisson redisson; - RedissonTopic(Redisson redisson, RedisPubSubConnection pubSubConnection, RedisConnection connection, final String name) { + RedissonTopic(Redisson redisson, RedisPubSubConnection pubSubConnection, RedisConnection connection, final String name) { this.pubSubConnection = pubSubConnection; this.name = name; this.connection = connection; @@ -48,10 +48,10 @@ public class RedissonTopic implements RTopic { public void subscribe() { if (subscribeOnce.compareAndSet(false, true)) { - RedisPubSubAdapter listener = new RedisPubSubAdapter() { + RedisPubSubAdapter listener = new RedisPubSubAdapter() { @Override - public void subscribed(Object channel, long count) { + public void subscribed(String channel, long count) { if (channel.equals(name)) { subscribeLatch.countDown(); } @@ -76,7 +76,7 @@ public class RedissonTopic implements RTopic { @Override public int addListener(MessageListener listener) { - RedisPubSubTopicListener list = new RedisPubSubTopicListener(listener); + RedisPubSubTopicListener list = new RedisPubSubTopicListener(listener, name); listeners.put(list.hashCode(), list); pubSubConnection.addListener(list); return list.hashCode(); @@ -84,7 +84,7 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(int listenerId) { - RedisPubSubTopicListener list = listeners.remove(listenerId); + RedisPubSubTopicListener list = listeners.remove(listenerId); pubSubConnection.removeListener(list); } diff --git a/src/main/java/org/redisson/core/RedisPubSubTopicListener.java b/src/main/java/org/redisson/core/RedisPubSubTopicListener.java index 45672fd237d01d8caf468fd136bc00829ec573ee..929d239469076db6e617a2b422d38a813778fcdd 100644 --- a/src/main/java/org/redisson/core/RedisPubSubTopicListener.java +++ b/src/main/java/org/redisson/core/RedisPubSubTopicListener.java @@ -20,15 +20,20 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; public class RedisPubSubTopicListener extends RedisPubSubAdapter { private final MessageListener listener; + private final K name; - public RedisPubSubTopicListener(MessageListener listener) { + public RedisPubSubTopicListener(MessageListener listener, K name) { super(); this.listener = listener; + this.name = name; } @Override public void message(K channel, V message) { - listener.onMessage(message); + // could be subscribed to multiple channels + if (name.equals(channel)) { + listener.onMessage(message); + } } @Override diff --git a/src/main/java/org/redisson/misc/internal/ReclosableLatch.java b/src/main/java/org/redisson/misc/internal/ReclosableLatch.java index 5f7aa0c312f45a622f9fab269b87b971fc7eff8a..1dbcbd7a68850804ec23b1a39356d8442b2e403e 100644 --- a/src/main/java/org/redisson/misc/internal/ReclosableLatch.java +++ b/src/main/java/org/redisson/misc/internal/ReclosableLatch.java @@ -1,24 +1,17 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2009 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. + * http://www.apache.org/licenses/LICENSE-2.0 * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.redisson.misc.internal; diff --git a/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java b/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java deleted file mode 100644 index bddfdeceba7b9eb051f4076edd23f730b4e24ba7..0000000000000000000000000000000000000000 --- a/src/main/java/org/redisson/misc/internal/ThreadLocalSemaphore.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Copyright 2014 Nikita Koksharov, Nickolay Borbit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.misc.internal; - -import java.util.Collection; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; - -public class ThreadLocalSemaphore { - - private final ThreadLocal semaphore; - private final Set allValues = Collections.newSetFromMap(new ConcurrentHashMap()); - - public ThreadLocalSemaphore() { - semaphore = new ThreadLocal() { - @Override protected Semaphore initialValue() { - Semaphore value = new Semaphore(1); - value.acquireUninterruptibly(); - allValues.add(value); - return value; - } - }; - } - - public Semaphore get() { - return semaphore.get(); - } - - public void remove() { - allValues.remove(get()); - semaphore.remove(); - } - - public Collection getAll() { - return allValues; - } - -} diff --git a/src/test/java/org/redisson/RedissonCountDownLatchTest.java b/src/test/java/org/redisson/RedissonCountDownLatchTest.java index 5bb01fab601cd016866c78da2dcd547c8c78ffe6..841aa3aa215034837be36346a901f0331064347d 100644 --- a/src/test/java/org/redisson/RedissonCountDownLatchTest.java +++ b/src/test/java/org/redisson/RedissonCountDownLatchTest.java @@ -17,7 +17,7 @@ public class RedissonCountDownLatchTest { ExecutorService executor = Executors.newFixedThreadPool(2); final RCountDownLatch latch = redisson.getCountDownLatch("latch1"); - latch.trySetCount(1); + Assert.assertTrue(latch.trySetCount(1)); executor.execute(new Runnable() { @Override @@ -31,6 +31,7 @@ public class RedissonCountDownLatchTest { } }); + executor.execute(new Runnable() { @Override public void run() { @@ -57,7 +58,7 @@ public class RedissonCountDownLatchTest { ExecutorService executor = Executors.newFixedThreadPool(2); final RCountDownLatch latch = redisson.getCountDownLatch("latch1"); - latch.trySetCount(1); + Assert.assertTrue(latch.trySetCount(1)); executor.execute(new Runnable() { @Override @@ -71,6 +72,7 @@ public class RedissonCountDownLatchTest { } }); + executor.execute(new Runnable() { @Override public void run() {