提交 ea87f635 编写于 作者: T terrymanu

merge KeepAlive.start() => constructor

上级 ebe86e04
......@@ -75,7 +75,6 @@ public final class EtcdRegistryCenter implements RegistryCenter {
leaseStub = LeaseGrpc.newFutureStub(channel);
watchStub = WatchGrpc.newStub(channel);
keepAlive = new KeepAlive(channel, etcdConfig.getTimeToLiveSeconds());
keepAlive.start();
}
@Override
......
......@@ -17,8 +17,6 @@
package io.shardingjdbc.orchestration.reg.etcd.internal.keepalive;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import etcdserverpb.LeaseGrpc;
import etcdserverpb.LeaseGrpc.LeaseStub;
import etcdserverpb.Rpc.LeaseKeepAliveRequest;
......@@ -26,15 +24,14 @@ import etcdserverpb.Rpc.LeaseKeepAliveResponse;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Keep the lease alive.
......@@ -44,41 +41,27 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public final class KeepAlive {
private static final long INITIAL_DELAY = 100;
private final LeaseStub leaseStub;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ConcurrentMap<Long, KeepAliveTask> keepAliveTasks = Maps.newConcurrentMap();
private final long span;
private final long heartbeatIntervalMilliseconds;
private ScheduledFuture scheduledFuture;
private final ConcurrentMap<Long, KeepAliveTask> keepAliveTasks;
private AtomicBoolean closed = new AtomicBoolean(true);
private final ScheduledFuture scheduledFuture;
public KeepAlive(final Channel channel, final long timeToLiveSeconds) {
this.span = timeToLiveSeconds * 1000 / 3;
this.leaseStub = LeaseGrpc.newStub(channel);
}
/**
* Start keep alive.
*/
public synchronized void start() {
if (closed.get()) {
scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (KeepAliveTask keepAliveTask : keepAliveTasks.values()) {
keepAliveTask.run();
}
leaseStub = LeaseGrpc.newStub(channel);
heartbeatIntervalMilliseconds = timeToLiveSeconds * 1000 / 3;
keepAliveTasks = new ConcurrentHashMap<>();
scheduledFuture = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (KeepAliveTask keepAliveTask : keepAliveTasks.values()) {
keepAliveTask.run();
}
}, INITIAL_DELAY, span / 3000, TimeUnit.MILLISECONDS);
closed.compareAndSet(true, false);
}
}
}, 100L, heartbeatIntervalMilliseconds, TimeUnit.MILLISECONDS);
}
/**
......@@ -87,36 +70,32 @@ public final class KeepAlive {
* @param leaseId lease ID
*/
public void heartbeat(final long leaseId) {
log.debug("Heartbeat lease {}", leaseId);
Preconditions.checkState(!closed.get(), "Keep alive is closed");
final StreamObserver<LeaseKeepAliveRequest> requestObserver = leaseStub.leaseKeepAlive(createResponseObserver(leaseId));
final long tickNow = System.currentTimeMillis();
keepAliveTasks.putIfAbsent(leaseId, new KeepAliveTask(leaseId, tickNow, requestObserver));
keepAliveTasks.putIfAbsent(leaseId, new KeepAliveTask(leaseId, leaseStub.leaseKeepAlive(createResponseObserver(leaseId)), System.currentTimeMillis()));
}
private StreamObserver<LeaseKeepAliveResponse> createResponseObserver(final long leaseId) {
return new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(final LeaseKeepAliveResponse response) {
long id = response.getID();
long ttl = response.getTTL() * 1000;
long tickTime = System.currentTimeMillis() + ttl / 3;
log.debug("Reschedule heartbeat time for lease {} to {}", id, tickTime);
final KeepAliveTask keepAliveTask = keepAliveTasks.get(id);
if (keepAliveTask != null) {
keepAliveTasks.put(id, keepAliveTask.newTick(tickTime));
long leaseId = response.getID();
long ttlMilliseconds = response.getTTL() * 1000L;
long nextHeartbeatTimestamp = System.currentTimeMillis() + ttlMilliseconds / 3L;
log.debug("Reschedule heartbeat time for lease {} to {}", leaseId, nextHeartbeatTimestamp);
KeepAliveTask keepAliveTask = keepAliveTasks.get(leaseId);
if (null != keepAliveTask) {
keepAliveTask.setNextHeartbeatTimestamp(nextHeartbeatTimestamp);
}
}
@Override
public void onCompleted() {
log.info("Keep alive finished");
log.debug("Keep alive finished");
}
@Override
public void onError(final Throwable t) {
log.warn("Keep alive failed, due to {}, renew it", Status.fromThrowable(t));
public void onError(final Throwable cause) {
log.warn("Keep alive failed, due to {}, renew it", Status.fromThrowable(cause));
heartbeat(leaseId);
}
};
......@@ -126,42 +105,43 @@ public final class KeepAlive {
* Close keep alive.
*/
public synchronized void close() {
if (!closed.get()) {
for (KeepAliveTask keepAliveTask: keepAliveTasks.values()) {
keepAliveTask.cancel();
}
keepAliveTasks.clear();
scheduledFuture.cancel(false);
closed.compareAndSet(false, true);
for (KeepAliveTask keepAliveTask: keepAliveTasks.values()) {
keepAliveTask.cancel();
}
keepAliveTasks.clear();
scheduledFuture.cancel(false);
}
@RequiredArgsConstructor
@AllArgsConstructor
private class KeepAliveTask implements Runnable {
private final long id;
private final long tick;
private final long leaseId;
private final StreamObserver<LeaseKeepAliveRequest> observer;
private boolean fired;
private long nextHeartbeatTimestamp;
public void cancel() {
observer.onCompleted();
/**
* Set next heartbeat timestamp.
*
* @param nextHeartbeatTimestamp Next heartbeat timestamp.
*/
public void setNextHeartbeatTimestamp(final long nextHeartbeatTimestamp) {
this.nextHeartbeatTimestamp = nextHeartbeatTimestamp;
}
public KeepAliveTask newTick(final long tick) {
return new KeepAliveTask(id, tick, observer);
/**
* Cancel task.
*/
public void cancel() {
observer.onCompleted();
}
@Override
public void run() {
if (!fired && tick <= System.currentTimeMillis()) {
log.debug("heart beat lease {} at time {}", id, tick);
LeaseKeepAliveRequest request = LeaseKeepAliveRequest.newBuilder().setID(id).build();
observer.onNext(request);
fired = true;
if (nextHeartbeatTimestamp <= System.currentTimeMillis()) {
log.debug("Heartbeat lease {} at time {}", leaseId, nextHeartbeatTimestamp);
observer.onNext(LeaseKeepAliveRequest.newBuilder().setID(leaseId).build());
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册