ConfigCenter.java 17.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.governance.core.config;
19

20
import com.google.common.base.Joiner;
21
import com.google.common.base.Preconditions;
22
import com.google.common.base.Splitter;
23
import com.google.common.base.Strings;
24
import com.google.common.eventbus.Subscribe;
25
import org.apache.shardingsphere.encrypt.algorithm.config.AlgorithmProvidedEncryptRuleConfiguration;
26
import org.apache.shardingsphere.encrypt.api.config.EncryptRuleConfiguration;
27
import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
L
Liang Zhang 已提交
28 29 30 31
import org.apache.shardingsphere.governance.core.event.model.persist.DataSourcePersistEvent;
import org.apache.shardingsphere.governance.core.event.model.persist.MetaDataPersistEvent;
import org.apache.shardingsphere.governance.core.event.model.persist.RulePersistEvent;
import org.apache.shardingsphere.governance.core.event.model.persist.SchemaNamePersistEvent;
32
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfiguration;
33
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
34
import org.apache.shardingsphere.governance.core.yaml.config.metadata.YamlLogicSchemaMetaData;
35
import org.apache.shardingsphere.governance.core.yaml.swapper.DataSourceConfigurationYamlSwapper;
36
import org.apache.shardingsphere.governance.core.yaml.swapper.LogicSchemaMetaDataYamlSwapper;
37
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
38 39 40 41
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration;
import org.apache.shardingsphere.infra.auth.yaml.swapper.AuthenticationYamlSwapper;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
42
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
43
import org.apache.shardingsphere.infra.metadata.schema.model.physical.PhysicalSchemaMetaData;
44 45 46
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
47 48 49
import org.apache.shardingsphere.replicaquery.algorithm.config.AlgorithmProvidedReplicaQueryRuleConfiguration;
import org.apache.shardingsphere.replicaquery.api.config.ReplicaQueryRuleConfiguration;
import org.apache.shardingsphere.replicaquery.api.config.rule.ReplicaQueryDataSourceRuleConfiguration;
Y
yanyzy 已提交
50
import org.apache.shardingsphere.shadow.api.config.ShadowRuleConfiguration;
51
import org.apache.shardingsphere.sharding.algorithm.config.AlgorithmProvidedShardingRuleConfiguration;
52
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
53

54 55 56
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
57
import java.util.LinkedHashSet;
58 59 60 61
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
62
import java.util.Optional;
63 64 65
import java.util.Properties;
import java.util.stream.Collectors;

66
/**
67
 * Config center.
68
 */
kimmking's avatar
kimmking 已提交
69
public final class ConfigCenter {
70
    
kimmking's avatar
kimmking 已提交
71
    private final ConfigCenterNode node;
72
    
73
    private final ConfigurationRepository repository;
74
    
M
menghaoranss 已提交
75 76
    public ConfigCenter(final ConfigurationRepository repository) {
        node = new ConfigCenterNode();
77
        this.repository = repository;
78
        GovernanceEventBus.getInstance().register(this);
79 80 81 82 83
    }
    
    /**
     * Persist rule configuration.
     *
84
     * @param schemaName schema name
85
     * @param dataSourceConfigs data source configuration map
86
     * @param ruleConfigurations rule configurations
87
     * @param isOverwrite is overwrite config center's configuration
88
     */
89
    public void persistConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigs,
90
                                      final Collection<RuleConfiguration> ruleConfigurations, final boolean isOverwrite) {
91
        persistDataSourceConfigurations(schemaName, dataSourceConfigs, isOverwrite);
92
        persistRuleConfigurations(schemaName, ruleConfigurations, isOverwrite);
T
tristaZero 已提交
93
        // TODO Consider removing the following one.
94
        persistSchemaName(schemaName);
95 96 97 98 99 100 101 102 103 104
    }
    
    /**
     * Persist global configuration.
     *
     * @param authentication authentication
     * @param props properties
     * @param isOverwrite is overwrite config center's configuration
     */
    public void persistGlobalConfiguration(final Authentication authentication, final Properties props, final boolean isOverwrite) {
105 106 107 108
        persistAuthentication(authentication, isOverwrite);
        persistProperties(props, isOverwrite);
    }
    
