diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index 99fd1e6df4189eb9ab4bd53ac27b55c51eb538a0..bfed1c0f7aebf9394773bab9e997e8ebd016a424 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -55,18 +55,20 @@ public class RedissonTransactionalBucket extends RedissonBucket { private Object state; private final String transactionId; - public RedissonTransactionalBucket(CommandAsyncExecutor commandExecutor, String name, List operations, AtomicBoolean executed, String transactionId) { + public RedissonTransactionalBucket(CommandAsyncExecutor commandExecutor, long timeout, String name, List operations, AtomicBoolean executed, String transactionId) { super(commandExecutor, name); this.operations = operations; this.executed = executed; this.transactionId = transactionId; + this.timeout = timeout; } - public RedissonTransactionalBucket(Codec codec, CommandAsyncExecutor commandExecutor, String name, List operations, AtomicBoolean executed, String transactionId) { + public RedissonTransactionalBucket(Codec codec, CommandAsyncExecutor commandExecutor, long timeout, String name, List operations, AtomicBoolean executed, String transactionId) { super(codec, commandExecutor, name); this.operations = operations; this.executed = executed; this.transactionId = transactionId; + this.timeout = timeout; } @Override @@ -198,7 +200,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { @Override public void run() { if (state != null) { - operations.add(new DeleteOperation(getName(), getLockName())); + operations.add(new DeleteOperation(getName(), getLockName(), transactionId)); if (state == NULL) { result.trySuccess(false); } else { @@ -214,7 +216,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return; } - operations.add(new DeleteOperation(getName(), getLockName())); + operations.add(new DeleteOperation(getName(), getLockName(), transactionId)); state = NULL; result.trySuccess(res); }); @@ -241,12 +243,12 @@ public class RedissonTransactionalBucket extends RedissonBucket { @Override public RFuture compareAndSetAsync(V expect, V update) { checkState(); - RPromise result = new RedissonPromise(); + RPromise result = new RedissonPromise<>(); executeLocked(result, new Runnable() { @Override public void run() { if (state != null) { - operations.add(new BucketCompareAndSetOperation(getName(), getLockName(), getCodec(), expect, update)); + operations.add(new BucketCompareAndSetOperation(getName(), getLockName(), getCodec(), expect, update, transactionId)); if ((state == NULL && expect == null) || isEquals(state, expect)) { if (update == null) { @@ -267,7 +269,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return; } - operations.add(new BucketCompareAndSetOperation(getName(), getLockName(), getCodec(), expect, update)); + operations.add(new BucketCompareAndSetOperation(getName(), getLockName(), getCodec(), expect, update, transactionId)); if ((res == null && expect == null) || isEquals(res, expect)) { if (update == null) { @@ -300,7 +302,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } else { prevValue = state; } - operations.add(new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), newValue)); + operations.add(new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), newValue, transactionId)); if (newValue == null) { state = NULL; } else { @@ -321,7 +323,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } else { state = newValue; } - operations.add(new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), newValue)); + operations.add(new BucketGetAndSetOperation(getName(), getLockName(), getCodec(), newValue, transactionId)); result.trySuccess(res); }); } @@ -344,7 +346,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } else { prevValue = state; } - operations.add(new BucketGetAndDeleteOperation(getName(), getLockName(), getCodec())); + operations.add(new BucketGetAndDeleteOperation(getName(), getLockName(), getCodec(), transactionId)); state = NULL; result.trySuccess((V) prevValue); return; @@ -357,7 +359,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { } state = NULL; - operations.add(new BucketGetAndDeleteOperation(getName(), getLockName(), getCodec())); + operations.add(new BucketGetAndDeleteOperation(getName(), getLockName(), getCodec(), transactionId)); result.trySuccess(res); }); } @@ -367,7 +369,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { @Override public RFuture setAsync(V newValue) { - return setAsync(newValue, new BucketSetOperation(getName(), getLockName(), getCodec(), newValue)); + return setAsync(newValue, new BucketSetOperation(getName(), getLockName(), getCodec(), newValue, transactionId)); } private RFuture setAsync(V newValue, TransactionalOperation operation) { @@ -390,17 +392,17 @@ public class RedissonTransactionalBucket extends RedissonBucket { @Override public RFuture setAsync(V value, long timeToLive, TimeUnit timeUnit) { - return setAsync(value, new BucketSetOperation(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit)); + return setAsync(value, new BucketSetOperation(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId)); } @Override public RFuture trySetAsync(V newValue) { - return trySet(newValue, new BucketTrySetOperation(getName(), getLockName(), getCodec(), newValue)); + return trySet(newValue, new BucketTrySetOperation(getName(), getLockName(), getCodec(), newValue, transactionId)); } @Override public RFuture trySetAsync(V value, long timeToLive, TimeUnit timeUnit) { - return trySet(value, new BucketTrySetOperation(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit)); + return trySet(value, new BucketTrySetOperation(getName(), getLockName(), getCodec(), value, timeToLive, timeUnit, transactionId)); } private RFuture trySet(V newValue, TransactionalOperation operation) { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java index 7705f783707b1f07e570aecf01d8639201e1af49..6ed866717210b8ed4830c9b53440c45b1a40e5e1 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java @@ -19,6 +19,7 @@ import org.redisson.RedissonKeys; import org.redisson.RedissonLock; import org.redisson.api.RKeys; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; /** * @@ -28,14 +29,16 @@ import org.redisson.command.CommandAsyncExecutor; public class DeleteOperation extends TransactionalOperation { private String lockName; + private String transactionId; public DeleteOperation(String name) { - this(name, null); + this(name, null, null); } - public DeleteOperation(String name, String lockName) { + public DeleteOperation(String name, String lockName, String transactionId) { super(name, null); this.lockName = lockName; + this.transactionId = transactionId; } @Override @@ -43,7 +46,7 @@ public class DeleteOperation extends TransactionalOperation { RKeys keys = new RedissonKeys(commandExecutor); keys.deleteAsync(getName()); if (lockName != null) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } } @@ -51,7 +54,7 @@ public class DeleteOperation extends TransactionalOperation { @Override public void rollback(CommandAsyncExecutor commandExecutor) { if (lockName != null) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java index 2e5bbb1e797331d61d8d9fe1d1367ed279dd5fff..77fa9e2885913e1059faacd046778481e5c97858 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketCompareAndSetOperation.java @@ -19,6 +19,7 @@ import org.redisson.RedissonBucket; import org.redisson.RedissonLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; import org.redisson.transaction.operation.TransactionalOperation; /** @@ -32,25 +33,27 @@ public class BucketCompareAndSetOperation extends TransactionalOperation { private V expected; private V value; private String lockName; + private String transactionId; - public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value) { + public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value, String transactionId) { super(name, codec); this.expected = expected; this.value = value; this.lockName = lockName; + this.transactionId = transactionId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { RedissonBucket bucket = new RedissonBucket(codec, commandExecutor, name); bucket.compareAndSetAsync(expected, value); - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java index e99e182171d6384474a27fcda9b304d239717c19..f378076c64c7c53a05a29e09d1979ae0dff0e823 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndDeleteOperation.java @@ -19,6 +19,7 @@ import org.redisson.RedissonBucket; import org.redisson.RedissonLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; import org.redisson.transaction.operation.TransactionalOperation; /** @@ -30,23 +31,25 @@ import org.redisson.transaction.operation.TransactionalOperation; public class BucketGetAndDeleteOperation extends TransactionalOperation { private String lockName; + private String transactionId; - public BucketGetAndDeleteOperation(String name, String lockName, Codec codec) { + public BucketGetAndDeleteOperation(String name, String lockName, Codec codec, String transactionId) { super(name, codec); this.lockName = lockName; + this.transactionId = transactionId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { RedissonBucket bucket = new RedissonBucket(codec, commandExecutor, name); bucket.getAndDeleteAsync(); - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java index 061d3d649d60449ecce516f5466d53aa5618585b..9e921ea7edf4f57bed2b91951a9605340ab11e93 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketGetAndSetOperation.java @@ -19,6 +19,7 @@ import org.redisson.RedissonBucket; import org.redisson.RedissonLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; import org.redisson.transaction.operation.TransactionalOperation; /** @@ -31,24 +32,26 @@ public class BucketGetAndSetOperation extends TransactionalOperation { private Object value; private String lockName; + private String transactionId; - public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value) { + public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { super(name, codec); this.value = value; this.lockName = lockName; + this.transactionId = transactionId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { RedissonBucket bucket = new RedissonBucket(codec, commandExecutor, name); bucket.getAndSetAsync((V) value); - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java index 1a50a79536985033609368cae0c0d0a1a3fe7824..1992945778f8d2090fb7b45cb394961f554a329c 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketSetOperation.java @@ -21,6 +21,7 @@ import org.redisson.RedissonBucket; import org.redisson.RedissonLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; import org.redisson.transaction.operation.TransactionalOperation; /** @@ -35,17 +36,19 @@ public class BucketSetOperation extends TransactionalOperation { private String lockName; private long timeToLive; private TimeUnit timeUnit; + private String transactionId; - public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit) { - this(name, lockName, codec, value); + public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) { + this(name, lockName, codec, value, transactionId); this.timeToLive = timeToLive; this.timeUnit = timeUnit; } - public BucketSetOperation(String name, String lockName, Codec codec, Object value) { + public BucketSetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { super(name, codec); this.value = value; this.lockName = lockName; + this.transactionId = transactionId; } @Override @@ -56,13 +59,13 @@ public class BucketSetOperation extends TransactionalOperation { } else { bucket.setAsync((V) value); } - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java index 07470cd29481ea375df686175cf93eb5079b3f4e..24fafcb5b303cfd5a73b05ec8ca23ea468bedf9c 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketTrySetOperation.java @@ -21,6 +21,7 @@ import org.redisson.RedissonBucket; import org.redisson.RedissonLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; import org.redisson.transaction.operation.TransactionalOperation; /** @@ -31,21 +32,23 @@ import org.redisson.transaction.operation.TransactionalOperation; */ public class BucketTrySetOperation extends TransactionalOperation { + private String transactionId; private Object value; private String lockName; private long timeToLive; private TimeUnit timeUnit; - public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit) { - this(name, lockName, codec, value); + public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit, String transactionId) { + this(name, lockName, codec, value, transactionId); this.timeToLive = timeToLive; this.timeUnit = timeUnit; } - public BucketTrySetOperation(String name, String lockName, Codec codec, Object value) { + public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, String transactionId) { super(name, codec); this.value = value; this.lockName = lockName; + this.transactionId = transactionId; } @Override @@ -56,13 +59,13 @@ public class BucketTrySetOperation extends TransactionalOperation { } else { bucket.trySetAsync((V) value); } - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RedissonLock lock = new RedissonLock(commandExecutor, lockName); + RedissonLock lock = new RedissonTransactionalLock(commandExecutor, lockName, transactionId); lock.unlockAsync(); }