ConfigCenter.java 18.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.governance.core.config;
19

20
import com.google.common.base.Joiner;
21
import com.google.common.base.Preconditions;
22
import com.google.common.base.Splitter;
23
import com.google.common.base.Strings;
24
import com.google.common.eventbus.Subscribe;
25
import org.apache.shardingsphere.encrypt.algorithm.config.AlgorithmProvidedEncryptRuleConfiguration;
26
import org.apache.shardingsphere.encrypt.api.config.EncryptRuleConfiguration;
L
Liang Zhang 已提交
27 28 29 30
import org.apache.shardingsphere.governance.core.event.model.persist.DataSourcePersistEvent;
import org.apache.shardingsphere.governance.core.event.model.persist.MetaDataPersistEvent;
import org.apache.shardingsphere.governance.core.event.model.persist.RulePersistEvent;
import org.apache.shardingsphere.governance.core.event.model.persist.SchemaNamePersistEvent;
31
import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
32
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfiguration;
33
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
34
import org.apache.shardingsphere.governance.core.yaml.config.metadata.YamlRuleSchemaMetaData;
35
import org.apache.shardingsphere.governance.core.yaml.swapper.DataSourceConfigurationYamlSwapper;
36
import org.apache.shardingsphere.governance.core.yaml.swapper.RuleSchemaMetaDataYamlSwapper;
37
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
38 39 40 41
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration;
import org.apache.shardingsphere.infra.auth.yaml.swapper.AuthenticationYamlSwapper;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
42
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
43
import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
44 45 46
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
47 48
import org.apache.shardingsphere.replication.primaryreplica.algorithm.config.AlgorithmProvidedPrimaryReplicaReplicationRuleConfiguration;
import org.apache.shardingsphere.replication.primaryreplica.api.config.PrimaryReplicaReplicationRuleConfiguration;
Y
yanyzy 已提交
49
import org.apache.shardingsphere.shadow.api.config.ShadowRuleConfiguration;
50
import org.apache.shardingsphere.sharding.algorithm.config.AlgorithmProvidedShardingRuleConfiguration;
51
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
52

53 54 55
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
56
import java.util.LinkedHashSet;
57 58 59 60
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
61
import java.util.Optional;
62 63 64
import java.util.Properties;
import java.util.stream.Collectors;

65
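/**
 * Config center.
 *
 * <p>Persists and loads data source, rule, authentication, properties, schema name and
 * meta data configurations through the configuration repository.</p>
 */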
kimmking's avatar
kimmking 已提交
68
public final class ConfigCenter {
69
    
kimmking's avatar
kimmking 已提交
70
    // Supplies the repository paths under which each kind of configuration is stored.
    private final ConfigCenterNode node;
71
    
72
    // Store that every persist, load and delete operation of this class goes through.
    private final ConfigurationRepository repository;
73
    
M
menghaoranss 已提交
74 75
    /**
     * Create the config center and register it on the governance event bus.
     *
     * @param repository configuration repository used to store and read configurations
     */
    public ConfigCenter(final ConfigurationRepository repository) {
        this.repository = repository;
        node = new ConfigCenterNode();
        // Registered last so both fields are assigned before any event can be delivered.
        GovernanceEventBus.getInstance().register(this);
    }
    
    /**
     * Persist rule configuration.
     *
83
     * @param schemaName schema name
84
     * @param dataSourceConfigs data source configuration map
85
     * @param ruleConfigurations rule configurations
86
     * @param isOverwrite is overwrite config center's configuration
87
     */
88
    public void persistConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigs,
89
                                      final Collection<RuleConfiguration> ruleConfigurations, final boolean isOverwrite) {
90
        persistDataSourceConfigurations(schemaName, dataSourceConfigs, isOverwrite);
91
        persistRuleConfigurations(schemaName, ruleConfigurations, isOverwrite);
T
tristaZero 已提交
92
        // TODO Consider removing the following one.
93
        persistSchemaName(schemaName);
94 95 96 97 98 99 100 101 102 103
    }
    
    /**
     * Persist global configuration.
     *
     * @param authentication authentication
     * @param props properties
     * @param isOverwrite is overwrite config center's configuration
     */
    public void persistGlobalConfiguration(final Authentication authentication, final Properties props, final boolean isOverwrite) {
104 105 106 107
        persistAuthentication(authentication, isOverwrite);
        persistProperties(props, isOverwrite);
    }
    
108 109 110 111 112
    /**
     * persist data source configurations.
     * @param event Data source event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
113
    public synchronized void renew(final DataSourcePersistEvent event) {
114 115 116 117 118 119 120 121 122
        persistDataSourceConfigurations(event.getSchemaName(), event.getDataSourceConfigurations());
    }
    
    /**
     * Persist rule configurations.
     * 
     * @param event Rule event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
123
    public synchronized void renew(final RulePersistEvent event) {
124 125 126 127 128 129 130 131 132
        persistRuleConfigurations(event.getSchemaName(), event.getRuleConfigurations());
    }
    
    /**
     * Persist schema name.
     * 
     * @param event Schema name event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
133
    public synchronized void renew(final SchemaNamePersistEvent event) {
H
Haoran Meng 已提交
134
        persistSchema(event.getSchemaName(), event.isDrop());
135 136
    }
    
137 138 139 140 141 142 143 144 145 146
    /**
     * Persist the meta data carried by the event.
     *
     * @param event meta data persist event
     */
    @Subscribe
    public synchronized void renew(final MetaDataPersistEvent event) {
        String schemaName = event.getSchemaName();
        RuleSchemaMetaData metaData = event.getMetaData();
        persistMetaData(schemaName, metaData);
    }
    
147
    private void persistDataSourceConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigurations, final boolean isOverwrite) {
148 149
        if (!dataSourceConfigurations.isEmpty() && (isOverwrite || !hasDataSourceConfiguration(schemaName))) {
            persistDataSourceConfigurations(schemaName, dataSourceConfigurations);
150 151 152
        }
    }
    
153
    private void persistDataSourceConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
154
        Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data source in `%s` for governance.", schemaName);
155 156
        Map<String, YamlDataSourceConfiguration> yamlDataSourceConfigurations = dataSourceConfigurations.entrySet().stream()
                .collect(Collectors.toMap(Entry::getKey, entry -> new DataSourceConfigurationYamlSwapper().swapToYamlConfiguration(entry.getValue())));
157 158 159
        YamlDataSourceConfigurationWrap yamlDataSourceConfigWrap = new YamlDataSourceConfigurationWrap();
        yamlDataSourceConfigWrap.setDataSources(yamlDataSourceConfigurations);
        repository.persist(node.getDataSourcePath(schemaName), YamlEngine.marshal(yamlDataSourceConfigWrap));
160 161
    }
    
162
    private void persistRuleConfigurations(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations, final boolean isOverwrite) {
163 164
        if (!ruleConfigurations.isEmpty() && (isOverwrite || !hasRuleConfiguration(schemaName))) {
            persistRuleConfigurations(schemaName, ruleConfigurations);
165 166 167
        }
    }
    
168
    private void persistRuleConfigurations(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations) {
169 170 171 172
        Collection<RuleConfiguration> configurations = new LinkedList<>();
        for (RuleConfiguration each : ruleConfigurations) {
            if (each instanceof ShardingRuleConfiguration) {
                ShardingRuleConfiguration config = (ShardingRuleConfiguration) each;
173
                Preconditions.checkState(hasAvailableTableConfigurations(config),
174
                        "No available rule configurations in `%s` for governance.", schemaName);
175
                configurations.add(each);
176 177
            } else if (each instanceof AlgorithmProvidedShardingRuleConfiguration) {
                AlgorithmProvidedShardingRuleConfiguration config = (AlgorithmProvidedShardingRuleConfiguration) each;
178
                Preconditions.checkState(hasAvailableTableConfigurations(config),
179
                        "No available rule configurations in `%s` for governance.", schemaName);
180
                configurations.add(each);
181 182
            } else if (each instanceof AlgorithmProvidedPrimaryReplicaReplicationRuleConfiguration) {
                AlgorithmProvidedPrimaryReplicaReplicationRuleConfiguration config = (AlgorithmProvidedPrimaryReplicaReplicationRuleConfiguration) each;
183
                config.getDataSources().forEach(group -> Preconditions.checkState(
184
                        !group.getPrimaryDataSourceName().isEmpty(), "No available primary-replica-replication rule configuration in `%s` for governance.", schemaName));
185 186 187
                configurations.add(each);
            } else if (each instanceof AlgorithmProvidedEncryptRuleConfiguration) {
                AlgorithmProvidedEncryptRuleConfiguration config = (AlgorithmProvidedEncryptRuleConfiguration) each;
188
                Preconditions.checkState(!config.getEncryptors().isEmpty(), "No available encrypt rule configuration in `%s` for governance.", schemaName);
189
                configurations.add(each);
190 191
            } else if (each instanceof PrimaryReplicaReplicationRuleConfiguration) {
                PrimaryReplicaReplicationRuleConfiguration config = (PrimaryReplicaReplicationRuleConfiguration) each;
192
                config.getDataSources().forEach(group -> Preconditions.checkState(
193
                        !group.getPrimaryDataSourceName().isEmpty(), "No available primary-replica-replication rule configuration in `%s` for governance.", schemaName));
194 195 196
                configurations.add(each);
            } else if (each instanceof EncryptRuleConfiguration) {
                EncryptRuleConfiguration config = (EncryptRuleConfiguration) each;
197
                Preconditions.checkState(!config.getEncryptors().isEmpty(), "No available encrypt rule configuration in `%s` for governance.", schemaName);
198 199 200
                configurations.add(each);
            } else if (each instanceof ShadowRuleConfiguration) {
                ShadowRuleConfiguration config = (ShadowRuleConfiguration) each;
Y
Yanick.xia 已提交
201 202
                boolean isShadow = !config.getColumn().isEmpty() && null != config.getSourceDataSourceNames() && null != config.getShadowDataSourceNames();
                Preconditions.checkState(isShadow, "No available shadow rule configuration in `%s` for governance.", schemaName);
203
                configurations.add(each);
204
            }
205
        }
206
        YamlRootRuleConfigurations yamlRuleConfigurations = new YamlRootRuleConfigurations();
207
        yamlRuleConfigurations.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlConfigurations(configurations));
208
        repository.persist(node.getRulePath(schemaName), YamlEngine.marshal(yamlRuleConfigurations));
209 210
    }
    
211 212 213 214 215 216 217 218
    /**
     * Judge whether the sharding rule configuration has tables, a default table sharding strategy or auto tables.
     */
    private boolean hasAvailableTableConfigurations(final ShardingRuleConfiguration configuration) {
        if (!configuration.getTables().isEmpty()) {
            return true;
        }
        if (null != configuration.getDefaultTableShardingStrategy()) {
            return true;
        }
        return !configuration.getAutoTables().isEmpty();
    }
    
    /**
     * Judge whether the algorithm provided sharding rule configuration has tables, a default table sharding strategy or auto tables.
     */
    private boolean hasAvailableTableConfigurations(final AlgorithmProvidedShardingRuleConfiguration configuration) {
        if (!configuration.getTables().isEmpty()) {
            return true;
        }
        if (null != configuration.getDefaultTableShardingStrategy()) {
            return true;
        }
        return !configuration.getAutoTables().isEmpty();
    }
    
219
    private void persistAuthentication(final Authentication authentication, final boolean isOverwrite) {
220
        if (null != authentication && (isOverwrite || !hasAuthentication())) {
221
            repository.persist(node.getAuthenticationPath(), YamlEngine.marshal(new AuthenticationYamlSwapper().swapToYamlConfiguration(authentication)));
222 223 224 225
        }
    }
    
    private void persistProperties(final Properties props, final boolean isOverwrite) {
226
        if (!props.isEmpty() && (isOverwrite || !hasProperties())) {
kimmking's avatar
kimmking 已提交
227
            repository.persist(node.getPropsPath(), YamlEngine.marshal(props));
228 229 230
        }
    }
    
231 232 233 234 235
    /**
     * Judge whether non-empty content is stored under the properties path.
     */
    private boolean hasProperties() {
        String propsContent = repository.get(node.getPropsPath());
        return !Strings.isNullOrEmpty(propsContent);
    }
    
    private void persistSchemaName(final String schemaName) {
236
        String schemaNames = repository.get(node.getSchemasPath());
237
        if (Strings.isNullOrEmpty(schemaNames)) {
238
            repository.persist(node.getSchemasPath(), schemaName);
239 240
            return;
        }
241 242
        List<String> schemaNameList = Splitter.on(",").splitToList(schemaNames);
        if (schemaNameList.contains(schemaName)) {
243 244
            return;
        }
245
        List<String> newArrayList = new ArrayList<>(schemaNameList);
246
        newArrayList.add(schemaName);
247
        repository.persist(node.getSchemasPath(), Joiner.on(",").join(newArrayList));
248 249
    }
    
H
Haoran Meng 已提交
250
    private void persistSchema(final String schemaName, final boolean isDrop) {
251
        String schemaNames = repository.get(node.getSchemasPath());
252 253
        Collection<String> schemas = Strings.isNullOrEmpty(schemaNames) ? new LinkedHashSet<>() 
                : new LinkedHashSet<>(Splitter.on(",").splitToList(schemaNames));
H
Haoran Meng 已提交
254
        if (isDrop) {
255 256 257 258
            schemas.remove(schemaName);
        } else if (!schemas.contains(schemaName)) {
            schemas.add(schemaName);
        }
259
        repository.persist(node.getSchemasPath(), Joiner.on(",").join(schemas));
260 261
    }
    
262 263 264
    /**
     * Load data source configurations.
     *
265
     * @param schemaName schema name
266 267
     * @return data source configurations
     */
268 269
    public Map<String, DataSourceConfiguration> loadDataSourceConfigurations(final String schemaName) {
        if (!hasDataSourceConfiguration(schemaName)) {
270 271
            return new LinkedHashMap<>();
        }
272 273
        YamlDataSourceConfigurationWrap result = YamlEngine.unmarshal(repository.get(node.getDataSourcePath(schemaName)), YamlDataSourceConfigurationWrap.class);
        return result.getDataSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> new DataSourceConfigurationYamlSwapper().swapToObject(entry.getValue())));
274 275 276
    }
    
    /**
277
     * Load rule configurations.
278
     *
279
     * @param schemaName schema name
280
     * @return rule configurations
281
     */
282 283 284
    public Collection<RuleConfiguration> loadRuleConfigurations(final String schemaName) {
        return hasRuleConfiguration(schemaName) ? new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
                YamlEngine.unmarshal(repository.get(node.getRulePath(schemaName)), YamlRootRuleConfigurations.class).getRules()) : new LinkedList<>();
285 286 287 288 289 290 291 292
    }
    
    /**
     * Load authentication.
     *
     * @return authentication
     */
    public Authentication loadAuthentication() {
293 294 295
        return hasAuthentication()
                ? new AuthenticationYamlSwapper().swapToObject(YamlEngine.unmarshal(repository.get(node.getAuthenticationPath()), YamlAuthenticationConfiguration.class))
                : new Authentication();
296 297 298 299 300 301 302 303
    }
    
    /**
     * Load properties configuration.
     *
     * @return properties
     */
    public Properties loadProperties() {
kimmking's avatar
kimmking 已提交
304
        return YamlEngine.unmarshalProperties(repository.get(node.getPropsPath()));
305 306 307
    }
    
    /**
L
Liang Zhang 已提交
308
     * Get all schema names.
309
     * 
310
     * @return all schema names
311
     */
L
Liang Zhang 已提交
312
    public Collection<String> getAllSchemaNames() {
313
        String schemaNames = repository.get(node.getSchemasPath());
314
        return Strings.isNullOrEmpty(schemaNames) ? new LinkedList<>() : node.splitSchemaName(schemaNames);
315
    }
316 317 318 319
    
    /**
     * Judge whether schema has rule configuration.
     *
320
     * @param schemaName schema name
321 322
     * @return has rule configuration or not
     */
323 324
    public boolean hasRuleConfiguration(final String schemaName) {
        return !Strings.isNullOrEmpty(repository.get(node.getRulePath(schemaName)));
325 326 327 328 329
    }
    
    /**
     * Judge whether schema has data source configuration.
     *
330
     * @param schemaName schema name
331 332
     * @return has data source configuration or not
     */
333 334
    public boolean hasDataSourceConfiguration(final String schemaName) {
        return !Strings.isNullOrEmpty(repository.get(node.getDataSourcePath(schemaName)));
335 336
    }
    
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
    /**
     * Persist rule schema meta data.
     *
     * @param schemaName schema name
     * @param ruleSchemaMetaData rule schema meta data of the schema
     */
    public void persistMetaData(final String schemaName, final RuleSchemaMetaData ruleSchemaMetaData) {
        String tablePath = node.getTablePath(schemaName);
        String yamlContent = YamlEngine.marshal(new RuleSchemaMetaDataYamlSwapper().swapToYamlConfiguration(ruleSchemaMetaData));
        repository.persist(tablePath, yamlContent);
    }
    
    /**
     * Load rule schema meta data.
     *
     * @param schemaName schema name
     * @return rule schema meta data of the schema, or empty if no content is stored for the schema
     */
    public Optional<RuleSchemaMetaData> loadMetaData(final String schemaName) {
        // What is read here is the stored YAML content, not a path.
        String yamlContent = repository.get(node.getTablePath(schemaName));
        if (Strings.isNullOrEmpty(yamlContent)) {
            return Optional.empty();
        }
        RuleSchemaMetaDataYamlSwapper swapper = new RuleSchemaMetaDataYamlSwapper();
        YamlRuleSchemaMetaData yamlMetaData = YamlEngine.unmarshal(yamlContent, YamlRuleSchemaMetaData.class);
        return Optional.of(swapper.swapToObject(yamlMetaData));
    }
    
361
    /**
     * Delete schema.
     *
     * <p>Deletes the schema name path of the schema from the repository.</p>
     *
     * @param schemaName schema name
     */
    public void deleteSchema(final String schemaName) {
        String schemaNamePath = node.getSchemaNamePath(schemaName);
        repository.delete(schemaNamePath);
    }
    
370 371 372
    /**
     * Judge whether non-empty content is stored under the authentication path.
     */
    private boolean hasAuthentication() {
        String authenticationContent = repository.get(node.getAuthenticationPath());
        return !Strings.isNullOrEmpty(authenticationContent);
    }
373
}