109 110 111 112 113
    /**
     * persist data source configurations.
     * @param event Data source event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
114
    public synchronized void renew(final DataSourcePersistEvent event) {
115 116 117 118 119 120 121 122 123
        persistDataSourceConfigurations(event.getSchemaName(), event.getDataSourceConfigurations());
    }
    
    /**
     * Persist rule configurations.
     * 
     * @param event Rule event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
124
    public synchronized void renew(final RulePersistEvent event) {
125 126 127 128 129 130 131 132 133
        persistRuleConfigurations(event.getSchemaName(), event.getRuleConfigurations());
    }
    
    /**
     * Persist schema name.
     * 
     * @param event Schema name event.
     */
    @Subscribe
kimmking's avatar
kimmking 已提交
134
    public synchronized void renew(final SchemaNamePersistEvent event) {
H
Haoran Meng 已提交
135
        persistSchema(event.getSchemaName(), event.isDrop());
136 137
    }
    
138 139 140 141 142 143 144 145 146 147
    /**
     * Persist meta data.
     *
     * @param event Meta data event.
     */
    @Subscribe
    public synchronized void renew(final MetaDataPersistEvent event) {
        persistMetaData(event.getSchemaName(), event.getMetaData());
    }
    
148
    private void persistDataSourceConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigurations, final boolean isOverwrite) {
149 150
        if (!dataSourceConfigurations.isEmpty() && (isOverwrite || !hasDataSourceConfiguration(schemaName))) {
            persistDataSourceConfigurations(schemaName, dataSourceConfigurations);
151 152 153
        }
    }
    
