提交 6628da59 编写于 作者: H haocao

Refactor orchestration instance state node.

上级 3b99fc77
......@@ -17,28 +17,16 @@
package io.shardingjdbc.orchestration.api;
import com.google.common.base.Charsets;
import io.shardingjdbc.core.api.MasterSlaveDataSourceFactory;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationMasterSlaveConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.state.InstanceStateNode;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.json.DataSourceJsonConverter;
import io.shardingjdbc.orchestration.internal.json.GsonFactory;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.internal.state.InstanceStateService;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;
/**
* Orchestration master slave data source factory.
......@@ -60,45 +48,7 @@ public final class OrchestrationMasterSlaveDataSourceFactory {
config.getRegistryCenter().init();
MasterSlaveDataSource result = (MasterSlaveDataSource) MasterSlaveDataSourceFactory.createDataSource(config.getDataSourceMap(), config.getMasterSlaveRuleConfiguration());
new ConfigurationService(config.getRegistryCenter(), config.getName()).addMasterSlaveConfiguration(config, result);
addState(config, result);
new InstanceStateService(config.getRegistryCenter(), config.getName()).addMasterSlaveState(result);
return result;
}
private static void addState(final OrchestrationMasterSlaveConfiguration config, final MasterSlaveDataSource masterSlaveDataSource) throws SQLException {
String instanceId = new InstanceStateNode().getInstanceId();
persistState(config, instanceId);
addInstancesStateChangeListener(config.getName(), instanceId, config.getRegistryCenter(), masterSlaveDataSource);
}
private static void persistState(final OrchestrationMasterSlaveConfiguration config, final String instanceId) throws SQLException {
config.getRegistryCenter().persistEphemeral("/" + config.getName() + "/state/instances/" + instanceId, "");
config.getRegistryCenter().addCacheData("/" + config.getName() + "/state/instances/" + instanceId);
}
private static void addInstancesStateChangeListener(final String name, final String instanceId, final CoordinatorRegistryCenter registryCenter, final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) registryCenter.getRawCache("/" + name + "/state/instances/" + instanceId);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData()) {
return;
}
String path = childData.getPath();
if (path.isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
MasterSlaveRuleConfiguration masterSlaveRuleConfig = GsonFactory.getGson().fromJson(new String(childData.getData(), Charsets.UTF_8), MasterSlaveRuleConfiguration.class);
Map<String, DataSource> dataSourceMap = DataSourceJsonConverter.fromJson(registryCenter.get("/" + name + "/config/datasource"));
if ("disabled".equals(registryCenter.get(path))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
// TODO props
masterSlaveDataSource.renew(masterSlaveRuleConfig.build(dataSourceMap));
}
});
}
}
......@@ -18,26 +18,15 @@
package io.shardingjdbc.orchestration.api;
import io.shardingjdbc.core.api.ShardingDataSourceFactory;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.api.config.OrchestrationShardingConfiguration;
import io.shardingjdbc.orchestration.internal.config.ConfigurationService;
import io.shardingjdbc.orchestration.internal.state.InstanceStateNode;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.json.DataSourceJsonConverter;
import io.shardingjdbc.orchestration.internal.json.ShardingRuleConfigurationConverter;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.internal.state.InstanceStateService;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
/**
......@@ -60,7 +49,7 @@ public final class OrchestrationShardingDataSourceFactory {
config.getRegistryCenter().init();
ShardingDataSource result = (ShardingDataSource) ShardingDataSourceFactory.createDataSource(config.getDataSourceMap(), config.getShardingRuleConfig());
new ConfigurationService(config.getRegistryCenter(), config.getName()).addShardingConfiguration(config, result);
addState(config, result);
new InstanceStateService(config.getRegistryCenter(), config.getName()).addShardingState(result);
return result;
}
......@@ -77,45 +66,8 @@ public final class OrchestrationShardingDataSourceFactory {
// TODO props
ShardingDataSource result = (ShardingDataSource) ShardingDataSourceFactory.createDataSource(config.getDataSourceMap(), config.getShardingRuleConfig(), props);
new ConfigurationService(config.getRegistryCenter(), config.getName()).addShardingConfiguration(config, result);
addState(config, result);
new InstanceStateService(config.getRegistryCenter(), config.getName()).addShardingState(result);
return result;
}
private static void addState(final OrchestrationShardingConfiguration config, final ShardingDataSource shardingDataSource) throws SQLException {
String instanceId = new InstanceStateNode().getInstanceId();
persistState(config, instanceId);
addInstancesStateChangeListener(config.getName(), instanceId, config.getRegistryCenter(), shardingDataSource);
}
private static void persistState(final OrchestrationShardingConfiguration config, final String instanceId) throws SQLException {
config.getRegistryCenter().persistEphemeral("/" + config.getName() + "/state/instances/" + instanceId, "");
config.getRegistryCenter().addCacheData("/" + config.getName() + "/state/instances/" + instanceId);
}
private static void addInstancesStateChangeListener(final String name, final String instanceId, final CoordinatorRegistryCenter registryCenter, final ShardingDataSource shardingDataSource) {
TreeCache cache = (TreeCache) registryCenter.getRawCache("/" + name + "/state/instances/" + instanceId);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData()) {
return;
}
String path = childData.getPath();
if (path.isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
ShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.fromJson(registryCenter.get("/" + name + "/config/sharding"));
Map<String, DataSource> dataSourceMap = DataSourceJsonConverter.fromJson(registryCenter.get("/" + name + "/config/datasource"));
if ("disabled".equals(registryCenter.get(path))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
// TODO props
shardingDataSource.renew(shardingRuleConfig.build(dataSourceMap), new Properties());
}
});
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.orchestration.internal.state;
/**
* Instance state.
*
* @author caohao
*/
public enum InstanceState {
DISABLED
}
......@@ -23,18 +23,21 @@ import lombok.Getter;
import java.lang.management.ManagementFactory;
/**
* Orchestration instance.
* Instance state node.
*
* @author caohao
*/
@Getter
public final class InstanceStateNode {
public static final String ROOT = "/state/instances/";
private static final String DELIMITER = "@-@";
private final String instanceId;
public InstanceStateNode() {
instanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.orchestration.internal.state;
import com.google.common.base.Charsets;
import io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import io.shardingjdbc.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingjdbc.orchestration.internal.config.ConfigurationNode;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.json.DataSourceJsonConverter;
import io.shardingjdbc.orchestration.internal.json.GsonFactory;
import io.shardingjdbc.orchestration.internal.json.ShardingRuleConfigurationConverter;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
/**
* Instance state service.
*
* @author caohao
*/
@Getter
public final class InstanceStateService {
private final String name;
private final CoordinatorRegistryCenter registryCenter;
public InstanceStateService(final CoordinatorRegistryCenter regCenter, final String name) {
this.registryCenter = regCenter;
this.name = name;
}
public void addShardingState(final ShardingDataSource shardingDataSource) throws SQLException {
String instanceNodePath = "/" + name + InstanceStateNode.ROOT + new InstanceStateNode().getInstanceId();
persistState(instanceNodePath);
addShardingInstancesStateChangeListener(instanceNodePath, shardingDataSource);
}
public void addMasterSlaveState(final MasterSlaveDataSource masterSlaveDataSource) throws SQLException {
String instanceNodePath = "/" + name + InstanceStateNode.ROOT + new InstanceStateNode().getInstanceId();
persistState(instanceNodePath);
addMasterSlaveInstancesStateChangeListener(instanceNodePath, masterSlaveDataSource);
}
private void persistState(final String instanceNodePath) throws SQLException {
registryCenter.persistEphemeral(instanceNodePath, "");
registryCenter.addCacheData(instanceNodePath);
}
private void addShardingInstancesStateChangeListener(final String instanceNodePath, final ShardingDataSource shardingDataSource) {
TreeCache cache = (TreeCache) registryCenter.getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData()) {
return;
}
String path = childData.getPath();
if (path.isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
ShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.fromJson(registryCenter.get("/" + name + "/" + ConfigurationNode.SHARDING_NODE_PATH));
Map<String, DataSource> dataSourceMap = DataSourceJsonConverter.fromJson(registryCenter.get("/" + name + "/" + ConfigurationNode.DATA_SOURCE_NODE_PATH));
if (InstanceState.DISABLED.toString().equalsIgnoreCase(registryCenter.get(path))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
// TODO props
shardingDataSource.renew(shardingRuleConfig.build(dataSourceMap), new Properties());
}
});
}
private void addMasterSlaveInstancesStateChangeListener(final String instanceNodePath, final MasterSlaveDataSource masterSlaveDataSource) {
TreeCache cache = (TreeCache) registryCenter.getRawCache(instanceNodePath);
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData || null == childData.getData()) {
return;
}
String path = childData.getPath();
if (path.isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
return;
}
MasterSlaveRuleConfiguration masterSlaveRuleConfig = GsonFactory.getGson().fromJson(new String(childData.getData(), Charsets.UTF_8), MasterSlaveRuleConfiguration.class);
Map<String, DataSource> dataSourceMap = DataSourceJsonConverter.fromJson(registryCenter.get("/" + name + "/config/datasource"));
if ("disabled".equals(registryCenter.get(path))) {
for (String each : dataSourceMap.keySet()) {
dataSourceMap.put(each, new CircuitBreakerDataSource());
}
}
// TODO props
masterSlaveDataSource.renew(masterSlaveRuleConfig.build(dataSourceMap));
}
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册