提交 d9f89a4d 编写于 作者: kimmking's avatar kimmking

Merge branch 'etcd' into migrate-etcd

<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.opensharding</groupId>
<artifactId>sharding-orchestration-reg-spi-impl</artifactId>
<version>4.0.0-RC3-SNAPSHOT</version>
</parent>
<artifactId>sharding-orchestration-reg-etcd</artifactId>
<name>${project.artifactId}</name>
<properties>
<guava.version>20.0</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-orchestration-reg-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.3.0</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<testSource>1.8</testSource>
<testTarget>1.8</testTarget>
</configuration>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opensharding.orchestration.reg.etcd;
import com.google.common.base.Charsets;
import com.google.common.base.Splitter;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Observers;
import io.etcd.jetcd.Util;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.watch.WatchEvent;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* ETCD registry center.
*
* @author zhaojun
*/
public final class EtcdRegistryCenter implements RegistryCenter {
private Client client;
private RegistryCenterConfiguration config;
@Getter
@Setter
private Properties properties;
@Override
public void init(final RegistryCenterConfiguration config) {
this.config = config;
client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists()))).build();
}
@Override
@SneakyThrows
public String get(final String key) {
List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(key, Charsets.UTF_8)).get().getKvs();
return keyValues.isEmpty() ? null : keyValues.iterator().next().getValue().toString(Charsets.UTF_8);
}
@Override
public String getDirectly(final String key) {
return get(key);
}
@Override
public boolean isExisted(final String key) {
return null != get(key);
}
@Override
@SneakyThrows
public List<String> getChildrenKeys(final String key) {
String prefix = key + "/";
ByteSequence prefixByteSequence = ByteSequence.from(prefix, Charsets.UTF_8);
GetOption getOption = GetOption.newBuilder().withPrefix(prefixByteSequence).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
List<KeyValue> keyValues = client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
return keyValues.stream().map(e -> getSubNodeKeyName(prefix, e.getKey().toString(Charsets.UTF_8))).distinct().collect(Collectors.toList());
}
private String getSubNodeKeyName(final String prefix, final String fullPath) {
String pathWithoutPrefix = fullPath.substring(prefix.length());
return pathWithoutPrefix.contains("/") ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf("/")) : pathWithoutPrefix;
}
@Override
@SneakyThrows
public void persist(final String key, final String value) {
client.getKVClient().put(ByteSequence.from(key, Charsets.UTF_8), ByteSequence.from(value, Charsets.UTF_8)).get();
}
@Override
@SneakyThrows
public void update(final String key, final String value) {
persist(key, value);
}
@Override
@SneakyThrows
public void persistEphemeral(final String key, final String value) {
long leaseId = client.getLeaseClient().grant(config.getTimeToLiveSeconds()).get().getID();
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> { }));
client.getKVClient().put(ByteSequence.from(key, Charsets.UTF_8), ByteSequence.from(value, Charsets.UTF_8), PutOption.newBuilder().withLeaseId(leaseId).build()).get();
}
@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
Watch.Listener listener = Watch.listener(response -> {
for (WatchEvent each : response.getEvents()) {
DataChangedEvent.ChangedType changedType = getEventChangedType(each);
if (DataChangedEvent.ChangedType.IGNORED != changedType) {
dataChangedEventListener.onChange(new DataChangedEvent(each.getKeyValue().getKey().toString(Charsets.UTF_8), each.getKeyValue().getValue().toString(Charsets.UTF_8), changedType));
}
}
});
client.getWatchClient().watch(ByteSequence.from(key, Charsets.UTF_8), listener);
}
private DataChangedEvent.ChangedType getEventChangedType(final WatchEvent event) {
switch (event.getEventType()) {
case PUT:
return DataChangedEvent.ChangedType.UPDATED;
case DELETE:
return DataChangedEvent.ChangedType.DELETED;
default:
return DataChangedEvent.ChangedType.IGNORED;
}
}
@Override
public void close() {
client.close();
}
@Override
public void initLock(String key) {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public void tryRelease() {
}
@Override
public String getType() {
return "etcd";
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
io.opensharding.orchestration.reg.etcd.EtcdRegistryCenter
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opensharding.orchestration.reg.etcd;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class EtcdRegistryCenterTest {
@Mock
private Client client;
@Mock
private KV kv;
@Mock
private Watch watch;
@Mock
private Lease lease;
@Mock
private CompletableFuture getFuture;
@Mock
private CompletableFuture leaseFuture;
@Mock
private LeaseGrantResponse leaseGrantResponse;
@Mock
private GetResponse getResponse;
@Mock
private PutResponse putResponse;
@Mock
private CompletableFuture putFuture;
private EtcdRegistryCenter etcdRegistryCenter = new EtcdRegistryCenter();
@Before
public void setUp() {
setClient();
setConfiguration();
}
@SneakyThrows
private void setClient() {
mockClient();
Field field = etcdRegistryCenter.getClass().getDeclaredField("client");
field.setAccessible(true);
field.set(etcdRegistryCenter, client);
}
@SneakyThrows
@SuppressWarnings("unchecked")
private Client mockClient() {
when(client.getKVClient()).thenReturn(kv);
when(kv.get(any(ByteSequence.class))).thenReturn(getFuture);
when(kv.get(any(ByteSequence.class), any(GetOption.class))).thenReturn(getFuture);
when(kv.put(any(ByteSequence.class), any(ByteSequence.class), any(PutOption.class))).thenReturn(putFuture);
when(getFuture.get()).thenReturn(getResponse);
when(client.getLeaseClient()).thenReturn(lease);
when(lease.grant(anyLong())).thenReturn(leaseFuture);
when(leaseFuture.get()).thenReturn(leaseGrantResponse);
when(leaseGrantResponse.getID()).thenReturn(123L);
when(client.getWatchClient()).thenReturn(watch);
return client;
}
@SneakyThrows
private void setConfiguration() {
Field field = etcdRegistryCenter.getClass().getDeclaredField("config");
field.setAccessible(true);
field.set(etcdRegistryCenter, new RegistryCenterConfiguration("etcd"));
}
@Test
public void assertGetKey() {
etcdRegistryCenter.get("key");
verify(kv).get(ByteSequence.from("key", Charsets.UTF_8));
verify(getResponse).getKvs();
}
@Test
public void assertGetChildrenKeys() {
io.etcd.jetcd.api.KeyValue keyValue1 = io.etcd.jetcd.api.KeyValue.newBuilder()
.setKey(ByteString.copyFromUtf8("/key/key1/key1-1"))
.setValue(ByteString.copyFromUtf8("value1")).build();
io.etcd.jetcd.api.KeyValue keyValue2 = io.etcd.jetcd.api.KeyValue.newBuilder()
.setKey(ByteString.copyFromUtf8("/key/key2"))
.setValue(ByteString.copyFromUtf8("value3")).build();
List<KeyValue> keyValues = Lists.newArrayList(new KeyValue(keyValue1), new KeyValue(keyValue2), new KeyValue(keyValue1));
when(getResponse.getKvs()).thenReturn(keyValues);
List<String> actual = etcdRegistryCenter.getChildrenKeys("/key");
assertThat(actual.size(), is(2));
Iterator<String> iterator = actual.iterator();
assertThat(iterator.next(), is("key1"));
assertThat(iterator.next(), is("key2"));
}
@Test
@SuppressWarnings("unchecked")
public void assertPersistEphemeral() {
etcdRegistryCenter.persistEphemeral("key1", "value1");
verify(lease).grant(anyLong());
verify(lease).keepAlive(anyLong(), any(StreamObserver.class));
verify(kv).put(any(ByteSequence.class), any(ByteSequence.class), any(PutOption.class));
}
@Test
public void assertWatch() {
etcdRegistryCenter.watch("key1", dataChangedEvent -> {
});
verify(watch).watch(any(ByteSequence.class), any(Watch.Listener.class));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册