diff --git a/sharding-orchestration/pom.xml b/sharding-orchestration/pom.xml index 72cb52051055054fe6a201e250dd882f04924bc6..7bed9e04474f3d61af7ab1ae319ab75cccc52b8f 100644 --- a/sharding-orchestration/pom.xml +++ b/sharding-orchestration/pom.xml @@ -32,5 +32,6 @@ sharding-orchestration-reg sharding-orchestration-zookeeper-curator-integration-test sharding-orchestration-config + sharding-orchestration-distributed-lock diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..b995bb89a7f7d0c648b57c3ddacea251bba9a1e8 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + sharding-orchestration + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + sharding-orchestration-distributed-lock + ${project.artifactId} + pom + + + sharding-orchestration-distributed-lock-api + sharding-orchestration-distributed-lock-zookeeper-curator + + diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..af6fc4d5509eafb6934201b46cb16ce686ce528c --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml @@ -0,0 +1,38 @@ + + + + 4.0.0 + + sharding-orchestration-distributed-lock + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + + sharding-orchestration-distributed-lock-api + ${project.artifactId} + + + + org.apache.shardingsphere + sharding-core-api + ${project.version} + + + diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java new file mode 100644 index 0000000000000000000000000000000000000000..6a393478f483204d7eec854b8df9d56d25df7cca --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.api; + +import org.apache.shardingsphere.spi.TypeBasedSPI; + +/** + * Distributed Lock center. + * + * @author wangguangyuan + */ +public interface DistributedLockCenter extends TypeBasedSPI { + + /** + * Initialize distributed lock center. + * + * @param config distributed lock center configuration + */ + void init(DistributedLockConfiguration config); + + /** + * Get data from distributed lock center. + * + *

Maybe use cache if existed.

+ * + * @param key key of data + * @return value of data + */ + String get(String key); + + /** + * Persist data. + * + * @param key key of data + * @param value value of data + */ + void persist(String key, String value); + + /** + * Close. + */ + void close(); + + /** + * Initialize the lock of the key. + * + * @param key key of data + */ + void initLock(String key); + + /** + * Try to get the lock of the key. + * + * @return get the lock or not + */ + boolean tryLock(); + + /** + * Try to release the lock of the key. + * + */ + void tryRelease(); +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..67d0ad401e7f7a0ff00b89c8872325da5552a90a --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.api; + +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.api.config.TypeBasedSPIConfiguration; + +import java.util.Properties; + +/** + * Distributed Lock configuration. + * + * @author wangguangyuan + */ +@Getter +@Setter +public final class DistributedLockConfiguration extends TypeBasedSPIConfiguration { + + /** + * Server list of distributed lock center. + */ + private String serverLists; + + /** + * Namespace of distributed lock center. + */ + private String namespace; + + /** + * Digest of distributed lock center. + */ + private String digest; + + /** + * Operation timeout time in milliseconds. + */ + private int operationTimeoutMilliseconds = 500; + + /** + * Max number of times to retry. + */ + private int maxRetries = 3; + + /** + * Time interval in milliseconds on each retry. + */ + private int retryIntervalMilliseconds = 500; + + /** + * Time to live in seconds of ephemeral keys. + */ + private int timeToLiveSeconds = 60; + + public DistributedLockConfiguration(final String type) { + super(type); + } + + public DistributedLockConfiguration(final String type, final Properties properties) { + super(type, properties); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java new file mode 100644 index 0000000000000000000000000000000000000000..43abf94ee9dac8e200defec01d52b7e1997e2796 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.exception; + +/** + * Distributed Lock exception. + * + * @author wangguangyuan + */ +public final class DistributedLockException extends RuntimeException { + + private static final long serialVersionUID = -6417179023552012152L; + + public DistributedLockException(final String errorMessage, final Object... args) { + super(String.format(errorMessage, args)); + } + + public DistributedLockException(final Exception cause) { + super(cause); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..17402f8b5ef3d9666e3073e724cb9bc19c7bfec6 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml @@ -0,0 +1,53 @@ + + + + 4.0.0 + + sharding-orchestration-distributed-lock + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + sharding-orchestration-distributed-lock-zookeeper-curator + ${project.artifactId} + + + + org.apache.shardingsphere + sharding-orchestration-distributed-lock-api + ${project.version} + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-test + + + diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java new file mode 100644 index 0000000000000000000000000000000000000000..04bf9fbaa1906a05bdf41482a2a73b5e9f1e07c7 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator; + +import com.google.common.base.Charsets; +import com.google.common.base.Strings; +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter; +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockConfiguration; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.OperationTimeoutException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * Distributed lock center for zookeeper with curator. + * + * @author wangguangyuan + */ +public final class CuratorZookeeperDistributedLockCenter implements DistributedLockCenter { + + private final Map caches = new HashMap<>(); + + private CuratorFramework client; + + private InterProcessMutex leafLock; + + @Getter + @Setter + private Properties properties = new Properties(); + + @Override + public void init(final DistributedLockConfiguration config) { + client = buildCuratorClient(config); + initCuratorClient(config); + } + + private CuratorFramework buildCuratorClient(final DistributedLockConfiguration config) { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(config.getServerLists()) + .retryPolicy(new ExponentialBackoffRetry(config.getRetryIntervalMilliseconds(), config.getMaxRetries(), config.getRetryIntervalMilliseconds() * config.getMaxRetries())) + .namespace(config.getNamespace()); + if (0 != config.getTimeToLiveSeconds()) { + builder.sessionTimeoutMs(config.getTimeToLiveSeconds() * 1000); + } + if (0 != config.getOperationTimeoutMilliseconds()) { + builder.connectionTimeoutMs(config.getOperationTimeoutMilliseconds()); + } + if (!Strings.isNullOrEmpty(config.getDigest())) { + builder.authorization("digest", config.getDigest().getBytes(Charsets.UTF_8)) + .aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + return builder.build(); + } + + private void initCuratorClient(final DistributedLockConfiguration config) { + client.start(); + try { + if (!client.blockUntilConnected(config.getRetryIntervalMilliseconds() * config.getMaxRetries(), TimeUnit.MILLISECONDS)) { + client.close(); + throw new OperationTimeoutException(); + } + } catch (final InterruptedException | OperationTimeoutException ex) { + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + @Override + public String get(final String key) { + TreeCache cache = findTreeCache(key); + if (null == cache) { + return getDirectly(key); + } + ChildData resultInCache = cache.getCurrentData(key); + if (null != resultInCache) { + return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8); + } + return getDirectly(key); + } + + private TreeCache findTreeCache(final String key) { + for (Entry entry : caches.entrySet()) { + if (key.startsWith(entry.getKey())) { + return entry.getValue(); + } + } + return null; + } + + @Override + public void persist(final String key, final String value) { + try { + if (!isExisted(key)) { + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8)); + } else { + update(key, value); + } + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + private void update(final String key, final String value) { + try { + client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + private String getDirectly(final String key) { + try { + return new String(client.getData().forPath(key), Charsets.UTF_8); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return null; + } + } + + private boolean isExisted(final String key) { + try { + return null != client.checkExists().forPath(key); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return false; + } + } + + @Override + public void close() { + for (Entry each : caches.entrySet()) { + each.getValue().close(); + } + waitForCacheClose(); + CloseableUtils.closeQuietly(client); + } + + /* TODO wait 500ms, close cache before close client, or will throw exception + * Because of asynchronous processing, may cause client to close + * first and cache has not yet closed the end. + * Wait for new version of Curator to fix this. + * BUG address:https://issues.apache.org/jira/browse/CURATOR-157 + */ + private void waitForCacheClose() { + try { + Thread.sleep(500L); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public String getType() { + return "zookeeper"; + } + + @Override + public void initLock(final String key) { + leafLock = new InterProcessMutex(client, key); + } + + @Override + @SneakyThrows + public boolean tryLock() { + return leafLock.acquire(5, TimeUnit.SECONDS); + } + + @Override + @SneakyThrows + public void tryRelease() { + leafLock.release(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..64d173d6388b3177ff24efabe3eb6640c3f06d29 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.orchestration.distributed.lock.exception.DistributedLockException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; + +/** + * Curator zookeeper exception handler. + * + * @author wangguangyuan + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public final class CuratorZookeeperExceptionHandler { + + /** + * Handle exception. + * + *

Ignore interrupt and connection invalid exception.

+ * + * @param cause to be handled exception + */ + public static void handleException(final Exception cause) { + if (null == cause) { + return; + } + if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) { + log.debug("Ignored exception for: {}", cause.getMessage()); + } else if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } else { + throw new DistributedLockException(cause); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException; + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter new file mode 100644 index 0000000000000000000000000000000000000000..2ac4e65473e88fc2d650c8b45f78558166172a08 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.CuratorZookeeperDistributedLockCenter diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java new file mode 100644 index 0000000000000000000000000000000000000000..749ed4caacfbdc213566aa48cacdbf03d12c1692 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.test; + +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter; +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockConfiguration; +import org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.CuratorZookeeperDistributedLockCenter; +import org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.util.EmbedTestingServer; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public final class CuratorZookeeperDistributedLockCenterTest { + + private static DistributedLockCenter curatorZookeeperRegistryCenter = new CuratorZookeeperDistributedLockCenter(); + + @BeforeClass + public static void init() { + EmbedTestingServer.start(); + DistributedLockConfiguration configuration = new DistributedLockConfiguration(curatorZookeeperRegistryCenter.getType(), new Properties()); + configuration.setServerLists("127.0.0.1:3181"); + curatorZookeeperRegistryCenter.init(configuration); + } + + @Test + public void assertPersist() { + curatorZookeeperRegistryCenter.persist("/test", "value1"); + assertThat(curatorZookeeperRegistryCenter.get("/test"), is("value1")); + } + + @Test + public void assertUpdate() { + curatorZookeeperRegistryCenter.persist("/test", "value2"); + assertThat(curatorZookeeperRegistryCenter.get("/test"), is("value2")); + } + + @Test + public void assertPersistEphemeral() { + curatorZookeeperRegistryCenter.persist("/test/ephemeral", "value3"); + assertThat(curatorZookeeperRegistryCenter.get("/test/ephemeral"), is("value3")); + } + + @Test + public void assertLock() { + curatorZookeeperRegistryCenter.initLock("/test/lock1"); + assertThat(curatorZookeeperRegistryCenter.tryLock(), is(true)); + } + + @Test + public void assertRelease() { + curatorZookeeperRegistryCenter.initLock("/test/lock2"); + curatorZookeeperRegistryCenter.tryLock(); + curatorZookeeperRegistryCenter.tryRelease(); + } + + @Test(expected = IllegalMonitorStateException.class) + public void assertReleaseWithoutLock() { + curatorZookeeperRegistryCenter.initLock("/test/lock3"); + curatorZookeeperRegistryCenter.tryRelease(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java new file mode 100644 index 0000000000000000000000000000000000000000..50cb185988e2657a5c8d87b15416338a7ff79b56 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.util; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.KeeperException; + +import java.io.File; +import java.io.IOException; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class EmbedTestingServer { + + private static final int PORT = 3181; + + private static volatile TestingServer testingServer; + + /** + * Start embed zookeeper server. + */ + public static void start() { + if (null != testingServer) { + return; + } + try { + testingServer = new TestingServer(PORT, new File(String.format("target/test_zk_data/%s/", System.nanoTime()))); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + if (!isIgnoredException(ex)) { + throw new RuntimeException(ex); + } + } finally { + Runtime.getRuntime().addShutdownHook(new Thread() { + + @Override + public void run() { + try { + testingServer.close(); + } catch (final IOException ignored) { + } + } + }); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof KeeperException.ConnectionLossException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException; + } +}