/* * Copyright 2016-2018 shardingsphere.io. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *

*/ package io.shardingsphere.jdbc.orchestration.reg.zookeeper; import com.google.common.base.Charsets; import com.google.common.base.Strings; import io.shardingsphere.jdbc.orchestration.reg.api.RegistryCenter; import io.shardingsphere.jdbc.orchestration.reg.exception.RegExceptionHandler; import io.shardingsphere.jdbc.orchestration.reg.listener.DataChangedEvent; import io.shardingsphere.jdbc.orchestration.reg.listener.EventListener; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.OperationTimeoutException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; /** * Zookeeper based registry center. * * @author zhangliang */ public final class ZookeeperRegistryCenter implements RegistryCenter { private final CuratorFramework client; private final Map caches = new HashMap<>(); public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) { client = buildCuratorClient(zkConfig); initCuratorClient(zkConfig); } private CuratorFramework buildCuratorClient(final ZookeeperConfiguration zkConfig) { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getServerLists()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())) .namespace(zkConfig.getNamespace()); if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); } if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); } if (!Strings.isNullOrEmpty(zkConfig.getDigest())) { builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8)) .aclProvider(new ACLProvider() { @Override public List getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } return builder.build(); } private void initCuratorClient(final ZookeeperConfiguration zkConfig) { client.start(); try { if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) { client.close(); throw new OperationTimeoutException(); } } catch (final InterruptedException | OperationTimeoutException ex) { RegExceptionHandler.handleException(ex); } } @Override public String get(final String key) { TreeCache cache = findTreeCache(key); if (null == cache) { return getDirectly(key); } ChildData resultInCache = cache.getCurrentData(key); if (null != resultInCache) { return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8); } return getDirectly(key); } private TreeCache findTreeCache(final String key) { for (Entry entry : caches.entrySet()) { if (key.startsWith(entry.getKey())) { return entry.getValue(); } } return null; } @Override public String getDirectly(final String key) { try { return new String(client.getData().forPath(key), Charsets.UTF_8); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); return null; } } @Override public boolean isExisted(final String key) { try { return null != client.checkExists().forPath(key); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); return false; } } @Override public List getChildrenKeys(final String key) { try { List result = client.getChildren().forPath(key); Collections.sort(result, new Comparator() { @Override public int compare(final String o1, final String o2) { return o2.compareTo(o1); } }); return result; // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); return Collections.emptyList(); } } @Override public void persist(final String key, final String value) { try { if (!isExisted(key)) { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8)); } else { update(key, value); } // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } } @Override public void update(final String key, final String value) { try { client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } } @Override public void persistEphemeral(final String key, final String value) { try { if (isExisted(key)) { client.delete().deletingChildrenIfNeeded().forPath(key); } client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8)); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } } @Override public void watch(final String key, final EventListener eventListener) { final String path = key + "/"; if (!caches.containsKey(path)) { addCacheData(key); } TreeCache cache = caches.get(path); cache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (null == data || null == data.getPath()) { return; } eventListener.onChange(new DataChangedEvent(getEventType(event), data.getPath(), null == data.getData() ? null : new String(data.getData(), "UTF-8"))); } private DataChangedEvent.Type getEventType(final TreeCacheEvent event) { switch (event.getType()) { case NODE_UPDATED: return DataChangedEvent.Type.UPDATED; case NODE_REMOVED: return DataChangedEvent.Type.DELETED; default: return DataChangedEvent.Type.IGNORED; } } }); } private void addCacheData(final String cachePath) { TreeCache cache = new TreeCache(client, cachePath); try { cache.start(); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } caches.put(cachePath + "/", cache); } @Override public void close() { for (Entry each : caches.entrySet()) { each.getValue().close(); } waitForCacheClose(); CloseableUtils.closeQuietly(client); } /* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常 * 因为异步处理, 可能会导致client先关闭而cache还未关闭结束. * 等待Curator新版本解决这个bug. * BUG地址:https://issues.apache.org/jira/browse/CURATOR-157 */ private void waitForCacheClose() { try { Thread.sleep(500L); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } } }