未验证 提交 03d03439 编写于 作者: L Liang Zhang 提交者: GitHub

Minor changes of governance listener (#7270)

* Refactor GovernanceSchemaContexts

* rename shardingSchemaName to schemaName

* Refactor SchemaChangedListener

* Refactor PostGovernanceRepositoryEventListener

* Rename configurationService to configCenter

* Refactor SchemaChangedListener
上级 2d0e0a52
......@@ -39,16 +39,16 @@ public abstract class PostGovernanceRepositoryEventListener implements Governanc
private final Collection<String> watchKeys;
@Override
public final void watch(final ChangedType... watchedChangedTypes) {
Collection<ChangedType> watchedChangedTypeList = Arrays.asList(watchedChangedTypes);
public final void watch(final ChangedType... changedTypes) {
Collection<ChangedType> watchedChangedTypeList = Arrays.asList(changedTypes);
for (String watchKey : watchKeys) {
watch(watchKey, watchedChangedTypeList);
}
}
private void watch(final String watchKey, final Collection<ChangedType> watchedChangedTypeList) {
private void watch(final String watchKey, final Collection<ChangedType> changedTypes) {
governanceRepository.watch(watchKey, dataChangedEvent -> {
if (watchedChangedTypeList.contains(dataChangedEvent.getChangedType())) {
if (changedTypes.contains(dataChangedEvent.getChangedType())) {
Optional<GovernanceEvent> event = createGovernanceEvent(dataChangedEvent);
event.ifPresent(ShardingSphereEventBus.getInstance()::post);
}
......
......@@ -20,12 +20,8 @@ package org.apache.shardingsphere.governance.core.config.listener;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.collections4.SetUtils;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.governance.core.common.event.datasource.DataSourceChangedEvent;
import org.apache.shardingsphere.governance.core.common.event.GovernanceEvent;
import org.apache.shardingsphere.governance.core.common.event.datasource.DataSourceChangedEvent;
import org.apache.shardingsphere.governance.core.common.event.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.governance.core.common.event.schema.SchemaAddedEvent;
import org.apache.shardingsphere.governance.core.common.event.schema.SchemaDeletedEvent;
......@@ -37,6 +33,9 @@ import org.apache.shardingsphere.governance.core.config.ConfigCenterNode;
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.ChangedType;
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
import java.util.Collection;
import java.util.Collections;
......@@ -53,17 +52,17 @@ import java.util.stream.Collectors;
*/
public final class SchemaChangedListener extends PostGovernanceRepositoryEventListener {
private final ConfigCenter configurationService;
private final ConfigCenter configCenter;
private final ConfigCenterNode configurationNode;
private final Collection<String> existedSchemaNames = new LinkedHashSet<>();
private final Collection<String> existedSchemaNames;
public SchemaChangedListener(final ConfigurationRepository configurationRepository, final Collection<String> schemaNames) {
super(configurationRepository, new ConfigCenterNode().getAllSchemaConfigPaths(schemaNames));
configurationService = new ConfigCenter(configurationRepository);
configCenter = new ConfigCenter(configurationRepository);
configurationNode = new ConfigCenterNode();
existedSchemaNames.addAll(schemaNames);
existedSchemaNames = new LinkedHashSet<>(schemaNames);
}
@Override
......@@ -72,83 +71,73 @@ public final class SchemaChangedListener extends PostGovernanceRepositoryEventLi
if (configurationNode.getSchemaPath().equals(event.getKey())) {
return createSchemaNamesUpdatedEvent(event.getValue());
}
String shardingSchemaName = configurationNode.getSchemaName(event.getKey());
if (Strings.isNullOrEmpty(shardingSchemaName) || !isValidNodeChangedEvent(shardingSchemaName, event.getKey())) {
String schemaName = configurationNode.getSchemaName(event.getKey());
if (Strings.isNullOrEmpty(schemaName) || !isValidNodeChangedEvent(schemaName, event.getKey())) {
return Optional.empty();
}
if (ChangedType.ADDED == event.getChangedType()) {
return Optional.of(createAddedEvent(shardingSchemaName));
return Optional.of(createAddedEvent(schemaName));
}
if (ChangedType.UPDATED == event.getChangedType()) {
return Optional.of(createUpdatedEvent(shardingSchemaName, event));
return Optional.of(createUpdatedEvent(schemaName, event));
}
if (ChangedType.DELETED == event.getChangedType()) {
return Optional.of(createDeletedEvent(shardingSchemaName));
existedSchemaNames.remove(schemaName);
return Optional.of(new SchemaDeletedEvent(schemaName));
}
return Optional.empty();
}
private Optional<GovernanceEvent> createSchemaNamesUpdatedEvent(final String shardingSchemaNames) {
Collection<String> persistShardingSchemaNames = configurationNode.splitSchemaName(shardingSchemaNames);
Set<String> addedSchemaNames = SetUtils.difference(new HashSet<>(persistShardingSchemaNames), new HashSet<>(existedSchemaNames));
private Optional<GovernanceEvent> createSchemaNamesUpdatedEvent(final String schemaNames) {
Collection<String> persistedSchemaNames = configurationNode.splitSchemaName(schemaNames);
Set<String> addedSchemaNames = SetUtils.difference(new HashSet<>(persistedSchemaNames), new HashSet<>(existedSchemaNames));
if (!addedSchemaNames.isEmpty()) {
return Optional.of(createAddedEvent(addedSchemaNames.iterator().next()));
}
Set<String> deletedSchemaNames = SetUtils.difference(new HashSet<>(existedSchemaNames), new HashSet<>(persistShardingSchemaNames));
Set<String> deletedSchemaNames = SetUtils.difference(new HashSet<>(existedSchemaNames), new HashSet<>(persistedSchemaNames));
if (!deletedSchemaNames.isEmpty()) {
return Optional.of(createDeletedEvent(deletedSchemaNames.iterator().next()));
String schemaName = deletedSchemaNames.iterator().next();
existedSchemaNames.remove(schemaName);
return Optional.of(new SchemaDeletedEvent(schemaName));
}
return Optional.empty();
}
private boolean isValidNodeChangedEvent(final String shardingSchemaName, final String nodeFullPath) {
return !existedSchemaNames.contains(shardingSchemaName)
|| configurationNode.getDataSourcePath(shardingSchemaName).equals(nodeFullPath) || configurationNode.getRulePath(shardingSchemaName).equals(nodeFullPath);
private boolean isValidNodeChangedEvent(final String schemaName, final String nodeFullPath) {
return !existedSchemaNames.contains(schemaName) || configurationNode.getDataSourcePath(schemaName).equals(nodeFullPath) || configurationNode.getRulePath(schemaName).equals(nodeFullPath);
}
private GovernanceEvent createAddedEvent(final String shardingSchemaName) {
existedSchemaNames.add(shardingSchemaName);
if (!isOwnCompleteConfigurations(shardingSchemaName)) {
return new SchemaAddedEvent(shardingSchemaName, Collections.emptyMap(), Collections.emptyList());
}
return new SchemaAddedEvent(shardingSchemaName, configurationService.loadDataSourceConfigurations(shardingSchemaName), createRuleConfigurations(shardingSchemaName));
private GovernanceEvent createAddedEvent(final String schemaName) {
existedSchemaNames.add(schemaName);
return isOwnCompleteConfigurations(schemaName)
? new SchemaAddedEvent(schemaName, configCenter.loadDataSourceConfigurations(schemaName), configCenter.loadRuleConfigurations(schemaName))
: new SchemaAddedEvent(schemaName, Collections.emptyMap(), Collections.emptyList());
}
private GovernanceEvent createUpdatedEvent(final String shardingSchemaName, final DataChangedEvent event) {
private GovernanceEvent createUpdatedEvent(final String schemaName, final DataChangedEvent event) {
// TODO Consider remove judgement.
return existedSchemaNames.contains(shardingSchemaName) ? createUpdatedEventForExistedSchema(event, shardingSchemaName) : createAddedEvent(shardingSchemaName);
return existedSchemaNames.contains(schemaName) ? createUpdatedEventForExistedSchema(schemaName, event) : createAddedEvent(schemaName);
}
private GovernanceEvent createUpdatedEventForExistedSchema(final DataChangedEvent event, final String shardingSchemaName) {
return event.getKey().equals(configurationNode.getDataSourcePath(shardingSchemaName))
? createDataSourceChangedEvent(shardingSchemaName, event) : createRuleChangedEvent(shardingSchemaName, event);
private GovernanceEvent createUpdatedEventForExistedSchema(final String schemaName, final DataChangedEvent event) {
return event.getKey().equals(configurationNode.getDataSourcePath(schemaName)) ? createDataSourceChangedEvent(schemaName, event) : createRuleChangedEvent(schemaName, event);
}
@SuppressWarnings("unchecked")
private DataSourceChangedEvent createDataSourceChangedEvent(final String shardingSchemaName, final DataChangedEvent event) {
private DataSourceChangedEvent createDataSourceChangedEvent(final String schemaName, final DataChangedEvent event) {
Map<String, YamlDataSourceConfiguration> dataSourceConfigurations = (Map) YamlEngine.unmarshal(event.getValue());
Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data sources to load for governance.");
return new DataSourceChangedEvent(shardingSchemaName, dataSourceConfigurations.entrySet().stream()
return new DataSourceChangedEvent(schemaName, dataSourceConfigurations.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> new DataSourceConfigurationYamlSwapper().swapToObject(entry.getValue()))));
}
private GovernanceEvent createRuleChangedEvent(final String shardingSchemaName, final DataChangedEvent event) {
private GovernanceEvent createRuleChangedEvent(final String schemaName, final DataChangedEvent event) {
YamlRootRuleConfigurations configurations = YamlEngine.unmarshal(event.getValue(), YamlRootRuleConfigurations.class);
Preconditions.checkState(null != configurations, "No available rule to load for governance.");
return new RuleConfigurationsChangedEvent(
shardingSchemaName, new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(configurations.getRules()));
}
private boolean isOwnCompleteConfigurations(final String shardingSchemaName) {
return configurationService.hasDataSourceConfiguration(shardingSchemaName) && configurationService.hasRuleConfiguration(shardingSchemaName);
}
private Collection<RuleConfiguration> createRuleConfigurations(final String shardingSchemaName) {
return configurationService.loadRuleConfigurations(shardingSchemaName);
return new RuleConfigurationsChangedEvent(schemaName, new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(configurations.getRules()));
}
private GovernanceEvent createDeletedEvent(final String shardingSchemaName) {
existedSchemaNames.remove(shardingSchemaName);
return new SchemaDeletedEvent(shardingSchemaName);
private boolean isOwnCompleteConfigurations(final String schemaName) {
return configCenter.hasDataSourceConfiguration(schemaName) && configCenter.hasRuleConfiguration(schemaName);
}
}
......@@ -139,7 +139,7 @@ public final class SchemaChangedListenerTest {
}
@Test
public void assertCreateShardingSchemaAddedEventForNewSchema() {
public void assertCreateSchemaAddedEventForNewSchema() {
String shardingRule = readYAML(SHARDING_RULE_FILE);
String dataSource = readYAML(DATA_SOURCE_FILE);
when(configurationRepository.get("/config/schema/logic_db/rule")).thenReturn(shardingRule);
......
......@@ -80,12 +80,12 @@ public final class RegistryCenterNode {
}
/**
* Get governance sharding schema.
* Get governance schema.
*
* @param dataSourceNodeFullPath data source node full path
* @return governance sharding schema
* @return governance schema
*/
public Optional<GovernanceSchema> getGovernanceShardingSchema(final String dataSourceNodeFullPath) {
public Optional<GovernanceSchema> getGovernanceSchema(final String dataSourceNodeFullPath) {
Pattern pattern = Pattern.compile(getDataSourcesNodeFullRootPath() + "/" + "(\\w+)/(\\w+)$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(dataSourceNodeFullPath);
return matcher.find() ? Optional.of(new GovernanceSchema(matcher.group(1), matcher.group(2))) : Optional.empty();
......
......@@ -44,7 +44,7 @@ public final class DataSourceStateChangedListener extends PostGovernanceReposito
@Override
protected Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
Optional<GovernanceSchema> governanceSchema = registryCenterNode.getGovernanceShardingSchema(event.getKey());
Optional<GovernanceSchema> governanceSchema = registryCenterNode.getGovernanceSchema(event.getKey());
if (governanceSchema.isPresent()) {
return Optional.of(new DisabledStateChangedEvent(governanceSchema.get(), isDataSourceDisabled(event)));
}
......
......@@ -24,42 +24,42 @@ import java.util.LinkedList;
import java.util.Map;
/**
* Governance sharding schema group.
* Governance schema group.
*/
public final class GovernanceShardingSchemaGroup {
public final class GovernanceSchemaGroup {
private final Map<String, Collection<String>> schemaGroup = new HashMap<>();
/**
* Add governance sharding schema.
* Add governance schema.
*
* @param governanceShardingSchema governance sharding schema
* @param governanceSchema governance schema
*/
public void add(final GovernanceSchema governanceShardingSchema) {
String schemaName = governanceShardingSchema.getSchemaName();
public void add(final GovernanceSchema governanceSchema) {
String schemaName = governanceSchema.getSchemaName();
if (!schemaGroup.containsKey(schemaName)) {
schemaGroup.put(schemaName, new LinkedList<>());
}
schemaGroup.get(schemaName).add(governanceShardingSchema.getDataSourceName());
schemaGroup.get(schemaName).add(governanceSchema.getDataSourceName());
}
/**
* Put governance sharding schema.
* Put governance schema.
*
* @param shardingSchemaName sharding schema name
* @param schemaName schema name
* @param dataSourceNames data source names
*/
public void put(final String shardingSchemaName, final Collection<String> dataSourceNames) {
schemaGroup.put(shardingSchemaName, dataSourceNames);
public void put(final String schemaName, final Collection<String> dataSourceNames) {
schemaGroup.put(schemaName, dataSourceNames);
}
/**
* Get data source names.
*
* @param shardingSchemaName sharding schema name
* @param schemaName schema name
* @return data source names
*/
public Collection<String> getDataSourceNames(final String shardingSchemaName) {
return schemaGroup.getOrDefault(shardingSchemaName, Collections.emptyList());
public Collection<String> getDataSourceNames(final String schemaName) {
return schemaGroup.getOrDefault(schemaName, Collections.emptyList());
}
}
......@@ -41,8 +41,8 @@ public final class RegistryCenterNodeTest {
}
@Test
public void assertGetGovernanceShardingSchema() {
assertThat(registryCenterNode.getGovernanceShardingSchema("/registry/datasources/master_slave_db/slave_ds_0").get().getSchemaName(), is("master_slave_db"));
public void assertGetGovernanceSchema() {
assertThat(registryCenterNode.getGovernanceSchema("/registry/datasources/master_slave_db/slave_ds_0").get().getSchemaName(), is("master_slave_db"));
}
@Test
......
......@@ -25,11 +25,11 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class GovernanceShardingSchemaGroupTest {
public final class GovernanceSchemaGroupTest {
@Test
public void assertAddWithExistedSchemaName() {
GovernanceShardingSchemaGroup actual = new GovernanceShardingSchemaGroup();
GovernanceSchemaGroup actual = new GovernanceSchemaGroup();
actual.add(new GovernanceSchema("test_0.ds_0"));
actual.add(new GovernanceSchema("test_0.ds_1"));
assertThat(actual.getDataSourceNames("test_0").size(), is(2));
......@@ -39,7 +39,7 @@ public final class GovernanceShardingSchemaGroupTest {
@Test
public void assertAddWithoutExistedSchemaName() {
GovernanceShardingSchemaGroup actual = new GovernanceShardingSchemaGroup();
GovernanceSchemaGroup actual = new GovernanceSchemaGroup();
actual.add(new GovernanceSchema("test_0.ds_0"));
actual.add(new GovernanceSchema("test_1.ds_1"));
assertThat(actual.getDataSourceNames("test_0").size(), is(1));
......@@ -50,7 +50,7 @@ public final class GovernanceShardingSchemaGroupTest {
@Test
public void assertPut() {
GovernanceShardingSchemaGroup actual = new GovernanceShardingSchemaGroup();
GovernanceSchemaGroup actual = new GovernanceSchemaGroup();
actual.put("test", Arrays.asList("ds_0", "ds_1"));
assertThat(actual.getDataSourceNames("test").size(), is(2));
assertTrue(actual.getDataSourceNames("test").contains("ds_0"));
......
......@@ -23,7 +23,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class GovernanceShardingSchemaTest {
public final class GovernanceSchemaTest {
@Test
public void assertNewGovernanceSchemaWithDataSourceNameOnly() {
......
......@@ -275,12 +275,12 @@ public final class GovernanceSchemaContexts implements SchemaContexts {
schemaContexts.getSchemaContexts(), schemaContexts.getAuthentication(), schemaContexts.getProps(), schemaContexts.getDatabaseType(), event.isCircuitBreak());
}
private SchemaContext createAddedSchemaContext(final SchemaAddedEvent schemaAddedEvent) throws SQLException {
String schemaName = schemaAddedEvent.getSchemaName();
Map<String, Map<String, DataSource>> dataSourcesMap = createDataSourcesMap(Collections.singletonMap(schemaName, schemaAddedEvent.getDataSourceConfigurations()));
private SchemaContext createAddedSchemaContext(final SchemaAddedEvent event) throws SQLException {
String schemaName = event.getSchemaName();
Map<String, Map<String, DataSource>> dataSourcesMap = createDataSourcesMap(Collections.singletonMap(schemaName, event.getDataSourceConfigurations()));
DatabaseType databaseType = getDatabaseType(dataSourcesMap);
SchemaContextsBuilder schemaContextsBuilder = new SchemaContextsBuilder(databaseType, dataSourcesMap,
Collections.singletonMap(schemaName, schemaAddedEvent.getRuleConfigurations()), schemaContexts.getAuthentication(), schemaContexts.getProps().getProps());
Collections.singletonMap(schemaName, event.getRuleConfigurations()), schemaContexts.getAuthentication(), schemaContexts.getProps().getProps());
return schemaContextsBuilder.build().getSchemaContexts().get(schemaName);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册