提交 34be7840 编写于 作者: 杨翊 SionYang 提交者: avalon566

Refactor SyncTaskExecuteEngine to remove ExecuteUtil (#3719)

上级 15c0c607
......@@ -17,10 +17,19 @@
package org.apache.shardingsphere.shardingscaling.core.config;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.DefaultSyncTaskExecuteEngine;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.SyncTaskExecuteEngine;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* Scaling context.
*
* @author ssxlulu
* @author yangyi
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public final class ScalingContext {
......@@ -28,6 +37,8 @@ public final class ScalingContext {
private static final ScalingContext INSTANCE = new ScalingContext();
private ServerConfiguration serverConfiguration;
private SyncTaskExecuteEngine syncTaskExecuteEngine;
/**
* Get instance of Sharding-Scaling's context.
......@@ -45,6 +56,7 @@ public final class ScalingContext {
*/
public void init(final ServerConfiguration serverConfiguration) {
this.serverConfiguration = serverConfiguration;
this.syncTaskExecuteEngine = new DefaultSyncTaskExecuteEngine(serverConfiguration.getWorkerThread());
}
}
......@@ -23,8 +23,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunner;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunnerGroup;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
......@@ -36,20 +38,40 @@ import java.util.concurrent.TimeUnit;
* Default implement for sync task execute engine.
*
* @author avalon566
* @author yangyi
*/
public class DefaultSyncTaskExecuteEngine implements SyncTaskExecuteEngine {
public final class DefaultSyncTaskExecuteEngine implements SyncTaskExecuteEngine {
private final ListeningExecutorService executorService;
private int availableWorkerThread;
public DefaultSyncTaskExecuteEngine(final int maxWorkerThread) {
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(maxWorkerThread, maxWorkerThread, 0, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadPoolExecutor.AbortPolicy()));
executorService = MoreExecutors.listeningDecorator(
new ThreadPoolExecutor(maxWorkerThread, maxWorkerThread, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy()));
availableWorkerThread = maxWorkerThread;
}
@Override
public void submitGroup(final SyncRunnerGroup syncRunnerGroup) {
Iterable<ListenableFuture<Object>> listenableFutures = submit(syncRunnerGroup.getSyncRunners());
ListenableFuture allListenableFuture = Futures.allAsList(listenableFutures);
Futures.addCallback(allListenableFuture, new FutureCallback<List<Object>>() {
@Override
public void onSuccess(final List<Object> result) {
syncRunnerGroup.onSuccess();
}
@Override
public void onFailure(final Throwable t) {
syncRunnerGroup.onFailure(t);
}
});
}
@Override
public final synchronized List<ListenableFuture<Object>> submit(final List<SyncRunner> syncRunners) {
public synchronized List<ListenableFuture<Object>> submit(final Collection<SyncRunner> syncRunners) {
if (null == syncRunners || 0 == syncRunners.size()) {
return Collections.emptyList();
}
......@@ -60,18 +82,7 @@ public class DefaultSyncTaskExecuteEngine implements SyncTaskExecuteEngine {
availableWorkerThread -= syncRunners.size();
for (SyncRunner syncRunner : syncRunners) {
ListenableFuture listenableFuture = executorService.submit(syncRunner);
Futures.addCallback(listenableFuture, new FutureCallback() {
@Override
public void onSuccess(final Object r) {
releaseWorkerThread();
}
@Override
public void onFailure(final Throwable t) {
releaseWorkerThread();
}
});
addReleaseWorkerThreadCallback(listenableFuture);
result.add(listenableFuture);
}
return result;
......@@ -80,4 +91,19 @@ public class DefaultSyncTaskExecuteEngine implements SyncTaskExecuteEngine {
private synchronized void releaseWorkerThread() {
availableWorkerThread++;
}
private void addReleaseWorkerThreadCallback(final ListenableFuture listenableFuture) {
Futures.addCallback(listenableFuture, new FutureCallback() {
@Override
public void onSuccess(final Object r) {
releaseWorkerThread();
}
@Override
public void onFailure(final Throwable t) {
releaseWorkerThread();
}
});
}
}
......@@ -19,7 +19,9 @@ package org.apache.shardingsphere.shardingscaling.core.execute.engine;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunner;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunnerGroup;
import java.util.Collection;
import java.util.List;
/**
......@@ -28,6 +30,13 @@ import java.util.List;
* @author avalon566
*/
public interface SyncTaskExecuteEngine {
/**
* Submit a group syncRunner.
*
* @param syncRunnerGroup sync runner group
*/
void submitGroup(SyncRunnerGroup syncRunnerGroup);
/**
* Submit sync runner to execute.
......@@ -35,5 +44,5 @@ public interface SyncTaskExecuteEngine {
* @param syncRunners sync runner list
* @return listenable future
*/
List<ListenableFuture<Object>> submit(List<SyncRunner> syncRunners);
List<ListenableFuture<Object>> submit(Collection<SyncRunner> syncRunners);
}
......@@ -15,63 +15,62 @@
* limitations under the License.
*/
package org.apache.shardingsphere.shardingscaling.core.execute.engine;
package org.apache.shardingsphere.shardingscaling.core.execute.executor;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunner;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.Reader;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.Writer;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.ExecuteCallback;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
/**
* Execute util.
* Sync runner group.
*
* @author avalon566
* @author yangyi
*/
@Slf4j
public class ExecuteUtil {
private static final SyncTaskExecuteEngine EXECUTE_ENGINE = new DefaultSyncTaskExecuteEngine(ScalingContext.getInstance().getServerConfiguration().getWorkerThread());
@RequiredArgsConstructor
@Getter
public final class SyncRunnerGroup {
private final ExecuteCallback executeCallback;
private final Collection<SyncRunner> syncRunners = new LinkedList<>();
/**
* Add {@code SyncRunner}.
*
* @param syncRunner sync runner
*/
public void addSyncRunner(final SyncRunner syncRunner) {
syncRunners.add(syncRunner);
}
/**
* Execute.
* Add all {@code SyncRunner}.
*
* @param channel channel
* @param reader reader
* @param writers writers
* @param executeCallback call when execute finish
* @param syncRunners collection of sync runners
*/
public static void execute(final Channel channel, final Reader reader, final List<Writer> writers, final ExecuteCallback executeCallback) {
final List<SyncRunner> syncRunners = new LinkedList<>();
reader.setChannel(channel);
syncRunners.add(reader);
for (Writer writer : writers) {
writer.setChannel(channel);
syncRunners.add(writer);
public void addAllSyncRunner(final Collection<? extends SyncRunner> syncRunners) {
this.syncRunners.addAll(syncRunners);
}
/**
* Invoked when this group is successful.
*/
public void onSuccess() {
executeCallback.onSuccess();
}
/**
* Invoked when this group fails or is canceled.
*
* @param throwable throwable
*/
public void onFailure(final Throwable throwable) {
for (SyncRunner each : syncRunners) {
each.stop();
}
Iterable<ListenableFuture<Object>> listenableFutures = EXECUTE_ENGINE.submit(syncRunners);
ListenableFuture allListenableFuture = Futures.allAsList(listenableFutures);
Futures.addCallback(allListenableFuture, new FutureCallback<List<Object>>() {
@Override
public void onSuccess(final List<Object> result) {
executeCallback.onSuccess();
}
@Override
public void onFailure(final Throwable t) {
for (SyncRunner syncRunner : syncRunners) {
syncRunner.stop();
}
executeCallback.onFailure(t);
}
});
executeCallback.onFailure(throwable);
}
}
......@@ -20,15 +20,16 @@ package org.apache.shardingsphere.shardingscaling.core.synctask.history;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.shardingscaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.shardingscaling.core.controller.task.ReportCallback;
import org.apache.shardingsphere.shardingscaling.core.controller.SyncProgress;
import org.apache.shardingsphere.shardingscaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.ExecuteUtil;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.SyncTaskExecuteCallback;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunnerGroup;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.AckCallback;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.MemoryChannel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.Reader;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.JdbcReader;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.ReaderFactory;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.Record;
......@@ -42,7 +43,6 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
......@@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
* @author yangyi
*/
@Slf4j
public class HistoryDataSyncTask implements SyncTask {
public final class HistoryDataSyncTask implements SyncTask {
private final SyncConfiguration syncConfiguration;
......@@ -64,6 +64,10 @@ public class HistoryDataSyncTask implements SyncTask {
private long estimatedRows;
private AtomicLong syncedRows = new AtomicLong();
private JdbcReader reader;
private Writer writer;
public HistoryDataSyncTask(final SyncConfiguration syncConfiguration, final DataSourceFactory dataSourceFactory) {
this.syncConfiguration = syncConfiguration;
......@@ -82,16 +86,35 @@ public class HistoryDataSyncTask implements SyncTask {
}
@Override
public final void start(final ReportCallback callback) {
try {
estimatedRows = getEstimatedRows();
public void start(final ReportCallback callback) {
getEstimatedRows();
instanceSyncRunner();
instanceChannel();
ScalingContext.getInstance().getSyncTaskExecuteEngine().submitGroup(groupSyncRunner(callback));
}
private void getEstimatedRows() {
DataSource dataSource = dataSourceFactory.getDataSource(syncConfiguration.getReaderConfiguration().getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.prepareStatement(String.format("select count(*) from %s %s",
syncConfiguration.getReaderConfiguration().getTableName(),
syncConfiguration.getReaderConfiguration().getWhereCondition()))
.executeQuery();
resultSet.next();
estimatedRows = resultSet.getInt(1);
} catch (SQLException e) {
throw new SyncTaskExecuteException("get estimated rows error.", e);
}
final Reader reader = ReaderFactory.newInstanceJdbcReader(syncConfiguration.getReaderConfiguration(), dataSourceFactory);
final Writer writer = WriterFactory.newInstance(syncConfiguration.getWriterConfiguration(), dataSourceFactory);
ExecuteUtil.execute(new MemoryChannel(new AckCallback() {
}
private void instanceSyncRunner() {
reader = ReaderFactory.newInstanceJdbcReader(syncConfiguration.getReaderConfiguration(), dataSourceFactory);
writer = WriterFactory.newInstance(syncConfiguration.getWriterConfiguration(), dataSourceFactory);
}
private void instanceChannel() {
MemoryChannel channel = new MemoryChannel(new AckCallback() {
@Override
public void onAck(final List<Record> records) {
int count = 0;
......@@ -102,28 +125,27 @@ public class HistoryDataSyncTask implements SyncTask {
}
syncedRows.addAndGet(count);
}
}), reader, Collections.singletonList(writer), new SyncTaskExecuteCallback(this.getClass().getSimpleName(), syncTaskId, callback));
});
reader.setChannel(channel);
writer.setChannel(channel);
}
private int getEstimatedRows() throws SQLException {
DataSource dataSource = dataSourceFactory.getDataSource(syncConfiguration.getReaderConfiguration().getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.prepareStatement(String.format("select count(*) from %s %s",
syncConfiguration.getReaderConfiguration().getTableName(),
syncConfiguration.getReaderConfiguration().getWhereCondition()))
.executeQuery();
resultSet.next();
return resultSet.getInt(1);
}
private SyncRunnerGroup groupSyncRunner(final ReportCallback callback) {
SyncRunnerGroup result = new SyncRunnerGroup(new SyncTaskExecuteCallback(this.getClass().getSimpleName(), syncTaskId, callback));
result.addSyncRunner(reader);
result.addSyncRunner(writer);
return result;
}
@Override
public final void stop() {
throw new UnsupportedOperationException();
public void stop() {
if (null != reader) {
reader.stop();
}
}
@Override
public final SyncProgress getProgress() {
public SyncProgress getProgress() {
return new HistoryDataSyncTaskProgress(syncTaskId, estimatedRows, syncedRows.get());
}
}
......@@ -18,12 +18,14 @@
package org.apache.shardingsphere.shardingscaling.core.synctask.realtime;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.shardingscaling.core.controller.task.ReportCallback;
import org.apache.shardingsphere.shardingscaling.core.controller.SyncProgress;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.ExecuteUtil;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.SyncTaskExecuteCallback;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunnerGroup;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.AckCallback;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.RealtimeSyncChannel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPositionManager;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPositionManagerFactory;
......@@ -44,6 +46,7 @@ import java.util.List;
* Realtime data execute task.
*
* @author avalon566
* @author yangyi
*/
@Slf4j
public final class RealtimeDataSyncTask implements SyncTask {
......@@ -77,8 +80,8 @@ public final class RealtimeDataSyncTask implements SyncTask {
public void start(final ReportCallback callback) {
reader = ReaderFactory.newInstanceLogReader(syncConfiguration.getReaderConfiguration(), logPositionManager.getCurrentPosition());
List<Writer> writers = instanceWriters();
RealtimeSyncChannel channel = instanceChannel(writers.size());
ExecuteUtil.execute(channel, reader, writers, new SyncTaskExecuteCallback(this.getClass().getSimpleName(), syncTaskId, callback));
instanceChannel(writers);
ScalingContext.getInstance().getSyncTaskExecuteEngine().submitGroup(groupSyncRunner(callback, writers));
}
private List<Writer> instanceWriters() {
......@@ -89,8 +92,8 @@ public final class RealtimeDataSyncTask implements SyncTask {
return result;
}
private RealtimeSyncChannel instanceChannel(final int channelSize) {
return new RealtimeSyncChannel(channelSize, Collections.<AckCallback>singletonList(new AckCallback() {
private void instanceChannel(final List<Writer> writers) {
Channel channel = new RealtimeSyncChannel(writers.size(), Collections.<AckCallback>singletonList(new AckCallback() {
@Override
public void onAck(final List<Record> records) {
Record lastHandledRecord = records.get(records.size() - 1);
......@@ -98,6 +101,17 @@ public final class RealtimeDataSyncTask implements SyncTask {
delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
}
}));
reader.setChannel(channel);
for (Writer each : writers) {
each.setChannel(channel);
}
}
private SyncRunnerGroup groupSyncRunner(final ReportCallback callback, final List<Writer> writers) {
SyncRunnerGroup result = new SyncRunnerGroup(new SyncTaskExecuteCallback(this.getClass().getSimpleName(), syncTaskId, callback));
result.addSyncRunner(reader);
result.addAllSyncRunner(writers);
return result;
}
@Override
......
......@@ -67,7 +67,7 @@ public class HistoryDataSyncTaskGroupTest {
}
@After
public void setDown() {
public void tearDown() {
dataSourceFactory.close();
}
......
......@@ -60,7 +60,7 @@ public class HistoryDataSyncTaskTest {
}
@After
public void setDown() {
public void tearDown() {
dataSourceFactory.close();
}
......
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%-5level] %d{HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root>
<level value="info" />
<appender-ref ref="console" />
</root>
</configuration>
......@@ -72,7 +72,7 @@ public class MySQLLogPositionManagerTest {
}
@After
public void setDown() {
public void tearDown() {
dataSourceFactory.close();
}
......
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%-5level] %d{HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root>
<level value="info" />
<appender-ref ref="console" />
</root>
</configuration>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册