OrchestrationShardingDataSourceFactory.java 8.5 KB
Newer Older
T
terrymanu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

18
package io.shardingjdbc.orchestration.api;
T
terrymanu 已提交
19

H
haocao 已提交
20
import com.google.common.base.Charsets;
21 22 23
import io.shardingjdbc.core.api.ShardingDataSourceFactory;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import io.shardingjdbc.core.jdbc.core.datasource.ShardingDataSource;
H
haocao 已提交
24
import io.shardingjdbc.orchestration.api.config.OrchestrationShardingConfiguration;
H
haocao 已提交
25 26 27 28
import io.shardingjdbc.orchestration.internal.instance.OrchestrationInstance;
import io.shardingjdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import io.shardingjdbc.orchestration.internal.json.DataSourceJsonConverter;
import io.shardingjdbc.orchestration.internal.json.ShardingRuleConfigurationConverter;
29
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
T
terrymanu 已提交
30 31
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
32 33 34 35 36
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
T
terrymanu 已提交
37 38 39 40 41 42 43 44 45 46

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

/**
 * Orchestration sharding data source factory.
 * 
 * @author zhangliang 
H
haocao 已提交
47
 * @author caohao 
T
terrymanu 已提交
48 49 50 51 52 53
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class OrchestrationShardingDataSourceFactory {
    
    /**
     * Create sharding data source.
H
haocao 已提交
54 55
     *
     * @param config orchestration sharding configuration
T
terrymanu 已提交
56 57 58
     * @return sharding data source
     * @throws SQLException SQL exception
     */
H
haocao 已提交
59
    public static DataSource createDataSource(final OrchestrationShardingConfiguration config) throws SQLException {
H
haocao 已提交
60 61
        String instanceId = new OrchestrationInstance().getInstanceId();
        initRegistryCenter(config, instanceId);
H
haocao 已提交
62 63
        ShardingDataSource result = (ShardingDataSource) ShardingDataSourceFactory.createDataSource(config.getDataSourceMap(), config.getShardingRuleConfig());
        addConfigurationChangeListener(config.getName(), config.getRegistryCenter(), result);
H
haocao 已提交
64
        addInstancesStateChangeListener(config.getName(), instanceId, config.getRegistryCenter(), result);
65
        return result;
T
terrymanu 已提交
66 67 68 69 70
    }
    
    /**
     * Create sharding data source.
     * 
H
haocao 已提交
71
     * @param config orchestration sharding configuration
T
terrymanu 已提交
72 73 74 75
     * @param props properties for data source
     * @return sharding data source
     * @throws SQLException SQL exception
     */
H
haocao 已提交
76
    public static DataSource createDataSource(final OrchestrationShardingConfiguration config, final Properties props) throws SQLException {
H
haocao 已提交
77 78
        String instanceId = new OrchestrationInstance().getInstanceId();
        initRegistryCenter(config, instanceId);
T
terrymanu 已提交
79
        // TODO props
H
haocao 已提交
80 81
        ShardingDataSource result = (ShardingDataSource) ShardingDataSourceFactory.createDataSource(config.getDataSourceMap(), config.getShardingRuleConfig(), props);
        addConfigurationChangeListener(config.getName(), config.getRegistryCenter(), result);
H
haocao 已提交
82
        addInstancesStateChangeListener(config.getName(), instanceId, config.getRegistryCenter(), result);
83 84 85
        return result;
    }
    
H
haocao 已提交
86
    private static void initRegistryCenter(final OrchestrationShardingConfiguration config, final String instanceId) throws SQLException {
H
haocao 已提交
87
        CoordinatorRegistryCenter registryCenter = config.getRegistryCenter();
88
        registryCenter.init();
H
haocao 已提交
89 90
        persistConfig(config);
        persistState(config, instanceId);
91 92
    }
    