154
    private void persistDataSourceConfigurations(final String schemaName, final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
155
        Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data source in `%s` for governance.", schemaName);
kimmking's avatar
kimmking 已提交
156 157
        Map<String, YamlDataSourceConfiguration> yamlDataSourceConfigurations = dataSourceConfigurations.entrySet().stream().collect(Collectors.toMap(Entry::getKey,
            entry -> new DataSourceConfigurationYamlSwapper().swapToYamlConfiguration(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
158 159 160
        YamlDataSourceConfigurationWrap yamlDataSourceConfigWrap = new YamlDataSourceConfigurationWrap();
        yamlDataSourceConfigWrap.setDataSources(yamlDataSourceConfigurations);
        repository.persist(node.getDataSourcePath(schemaName), YamlEngine.marshal(yamlDataSourceConfigWrap));
161 162
    }
    
163
    private void persistRuleConfigurations(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations, final boolean isOverwrite) {
164 165
        if (!ruleConfigurations.isEmpty() && (isOverwrite || !hasRuleConfiguration(schemaName))) {
            persistRuleConfigurations(schemaName, ruleConfigurations);
166 167 168
        }
    }
    
169
    private void persistRuleConfigurations(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations) {
170
        Collection<RuleConfiguration> configs = new LinkedList<>();
171 172 173
        for (RuleConfiguration each : ruleConfigurations) {
            if (each instanceof ShardingRuleConfiguration) {
                ShardingRuleConfiguration config = (ShardingRuleConfiguration) each;
174
                Preconditions.checkState(hasAvailableTableConfigurations(config),
175 176
                        "No available rule configs in `%s` for governance.", schemaName);
                configs.add(each);
177 178
            } else if (each instanceof AlgorithmProvidedShardingRuleConfiguration) {
                AlgorithmProvidedShardingRuleConfiguration config = (AlgorithmProvidedShardingRuleConfiguration) each;
179
                Preconditions.checkState(hasAvailableTableConfigurations(config),
180 181
                        "No available rule configs in `%s` for governance.", schemaName);
                configs.add(each);
182 183
            } else if (each instanceof AlgorithmProvidedReplicaQueryRuleConfiguration) {
                AlgorithmProvidedReplicaQueryRuleConfiguration config = (AlgorithmProvidedReplicaQueryRuleConfiguration) each;
kimmking's avatar
kimmking 已提交
184
                checkDataSources(schemaName, config.getDataSources());
185
                configs.add(each);
186 187
            } else if (each instanceof AlgorithmProvidedEncryptRuleConfiguration) {
                AlgorithmProvidedEncryptRuleConfiguration config = (AlgorithmProvidedEncryptRuleConfiguration) each;
188
                Preconditions.checkState(!config.getEncryptors().isEmpty(), "No available encrypt rule configuration in `%s` for governance.", schemaName);
189
                configs.add(each);
190 191
            } else if (each instanceof ReplicaQueryRuleConfiguration) {
                ReplicaQueryRuleConfiguration config = (ReplicaQueryRuleConfiguration) each;
kimmking's avatar
kimmking 已提交
192
                checkDataSources(schemaName, config.getDataSources());
193
                configs.add(each);
194 195
            } else if (each instanceof EncryptRuleConfiguration) {
                EncryptRuleConfiguration config = (EncryptRuleConfiguration) each;
196
                Preconditions.checkState(!config.getEncryptors().isEmpty(), "No available encrypt rule configuration in `%s` for governance.", schemaName);
197
                configs.add(each);
198 199
            } else if (each instanceof ShadowRuleConfiguration) {
                ShadowRuleConfiguration config = (ShadowRuleConfiguration) each;
Y
Yanick.xia 已提交
200 201
                boolean isShadow = !config.getColumn().isEmpty() && null != config.getSourceDataSourceNames() && null != config.getShadowDataSourceNames();
                Preconditions.checkState(isShadow, "No available shadow rule configuration in `%s` for governance.", schemaName);
202
                configs.add(each);
203
            }
204
        }
205 206 207
        YamlRootRuleConfigurations yamlRuleConfigs = new YamlRootRuleConfigurations();
        yamlRuleConfigs.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlConfigurations(configs));
        repository.persist(node.getRulePath(schemaName), YamlEngine.marshal(yamlRuleConfigs));
208 209
    }
    
210
    private void checkDataSources(final String schemaName, final Collection<ReplicaQueryDataSourceRuleConfiguration> dataSources) {
kimmking's avatar
kimmking 已提交
211
        dataSources.forEach(each -> Preconditions.checkState(
212
                !each.getPrimaryDataSourceName().isEmpty(), "No available replica-query rule configuration in `%s` for governance.", schemaName));
kimmking's avatar
kimmking 已提交
213 214
    }
    
215 216
    private boolean hasAvailableTableConfigurations(final ShardingRuleConfiguration config) {
        return !config.getTables().isEmpty() || null != config.getDefaultTableShardingStrategy() || !config.getAutoTables().isEmpty();
217 218
    }
    
219 220
    private boolean hasAvailableTableConfigurations(final AlgorithmProvidedShardingRuleConfiguration config) {
        return !config.getTables().isEmpty() || null != config.getDefaultTableShardingStrategy() || !config.getAutoTables().isEmpty();
221 222
    }
    
223
    private void persistAuthentication(final Authentication authentication, final boolean isOverwrite) {
224
        if (null != authentication && (isOverwrite || !hasAuthentication())) {
225
            repository.persist(node.getAuthenticationPath(), YamlEngine.marshal(new AuthenticationYamlSwapper().swapToYamlConfiguration(authentication)));
226 227 228 229
        }
    }
    
    private void persistProperties(final Properties props, final boolean isOverwrite) {
230
        if (!props.isEmpty() && (isOverwrite || !hasProperties())) {
kimmking's avatar
kimmking 已提交
231
            repository.persist(node.getPropsPath(), YamlEngine.marshal(props));
232 233 234
        }
    }
    
235 236 237 238 239
    private boolean hasProperties() {
        return !Strings.isNullOrEmpty(repository.get(node.getPropsPath()));
    }
    
    private void persistSchemaName(final String schemaName) {
240
        String schemaNames = repository.get(node.getSchemasPath());
241
        if (Strings.isNullOrEmpty(schemaNames)) {
242
            repository.persist(node.getSchemasPath(), schemaName);
243 244
            return;
        }
245 246
        List<String> schemaNameList = Splitter.on(",").splitToList(schemaNames);
        if (schemaNameList.contains(schemaName)) {
247 248
            return;
        }
249
        List<String> newArrayList = new ArrayList<>(schemaNameList);
250
        newArrayList.add(schemaName);
251
        repository.persist(node.getSchemasPath(), Joiner.on(",").join(newArrayList));
252 253
    }
    
H
Haoran Meng 已提交
254
    private void persistSchema(final String schemaName, final boolean isDrop) {
255
        String schemaNames = repository.get(node.getSchemasPath());
256
        Collection<String> schemas = Strings.isNullOrEmpty(schemaNames) ? new LinkedHashSet<>() : new LinkedHashSet<>(Splitter.on(",").splitToList(schemaNames));
H
Haoran Meng 已提交
257
        if (isDrop) {
258 259 260 261
            schemas.remove(schemaName);
        } else if (!schemas.contains(schemaName)) {
            schemas.add(schemaName);
        }
262
        repository.persist(node.getSchemasPath(), Joiner.on(",").join(schemas));
263 264
    }
    
265 266 267
    /**
     * Load data source configurations.
     *
268
     * @param schemaName schema name
269 270
     * @return data source configurations
     */
271 272
    public Map<String, DataSourceConfiguration> loadDataSourceConfigurations(final String schemaName) {
        if (!hasDataSourceConfiguration(schemaName)) {
273 274
            return new LinkedHashMap<>();
        }
275
        YamlDataSourceConfigurationWrap result = YamlEngine.unmarshal(repository.get(node.getDataSourcePath(schemaName)), YamlDataSourceConfigurationWrap.class);
kimmking's avatar
kimmking 已提交
276 277
        return result.getDataSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
            entry -> new DataSourceConfigurationYamlSwapper().swapToObject(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
278 279 280
    }
    
    /**
281
     * Load rule configurations.
282
     *
283
     * @param schemaName schema name
284
     * @return rule configurations
285
     */
286 287 288
    public Collection<RuleConfiguration> loadRuleConfigurations(final String schemaName) {
        return hasRuleConfiguration(schemaName) ? new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
                YamlEngine.unmarshal(repository.get(node.getRulePath(schemaName)), YamlRootRuleConfigurations.class).getRules()) : new LinkedList<>();
289 290 291 292 293 294 295 296
    }
    
    /**
     * Load authentication.
     *
     * @return authentication
     */
    public Authentication loadAuthentication() {
297 298 299
        return hasAuthentication()
                ? new AuthenticationYamlSwapper().swapToObject(YamlEngine.unmarshal(repository.get(node.getAuthenticationPath()), YamlAuthenticationConfiguration.class))
                : new Authentication();
300 301 302 303 304 305 306 307
    }
    
    /**
     * Load properties configuration.
     *
     * @return properties
     */
    public Properties loadProperties() {
kimmking's avatar
kimmking 已提交
308
        return YamlEngine.unmarshalProperties(repository.get(node.getPropsPath()));
309 310 311
    }
    
    /**
L
Liang Zhang 已提交
312
     * Get all schema names.
313
     * 
314
     * @return all schema names
315
     */
L
Liang Zhang 已提交
316
    public Collection<String> getAllSchemaNames() {
317
        String schemaNames = repository.get(node.getSchemasPath());
318
        return Strings.isNullOrEmpty(schemaNames) ? new LinkedList<>() : node.splitSchemaName(schemaNames);
319
    }
320 321 322 323
    
    /**
     * Judge whether schema has rule configuration.
     *
324
     * @param schemaName schema name
325 326
     * @return has rule configuration or not
     */
327 328
    public boolean hasRuleConfiguration(final String schemaName) {
        return !Strings.isNullOrEmpty(repository.get(node.getRulePath(schemaName)));
329 330 331 332 333
    }
    
    /**
     * Judge whether schema has data source configuration.
     *
334
     * @param schemaName schema name
335 336
     * @return has data source configuration or not
     */
337 338
    public boolean hasDataSourceConfiguration(final String schemaName) {
        return !Strings.isNullOrEmpty(repository.get(node.getDataSourcePath(schemaName)));
339 340
    }
    
341 342 343 344
    /**
     * Persist rule schema meta data.
     *
     * @param schemaName schema name
345
     * @param physicalSchemaMetaData physical schema meta data
346
     */
347 348
    public void persistMetaData(final String schemaName, final PhysicalSchemaMetaData physicalSchemaMetaData) {
        repository.persist(node.getTablePath(schemaName), YamlEngine.marshal(new LogicSchemaMetaDataYamlSwapper().swapToYamlConfiguration(physicalSchemaMetaData)));
349 350 351 352 353 354 355 356
    }
    
    /**
     * Load rule schema meta data.
     *
     * @param schemaName schema name
     * @return rule schema meta data of the schema
     */
357
    public Optional<PhysicalSchemaMetaData> loadMetaData(final String schemaName) {
358 359 360 361
        String path = repository.get(node.getTablePath(schemaName));
        if (Strings.isNullOrEmpty(path)) {
            return Optional.empty();
        }
362
        return Optional.of(new LogicSchemaMetaDataYamlSwapper().swapToObject(YamlEngine.unmarshal(path, YamlLogicSchemaMetaData.class)));
363 364
    }
    
365
    /**
M
menghaoranss 已提交
366
     * Delete schema.
367 368 369 370 371 372 373
     * 
     * @param schemaName schema name
     */
    public void deleteSchema(final String schemaName) {
        repository.delete(node.getSchemaNamePath(schemaName));
    }
    
374 375 376
    private boolean hasAuthentication() {
        return !Strings.isNullOrEmpty(repository.get(node.getAuthenticationPath()));
    }
377
}