提交 3f51e5f2 编写于 作者: N Nikita Koksharov

refactoring

上级 28f5d195
......@@ -176,10 +176,6 @@ public class ClientConnectionsEntry {
runnable.run();
}
public void removeConnection(Runnable runnable) {
freeConnectionsCounter.remove(runnable);
}
public void releaseConnection() {
freeConnectionsCounter.release();
}
......
......@@ -228,29 +228,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
protected final RFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
RPromise<T> result = new RedissonPromise<T>();
AcquireCallback<T> callback = new AcquireCallback<T>() {
boolean executed;
@Override
public void run() {
executed = true;
connectTo(entry, result, command);
}
@Override
public void accept(T t, Throwable u) {
if (executed) {
return;
}
entry.removeConnection(this);
}
};
result.onComplete(callback);
acquireConnection(entry, callback, command);
return result;
}
Runnable callback = () -> {
connectTo(entry, result, command);
};
acquireConnection(entry, callback, command);
return result;
}
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE && entry.isFailed()) {
......
......@@ -28,7 +28,6 @@ public class AsyncSemaphore {
private final AtomicInteger counter;
private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();
private final Set<Runnable> removedListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
public AsyncSemaphore(int permits) {
counter = new AtomicInteger(permits);
......@@ -40,25 +39,19 @@ public class AsyncSemaphore {
acquire(runnable);
try {
boolean r = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
if (!r) {
remove(runnable);
}
return r;
return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
remove(runnable);
Thread.currentThread().interrupt();
return false;
}
}
public int queueSize() {
return listeners.size() - removedListeners.size();
return listeners.size();
}
public void removeListeners() {
listeners.clear();
removedListeners.clear();
}
public void acquire(Runnable listener) {
......@@ -79,21 +72,12 @@ public class AsyncSemaphore {
return;
}
if (removedListeners.remove(listener)) {
counter.incrementAndGet();
tryRun();
} else {
listener.run();
}
listener.run();
} else {
counter.incrementAndGet();
}
}
public void remove(Runnable listener) {
removedListeners.add(listener);
}
public int getCounter() {
return counter.get();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册