提交 c1e948a3 编写于 作者: T tristaZero

Merge branch 'dev' of ssh://github.com/shardingjdbc/sharding-jdbc into dev

# Conflicts:
#	pom.xml
#	sharding-orchestration/sharding-orchestration-reg/pom.xml
#	sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/pom.xml
......@@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere</artifactId>
<version>4.0.0-RC2-SNAPSHOT</version>
<version>4.0.0-RC1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
......@@ -67,10 +67,6 @@
<atomikos.version>4.0.6</atomikos.version>
<curator.version>2.10.0</curator.version>
<grpc.version>1.7.0</grpc.version>
<protobuf.version>3.4.0</protobuf.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
<opentracing.version>0.30.0</opentracing.version>
<lombok.version>1.16.4</lombok.version>
......@@ -101,6 +97,7 @@
<maven-jar-plugin.version>2.6</maven-jar-plugin.version>
<maven-surefire-plugin.version>2.18.1</maven-surefire-plugin.version>
<maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
<maven-release-plugin.version>2.5.3</maven-release-plugin.version>
<maven-site-plugin.version>3.4</maven-site-plugin.version>
<maven-enforcer-plugin.version>1.4</maven-enforcer-plugin.version>
<maven-project-info-reports-plugin.version>2.8</maven-project-info-reports-plugin.version>
......@@ -118,7 +115,6 @@
<jdepend-maven-plugin.version>2.0</jdepend-maven-plugin.version>
<taglist-maven-plugin.version>2.4</taglist-maven-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<docker-maven-plugin.version>0.4.14</docker-maven-plugin.version>
<apache-rat-plugin.version>0.12</apache-rat-plugin.version>
......@@ -221,26 +217,6 @@
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>${guava-retrying.version}</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
......@@ -540,6 +516,14 @@
<skip>${maven.deploy.skip}</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>${maven-release-plugin.version}</version>
<configuration>
<tagNameFormat>@{project.version}</tagNameFormat>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
......@@ -676,24 +660,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.eluder.coveralls</groupId>
<artifactId>coveralls-maven-plugin</artifactId>
......@@ -724,11 +690,7 @@
</ignoreMethodAnnotations>
<excludes>
<exclude>org/apache/shardingsphere/core/parse/antlr/autogen/*.class</exclude>
<exclude>authpb/*.class</exclude>
<exclude>mvccpb/*.class</exclude>
<exclude>etcdserverpb/*.class</exclude>
<exclude>org/apache/shardingsphere/orchestration/reg/zookeeper/*.class</exclude>
<exclude>org/apache/shardingsphere/orchestration/reg/etcd/*.class</exclude>
<exclude>org/apache/shardingsphere/**/*Test.class</exclude>
<exclude>org/apache/shardingsphere/**/Test*.class</exclude>
</excludes>
......@@ -915,27 +877,10 @@
<mailingLists>
<mailingList>
<name>Zhangliang</name>
<post>zhangliang@apache.org</post>
</mailingList>
<mailingList>
<name>Caohao</name>
<post>caohaoch@gmail.com</post>
<name>ShardingSphere Developer List</name>
<post>dev@shardingsphere.incubator.apache.org</post>
<subscribe>dev-subscribe@shardingsphere.incubator.apache.org</subscribe>
<unsubscribe>dev-unsubscribe@shardingsphere.incubator.apache.org</unsubscribe>
</mailingList>
</mailingLists>
<developers>
<developer>
<id>zhangliang</id>
<name>ZhangLiang</name>
<email>zhangliang@apache.org</email>
<timezone>8</timezone>
</developer>
<developer>
<id>caohao</id>
<name>CaoHao</name>
<email>caohaoch@gmail.com</email>
<timezone>8</timezone>
</developer>
</developers>
</project>
......@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-orchestration</artifactId>
<version>4.0.0-RC2-SNAPSHOT</version>
<version>4.0.0-RC1-SNAPSHOT</version>
</parent>
<artifactId>sharding-orchestration-reg</artifactId>
<name>${project.artifactId}</name>
......@@ -30,6 +30,5 @@
<modules>
<module>sharding-orchestration-reg-api</module>
<module>sharding-orchestration-reg-zookeeper-curator</module>
<module>sharding-orchestration-reg-etcd</module>
</modules>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-orchestration-reg</artifactId>
<version>4.0.0-RC2-SNAPSHOT</version>
</parent>
<artifactId>sharding-orchestration-reg-etcd</artifactId>
<name>${project.artifactId}</name>
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-orchestration-reg-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.orchestration.reg.etcd;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.protobuf.ByteString;
import etcdserverpb.KVGrpc;
import etcdserverpb.KVGrpc.KVFutureStub;
import etcdserverpb.LeaseGrpc;
import etcdserverpb.LeaseGrpc.LeaseFutureStub;
import etcdserverpb.Rpc.LeaseGrantRequest;
import etcdserverpb.Rpc.PutRequest;
import etcdserverpb.Rpc.RangeRequest;
import etcdserverpb.Rpc.RangeResponse;
import etcdserverpb.Rpc.WatchCreateRequest;
import etcdserverpb.Rpc.WatchRequest;
import etcdserverpb.WatchGrpc;
import etcdserverpb.WatchGrpc.WatchStub;
import io.grpc.Channel;
import mvccpb.Kv.KeyValue;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.orchestration.reg.etcd.internal.channel.EtcdChannelFactory;
import org.apache.shardingsphere.orchestration.reg.etcd.internal.keepalive.KeepAlive;
import org.apache.shardingsphere.orchestration.reg.etcd.internal.retry.EtcdRetryEngine;
import org.apache.shardingsphere.orchestration.reg.etcd.internal.watcher.EtcdWatchStreamObserver;
import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Etcd based registry center.
*
* @author junxiong
*/
public final class EtcdRegistryCenter implements RegistryCenter {
private RegistryCenterConfiguration config;
private EtcdRetryEngine etcdRetryEngine;
private KVFutureStub kvStub;
private LeaseFutureStub leaseStub;
private WatchStub watchStub;
private KeepAlive keepAlive;
@Override
public void init(final RegistryCenterConfiguration config) {
this.config = config;
etcdRetryEngine = new EtcdRetryEngine(config);
Channel channel = EtcdChannelFactory.getInstance(Splitter.on(',').trimResults().splitToList(config.getServerLists()));
kvStub = KVGrpc.newFutureStub(channel);
leaseStub = LeaseGrpc.newFutureStub(channel);
watchStub = WatchGrpc.newStub(channel);
keepAlive = new KeepAlive(channel, config.getTimeToLiveSeconds());
}
@Override
public String get(final String key) {
final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).build();
return etcdRetryEngine.execute(new Callable<String>() {
@Override
public String call() throws InterruptedException, ExecutionException, TimeoutException {
RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
return response.getKvsCount() > 0 ? response.getKvs(0).getValue().toStringUtf8() : null;
}
}).orNull();
}
@Override
public String getDirectly(final String key) {
return get(key);
}
@Override
public boolean isExisted(final String key) {
return null != get(key);
}
@Override
public List<String> getChildrenKeys(final String key) {
String path = key + "/";
final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(path)).setRangeEnd(getRangeEnd(path)).build();
Optional<List<String>> result = etcdRetryEngine.execute(new Callable<List<String>>() {
@Override
public List<String> call() throws InterruptedException, ExecutionException, TimeoutException {
RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
List<String> result = new ArrayList<>();
for (KeyValue each : response.getKvsList()) {
String childFullPath = each.getKey().toStringUtf8();
result.add(childFullPath.substring(childFullPath.lastIndexOf("/") + 1));
}
return result;
}
});
return result.isPresent() ? result.get() : Collections.<String>emptyList();
}
@Override
public void persist(final String key, final String value) {
final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build();
etcdRetryEngine.execute(new Callable<Void>() {
@Override
public Void call() throws InterruptedException, ExecutionException, TimeoutException {
kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
return null;
}
});
}
@Override
public void update(final String key, final String value) {
persist(key, value);
}
@Override
public void persistEphemeral(final String key, final String value) {
final Optional<Long> leaseId = lease();
if (!leaseId.isPresent()) {
throw new RegistryCenterException("Unable to set up heat beat for key %s", key);
}
final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setLease(leaseId.get()).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build();
etcdRetryEngine.execute(new Callable<Void>() {
@Override
public Void call() throws InterruptedException, ExecutionException, TimeoutException {
kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
return null;
}
});
}
private Optional<Long> lease() {
final LeaseGrantRequest request = LeaseGrantRequest.newBuilder().setTTL(config.getTimeToLiveSeconds()).build();
return etcdRetryEngine.execute(new Callable<Long>() {
@Override
public Long call() throws InterruptedException, ExecutionException, TimeoutException {
long leaseId = leaseStub.leaseGrant(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS).getID();
keepAlive.heartbeat(leaseId);
return leaseId;
}
});
}
@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
WatchCreateRequest createWatchRequest = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).setRangeEnd(getRangeEnd(key)).build();
final WatchRequest request = WatchRequest.newBuilder().setCreateRequest(createWatchRequest).build();
etcdRetryEngine.execute(new Callable<Void>() {
@Override
public Void call() {
watchStub.watch(new EtcdWatchStreamObserver(dataChangedEventListener)).onNext(request);
return null;
}
});
}
@Override
public void close() {
keepAlive.close();
}
private ByteString getRangeEnd(final String key) {
byte[] noPrefix = {0};
byte[] endKey = key.getBytes().clone();
for (int i = endKey.length - 1; i >= 0; i--) {
if (endKey[i] < 0xff) {
endKey[i] = (byte) (endKey[i] + 1);
return ByteString.copyFrom(Arrays.copyOf(endKey, i + 1));
}
}
return ByteString.copyFrom(noPrefix);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.orchestration.reg.etcd.internal.channel;
import io.grpc.Channel;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.util.RoundRobinLoadBalancerFactory;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* Etcd channel factory.
*
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EtcdChannelFactory {
private static final String TARGET = "etcd";
private static ConcurrentHashMap<List<String>, Channel> etcdChannels = new ConcurrentHashMap<>();
/**
* Get etcd channel instance.
*
* @param endpoints etcd endpoints
* @return etcd channel
*/
public static Channel getInstance(final List<String> endpoints) {
if (etcdChannels.containsKey(endpoints)) {
return etcdChannels.get(endpoints);
}
Channel channel = NettyChannelBuilder.forTarget(TARGET)
.usePlaintext(true)
.nameResolverFactory(new EtcdNameSolverFactory(TARGET, endpoints))
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.build();
Channel result = etcdChannels.putIfAbsent(endpoints, channel);
return null == result ? channel : result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.orchestration.reg.etcd.internal.channel;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
/**
* Etcd name solver factory.
*
* @author junxiong
*/
@RequiredArgsConstructor
@Slf4j
public final class EtcdNameSolverFactory extends NameResolver.Factory {
private static final Pattern SCHEMAS = Pattern.compile("^(http|https)");
private final String scheme;
private final List<String> endpoints;
private ExecutorService executor;
private boolean shutdown;
@Nullable
@Override
public NameResolver newNameResolver(final URI targetUri, final Attributes params) {
if (!scheme.equals(targetUri.getPath())) {
return null;
}
return new NameResolver() {
@Override
public String getServiceAuthority() {
return scheme;
}
@Override
public void start(final Listener listener) {
if (shutdown) {
return;
}
List<EquivalentAddressGroup> equivalentAddressGroups = Lists.newArrayList();
for (String each : endpoints) {
try {
URI uri = new URI(each.trim());
if (!Strings.isNullOrEmpty(uri.getAuthority()) && SCHEMAS.matcher(uri.getScheme()).matches()) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
equivalentAddressGroups.add(new EquivalentAddressGroup(inetSocketAddress));
}
} catch (final URISyntaxException ex) {
listener.onError(Status.INVALID_ARGUMENT);
log.warn("Ignored illegal endpoint, %s", ex.getMessage());
}
}
listener.onAddresses(equivalentAddressGroups, Attributes.EMPTY);
}
@Override
public void shutdown() {
if (!shutdown) {
shutdown = true;
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
}
}
};
}
@Override
public String getDefaultScheme() {
return scheme;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.orchestration.reg.etcd.internal.keepalive;
import etcdserverpb.LeaseGrpc;
import etcdserverpb.LeaseGrpc.LeaseStub;
import etcdserverpb.Rpc.LeaseKeepAliveRequest;
import etcdserverpb.Rpc.LeaseKeepAliveResponse;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Keep the lease alive.
*
* @author junxiong
*/
@Slf4j
public final class KeepAlive implements AutoCloseable {
private static final long DELAY_MILLISECONDS = 100L;
private final LeaseStub leaseStub;
private final long heartbeatIntervalMilliseconds;
private final ConcurrentMap<Long, KeepAliveTask> keepAliveTasks;
private final ScheduledFuture scheduledFuture;
private final ScheduledExecutorService scheduledService;
public KeepAlive(final Channel channel, final long timeToLiveSeconds) {
leaseStub = LeaseGrpc.newStub(channel);
heartbeatIntervalMilliseconds = timeToLiveSeconds * 1000L / 3L;
keepAliveTasks = new ConcurrentHashMap<>();
scheduledService = Executors.newScheduledThreadPool(1);
scheduledFuture = scheduledService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (KeepAliveTask keepAliveTask : keepAliveTasks.values()) {
keepAliveTask.heartbeat();
}
}
}, DELAY_MILLISECONDS, heartbeatIntervalMilliseconds, TimeUnit.MILLISECONDS);
}
/**
* keep lease alive.
*
* @param leaseId lease ID
*/
public void heartbeat(final long leaseId) {
keepAliveTasks.putIfAbsent(leaseId, new KeepAliveTask(leaseId, leaseStub.leaseKeepAlive(createResponseObserver(leaseId)), System.currentTimeMillis()));
}
private StreamObserver<LeaseKeepAliveResponse> createResponseObserver(final long leaseId) {
return new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(final LeaseKeepAliveResponse response) {
long leaseId = response.getID();
long nextHeartbeatTimestamp = System.currentTimeMillis() + response.getTTL() * 1000L / 3L;
log.debug("Reschedule heartbeat time for lease {} to {}", leaseId, nextHeartbeatTimestamp);
KeepAliveTask keepAliveTask = keepAliveTasks.get(leaseId);
if (null != keepAliveTask) {
keepAliveTask.setNextHeartbeatTimestamp(nextHeartbeatTimestamp);
}
}
@Override
public void onCompleted() {
log.debug("Keep alive finished");
}
@Override
public void onError(final Throwable cause) {
log.warn("Keep alive failed, due to {}, renew it", Status.fromThrowable(cause));
heartbeat(leaseId);
}
};
}
@Override
public void close() {
for (KeepAliveTask keepAliveTask: keepAliveTasks.values()) {
keepAliveTask.close();
}
keepAliveTasks.clear();
scheduledService.shutdown();
scheduledFuture.cancel(false);
}
@AllArgsConstructor
private class KeepAliveTask implements AutoCloseable {
private final long leaseId;
private final StreamObserver<LeaseKeepAliveRequest> observer;
@Setter
private long nextHeartbeatTimestamp;
/**
* keep heartbeat.
*/
public void heartbeat() {
if (nextHeartbeatTimestamp <= System.currentTimeMillis()) {
log.debug("Heartbeat lease {} at time {}", leaseId, nextHeartbeatTimestamp);
observer.onNext(LeaseKeepAliveRequest.newBuilder().setID(leaseId).build());
}
}
@Override
public void close() {
observer.onCompleted();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.orchestration.reg.etcd.internal.retry;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Etcd retry engine.
*
* @author junxiong
*/
@RequiredArgsConstructor
public final class EtcdRetryEngine {
private final RegistryCenterConfiguration config;
/**
* Retry to execute callable command.
*
* @param callable callable command
* @param <T> return type
* @return execute result
*/
public <T> Optional<T> execute(final Callable<T> callable) {
Retryer<T> retryer = RetryerBuilder.<T>newBuilder()
.retryIfExceptionOfType(TimeoutException.class)
.retryIfExceptionOfType(ExecutionException.class)
.retryIfExceptionOfType(InterruptedException.class)
.withWaitStrategy(WaitStrategies.fixedWait(config.getRetryIntervalMilliseconds(), TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(config.getMaxRetries()))
.build();
try {
return Optional.fromNullable(retryer.call(callable));
} catch (final ExecutionException | RetryException ex) {
throw new RegistryCenterException(ex);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.orchestration.reg.etcd.internal.watcher;
import etcdserverpb.Rpc;
import etcdserverpb.Rpc.WatchResponse;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import mvccpb.Kv.Event;
import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent.ChangedType;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;
/**
* Watch stream observer.
*
* @author junxiong
*/
@RequiredArgsConstructor
public final class EtcdWatchStreamObserver implements StreamObserver<WatchResponse> {
private final DataChangedEventListener dataChangedEventListener;
@Override
public void onNext(final Rpc.WatchResponse response) {
if (response.getCanceled() || response.getCreated()) {
return;
}
for (Event event : response.getEventsList()) {
ChangedType changedType = getChangedType(event);
if (ChangedType.IGNORED != changedType) {
dataChangedEventListener.onChange(new DataChangedEvent(event.getKv().getKey().toStringUtf8(), event.getKv().getValue().toStringUtf8(), changedType));
}
}
}
private ChangedType getChangedType(final Event event) {
switch (event.getType()) {
case PUT:
return DataChangedEvent.ChangedType.UPDATED;
case DELETE:
return DataChangedEvent.ChangedType.DELETED;
default:
return DataChangedEvent.ChangedType.IGNORED;
}
}
@Override
public void onError(final Throwable throwable) {
// TODO retry watch later
throw new RegistryCenterException(new Exception(throwable));
}
@Override
public void onCompleted() {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
package authpb;
// User is a single entry in the bucket authUsers
message User {
bytes name = 1;
bytes password = 2;
repeated string roles = 3;
}
// Permission is a single entity
message Permission {
enum Type {
READ = 0;
WRITE = 1;
READWRITE = 2;
}
Type permType = 1;
bytes key = 2;
bytes range_end = 3;
}
// Role is a single entry in the bucket authRoles
message Role {
bytes name = 1;
repeated Permission keyPermission = 2;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
package mvccpb;
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;
// create_revision is the revision of last creation on this key.
int64 create_revision = 2;
// mod_revision is the revision of last modification on this key.
int64 mod_revision = 3;
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
int64 version = 4;
// value is the value held by the key, in bytes.
bytes value = 5;
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
int64 lease = 6;
}
message Event {
enum EventType {
PUT = 0;
DELETE = 1;
}
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
EventType type = 1;
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.shardingsphere.orchestration.reg.etcd.EtcdRegistryCenter
......@@ -187,11 +187,6 @@
<artifactId>sharding-orchestration-reg-zookeeper-curator</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-orchestration-reg-etcd</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-opentracing</artifactId>
......
......@@ -31,6 +31,7 @@
<include>LICENSE</include>
<include>NOTICE</include>
<include>DISCLAIMER</include>
<include>README.md</include>
</includes>
</fileSet>
</fileSets>
......
......@@ -34,7 +34,6 @@
<excludes>
<!-- github ignore -->
<exclude>**/.github/**</exclude>
<exclude>**/*.md</exclude>
<exclude>.travis.yml</exclude>
<!-- maven ignore -->
......@@ -48,6 +47,7 @@
<!-- maven plugin ignore -->
<exclude>release.properties</exclude>
<exclude>**/pom.xml.releaseBackup</exclude>
<exclude>**/cobertura.ser</exclude>
<exclude>*.gpg</exclude>
......
......@@ -50,6 +50,7 @@
<include>LICENSE</include>
<include>NOTICE</include>
<include>DISCLAIMER</include>
<include>README.md</include>
</includes>
</fileSet>
</fileSets>
......
......@@ -16,7 +16,7 @@
#
FROM java:7
MAINTAINER caohao "caohaoch@gmail.com"
MAINTAINER ShardingSphere "dev@shardingsphere.incubator.apache.org"
ADD apache-shardingsphere-incubating-4.0.0-RC1-sharding-proxy.tar.gz /
RUN mv /apache-shardingsphere-incubating-4.0.0-RC1-sharding-proxy /opt/sharding-proxy
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册