提交 4acee8eb 编写于 作者: V Vorotyntsev

Implemented spin locks

Signed-off-by: NVorotyntsev <vorotyntsevdanila@gmail.com>
上级 449f0563
......@@ -345,7 +345,17 @@ public class Redisson implements RedissonClient {
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
@Override
public RLock getSpinLock(String name) {
return getSpinLock(name, RedissonSpinLock.DEFAULT);
}
@Override
public RLock getSpinLock(String name, RedissonSpinLock.BackOffOptions backOffOptions) {
return new RedissonSpinLock(connectionManager.getCommandExecutor(), name, backOffOptions);
}
@Override
public RLock getMultiLock(RLock... locks) {
return new RedissonMultiLock(locks);
......
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
/**
* Base class for implementing distributed locks
*
* @author Danila Varatyntsev
*/
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
public static class ExpirationEntry {
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
private volatile Timeout timeout;
public ExpirationEntry() {
super();
}
public synchronized void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
public synchronized boolean hasNoThreads() {
return threadIds.isEmpty();
}
public synchronized Long getFirstThreadId() {
if (threadIds.isEmpty()) {
return null;
}
return threadIds.keySet().iterator().next();
}
public synchronized void removeThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadIds.remove(threadId);
} else {
threadIds.put(threadId, counter);
}
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
private static final Logger log = LoggerFactory.getLogger(RedissonBaseLock.class);
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
protected long internalLockLeaseTime;
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
protected String getEntryName() {
return entryName;
}
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CommandBatchService executorService = createCommandBatchService();
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (commandExecutor instanceof CommandBatchService) {
return result;
}
RPromise<T> r = new RedissonPromise<>();
RFuture<BatchResult<?>> future = executorService.executeAsync();
future.onComplete((res, ex) -> {
if (ex != null) {
r.tryFailure(ex);
return;
}
r.trySuccess(result.getNow());
});
return r;
}
private CommandBatchService createCommandBatchService() {
if (commandExecutor instanceof CommandBatchService) {
return (CommandBatchService) commandExecutor;
}
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getName());
BatchOptions options = BatchOptions.defaults()
.syncSlaves(entry.getAvailableSlaves(), 1, TimeUnit.SECONDS);
return new CommandBatchService(commandExecutor.getConnectionManager(), options);
}
protected void acquireFailed(long waitTime, TimeUnit unit, long threadId) {
get(acquireFailedAsync(waitTime, unit, threadId));
}
protected RFuture<Void> acquireFailedAsync(long waitTime, TimeUnit unit, long threadId) {
return RedissonPromise.newSucceededFuture(null);
}
@Override
public Condition newCondition() {
// TODO implement
throw new UnsupportedOperationException();
}
@Override
public boolean isLocked() {
return isExists();
}
@Override
public RFuture<Boolean> isLockedAsync() {
return isExistsAsync();
}
@Override
public RFuture<Boolean> isExistsAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.EXISTS, getName());
}
@Override
public boolean isHeldByCurrentThread() {
return isHeldByThread(Thread.currentThread().getId());
}
@Override
public boolean isHeldByThread(long threadId) {
RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
return get(future);
}
private static final RedisCommand<Integer> HGET = new RedisCommand<Integer>("HGET", ValueType.MAP_VALUE, new IntegerReplayConvertor(0));
public RFuture<Integer> getHoldCountAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));
}
@Override
public int getHoldCount() {
return get(getHoldCountAsync());
}
@Override
public RFuture<Boolean> deleteAsync() {
return forceUnlockAsync();
}
@Override
public RFuture<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);
}
......@@ -17,35 +17,22 @@ package org.redisson;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.LockPubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
......@@ -57,66 +44,10 @@ import java.util.concurrent.locks.Condition;
* @author Nikita Koksharov
*
*/
public class RedissonLock extends RedissonExpirable implements RLock {
public class RedissonLock extends RedissonBaseLock {
public static class ExpirationEntry {
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
private volatile Timeout timeout;
public ExpirationEntry() {
super();
}
public synchronized void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
public synchronized boolean hasNoThreads() {
return threadIds.isEmpty();
}
public synchronized Long getFirstThreadId() {
if (threadIds.isEmpty()) {
return null;
}
return threadIds.keySet().iterator().next();
}
public synchronized void removeThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadIds.remove(threadId);
} else {
threadIds.put(threadId, counter);
}
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
protected long internalLockLeaseTime;
final String id;
final String entryName;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
......@@ -124,24 +55,14 @@ public class RedissonLock extends RedissonExpirable implements RLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
protected String getEntryName() {
return entryName;
}
String getChannelName() {
return prefixName("redisson_lock__channel", getName());
}
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
@Override
public void lock() {
try {
......@@ -265,104 +186,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return get(tryLockAsync());
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CommandBatchService executorService = createCommandBatchService();
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (commandExecutor instanceof CommandBatchService) {
return result;
}
RPromise<T> r = new RedissonPromise<>();
RFuture<BatchResult<?>> future = executorService.executeAsync();
future.onComplete((res, ex) -> {
if (ex != null) {
r.tryFailure(ex);
return;
}
r.trySuccess(result.getNow());
});
return r;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
......@@ -381,26 +204,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
private CommandBatchService createCommandBatchService() {
if (commandExecutor instanceof CommandBatchService) {
return (CommandBatchService) commandExecutor;
}
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getName());
BatchOptions options = BatchOptions.defaults()
.syncSlaves(entry.getAvailableSlaves(), 1, TimeUnit.SECONDS);
return new CommandBatchService(commandExecutor.getConnectionManager(), options);
}
private void acquireFailed(long waitTime, TimeUnit unit, long threadId) {
get(acquireFailedAsync(waitTime, unit, threadId));
}
protected RFuture<Void> acquireFailedAsync(long waitTime, TimeUnit unit, long threadId) {
return RedissonPromise.newSucceededFuture(null);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
......@@ -509,12 +312,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// throw commandExecutor.convertException(future);
}
@Override
public Condition newCondition() {
// TODO implement
throw new UnsupportedOperationException();
}
@Override
public boolean forceUnlock() {
return get(forceUnlockAsync());
......@@ -533,54 +330,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}
@Override
public boolean isLocked() {
return isExists();
}
@Override
public RFuture<Boolean> isLockedAsync() {
return isExistsAsync();
}
@Override
public RFuture<Boolean> isExistsAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.EXISTS, getName());
}
@Override
public boolean isHeldByCurrentThread() {
return isHeldByThread(Thread.currentThread().getId());
}
@Override
public boolean isHeldByThread(long threadId) {
RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
return get(future);
}
private static final RedisCommand<Integer> HGET = new RedisCommand<Integer>("HGET", ValueType.MAP_VALUE, new IntegerReplayConvertor(0));
public RFuture<Integer> getHoldCountAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));
}
@Override
public int getHoldCount() {
return get(getHoldCountAsync());
}
@Override
public RFuture<Boolean> deleteAsync() {
return forceUnlockAsync();
}
@Override
public RFuture<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
......@@ -598,32 +347,6 @@ public class RedissonLock extends RedissonExpirable implements RLock {
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
@Override
public RFuture<Void> lockAsync() {
......
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.redisson.api.RFuture;
import org.redisson.client.RedisException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.<br>
* Lock will be removed automatically if client disconnects.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order.
*
* @author Danila Varatyntsev
*/
public class RedissonSpinLock extends RedissonBaseLock {
public interface BackOffOptions {
BackOffPolicy create();
}
public interface BackOffPolicy {
long getNextSleepPeriod();
}
public static class ExponentialBackOffOptions implements BackOffOptions {
private long maxDelay = 128;
private long initialDelay = 1;
private int multiplier = 2;
@Override
public BackOffPolicy create() {
return new ExponentialBackOffPolicy(initialDelay, maxDelay, multiplier);
}
public long getMaxDelay() {
return maxDelay;
}
public ExponentialBackOffOptions maxDelay(long maxDelay) {
this.maxDelay = maxDelay;
return this;
}
public long getInitialDelay() {
return initialDelay;
}
public ExponentialBackOffOptions initialDelay(long initialDelay) {
this.initialDelay = initialDelay;
return this;
}
public int getMultiplier() {
return multiplier;
}
public ExponentialBackOffOptions multiplier(int multiplier) {
this.multiplier = multiplier;
return this;
}
}
private static final class ExponentialBackOffPolicy implements BackOffPolicy {
private static final SecureRandom RANDOM = new SecureRandom();
private final long maxDelay;
private final int multiplier;
private int fails;
private long nextSleep;
private ExponentialBackOffPolicy(long initialDelay, long maxDelay, int multiplier) {
this.nextSleep = initialDelay;
this.maxDelay = maxDelay;
this.multiplier = multiplier;
}
@Override
public long getNextSleepPeriod() {
if (nextSleep == maxDelay) {
return maxDelay;
}
long result = nextSleep;
nextSleep = nextSleep * multiplier + RANDOM.nextInt(++fails);
nextSleep = Math.min(maxDelay, nextSleep);
return result;
}
}
public static class ConstantBackOffOptions implements BackOffOptions {
private long delay = 64;
@Override
public BackOffPolicy create() {
return new ConstantBackOffPolicy(delay);
}
public long getDelay() {
return delay;
}
public ConstantBackOffOptions delay(long maxDelay) {
this.delay = maxDelay;
return this;
}
}
private static final class ConstantBackOffPolicy implements BackOffPolicy {
private final long delay;
private ConstantBackOffPolicy(long delay) {
this.delay = delay;
}
@Override
public long getNextSleepPeriod() {
return delay;
}
}
public static final BackOffOptions DEFAULT = new ExponentialBackOffOptions();
protected long internalLockLeaseTime;
protected final BackOffOptions backOffOptions;
final CommandAsyncExecutor commandExecutor;
public RedissonSpinLock(CommandAsyncExecutor commandExecutor, String name, BackOffOptions backOffOptions) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.backOffOptions = backOffOptions;
}
@Override
public void lock() {
try {
lockInterruptibly(-1, null);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
BackOffPolicy backOffPolicy = backOffOptions.create();
while (ttl != null) {
long nextSleepPeriod = backOffPolicy.getNextSleepPeriod();
Thread.sleep(nextSleepPeriod);
ttl = tryAcquire(leaseTime, unit, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
BackOffPolicy backOffPolicy = backOffOptions.create();
while (ttl != null) {
current = System.currentTimeMillis();
Thread.sleep(backOffPolicy.getNextSleepPeriod());
ttl = tryAcquire(leaseTime, unit, threadId);
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
return true;
}
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public boolean forceUnlock() {
return get(forceUnlockAsync());
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Collections.singletonList(getName()));
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"return 1; " +
"end; " +
"return nil;",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
@Override
public RFuture<Void> lockAsync() {
return lockAsync(-1, null);
}
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {
long currentThreadId = Thread.currentThread().getId();
return lockAsync(leaseTime, unit, currentThreadId);
}
@Override
public RFuture<Void> lockAsync(long currentThreadId) {
return lockAsync(-1, null, currentThreadId);
}
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long currentThreadId) {
RPromise<Void> result = new RedissonPromise<>();
BackOffPolicy backOffPolicy = backOffOptions.create();
lockAsync(leaseTime, unit, currentThreadId, result, backOffPolicy);
return result;
}
private void lockAsync(long leaseTime, TimeUnit unit, long currentThreadId, RPromise<Void> result,
BackOffPolicy backOffPolicy) {
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
// lock acquired
if (ttl == null) {
if (!result.trySuccess(null)) {
unlockAsync(currentThreadId);
}
return;
}
long nextSleepPeriod = backOffPolicy.getNextSleepPeriod();
try {
Thread.sleep(nextSleepPeriod);
} catch (InterruptedException interruptedException) {
result.tryFailure(interruptedException);
return;
}
GlobalEventExecutor.INSTANCE.schedule(
() -> lockAsync(leaseTime, unit, currentThreadId, result, backOffPolicy),
nextSleepPeriod, TimeUnit.MILLISECONDS);
});
}
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Long> longRFuture = tryAcquireAsync(-1, null, threadId);
longRFuture.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
}
// lock acquired
result.trySuccess(res == null);
});
return result;
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit) {
return tryLockAsync(waitTime, -1, unit);
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {
long currentThreadId = Thread.currentThread().getId();
return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit,
long currentThreadId) {
RPromise<Boolean> result = new RedissonPromise<>();
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
BackOffPolicy backOffPolicy = backOffOptions.create();
tryLock(leaseTime, unit, currentThreadId, result, time, backOffPolicy);
return result;
}
private void tryLock(long leaseTime, TimeUnit unit, long currentThreadId, RPromise<Boolean> result,
AtomicLong time, BackOffPolicy backOffPolicy) {
long startTime = System.currentTimeMillis();
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
// lock acquired
if (ttl == null) {
if (!result.trySuccess(true)) {
unlockAsync(currentThreadId);
}
return;
}
long el = System.currentTimeMillis() - startTime;
time.addAndGet(-el);
if (time.get() <= 0) {
trySuccessFalse(currentThreadId, result);
return;
}
long nextSleepPeriod = backOffPolicy.getNextSleepPeriod();
try {
Thread.sleep(nextSleepPeriod);
} catch (InterruptedException interruptedException) {
result.tryFailure(interruptedException);
return;
}
commandExecutor.getConnectionManager().newTimeout(
timeout -> tryLock(leaseTime, unit, currentThreadId, result, time, backOffPolicy),
nextSleepPeriod, TimeUnit.MILLISECONDS);
});
}
private void trySuccessFalse(long currentThreadId, RPromise<Boolean> result) {
acquireFailedAsync(-1, null, currentThreadId).onComplete((res, e) -> {
if (e == null) {
result.trySuccess(false);
} else {
result.tryFailure(e);
}
});
}
}
......@@ -15,6 +15,7 @@
*/
package org.redisson.api;
import org.redisson.RedissonSpinLock;
import org.redisson.api.redisnode.BaseRedisNodes;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.codec.Codec;
......@@ -475,6 +476,28 @@ public interface RedissonClient {
*/
RLock getLock(String name);
/**
* Returns Spin lock instance by name.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order by threads.
* <p>
*
* @param name - name of object
* @return Lock object
*/
RLock getSpinLock(String name);
/**
* Returns Spin lock instance by name with specified back off options.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order by threads.
* <p>
*
* @param name - name of object
* @return Lock object
*/
RLock getSpinLock(String name, RedissonSpinLock.BackOffOptions backOffOptions);
/**
* Returns MultiLock instance associated with specified <code>locks</code>
*
......
package org.redisson;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSpinLockBackoffTest {
@Test
public void testExponentialBackoff() {
RedissonSpinLock.BackOffOptions backOffOptions = new RedissonSpinLock.ExponentialBackOffOptions()
.initialDelay(10)
.maxDelay(100)
.multiplier(3);
RedissonSpinLock.BackOffPolicy backOffPolicy = backOffOptions.create();
assertThat(backOffPolicy.getNextSleepPeriod()).isBetween(10L, 10L);
assertThat(backOffPolicy.getNextSleepPeriod()).isBetween(30L, 31L);
assertThat(backOffPolicy.getNextSleepPeriod()).isBetween(90L, 92L);
assertThat(backOffPolicy.getNextSleepPeriod()).isBetween(100L, 103L);
assertThat(backOffPolicy.getNextSleepPeriod()).isBetween(100L, 104L);
}
@Test
public void testConstantBackoff() {
RedissonSpinLock.ConstantBackOffOptions backOffOptions = new RedissonSpinLock.ConstantBackOffOptions()
.delay(30);
RedissonSpinLock.BackOffPolicy backOffPolicy = backOffOptions.create();
for (int i = 0; i < 10; i++) {
assertThat(backOffPolicy.getNextSleepPeriod()).isEqualTo(30L);
}
}
}
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
public class RedissonSpinLockTest extends BaseConcurrentTest {
static class LockWithoutBoolean extends Thread {
private CountDownLatch latch;
private RedissonClient redisson;
public LockWithoutBoolean(String name, CountDownLatch latch, RedissonClient redisson) {
super(name);
this.latch = latch;
this.redisson = redisson;
}
public void run() {
RLock lock = redisson.getSpinLock("lock");
lock.lock(10, TimeUnit.MINUTES);
System.out.println(Thread.currentThread().getName() + " gets lock. and interrupt: " + Thread.currentThread().isInterrupted());
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
latch.countDown();
Thread.currentThread().interrupt();
} finally {
try {
lock.unlock();
} finally {
latch.countDown();
}
}
System.out.println(Thread.currentThread().getName() + " ends.");
}
}
@Test(expected = WriteRedisConnectionException.class)
public void testRedisFailed() throws IOException, InterruptedException {
RedisRunner.RedisProcess master = new RedisRunner()
.port(6377)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6377");
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getSpinLock("myLock");
// kill RedisServer while main thread is sleeping.
master.stop();
Thread.sleep(3000);
lock.tryLock(5, 10, TimeUnit.SECONDS);
}
@Test
public void testTryLockWait() throws InterruptedException {
testSingleInstanceConcurrency(1, r -> {
RLock lock = r.getSpinLock("lock");
lock.lock();
});
RLock lock = redisson.getSpinLock("lock");
long startTime = System.currentTimeMillis();
lock.tryLock(3, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L);
}
@Test
public void testLockUninterruptibly() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
Thread thread_1 = new LockWithoutBoolean("thread-1", latch, redisson);
Thread thread_2 = new LockWithoutBoolean("thread-2", latch, redisson);
thread_1.start();
TimeUnit.SECONDS.sleep(1); // let thread-1 get the lock
thread_2.start();
TimeUnit.SECONDS.sleep(1); // let thread_2 waiting for the lock
thread_2.interrupt(); // interrupte the thread-2
boolean res = latch.await(2, TimeUnit.SECONDS);
assertThat(res).isFalse();
}
@Test
public void testForceUnlock() {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
lock.forceUnlock();
Assert.assertFalse(lock.isLocked());
lock = redisson.getSpinLock("lock");
Assert.assertFalse(lock.isLocked());
}
@Test
public void testExpire() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock(2, TimeUnit.SECONDS);
final long startTime = System.currentTimeMillis();
Thread t = new Thread() {
public void run() {
RLock lock1 = redisson.getSpinLock("lock");
lock1.lock();
long spendTime = System.currentTimeMillis() - startTime;
Assert.assertTrue(spendTime < 2020);
lock1.unlock();
}
;
};
t.start();
t.join();
assertThatThrownBy(() -> {
lock.unlock();
}).isInstanceOf(IllegalMonitorStateException.class);
}
@Test
public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getSpinLock("myLock");
lock.lock();
assertThat(lock.isLocked()).isTrue();
lock.unlock();
assertThat(lock.isLocked()).isFalse();
redisson.shutdown();
process.shutdown();
}
@Test
public void testAutoExpire() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
RedissonClient r = createInstance();
Thread t = new Thread() {
@Override
public void run() {
RLock lock = r.getSpinLock("lock");
lock.lock();
latch.countDown();
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
t.start();
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getSpinLock("lock");
t.join();
r.shutdown();
await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@Test
public void testGetHoldCount() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertEquals(0, lock.getHoldCount());
lock.lock();
Assert.assertEquals(1, lock.getHoldCount());
lock.unlock();
Assert.assertEquals(0, lock.getHoldCount());
lock.lock();
lock.lock();
Assert.assertEquals(2, lock.getHoldCount());
lock.unlock();
Assert.assertEquals(1, lock.getHoldCount());
lock.unlock();
Assert.assertEquals(0, lock.getHoldCount());
}
@Test
public void testIsHeldByCurrentThreadOtherThread() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
Thread t = new Thread() {
public void run() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertFalse(lock.isHeldByCurrentThread());
}
;
};
t.start();
t.join();
lock.unlock();
Thread t2 = new Thread() {
public void run() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertFalse(lock.isHeldByCurrentThread());
}
;
};
t2.start();
t2.join();
}
@Test
public void testIsHeldByCurrentThread() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertFalse(lock.isHeldByCurrentThread());
lock.lock();
Assert.assertTrue(lock.isHeldByCurrentThread());
lock.unlock();
Assert.assertFalse(lock.isHeldByCurrentThread());
}
@Test
public void testIsLockedOtherThread() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
Thread t = new Thread() {
public void run() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertTrue(lock.isLocked());
}
;
};
t.start();
t.join();
lock.unlock();
Thread t2 = new Thread() {
public void run() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertFalse(lock.isLocked());
}
;
};
t2.start();
t2.join();
}
@Test
public void testIsLocked() {
RLock lock = redisson.getSpinLock("lock");
Assert.assertFalse(lock.isLocked());
lock.lock();
Assert.assertTrue(lock.isLocked());
lock.unlock();
Assert.assertFalse(lock.isLocked());
}
@Test(expected = IllegalMonitorStateException.class)
public void testUnlockFail() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
Thread t = new Thread() {
public void run() {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
;
};
t.start();
t.join(400);
try {
lock.unlock();
} catch (IllegalMonitorStateException e) {
t.join();
throw e;
}
}
@Test
public void testLockUnlock() {
Lock lock = redisson.getSpinLock("lock1");
lock.lock();
lock.unlock();
lock.lock();
lock.unlock();
}
@Test
public void testReentrancy() throws InterruptedException {
Lock lock = redisson.getSpinLock("lock1");
Assert.assertTrue(lock.tryLock());
Assert.assertTrue(lock.tryLock());
lock.unlock();
// next row for test renew expiration tisk.
//Thread.currentThread().sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS*2));
Thread thread1 = new Thread() {
@Override
public void run() {
RLock lock1 = redisson.getSpinLock("lock1");
Assert.assertFalse(lock1.tryLock());
}
};
thread1.start();
thread1.join();
lock.unlock();
}
@Test
public void testConcurrency_SingleInstance() throws InterruptedException {
final AtomicInteger lockedCounter = new AtomicInteger();
int iterations = 15;
testSingleInstanceConcurrency(iterations, r -> {
Lock lock = r.getSpinLock("testConcurrency_SingleInstance");
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
});
Assert.assertEquals(iterations, lockedCounter.get());
}
@Test
public void testConcurrencyLoop_MultiInstance() throws InterruptedException {
final int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
testMultiInstanceConcurrency(16, r -> {
for (int i = 0; i < iterations; i++) {
r.getSpinLock("testConcurrency_MultiInstance1").lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockedCounter.incrementAndGet();
r.getSpinLock("testConcurrency_MultiInstance1").unlock();
}
});
Assert.assertEquals(16 * iterations, lockedCounter.get());
}
@Test
public void testConcurrency_MultiInstance() throws InterruptedException {
int iterations = 100;
final AtomicInteger lockedCounter = new AtomicInteger();
testMultiInstanceConcurrency(iterations, r -> {
Lock lock = r.getSpinLock("testConcurrency_MultiInstance2");
lock.lock();
lockedCounter.incrementAndGet();
lock.unlock();
});
Assert.assertEquals(iterations, lockedCounter.get());
}
@Test
public void testTryLockAsync() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
AtomicBoolean lockAsyncSucceed = new AtomicBoolean();
Thread thread = new Thread(() -> {
RFuture<Void> booleanRFuture = lock.lockAsync();
booleanRFuture.onComplete((res, e) -> {
if (e != null) {
Assert.fail("Lock aquire failed for some reason");
}
lockAsyncSucceed.set(true);
});
});
thread.start();
Thread.sleep(1_000);
assertThat(lockAsyncSucceed.get()).isFalse();
lock.unlock();
Thread.sleep(200);
assertThat(lockAsyncSucceed.get()).isTrue();
lock.forceUnlock();
}
@Test
public void testTimedTryLockAsync() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
AtomicBoolean lockAsyncSucceed = new AtomicBoolean();
Thread thread = new Thread(() -> {
RFuture<Boolean> booleanRFuture = lock.tryLockAsync(1, 30, TimeUnit.SECONDS);
booleanRFuture.onComplete((res, e) -> {
if (e != null) {
Assert.fail("Lock aquire failed for some reason");
}
lockAsyncSucceed.set(res);
});
});
thread.start();
Thread.sleep(500);
assertThat(lockAsyncSucceed.get()).isFalse();
lock.unlock();
Thread.sleep(200);
assertThat(lockAsyncSucceed.get()).isTrue();
lock.forceUnlock();
}
@Test
public void testTryLockAsyncFailed() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
AtomicBoolean lockAsyncSucceed = new AtomicBoolean();
Thread thread = new Thread(() -> {
RFuture<Boolean> booleanRFuture = lock.tryLockAsync();
booleanRFuture.onComplete((res, e) -> {
if (e != null) {
Assert.fail("Lock aquire failed for some reason");
}
lockAsyncSucceed.set(res);
});
});
thread.start();
Thread.sleep(1_000);
assertThat(lockAsyncSucceed.get()).isFalse();
lock.unlock();
}
@Test
public void testTryLockAsyncSucceed() throws InterruptedException, ExecutionException {
RLock lock = redisson.getSpinLock("lock");
Boolean result = lock.tryLockAsync().get();
assertThat(result).isTrue();
lock.unlock();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册