未验证 提交 93a6ad92 编写于 作者: H Haoran Meng 提交者: GitHub

Merge pull request #7218 from kimmking/eventbus1

combine ShardingSphereEventBus&GovernanceEventbus
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.governance.core.common.eventbus;
import com.google.common.eventbus.EventBus;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
/**
* Governance event bus.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class GovernanceEventBus {
/**
* Get instance of governance event bus.
*
* @return instance of governance event bus
*/
public static EventBus getInstance() {
return GovernanceEventBusHolder.INSTANCE;
}
private static final class GovernanceEventBusHolder {
private static final EventBus INSTANCE = new EventBus();
}
}
...@@ -17,13 +17,12 @@ ...@@ -17,13 +17,12 @@
package org.apache.shardingsphere.governance.core.common.listener; package org.apache.shardingsphere.governance.core.common.listener;
import com.google.common.eventbus.EventBus;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.governance.repository.api.GovernanceRepository; import org.apache.shardingsphere.governance.repository.api.GovernanceRepository;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent; import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.ChangedType; import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.ChangedType;
import org.apache.shardingsphere.governance.core.common.event.GovernanceEvent; import org.apache.shardingsphere.governance.core.common.event.GovernanceEvent;
import org.apache.shardingsphere.governance.core.common.eventbus.GovernanceEventBus; import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
...@@ -35,8 +34,6 @@ import java.util.Optional; ...@@ -35,8 +34,6 @@ import java.util.Optional;
@RequiredArgsConstructor @RequiredArgsConstructor
public abstract class PostGovernanceRepositoryEventListener implements GovernanceListener { public abstract class PostGovernanceRepositoryEventListener implements GovernanceListener {
private final EventBus eventBus = GovernanceEventBus.getInstance();
private final GovernanceRepository governanceRepository; private final GovernanceRepository governanceRepository;
private final Collection<String> watchKeys; private final Collection<String> watchKeys;
...@@ -53,7 +50,7 @@ public abstract class PostGovernanceRepositoryEventListener implements Governanc ...@@ -53,7 +50,7 @@ public abstract class PostGovernanceRepositoryEventListener implements Governanc
governanceRepository.watch(watchKey, dataChangedEvent -> { governanceRepository.watch(watchKey, dataChangedEvent -> {
if (watchedChangedTypeList.contains(dataChangedEvent.getChangedType())) { if (watchedChangedTypeList.contains(dataChangedEvent.getChangedType())) {
Optional<GovernanceEvent> event = createGovernanceEvent(dataChangedEvent); Optional<GovernanceEvent> event = createGovernanceEvent(dataChangedEvent);
event.ifPresent(eventBus::post); event.ifPresent(ShardingSphereEventBus.getInstance()::post);
} }
}); });
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.governance.core.common.eventbus;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class GovernanceEventBusTest {
@Test
public void assertGetInstance() {
assertThat(GovernanceEventBus.getInstance(), is(GovernanceEventBus.getInstance()));
}
}
...@@ -25,7 +25,6 @@ import org.apache.shardingsphere.governance.core.common.event.props.PropertiesCh ...@@ -25,7 +25,6 @@ import org.apache.shardingsphere.governance.core.common.event.props.PropertiesCh
import org.apache.shardingsphere.governance.core.common.event.rule.RuleConfigurationsChangedEvent; import org.apache.shardingsphere.governance.core.common.event.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.governance.core.common.event.schema.SchemaAddedEvent; import org.apache.shardingsphere.governance.core.common.event.schema.SchemaAddedEvent;
import org.apache.shardingsphere.governance.core.common.event.schema.SchemaDeletedEvent; import org.apache.shardingsphere.governance.core.common.event.schema.SchemaDeletedEvent;
import org.apache.shardingsphere.governance.core.common.eventbus.GovernanceEventBus;
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade; import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
import org.apache.shardingsphere.governance.core.metadata.event.MetaDataChangedEvent; import org.apache.shardingsphere.governance.core.metadata.event.MetaDataChangedEvent;
import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent; import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent;
...@@ -45,6 +44,7 @@ import org.apache.shardingsphere.infra.context.runtime.RuntimeContext; ...@@ -45,6 +44,7 @@ import org.apache.shardingsphere.infra.context.runtime.RuntimeContext;
import org.apache.shardingsphere.infra.context.schema.ShardingSphereSchema; import org.apache.shardingsphere.infra.context.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseType; import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypes; import org.apache.shardingsphere.infra.database.type.DatabaseTypes;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorKernel; import org.apache.shardingsphere.infra.executor.kernel.ExecutorKernel;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData; import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
...@@ -75,7 +75,7 @@ public abstract class GovernanceSchemaContexts implements SchemaContexts { ...@@ -75,7 +75,7 @@ public abstract class GovernanceSchemaContexts implements SchemaContexts {
protected GovernanceSchemaContexts(final SchemaContexts schemaContexts, final GovernanceFacade governanceFacade) { protected GovernanceSchemaContexts(final SchemaContexts schemaContexts, final GovernanceFacade governanceFacade) {
this.governanceFacade = governanceFacade; this.governanceFacade = governanceFacade;
this.schemaContexts = schemaContexts; this.schemaContexts = schemaContexts;
GovernanceEventBus.getInstance().register(this); ShardingSphereEventBus.getInstance().register(this);
disableDataSources(); disableDataSources();
persistMetaData(); persistMetaData();
} }
...@@ -157,7 +157,7 @@ public abstract class GovernanceSchemaContexts implements SchemaContexts { ...@@ -157,7 +157,7 @@ public abstract class GovernanceSchemaContexts implements SchemaContexts {
schemaContexts = new StandardSchemaContexts(schemas, schemaContexts.getAuthentication(), schemaContexts.getProps(), schemaContexts.getDatabaseType()); schemaContexts = new StandardSchemaContexts(schemas, schemaContexts.getAuthentication(), schemaContexts.getProps(), schemaContexts.getDatabaseType());
governanceFacade.getMetaDataCenter().persistMetaDataCenterNode(event.getSchemaName(), schemaContexts.getSchemaContexts().get(event.getSchemaName()).getSchema().getMetaData().getSchema()); governanceFacade.getMetaDataCenter().persistMetaDataCenterNode(event.getSchemaName(), schemaContexts.getSchemaContexts().get(event.getSchemaName()).getSchema().getMetaData().getSchema());
governanceFacade.getMetaDataCenter().persistMetaDataCenterNode(event.getSchemaName(), schemaContexts.getSchemaContexts().get(event.getSchemaName()).getSchema().getMetaData().getSchema()); governanceFacade.getMetaDataCenter().persistMetaDataCenterNode(event.getSchemaName(), schemaContexts.getSchemaContexts().get(event.getSchemaName()).getSchema().getMetaData().getSchema());
GovernanceEventBus.getInstance().post( ShardingSphereEventBus.getInstance().post(
new DataSourceChangeCompletedEvent(event.getSchemaName(), schemaContexts.getDatabaseType(), schemas.get(event.getSchemaName()).getSchema().getDataSources())); new DataSourceChangeCompletedEvent(event.getSchemaName(), schemaContexts.getDatabaseType(), schemas.get(event.getSchemaName()).getSchema().getDataSources()));
} }
...@@ -242,7 +242,7 @@ public abstract class GovernanceSchemaContexts implements SchemaContexts { ...@@ -242,7 +242,7 @@ public abstract class GovernanceSchemaContexts implements SchemaContexts {
newSchemaContexts.remove(schemaName); newSchemaContexts.remove(schemaName);
newSchemaContexts.put(schemaName, getChangedSchemaContext(schemaContexts.getSchemaContexts().get(schemaName), event.getDataSourceConfigurations())); newSchemaContexts.put(schemaName, getChangedSchemaContext(schemaContexts.getSchemaContexts().get(schemaName), event.getDataSourceConfigurations()));
schemaContexts = new StandardSchemaContexts(newSchemaContexts, schemaContexts.getAuthentication(), schemaContexts.getProps(), schemaContexts.getDatabaseType()); schemaContexts = new StandardSchemaContexts(newSchemaContexts, schemaContexts.getAuthentication(), schemaContexts.getProps(), schemaContexts.getDatabaseType());
GovernanceEventBus.getInstance().post( ShardingSphereEventBus.getInstance().post(
new DataSourceChangeCompletedEvent(event.getSchemaName(), schemaContexts.getDatabaseType(), newSchemaContexts.get(event.getSchemaName()).getSchema().getDataSources())); new DataSourceChangeCompletedEvent(event.getSchemaName(), schemaContexts.getDatabaseType(), newSchemaContexts.get(event.getSchemaName()).getSchema().getDataSources()));
} }
......
...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.governance.core.transaction; ...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.governance.core.transaction;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.governance.core.common.event.datasource.DataSourceChangeCompletedEvent; import org.apache.shardingsphere.governance.core.common.event.datasource.DataSourceChangeCompletedEvent;
import org.apache.shardingsphere.governance.core.common.eventbus.GovernanceEventBus; import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine; import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts; import org.apache.shardingsphere.transaction.context.TransactionContexts;
...@@ -34,7 +34,7 @@ public final class GovernanceTransactionContexts implements TransactionContexts ...@@ -34,7 +34,7 @@ public final class GovernanceTransactionContexts implements TransactionContexts
public GovernanceTransactionContexts(final TransactionContexts contexts) { public GovernanceTransactionContexts(final TransactionContexts contexts) {
this.contexts = contexts; this.contexts = contexts;
GovernanceEventBus.getInstance().register(this); ShardingSphereEventBus.getInstance().register(this);
} }
@Override @Override
......
...@@ -20,7 +20,6 @@ package org.apache.shardingsphere.proxy.governance.schema; ...@@ -20,7 +20,6 @@ package org.apache.shardingsphere.proxy.governance.schema;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.shardingsphere.governance.core.common.event.auth.AuthenticationChangedEvent; import org.apache.shardingsphere.governance.core.common.event.auth.AuthenticationChangedEvent;
import org.apache.shardingsphere.governance.core.common.event.props.PropertiesChangedEvent; import org.apache.shardingsphere.governance.core.common.event.props.PropertiesChangedEvent;
import org.apache.shardingsphere.governance.core.common.eventbus.GovernanceEventBus;
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade; import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent; import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent;
import org.apache.shardingsphere.infra.auth.Authentication; import org.apache.shardingsphere.infra.auth.Authentication;
...@@ -33,6 +32,7 @@ import org.apache.shardingsphere.infra.context.impl.StandardSchemaContexts; ...@@ -33,6 +32,7 @@ import org.apache.shardingsphere.infra.context.impl.StandardSchemaContexts;
import org.apache.shardingsphere.infra.context.runtime.RuntimeContext; import org.apache.shardingsphere.infra.context.runtime.RuntimeContext;
import org.apache.shardingsphere.infra.context.schema.ShardingSphereSchema; import org.apache.shardingsphere.infra.context.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType; import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts; import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -90,7 +90,7 @@ public final class ProxyGovernanceSchemaContextsTest { ...@@ -90,7 +90,7 @@ public final class ProxyGovernanceSchemaContextsTest {
assertTrue(ProxySchemaContexts.getInstance().getSchemaContexts().getProps().getProps().isEmpty()); assertTrue(ProxySchemaContexts.getInstance().getSchemaContexts().getProps().getProps().isEmpty());
Properties props = new Properties(); Properties props = new Properties();
props.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(), Boolean.TRUE.toString()); props.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(), Boolean.TRUE.toString());
GovernanceEventBus.getInstance().post(new PropertiesChangedEvent(props)); ShardingSphereEventBus.getInstance().post(new PropertiesChangedEvent(props));
assertFalse(ProxySchemaContexts.getInstance().getSchemaContexts().getProps().getProps().isEmpty()); assertFalse(ProxySchemaContexts.getInstance().getSchemaContexts().getProps().getProps().isEmpty());
} }
...@@ -99,7 +99,7 @@ public final class ProxyGovernanceSchemaContextsTest { ...@@ -99,7 +99,7 @@ public final class ProxyGovernanceSchemaContextsTest {
ProxyUser proxyUser = new ProxyUser("root", Collections.singleton("db1")); ProxyUser proxyUser = new ProxyUser("root", Collections.singleton("db1"));
Authentication authentication = new Authentication(); Authentication authentication = new Authentication();
authentication.getUsers().put("root", proxyUser); authentication.getUsers().put("root", proxyUser);
GovernanceEventBus.getInstance().post(new AuthenticationChangedEvent(authentication)); ShardingSphereEventBus.getInstance().post(new AuthenticationChangedEvent(authentication));
assertThat(ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().keySet().iterator().next(), is("root")); assertThat(ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().keySet().iterator().next(), is("root"));
assertThat(ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().get("root").getPassword(), is("root")); assertThat(ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().get("root").getPassword(), is("root"));
assertThat(ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().get("root").getAuthorizedSchemas().iterator().next(), is("db1")); assertThat(ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().get("root").getAuthorizedSchemas().iterator().next(), is("db1"));
...@@ -108,8 +108,8 @@ public final class ProxyGovernanceSchemaContextsTest { ...@@ -108,8 +108,8 @@ public final class ProxyGovernanceSchemaContextsTest {
@Test @Test
public void assertRenewCircuitState() { public void assertRenewCircuitState() {
assertFalse(ProxySchemaContexts.getInstance().getSchemaContexts().isCircuitBreak()); assertFalse(ProxySchemaContexts.getInstance().getSchemaContexts().isCircuitBreak());
GovernanceEventBus.getInstance().post(new CircuitStateChangedEvent(true)); ShardingSphereEventBus.getInstance().post(new CircuitStateChangedEvent(true));
assertTrue(ProxySchemaContexts.getInstance().getSchemaContexts().isCircuitBreak()); assertTrue(ProxySchemaContexts.getInstance().getSchemaContexts().isCircuitBreak());
GovernanceEventBus.getInstance().post(new CircuitStateChangedEvent(false)); ShardingSphereEventBus.getInstance().post(new CircuitStateChangedEvent(false));
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册