SchemaChangedListener.java 8.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.governance.core.config.listener;
19 20 21

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
22
import org.apache.commons.collections4.SetUtils;
23
import org.apache.shardingsphere.governance.core.config.ConfigCenterNode;
24
import org.apache.shardingsphere.governance.core.event.listener.PostGovernanceRepositoryEventListener;
L
Liang Zhang 已提交
25 26 27 28 29 30
import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
import org.apache.shardingsphere.governance.core.event.model.datasource.DataSourceChangedEvent;
import org.apache.shardingsphere.governance.core.event.model.metadata.MetaDataChangedEvent;
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaAddedEvent;
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaDeletedEvent;
31
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
32
import org.apache.shardingsphere.governance.core.yaml.config.metadata.YamlLogicSchemaMetaData;
33
import org.apache.shardingsphere.governance.core.yaml.swapper.DataSourceConfigurationYamlSwapper;
34
import org.apache.shardingsphere.governance.core.yaml.swapper.LogicSchemaMetaDataYamlSwapper;
35 36
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
kimmking's avatar
kimmking 已提交
37
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.Type;
38
import org.apache.shardingsphere.infra.metadata.model.schema.physical.model.schema.PhysicalSchemaMetaData;
39 40 41
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
42 43

import java.util.Collection;
44
import java.util.Collections;
45
import java.util.HashSet;
kimmking's avatar
kimmking 已提交
46
import java.util.LinkedHashMap;
47
import java.util.LinkedHashSet;
48
import java.util.Map.Entry;
49
import java.util.Optional;
50
import java.util.Set;
51
import java.util.stream.Collectors;
52 53 54 55

/**
 * Schema changed listener.
 */
56
public final class SchemaChangedListener extends PostGovernanceRepositoryEventListener {
57
    
kimmking's avatar
kimmking 已提交
58
    private final ConfigCenterNode configurationNode;
59
    
60
    private final Collection<String> existedSchemaNames;
61
    
M
menghaoranss 已提交
62 63 64
    public SchemaChangedListener(final ConfigurationRepository configurationRepository, final Collection<String> schemaNames) {
        super(configurationRepository, new ConfigCenterNode().getAllSchemaConfigPaths(schemaNames));
        configurationNode = new ConfigCenterNode();
65
        existedSchemaNames = new LinkedHashSet<>(schemaNames);
66 67 68
    }
    
    @Override
69
    protected Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
70
        // TODO Consider removing the following one.
71
        if (configurationNode.getSchemasPath().equals(event.getKey())) {
72 73
            return createSchemaNamesUpdatedEvent(event.getValue());
        }
74 75
        String schemaName = configurationNode.getSchemaName(event.getKey());
        if (Strings.isNullOrEmpty(schemaName) || !isValidNodeChangedEvent(schemaName, event.getKey())) {
76
            return Optional.empty();
77
        }
kimmking's avatar
kimmking 已提交
78
        if (Type.ADDED == event.getType()) {
79
            return Optional.of(createAddedEvent(schemaName));
80
        }
kimmking's avatar
kimmking 已提交
81
        if (Type.UPDATED == event.getType()) {
82
            return Optional.of(createUpdatedEvent(schemaName, event));
83
        }
kimmking's avatar
kimmking 已提交
84
        if (Type.DELETED == event.getType()) {
85 86
            existedSchemaNames.remove(schemaName);
            return Optional.of(new SchemaDeletedEvent(schemaName));
87
        }
88
        return Optional.empty();
89 90
    }
    
