ConfigCenter.java 17.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.governance.core.config;
19

20
import com.google.common.base.Joiner;
21
import com.google.common.base.Preconditions;
22
import com.google.common.base.Splitter;
23
import com.google.common.base.Strings;
24
import com.google.common.eventbus.Subscribe;
25
import org.apache.shardingsphere.encrypt.algorithm.config.AlgorithmProvidedEncryptRuleConfiguration;
26
import org.apache.shardingsphere.encrypt.api.config.EncryptRuleConfiguration;
27
import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
28 29 30
import org.apache.shardingsphere.governance.core.event.model.datasource.DataSourcePersistEvent;
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsPersistEvent;
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaNamePersistEvent;
31
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaPersistEvent;
32
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfiguration;
33
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
34
import org.apache.shardingsphere.governance.core.yaml.config.schema.YamlSchema;
35
import org.apache.shardingsphere.governance.core.yaml.swapper.DataSourceConfigurationYamlSwapper;
36
import org.apache.shardingsphere.governance.core.yaml.swapper.SchemaYamlSwapper;
37
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
38 39 40 41
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration;
import org.apache.shardingsphere.infra.auth.yaml.swapper.AuthenticationYamlSwapper;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
42
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
43
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
44 45 46
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
47 48 49
import org.apache.shardingsphere.replicaquery.algorithm.config.AlgorithmProvidedReplicaQueryRuleConfiguration;
import org.apache.shardingsphere.replicaquery.api.config.ReplicaQueryRuleConfiguration;
import org.apache.shardingsphere.replicaquery.api.config.rule.ReplicaQueryDataSourceRuleConfiguration;
Y
yanyzy 已提交
50
import org.apache.shardingsphere.shadow.api.config.ShadowRuleConfiguration;
51
import org.apache.shardingsphere.sharding.algorithm.config.AlgorithmProvidedShardingRuleConfiguration;
52
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
53

54 55 56
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
57
import java.util.LinkedHashSet;
58 59 60 61
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
62
import java.util.Optional;
63 64 65
import java.util.Properties;
import java.util.stream.Collectors;

66
/**
67
 * Config center.
68
 */
kimmking's avatar
kimmking 已提交
69
public final class ConfigCenter {
70
    
kimmking's avatar
kimmking 已提交
71
    private final ConfigCenterNode node;
72
    
73
    private final ConfigurationRepository repository;
74
    
M
menghaoranss 已提交
75 76
    public ConfigCenter(final ConfigurationRepository repository) {
        node = new ConfigCenterNode();
77
        this.repository = repository;
78
        GovernanceEventBus.getInstance().register(this);
79 80 81 82 83
    }
    
    /**
     * Persist rule configuration.
     *
84
     * @param schemaName schema name
85
     * @param dataSourceConfigs data source configuration map
86
     * @param ruleConfigurations rule configurations
87
     * @param isOverwrite is overwrite config center's configuration
88
     */
89
    public void persistConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigs,
90
                                      final Collection<RuleConfiguration> ruleConfigurations, final boolean isOverwrite) {
91
        persistDataSourceConfigurations(schemaName, dataSourceConfigs, isOverwrite);
92
        persistRuleConfigurations(schemaName, ruleConfigurations, isOverwrite);
T
tristaZero 已提交
93
        // TODO Consider removing the following one.
94
        persistSchemaName(schemaName);
95 96 97 98 99 100 101 102 103 104
    }
    
    /**
     * Persist global configuration.
     *
     * @param authentication authentication
     * @param props properties
     * @param isOverwrite is overwrite config center's configuration
     */
    public void persistGlobalConfiguration(final Authentication authentication, final Properties props, final boolean isOverwrite) {
105 106 107 108
        persistAuthentication(authentication, isOverwrite);
        persistProperties(props, isOverwrite);
    }
    
109 110 111 112 113
    /**
     * persist data source configurations.
     * @param event Data source event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
114
    public synchronized void renew(final DataSourcePersistEvent event) {
115 116 117 118 119 120 121 122 123
        persistDataSourceConfigurations(event.getSchemaName(), event.getDataSourceConfigurations());
    }
    
    /**
     * Persist rule configurations.
     * 
     * @param event Rule event.
     */
    @Subscribe
