/*
* Copyright 2016-2018 shardingsphere.io.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.shardingsphere.jdbc.orchestration.reg.etcd.internal.keepalive;
import etcdserverpb.LeaseGrpc;
import etcdserverpb.LeaseGrpc.LeaseStub;
import etcdserverpb.Rpc.LeaseKeepAliveRequest;
import etcdserverpb.Rpc.LeaseKeepAliveResponse;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Keep the lease alive.
*
* @author junxiong
*/
@Slf4j
public final class KeepAlive implements AutoCloseable {
private static final long DELAY_MILLISECONDS = 100L;
private final LeaseStub leaseStub;
private final long heartbeatIntervalMilliseconds;
private final ConcurrentMap keepAliveTasks;
private final ScheduledFuture scheduledFuture;
private final ScheduledExecutorService scheduledService;
public KeepAlive(final Channel channel, final long timeToLiveSeconds) {
leaseStub = LeaseGrpc.newStub(channel);
heartbeatIntervalMilliseconds = timeToLiveSeconds * 1000L / 3L;
keepAliveTasks = new ConcurrentHashMap<>();
scheduledService = Executors.newScheduledThreadPool(1);
scheduledFuture = scheduledService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (KeepAliveTask keepAliveTask : keepAliveTasks.values()) {
keepAliveTask.heartbeat();
}
}
}, DELAY_MILLISECONDS, heartbeatIntervalMilliseconds, TimeUnit.MILLISECONDS);
}
/**
* keep lease alive.
*
* @param leaseId lease ID
*/
public void heartbeat(final long leaseId) {
keepAliveTasks.putIfAbsent(leaseId, new KeepAliveTask(leaseId, leaseStub.leaseKeepAlive(createResponseObserver(leaseId)), System.currentTimeMillis()));
}
private StreamObserver createResponseObserver(final long leaseId) {
return new StreamObserver() {
@Override
public void onNext(final LeaseKeepAliveResponse response) {
long leaseId = response.getID();
long nextHeartbeatTimestamp = System.currentTimeMillis() + response.getTTL() * 1000L / 3L;
log.debug("Reschedule heartbeat time for lease {} to {}", leaseId, nextHeartbeatTimestamp);
KeepAliveTask keepAliveTask = keepAliveTasks.get(leaseId);
if (null != keepAliveTask) {
keepAliveTask.setNextHeartbeatTimestamp(nextHeartbeatTimestamp);
}
}
@Override
public void onCompleted() {
log.debug("Keep alive finished");
}
@Override
public void onError(final Throwable cause) {
log.warn("Keep alive failed, due to {}, renew it", Status.fromThrowable(cause));
heartbeat(leaseId);
}
};
}
@Override
public void close() {
for (KeepAliveTask keepAliveTask: keepAliveTasks.values()) {
keepAliveTask.close();
}
keepAliveTasks.clear();
scheduledService.shutdown();
scheduledFuture.cancel(false);
}
@AllArgsConstructor
private class KeepAliveTask implements AutoCloseable {
private final long leaseId;
private final StreamObserver observer;
@Setter
private long nextHeartbeatTimestamp;
/**
* keep heartbeat.
*/
public void heartbeat() {
if (nextHeartbeatTimestamp <= System.currentTimeMillis()) {
log.debug("Heartbeat lease {} at time {}", leaseId, nextHeartbeatTimestamp);
observer.onNext(LeaseKeepAliveRequest.newBuilder().setID(leaseId).build());
}
}
@Override
public void close() {
observer.onCompleted();
}
}
}