未验证 提交 c9315c75 编写于 作者: H Haonan 提交者: GitHub

[IOTDB-6138] Fix the support of negative timestamp (#11033)

上级 2f7e99b1
......@@ -146,6 +146,21 @@ public class IoTDBInsertWithQueryIT {
selectAndCount(2000);
}
@Test
public void insertNegativeTimestampWithQueryTest() {
// insert
insertData(-1000, 1);
// select
selectAndCount(1001);
// insert
insertData(-2000, -1000);
// select
selectAndCount(2001);
}
@Test
public void flushWithQueryTest() throws InterruptedException {
// insert
......@@ -393,11 +408,13 @@ public class IoTDBInsertWithQueryIT {
for (int time = start; time < end; time++) {
String sql =
String.format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70);
statement.execute(sql);
statement.addBatch(sql);
sql =
String.format("insert into root.fans.d0(timestamp,s1) values(%s,%s)", time, time % 40);
statement.execute(sql);
statement.addBatch(sql);
}
statement.executeBatch();
statement.clearBatch();
} catch (SQLException e) {
e.printStackTrace();
}
......@@ -422,7 +439,7 @@ public class IoTDBInsertWithQueryIT {
try (ResultSet resultSet = statement.executeQuery(selectSql)) {
assertNotNull(resultSet);
int cnt = 0;
long before = -1;
long before = -10000;
while (resultSet.next()) {
long cur = Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
if (cur <= before) {
......
......@@ -492,6 +492,51 @@ public class IoTDBSessionSimpleIT {
}
}
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertTabletWithNegativeTimestampTest() {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s3", TSDataType.INT32, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s4", TSDataType.BOOLEAN, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s6", TSDataType.TEXT, TSEncoding.RLE));
Tablet tablet = new Tablet("root.sg1.d1", schemaList);
for (long time = 0; time < 10; time++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, -time);
tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, (double) time);
tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (float) time);
tablet.addValue(schemaList.get(2).getMeasurementId(), rowIndex, time);
tablet.addValue(schemaList.get(3).getMeasurementId(), rowIndex, (int) time);
tablet.addValue(schemaList.get(4).getMeasurementId(), rowIndex, time % 2 == 0);
tablet.addValue(schemaList.get(5).getMeasurementId(), rowIndex, new Binary("Text" + time));
tablet.addValue(schemaList.get(6).getMeasurementId(), rowIndex, "Text" + time);
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
long count = 0L;
while (dataSet.hasNext()) {
count++;
RowRecord rowRecord = dataSet.next();
assertEquals(count - 10, rowRecord.getTimestamp());
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void createTimeSeriesWithDoubleTicksTest() {
......
......@@ -247,9 +247,16 @@ public class RpcUtils {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static String parseLongToDateWithPrecision(
DateTimeFormatter formatter, long timestamp, ZoneId zoneid, String timestampPrecision) {
long integerOfDate;
StringBuilder digits;
if ("ms".equals(timestampPrecision)) {
long integerOfDate = timestamp / 1000;
StringBuilder digits = new StringBuilder(Long.toString(timestamp % 1000));
if (timestamp > 0 || timestamp % 1000 == 0) {
integerOfDate = timestamp / 1000;
digits = new StringBuilder(Long.toString(timestamp % 1000));
} else {
integerOfDate = timestamp / 1000 - 1;
digits = new StringBuilder(Long.toString(1000 + timestamp % 1000));
}
ZonedDateTime dateTime =
ZonedDateTime.ofInstant(Instant.ofEpochSecond(integerOfDate), zoneid);
String datetime = dateTime.format(formatter);
......@@ -261,8 +268,13 @@ public class RpcUtils {
}
return formatDatetimeStr(datetime, digits);
} else if ("us".equals(timestampPrecision)) {
long integerOfDate = timestamp / 1000_000;
StringBuilder digits = new StringBuilder(Long.toString(timestamp % 1000_000));
if (timestamp > 0 || timestamp % 1000_000 == 0) {
integerOfDate = timestamp / 1000_000;
digits = new StringBuilder(Long.toString(timestamp % 1000_000));
} else {
integerOfDate = timestamp / 1000_000 - 1;
digits = new StringBuilder(Long.toString(1000_000 + timestamp % 1000_000));
}
ZonedDateTime dateTime =
ZonedDateTime.ofInstant(Instant.ofEpochSecond(integerOfDate), zoneid);
String datetime = dateTime.format(formatter);
......@@ -274,8 +286,13 @@ public class RpcUtils {
}
return formatDatetimeStr(datetime, digits);
} else {
long integerOfDate = timestamp / 1000_000_000L;
StringBuilder digits = new StringBuilder(Long.toString(timestamp % 1000_000_000L));
if (timestamp > 0 || timestamp % 1000_000_000L == 0) {
integerOfDate = timestamp / 1000_000_000L;
digits = new StringBuilder(Long.toString(timestamp % 1000_000_000L));
} else {
integerOfDate = timestamp / 1000_000_000L - 1;
digits = new StringBuilder(Long.toString(1000_000_000L + timestamp % 1000_000_000L));
}
ZonedDateTime dateTime =
ZonedDateTime.ofInstant(Instant.ofEpochSecond(integerOfDate), zoneid);
String datetime = dateTime.format(formatter);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.rpc;
import org.junit.Assert;
import org.junit.Test;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class RpcUtilsTest {
@Test
public void parseLongToDateWithPrecision() {
DateTimeFormatter formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
ZoneId zoneId = ZoneId.of("+0000");
Assert.assertEquals(
"1969-12-31T23:59:59.999Z",
RpcUtils.parseLongToDateWithPrecision(formatter, -1, zoneId, "ms"));
Assert.assertEquals(
"1969-12-31T23:59:59.999999Z",
RpcUtils.parseLongToDateWithPrecision(formatter, -1, zoneId, "us"));
Assert.assertEquals(
"1969-12-31T23:59:59.999999999Z",
RpcUtils.parseLongToDateWithPrecision(formatter, -1, zoneId, "ns"));
Assert.assertEquals(
"1969-12-31T23:59:59.000Z",
RpcUtils.parseLongToDateWithPrecision(formatter, -1000, zoneId, "ms"));
Assert.assertEquals(
"1969-12-31T23:59:59.000000Z",
RpcUtils.parseLongToDateWithPrecision(formatter, -1000_000, zoneId, "us"));
Assert.assertEquals(
"1969-12-31T23:59:59.000000000Z",
RpcUtils.parseLongToDateWithPrecision(formatter, -1000_000_000L, zoneId, "ns"));
Assert.assertEquals(
"1970-01-01T00:00:00.001Z",
RpcUtils.parseLongToDateWithPrecision(formatter, 1, zoneId, "ms"));
Assert.assertEquals(
"1970-01-01T00:00:00.000001Z",
RpcUtils.parseLongToDateWithPrecision(formatter, 1, zoneId, "us"));
Assert.assertEquals(
"1970-01-01T00:00:00.000000001Z",
RpcUtils.parseLongToDateWithPrecision(formatter, 1, zoneId, "ns"));
zoneId = ZoneId.of("+0800");
Assert.assertEquals(
"1970-01-01T07:59:59.999+08:00",
RpcUtils.parseLongToDateWithPrecision(formatter, -1, zoneId, "ms"));
}
}
......@@ -85,6 +85,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.rpc.RpcUtils;
......@@ -1020,9 +1021,7 @@ public class PartitionManager {
}
if (req.isSetTimeStamp()) {
plan.setTimeSlotId(
new TTimePartitionSlot(
req.getTimeStamp() - req.getTimeStamp() % COMMON_CONFIG.getTimePartitionInterval()));
plan.setTimeSlotId(TimePartitionUtils.getTimePartitionSlot(req.getTimeStamp()));
}
try {
return (GetRegionIdResp) getConsensusManager().read(plan);
......
......@@ -983,7 +983,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
// only one database, one device, one time interval
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(req.getStartTime());
TimePartitionUtils.getTimePartitionSlot(req.getStartTime());
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(
deviceId, Collections.singletonList(timePartitionSlot), false, false);
......
......@@ -388,7 +388,7 @@ public class AlignedChunkData implements ChunkData {
public static AlignedChunkData deserialize(InputStream stream) throws IOException, PageException {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(ReadWriteIOUtils.readLong(stream));
TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
String device = ReadWriteIOUtils.readString(stream);
boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
int chunkHeaderListSize = ReadWriteIOUtils.readInt(stream);
......
......@@ -260,7 +260,7 @@ public class NonAlignedChunkData implements ChunkData {
public static NonAlignedChunkData deserialize(InputStream stream)
throws IOException, PageException {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(ReadWriteIOUtils.readLong(stream));
TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
String device = ReadWriteIOUtils.readString(stream);
boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
byte chunkType = ReadWriteIOUtils.readByte(stream);
......
......@@ -114,7 +114,7 @@ public class TsFileSplitter {
== TsFileConstant.TIME_COLUMN_MASK);
IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime());
TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime());
ChunkData chunkData =
ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
......@@ -157,7 +157,7 @@ public class TsFileSplitter {
? chunkMetadata.getStartTime()
: pageHeader.getStartTime();
TTimePartitionSlot pageTimePartitionSlot =
TimePartitionUtils.getTimePartition(startTime);
TimePartitionUtils.getTimePartitionSlot(startTime);
if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
if (!isAligned) {
consumeChunkData(measurementId, chunkOffset, chunkData);
......@@ -198,7 +198,7 @@ public class TsFileSplitter {
consumeChunkData(measurementId, chunkOffset, chunkData);
}
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
satisfiedLength = 0;
endTime =
timePartitionSlot.getStartTime()
......@@ -380,17 +380,17 @@ public class TsFileSplitter {
}
private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
return !TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime())
.equals(TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getEndTime()));
}
private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetadata) {
if (pageHeader.getStatistics() == null) {
return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
return !TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime())
.equals(TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getEndTime()));
}
return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime())
.equals(TimePartitionUtils.getTimePartition(pageHeader.getEndTime()));
return !TimePartitionUtils.getTimePartitionSlot(pageHeader.getStartTime())
.equals(TimePartitionUtils.getTimePartitionSlot(pageHeader.getEndTime()));
}
private Pair<long[], Object[]> decodePage(
......
......@@ -1838,7 +1838,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
boolean needLeftAll;
boolean needRightAll;
long startTime;
long endTime;
TTimePartitionSlot timePartitionSlot;
int index = 0;
......@@ -1846,17 +1845,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
if (timeRangeList.get(0).getMin() == Long.MIN_VALUE) {
needLeftAll = true;
startTime =
(timeRangeList.get(0).getMax() / TimePartitionUtils.timePartitionInterval)
* TimePartitionUtils.timePartitionInterval; // included
endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
timePartitionSlot = TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMax());
endTime = TimePartitionUtils.getTimePartitionUpperBound(timeRangeList.get(0).getMax());
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(timeRangeList.get(0).getMax());
} else {
startTime =
(timeRangeList.get(0).getMin() / TimePartitionUtils.timePartitionInterval)
* TimePartitionUtils.timePartitionInterval; // included
endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
timePartitionSlot = TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMin());
endTime = TimePartitionUtils.getTimePartitionUpperBound(timeRangeList.get(0).getMin());
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(timeRangeList.get(0).getMin());
needLeftAll = false;
}
......@@ -1874,15 +1867,13 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
if (curLeft >= endTime) {
result.add(timePartitionSlot);
// next init
endTime =
(curLeft / TimePartitionUtils.timePartitionInterval + 1)
* TimePartitionUtils.timePartitionInterval;
timePartitionSlot = TimePartitionUtils.getTimePartition(curLeft);
endTime = TimePartitionUtils.getTimePartitionUpperBound(curLeft);
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(curLeft);
} else if (curRight >= endTime) {
result.add(timePartitionSlot);
// next init
timePartitionSlot = new TTimePartitionSlot(endTime);
endTime = endTime + TimePartitionUtils.timePartitionInterval;
endTime = endTime + TimePartitionUtils.getTimePartitionInterval();
} else {
index++;
}
......@@ -1891,7 +1882,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
if (needRightAll) {
TTimePartitionSlot lastTimePartitionSlot =
TimePartitionUtils.getTimePartition(timeRangeList.get(timeRangeList.size() - 1).getMin());
TimePartitionUtils.getTimePartitionSlot(
timeRangeList.get(timeRangeList.size() - 1).getMin());
if (lastTimePartitionSlot.startTime != timePartitionSlot.startTime) {
result.add(lastTimePartitionSlot);
}
......
......@@ -255,8 +255,6 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
private static final String GROUP_BY_COMMON_ONLY_ONE_MSG =
"Only one of group by time or group by variation/series/session can be supported at a time";
private static final String NEGATIVE_TIMESTAMP_ERROR_MSG =
"Please set the time >=0 or after 1970-01-01 00:00:00";
private static final String LIMIT_CONFIGURATION_ENABLED_ERROR_MSG =
"Limit configuration is not enabled, please enable it first.";
......@@ -3655,11 +3653,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
if (ctx.time != null) {
long timestamp = parseTimeValue(ctx.time, DateTimeUtils.currentTime());
if (timestamp < 0) {
throw new SemanticException(NEGATIVE_TIMESTAMP_ERROR_MSG);
} else {
getRegionIdStatement.setTimeStamp(timestamp);
}
getRegionIdStatement.setTimeStamp(timestamp);
}
return getRegionIdStatement;
}
......@@ -3683,19 +3677,11 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
if (ctx.startTime != null) {
long timestamp = parseTimeValue(ctx.startTime, DateTimeUtils.currentTime());
if (timestamp < 0) {
throw new SemanticException(NEGATIVE_TIMESTAMP_ERROR_MSG);
} else {
getTimeSlotListStatement.setStartTime(timestamp);
}
getTimeSlotListStatement.setStartTime(timestamp);
}
if (ctx.endTime != null) {
long timestamp = parseTimeValue(ctx.endTime, DateTimeUtils.currentTime());
if (timestamp < 0) {
throw new SemanticException(NEGATIVE_TIMESTAMP_ERROR_MSG);
} else {
getTimeSlotListStatement.setEndTime(timestamp);
}
getTimeSlotListStatement.setEndTime(timestamp);
}
return getTimeSlotListStatement;
}
......
......@@ -85,9 +85,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
.forEach(
o -> {
slotList.add(
new Pair<>(o, TimePartitionUtils.getTimePartition(resource.getStartTime(o))));
new Pair<>(o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o))));
slotList.add(
new Pair<>(o, TimePartitionUtils.getTimePartition(resource.getEndTime(o))));
new Pair<>(o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o))));
});
if (slotList.isEmpty()) {
......
......@@ -106,7 +106,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartition(time);
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(time);
this.dataRegionReplicaSet =
analysis
.getDataPartitionInfo()
......@@ -191,7 +191,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
@TestOnly
public List<TTimePartitionSlot> getTimePartitionSlots() {
return Collections.singletonList(TimePartitionUtils.getTimePartition(time));
return Collections.singletonList(TimePartitionUtils.getTimePartitionSlot(time));
}
@Override
......
......@@ -223,7 +223,7 @@ public class InsertRowsNode extends InsertNode {
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
insertRowNode.devicePath.getFullPath(),
TimePartitionUtils.getTimePartition(insertRowNode.getTime()));
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()));
// collect redirectInfo
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
......
......@@ -156,7 +156,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
devicePath.getFullPath(),
TimePartitionUtils.getTimePartition(insertRowNode.getTime()));
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()));
List<InsertRowNode> tmpMap =
splitMap.computeIfAbsent(dataRegionReplicaSet, k -> new ArrayList<>());
List<Integer> tmpIndexMap =
......
......@@ -190,29 +190,23 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
if (times.length == 0) {
return Collections.emptyList();
}
long startTime =
(times[0] / TimePartitionUtils.timePartitionInterval)
* TimePartitionUtils.timePartitionInterval; // included
long endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartition(times[0]);
long upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[0]);
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[0]);
int startLoc = 0; // included
List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
// for each List in split, they are range1.start, range1.end, range2.start, range2.end, ...
List<Integer> ranges = new ArrayList<>();
for (int i = 1; i < times.length; i++) { // times are sorted in session API.
if (times[i] >= endTime) {
if (times[i] >= upperBoundOfTimePartition) {
// a new range.
ranges.add(startLoc); // included
ranges.add(i); // excluded
timePartitionSlots.add(timePartitionSlot);
// next init
startLoc = i;
startTime = endTime;
endTime =
(times[i] / TimePartitionUtils.timePartitionInterval + 1)
* TimePartitionUtils.timePartitionInterval;
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[i]);
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
}
}
......@@ -292,19 +286,14 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
@TestOnly
public List<TTimePartitionSlot> getTimePartitionSlots() {
List<TTimePartitionSlot> result = new ArrayList<>();
long startTime =
(times[0] / TimePartitionUtils.timePartitionInterval)
* TimePartitionUtils.timePartitionInterval; // included
long endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartition(times[0]);
long upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[0]);
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[0]);
for (int i = 1; i < times.length; i++) { // times are sorted in session API.
if (times[i] >= endTime) {
if (times[i] >= upperBoundOfTimePartition) {
result.add(timePartitionSlot);
// next init
endTime =
(times[i] / TimePartitionUtils.timePartitionInterval + 1)
* TimePartitionUtils.timePartitionInterval;
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[i]);
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
}
}
result.add(timePartitionSlot);
......
......@@ -154,7 +154,7 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
}
public TTimePartitionSlot getTimePartitionSlot() {
return TimePartitionUtils.getTimePartition(time);
return TimePartitionUtils.getTimePartitionSlot(time);
}
@Override
......
......@@ -80,7 +80,8 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
public List<TTimePartitionSlot> getTimePartitionSlots() {
Set<TTimePartitionSlot> timePartitionSlotSet = new HashSet<>();
for (InsertRowStatement insertRowStatement : insertRowStatementList) {
timePartitionSlotSet.add(TimePartitionUtils.getTimePartition(insertRowStatement.getTime()));
timePartitionSlotSet.add(
TimePartitionUtils.getTimePartitionSlot(insertRowStatement.getTime()));
}
return new ArrayList<>(timePartitionSlotSet);
}
......
......@@ -118,19 +118,14 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
public List<TTimePartitionSlot> getTimePartitionSlots() {
List<TTimePartitionSlot> result = new ArrayList<>();
long startTime =
(times[0] / TimePartitionUtils.timePartitionInterval)
* TimePartitionUtils.timePartitionInterval; // included
long endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartition(times[0]);
long upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[0]);
TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[0]);
for (int i = 1; i < times.length; i++) { // times are sorted in session API.
if (times[i] >= endTime) {
if (times[i] >= upperBoundOfTimePartition) {
result.add(timePartitionSlot);
// next init
endTime =
(times[i] / TimePartitionUtils.timePartitionInterval + 1)
* TimePartitionUtils.timePartitionInterval;
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[i]);
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
}
}
result.add(timePartitionSlot);
......
......@@ -163,7 +163,9 @@ public class StorageEngine implements IService {
if (timePartitionInterval == -1) {
initTimePartition();
}
return time / timePartitionInterval;
return time > 0 || time % timePartitionInterval == 0
? time / timePartitionInterval
: time / timePartitionInterval - 1;
}
/** block insertion if the insertion is rejected by memory control */
......
......@@ -23,12 +23,16 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.utils.TestOnly;
public class TimePartitionUtils {
public static long timePartitionInterval =
private static long timePartitionInterval =
CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
public static TTimePartitionSlot getTimePartition(long time) {
public static TTimePartitionSlot getTimePartitionSlot(long time) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
timePartitionSlot.setStartTime(time - time % timePartitionInterval);
if (time > 0 || time % timePartitionInterval == 0) {
timePartitionSlot.setStartTime(time / timePartitionInterval * timePartitionInterval);
} else {
timePartitionSlot.setStartTime((time / timePartitionInterval - 1) * timePartitionInterval);
}
return timePartitionSlot;
}
......@@ -36,6 +40,20 @@ public class TimePartitionUtils {
return timePartitionInterval;
}
public static long getTimePartitionUpperBound(long time) {
long upperBoundOfTimePartition;
if (time > 0 || time % TimePartitionUtils.timePartitionInterval == 0) {
upperBoundOfTimePartition =
(time / TimePartitionUtils.timePartitionInterval + 1)
* TimePartitionUtils.timePartitionInterval;
} else {
upperBoundOfTimePartition =
(time / TimePartitionUtils.timePartitionInterval)
* TimePartitionUtils.timePartitionInterval;
}
return upperBoundOfTimePartition;
}
@TestOnly
public static void setTimePartitionInterval(long timePartitionInterval) {
TimePartitionUtils.timePartitionInterval = timePartitionInterval;
......
......@@ -100,13 +100,13 @@ public class WritePlanNodeSplitTest {
new TEndPoint("127.0.0.1", 10740),
new TEndPoint("127.0.0.1", 10760),
new TEndPoint("127.0.0.1", 10750)));
// sg1 has 5 data regions
// sg1 has 7 data regions
for (int i = 0; i < seriesSlotPartitionNum; i++) {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = new HashMap<>();
for (int t = 0; t < 5; t++) {
long startTime = t * TimePartitionUtils.timePartitionInterval;
for (int t = -2; t < 5; t++) {
long startTime = t * TimePartitionUtils.getTimePartitionInterval() + 1;
timePartitionSlotMap.put(
new TTimePartitionSlot(startTime),
TimePartitionUtils.getTimePartitionSlot(startTime),
Collections.singletonList(
new TRegionReplicaSet(
new TConsensusGroupId(
......@@ -125,7 +125,7 @@ public class WritePlanNodeSplitTest {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = new HashMap<>();
for (int t = 0; t < 5; t++) {
timePartitionSlotMap.put(
new TTimePartitionSlot(t * TimePartitionUtils.timePartitionInterval),
new TTimePartitionSlot(t * TimePartitionUtils.getTimePartitionInterval()),
Collections.singletonList(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 99), locationList)));
......@@ -150,7 +150,7 @@ public class WritePlanNodeSplitTest {
}
private int getRegionIdByTime(long startTime) {
return (int) (4 - (startTime / TimePartitionUtils.timePartitionInterval));
return (int) (4 - ((startTime - 1) / TimePartitionUtils.getTimePartitionInterval()));
}
protected DataPartition getDataPartition(
......@@ -194,9 +194,11 @@ public class WritePlanNodeSplitTest {
InsertTabletNode insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 1"));
insertTabletNode.setDevicePath(new PartialPath("root.sg1.d1"));
insertTabletNode.setTimes(new long[] {1, 60, 120, 180, 270, 290, 360, 375, 440, 470});
insertTabletNode.setTimes(
new long[] {-200, -101, 1, 60, 120, 180, 270, 290, 360, 375, 440, 470});
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
insertTabletNode.setColumns(
new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath());
......@@ -209,7 +211,7 @@ public class WritePlanNodeSplitTest {
List<WritePlanNode> insertTabletNodeList = insertTabletNode.splitByPartition(analysis);
Assert.assertEquals(5, insertTabletNodeList.size());
Assert.assertEquals(6, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
InsertTabletNode tabletNode = (InsertTabletNode) insertNode;
Assert.assertEquals(tabletNode.getTimes().length, 2);
......@@ -285,10 +287,10 @@ public class WritePlanNodeSplitTest {
InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("plan node 3"));
for (int i = 0; i < 5; i++) {
for (int i = 0; i < 7; i++) {
InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId("plan node 3"));
insertRowNode.setDevicePath(new PartialPath(String.format("root.sg1.d%d", i)));
insertRowNode.setTime(i * TimePartitionUtils.timePartitionInterval);
insertRowNode.setTime((i - 2) * TimePartitionUtils.getTimePartitionInterval());
insertRowsNode.addOneInsertRowNode(insertRowNode, 2 * i);
insertRowNode = new InsertRowNode(new PlanNodeId("plan node 3"));
......@@ -309,9 +311,9 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
List<WritePlanNode> insertTabletNodeList = insertRowsNode.splitByPartition(analysis);
List<WritePlanNode> insertRowsNodeList = insertRowsNode.splitByPartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
Assert.assertEquals(8, insertRowsNodeList.size());
}
@After
......
......@@ -68,4 +68,17 @@ public class StorageEngineTest {
rg1.syncDeleteDataFiles();
rg2.syncDeleteDataFiles();
}
@Test
public void testGetTimePartitionId() {
long timePartitionInterval = StorageEngine.getTimePartitionInterval();
Assert.assertEquals(-2, StorageEngine.getTimePartition(-timePartitionInterval - 1));
Assert.assertEquals(-1, StorageEngine.getTimePartition(-timePartitionInterval));
Assert.assertEquals(-1, StorageEngine.getTimePartition(-1));
Assert.assertEquals(0, StorageEngine.getTimePartition(0));
Assert.assertEquals(0, StorageEngine.getTimePartition(1));
Assert.assertEquals(0, StorageEngine.getTimePartition(timePartitionInterval / 2));
Assert.assertEquals(1, StorageEngine.getTimePartition(timePartitionInterval * 2 - 1));
Assert.assertEquals(2, StorageEngine.getTimePartition(timePartitionInterval * 2 + 1));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册