124
    public synchronized void renew(final RuleConfigurationsPersistEvent event) {
125 126 127 128 129 130 131 132 133
        persistRuleConfigurations(event.getSchemaName(), event.getRuleConfigurations());
    }
    
    /**
     * Persist schema name.
     * 
     * @param event Schema name event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
134
    public synchronized void renew(final SchemaNamePersistEvent event) {
135
        String schemaNames = repository.get(node.getMetadataNodePath());
136 137 138 139 140 141
        Collection<String> schemas = Strings.isNullOrEmpty(schemaNames) ? new LinkedHashSet<>() : new LinkedHashSet<>(Splitter.on(",").splitToList(schemaNames));
        if (event.isDrop()) {
            schemas.remove(event.getSchemaName());
        } else if (!schemas.contains(event.getSchemaName())) {
            schemas.add(event.getSchemaName());
        }
142
        repository.persist(node.getMetadataNodePath(), Joiner.on(",").join(schemas));
143 144
    }
    
145 146 147 148 149 150
    /**
     * Persist meta data.
     *
     * @param event Meta data event.
     */
    @Subscribe
L
Liang Zhang 已提交
151
    public synchronized void renew(final SchemaPersistEvent event) {
152
        persistSchema(event.getSchemaName(), event.getSchema());
153 154
    }
    
155
    private void persistDataSourceConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigurations, final boolean isOverwrite) {
156 157
        if (!dataSourceConfigurations.isEmpty() && (isOverwrite || !hasDataSourceConfiguration(schemaName))) {
            persistDataSourceConfigurations(schemaName, dataSourceConfigurations);
158 159 160
        }
    }
    
161
    private void persistDataSourceConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
162
        Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data source in `%s` for governance.", schemaName);
