diff --git a/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/EtcdRegistryCenter.java b/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/EtcdRegistryCenter.java index d17b5f07bb0f03937f73f9908dc3be5ad1123d31..bfa197c89f1af99c14b70a0866eb4f3f4fe244a0 100644 --- a/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/EtcdRegistryCenter.java +++ b/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/EtcdRegistryCenter.java @@ -75,7 +75,6 @@ public final class EtcdRegistryCenter implements RegistryCenter { leaseStub = LeaseGrpc.newFutureStub(channel); watchStub = WatchGrpc.newStub(channel); keepAlive = new KeepAlive(channel, etcdConfig.getTimeToLiveSeconds()); - keepAlive.start(); } @Override diff --git a/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/internal/keepalive/KeepAlive.java b/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/internal/keepalive/KeepAlive.java index 140957ae8098b3f8859c96d5b4decb2fbc841006..8073b55a6dddb4b06ef384e6f20666bc6ea08fbf 100644 --- a/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/internal/keepalive/KeepAlive.java +++ b/sharding-jdbc-orchestration/src/main/java/io/shardingjdbc/orchestration/reg/etcd/internal/keepalive/KeepAlive.java @@ -17,8 +17,6 @@ package io.shardingjdbc.orchestration.reg.etcd.internal.keepalive; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import etcdserverpb.LeaseGrpc; import etcdserverpb.LeaseGrpc.LeaseStub; import etcdserverpb.Rpc.LeaseKeepAliveRequest; @@ -26,15 +24,14 @@ import etcdserverpb.Rpc.LeaseKeepAliveResponse; import io.grpc.Channel; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import lombok.RequiredArgsConstructor; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * Keep the lease alive. @@ -44,41 +41,27 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public final class KeepAlive { - private static final long INITIAL_DELAY = 100; - private final LeaseStub leaseStub; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - private final ConcurrentMap keepAliveTasks = Maps.newConcurrentMap(); - - private final long span; + private final long heartbeatIntervalMilliseconds; - private ScheduledFuture scheduledFuture; + private final ConcurrentMap keepAliveTasks; - private AtomicBoolean closed = new AtomicBoolean(true); + private final ScheduledFuture scheduledFuture; public KeepAlive(final Channel channel, final long timeToLiveSeconds) { - this.span = timeToLiveSeconds * 1000 / 3; - this.leaseStub = LeaseGrpc.newStub(channel); - } - - /** - * Start keep alive. - */ - public synchronized void start() { - if (closed.get()) { - scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - for (KeepAliveTask keepAliveTask : keepAliveTasks.values()) { - keepAliveTask.run(); - } + leaseStub = LeaseGrpc.newStub(channel); + heartbeatIntervalMilliseconds = timeToLiveSeconds * 1000 / 3; + keepAliveTasks = new ConcurrentHashMap<>(); + scheduledFuture = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2).scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + for (KeepAliveTask keepAliveTask : keepAliveTasks.values()) { + keepAliveTask.run(); } - }, INITIAL_DELAY, span / 3000, TimeUnit.MILLISECONDS); - closed.compareAndSet(true, false); - } + } + }, 100L, heartbeatIntervalMilliseconds, TimeUnit.MILLISECONDS); } /** @@ -87,36 +70,32 @@ public final class KeepAlive { * @param leaseId lease ID */ public void heartbeat(final long leaseId) { - log.debug("Heartbeat lease {}", leaseId); - Preconditions.checkState(!closed.get(), "Keep alive is closed"); - final StreamObserver requestObserver = leaseStub.leaseKeepAlive(createResponseObserver(leaseId)); - final long tickNow = System.currentTimeMillis(); - keepAliveTasks.putIfAbsent(leaseId, new KeepAliveTask(leaseId, tickNow, requestObserver)); + keepAliveTasks.putIfAbsent(leaseId, new KeepAliveTask(leaseId, leaseStub.leaseKeepAlive(createResponseObserver(leaseId)), System.currentTimeMillis())); } - + private StreamObserver createResponseObserver(final long leaseId) { return new StreamObserver() { + @Override public void onNext(final LeaseKeepAliveResponse response) { - long id = response.getID(); - long ttl = response.getTTL() * 1000; - - long tickTime = System.currentTimeMillis() + ttl / 3; - log.debug("Reschedule heartbeat time for lease {} to {}", id, tickTime); - final KeepAliveTask keepAliveTask = keepAliveTasks.get(id); - if (keepAliveTask != null) { - keepAliveTasks.put(id, keepAliveTask.newTick(tickTime)); + long leaseId = response.getID(); + long ttlMilliseconds = response.getTTL() * 1000L; + long nextHeartbeatTimestamp = System.currentTimeMillis() + ttlMilliseconds / 3L; + log.debug("Reschedule heartbeat time for lease {} to {}", leaseId, nextHeartbeatTimestamp); + KeepAliveTask keepAliveTask = keepAliveTasks.get(leaseId); + if (null != keepAliveTask) { + keepAliveTask.setNextHeartbeatTimestamp(nextHeartbeatTimestamp); } } @Override public void onCompleted() { - log.info("Keep alive finished"); + log.debug("Keep alive finished"); } @Override - public void onError(final Throwable t) { - log.warn("Keep alive failed, due to {}, renew it", Status.fromThrowable(t)); + public void onError(final Throwable cause) { + log.warn("Keep alive failed, due to {}, renew it", Status.fromThrowable(cause)); heartbeat(leaseId); } }; @@ -126,42 +105,43 @@ public final class KeepAlive { * Close keep alive. */ public synchronized void close() { - if (!closed.get()) { - for (KeepAliveTask keepAliveTask: keepAliveTasks.values()) { - keepAliveTask.cancel(); - } - keepAliveTasks.clear(); - scheduledFuture.cancel(false); - closed.compareAndSet(false, true); + for (KeepAliveTask keepAliveTask: keepAliveTasks.values()) { + keepAliveTask.cancel(); } + keepAliveTasks.clear(); + scheduledFuture.cancel(false); } - @RequiredArgsConstructor + @AllArgsConstructor private class KeepAliveTask implements Runnable { - private final long id; - - private final long tick; + private final long leaseId; private final StreamObserver observer; - private boolean fired; + private long nextHeartbeatTimestamp; - public void cancel() { - observer.onCompleted(); + /** + * Set next heartbeat timestamp. + * + * @param nextHeartbeatTimestamp Next heartbeat timestamp. + */ + public void setNextHeartbeatTimestamp(final long nextHeartbeatTimestamp) { + this.nextHeartbeatTimestamp = nextHeartbeatTimestamp; } - public KeepAliveTask newTick(final long tick) { - return new KeepAliveTask(id, tick, observer); + /** + * Cancel task. + */ + public void cancel() { + observer.onCompleted(); } @Override public void run() { - if (!fired && tick <= System.currentTimeMillis()) { - log.debug("heart beat lease {} at time {}", id, tick); - LeaseKeepAliveRequest request = LeaseKeepAliveRequest.newBuilder().setID(id).build(); - observer.onNext(request); - fired = true; + if (nextHeartbeatTimestamp <= System.currentTimeMillis()) { + log.debug("Heartbeat lease {} at time {}", leaseId, nextHeartbeatTimestamp); + observer.onNext(LeaseKeepAliveRequest.newBuilder().setID(leaseId).build()); } } }