91 92 93
    private Optional<GovernanceEvent> createSchemaNamesUpdatedEvent(final String schemaNames) {
        Collection<String> persistedSchemaNames = configurationNode.splitSchemaName(schemaNames);
        Set<String> addedSchemaNames = SetUtils.difference(new HashSet<>(persistedSchemaNames), new HashSet<>(existedSchemaNames));
L
Liang Zhang 已提交
94
        if (!addedSchemaNames.isEmpty()) {
95
            return Optional.of(createAddedEvent(addedSchemaNames.iterator().next()));
96
        }
97
        Set<String> deletedSchemaNames = SetUtils.difference(new HashSet<>(existedSchemaNames), new HashSet<>(persistedSchemaNames));
L
Liang Zhang 已提交
98
        if (!deletedSchemaNames.isEmpty()) {
99 100 101
            String schemaName = deletedSchemaNames.iterator().next();
            existedSchemaNames.remove(schemaName);
            return Optional.of(new SchemaDeletedEvent(schemaName));
102
        }
103
        return Optional.empty();
104 105
    }
    
106
    private boolean isValidNodeChangedEvent(final String schemaName, final String nodeFullPath) {
107 108 109
        return !existedSchemaNames.contains(schemaName) || configurationNode.getDataSourcePath(schemaName).equals(nodeFullPath) 
                || configurationNode.getRulePath(schemaName).equals(nodeFullPath)
                || configurationNode.getTablePath(schemaName).equals(nodeFullPath);
110 111
    }
    
112 113
    private GovernanceEvent createAddedEvent(final String schemaName) {
        existedSchemaNames.add(schemaName);
114
        return new SchemaAddedEvent(schemaName, Collections.emptyMap(), Collections.emptyList());
115 116
    }
    
117
    private GovernanceEvent createUpdatedEvent(final String schemaName, final DataChangedEvent event) {
118
        // TODO Consider remove judgement.
119
        return existedSchemaNames.contains(schemaName) ? createUpdatedEventForExistedSchema(schemaName, event) : createAddedEvent(schemaName);
120 121
    }
    
122
    private GovernanceEvent createUpdatedEventForExistedSchema(final String schemaName, final DataChangedEvent event) {
123 124 125 126 127
        if (event.getKey().equals(configurationNode.getDataSourcePath(schemaName))) {
            return createDataSourceChangedEvent(schemaName, event);
        } else if (event.getKey().equals(configurationNode.getRulePath(schemaName))) {
            return createRuleChangedEvent(schemaName, event);
        }
kimmking's avatar
kimmking 已提交
128
        return createMetaDataChangedEvent(schemaName, event);
129 130
    }
    
131
    private DataSourceChangedEvent createDataSourceChangedEvent(final String schemaName, final DataChangedEvent event) {
132 133 134
        YamlDataSourceConfigurationWrap result = YamlEngine.unmarshal(event.getValue(), YamlDataSourceConfigurationWrap.class);
        Preconditions.checkState(null != result && !result.getDataSources().isEmpty(), "No available data sources to load for governance.");
        return new DataSourceChangedEvent(schemaName, result.getDataSources().entrySet().stream()
kimmking's avatar
kimmking 已提交
135
                .collect(Collectors.toMap(Entry::getKey, entry -> new DataSourceConfigurationYamlSwapper().swapToObject(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
136 137
    }
    
138
    private GovernanceEvent createRuleChangedEvent(final String schemaName, final DataChangedEvent event) {
T
tristaZero 已提交
139
        YamlRootRuleConfigurations configurations = YamlEngine.unmarshal(event.getValue(), YamlRootRuleConfigurations.class);
140
        Preconditions.checkState(null != configurations, "No available rule to load for governance.");
141
        return new RuleConfigurationsChangedEvent(schemaName, new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(configurations.getRules()));
142 143
    }
    
kimmking's avatar
kimmking 已提交
144
    private GovernanceEvent createMetaDataChangedEvent(final String schemaName, final DataChangedEvent event) {
145 146
        PhysicalSchemaMetaData physicalSchemaMetaData = new LogicSchemaMetaDataYamlSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlLogicSchemaMetaData.class));
        return new MetaDataChangedEvent(schemaName, physicalSchemaMetaData);
147
    }
148
}