未验证 提交 34d54550 编写于 作者: kimmking's avatar kimmking 提交者: GitHub

polish cluster heartbeat (#5792)

上级 027c7bf7
......@@ -27,5 +27,5 @@ import lombok.Setter;
@Setter
public final class ClusterConfiguration {
private HeartBeatConfiguration heartBeat;
private HeartbeatConfiguration heartbeat;
}
......@@ -18,9 +18,9 @@
package org.apache.shardingsphere.cluster.configuration.swapper;
import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
import org.apache.shardingsphere.cluster.configuration.config.HeartBeatConfiguration;
import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlHeartBeatConfiguration;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlHeartbeatConfiguration;
import org.apache.shardingsphere.infra.yaml.swapper.YamlSwapper;
/**
......@@ -31,28 +31,28 @@ public final class ClusterConfigurationYamlSwapper implements YamlSwapper<YamlCl
@Override
public YamlClusterConfiguration swap(final ClusterConfiguration clusterConfiguration) {
final YamlClusterConfiguration yamlClusterConfiguration = new YamlClusterConfiguration();
final YamlHeartBeatConfiguration yamlHeartBeatConfiguration = new YamlHeartBeatConfiguration();
yamlHeartBeatConfiguration.setSql(clusterConfiguration.getHeartBeat().getSql());
yamlHeartBeatConfiguration.setInterval(clusterConfiguration.getHeartBeat().getInterval());
yamlHeartBeatConfiguration.setRetryEnable(clusterConfiguration.getHeartBeat().getRetryEnable());
yamlHeartBeatConfiguration.setRetryMaximum(clusterConfiguration.getHeartBeat().getRetryMaximum());
yamlHeartBeatConfiguration.setRetryInterval(clusterConfiguration.getHeartBeat().getRetryInterval());
yamlHeartBeatConfiguration.setThreadCount(clusterConfiguration.getHeartBeat().getThreadCount());
yamlClusterConfiguration.setHeartBeat(yamlHeartBeatConfiguration);
final YamlHeartbeatConfiguration yamlHeartBeatConfiguration = new YamlHeartbeatConfiguration();
yamlHeartBeatConfiguration.setSql(clusterConfiguration.getHeartbeat().getSql());
yamlHeartBeatConfiguration.setInterval(clusterConfiguration.getHeartbeat().getInterval());
yamlHeartBeatConfiguration.setRetryEnable(clusterConfiguration.getHeartbeat().getRetryEnable());
yamlHeartBeatConfiguration.setRetryMaximum(clusterConfiguration.getHeartbeat().getRetryMaximum());
yamlHeartBeatConfiguration.setRetryInterval(clusterConfiguration.getHeartbeat().getRetryInterval());
yamlHeartBeatConfiguration.setThreadCount(clusterConfiguration.getHeartbeat().getThreadCount());
yamlClusterConfiguration.setHeartbeat(yamlHeartBeatConfiguration);
return yamlClusterConfiguration;
}
@Override
public ClusterConfiguration swap(final YamlClusterConfiguration yamlConfiguration) {
final ClusterConfiguration clusterConfiguration = new ClusterConfiguration();
final HeartBeatConfiguration heartBeatConfiguration = new HeartBeatConfiguration();
heartBeatConfiguration.setSql(yamlConfiguration.getHeartBeat().getSql());
heartBeatConfiguration.setInterval(yamlConfiguration.getHeartBeat().getInterval());
heartBeatConfiguration.setRetryEnable(yamlConfiguration.getHeartBeat().getRetryEnable());
heartBeatConfiguration.setRetryMaximum(yamlConfiguration.getHeartBeat().getRetryMaximum());
heartBeatConfiguration.setRetryInterval(yamlConfiguration.getHeartBeat().getRetryInterval());
heartBeatConfiguration.setThreadCount(yamlConfiguration.getHeartBeat().getThreadCount());
clusterConfiguration.setHeartBeat(heartBeatConfiguration);
final HeartbeatConfiguration heartBeatConfiguration = new HeartbeatConfiguration();
heartBeatConfiguration.setSql(yamlConfiguration.getHeartbeat().getSql());
heartBeatConfiguration.setInterval(yamlConfiguration.getHeartbeat().getInterval());
heartBeatConfiguration.setRetryEnable(yamlConfiguration.getHeartbeat().getRetryEnable());
heartBeatConfiguration.setRetryMaximum(yamlConfiguration.getHeartbeat().getRetryMaximum());
heartBeatConfiguration.setRetryInterval(yamlConfiguration.getHeartbeat().getRetryInterval());
heartBeatConfiguration.setThreadCount(yamlConfiguration.getHeartbeat().getThreadCount());
clusterConfiguration.setHeartbeat(heartBeatConfiguration);
return clusterConfiguration;
}
}
......@@ -28,5 +28,5 @@ import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
@Setter
public final class YamlClusterConfiguration implements YamlConfiguration {
private YamlHeartBeatConfiguration heartBeat;
private YamlHeartbeatConfiguration heartbeat;
}
......@@ -22,11 +22,11 @@ import lombok.Setter;
import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
/**
* Heart beat configuration for YAML.
* Heartbeat configuration for YAML.
*/
@Getter
@Setter
public final class YamlHeartBeatConfiguration implements YamlConfiguration {
public final class YamlHeartbeatConfiguration implements YamlConfiguration {
private String sql;
......
......@@ -18,9 +18,9 @@
package org.apache.shardingsphere.cluster.configuration.swapper;
import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
import org.apache.shardingsphere.cluster.configuration.config.HeartBeatConfiguration;
import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlHeartBeatConfiguration;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlHeartbeatConfiguration;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
......@@ -37,37 +37,37 @@ public final class ClusterConfigurationYamlSwapperTest {
@Test
public void assertSwapToClusterConfiguration() {
YamlHeartBeatConfiguration yamlHeartBeatConfiguration = new YamlHeartBeatConfiguration();
YamlHeartbeatConfiguration yamlHeartBeatConfiguration = new YamlHeartbeatConfiguration();
yamlHeartBeatConfiguration.setSql(SQL);
yamlHeartBeatConfiguration.setInterval(INTERVAL);
yamlHeartBeatConfiguration.setRetryEnable(Boolean.TRUE);
yamlHeartBeatConfiguration.setRetryMaximum(MAXIMUM);
YamlClusterConfiguration yamlClusterConfiguration = new YamlClusterConfiguration();
yamlClusterConfiguration.setHeartBeat(yamlHeartBeatConfiguration);
yamlClusterConfiguration.setHeartbeat(yamlHeartBeatConfiguration);
ClusterConfiguration clusterConfiguration = new ClusterConfigurationYamlSwapper().swap(yamlClusterConfiguration);
assertThat(clusterConfiguration.getHeartBeat().getSql(), is(SQL));
assertThat(clusterConfiguration.getHeartBeat().getInterval(), is(INTERVAL));
assertThat(clusterConfiguration.getHeartBeat().getRetryMaximum(), is(MAXIMUM));
assertTrue(clusterConfiguration.getHeartBeat().getRetryEnable());
assertThat(clusterConfiguration.getHeartbeat().getSql(), is(SQL));
assertThat(clusterConfiguration.getHeartbeat().getInterval(), is(INTERVAL));
assertThat(clusterConfiguration.getHeartbeat().getRetryMaximum(), is(MAXIMUM));
assertTrue(clusterConfiguration.getHeartbeat().getRetryEnable());
}
@Test
public void assertSwapToYamlClusterConfiguration() {
HeartBeatConfiguration heartBeatConfiguration = new HeartBeatConfiguration();
HeartbeatConfiguration heartBeatConfiguration = new HeartbeatConfiguration();
heartBeatConfiguration.setSql(SQL);
heartBeatConfiguration.setInterval(INTERVAL);
heartBeatConfiguration.setRetryEnable(Boolean.TRUE);
heartBeatConfiguration.setRetryMaximum(MAXIMUM);
ClusterConfiguration clusterConfiguration = new ClusterConfiguration();
clusterConfiguration.setHeartBeat(heartBeatConfiguration);
clusterConfiguration.setHeartbeat(heartBeatConfiguration);
YamlClusterConfiguration yamlClusterConfiguration = new ClusterConfigurationYamlSwapper().swap(clusterConfiguration);
assertThat(yamlClusterConfiguration.getHeartBeat().getSql(), is(SQL));
assertThat(yamlClusterConfiguration.getHeartBeat().getInterval(), is(INTERVAL));
assertThat(yamlClusterConfiguration.getHeartBeat().getRetryMaximum(), is(MAXIMUM));
assertTrue(yamlClusterConfiguration.getHeartBeat().getRetryEnable());
assertThat(yamlClusterConfiguration.getHeartbeat().getSql(), is(SQL));
assertThat(yamlClusterConfiguration.getHeartbeat().getInterval(), is(INTERVAL));
assertThat(yamlClusterConfiguration.getHeartbeat().getRetryMaximum(), is(MAXIMUM));
assertTrue(yamlClusterConfiguration.getHeartbeat().getRetryEnable());
}
}
......@@ -22,9 +22,9 @@ import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
import org.apache.shardingsphere.cluster.heartbeat.ClusterHeartBeatInstance;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartBeatResponse;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartBeatResult;
import org.apache.shardingsphere.cluster.heartbeat.ClusterHeartbeatInstance;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult;
import org.apache.shardingsphere.cluster.state.ClusterStateInstance;
import org.apache.shardingsphere.cluster.state.DataSourceState;
import org.apache.shardingsphere.cluster.state.InstanceState;
......@@ -40,7 +40,7 @@ import java.util.Map;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ClusterFacade {
private ClusterHeartBeatInstance clusterHeartBeatInstance;
private ClusterHeartbeatInstance clusterHeartbeatInstance;
private ClusterStateInstance clusterStateInstance;
......@@ -51,34 +51,34 @@ public final class ClusterFacade {
*/
public void init(final ClusterConfiguration clusterConfiguration) {
Preconditions.checkNotNull(clusterConfiguration, "cluster configuration can not be null.");
clusterHeartBeatInstance = ClusterHeartBeatInstance.getInstance();
clusterHeartBeatInstance.init(clusterConfiguration.getHeartBeat());
clusterHeartbeatInstance = ClusterHeartbeatInstance.getInstance();
clusterHeartbeatInstance.init(clusterConfiguration.getHeartbeat());
clusterStateInstance = ClusterStateInstance.getInstance();
}
/**
* Report heart beat.
* Report heartbeat.
*
* @param heartBeatResponse heart beat response
* @param heartBeatResponse heartbeat response
*/
public void reportHeartBeat(final HeartBeatResponse heartBeatResponse) {
public void reportHeartbeat(final HeartbeatResponse heartBeatResponse) {
clusterStateInstance.persistInstanceState(buildInstanceState(heartBeatResponse));
}
private InstanceState buildInstanceState(final HeartBeatResponse heartBeatResponse) {
private InstanceState buildInstanceState(final HeartbeatResponse heartbeatResponse) {
InstanceState instanceState = clusterStateInstance.loadInstanceState();
return new InstanceState(instanceState.getState(), buildDataSourceStateMap(instanceState, heartBeatResponse));
return new InstanceState(instanceState.getState(), buildDataSourceStateMap(instanceState, heartbeatResponse));
}
private Map<String, DataSourceState> buildDataSourceStateMap(final InstanceState instanceState, final HeartBeatResponse heartBeatResponse) {
private Map<String, DataSourceState> buildDataSourceStateMap(final InstanceState instanceState, final HeartbeatResponse heartbeatResponse) {
Map<String, DataSourceState> dataSourceStateMap = new HashMap<>();
heartBeatResponse.getHeartBeatResultMap().forEach((key, value) -> buildDataSourceState(key, value, dataSourceStateMap, instanceState));
heartbeatResponse.getHeartbeatResultMap().forEach((key, value) -> buildDataSourceState(key, value, dataSourceStateMap, instanceState));
return dataSourceStateMap;
}
private void buildDataSourceState(final String schemaName, final Collection<HeartBeatResult> heartBeatResults,
private void buildDataSourceState(final String schemaName, final Collection<HeartbeatResult> heartbeatResults,
final Map<String, DataSourceState> dataSourceStateMap, final InstanceState instanceState) {
heartBeatResults.stream().forEach(each -> {
heartbeatResults.stream().forEach(each -> {
String dataSourceName = Joiner.on(".").join(schemaName, each.getDataSourceName());
DataSourceState dataSourceState = null == instanceState.getDataSources()
|| null == instanceState.getDataSources().get(dataSourceName) ? new DataSourceState()
......
......@@ -18,41 +18,41 @@
package org.apache.shardingsphere.cluster.heartbeat;
import com.google.common.base.Preconditions;
import org.apache.shardingsphere.cluster.configuration.config.HeartBeatConfiguration;
import org.apache.shardingsphere.cluster.heartbeat.event.HeartBeatDetectNoticeEvent;
import org.apache.shardingsphere.cluster.heartbeat.task.HeartBeatTask;
import org.apache.shardingsphere.cluster.heartbeat.task.HeartBeatTaskManager;
import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration;
import org.apache.shardingsphere.cluster.heartbeat.event.HeartbeatDetectNoticeEvent;
import org.apache.shardingsphere.cluster.heartbeat.task.HeartbeatTask;
import org.apache.shardingsphere.cluster.heartbeat.task.HeartbeatTaskManager;
/**
* Cluster heart beat instance.
* Cluster heartbeat instance.
*/
public final class ClusterHeartBeatInstance {
public final class ClusterHeartbeatInstance {
private HeartBeatTaskManager heartBeatTaskManager;
private HeartbeatTaskManager heartbeatTaskManager;
/**
* Get cluster heart beat instance.
* Get cluster heartbeat instance.
*
* @return cluster heart beat instance
* @return cluster heartbeat instance
*/
public static ClusterHeartBeatInstance getInstance() {
return ClusterHeartBeatInstanceHolder.INSTANCE;
public static ClusterHeartbeatInstance getInstance() {
return ClusterHeartbeatInstanceHolder.INSTANCE;
}
/**
* Init heart beat task manager.
* Init heartbeat task manager.
*
* @param configuration heart beat configuration
* @param configuration heartbeat configuration
*/
public void init(final HeartBeatConfiguration configuration) {
Preconditions.checkNotNull(configuration, "heart beat configuration can not be null.");
heartBeatTaskManager = new HeartBeatTaskManager(configuration.getInterval(), configuration.getThreadCount());
HeartBeatTask task = new HeartBeatTask(new HeartBeatDetectNoticeEvent(configuration.getSql(), configuration.getRetryEnable(), configuration.getRetryMaximum()));
heartBeatTaskManager.start(task);
public void init(final HeartbeatConfiguration configuration) {
Preconditions.checkNotNull(configuration, "heartbeat configuration can not be null.");
heartbeatTaskManager = new HeartbeatTaskManager(configuration.getInterval(), configuration.getThreadCount());
HeartbeatTask task = new HeartbeatTask(new HeartbeatDetectNoticeEvent(configuration.getSql(), configuration.getRetryEnable(), configuration.getRetryMaximum()));
heartbeatTaskManager.start(task);
}
private static final class ClusterHeartBeatInstanceHolder {
private static final class ClusterHeartbeatInstanceHolder {
private static final ClusterHeartBeatInstance INSTANCE = new ClusterHeartBeatInstance();
private static final ClusterHeartbeatInstance INSTANCE = new ClusterHeartbeatInstance();
}
}
......@@ -20,14 +20,14 @@ package org.apache.shardingsphere.cluster.heartbeat.event;
import lombok.Getter;
/**
* Abstract heart beat event.
* Abstract heartbeat event.
*/
@Getter
public abstract class AbstractHeartBeatEvent implements HeartBeatEvent {
public abstract class AbstractHeartbeatEvent implements HeartbeatEvent {
private final HeartBeatEventType eventType;
private final HeartbeatEventType eventType;
AbstractHeartBeatEvent(final HeartBeatEventType eventType) {
AbstractHeartbeatEvent(final HeartbeatEventType eventType) {
this.eventType = eventType;
}
}
......@@ -20,10 +20,10 @@ package org.apache.shardingsphere.cluster.heartbeat.event;
import lombok.Getter;
/**
* Heart beat detect notice event.
* Heartbeat detect notice event.
*/
@Getter
public final class HeartBeatDetectNoticeEvent extends AbstractHeartBeatEvent {
public final class HeartbeatDetectNoticeEvent extends AbstractHeartbeatEvent {
private String detectSQL;
......@@ -31,8 +31,8 @@ public final class HeartBeatDetectNoticeEvent extends AbstractHeartBeatEvent {
private Integer retryMaximum;
public HeartBeatDetectNoticeEvent(final String detectSQL, final Boolean retryEnable, final Integer retryMaximum) {
super(HeartBeatEventType.NOTICE_DETECT);
public HeartbeatDetectNoticeEvent(final String detectSQL, final Boolean retryEnable, final Integer retryMaximum) {
super(HeartbeatEventType.NOTICE_DETECT);
this.detectSQL = detectSQL;
this.retryEnable = retryEnable;
this.retryMaximum = retryMaximum;
......
......@@ -20,20 +20,20 @@ package org.apache.shardingsphere.cluster.heartbeat.eventbus;
import com.google.common.eventbus.EventBus;
/**
* Heart beat event bus.
* Heartbeat event bus.
*/
public final class HeartBeatEventBus {
public final class HeartbeatEventBus {
/**
* Get heart beat event bus instance.
* Get heartbeat event bus instance.
*
* @return event bus
*/
public static EventBus getInstance() {
return HeartBeatEventBusHolder.INSTANCE;
return HeartbeatEventBusHolder.INSTANCE;
}
private static final class HeartBeatEventBusHolder {
private static final class HeartbeatEventBusHolder {
private static final EventBus INSTANCE = new EventBus();
}
}
......@@ -23,14 +23,14 @@ import java.util.Collection;
import java.util.Map;
/**
* Heart beat response.
* Heartbeat response.
*/
@Getter
public final class HeartBeatResponse {
public final class HeartbeatResponse {
private Map<String, Collection<HeartBeatResult>> heartBeatResultMap;
private Map<String, Collection<HeartbeatResult>> heartbeatResultMap;
public HeartBeatResponse(final Map<String, Collection<HeartBeatResult>> heartBeatResultMap) {
this.heartBeatResultMap = heartBeatResultMap;
public HeartbeatResponse(final Map<String, Collection<HeartbeatResult>> heartbeatResultMap) {
this.heartbeatResultMap = heartbeatResultMap;
}
}
......@@ -21,11 +21,11 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Heart beat result.
* Heartbeat result.
*/
@AllArgsConstructor
@Getter
public final class HeartBeatResult {
public final class HeartbeatResult {
private String dataSourceName;
......
......@@ -18,24 +18,24 @@
package org.apache.shardingsphere.cluster.heartbeat.task;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.cluster.heartbeat.event.HeartBeatEvent;
import org.apache.shardingsphere.cluster.heartbeat.eventbus.HeartBeatEventBus;
import org.apache.shardingsphere.cluster.heartbeat.event.HeartbeatEvent;
import org.apache.shardingsphere.cluster.heartbeat.eventbus.HeartbeatEventBus;
/**
* Heart beat task.
* Heartbeat task.
*/
@Slf4j
public final class HeartBeatTask implements Runnable {
public final class HeartbeatTask implements Runnable {
private final HeartBeatEvent heartBeatEvent;
private final HeartbeatEvent heartbeatEvent;
public HeartBeatTask(final HeartBeatEvent heartBeatEvent) {
this.heartBeatEvent = heartBeatEvent;
public HeartbeatTask(final HeartbeatEvent heartbeatEvent) {
this.heartbeatEvent = heartbeatEvent;
}
@Override
public void run() {
HeartBeatEventBus.getInstance().post(heartBeatEvent);
HeartbeatEventBus.getInstance().post(heartbeatEvent);
log.info("heart beat detect running");
}
}
......@@ -23,26 +23,26 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Heart beat task manager.
* Heartbeat task manager.
*/
public final class HeartBeatTaskManager {
public final class HeartbeatTaskManager {
private Integer interval;
private final ScheduledExecutorService executorService;
public HeartBeatTaskManager(final Integer interval, final Integer threadCount) {
public HeartbeatTaskManager(final Integer interval, final Integer threadCount) {
this.interval = interval;
executorService = Executors.newScheduledThreadPool(threadCount);
}
/**
* Start heart beat task.
* Start heartbeat task.
*
* @param heartBeatTask heart beat task
* @param heartbeatTask heartbeat task
*/
public void start(final HeartBeatTask heartBeatTask) {
Preconditions.checkNotNull(heartBeatTask, "task can not be null");
executorService.scheduleAtFixedRate(heartBeatTask, interval, interval, TimeUnit.SECONDS);
public void start(final HeartbeatTask heartbeatTask) {
Preconditions.checkNotNull(heartbeatTask, "task can not be null");
executorService.scheduleAtFixedRate(heartbeatTask, interval, interval, TimeUnit.SECONDS);
}
}
......@@ -21,10 +21,10 @@ import org.junit.Test;
import static org.junit.Assert.assertNotNull;
public final class ClusterHeartBeatInstanceTest {
public final class ClusterHeartbeatInstanceTest {
@Test
public void assertGetInstance() {
assertNotNull(ClusterHeartBeatInstance.getInstance());
assertNotNull(ClusterHeartbeatInstance.getInstance());
}
}
......@@ -21,11 +21,11 @@ import org.junit.Test;
import static org.junit.Assert.assertTrue;
public final class HeartBeatDetectNoticeEventTest {
public final class HeartbeatDetectNoticeEventTest {
@Test
public void assertHeartBeatEventType() {
HeartBeatDetectNoticeEvent event = new HeartBeatDetectNoticeEvent("SELECT 1", true, 60);
assertTrue(event.getEventType() == HeartBeatEventType.NOTICE_DETECT);
HeartbeatDetectNoticeEvent event = new HeartbeatDetectNoticeEvent("SELECT 1", true, 60);
assertTrue(event.getEventType() == HeartbeatEventType.NOTICE_DETECT);
}
}
......@@ -21,10 +21,10 @@ import org.junit.Test;
import static org.junit.Assert.assertNotNull;
public final class HeartBeatEventBusTest {
public final class HeartbeatEventBusTest {
@Test
public void assertGetInstance() {
assertNotNull(HeartBeatEventBus.getInstance());
assertNotNull(HeartbeatEventBus.getInstance());
}
}
......@@ -48,7 +48,7 @@
# port: 9190
#
#cluster:
# heartBeat:
# heartbeat:
# sql: select 1
# threadCount: 1
# interval: 60
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册