H
haocao 已提交
93
    private static void persistConfig(final OrchestrationShardingConfiguration config) throws SQLException {
H
haocao 已提交
94 95 96 97 98
        String name = config.getName();
        CoordinatorRegistryCenter registryCenter = config.getRegistryCenter();
        if (config.isOverwrite() || registryCenter.getChildrenKeys("/" + name + "/config").isEmpty()) {
            registryCenter.persist("/" + name + "/config/datasource", DataSourceJsonConverter.toJson(config.getDataSourceMap()));
            registryCenter.persist("/" + name + "/config/sharding", ShardingRuleConfigurationConverter.toJson(config.getShardingRuleConfig()));
H
haocao 已提交
99
        }
H
haocao 已提交
100 101 102 103 104 105
        registryCenter.addCacheData("/" + name + "/config");
    }
    
    private static void persistState(final OrchestrationShardingConfiguration config, final String instanceId) throws SQLException {
        config.getRegistryCenter().persistEphemeral("/" + config.getName() + "/state/instances/" + instanceId, "");
        config.getRegistryCenter().addCacheData("/" + config.getName() + "/state/instances/" + instanceId);
H
haocao 已提交
106 107
    }
    
108 109 110 111 112 113 114 115 116 117 118 119 120 121
    private static void addConfigurationChangeListener(final String name, final CoordinatorRegistryCenter registryCenter, final ShardingDataSource shardingDataSource) {
        TreeCache cache = (TreeCache) registryCenter.getRawCache("/" + name + "/config");
        cache.getListenable().addListener(new TreeCacheListener() {
            
            @Override
            public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
                ChildData childData = event.getData();
                if (null == childData || null == childData.getData()) {
                    return;
                }
                String path = childData.getPath();
                if (path.isEmpty()) {
                    return;
                }
T
terrymanu 已提交
122
                if (("/" + name + "/config/datasource").equals(path)) {
123
                    Map<String, DataSource> newDataSourceMap = DataSourceJsonConverter.fromJson(new String(childData.getData(), Charsets.UTF_8));
T
terrymanu 已提交
124
                    ShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.fromJson(registryCenter.get("/" + name + "/config/sharding"));
125
                    // TODO props
T
terrymanu 已提交
126 127
                    shardingDataSource.renew(shardingRuleConfig.build(newDataSourceMap), new Properties());
                } else if (("/" + name + "/config/sharding").equals(path)) {
T
terrymanu 已提交
128
                    ShardingRuleConfiguration newShardingRuleConfig = ShardingRuleConfigurationConverter.fromJson(new String(childData.getData(), Charsets.UTF_8));
T
terrymanu 已提交
129 130 131
                    Map<String, DataSource> dataSourceMap = DataSourceJsonConverter.fromJson(registryCenter.get("/" + name + "/config/datasource"));
                    // TODO props
                    shardingDataSource.renew(newShardingRuleConfig.build(dataSourceMap), new Properties());
132 133 134
                }
            }
        });
T
terrymanu 已提交
135
    }
H
haocao 已提交
136
    
H
haocao 已提交
137 138
    private static void addInstancesStateChangeListener(final String name, final String instanceId, final CoordinatorRegistryCenter registryCenter, final ShardingDataSource shardingDataSource) {
        TreeCache cache = (TreeCache) registryCenter.getRawCache("/" + name + "/state/instances/" + instanceId);
H
haocao 已提交
139 140 141 142 143 144 145 146 147
        cache.getListenable().addListener(new TreeCacheListener() {
            
            @Override
            public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
                ChildData childData = event.getData();
                if (null == childData || null == childData.getData()) {
                    return;
                }
                String path = childData.getPath();
H
haocao 已提交
148
                if (path.isEmpty() || TreeCacheEvent.Type.NODE_UPDATED != event.getType()) {
H
haocao 已提交
149 150 151 152
                    return;
                }
                ShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.fromJson(registryCenter.get("/" + name + "/config/sharding"));
                Map<String, DataSource> dataSourceMap = DataSourceJsonConverter.fromJson(registryCenter.get("/" + name + "/config/datasource"));
H
haocao 已提交
153
                if ("disabled".equals(registryCenter.get(path))) {
H
haocao 已提交
154 155 156
                    for (String each : dataSourceMap.keySet()) {
                        dataSourceMap.put(each, new CircuitBreakerDataSource());
                    }
H
haocao 已提交
157
                }
H
haocao 已提交
158
                // TODO props
H
haocao 已提交
159 160 161 162
                shardingDataSource.renew(shardingRuleConfig.build(dataSourceMap), new Properties());
            }
        });
    }
T
terrymanu 已提交
163
}