提交 e7817366 编写于 作者: N Nikita Koksharov

Fixed - Transactional RBucket object doesn't respect transaction timeout #1928

上级 07fe1038
......@@ -55,18 +55,20 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
private Object state;
private final String transactionId;
public RedissonTransactionalBucket(CommandAsyncExecutor commandExecutor, String name, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
public RedissonTransactionalBucket(CommandAsyncExecutor commandExecutor, long timeout, String name, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
super(commandExecutor, name);
this.operations = operations;
this.executed = executed;
this.transactionId = transactionId;
this.timeout = timeout;
}
public RedissonTransactionalBucket(Codec codec, CommandAsyncExecutor commandExecutor, String name, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
public RedissonTransactionalBucket(Codec codec, CommandAsyncExecutor commandExecutor, long timeout, String name, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
super(codec, commandExecutor, name);
this.operations = operations;
this.executed = executed;
this.transactionId = transactionId;
this.timeout = timeout;
}
@Override
......@@ -198,7 +200,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
@Override
public void run() {
if (state != null) {
operations.add(new DeleteOperation(getName(), getLockName()));
operations.add(new DeleteOperation(getName(), getLockName(), transactionId));
if (state == NULL) {
result.trySuccess(false);
} else {
......@@ -214,7 +216,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return;
}
operations.add(new DeleteOperation(getName(), getLockName()));
operations.add(new DeleteOperation(getName(), getLockName(), transactionId));
state = NULL;
result.trySuccess(res);
});
......@@ -241,12 +243,12 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
@Override
public RFuture<Boolean> compareAndSetAsync(V expect, V update) {
checkState();
RPromise<Boolean> result = new RedissonPromise<Boolean>();
RPromise<Boolean> result = new RedissonPromise<>();
executeLocked(result, new Runnable() {
@Override
public void run() {
if (state != null) {
operations.add(new BucketCompareAndSetOperation<V>(getName(), getLockName(), getCodec(), expect, update));
operations.add(new BucketCompareAndSetOperation<V>(getName(), getLockName(), getCodec(), expect, update, transactionId));
if ((state == NULL && expect == null)
|| isEquals(state, expect)) {
if (update == null) {
......@@ -267,7 +269,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return;
}
operations.add(new BucketCompareAndSetOperation<V>(getName(), getLockName(), getCodec(), expect, update));
operations.add(new BucketCompareAndSetOperation<V>(getName(), getLockName(), getCodec(), expect, update, transactionId));
if ((res == null && expect == null)
|| isEquals(res, expect)) {
if (update == null) {
......@@ -300,7 +302,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
} else {
prevValue = state;
}
operations.add(new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), newValue));
operations.add(new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), newValue, transactionId));
if (newValue == null) {
state = NULL;
} else {
......@@ -321,7 +323,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
} else {
state = newValue;
}
operations.add(new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), newValue));
operations.add(new BucketGetAndSetOperation<V>(getName(), getLockName(), getCodec(), newValue, transactionId));
result.trySuccess(res);
});
}
......@@ -344,7 +346,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
} else {
prevValue = state;
}
operations.add(new BucketGetAndDeleteOperation<V>(getName(), getLockName(), getCodec()));
operations.add(new BucketGetAndDeleteOperation<V>(getName(), getLockName(), getCodec(), transactionId));
state = NULL;
result.trySuccess((V) prevValue);
return;
......@@ -357,7 +359,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
}
state = NULL;
operations.add(new BucketGetAndDeleteOperation<V>(getName(), getLockName(), getCodec()));
operations.add(new BucketGetAndDeleteOperation<V>(getName(), getLockName(), getCodec(), transactionId));
result.trySuccess(res);
});
}
......@@ -367,7 +369,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
@Override
public RFuture<Void> setAsync(V newValue) {
return setAsync(newValue, new BucketSetOperation<V>(getName(), getLockName(), getCodec(), newValue));
return setAsync(newValue, new BucketSetOperation<V>(getName(), getLockName(), getCodec(), newValue, transactionId));
}
private RFuture<Void> setAsync(V newValue, TransactionalOperation operation) {
......@@ -390,17 +392,17 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
@Override
public RFuture<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
return setAsync(value, new BucketSetOperation<V>(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit));
return setAsync(value, new BucketSetOperation<V>(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId));
}
@Override
public RFuture<Boolean> trySetAsync(V newValue) {
return trySet(newValue, new BucketTrySetOperation<V>(getName(), getLockName(), getCodec(), newValue));
return trySet(newValue, new BucketTrySetOperation<V>(getName(), getLockName(), getCodec(), newValue, transactionId));
}
@Override
public RFuture<Boolean> trySetAsync(V value, long timeToLive, TimeUnit timeUnit) {
return trySet(value, new BucketTrySetOperation<V>(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit));
return trySet(value, new BucketTrySetOperation<V>(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId));
}
private RFuture<Boolean> trySet(V newValue, TransactionalOperation operation) {
......
......@@ -19,6 +19,7 @@ import org.redisson.RedissonKeys;
import org.redisson.RedissonLock;
import org.redisson.api.RKeys;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
/**
*
......@@ -28,14 +29,16 @@ import org.redisson.command.CommandAsyncExecutor;
public class DeleteOperation extends TransactionalOperation {
private String lockName;
private String transactionId;
public DeleteOperation(String name) {
this(name, null);
this(name, null, null);
}
public DeleteOperation(String name, String lockName) {
public DeleteOperation(String name, String lockName, String transactionId) {
super(name, null);
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
......@@ -43,7 +46,7 @@ public class DeleteOperation extends TransactionalOperation {
RKeys keys = new RedissonKeys(commandExecutor);
keys.deleteAsync(getName());
if (lockName != null) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
}
......@@ -51,7 +54,7 @@ public class DeleteOperation extends TransactionalOperation {
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
if (lockName != null) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
}
......
......@@ -19,6 +19,7 @@ import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
......@@ -32,25 +33,27 @@ public class BucketCompareAndSetOperation<V> extends TransactionalOperation {
private V expected;
private V value;
private String lockName;
private String transactionId;
public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value) {
public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value, String transactionId) {
super(name, codec);
this.expected = expected;
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
bucket.compareAndSetAsync(expected, value);
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
......
......@@ -19,6 +19,7 @@ import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
......@@ -30,23 +31,25 @@ import org.redisson.transaction.operation.TransactionalOperation;
public class BucketGetAndDeleteOperation<V> extends TransactionalOperation {
private String lockName;
private String transactionId;
public BucketGetAndDeleteOperation(String name, String lockName, Codec codec) {
public BucketGetAndDeleteOperation(String name, String lockName, Codec codec, String transactionId) {
super(name, codec);
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
bucket.getAndDeleteAsync();
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
......
......@@ -19,6 +19,7 @@ import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
......@@ -31,24 +32,26 @@ public class BucketGetAndSetOperation<V> extends TransactionalOperation {
private Object value;
private String lockName;
private String transactionId;
public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value) {
public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RedissonBucket<V> bucket = new RedissonBucket<V>(codec, commandExecutor, name);
bucket.getAndSetAsync((V) value);
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
......
......@@ -21,6 +21,7 @@ import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
......@@ -35,17 +36,19 @@ public class BucketSetOperation<V> extends TransactionalOperation {
private String lockName;
private long timeToLive;
private TimeUnit timeUnit;
private String transactionId;
public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit) {
this(name, lockName, codec, value);
public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) {
this(name, lockName, codec, value, transactionId);
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
}
public BucketSetOperation(String name, String lockName, Codec codec, Object value) {
public BucketSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
......@@ -56,13 +59,13 @@ public class BucketSetOperation<V> extends TransactionalOperation {
} else {
bucket.setAsync((V) value);
}
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
......
......@@ -21,6 +21,7 @@ import org.redisson.RedissonBucket;
import org.redisson.RedissonLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
......@@ -31,21 +32,23 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class BucketTrySetOperation<V> extends TransactionalOperation {
private String transactionId;
private Object value;
private String lockName;
private long timeToLive;
private TimeUnit timeUnit;
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit) {
this(name, lockName, codec, value);
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) {
this(name, lockName, codec, value, transactionId);
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
}
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value) {
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, String transactionId) {
super(name, codec);
this.value = value;
this.lockName = lockName;
this.transactionId = transactionId;
}
@Override
......@@ -56,13 +59,13 @@ public class BucketTrySetOperation<V> extends TransactionalOperation {
} else {
bucket.trySetAsync((V) value);
}
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
lock.unlockAsync();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册