未验证 提交 39819a2b 编写于 作者: P Potato 提交者: GitHub

[IOTDB-6061] Fix the instability failure caused by initServer in IoTConsensus...

[IOTDB-6061] Fix the instability failure caused by initServer in IoTConsensus UT not binding to the corresponding port  (#10991)
上级 f869a95c
......@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.Peer;
......@@ -53,6 +54,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
public class ReplicateTest {
private static final long CHECK_POINT_GAP = 500;
private final Logger logger = LoggerFactory.getLogger(ReplicateTest.class);
......@@ -201,34 +203,45 @@ public class ReplicateTest {
Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
stopServer();
initServer();
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
try {
stopServer();
initServer();
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to recover entries");
}
Thread.sleep(100);
}
}
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
Assert.assertEquals(
CHECK_POINT_GAP, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
CHECK_POINT_GAP, servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
CHECK_POINT_GAP, servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to recover entries");
}
Thread.sleep(100);
} catch (IOException e) {
if (e.getCause() instanceof StartupException) {
// just succeed when can not bind socket
logger.info("Can not start IoTConsensus because", e);
} else {
logger.error("Failed because", e);
Assert.fail("Failed because " + e.getMessage());
}
}
Assert.assertEquals(
CHECK_POINT_GAP, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
CHECK_POINT_GAP, servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
CHECK_POINT_GAP, servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
}
/**
......@@ -255,39 +268,49 @@ public class ReplicateTest {
Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(0, servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
stopServer();
initServer();
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 2; i++) {
long start = System.currentTimeMillis();
// should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
// replicating all entries
while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
logger.error("{}", servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.fail("Unable to replicate entries");
try {
stopServer();
initServer();
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 2; i++) {
long start = System.currentTimeMillis();
// should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
// replicating all entries
while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
logger.error("{}", servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.fail("Unable to replicate entries");
}
Thread.sleep(100);
}
Thread.sleep(100);
}
}
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(0).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(1).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(2).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(0).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(1).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(2).getRequestSet().size());
Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
} catch (IOException e) {
if (e.getCause() instanceof StartupException) {
// just succeed when can not bind socket
logger.info("Can not start IoTConsensus because", e);
} else {
logger.error("Failed because", e);
Assert.fail("Failed because " + e.getMessage());
}
}
}
private void findPortAvailable(int i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册