kimmking's avatar
kimmking 已提交
163 164
        Map<String, YamlDataSourceConfiguration> yamlDataSourceConfigurations = dataSourceConfigurations.entrySet().stream().collect(Collectors.toMap(Entry::getKey,
            entry -> new DataSourceConfigurationYamlSwapper().swapToYamlConfiguration(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
165 166 167
        YamlDataSourceConfigurationWrap yamlDataSourceConfigWrap = new YamlDataSourceConfigurationWrap();
        yamlDataSourceConfigWrap.setDataSources(yamlDataSourceConfigurations);
        repository.persist(node.getDataSourcePath(schemaName), YamlEngine.marshal(yamlDataSourceConfigWrap));
168 169
    }
    
170
    private void persistRuleConfigurations(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations, final boolean isOverwrite) {
171 172
        if (!ruleConfigurations.isEmpty() && (isOverwrite || !hasRuleConfiguration(schemaName))) {
            persistRuleConfigurations(schemaName, ruleConfigurations);
173 174 175
        }
    }
    
176
    private void persistRuleConfigurations(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations) {
177
        Collection<RuleConfiguration> configs = new LinkedList<>();
178 179 180
        for (RuleConfiguration each : ruleConfigurations) {
            if (each instanceof ShardingRuleConfiguration) {
                ShardingRuleConfiguration config = (ShardingRuleConfiguration) each;
181
                Preconditions.checkState(hasAvailableTableConfigurations(config),
182 183
                        "No available rule configs in `%s` for governance.", schemaName);
                configs.add(each);
184 185
            } else if (each instanceof AlgorithmProvidedShardingRuleConfiguration) {
                AlgorithmProvidedShardingRuleConfiguration config = (AlgorithmProvidedShardingRuleConfiguration) each;
186
                Preconditions.checkState(hasAvailableTableConfigurations(config),
187 188
                        "No available rule configs in `%s` for governance.", schemaName);
                configs.add(each);
189 190
            } else if (each instanceof AlgorithmProvidedReplicaQueryRuleConfiguration) {
                AlgorithmProvidedReplicaQueryRuleConfiguration config = (AlgorithmProvidedReplicaQueryRuleConfiguration) each;
kimmking's avatar
kimmking 已提交
191
                checkDataSources(schemaName, config.getDataSources());
192
                configs.add(each);
193 194
            } else if (each instanceof AlgorithmProvidedEncryptRuleConfiguration) {
                AlgorithmProvidedEncryptRuleConfiguration config = (AlgorithmProvidedEncryptRuleConfiguration) each;
195
                Preconditions.checkState(!config.getEncryptors().isEmpty(), "No available encrypt rule configuration in `%s` for governance.", schemaName);
196
                configs.add(each);
197 198
            } else if (each instanceof ReplicaQueryRuleConfiguration) {
                ReplicaQueryRuleConfiguration config = (ReplicaQueryRuleConfiguration) each;
kimmking's avatar
kimmking 已提交
199
                checkDataSources(schemaName, config.getDataSources());
200
                configs.add(each);
201 202
            } else if (each instanceof EncryptRuleConfiguration) {
                EncryptRuleConfiguration config = (EncryptRuleConfiguration) each;
203
                Preconditions.checkState(!config.getEncryptors().isEmpty(), "No available encrypt rule configuration in `%s` for governance.", schemaName);
204
                configs.add(each);
205 206
            } else if (each instanceof ShadowRuleConfiguration) {
                ShadowRuleConfiguration config = (ShadowRuleConfiguration) each;
Y
Yanick.xia 已提交
207 208
                boolean isShadow = !config.getColumn().isEmpty() && null != config.getSourceDataSourceNames() && null != config.getShadowDataSourceNames();
                Preconditions.checkState(isShadow, "No available shadow rule configuration in `%s` for governance.", schemaName);
209
                configs.add(each);
210
            }
211
        }
212 213 214
        YamlRootRuleConfigurations yamlRuleConfigs = new YamlRootRuleConfigurations();
        yamlRuleConfigs.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlConfigurations(configs));
        repository.persist(node.getRulePath(schemaName), YamlEngine.marshal(yamlRuleConfigs));
215 216
    }
    
217
    private void checkDataSources(final String schemaName, final Collection<ReplicaQueryDataSourceRuleConfiguration> dataSources) {
kimmking's avatar
kimmking 已提交
218
        dataSources.forEach(each -> Preconditions.checkState(
219
                !each.getPrimaryDataSourceName().isEmpty(), "No available replica-query rule configuration in `%s` for governance.", schemaName));
kimmking's avatar
kimmking 已提交
220 221
    }
    
222 223
    private boolean hasAvailableTableConfigurations(final ShardingRuleConfiguration config) {
        return !config.getTables().isEmpty() || null != config.getDefaultTableShardingStrategy() || !config.getAutoTables().isEmpty();
224 225
    }
    
226 227
    private boolean hasAvailableTableConfigurations(final AlgorithmProvidedShardingRuleConfiguration config) {
        return !config.getTables().isEmpty() || null != config.getDefaultTableShardingStrategy() || !config.getAutoTables().isEmpty();
228 229
    }
    
230
    private void persistAuthentication(final Authentication authentication, final boolean isOverwrite) {
231
        if (null != authentication && (isOverwrite || !hasAuthentication())) {
232
            repository.persist(node.getAuthenticationPath(), YamlEngine.marshal(new AuthenticationYamlSwapper().swapToYamlConfiguration(authentication)));
233 234 235 236
        }
    }
    
    private void persistProperties(final Properties props, final boolean isOverwrite) {
237
        if (!props.isEmpty() && (isOverwrite || !hasProperties())) {
kimmking's avatar
kimmking 已提交
238
            repository.persist(node.getPropsPath(), YamlEngine.marshal(props));
239 240 241
        }
    }
    
242 243 244 245 246
    private boolean hasProperties() {
        return !Strings.isNullOrEmpty(repository.get(node.getPropsPath()));
    }
    
    private void persistSchemaName(final String schemaName) {
247
        String schemaNames = repository.get(node.getMetadataNodePath());
248
        if (Strings.isNullOrEmpty(schemaNames)) {
249
            repository.persist(node.getMetadataNodePath(), schemaName);
250 251
            return;
        }
252 253
        List<String> schemaNameList = Splitter.on(",").splitToList(schemaNames);
        if (schemaNameList.contains(schemaName)) {
254 255
            return;
        }
256
        List<String> newArrayList = new ArrayList<>(schemaNameList);
257
        newArrayList.add(schemaName);
258
        repository.persist(node.getMetadataNodePath(), Joiner.on(",").join(newArrayList));
259 260
    }
    
261 262 263
    /**
     * Load data source configurations.
     *
264
     * @param schemaName schema name
265 266
     * @return data source configurations
     */
267 268
    public Map<String, DataSourceConfiguration> loadDataSourceConfigurations(final String schemaName) {
        if (!hasDataSourceConfiguration(schemaName)) {
269 270
            return new LinkedHashMap<>();
        }
271
        YamlDataSourceConfigurationWrap result = YamlEngine.unmarshal(repository.get(node.getDataSourcePath(schemaName)), YamlDataSourceConfigurationWrap.class);
kimmking's avatar
kimmking 已提交
272 273
        return result.getDataSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
            entry -> new DataSourceConfigurationYamlSwapper().swapToObject(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
274 275 276
    }
    
    /**
277
     * Load rule configurations.
278
     *
279
     * @param schemaName schema name
280
     * @return rule configurations
281
     */
282 283 284
    public Collection<RuleConfiguration> loadRuleConfigurations(final String schemaName) {
        return hasRuleConfiguration(schemaName) ? new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
                YamlEngine.unmarshal(repository.get(node.getRulePath(schemaName)), YamlRootRuleConfigurations.class).getRules()) : new LinkedList<>();
285 286 287 288 289 290 291 292
    }
    
    /**
     * Load authentication.
     *
     * @return authentication
     */
    public Authentication loadAuthentication() {
293 294 295
        return hasAuthentication()
                ? new AuthenticationYamlSwapper().swapToObject(YamlEngine.unmarshal(repository.get(node.getAuthenticationPath()), YamlAuthenticationConfiguration.class))
                : new Authentication();
296 297 298 299 300 301 302 303
    }
    
    /**
     * Load properties configuration.
     *
     * @return properties
     */
    public Properties loadProperties() {
kimmking's avatar
kimmking 已提交
304
        return YamlEngine.unmarshalProperties(repository.get(node.getPropsPath()));
305 306 307
    }
    
    /**
L
Liang Zhang 已提交
308
     * Get all schema names.
309
     * 
310
     * @return all schema names
311
     */
L
Liang Zhang 已提交
312
    public Collection<String> getAllSchemaNames() {
313
        String schemaNames = repository.get(node.getMetadataNodePath());
314
        return Strings.isNullOrEmpty(schemaNames) ? new LinkedList<>() : node.splitSchemaName(schemaNames);
315
    }
316 317 318 319
    
    /**
     * Judge whether schema has rule configuration.
     *
320
     * @param schemaName schema name
321 322
     * @return has rule configuration or not
     */
323 324
    public boolean hasRuleConfiguration(final String schemaName) {
        return !Strings.isNullOrEmpty(repository.get(node.getRulePath(schemaName)));
325 326 327 328 329
    }
    
    /**
     * Judge whether schema has data source configuration.
     *
330
     * @param schemaName schema name
331 332
     * @return has data source configuration or not
     */
333 334
    public boolean hasDataSourceConfiguration(final String schemaName) {
        return !Strings.isNullOrEmpty(repository.get(node.getDataSourcePath(schemaName)));
335 336
    }
    
337
    /**
338
     * Persist ShardingSphere schema.
339 340
     *
     * @param schemaName schema name
341
     * @param schema ShardingSphere schema
342
     */
343
    public void persistSchema(final String schemaName, final ShardingSphereSchema schema) {
344
        repository.persist(node.getSchemaPath(schemaName), YamlEngine.marshal(new SchemaYamlSwapper().swapToYamlConfiguration(schema)));
345 346 347
    }
    
    /**
348
     * Load ShardingSphere schema.
349 350
     *
     * @param schemaName schema name
351
     * @return ShardingSphere schema
352
     */
353
    public Optional<ShardingSphereSchema> loadSchema(final String schemaName) {
354
        String path = repository.get(node.getSchemaPath(schemaName));
355 356 357
        if (Strings.isNullOrEmpty(path)) {
            return Optional.empty();
        }
358
        return Optional.of(new SchemaYamlSwapper().swapToObject(YamlEngine.unmarshal(path, YamlSchema.class)));
359 360
    }
    
361
    /**
M
menghaoranss 已提交
362
     * Delete schema.
363 364 365 366 367 368 369
     * 
     * @param schemaName schema name
     */
    public void deleteSchema(final String schemaName) {
        repository.delete(node.getSchemaNamePath(schemaName));
    }
    
370 371 372
    private boolean hasAuthentication() {
        return !Strings.isNullOrEmpty(repository.get(node.getAuthenticationPath()));
    }
373
}