未验证 提交 d6dda206 编写于 作者: L Liu Xuxin 提交者: GitHub

[IOTDB-2456] fix compaction lost device data when two devices in different tsfile (#4919)

上级 8ac9fde7
......@@ -211,7 +211,7 @@ public class IoTDBAlignedTimeSeriesCompactionIT {
}
@Test
public void testAlignedTsFileWithDifferentSchemaInDifferentTsFile() throws Exception {
public void testAlignedTsFileWithDifferentSeriesInDifferentTsFile() throws Exception {
String[] prepareSql =
new String[] {
"create aligned timeseries root.compactionTestForAligned.d1(s1 DOUBLE, s2 DOUBLE, s3 DOUBLE, s4 DOUBLE)",
......@@ -305,6 +305,84 @@ public class IoTDBAlignedTimeSeriesCompactionIT {
}
}
@Test
public void testAlignedTsFileWithDifferentDevicesInDifferentTsFile() throws Exception {
String[] prepareSql =
new String[] {
"create aligned timeseries root.compactionTestForAligned.d1(s1 DOUBLE, s2 DOUBLE, s3 DOUBLE, s4 DOUBLE)",
"create aligned timeseries root.compactionTestForAligned.d2(s1 INT64, s2 INT64, s3 INT64, s4 INT64)",
};
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
for (String sql : prepareSql) {
statement.execute(sql);
}
long time = 1;
Map<String, Map<Long, Long>> valueMap = new HashMap<>();
Random random = new Random(10);
for (int i = 0; i < 30; ++i) {
long nextTime = time + 100;
long tempTime = time;
while (tempTime < nextTime) {
statement.execute(
String.format(
"insert into root.compactionTestForAligned.d1(time, s1, s2, s3, s4) aligned values (%d, %d, %d, %d, %d)",
tempTime, tempTime + 1, tempTime + 2, tempTime + 3, tempTime + 4));
valueMap.computeIfAbsent("d1s1", x -> new TreeMap<>()).put(tempTime, tempTime + 1);
valueMap.computeIfAbsent("d1s2", x -> new TreeMap<>()).put(tempTime, tempTime + 2);
valueMap.computeIfAbsent("d1s3", x -> new TreeMap<>()).put(tempTime, tempTime + 3);
valueMap.computeIfAbsent("d1s4", x -> new TreeMap<>()).put(tempTime, tempTime + 4);
tempTime++;
}
statement.execute("FLUSH");
while (time < nextTime) {
statement.execute(
String.format(
"insert into root.compactionTestForAligned.d2(time, s1, s2, s3, s4) aligned values (%d, null, %d, null, %d)",
time, time * 3, time * 5));
valueMap.computeIfAbsent("d2s1", x -> new TreeMap<>()).put(time, 0L);
valueMap.computeIfAbsent("d2s2", x -> new TreeMap<>()).put(time, time * 3);
valueMap.computeIfAbsent("d2s3", x -> new TreeMap<>()).put(time, 0L);
valueMap.computeIfAbsent("d2s4", x -> new TreeMap<>()).put(time, time * 5);
time++;
}
statement.execute("FLUSH");
}
statement.execute("MERGE");
Thread.sleep(500);
ResultSet resultSet =
statement.executeQuery("select * from root.compactionTestForAligned.**");
int count = 0;
while (resultSet.next()) {
count++;
long resultTime = resultSet.getLong("Time");
double d1s1 = resultSet.getDouble("root.compactionTestForAligned.d1.s1");
double d1s2 = resultSet.getDouble("root.compactionTestForAligned.d1.s2");
double d1s3 = resultSet.getDouble("root.compactionTestForAligned.d1.s3");
double d1s4 = resultSet.getDouble("root.compactionTestForAligned.d1.s4");
long d2s1 = resultSet.getLong("root.compactionTestForAligned.d2.s1");
long d2s2 = resultSet.getLong("root.compactionTestForAligned.d2.s2");
long d2s3 = resultSet.getLong("root.compactionTestForAligned.d2.s3");
long d2s4 = resultSet.getLong("root.compactionTestForAligned.d2.s4");
Assert.assertEquals(valueMap.get("d1s1").get(resultTime).doubleValue(), d1s1, E);
Assert.assertEquals(valueMap.get("d1s2").get(resultTime).doubleValue(), d1s2, E);
Assert.assertEquals(valueMap.get("d1s3").get(resultTime).doubleValue(), d1s3, E);
Assert.assertEquals(valueMap.get("d1s4").get(resultTime).doubleValue(), d1s4, E);
Assert.assertEquals((long) valueMap.get("d2s1").get(resultTime), d2s1);
Assert.assertEquals((long) valueMap.get("d2s2").get(resultTime), d2s2);
Assert.assertEquals((long) valueMap.get("d2s3").get(resultTime), d2s3);
Assert.assertEquals((long) valueMap.get("d2s4").get(resultTime), d2s4);
}
Assert.assertEquals(30 * 100, count);
}
}
@Test
public void testAlignedTsFileWithDifferentDataType() throws Exception {
String[] prepareSql =
......
......@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
......@@ -180,6 +181,15 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
fullStorageGroupName));
}
if (targetTsFileResource.getTsFile().length()
< TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
// the file size is smaller than magic string and version number
throw new RuntimeException(
String.format(
"target file %s is smaller than magic string and version number size",
targetTsFileResource));
}
// delete the old files
InnerSpaceCompactionUtils.deleteTsFilesInDisk(
selectedTsFileResourceList, fullStorageGroupName);
......
......@@ -75,7 +75,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
public boolean hasNextDevice() {
boolean hasNext = false;
for (TsFileDeviceIterator iterator : deviceIteratorMap.values()) {
hasNext = hasNext | iterator.hasNext();
hasNext = hasNext || iterator.hasNext() || !iterator.current().equals(currentDevice);
}
return hasNext;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册