提交 5207d920 编写于 作者: N Nikita Koksharov

Feature - RRingBuffer object added. #249

上级 75153ca7
......@@ -64,6 +64,7 @@ import org.redisson.api.RQueue;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RRemoteService;
import org.redisson.api.RRingBuffer;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScript;
......@@ -546,6 +547,16 @@ public class Redisson implements RedissonClient {
return new RedissonQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RRingBuffer<V> getRingBuffer(String name) {
return new RedissonRingBuffer<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RRingBuffer<V> getRingBuffer(String name, Codec codec) {
return new RedissonRingBuffer<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(connectionManager.getCommandExecutor(), name, this);
......
......@@ -51,6 +51,7 @@ import org.redisson.api.RQueueReactive;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RRemoteService;
import org.redisson.api.RRingBufferReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
......@@ -356,6 +357,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonQueue<V>(codec, commandExecutor, name, null),
new RedissonListReactive<V>(codec, commandExecutor, name), RQueueReactive.class);
}
@Override
public <V> RRingBufferReactive<V> getRingBuffer(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonRingBuffer<V>(commandExecutor, name, null), RRingBufferReactive.class);
}
@Override
public <V> RRingBufferReactive<V> getRingBuffer(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonRingBuffer<V>(codec, commandExecutor, name, null), RRingBufferReactive.class);
}
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name) {
......
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.redisson.api.RFuture;
import org.redisson.api.RRingBuffer;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonRingBuffer<V> extends RedissonQueue<V> implements RRingBuffer<V> {
private static final RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor(0));
private final String settingsName;
public RedissonRingBuffer(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
settingsName = prefixName("redisson_rb", getName());
}
public RedissonRingBuffer(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
settingsName = prefixName("redisson_rb", getName());
}
@Override
public RFuture<Boolean> trySetCapacityAsync(int capacity) {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.SETNX, settingsName, capacity);
}
@Override
public boolean trySetCapacity(int capacity) {
return get(trySetCapacityAsync(capacity));
}
@Override
public RFuture<Boolean> addAsync(V e) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local limit = redis.call('get', KEYS[2]); "
+ "assert(limit ~= false, 'RingBuffer capacity is not defined'); "
+ "local size = redis.call('rpush', KEYS[1], ARGV[1]); "
+ "if size > tonumber(limit) then "
+ "redis.call('lpop', KEYS[1]); "
+ "end; "
+ "return 1; ",
Arrays.asList(getName(), settingsName), encode(e));
}
@Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
}
List<Object> args = new ArrayList<>(c.size());
encode(args, c);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local limit = redis.call('get', KEYS[2]); "
+ "assert(limit ~= false, 'RingBuffer capacity is not defined'); "
+ "local size = 0; "
+ "for i=1, #ARGV,5000 do "
+ "size = redis.call('rpush', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); "
+ "end; "
+ "local extraSize = size - tonumber(limit); "
+ "if extraSize > 0 then "
+ "redis.call('ltrim', KEYS[1], extraSize, -1); "
+ "end; "
+ "return 1; ",
Arrays.asList(getName(), settingsName), args.toArray());
}
@Override
public int remainingCapacity() {
return get(remainingCapacityAsync());
}
@Override
public RFuture<Integer> remainingCapacityAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local limit = redis.call('get', KEYS[2]); "
+ "assert(limit ~= false, 'RingBuffer capacity is not defined'); "
+ "local size = redis.call('llen', KEYS[1]); "
+ "return math.max(tonumber(limit) - size, 0); ",
Arrays.asList(getName(), settingsName));
}
@Override
public RFuture<Integer> capacityAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, GET_INTEGER, settingsName);
}
@Override
public int capacity() {
return get(capacityAsync());
}
}
......@@ -47,6 +47,7 @@ import org.redisson.api.RQueueRx;
import org.redisson.api.RRateLimiterRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RRemoteService;
import org.redisson.api.RRingBufferRx;
import org.redisson.api.RScoredSortedSetRx;
import org.redisson.api.RScriptRx;
import org.redisson.api.RSemaphoreRx;
......@@ -332,6 +333,16 @@ public class RedissonRx implements RedissonRxClient {
new RedissonListRx<V>(new RedissonList<V>(codec, commandExecutor, name, null)), RQueueRx.class);
}
@Override
public <V> RRingBufferRx<V> getRingBuffer(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonRingBuffer<V>(commandExecutor, name, null), RRingBufferRx.class);
}
@Override
public <V> RRingBufferRx<V> getRingBuffer(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonRingBuffer<V>(codec, commandExecutor, name, null), RRingBufferRx.class);
}
@Override
public <V> RBlockingQueueRx<V> getBlockingQueue(String name) {
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(commandExecutor, name, null);
......
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* This RingBuffer based queue evicts elements from the head if queue capacity became full.
* <p>
* The head element removed if new element added and queue is full.
* <p>
* Must be initialized with capacity size {@link #trySetCapacity(int)} before usage.
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public interface RRingBuffer<V> extends RQueue<V>, RRingBufferAsync<V> {
/**
* Sets queue capacity only if it is not set before.
*
* @param capacity - queue capacity
* @return <code>true</code> if capacity set successfully
* <code>false</code> if capacity already set
*/
boolean trySetCapacity(int capacity);
/**
* Returns remaining capacity of this queue
*
* @return remaining capacity
*/
int remainingCapacity();
/**
* Returns capacity of this queue
*
* @return queue capacity
*/
int capacity();
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* RingBuffer based queue evicts elements from the head if queue capacity became full.
* <p>
* The head element removed if new element added and queue is full.
* <p>
* Must be initialized with capacity size {@link #trySetCapacityAsync(int)} before usage.
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public interface RRingBufferAsync<V> extends RQueueAsync<V> {
/**
* Sets queue capacity only if it is not set before.
*
* @param capacity - queue capacity
* @return <code>true</code> if capacity set successfully
* <code>false</code> if capacity already set
*/
RFuture<Boolean> trySetCapacityAsync(int capacity);
/**
* Returns remaining capacity of this queue
*
* @return remaining capacity
*/
RFuture<Integer> remainingCapacityAsync();
/**
* Returns capacity of this queue
*
* @return queue capacity
*/
RFuture<Integer> capacityAsync();
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import reactor.core.publisher.Mono;
/**
* RingBuffer based queue evicts elements from the head if queue capacity became full.
* <p>
* The head element removed if new element added and queue is full.
* <p>
* Must be initialized with capacity size {@link #trySetCapacity(int)} before usage.
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public interface RRingBufferReactive<V> extends RQueueReactive<V> {
/**
* Sets queue capacity only if it is not set before.
*
* @param capacity - queue capacity
* @return <code>true</code> if capacity set successfully
* <code>false</code> if capacity already set
*/
Mono<Boolean> trySetCapacity(int capacity);
/**
* Returns remaining capacity of this queue
*
* @return remaining capacity
*/
Mono<Integer> remainingCapacity();
/**
* Returns capacity of this queue
*
* @return queue capacity
*/
Mono<Integer> capacity();
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.Single;
/**
* RingBuffer based queue evicts elements from the head if queue capacity became full.
* <p>
* The head element removed if new element added and queue is full.
* <p>
* Must be initialized with capacity size {@link #trySetCapacity(int)} before usage.
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public interface RRingBufferRx<V> extends RQueueRx<V> {
/**
* Sets queue capacity only if it is not set before.
*
* @param capacity - queue capacity
* @return <code>true</code> if capacity set successfully
* <code>false</code> if capacity already set
*/
Single<Boolean> trySetCapacity(int capacity);
/**
* Returns remaining capacity of this queue
*
* @return remaining capacity
*/
Single<Integer> remainingCapacity();
/**
* Returns capacity of this queue
*
* @return queue capacity
*/
Single<Integer> capacity();
}
......@@ -635,6 +635,25 @@ public interface RedissonClient {
* @return Queue object
*/
<V> RQueue<V> getQueue(String name, Codec codec);
/**
* Returns RingBuffer based queue.
*
* @param <V> value type
* @param name - name of object
* @return RingBuffer object
*/
<V> RRingBuffer<V> getRingBuffer(String name);
/**
* Returns RingBuffer based queue.
*
* @param <V> value type
* @param name - name of object
* @param codec - codec for values
* @return RingBuffer object
*/
<V> RRingBuffer<V> getRingBuffer(String name, Codec codec);
/**
* Returns priority unbounded queue instance by name.
......
......@@ -505,6 +505,25 @@ public interface RedissonReactiveClient {
*/
<V> RQueueReactive<V> getQueue(String name, Codec codec);
/**
* Returns RingBuffer based queue.
*
* @param <V> value type
* @param name - name of object
* @return RingBuffer object
*/
<V> RRingBufferReactive<V> getRingBuffer(String name);
/**
* Returns RingBuffer based queue.
*
* @param <V> value type
* @param name - name of object
* @param codec - codec for values
* @return RingBuffer object
*/
<V> RRingBufferReactive<V> getRingBuffer(String name, Codec codec);
/**
* Returns blocking queue instance by name.
*
......
......@@ -493,6 +493,25 @@ public interface RedissonRxClient {
*/
<V> RQueueRx<V> getQueue(String name, Codec codec);
/**
* Returns RingBuffer based queue.
*
* @param <V> value type
* @param name - name of object
* @return RingBuffer object
*/
<V> RRingBufferRx<V> getRingBuffer(String name);
/**
* Returns RingBuffer based queue.
*
* @param <V> value type
* @param name - name of object
* @param codec - codec for values
* @return RingBuffer object
*/
<V> RRingBufferRx<V> getRingBuffer(String name, Codec codec);
/**
* Returns blocking queue instance by name.
*
......
package org.redisson;
import org.junit.Test;
import static org.assertj.core.api.Assertions.*;
import java.util.Arrays;
import java.util.List;
import org.redisson.api.RRingBuffer;
public class RedissonRingBufferTest extends BaseTest {
@Test
public void testAdd() {
RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");
assertThat(buffer.remainingCapacity()).isZero();
assertThat(buffer.capacity()).isZero();
buffer.trySetCapacity(10);
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(10);
for (int i = 0; i < 10; i++) {
buffer.add(i);
}
assertThat(buffer).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(0);
for (int i = 0; i < 5; i++) {
buffer.add(i*10);
}
assertThat(buffer).containsExactly(5, 6, 7, 8, 9, 0, 10, 20, 30, 40);
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(0);
buffer.poll();
buffer.poll();
buffer.poll();
assertThat(buffer).containsExactly(8, 9, 0, 10, 20, 30, 40);
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(3);
}
@Test
public void testAddAll() {
RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");
assertThat(buffer.remainingCapacity()).isZero();
assertThat(buffer.capacity()).isZero();
buffer.trySetCapacity(10);
assertThat(buffer.capacity()).isEqualTo(10);
List<Integer> s = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
buffer.addAll(s);
assertThat(buffer).containsExactly((Integer[]) s.toArray());
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(0);
List<Integer> newlist = Arrays.asList(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 91, 92, 93);
buffer.addAll(newlist);
assertThat(buffer).containsExactly(30, 40, 50, 60, 70, 80, 90, 91, 92, 93);
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(0);
buffer.poll();
buffer.poll();
buffer.poll();
assertThat(buffer).containsExactly(60, 70, 80, 90, 91, 92, 93);
assertThat(buffer.capacity()).isEqualTo(10);
assertThat(buffer.remainingCapacity()).isEqualTo(3);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册