未验证 提交 41ab41dd 编写于 作者: 邱鹿 Lucas 提交者: GitHub

rename ResumablePositionManager to ResumeBreakPointManager (#6617)

* rename ResumablePositionManager to ResumeBreakPointManager

* remove useless blank line

Co-authored-by: qiulu3 <Lucas209910>
上级 88c83313
......@@ -29,15 +29,15 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public final class ScalingContext {
private static final ScalingContext INSTANCE = new ScalingContext();
private ServerConfiguration serverConfiguration;
private ShardingScalingExecuteEngine taskExecuteEngine;
private ShardingScalingExecuteEngine importerExecuteEngine;
/**
* Get instance of Sharding-Scaling's context.
*
......@@ -46,7 +46,7 @@ public final class ScalingContext {
public static ScalingContext getInstance() {
return INSTANCE;
}
/**
* Initialize Scaling context.
*
......@@ -57,5 +57,4 @@ public final class ScalingContext {
this.taskExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
this.importerExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
}
}
......@@ -38,12 +38,12 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
* Abstract resumable position manager.
* Abstract resume from break-point manager.
*/
@Getter
@Setter
@Slf4j
public abstract class AbstractResumablePositionManager implements ResumablePositionManager, Closeable {
public abstract class AbstractResumeBreakPointManager implements ResumeBreakPointManager, Closeable {
private static final Gson GSON = new Gson();
......
......@@ -18,11 +18,11 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
/**
* Fake resumable position manager as default.
* Fake resume from break-point manager as default.
*/
public final class FakeResumablePositionManager extends AbstractResumablePositionManager implements ResumablePositionManager {
public final class FakeResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
public FakeResumablePositionManager(final String databaseType, final String taskPath) {
public FakeResumeBreakPointManager(final String databaseType, final String taskPath) {
setDatabaseType(databaseType);
setTaskPath(taskPath);
}
......
......@@ -23,9 +23,9 @@ import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import java.util.Map;
/**
* Resumable position manager interface.
* Resume from break-point manager interface.
*/
public interface ResumablePositionManager {
public interface ResumeBreakPointManager {
/**
* If has resumable data.
......
......@@ -22,28 +22,28 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
/**
* Resumable position manager factory.
* Resume from break-point manager factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ResumablePositionManagerFactory {
public final class ResumeBreakPointManagerFactory {
private static Class<? extends ResumablePositionManager> clazz = FakeResumablePositionManager.class;
private static Class<? extends ResumeBreakPointManager> clazz = FakeResumeBreakPointManager.class;
static {
if (ZookeeperResumablePositionManager.isAvailable()) {
clazz = ZookeeperResumablePositionManager.class;
if (ZookeeperResumeBreakPointManager.isAvailable()) {
clazz = ZookeeperResumeBreakPointManager.class;
}
}
/**
* New resumable position manager instance.
* New resume from break-point manager instance.
*
* @param databaseType database type
* @param taskPath task path for persist data.
* @return resumable position manager
* @return resume from break-point manager
*/
@SneakyThrows(ReflectiveOperationException.class)
public static ResumablePositionManager newInstance(final String databaseType, final String taskPath) {
public static ResumeBreakPointManager newInstance(final String databaseType, final String taskPath) {
return clazz.getConstructor(String.class, String.class).newInstance(databaseType, taskPath);
}
}
......@@ -29,10 +29,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Depends on zookeeper to manager position.
* Depends on zookeeper resume from break-point manager.
*/
@Slf4j
public final class ZookeeperResumablePositionManager extends AbstractResumablePositionManager implements ResumablePositionManager {
public final class ZookeeperResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
private static final String INVENTORY = "/inventory";
......@@ -52,12 +52,12 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
ResumeConfiguration resumeConfiguration = ScalingContext.getInstance().getServerConfiguration().getResumeConfiguration();
if (null != resumeConfiguration) {
CURATOR_ZOOKEEPER_REPOSITORY.init(resumeConfiguration.getNamespace(), new OrchestrationCenterConfiguration("ZooKeeper", resumeConfiguration.getServerLists(), new Properties()));
log.info("zookeeper resumable position manager is available.");
log.info("zookeeper resume from break-point manager is available.");
available = true;
}
}
public ZookeeperResumablePositionManager(final String databaseType, final String taskPath) {
public ZookeeperResumeBreakPointManager(final String databaseType, final String taskPath) {
setDatabaseType(databaseType);
setTaskPath(taskPath);
inventoryPath = taskPath + INVENTORY;
......
......@@ -25,8 +25,8 @@ import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceCheckerCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.preparer.resumer.SyncPositionResumer;
......@@ -63,22 +63,22 @@ public final class ShardingScalingJobPreparer {
String databaseType = shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration().getDatabaseType().getName();
try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getSyncConfigurations())) {
checkDatasources(databaseType, dataSourceManager);
ResumablePositionManager resumablePositionManager = getResumablePositionManager(databaseType, shardingScalingJob);
if (resumablePositionManager.isResumable()) {
syncPositionResumer.resumePosition(shardingScalingJob, dataSourceManager, resumablePositionManager);
ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(databaseType, shardingScalingJob);
if (resumeBreakPointManager.isResumable()) {
syncPositionResumer.resumePosition(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
return;
}
initIncrementalDataTasks(databaseType, shardingScalingJob, dataSourceManager);
initInventoryDataTasks(shardingScalingJob, dataSourceManager);
syncPositionResumer.persistPosition(shardingScalingJob, resumablePositionManager);
syncPositionResumer.persistPosition(shardingScalingJob, resumeBreakPointManager);
} catch (PrepareFailedException ex) {
log.warn("Preparing sharding scaling job {} : {} failed", shardingScalingJob.getJobId(), shardingScalingJob.getJobName(), ex);
shardingScalingJob.setStatus(SyncTaskControlStatus.PREPARING_FAILURE.name());
}
}
private ResumablePositionManager getResumablePositionManager(final String databaseType, final ShardingScalingJob shardingScalingJob) {
return ResumablePositionManagerFactory.newInstance(databaseType, String.format("/%s/item-%d", shardingScalingJob.getJobName(), shardingScalingJob.getShardingItem()));
private ResumeBreakPointManager getResumeBreakPointManager(final String databaseType, final ShardingScalingJob shardingScalingJob) {
return ResumeBreakPointManagerFactory.newInstance(databaseType, String.format("/%s/item-%d", shardingScalingJob.getJobName(), shardingScalingJob.getShardingItem()));
}
private void checkDatasources(final String databaseType, final DataSourceManager dataSourceManager) {
......
......@@ -23,7 +23,7 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
......@@ -47,30 +47,30 @@ public final class SyncPositionResumer {
private final SyncTaskFactory syncTaskFactory = new DefaultSyncTaskFactory();
/**
* Resume position from this position manager.
* Resume position from resume from break-point manager.
*
* @param shardingScalingJob sharding scaling job
* @param dataSourceManager dataSource manager
* @param resumablePositionManager which position manager resume from
* @param resumeBreakPointManager resume from break-point manager
*/
public void resumePosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
resumeInventoryPosition(shardingScalingJob, dataSourceManager, resumablePositionManager);
resumeIncrementalPosition(shardingScalingJob, resumablePositionManager);
public void resumePosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
resumeInventoryPosition(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
resumeIncrementalPosition(shardingScalingJob, resumeBreakPointManager);
}
private void resumeInventoryPosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
List<ScalingTask> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumablePositionManager);
private void resumeInventoryPosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
List<ScalingTask> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
}
}
private List<ScalingTask> getAllInventoryDataTasks(
final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
List<ScalingTask> result = new LinkedList<>();
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumablePositionManager).entrySet()) {
for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry)));
}
}
......@@ -91,50 +91,50 @@ public final class SyncPositionResumer {
}
private Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionMap(
final RdbmsConfiguration dumperConfiguration, final ResumablePositionManager resumablePositionManager) {
final RdbmsConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) {
Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
return resumablePositionManager.getInventoryPositionManagerMap().entrySet().stream()
return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
.filter(entry -> pattern.matcher(entry.getKey()).find())
.collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue));
}
private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumablePositionManager resumablePositionManager) {
private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
each.getDumperConfiguration().setPositionManager(resumablePositionManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
each.getDumperConfiguration().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
}
}
/**
* Persist position when init sync job.
* Persist position.
*
* @param shardingScalingJob sharding scaling job
* @param resumablePositionManager which position manager resume from
* @param resumeBreakPointManager resume from break-point manager
*/
public void persistPosition(final ShardingScalingJob shardingScalingJob, final ResumablePositionManager resumablePositionManager) {
persistIncrementalPosition(shardingScalingJob.getIncrementalDataTasks(), resumablePositionManager);
persistInventoryPosition(shardingScalingJob.getInventoryDataTasks(), resumablePositionManager);
public void persistPosition(final ShardingScalingJob shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
persistIncrementalPosition(shardingScalingJob.getIncrementalDataTasks(), resumeBreakPointManager);
persistInventoryPosition(shardingScalingJob.getInventoryDataTasks(), resumeBreakPointManager);
}
private void persistInventoryPosition(final List<ScalingTask> inventoryDataTasks, final ResumablePositionManager resumablePositionManager) {
private void persistInventoryPosition(final List<ScalingTask> inventoryDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
for (ScalingTask each : inventoryDataTasks) {
if (each instanceof InventoryDataScalingTaskGroup) {
putInventoryDataScalingTask(((InventoryDataScalingTaskGroup) each).getScalingTasks(), resumablePositionManager);
putInventoryDataScalingTask(((InventoryDataScalingTaskGroup) each).getScalingTasks(), resumeBreakPointManager);
}
}
resumablePositionManager.persistInventoryPosition();
resumeBreakPointManager.persistInventoryPosition();
}
private void putInventoryDataScalingTask(final Collection<ScalingTask> scalingTasks, final ResumablePositionManager resumablePositionManager) {
private void putInventoryDataScalingTask(final Collection<ScalingTask> scalingTasks, final ResumeBreakPointManager resumeBreakPointManager) {
for (ScalingTask each : scalingTasks) {
resumablePositionManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
resumeBreakPointManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
}
}
private void persistIncrementalPosition(final List<ScalingTask> incrementalDataTasks, final ResumablePositionManager resumablePositionManager) {
private void persistIncrementalPosition(final List<ScalingTask> incrementalDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
for (ScalingTask each : incrementalDataTasks) {
resumablePositionManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
resumeBreakPointManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
}
resumablePositionManager.persistIncrementalPosition();
resumeBreakPointManager.persistIncrementalPosition();
}
}
......@@ -29,8 +29,8 @@ import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
import org.junit.Before;
import org.junit.Test;
......@@ -53,7 +53,7 @@ public final class SyncPositionResumerTest {
private ShardingScalingJob shardingScalingJob;
private ResumablePositionManager resumablePositionManager;
private ResumeBreakPointManager resumeBreakPointManager;
private SyncPositionResumer syncPositionResumer;
......@@ -62,25 +62,25 @@ public final class SyncPositionResumerTest {
ScalingContext.getInstance().init(new ServerConfiguration());
shardingScalingJob = new ShardingScalingJob("scalingTest", 0);
shardingScalingJob.getSyncConfigurations().add(mockSyncConfiguration());
resumablePositionManager = ResumablePositionManagerFactory.newInstance("MySQL", "/scalingTest/item-0");
resumeBreakPointManager = ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/item-0");
syncPositionResumer = new SyncPositionResumer();
}
@Test
public void assertResumePosition() {
resumablePositionManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
resumablePositionManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumablePositionManager);
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumeBreakPointManager);
assertThat(shardingScalingJob.getIncrementalDataTasks().size(), is(1));
assertTrue(shardingScalingJob.getInventoryDataTasks().isEmpty());
}
@Test
public void assertPersistPosition() {
ResumablePositionManager resumablePositionManager = mock(ResumablePositionManager.class);
syncPositionResumer.persistPosition(shardingScalingJob, resumablePositionManager);
verify(resumablePositionManager).persistIncrementalPosition();
verify(resumablePositionManager).persistInventoryPosition();
ResumeBreakPointManager resumeBreakPointManager = mock(ResumeBreakPointManager.class);
syncPositionResumer.persistPosition(shardingScalingJob, resumeBreakPointManager);
verify(resumeBreakPointManager).persistIncrementalPosition();
verify(resumeBreakPointManager).persistInventoryPosition();
}
private PositionManager mockPositionManager() {
......
......@@ -31,9 +31,9 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class AbstractResumablePositionManagerTest {
public final class AbstractResumeBreakPointManagerTest {
private AbstractResumablePositionManager resumablePositionManager;
private AbstractResumeBreakPointManager resumeBreakPointManager;
private final String incrementalPosition = "{\"ds0\":{\"filename\":\"mysql-bin.000001\",\"position\":4},\"ds1\":{\"filename\":\"mysql-bin.000002\",\"position\":4}}";
......@@ -41,38 +41,38 @@ public final class AbstractResumablePositionManagerTest {
@Before
public void setUp() throws NoSuchFieldException, IllegalAccessException {
resumablePositionManager = new AbstractResumablePositionManager() {
resumeBreakPointManager = new AbstractResumeBreakPointManager() {
};
resumablePositionManager.setDatabaseType("MySQL");
resumablePositionManager.setTaskPath("/scalingTest/item-0");
resumeBreakPointManager.setDatabaseType("MySQL");
resumeBreakPointManager.setTaskPath("/scalingTest/item-0");
ReflectionUtil.getFieldValueFromClass(new ScalingEntryLoader(), "SCALING_ENTRY_MAP", Map.class).put("MySQL", new MySQLScalingEntry());
}
@Test
public void assertResumeIncrementalPosition() {
resumablePositionManager.resumeIncrementalPosition(incrementalPosition);
assertThat(resumablePositionManager.getIncrementalPositionManagerMap().size(), is(2));
resumeBreakPointManager.resumeIncrementalPosition(incrementalPosition);
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), is(2));
}
@Test
public void assertResumeInventoryPosition() {
resumablePositionManager.resumeInventoryPosition(inventoryPosition);
assertThat(resumablePositionManager.getInventoryPositionManagerMap().size(), is(4));
resumeBreakPointManager.resumeInventoryPosition(inventoryPosition);
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), is(4));
}
@Test
public void assertGetIncrementalPositionData() {
resumablePositionManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
resumablePositionManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
assertThat(resumablePositionManager.getIncrementalPositionData(), is(incrementalPosition));
resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
assertThat(resumeBreakPointManager.getIncrementalPositionData(), is(incrementalPosition));
}
@Test
public void assertGetInventoryPositionData() {
resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 100L)));
resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_2", new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
resumablePositionManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 200L)));
assertThat(resumablePositionManager.getInventoryPositionData(), is(inventoryPosition));
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 100L)));
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 200L)));
assertThat(resumeBreakPointManager.getInventoryPositionData(), is(inventoryPosition));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册