提交 a727225e 编写于 作者: J junxiong

#408 refactor change listener

上级 093afed0
......@@ -17,6 +17,7 @@
package io.shardingjdbc.orchestration.internal.config;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
......@@ -30,6 +31,9 @@ import io.shardingjdbc.orchestration.internal.json.ShardingRuleConfigurationConv
import io.shardingjdbc.orchestration.internal.state.StateNodeStatus;
import io.shardingjdbc.orchestration.internal.state.datasource.DataSourceStateNode;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeEvent;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeListener;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeType;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
......@@ -106,16 +110,12 @@ public final class ConfigurationService {
private void addShardingConfigurationNodeChangeListener(final String node, final ShardingDataSource shardingDataSource) {
String cachePath = configNode.getFullPath(node);
regCenter.addCacheData(cachePath);
TreeCache cache = (TreeCache) regCenter.getRawCache(cachePath);
cache.getListenable().addListener(new TreeCacheListener() {
regCenter.addRegistryChangeListener(cachePath, new RegistryChangeListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || childData.getPath().isEmpty() || null == childData.getData() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
public void onRegistryChange(RegistryChangeEvent registryChangeEvent) throws Exception {
if (RegistryChangeType.UPDATED == registryChangeEvent.getType() && registryChangeEvent.getPayload().isPresent()) {
shardingDataSource.renew(loadShardingRuleConfiguration().build(loadDataSourceMap()), loadShardingProperties());
}
shardingDataSource.renew(loadShardingRuleConfiguration().build(loadDataSourceMap()), loadShardingProperties());
}
});
}
......@@ -148,16 +148,12 @@ public final class ConfigurationService {
private void addMasterSlaveConfigurationChangeListener(final String node, final MasterSlaveDataSource masterSlaveDataSource) {
String cachePath = configNode.getFullPath(node);
regCenter.addCacheData(cachePath);
TreeCache cache = (TreeCache) regCenter.getRawCache(cachePath);
cache.getListenable().addListener(new TreeCacheListener() {
regCenter.addRegistryChangeListener(cachePath, new RegistryChangeListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || childData.getPath().isEmpty() || null == childData.getData() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
public void onRegistryChange(RegistryChangeEvent registryChangeEvent) throws Exception {
if (RegistryChangeType.UPDATED == registryChangeEvent.getType() && registryChangeEvent.getPayload().isPresent()) {
masterSlaveDataSource.renew(getAvailableMasterSlaveRule());
}
masterSlaveDataSource.renew(getAvailableMasterSlaveRule());
}
});
}
......
......@@ -17,11 +17,15 @@
package io.shardingjdbc.orchestration.internal.state.datasource;
import com.google.common.base.Optional;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.rule.MasterSlaveRule;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeEvent;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeListener;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeType;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
......@@ -61,20 +65,15 @@ public final class DataSourceService {
}
private void addDataSourcesNodeListener(final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) regCenter.getRawCache(dataSourceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
regCenter.addRegistryChangeListener(dataSourceNodePath, new RegistryChangeListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty()) {
return;
}
if (TreeCacheEvent.Type.NODE_UPDATED == event.getType() || TreeCacheEvent.Type.NODE_REMOVED == event.getType()) {
public void onRegistryChange(RegistryChangeEvent registryChangeEvent) throws Exception {
Optional<RegistryChangeEvent.Payload> payload = registryChangeEvent.getPayload();
if (payload.isPresent()) {
MasterSlaveRule masterSlaveRule = configurationService.getAvailableMasterSlaveRule();
if (TreeCacheEvent.Type.NODE_UPDATED == event.getType()) {
String path = childData.getPath();
String dataSourceName = path.substring(path.lastIndexOf("/") + 1);
if (RegistryChangeType.UPDATED == registryChangeEvent.getType()) {
String datasourceKey = payload.get().getKey();
String dataSourceName = datasourceKey.substring(datasourceKey.lastIndexOf("/") + 1);
masterSlaveRule.getSlaveDataSourceMap().remove(dataSourceName);
}
masterSlaveDataSource.renew(masterSlaveRule);
......
......@@ -24,6 +24,9 @@ import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.state.StateNodeStatus;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeEvent;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeListener;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeType;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
......@@ -67,22 +70,19 @@ public final class InstanceStateService {
}
private void addShardingInstancesStateChangeListener(final String instanceNodePath, final ShardingDataSource shardingDataSource) {
TreeCache cache = (TreeCache) regCenter.getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
regCenter.addRegistryChangeListener(instanceNodePath, new RegistryChangeListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(childData.getPath()))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
public void onRegistryChange(RegistryChangeEvent registryChangeEvent) throws Exception {
if (RegistryChangeType.UPDATED == registryChangeEvent.getType() && registryChangeEvent.getPayload().isPresent()) {
String instanceStateKey = registryChangeEvent.getPayload().get().getKey();
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(instanceStateKey))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
shardingDataSource.renew(configurationService.loadShardingRuleConfiguration().build(dataSourceMap), configurationService.loadShardingProperties());
}
shardingDataSource.renew(configurationService.loadShardingRuleConfiguration().build(dataSourceMap), configurationService.loadShardingProperties());
}
});
}
......@@ -100,22 +100,19 @@ public final class InstanceStateService {
}
private void addMasterSlaveInstancesStateChangeListener(final String instanceNodePath, final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) regCenter.getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
regCenter.addRegistryChangeListener(instanceNodePath, new RegistryChangeListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(childData.getPath()))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
public void onRegistryChange(RegistryChangeEvent registryChangeEvent) throws Exception {
if (RegistryChangeType.UPDATED == registryChangeEvent.getType() && registryChangeEvent.getPayload().isPresent()) {
String instanceKey = registryChangeEvent.getPayload().get().getKey();
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(instanceKey))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
masterSlaveDataSource.renew(configurationService.loadMasterSlaveRuleConfiguration().build(dataSourceMap));
}
masterSlaveDataSource.renew(configurationService.loadMasterSlaveRuleConfiguration().build(dataSourceMap));
}
});
}
......
package io.shardingjdbc.orchestration.reg.base;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.experimental.Wither;
/**
* Config change event
*
* @author junxiong
*/
@Value
@Wither
@AllArgsConstructor(staticName = "with")
public class ConfigChangeEvent<C> {
C newConfig;
}
package io.shardingjdbc.orchestration.reg.base;
/**
* Sharding-jdbc configuration change listener
*
* @author junxiong
*/
public interface ConfigChangeListener {
/**
* Notify when config change event fired
*
* @param configChangeEvent config change event
*/
void onConfigChange(ConfigChangeEvent configChangeEvent);
}
package io.shardingjdbc.orchestration.reg.base;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
/**
* Configuration Center
*
* @author junxiong
*/
public interface ConfigServer extends AutoCloseable {
/**
* Open sharding-jdbc config server, setup connection to backend storage and event listeners.
*/
void open();
/**
* save or update master slave rule configuration
*
* caution: this may conflict with distribute configuration management center.
*
* MasterSlaveRuleConfiguration is an aggregation of configuration items for CQRS data source strategy.
*
* @param masterSlaveRuleConfiguration master slave rule configuration
*/
void persistMasterSlaveRuleConfiguration(MasterSlaveRuleConfiguration masterSlaveRuleConfiguration);
/**
* load master slave rule configuration
*
* @return master slave rule configuration
*/
MasterSlaveRuleConfiguration loadMasterSlaveRuleConfiguration();
/**
* load sharding rule configuration
*
* @return sharding rule configuration
*/
ShardingRuleConfiguration loadShardingRuleConfiguration();
/**
* save or update sharding rule configuration
*
* @param shardingRuleConfiguration sharding rule configuration
*/
void presistShardingRuleConfiguration(ShardingRuleConfiguration shardingRuleConfiguration);
/**
* Register config change listener
*
* @param configChangeListener configuration change listener
*/
void addConfigChangeListener(ConfigChangeListener configChangeListener);
}
......@@ -17,6 +17,9 @@
package io.shardingjdbc.orchestration.reg.base;
import lombok.AllArgsConstructor;
import lombok.Value;
import java.util.List;
/**
......@@ -56,12 +59,12 @@ public interface CoordinatorRegistryCenter extends RegistryCenter {
* @return sub-nodes name list
*/
List<String> getChildrenKeys(String path);
/**
* Get cache's raw object.
*
* @param cachePath cache path
* @return cache's raw object
* Add registry change listener to a specific path
* @param path key or directory
* @param registryChangeListener the listener object
*/
Object getRawCache(String cachePath);
void addRegistryChangeListener(String path, RegistryChangeListener registryChangeListener);
}
package io.shardingjdbc.orchestration.reg.base;
import com.google.common.base.Optional;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Value;
import lombok.experimental.Wither;
import java.util.List;
/**
* Config item change listener
*
* @author junxiong
*/
@Value
@Wither
@AllArgsConstructor(staticName = "with")
public class ConfigItemChangeEvent<I> {
List<I> newItems;
@AllArgsConstructor
public class RegistryChangeEvent {
private RegistryChangeType type;
private Optional<Payload> payload;
@Value
@Wither
@AllArgsConstructor
public static class Payload {
String key;
String value;
}
}
package io.shardingjdbc.orchestration.reg.base;
/**
* Sharding instance state
* Registry change Listener
*
* @author junxiong
*/
public enum ShardState {
/**
* sharding jdbc is enabled
*/
ENABLED,
/**
* sharding jdbc is disabled
*/
DISABLED
public interface RegistryChangeListener {
void onRegistryChange(RegistryChangeEvent registryChangeEvent) throws Exception;
}
package io.shardingjdbc.orchestration.reg.base;
/**
* Registry change type
* @author junxiong
*
*/
public enum RegistryChangeType {
UPDATED, DELETED
}
package io.shardingjdbc.orchestration.reg.base;
/**
* sharding-jdbc registry server
*
* @author junxiong
*/
public interface RegistryServer extends AutoCloseable {
/**
* Open sharding-jdbc registry server, setup event listeners.
*/
void open();
/**
* register sharding-jdbc instance
*
* @param shardInstance sharding-jdbc instance
*/
void registerShard(ShardInstance shardInstance);
/**
* set sharding-jdbc instance to a new state
*
* @param id instance id
* @param shardState shard state
*/
void setShardState(String id, ShardState shardState);
/**
* retrieve sharding-jdbc instance state
*
* @param id instance id
* @return shard state
*/
ShardState getShardState(String id);
/**
* unregister sharding-jdbc instance
*
* @param id instance id
*/
void unregisterShard(String id);
}
package io.shardingjdbc.orchestration.reg.base;
import io.shardingjdbc.orchestration.internal.util.IpUtils;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.experimental.Wither;
import java.lang.management.ManagementFactory;
/**
* Shard JDBC Instance
*
* @author junxiong
*/
@Value
@Wither
@AllArgsConstructor
public class ShardInstance {
/**
* Delimiter of instance id
*/
private static final String DELIMITER = "@-@";
/**
* sharding-jdbc instance identity
*/
String id;
/**
* sharding jdbc host
*/
String host;
/**
* instance state, either ENABLED or DISABLED
*/
ShardState state;
/**
* local sharding jdbc instance
*
* @return Sharding jdbc instance
*/
public static ShardInstance localInstance() {
final String javaVmName = ManagementFactory.getRuntimeMXBean().getName();
final String host = IpUtils.getIp();
final String id = host + DELIMITER + javaVmName.split("@")[0];
return new ShardInstance(id, host, ShardState.ENABLED);
}
}
package io.shardingjdbc.orchestration.reg.base;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.experimental.Wither;
/**
* State Change Event
*
* @author junxiong
*/
@Value
@Wither
@AllArgsConstructor(staticName = "with")
public class StateChangeEvent {
ShardState newState;
}
package io.shardingjdbc.orchestration.reg.base;
/**
* state change listener
*
* @author junxiong
*/
public interface StateChangeListener {
/**
* Notify when state change event fired
*
* @param stateChangeEvent state change event
*/
void onStateChange(StateChangeEvent stateChangeEvent);
}
package io.shardingjdbc.orchestration.reg.etcd;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import io.shardingjdbc.orchestration.reg.base.ConfigChangeEvent;
import io.shardingjdbc.orchestration.reg.base.ConfigChangeListener;
import io.shardingjdbc.orchestration.reg.base.ConfigServer;
import io.shardingjdbc.orchestration.reg.etcd.internal.*;
import io.shardingjdbc.orchestration.reg.exception.RegException;
import lombok.NonNull;
import lombok.Synchronized;
import lombok.val;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
/**
* @author junxiong
*/
public class EtcdConfigServer implements ConfigServer, WatcherListener {
private static EtcdConfigServer etcdConfigServer;
private String namespace;
private EtcdClient etcdClient;
private Set<ConfigChangeListener> configChangeListeners = Sets.newCopyOnWriteArraySet();
private Gson gson = new Gson();
private AtomicBoolean opened = new AtomicBoolean(false);
private List<Watcher> watchers;
private EtcdConfigServer(@NonNull EtcdConfiguration etcdConfiguration) {
this.namespace = etcdConfiguration.getNamespace();
this.etcdClient = EtcdClientBuilder.newBuilder()
.endpoints(etcdConfiguration.getServerLists())
.build();
}
@Synchronized
public static EtcdConfigServer from(@NonNull EtcdConfiguration etcdConfiguration) {
if (etcdConfigServer == null) {
etcdConfigServer = new EtcdConfigServer(etcdConfiguration);
}
return etcdConfigServer;
}
@Override
public void open() {
if (opened.get()) {
return;
}
val shardingKey = getShardingKey();
val masterSlaveKey = getMasterSlaveKey();
for (String key : Lists.newArrayList(shardingKey, masterSlaveKey)) {
val watcher = etcdClient.watch(key);
if (watcher.isPresent()) {
watcher.get().addWatcherListener(this);
}
}
opened.compareAndSet(false, true);
}
private String getShardingKey() {
return format("/%s/config/sharding", namespace);
}
@Override
public void persistMasterSlaveRuleConfiguration(MasterSlaveRuleConfiguration masterSlaveRuleConfiguration) {
val key = getMasterSlaveKey();
etcdClient.put(key, gson.toJson(masterSlaveRuleConfiguration));
}
private String getMasterSlaveKey() {
return format("/%s/config/masterslave", namespace);
}
@Override
public MasterSlaveRuleConfiguration loadMasterSlaveRuleConfiguration() {
val key = getMasterSlaveKey();
val response = etcdClient.get(key).transform(new Function<String, MasterSlaveRuleConfiguration>() {
@Nullable
@Override
public MasterSlaveRuleConfiguration apply(@Nullable String input) {
return gson.fromJson(input, MasterSlaveRuleConfiguration.class);
}
});
if (response.isPresent()) {
return response.get();
} else {
throw new RegException("key %s does not exist", key);
}
}
@Override
public ShardingRuleConfiguration loadShardingRuleConfiguration() {
val key = getShardingKey();
val response = etcdClient.get(key).transform(new Function<String, ShardingRuleConfiguration>() {
@Nullable
@Override
public ShardingRuleConfiguration apply(@Nullable String input) {
return gson.fromJson(input, ShardingRuleConfiguration.class);
}
});
if (response.isPresent()) {
return response.get();
} else {
throw new RegException("key %s does not exist", key);
}
}
@Override
public void presistShardingRuleConfiguration(ShardingRuleConfiguration shardingRuleConfiguration) {
}
@Override
public void addConfigChangeListener(@NonNull ConfigChangeListener configChangeListener) {
configChangeListeners.add(configChangeListener);
}
@Override
public void close() throws Exception {
}
@Override
public void onWatch(WatchEvent watchEvent) {
for (ConfigChangeListener listener : configChangeListeners) {
if (getShardingKey().equals(watchEvent.getKey())) {
val shardingRuleConfiguration = gson.fromJson(watchEvent.getValue(), ShardingRuleConfiguration.class);
listener.onConfigChange(ConfigChangeEvent.with(shardingRuleConfiguration));
}
if (getMasterSlaveKey().equals(watchEvent.getKey())) {
val masterSlaveRuleConfiguration = gson.fromJson(watchEvent.getValue(), MasterSlaveRuleConfiguration.class);
listener.onConfigChange(ConfigChangeEvent.with(masterSlaveRuleConfiguration));
}
}
}
}
......@@ -26,10 +26,20 @@ public class EtcdConfiguration {
private final String namespace;
/**
* maximal retries.
* time to live of ephemeral keys
*/
private long timeToLive;
/**
* maximal retries when calling a etcd method.
*/
private int maxRetries = 3;
/**
* timeout when calling a etcd method in milliseconds.
*/
private long timeout;
/**
* username of etcd cluster
* <p>
......
package io.shardingjdbc.orchestration.reg.etcd;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeEvent;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeListener;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeType;
import io.shardingjdbc.orchestration.reg.etcd.internal.*;
import io.shardingjdbc.orchestration.reg.exception.RegExceptionHandler;
import lombok.NonNull;
import javax.annotation.Nullable;
import java.util.List;
/**
* ETCD based registry center
*
* @author junxiong
*/
public class EtcdRegistryCenter implements CoordinatorRegistryCenter {
private EtcdConfiguration etcdConfiguration;
private EtcdClient etcdClient;
public EtcdRegistryCenter(EtcdConfiguration etcdConfiguration) {
this.etcdConfiguration = etcdConfiguration;
this.etcdClient = EtcdClientBuilder.newBuilder()
.endpoints(etcdConfiguration.getServerLists())
.build();
}
@Override
public String getDirectly(@NonNull final String key) {
return get(key);
}
/**
* use default time to live
*
* @param key key of data
* @param value value of data
*/
@Override
public void persistEphemeral(@NonNull final String key, @NonNull final String value) {
etcdClient.put(key, value, etcdConfiguration.getTimeToLive());
}
@Override
public void addCacheData(String cachePath) {
// no op for etcd
}
@Override
public List<String> getChildrenKeys(String path) {
List<EtcdClient.KeyValue> children = etcdClient.list(path);
List<String> keys = Lists.newArrayList();
for (EtcdClient.KeyValue keyValue : children) {
keys.add(keyValue.getKey());
}
return keys;
}
@Override
public void addRegistryChangeListener(final String path, final RegistryChangeListener registryChangeListener) {
Optional<Watcher> watcherOptional = etcdClient.watch(path);
WatcherListener listener = new WatcherListener() {
@Override
public void onWatch(WatchEvent watchEvent) {
final Optional<RegistryChangeEvent> registryChangeEventOptional = fromWatchEvent(watchEvent);
registryChangeEventOptional.transform(new Function<RegistryChangeEvent, Object>() {
@Nullable
@Override
public Object apply(@Nullable RegistryChangeEvent input) {
try {
registryChangeListener.onRegistryChange(input);
} catch (Exception e) {
RegExceptionHandler.handleException(e);
}
return input;
}
});
}
};
}
private Optional<RegistryChangeEvent> fromWatchEvent(WatchEvent watchEvent) {
if (WatchEvent.WatchEventType.UPDATE == watchEvent.getWatchEventType()) {
return Optional.of(new RegistryChangeEvent(RegistryChangeType.UPDATED,
Optional.of(new RegistryChangeEvent.Payload(watchEvent.getKey(), watchEvent.getValue()))));
} else if (WatchEvent.WatchEventType.DELETE == watchEvent.getWatchEventType()) {
return Optional.of(new RegistryChangeEvent(RegistryChangeType.DELETED,
Optional.of(new RegistryChangeEvent.Payload(watchEvent.getKey(), watchEvent.getValue()))));
} else {
return Optional.absent();
}
}
@Override
public void init() {
// no op
}
@Override
public void close() {
// no op
}
@Override
public String get(@NonNull final String key) {
Optional<String> value = etcdClient.get(key);
return value.orNull();
}
@Override
public boolean isExisted(@NonNull final String key) {
return get(key) != null;
}
@Override
public void persist(String key, String value) {
etcdClient.put(key, value);
}
@Override
public void update(String key, String value) {
etcdClient.put(key, value);
}
}
package io.shardingjdbc.orchestration.reg.etcd;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import io.shardingjdbc.orchestration.reg.base.ConfigChangeListener;
import io.shardingjdbc.orchestration.reg.base.RegistryServer;
import io.shardingjdbc.orchestration.reg.base.ShardInstance;
import io.shardingjdbc.orchestration.reg.base.ShardState;
import io.shardingjdbc.orchestration.reg.etcd.internal.EtcdClient;
import io.shardingjdbc.orchestration.reg.etcd.internal.EtcdClientBuilder;
import io.shardingjdbc.orchestration.reg.etcd.internal.Watcher;
import lombok.NonNull;
import lombok.Synchronized;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
/**
* registration server implemented with etcd.
*
* @author junxiong
*/
public class EtcdRegistryServer implements RegistryServer {
private static EtcdRegistryServer etcdRegistryServer;
private String namespace;
private long ttl;
private EtcdClient etcdClient;
private Set<ConfigChangeListener> configChangeListeners = Sets.newCopyOnWriteArraySet();
private Gson gson = new Gson();
private AtomicBoolean opened = new AtomicBoolean(false);
private List<Watcher> watchers;
private EtcdRegistryServer(@NonNull EtcdConfiguration etcdConfiguration) {
//this.ttl = etcdConfiguration.ttl();
this.namespace = etcdConfiguration.getNamespace();
this.etcdClient = EtcdClientBuilder.newBuilder()
.endpoints(etcdConfiguration.getServerLists())
.build();
}
@Synchronized
public static EtcdRegistryServer from(@NonNull EtcdConfiguration etcdConfiguration) {
if (etcdRegistryServer == null) {
etcdRegistryServer = new EtcdRegistryServer(etcdConfiguration);
}
return etcdRegistryServer;
}
@Override
public void open() {
}
@Override
public void registerShard(@NonNull ShardInstance shardInstance) {
final String key = getRegistryKey(shardInstance.getId());
etcdClient.put(key, gson.toJson(shardInstance), ttl);
}
@Override
public void setShardState(@NonNull String id, @NonNull final ShardState shardState) {
final String key = getRegistryKey(id);
final Optional<ShardInstance> shardInstanceOptional = etcdClient.get(key).transform(new Function<String, ShardInstance>() {
@Override
public ShardInstance apply(@NonNull String input) {
return gson.fromJson(input, ShardInstance.class);
}
}).transform(new Function<ShardInstance, ShardInstance>() {
@Nullable
@Override
public ShardInstance apply(@Nullable ShardInstance input) {
return input.withState(shardState);
}
});
if (shardInstanceOptional.isPresent()) {
registerShard(shardInstanceOptional.get());
}
}
@Override
public ShardState getShardState(@NonNull String id) {
final String key = getRegistryKey(id);
final Optional<ShardInstance> shardInstanceOptional = etcdClient.get(key).transform(new Function<String, ShardInstance>() {
@Override
public ShardInstance apply(@NonNull String input) {
return gson.fromJson(input, ShardInstance.class);
}
});
if (shardInstanceOptional.isPresent()) {
return shardInstanceOptional.get().getState();
} else {
return null;
}
}
@Override
public void unregisterShard(@NonNull String id) {
final String key = getRegistryKey(id);
etcdClient.delete(key);
}
@Override
public void close() throws Exception {
}
private String getRegistryKey(@NonNull String id) {
return format("/registry/instances/%s", id);
}
}
......@@ -29,7 +29,7 @@ public interface EtcdClient {
* @param directory directory
* @return value
*/
Optional<List<KeyValue>> list(String directory);
List<KeyValue> list(String directory);
/**
* put value to a specific key, if result is not absent, it is an update
......
......@@ -22,6 +22,8 @@ public class EtcdClientBuilder {
private boolean keepAlive = false;
private boolean idle = false;
private long idleTimeout = 1000 * 60L;
private long timeout;
private int maxRetry;
public static EtcdClientBuilder newBuilder() {
return new EtcdClientBuilder();
......
......@@ -54,21 +54,21 @@ public class EtcdClientImpl implements EtcdClient, AutoCloseable {
}
@Override
public Optional<List<EtcdClient.KeyValue>> list(String dir) {
public List<EtcdClient.KeyValue> list(String dir) {
final RangeRequest request = RangeRequest.newBuilder()
.setKey(ByteString.copyFromUtf8(dir))
.setRangeEnd(prefix(dir))
.build();
final RangeResponse response = kvBlockingStub.range(request);
final List<io.shardingjdbc.orchestration.reg.etcd.internal.stub.KeyValue> keyValues = response.getKvsList();
List<EtcdClient.KeyValue> result = Lists.newArrayList();
List<EtcdClient.KeyValue> children = Lists.newArrayList();
for (io.shardingjdbc.orchestration.reg.etcd.internal.stub.KeyValue keyValue : keyValues) {
result.add(EtcdClient.KeyValue.builder()
children.add(EtcdClient.KeyValue.builder()
.key(keyValue.getKey().toStringUtf8())
.value(keyValue.getValue().toStringUtf8())
.build());
}
return Optional.of(result);
return children;
}
@Override
......
......@@ -19,7 +19,11 @@ package io.shardingjdbc.orchestration.reg.zookeeper;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.base.Optional;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeEvent;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeListener;
import io.shardingjdbc.orchestration.reg.base.RegistryChangeType;
import io.shardingjdbc.orchestration.reg.exception.RegExceptionHandler;
import lombok.AccessLevel;
import lombok.Getter;
......@@ -29,6 +33,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
......@@ -40,6 +46,8 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.StringUtils.isEmpty;
/**
* Zookeeper based registry center.
*
......@@ -177,7 +185,48 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
return Collections.emptyList();
}
}
@Override
public void addRegistryChangeListener(final String path, final RegistryChangeListener registryChangeListener) {
TreeCache treeCache = this.getRawCache(path);
if (treeCache == null) {
this.addCacheData(path);
treeCache = this.getRawCache(path);
}
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
Optional<RegistryChangeEvent.Payload> payload;
if (data == null || isEmpty(data.getPath()) || data.getData() == null) {
payload = Optional.absent();
} else {
payload = Optional.of(new RegistryChangeEvent.Payload(data.getPath(), new String(data.getData())));
}
switch (event.getType()) {
case NODE_ADDED:
break;
case INITIALIZED:
break;
case CONNECTION_LOST:
break;
case CONNECTION_SUSPENDED:
break;
case CONNECTION_RECONNECTED:
break;
case NODE_UPDATED:
registryChangeListener.onRegistryChange(new RegistryChangeEvent(RegistryChangeType.UPDATED, payload));
break;
case NODE_REMOVED:
registryChangeListener.onRegistryChange(new RegistryChangeEvent(RegistryChangeType.DELETED, payload));
break;
default:
break;
}
}
});
}
@Override
public boolean isExisted(final String key) {
try {
......@@ -242,9 +291,8 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
}
caches.put(cachePath + "/", cache);
}
@Override
public Object getRawCache(final String cachePath) {
private TreeCache getRawCache(final String cachePath) {
return caches.get(cachePath + "/");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册