提交 43360325 编写于 作者: A avalon566

Refactor SyncJob

上级 3d7df6c1
......@@ -18,12 +18,14 @@
package info.avalon566.shardingscaling.core.job.sync;
import info.avalon566.shardingscaling.core.config.SyncConfiguration;
import info.avalon566.shardingscaling.core.job.MigrateProgress;
import info.avalon566.shardingscaling.core.job.sync.executor.Event;
import info.avalon566.shardingscaling.core.job.sync.executor.EventType;
import info.avalon566.shardingscaling.core.job.sync.executor.Reporter;
import info.avalon566.shardingscaling.core.exception.SyncExecuteException;
import info.avalon566.shardingscaling.core.sync.SyncExecutor;
import info.avalon566.shardingscaling.core.sync.channel.MemoryChannel;
import info.avalon566.shardingscaling.core.sync.reader.NopLogPosition;
import info.avalon566.shardingscaling.core.sync.reader.Reader;
import info.avalon566.shardingscaling.core.sync.reader.ReaderFactory;
import info.avalon566.shardingscaling.core.sync.writer.Writer;
......@@ -50,6 +52,21 @@ public class HistoryDataSyncJob implements SyncJob {
this.reporter = reporter;
}
@Override
public final void start() {
new Thread(this).start();
}
@Override
public final void stop() {
throw new UnsupportedOperationException();
}
@Override
public final MigrateProgress getProgress() {
return new MigrateProgress("HISTORY_DATA_SYNC", new NopLogPosition());
}
/**
* Run synchronize task.
*/
......
......@@ -19,6 +19,7 @@ package info.avalon566.shardingscaling.core.job.sync;
import info.avalon566.shardingscaling.core.config.SyncConfiguration;
import info.avalon566.shardingscaling.core.exception.SyncExecuteException;
import info.avalon566.shardingscaling.core.job.MigrateProgress;
import info.avalon566.shardingscaling.core.job.sync.executor.Event;
import info.avalon566.shardingscaling.core.job.sync.executor.EventType;
import info.avalon566.shardingscaling.core.job.sync.executor.Reporter;
......@@ -56,6 +57,21 @@ public class RealtimeDataSyncJob implements SyncJob {
mysqlBinlogReader = ReaderFactory.newInstanceLogReader(syncConfiguration.getReaderConfiguration(), syncConfiguration.getPosition());
}
@Override
public final void start() {
new Thread(this).start();
}
@Override
public final void stop() {
mysqlBinlogReader.stop();
}
@Override
public final MigrateProgress getProgress() {
return new MigrateProgress("REALTIME_DATA_SYNC", channel.getCurrentLogPosition());
}
/**
* Do something before run,mark binlog position.
*
......
......@@ -17,10 +17,29 @@
package info.avalon566.shardingscaling.core.job.sync;
import info.avalon566.shardingscaling.core.job.MigrateProgress;
/**
* Sync job interface.
*
* @author avalon566
*/
public interface SyncJob extends Runnable {
/**
* Start synchronize data.
*/
void start();
/**
* Stop synchronize data.
*/
void stop();
/**
* Get synchronize progress.
*
* @return migrate progress
*/
MigrateProgress getProgress();
}
......@@ -35,7 +35,7 @@ public class LocalSyncJobExecutor implements SyncJobExecutor {
public final Reporter execute(final List<SyncConfiguration> syncConfigurations) {
LocalReporter reporter = new LocalReporter();
for (SyncConfiguration syncConfiguration : syncConfigurations) {
new Thread(SyncJobFactory.createSyncJobInstance(syncConfiguration, reporter)).start();
SyncJobFactory.createSyncJobInstance(syncConfiguration, reporter).start();
}
return reporter;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册