提交 0e3d4158 编写于 作者: N Nikita Koksharov

RedissonMultiLock should implement RLock interface #1973

上级 4a630ac4
......@@ -24,7 +24,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
......@@ -39,7 +38,7 @@ import org.redisson.misc.TransferListener;
* @author Nikita Koksharov
*
*/
public class RedissonMultiLock implements Lock {
public class RedissonMultiLock implements RLock {
class LockState {
......@@ -74,7 +73,7 @@ public class RedissonMultiLock implements Lock {
lockWaitTime = calcLockWaitTime(remainTime);
failedLocksLimit = failedLocksLimit();
acquiredLocks = new ArrayList<RLock>(locks.size());
acquiredLocks = new ArrayList<>(locks.size());
}
void tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
......@@ -147,7 +146,7 @@ public class RedissonMultiLock implements Lock {
if (leaseTime != -1) {
AtomicInteger counter = new AtomicInteger(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
......@@ -187,7 +186,7 @@ public class RedissonMultiLock implements Lock {
}
final List<RLock> locks = new ArrayList<RLock>();
final List<RLock> locks = new ArrayList<>();
/**
* Creates instance with multiple {@link RLock} objects.
......@@ -211,6 +210,7 @@ public class RedissonMultiLock implements Lock {
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
......@@ -219,10 +219,12 @@ public class RedissonMultiLock implements Lock {
}
}
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {
return lockAsync(leaseTime, unit, Thread.currentThread().getId());
}
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId) {
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
......@@ -266,6 +268,7 @@ public class RedissonMultiLock implements Lock {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
......@@ -301,7 +304,7 @@ public class RedissonMultiLock implements Lock {
}
protected void unlockInner(Collection<RLock> locks) {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
......@@ -362,7 +365,7 @@ public class RedissonMultiLock implements Lock {
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
......@@ -414,9 +417,9 @@ public class RedissonMultiLock implements Lock {
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
......@@ -428,7 +431,7 @@ public class RedissonMultiLock implements Lock {
return true;
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
LockState state = new LockState(waitTime, leaseTime, unit, threadId);
......@@ -436,6 +439,7 @@ public class RedissonMultiLock implements Lock {
return result;
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {
return tryLockAsync(waitTime, leaseTime, unit, Thread.currentThread().getId());
}
......@@ -445,13 +449,14 @@ public class RedissonMultiLock implements Lock {
return remainTime;
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
return unlockInnerAsync(locks, threadId);
}
@Override
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
......@@ -462,10 +467,79 @@ public class RedissonMultiLock implements Lock {
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Void> unlockAsync() {
return unlockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Void> lockAsync() {
return lockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Void> lockAsync(long threadId) {
return lockAsync(-1, null, threadId);
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryLockAsync(-1, -1, null, threadId);
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit) {
return tryLockAsync(waitTime, -1, unit);
}
@Override
public RFuture<Integer> getHoldCountAsync() {
throw new UnsupportedOperationException();
}
@Override
public String getName() {
throw new UnsupportedOperationException();
}
@Override
public boolean forceUnlock() {
throw new UnsupportedOperationException();
}
@Override
public boolean isLocked() {
throw new UnsupportedOperationException();
}
@Override
public boolean isHeldByThread(long threadId) {
throw new UnsupportedOperationException();
}
@Override
public boolean isHeldByCurrentThread() {
throw new UnsupportedOperationException();
}
@Override
public int getHoldCount() {
throw new UnsupportedOperationException();
}
}
......@@ -27,8 +27,15 @@ import java.util.concurrent.locks.Lock;
*
*/
public interface RLock extends Lock, RExpirable, RLockAsync {
public interface RLock extends Lock, RLockAsync {
/**
* Returns name of object
*
* @return name - name of object
*/
String getName();
/**
* Acquires the lock.
*
......@@ -87,7 +94,7 @@ public interface RLock extends Lock, RExpirable, RLockAsync {
/**
* Unlocks lock independently of state
*
* @return <code>true</code> if unlocked otherwise <code>false</code>
* @return <code>true</code> if lock existed and now unlocked otherwise <code>false</code>
*/
boolean forceUnlock();
......
......@@ -23,12 +23,12 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov
*
*/
public interface RLockAsync extends RExpirableAsync {
public interface RLockAsync {
/**
* Unlocks the lock independently of state
*
* @return <code>true</code> if unlocked otherwise <code>false</code>
* @return <code>true</code> if lock existed and now unlocked otherwise <code>false</code>
*/
RFuture<Boolean> forceUnlockAsync();
......
......@@ -25,8 +25,15 @@ import reactor.core.publisher.Mono;
* @author Nikita Koksharov
*
*/
public interface RLockReactive extends RExpirableReactive {
public interface RLockReactive {
/**
* Returns name of object
*
* @return name - name of object
*/
String getName();
/**
* Unlocks the lock independently of state
*
......
......@@ -25,8 +25,15 @@ import io.reactivex.Flowable;
* @author Nikita Koksharov
*
*/
public interface RLockRx extends RExpirableRx {
public interface RLockRx {
/**
* Returns name of object
*
* @return name - name of object
*/
String getName();
/**
* Unlocks the lock independently of state
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册