未验证 提交 a7470eda 编写于 作者: X Xiangpeng Hu 提交者: GitHub

[IOTDB-6061] Fix the instability failure caused by initServer in IoTConsensus...

[IOTDB-6061] Fix the instability failure caused by initServer in IoTConsensus UT not binding to the corresponding port (#10723)
上级 5a89e751
...@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.common.Peer; ...@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.iot.util.TestEntry; import org.apache.iotdb.consensus.iot.util.TestEntry;
import org.apache.iotdb.consensus.iot.util.TestStateMachine; import org.apache.iotdb.consensus.iot.util.TestStateMachine;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.FileUtils;
import org.junit.After; import org.junit.After;
...@@ -38,9 +39,13 @@ import org.junit.Test; ...@@ -38,9 +39,13 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
...@@ -54,11 +59,17 @@ public class ReplicateTest { ...@@ -54,11 +59,17 @@ public class ReplicateTest {
private static final long timeout = TimeUnit.SECONDS.toMillis(300); private static final long timeout = TimeUnit.SECONDS.toMillis(300);
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
private int basePort = 9000;
private final List<Peer> peers = private final List<Peer> peers =
Arrays.asList( Arrays.asList(
new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)), new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 2)),
new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)), new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort - 1)),
new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002))); new Peer(gid, 3, new TEndPoint("127.0.0.1", basePort)));
private final List<File> peersStorage = private final List<File> peersStorage =
Arrays.asList( Arrays.asList(
...@@ -87,12 +98,35 @@ public class ReplicateTest { ...@@ -87,12 +98,35 @@ public class ReplicateTest {
} }
} }
public void changeConfiguration(int i) {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
outputStream.writeInt(this.peers.size());
for (Peer peer : this.peers) {
peer.serialize(outputStream);
}
File storageDir = new File(IoTConsensus.buildPeerDir(peersStorage.get(i), gid));
Path tmpConfigurationPath =
Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
Path configurationPath =
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
Files.write(tmpConfigurationPath, publicBAOS.getBuf());
if (Files.exists(configurationPath)) {
Files.delete(configurationPath);
}
Files.move(tmpConfigurationPath, configurationPath);
} catch (IOException e) {
logger.error("Unexpected error occurs when persisting configuration", e);
}
}
private void initServer() throws IOException { private void initServer() throws IOException {
for (Peer peer : peers) { for (int i = 0; i < peers.size(); i++) {
waitPortAvailable(peer.getEndpoint().port); findPortAvailable(i);
} }
for (int i = 0; i < peers.size(); i++) { for (int i = 0; i < peers.size(); i++) {
int finalI = i; int finalI = i;
changeConfiguration(i);
servers.add( servers.add(
(IoTConsensus) (IoTConsensus)
ConsensusFactory.getConsensusImpl( ConsensusFactory.getConsensusImpl(
...@@ -254,20 +288,23 @@ public class ReplicateTest { ...@@ -254,20 +288,23 @@ public class ReplicateTest {
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData()); Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
} }
private static void waitPortAvailable(int port) { private void findPortAvailable(int i) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) { while (System.currentTimeMillis() - start < timeout) {
try (ServerSocket ignored = new ServerSocket(port)) { try (ServerSocket ignored = new ServerSocket(this.peers.get(i).getEndpoint().port)) {
// success
return; return;
} catch (IOException e) { } catch (IOException e) {
// Port is already in use, wait and retry // Port is already in use, wait and retry
this.peers.set(i, new Peer(gid, i + 1, new TEndPoint("127.0.0.1", this.basePort)));
logger.info("try port {} for node {}.", this.basePort++, i + 1);
try { try {
Thread.sleep(1000); // Wait for 1 second before retrying Thread.sleep(50); // Wait for 1 second before retrying
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
// Handle the interruption if needed // Handle the interruption if needed
} }
} }
} }
Assert.fail(String.format("can not bind port %d after 300s", port)); Assert.fail(String.format("can not find port for node %d after 300s", i + 1));
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册