提交 091d3aca 编写于 作者: H haocao

Add orchestration master slave datasource disabled support 7th.

上级 d69ea4bf
......@@ -6,6 +6,7 @@ import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.listener.ListenerManager;
import io.shardingjdbc.orchestration.internal.state.datasource.DataSourceService;
import io.shardingjdbc.orchestration.internal.state.instance.InstanceStateService;
......@@ -19,6 +20,7 @@ import java.util.Properties;
* Orchestration service facade.
*
* @author zhangliang
* @author caohao
*/
public final class OrchestrationFacade {
......@@ -28,10 +30,13 @@ public final class OrchestrationFacade {
private final InstanceStateService instanceStateService;
private final DataSourceService dataSourceService;
public OrchestrationFacade(final OrchestrationConfiguration config) {
this.config = config;
configurationService = new ConfigurationService(config);
instanceStateService = new InstanceStateService(config);
dataSourceService = new DataSourceService(config);
}
/**
......@@ -48,8 +53,9 @@ public final class OrchestrationFacade {
if (shardingRuleConfig.getMasterSlaveRuleConfigs().isEmpty()) {
reviseShardingRuleConfigurationForMasterSlave(dataSourceMap, shardingRuleConfig);
}
configurationService.persistShardingConfiguration(getActualDataSourceMapForMasterSlave(dataSourceMap), shardingRuleConfig, props, shardingDataSource);
instanceStateService.persistShardingInstanceOnline(shardingDataSource);
configurationService.persistShardingConfiguration(getActualDataSourceMapForMasterSlave(dataSourceMap), shardingRuleConfig, props);
instanceStateService.persistShardingInstanceOnline();
new ListenerManager(config).initShardingListeners(shardingDataSource);
}
private void reviseShardingRuleConfigurationForMasterSlave(final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig) {
......@@ -93,9 +99,10 @@ public final class OrchestrationFacade {
public void initMasterSlaveOrchestration(
final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final MasterSlaveDataSource masterSlaveDataSource) {
config.getRegistryCenter().init();
configurationService.persistMasterSlaveConfiguration(dataSourceMap, masterSlaveRuleConfig, masterSlaveDataSource);
instanceStateService.persistMasterSlaveInstanceOnline(masterSlaveDataSource);
new DataSourceService(config).initDataSourcesNode(masterSlaveDataSource);
masterSlaveDataSource.renew(configurationService.getAvailableMasterSlaveRule());
configurationService.persistMasterSlaveConfiguration(dataSourceMap, masterSlaveRuleConfig);
instanceStateService.persistMasterSlaveInstanceOnline();
dataSourceService.persistDataSourcesNode();
new ListenerManager(config).initMasterSlaveListeners(masterSlaveDataSource);
masterSlaveDataSource.renew(dataSourceService.getAvailableMasterSlaveRule());
}
}
......@@ -20,24 +20,13 @@ package io.shardingjdbc.orchestration.internal.config;
import com.google.common.base.Strings;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.core.rule.MasterSlaveRule;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.json.DataSourceJsonConverter;
import io.shardingjdbc.orchestration.internal.json.GsonFactory;
import io.shardingjdbc.orchestration.internal.json.ShardingRuleConfigurationConverter;
import io.shardingjdbc.orchestration.internal.state.StateNode;
import io.shardingjdbc.orchestration.internal.state.StateNodeStatus;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -52,14 +41,11 @@ public final class ConfigurationService {
private final CoordinatorRegistryCenter regCenter;
private final String name;
private final boolean isOverwrite;
public ConfigurationService(final OrchestrationConfiguration config) {
configNode = new ConfigurationNode(config.getName());
regCenter = config.getRegistryCenter();
name = config.getName();
isOverwrite = config.isOverwrite();
}
......@@ -69,14 +55,12 @@ public final class ConfigurationService {
* @param dataSourceMap data source map
* @param shardingRuleConfig sharding rule configuration
* @param props sharding properties
* @param shardingDataSource sharding datasource
*/
public void persistShardingConfiguration(
final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props, final ShardingDataSource shardingDataSource) {
final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) {
persistDataSourceConfiguration(dataSourceMap);
persistShardingRuleConfiguration(shardingRuleConfig);
persistShardingProperties(props);
addShardingConfigurationChangeListener(shardingDataSource);
}
private void persistDataSourceConfiguration(final Map<String, DataSource> dataSourceMap) {
......@@ -97,41 +81,16 @@ public final class ConfigurationService {
}
}
private void addShardingConfigurationChangeListener(final ShardingDataSource shardingDataSource) {
addShardingConfigurationNodeChangeListener(ConfigurationNode.DATA_SOURCE_NODE_PATH, shardingDataSource);
addShardingConfigurationNodeChangeListener(ConfigurationNode.SHARDING_NODE_PATH, shardingDataSource);
addShardingConfigurationNodeChangeListener(ConfigurationNode.PROPS_NODE_PATH, shardingDataSource);
}
private void addShardingConfigurationNodeChangeListener(final String node, final ShardingDataSource shardingDataSource) {
String cachePath = configNode.getFullPath(node);
regCenter.addCacheData(cachePath);
TreeCache cache = (TreeCache) regCenter.getRawCache(cachePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || childData.getPath().isEmpty() || null == childData.getData() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
shardingDataSource.renew(loadShardingRuleConfiguration().build(loadDataSourceMap()), loadShardingProperties());
}
});
}
/**
* Persist master-slave configuration.
*
* @param dataSourceMap data source map
* @param masterSlaveRuleConfig master-slave rule configuration
* @param masterSlaveDataSource master-slave datasource
*/
public void persistMasterSlaveConfiguration(
final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final MasterSlaveDataSource masterSlaveDataSource) {
final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig) {
persistDataSourceConfiguration(dataSourceMap);
persistMasterSlaveRuleConfiguration(masterSlaveRuleConfig);
addMasterSlaveConfigurationChangeListener(masterSlaveDataSource);
}
private void persistMasterSlaveRuleConfiguration(final MasterSlaveRuleConfiguration masterSlaveRuleConfig) {
......@@ -140,28 +99,6 @@ public final class ConfigurationService {
}
}
private void addMasterSlaveConfigurationChangeListener(final MasterSlaveDataSource masterSlaveDataSource) {
addMasterSlaveConfigurationChangeListener(ConfigurationNode.DATA_SOURCE_NODE_PATH, masterSlaveDataSource);
addMasterSlaveConfigurationChangeListener(ConfigurationNode.MASTER_SLAVE_NODE_PATH, masterSlaveDataSource);
}
private void addMasterSlaveConfigurationChangeListener(final String node, final MasterSlaveDataSource masterSlaveDataSource) {
String cachePath = configNode.getFullPath(node);
regCenter.addCacheData(cachePath);
TreeCache cache = (TreeCache) regCenter.getRawCache(cachePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || childData.getPath().isEmpty() || null == childData.getData() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
masterSlaveDataSource.renew(getAvailableMasterSlaveRule());
}
});
}
/**
* Load data source configuration.
*
......@@ -198,25 +135,4 @@ public final class ConfigurationService {
public MasterSlaveRuleConfiguration loadMasterSlaveRuleConfiguration() {
return GsonFactory.getGson().fromJson(regCenter.get(configNode.getFullPath(ConfigurationNode.MASTER_SLAVE_NODE_PATH)), MasterSlaveRuleConfiguration.class);
}
/**
* Get available master-slave rule.
*
* @return available master-slave rule
*/
public MasterSlaveRule getAvailableMasterSlaveRule() {
Map<String, DataSource> dataSourceMap = loadDataSourceMap();
String dataSourcesNodePath = new StateNode(name).getDataSourcesNodeFullPath();
List<String> dataSources = regCenter.getChildrenKeys(dataSourcesNodePath);
MasterSlaveRuleConfiguration ruleConfig = loadMasterSlaveRuleConfiguration();
for (String each : dataSources) {
String dataSourceName = each.substring(each.lastIndexOf("/") + 1);
String path = dataSourcesNodePath + "/" + each;
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(path)) && dataSourceMap.containsKey(dataSourceName)) {
dataSourceMap.remove(dataSourceName);
ruleConfig.getSlaveDataSourceNames().remove(dataSourceName);
}
}
return ruleConfig.build(dataSourceMap);
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.orchestration.internal.listener;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationNode;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.state.datasource.DataSourceService;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
/**
* Configuration listener manager.
*
* @author caohao
*/
public class ConfigurationListenerManager {
private CoordinatorRegistryCenter regCenter;
private ConfigurationService configurationService;
private DataSourceService dataSourceService;
private final ConfigurationNode configNode;
public ConfigurationListenerManager(final OrchestrationConfiguration config) {
this.regCenter = config.getRegistryCenter();
configNode = new ConfigurationNode(config.getName());
configurationService = new ConfigurationService(config);
dataSourceService = new DataSourceService(config);
}
/**
* Add sharding configuration node change listener.
*
* @param shardingDataSource sharding datasource
*/
public void addShardingConfigurationChangeListener(final ShardingDataSource shardingDataSource) {
addShardingConfigurationNodeChangeListener(ConfigurationNode.DATA_SOURCE_NODE_PATH, shardingDataSource);
addShardingConfigurationNodeChangeListener(ConfigurationNode.SHARDING_NODE_PATH, shardingDataSource);
addShardingConfigurationNodeChangeListener(ConfigurationNode.PROPS_NODE_PATH, shardingDataSource);
}
private void addShardingConfigurationNodeChangeListener(final String node, final ShardingDataSource shardingDataSource) {
String cachePath = configNode.getFullPath(node);
regCenter.addCacheData(cachePath);
TreeCache cache = (TreeCache) regCenter.getRawCache(cachePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || childData.getPath().isEmpty() || null == childData.getData() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
shardingDataSource.renew(configurationService.loadShardingRuleConfiguration().build(configurationService.loadDataSourceMap()), configurationService.loadShardingProperties());
}
});
}
/**
* Add master-slave configuration node change listener.
*
* @param masterSlaveDataSource master-slave datasource
*/
public void addMasterSlaveConfigurationChangeListener(final MasterSlaveDataSource masterSlaveDataSource) {
addMasterSlaveConfigurationChangeListener(ConfigurationNode.DATA_SOURCE_NODE_PATH, masterSlaveDataSource);
addMasterSlaveConfigurationChangeListener(ConfigurationNode.MASTER_SLAVE_NODE_PATH, masterSlaveDataSource);
}
private void addMasterSlaveConfigurationChangeListener(final String node, final MasterSlaveDataSource masterSlaveDataSource) {
String cachePath = configNode.getFullPath(node);
regCenter.addCacheData(cachePath);
TreeCache cache = (TreeCache) regCenter.getRawCache(cachePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || childData.getPath().isEmpty() || null == childData.getData() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
masterSlaveDataSource.renew(dataSourceService.getAvailableMasterSlaveRule());
}
});
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.orchestration.internal.listener;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.state.datasource.DataSourceService;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
/**
* Data source listener manager.
*
* @author caohao
*/
public class DataSourceListenerManager {
private final DataSourceService dataSourceService;
private final CoordinatorRegistryCenter registryCenter;
public DataSourceListenerManager(final OrchestrationConfiguration config) {
dataSourceService = new DataSourceService(config);
registryCenter = config.getRegistryCenter();
}
/**
* Add data source node change listener.
*
* @param masterSlaveDataSource master-slave datasource
*/
public void addDataSourcesNodeListener(final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) registryCenter.getRawCache(dataSourceService.getDataSourceNodePath());
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty()) {
return;
}
if (TreeCacheEvent.Type.NODE_UPDATED == event.getType() || TreeCacheEvent.Type.NODE_REMOVED == event.getType()) {
masterSlaveDataSource.renew(dataSourceService.getAvailableMasterSlaveRule());
}
}
});
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.orchestration.internal.listener;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.state.StateNodeStatus;
import io.shardingjdbc.orchestration.internal.state.instance.InstanceStateService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.util.Map;
/**
* Instance listener manager.
*
* @author caohao
*/
public class InstanceListenerManager {
private final OrchestrationConfiguration config;
private final ConfigurationService configurationService;
private final String instanceNodePath;
public InstanceListenerManager(final OrchestrationConfiguration config) {
this.config = config;
configurationService = new ConfigurationService(config);
instanceNodePath = new InstanceStateService(config).getInstanceNodePath();
}
/**
* Add sharding instances state change listener.
*
* @param shardingDataSource sharding datasource
*/
public void addShardingInstancesStateChangeListener(final ShardingDataSource shardingDataSource) {
TreeCache cache = (TreeCache) config.getRegistryCenter().getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(config.getRegistryCenter().get(childData.getPath()))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
shardingDataSource.renew(configurationService.loadShardingRuleConfiguration().build(dataSourceMap), configurationService.loadShardingProperties());
}
});
}
/**
* Add master slave instances state change listener.
*
* @param masterSlaveDataSource master slave datasource
*/
public void addMasterSlaveInstancesStateChangeListener(final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) config.getRegistryCenter().getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(config.getRegistryCenter().get(childData.getPath()))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
masterSlaveDataSource.renew(configurationService.loadMasterSlaveRuleConfiguration().build(dataSourceMap));
}
});
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.orchestration.internal.listener;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
/**
* Registry center's listener manager.
*
* @author caohao
*/
public class ListenerManager {
private final ConfigurationListenerManager configurationListenerManager;
private final InstanceListenerManager instanceListenerManager;
private final DataSourceListenerManager dataSourceListenerManager;
public ListenerManager(final OrchestrationConfiguration config) {
configurationListenerManager = new ConfigurationListenerManager(config);
instanceListenerManager = new InstanceListenerManager(config);
dataSourceListenerManager = new DataSourceListenerManager(config);
}
public void initShardingListeners(final ShardingDataSource shardingDataSource) {
configurationListenerManager.addShardingConfigurationChangeListener(shardingDataSource);
instanceListenerManager.addShardingInstancesStateChangeListener(shardingDataSource);
}
public void initMasterSlaveListeners(final MasterSlaveDataSource masterSlaveDataSource) {
configurationListenerManager.addMasterSlaveConfigurationChangeListener(masterSlaveDataSource);
instanceListenerManager.addMasterSlaveInstancesStateChangeListener(masterSlaveDataSource);
dataSourceListenerManager.addDataSourcesNodeListener(masterSlaveDataSource);
}
}
......@@ -17,17 +17,18 @@
package io.shardingjdbc.orchestration.internal.state.datasource;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.rule.MasterSlaveRule;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.state.StateNode;
import io.shardingjdbc.orchestration.internal.state.StateNodeStatus;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
/**
* Data source service.
......@@ -43,37 +44,41 @@ public final class DataSourceService {
private final ConfigurationService configurationService;
private final String name;
public DataSourceService(final OrchestrationConfiguration config) {
dataSourceNodePath = new StateNode(config.getName()).getDataSourcesNodeFullPath();
regCenter = config.getRegistryCenter();
configurationService = new ConfigurationService(config);
name = config.getName();
}
/**
* Persist master-salve data sources node and add listener.
* Persist master-salve data sources node.
*
* @param masterSlaveDataSource master-slave data source
*/
public void initDataSourcesNode(final MasterSlaveDataSource masterSlaveDataSource) {
public void persistDataSourcesNode() {
regCenter.persist(dataSourceNodePath, "");
regCenter.addCacheData(dataSourceNodePath);
addDataSourcesNodeListener(masterSlaveDataSource);
}
private void addDataSourcesNodeListener(final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) regCenter.getRawCache(dataSourceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty()) {
return;
}
if (TreeCacheEvent.Type.NODE_UPDATED == event.getType() || TreeCacheEvent.Type.NODE_REMOVED == event.getType()) {
masterSlaveDataSource.renew(configurationService.getAvailableMasterSlaveRule());
}
public String getDataSourceNodePath() {
return dataSourceNodePath;
}
public MasterSlaveRule getAvailableMasterSlaveRule() {
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
String dataSourcesNodePath = new StateNode(name).getDataSourcesNodeFullPath();
List<String> dataSources = regCenter.getChildrenKeys(dataSourcesNodePath);
MasterSlaveRuleConfiguration ruleConfig = configurationService.loadMasterSlaveRuleConfiguration();
for (String each : dataSources) {
String dataSourceName = each.substring(each.lastIndexOf("/") + 1);
String path = dataSourcesNodePath + "/" + each;
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(path)) && dataSourceMap.containsKey(dataSourceName)) {
dataSourceMap.remove(dataSourceName);
ruleConfig.getSlaveDataSourceNames().remove(dataSourceName);
}
});
}
return ruleConfig.build(dataSourceMap);
}
}
......@@ -17,25 +17,13 @@
package io.shardingjdbc.orchestration.internal.state.instance;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.state.StateNode;
import io.shardingjdbc.orchestration.internal.state.StateNodeStatus;
import io.shardingjdbc.orchestration.internal.util.IpUtils;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.lang.management.ManagementFactory;
import java.util.Map;
/**
* Instance state service.
......@@ -55,76 +43,31 @@ public final class InstanceStateService {
private final CoordinatorRegistryCenter regCenter;
private final ConfigurationService configurationService;
public InstanceStateService(final OrchestrationConfiguration config) {
stateNode = new StateNode(config.getName());
instanceNodePath = stateNode.getInstancesNodeFullPath(IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split(PID_FLAG)[0]);
regCenter = config.getRegistryCenter();
configurationService = new ConfigurationService(config);
}
public String getInstanceNodePath() {
return instanceNodePath;
}
/**
* Persist sharding instance online.
*
* @param shardingDataSource sharding datasource
*/
public void persistShardingInstanceOnline(final ShardingDataSource shardingDataSource) {
public void persistShardingInstanceOnline() {
regCenter.persistEphemeral(instanceNodePath, "");
regCenter.addCacheData(instanceNodePath);
addShardingInstancesStateChangeListener(instanceNodePath, shardingDataSource);
}
private void addShardingInstancesStateChangeListener(final String instanceNodePath, final ShardingDataSource shardingDataSource) {
TreeCache cache = (TreeCache) regCenter.getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(childData.getPath()))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
shardingDataSource.renew(configurationService.loadShardingRuleConfiguration().build(dataSourceMap), configurationService.loadShardingProperties());
}
});
}
/**
* Persist master-salve instance online.
*
* @param masterSlaveDataSource master-slave datasource
*/
public void persistMasterSlaveInstanceOnline(final MasterSlaveDataSource masterSlaveDataSource) {
public void persistMasterSlaveInstanceOnline() {
regCenter.persistEphemeral(instanceNodePath, "");
regCenter.addCacheData(instanceNodePath);
addMasterSlaveInstancesStateChangeListener(instanceNodePath, masterSlaveDataSource);
}
private void addMasterSlaveInstancesStateChangeListener(final String instanceNodePath, final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) regCenter.getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData() || childData.getPath().isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
Map<String, DataSource> dataSourceMap = configurationService.loadDataSourceMap();
if (StateNodeStatus.DISABLED.toString().equalsIgnoreCase(regCenter.get(childData.getPath()))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
masterSlaveDataSource.renew(configurationService.loadMasterSlaveRuleConfiguration().build(dataSourceMap));
}
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册