未验证 提交 0444fc48 编写于 作者: Y YangCaiyin 提交者: GitHub

More UT for iotdb-core/datanode

上级 61b55122
......@@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.join.RowBasedT
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.window.CountWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.SessionWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.TimeWindowParameter;
import org.apache.iotdb.db.queryengine.execution.operator.window.VariationWindowParameter;
......@@ -835,8 +836,8 @@ public class RawDataAggregationOperatorTest {
}
}
assertEquals(resultMinTime1, 499);
assertEquals(resultMinTime2, 499);
assertEquals(499, resultMinTime1);
assertEquals(499, resultMinTime2);
}
@Test
......@@ -997,4 +998,56 @@ public class RawDataAggregationOperatorTest {
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
windowParameter);
}
@Test
public void groupByCountTest() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
List<List<InputLocation[]>> inputLocations = new ArrayList<>();
for (int i = 0; i < 2; i++) {
aggregationTypes.add(TAggregationType.FIRST_VALUE);
List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
inputLocations.add(inputLocationForOneAggregator);
}
for (int i = 0; i < 2; i++) {
aggregationTypes.add(TAggregationType.LAST_VALUE);
List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
inputLocations.add(inputLocationForOneAggregator);
}
WindowParameter windowParameter = new CountWindowParameter(10, 0, false, false);
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
long count = 0;
long index = 0;
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
for (int row = 0; row < resultTsBlock.getPositionCount(); row++) {
long firstValue = count, lastValue = count + 9;
if (count < 200) {
firstValue += 20000;
lastValue += 20000;
} else if (count < 260 || (count >= 300 && count < 380) || (count >= 400 && count < 500)) {
firstValue += 10000;
lastValue += 10000;
}
for (int i = 0; i < 2; i++) {
assertEquals(firstValue, resultTsBlock.getColumn(i).getInt(row));
}
for (int i = 2; i < 4; i++) {
assertEquals(lastValue, resultTsBlock.getColumn(i).getInt(row));
}
count += 10;
index++;
}
}
assertEquals(50, index);
assertEquals(500, count);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.utils.sort;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SortUtilTest {
private static final String folderPath = "SORT_TEST";
private static final String filePrefix = folderPath + File.separator + "tmp";
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(200);
}
private void clear() {
File tmpDir = new File(folderPath);
if (!tmpDir.exists()) return;
FileUtils.deleteDirectory(tmpDir);
}
@Test
public void diskSortTest() {
double[] values = new double[] {1.0, 2.0, 3.0, 5.0, 6.0, 8.0, 11.0, 13.0, 15.0, 17.0};
double[] values2 = new double[] {4.0, 7.0, 9.0, 10.0, 12.0, 14.0, 16.0, 18.0, 19.0, 20.0};
TimeColumn timeColumn = new TimeColumn(10, new long[] {1, 2, 3, 5, 6, 8, 11, 13, 15, 17});
Column column = new DoubleColumn(10, Optional.empty(), values);
TimeColumn timeColumn2 = new TimeColumn(10, new long[] {4, 7, 9, 10, 12, 14, 16, 18, 19, 20});
Column column2 = new DoubleColumn(10, Optional.empty(), values2);
TsBlock tsBlock = new TsBlock(timeColumn, column);
TsBlock tsBlock2 = new TsBlock(timeColumn2, column2);
DiskSpiller diskSpiller =
new DiskSpiller(folderPath, filePrefix, Collections.singletonList(TSDataType.DOUBLE));
List<SortKey> sortKeyList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
sortKeyList.add(new SortKey(tsBlock, i));
}
SortBufferManager sortBufferManager = new SortBufferManager();
try {
sortBufferManager.allocateOneSortBranch();
diskSpiller.spillSortedData(sortKeyList);
List<SortReader> sortReaders = diskSpiller.getReaders(sortBufferManager);
assertTrue(sortReaders.size() == 1 && sortReaders.get(0) instanceof FileSpillerReader);
FileSpillerReader fileSpillerReader = (FileSpillerReader) sortReaders.get(0);
int index = 0;
while (fileSpillerReader.hasNext()) {
SortKey sortKey = fileSpillerReader.next();
assertEquals(sortKey.tsBlock.getColumn(0).getDouble(index), values[index], 0.001);
index++;
}
fileSpillerReader.close();
assertEquals(10, index);
} catch (Exception e) {
clear();
fail(e.getMessage());
}
List<MergeSortKey> mergeSortKeyList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
mergeSortKeyList.add(new MergeSortKey(tsBlock2, i));
}
MemoryReader memoryReader = new MemoryReader(mergeSortKeyList);
try {
int index = 0;
while (memoryReader.hasNext()) {
SortKey sortKey = memoryReader.next();
assertEquals(sortKey.tsBlock.getColumn(0).getDouble(index), values2[index], 0.001);
index++;
}
memoryReader.close();
assertEquals(10, index);
} catch (Exception e) {
clear();
fail(e.getMessage());
}
clear();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册