未验证 提交 9883106d 编写于 作者: X Xiangwei Wei 提交者: GitHub

Fix group by data inconsistence bug (#3317)

Fix group by data inconsistence bug 
上级 22086d31
......@@ -45,29 +45,25 @@ public class CountAggrResult extends AggregateResult {
@Override
public void updateResultFromStatistics(Statistics statistics) {
long preValue = getLongValue();
preValue += statistics.getCount();
setLongValue(preValue);
setLongValue(getLongValue() + statistics.getCount());
}
@Override
public void updateResultFromPageData(BatchData dataInThisPage) {
int cnt = dataInThisPage.length();
long preValue = getLongValue();
preValue += cnt;
setLongValue(preValue);
setLongValue(getLongValue() + dataInThisPage.length());
}
@Override
public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
int cnt = 0;
while (dataInThisPage.hasCurrent()) {
if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
break;
}
long preValue = getLongValue();
setLongValue(++preValue);
cnt++;
dataInThisPage.next();
}
setLongValue(getLongValue() + cnt);
}
@Override
......@@ -80,10 +76,7 @@ public class CountAggrResult extends AggregateResult {
cnt++;
}
}
long preValue = getLongValue();
preValue += cnt;
setLongValue(preValue);
setLongValue(getLongValue() + cnt);
}
@Override
......@@ -94,10 +87,7 @@ public class CountAggrResult extends AggregateResult {
cnt++;
}
}
long preValue = getLongValue();
preValue += cnt;
setLongValue(preValue);
setLongValue(getLongValue() + cnt);
}
@Override
......
......@@ -297,11 +297,12 @@ public class SeriesReader {
* first time series metadata is already unpacked, consume cached ChunkMetadata
*/
if (!cachedChunkMetadata.isEmpty()) {
firstChunkMetadata = cachedChunkMetadata.poll();
firstChunkMetadata = cachedChunkMetadata.peek();
unpackAllOverlappedTsFilesToTimeSeriesMetadata(
orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
firstChunkMetadata = cachedChunkMetadata.poll();
}
}
......
......@@ -33,14 +33,10 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.db.constant.TestConstant.count;
import static org.junit.Assert.fail;
/**
* This test contains one seq file and one unseq file. In the seq file, it contains two pages:
* 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page: 7,
* 9. The unseq page is overlapped with the second seq page.
*/
public class IoTDBGroupByUnseqIT {
private static String[] dataSet1 =
......@@ -61,13 +57,29 @@ public class IoTDBGroupByUnseqIT {
"flush"
};
private static final String TIMESTAMP_STR = "Time";
private static String[] dataSet2 =
new String[] {
"SET STORAGE GROUP TO root.sg2",
"CREATE TIMESERIES root.sg2.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
"INSERT INTO root.sg2.d1(time,s1) values(1, 1)",
"INSERT INTO root.sg2.d1(time,s1) values(10, 10)",
"flush",
"INSERT INTO root.sg2.d1(time,s1) values(19, 19)",
"INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
"flush",
"INSERT INTO root.sg2.d1(time,s1) values(5, 5)",
"INSERT INTO root.sg2.d1(time,s1) values(15, 15)",
"INSERT INTO root.sg2.d1(time,s1) values(26, 26)",
"INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
"flush"
};
private boolean enableUnseqCompaction;
private int maxNumberOfPointsInPage;
@Before
public void setUp() throws Exception {
Class.forName(Config.JDBC_DRIVER_NAME);
EnvironmentUtils.closeStatMonitor();
enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
......@@ -75,23 +87,6 @@ public class IoTDBGroupByUnseqIT {
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(4);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData();
}
private void prepareData() {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement(); ) {
for (String sql : dataSet1) {
statement.execute(sql);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@After
......@@ -101,6 +96,11 @@ public class IoTDBGroupByUnseqIT {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
}
/**
* This test contains one seq file and one unseq file. In the seq file, it contains two pages:
* 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page:
* 7, 9. The unseq page is overlapped with the second seq page.
*/
@Test
public void test1() {
String[] retArray1 =
......@@ -111,13 +111,17 @@ public class IoTDBGroupByUnseqIT {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
for (String sql : dataSet1) {
statement.execute(sql);
}
boolean hasResultSet =
statement.execute("select count(s1) from root.sg1.d1 group by ([1, 13), 3ms)");
Assert.assertTrue(hasResultSet);
int cnt;
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
cnt = 0;
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
......@@ -133,4 +137,53 @@ public class IoTDBGroupByUnseqIT {
fail(e.getMessage());
}
}
/**
* This test contains two seq files and one unseq file. In the first seq file, it contains two
* points: [1, 10]. In the second seq file, it contains two points: [19, 30]. In the unseq file,
* it contains two CHUNKS: [5, 15], [26, 30]. The unseq file is overlapped with two seq files.
* While the chunk [19,30] in the second seq file is unpacked, it should replace [26,30] as the
* first chunk.
*/
@Test
public void test2() {
String[] retArray1 = new String[] {"5,1", "10,1", "15,2", "20,0", "25,1"};
int preAvgSeriesPointNumberThreshold =
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
for (String sql : dataSet2) {
statement.execute(sql);
}
boolean hasResultSet =
statement.execute("select count(s1) from root.sg2.d1 group by ([5, 30), 5ms)");
Assert.assertTrue(hasResultSet);
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(count("root.sg2.d1.s1"));
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
Assert.assertEquals(retArray1.length, cnt);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
.setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册