未验证 提交 87aa1b1b 编写于 作者: H Haoran Meng 提交者: GitHub

Update node state after slave database is disabled # control-panel-cluster (#6006)

* process node state after slave database is disabled

* Optimize cluster state

* Rename variable
上级 b5348a0e
......@@ -94,7 +94,7 @@ public final class ClusterFacade {
DataSourceState dataSourceState = null == instanceState.getDataSources()
|| null == instanceState.getDataSources().get(dataSourceName) ? new DataSourceState()
: instanceState.getDataSources().get(dataSourceName);
dataSourceState.setState(each.getEnable() ? NodeState.ONLINE : NodeState.OFFLINE);
dataSourceState.setState(each.getDisabled() ? NodeState.DISABLED : each.getEnable() ? NodeState.ONLINE : NodeState.OFFLINE);
dataSourceState.setLastConnect(each.getDetectTimeStamp());
dataSourceStateMap.put(dataSourceName, dataSourceState);
});
......
......@@ -34,7 +34,7 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-orchestration-core-configuration</artifactId>
<artifactId>shardingsphere-orchestration-core-facade</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......
......@@ -25,6 +25,7 @@ import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse;
import org.apache.shardingsphere.cluster.heartbeat.task.HeartbeatTask;
import org.apache.shardingsphere.cluster.heartbeat.task.HeartbeatTaskManager;
import org.apache.shardingsphere.kernel.context.SchemaContext;
import org.apache.shardingsphere.orchestration.core.facade.ShardingOrchestrationFacade;
import java.util.Map;
......@@ -66,7 +67,7 @@ public final class ClusterHeartbeatInstance {
* @return heartbeat response
*/
public HeartbeatResponse detect(final Map<String, SchemaContext> schemaContexts) {
return heartbeatHandler.handle(schemaContexts);
return heartbeatHandler.handle(schemaContexts, ShardingOrchestrationFacade.getInstance().getRegistryCenter().loadAllDataSourcesNodes());
}
private static final class ClusterHeartbeatInstanceHolder {
......
......@@ -35,10 +35,14 @@ public abstract class AbstractHeartbeatDetect implements Callable<Map<String, He
private Integer retryInterval;
public AbstractHeartbeatDetect(final Boolean retryEnable, final Integer retryMaximum, final Integer retryInterval) {
private Boolean needDetect;
public AbstractHeartbeatDetect(final Boolean retryEnable, final Integer retryMaximum,
final Integer retryInterval, final Boolean needDetect) {
this.retryEnable = retryEnable;
this.retryMaximum = retryMaximum;
this.retryInterval = retryInterval;
this.needDetect = needDetect;
}
/**
......@@ -58,6 +62,9 @@ public abstract class AbstractHeartbeatDetect implements Callable<Map<String, He
@Override
public Map<String, HeartbeatResult> call() {
if (!needDetect) {
return buildResult(Boolean.FALSE);
}
if (retryEnable && retryMaximum > 0) {
Boolean result = Boolean.FALSE;
for (int i = 0; i < retryMaximum; i++) {
......
......@@ -44,12 +44,16 @@ public final class HeartbeatDetect extends AbstractHeartbeatDetect {
private DataSource dataSource;
public HeartbeatDetect(final String schemaName, final String dataSourceName, final DataSource dataSource, final HeartbeatConfiguration configuration) {
super(configuration.getRetryEnable(), configuration.getRetryMaximum(), configuration.getRetryInterval());
private Boolean dataSourceDisabled;
public HeartbeatDetect(final String schemaName, final String dataSourceName, final DataSource dataSource,
final HeartbeatConfiguration configuration, final Boolean dataSourceDisabled) {
super(configuration.getRetryEnable(), configuration.getRetryMaximum(), configuration.getRetryInterval(), !dataSourceDisabled);
this.sql = configuration.getSql();
this.schemaName = schemaName;
this.dataSourceName = dataSourceName;
this.dataSource = dataSource;
this.dataSourceDisabled = dataSourceDisabled;
}
@Override
......@@ -68,7 +72,7 @@ public final class HeartbeatDetect extends AbstractHeartbeatDetect {
@Override
protected Map<String, HeartbeatResult> buildResult(final Boolean result) {
Map<String, HeartbeatResult> heartBeatResultMap = new HashMap<>(1, 1);
heartBeatResultMap.put(schemaName, new HeartbeatResult(dataSourceName, result, System.currentTimeMillis()));
heartBeatResultMap.put(schemaName, new HeartbeatResult(dataSourceName, result, System.currentTimeMillis(), dataSourceDisabled));
return heartBeatResultMap;
}
}
......@@ -17,18 +17,22 @@
package org.apache.shardingsphere.cluster.heartbeat.detect;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult;
import org.apache.shardingsphere.kernel.context.SchemaContext;
import org.apache.shardingsphere.orchestration.core.facade.ShardingOrchestrationFacade;
import org.apache.shardingsphere.orchestration.core.registrycenter.RegistryCenterNodeStatus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
......@@ -48,6 +52,8 @@ public final class HeartbeatHandler {
private HeartbeatConfiguration configuration;
private Collection<String> disabledDataSources = Collections.emptyList();
/**
* Init heartbeat handler.
*
......@@ -71,13 +77,15 @@ public final class HeartbeatHandler {
* Handle heartbeat detect event.
*
* @param schemaContexts schema contexts
* @param disabledDataSources collection of disabled data sources
* @return heartbeat response
*/
public HeartbeatResponse handle(final Map<String, SchemaContext> schemaContexts) {
public HeartbeatResponse handle(final Map<String, SchemaContext> schemaContexts, final Collection<String> disabledDataSources) {
this.disabledDataSources = disabledDataSources;
ExecutorService executorService = Executors.newFixedThreadPool(configuration.getThreadCount());
List<Future<Map<String, HeartbeatResult>>> futureTasks = new ArrayList<>();
schemaContexts.forEach((key, value) -> value.getSchema().getDataSources().forEach((innerKey, innerValue) -> {
futureTasks.add(executorService.submit(new HeartbeatDetect(key, innerKey, innerValue, configuration)));
futureTasks.add(executorService.submit(new HeartbeatDetect(key, innerKey, innerValue, configuration, isDisabled(key, innerKey))));
}));
HeartbeatResponse heartbeatResponse = buildHeartbeatResponse(futureTasks);
closeExecutor(executorService);
......@@ -102,6 +110,15 @@ public final class HeartbeatHandler {
}
}
private boolean isDisabled(final String schemaName, final String dataSourceName) {
return disabledDataSources.isEmpty() ? Boolean.FALSE : isDisabled(Joiner.on(".").join(schemaName, dataSourceName));
}
private boolean isDisabled(final String schemaDataSourceName) {
return disabledDataSources.contains(schemaDataSourceName) && RegistryCenterNodeStatus.DISABLED.toString()
.equals(ShardingOrchestrationFacade.getInstance().getRegistryCenter().getDataSourcesNodeData(schemaDataSourceName));
}
private static final class HeartbeatHandlerHolder {
public static final HeartbeatHandler INSTANCE = new HeartbeatHandler();
......
......@@ -32,4 +32,6 @@ public final class HeartbeatResult {
private Boolean enable;
private Long detectTimeStamp;
private Boolean disabled;
}
......@@ -33,6 +33,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
......@@ -72,7 +73,7 @@ public final class HeartbeatHandlerTest {
@Test
public void assertHandleWithoutRetry() {
handler.init(getHeartbeatConfiguration(Boolean.FALSE));
HeartbeatResponse response = handler.handle(getSchemaContext());
HeartbeatResponse response = handler.handle(getSchemaContext(), Collections.emptyList());
Assert.assertNotNull(response);
Assert.assertNotNull(response.getHeartbeatResultMap());
assertTrue(response.getHeartbeatResultMap().keySet().contains(SCHEMA_NAME));
......@@ -87,7 +88,7 @@ public final class HeartbeatHandlerTest {
public void assertHandleWhenDetectExceptionWithoutRetry() {
enableExecuteQuery = Boolean.FALSE;
handler.init(getHeartbeatConfiguration(Boolean.FALSE));
HeartbeatResponse response = handler.handle(getSchemaContext());
HeartbeatResponse response = handler.handle(getSchemaContext(), Collections.emptyList());
Assert.assertNotNull(response);
Assert.assertNotNull(response.getHeartbeatResultMap());
assertTrue(response.getHeartbeatResultMap().keySet().contains(SCHEMA_NAME));
......@@ -101,7 +102,7 @@ public final class HeartbeatHandlerTest {
@Test
public void assertHandleWithRetry() {
handler.init(getHeartbeatConfiguration(Boolean.TRUE));
HeartbeatResponse response = handler.handle(getSchemaContext());
HeartbeatResponse response = handler.handle(getSchemaContext(), Collections.emptyList());
Assert.assertNotNull(response);
Assert.assertNotNull(response.getHeartbeatResultMap());
assertTrue(response.getHeartbeatResultMap().keySet().contains(SCHEMA_NAME));
......@@ -116,7 +117,7 @@ public final class HeartbeatHandlerTest {
public void assertHandleWhenDetectExceptionWithRetry() {
enableExecuteQuery = Boolean.FALSE;
handler.init(getHeartbeatConfiguration(Boolean.TRUE));
HeartbeatResponse response = handler.handle(getSchemaContext());
HeartbeatResponse response = handler.handle(getSchemaContext(), Collections.emptyList());
Assert.assertNotNull(response);
Assert.assertNotNull(response.getHeartbeatResultMap());
assertTrue(response.getHeartbeatResultMap().keySet().contains(SCHEMA_NAME));
......@@ -131,7 +132,7 @@ public final class HeartbeatHandlerTest {
public void assertMultipleDataSource() {
multipleDataSource = Boolean.TRUE;
handler.init(getHeartbeatConfiguration(Boolean.FALSE));
HeartbeatResponse response = handler.handle(getSchemaContext());
HeartbeatResponse response = handler.handle(getSchemaContext(), Collections.emptyList());
Assert.assertNotNull(response);
Assert.assertNotNull(response.getHeartbeatResultMap());
assertTrue(response.getHeartbeatResultMap().keySet().contains(SCHEMA_NAME));
......
......@@ -34,7 +34,7 @@ public final class HeartbeatResponseTest {
@Test
public void getHeartbeatResultMap() {
Map<String, Collection<HeartbeatResult>> heartbeatResultMap = new HashMap<>();
heartbeatResultMap.put("sharding_db", Collections.singleton(new HeartbeatResult("ds_1", true, 123L)));
heartbeatResultMap.put("sharding_db", Collections.singleton(new HeartbeatResult("ds_1", true, 123L, Boolean.FALSE)));
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(heartbeatResultMap);
assertThat(heartbeatResponse.getHeartbeatResultMap(), is(heartbeatResultMap));
assertTrue(heartbeatResponse.getHeartbeatResultMap().keySet().contains("sharding_db"));
......
......@@ -17,12 +17,15 @@
package org.apache.shardingsphere.cluster.state;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.cluster.state.enums.NodeState;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.orchestration.core.common.eventbus.ShardingOrchestrationEventBus;
import org.apache.shardingsphere.orchestration.core.facade.ShardingOrchestrationFacade;
import org.apache.shardingsphere.orchestration.core.registrycenter.event.DisabledStateChangedEvent;
import java.util.Collection;
import java.util.HashMap;
......@@ -31,9 +34,12 @@ import java.util.Map;
/**
* Cluster state instance.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ClusterStateInstance {
private ClusterStateInstance() {
ShardingOrchestrationEventBus.getInstance().register(this);
}
/**
* Get cluster state instance.
*
......@@ -76,6 +82,20 @@ public final class ClusterStateInstance {
return YamlEngine.unmarshal(instanceData, InstanceState.class);
}
/**
* Disabled data source after state changed.
*
* @param event disabled state changed event
*/
@Subscribe
public void dataSourceStateChanged(final DisabledStateChangedEvent event) {
String dataSourceName = Joiner.on(".").join(event.getOrchestrationSchema().getSchemaName(), event.getOrchestrationSchema().getDataSourceName());
NodeState state = event.isDisabled() ? NodeState.DISABLED : NodeState.ONLINE;
InstanceState instanceState = loadInstanceState();
instanceState.getDataSources().entrySet().stream().filter(entry -> dataSourceName.equals(entry.getKey())).findFirst().get().getValue().setState(state);
persistInstanceState(instanceState);
}
/**
* Load all instance states.
*
......
......@@ -87,4 +87,23 @@ public final class RegistryCenter {
public Collection<String> loadAllInstances() {
return repository.getChildrenKeys(node.getInstanceNodeRootPath());
}
/**
* Load all data sources nodes.
*
* @return Collection of all data sources nodes
*/
public Collection<String> loadAllDataSourcesNodes() {
return repository.getChildrenKeys(node.getDataSourcesNodeFullRootPath());
}
/**
* Get data sources node data.
*
* @param schemaDataSourceName schema name and data source name
* @return data sources node data
*/
public String getDataSourcesNodeData(final String schemaDataSourceName) {
return repository.get(node.getDataSourcesNodeFullPath(schemaDataSourceName));
}
}
......@@ -42,7 +42,7 @@ export default {
state: {
ONLINE: '#01acca',
OFFLINE: '#FF0A14',
DISABLED: '#FF199D',
DISABLED: '#A9A9A9',
UNKNOWN: '#ffb402'
},
myChart: {},
......@@ -111,7 +111,7 @@ export default {
for (const key in this.allData.dataSourceStates) {
this.datasource.push({
name: key,
category: 0,
category: this.getCategory(this.allData.dataSourceStates[key]),
state: this.allData.dataSourceStates[key].state,
speed: '',
value: [x, 20]
......@@ -122,28 +122,42 @@ export default {
initLines() {
this.links = []
this.linesData = []
this.datasource.slice().forEach((el) => {
this.proxy.slice().forEach((e2) => {
const e3 = this.instanceData[e2.name].dataSources[el.name]
if (e3) {
if (e3.state === 'ONLINE') {
this.linesData.push([{
coord: e2.value
}, {
coord: el.value
}])
}
this.datasource.slice().forEach((ds) => {
this.proxy.slice().forEach((p) => {
if (p.category === 2) {
this.links.push({
source: e2.name,
target: el.name,
speed: el.speed,
source: p.name,
target: ds.name,
speed: ds.speed,
lineStyle: {
normal: {
color: this.state[e3.state],
color: this.state.DISABLED,
curveness: 0
}
}
})
} else {
const ids = this.instanceData[p.name].dataSources[ds.name]
if (ids) {
if (ids.state === 'ONLINE') {
this.linesData.push([{
coord: p.value
}, {
coord: ds.value
}])
}
this.links.push({
source: p.name,
target: ds.name,
speed: ds.speed,
lineStyle: {
normal: {
color: this.state[ids.state],
curveness: 0
}
}
})
}
}
})
})
......@@ -252,8 +266,8 @@ export default {
},
selectedMode: false,
right: 0,
data: this.categories.map(function(el) {
return el.name
data: this.categories.map(function(c) {
return c.name
})
}],
xAxis: {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册