提交 ba14fb3d 编写于 作者: V Vlad Ilyushchenko

CAIRO: TableWriter maintains minTimestamp so that TableReader doesn't have to...

CAIRO: TableWriter maintains minTimestamp so that TableReader doesn't have to count directories to determine data interval
上级 cfb7bdd6
......@@ -27,10 +27,8 @@ import com.questdb.cairo.sql.RecordMetadata;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.std.*;
import com.questdb.std.microtime.DateFormat;
import com.questdb.std.microtime.DateLocaleFactory;
import com.questdb.std.microtime.Dates;
import com.questdb.std.str.NativeLPSZ;
import com.questdb.std.str.Path;
import java.io.Closeable;
......@@ -44,19 +42,17 @@ public class TableReader implements Closeable {
private static final PartitionPathGenerator DEFAULT_GEN = (reader, partitionIndex) -> reader.pathGenDefault();
private static final ReloadMethod NON_PARTITIONED_RELOAD_METHOD = TableReader::reloadNonPartitioned;
private static final ReloadMethod FIRST_TIME_NON_PARTITIONED_RELOAD_METHOD = TableReader::reloadInitialNonPartitioned;
private static final ReloadMethod FIRST_TIME_PARTITIONED_RELOAD_METHOD = TableReader::reloadInitialPartitioned;
private static final ReloadMethod PARTITIONED_RELOAD_METHOD = TableReader::reloadPartitioned;
private static final ReloadMethod FIRST_TIME_PARTITIONED_RELOAD_METHOD = TableReader::reloadInitialPartitioned;
private final ColumnCopyStruct tempCopyStruct = new ColumnCopyStruct();
private final FilesFacade ff;
private final Path path;
private final int rootLen;
private final ReadOnlyMemory txMem;
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final TableReaderMetadata metadata;
private final LongList partitionRowCounts;
private final PartitionPathGenerator partitionPathGenerator;
private final TableReaderRecordCursor recordCursor = new TableReaderRecordCursor();
private final DateFormat dateFormat;
private final TimestampFloorMethod timestampFloorMethod;
private final IntervalLengthMethod intervalLengthMethod;
private final PartitionTimestampCalculatorMethod partitionTimestampCalculatorMethod;
......@@ -108,7 +104,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = Dates::floorDD;
intervalLengthMethod = Dates::getDaysBetween;
partitionTimestampCalculatorMethod = Dates::addDays;
dateFormat = TableUtils.fmtDay;
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
break;
......@@ -118,7 +113,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = Dates::floorMM;
intervalLengthMethod = Dates::getMonthsBetween;
partitionTimestampCalculatorMethod = Dates::addMonths;
dateFormat = TableUtils.fmtMonth;
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
break;
......@@ -128,7 +122,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = Dates::floorYYYY;
intervalLengthMethod = Dates::getYearsBetween;
partitionTimestampCalculatorMethod = Dates::addYear;
dateFormat = TableUtils.fmtYear;
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
break;
......@@ -138,7 +131,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = null;
intervalLengthMethod = null;
partitionTimestampCalculatorMethod = null;
dateFormat = null;
countDefaultPartitions();
break;
}
......@@ -159,6 +151,56 @@ public class TableReader implements Closeable {
}
}
private static int getColumnBits(int columnCount) {
return Numbers.msb(Numbers.ceilPow2(columnCount) * 2);
}
static int getPrimaryColumnIndex(int base, int index) {
return base + index * 2;
}
private static boolean isEntryToBeProcessed(long address, int index) {
if (Unsafe.getUnsafe().getByte(address + index) == -1) {
return false;
}
Unsafe.getUnsafe().putByte(address + index, (byte) -1);
return true;
}
private static void growColumn(ReadOnlyColumn mem1, ReadOnlyColumn mem2, int type, long rowCount) {
long offset;
long len;
if (rowCount > 0) {
// subtract column top
switch (type) {
case ColumnType.BINARY:
assert mem2 != null;
mem2.grow(rowCount * 8);
offset = mem2.getLong((rowCount - 1) * 8);
// grow data column to value offset + length, so that we can read length
mem1.grow(offset + 8);
len = mem1.getLong(offset);
if (len > 0) {
mem1.grow(offset + len + 8);
}
break;
case ColumnType.STRING:
assert mem2 != null;
mem2.grow(rowCount * 8);
offset = mem2.getLong((rowCount - 1) * 8);
mem1.grow(offset + 4);
len = mem1.getInt(offset);
if (len > 0) {
mem1.grow(offset + len * 2 + 4);
}
break;
default:
mem1.grow(rowCount << ColumnType.pow2SizeOf(type));
break;
}
}
}
@Override
public void close() {
if (isOpen()) {
......@@ -347,56 +389,6 @@ public class TableReader implements Closeable {
return rowCount;
}
private static int getColumnBits(int columnCount) {
return Numbers.msb(Numbers.ceilPow2(columnCount) * 2);
}
static int getPrimaryColumnIndex(int base, int index) {
return base + index * 2;
}
private static boolean isEntryToBeProcessed(long address, int index) {
if (Unsafe.getUnsafe().getByte(address + index) == -1) {
return false;
}
Unsafe.getUnsafe().putByte(address + index, (byte) -1);
return true;
}
private static void growColumn(ReadOnlyColumn mem1, ReadOnlyColumn mem2, int type, long rowCount) {
long offset;
long len;
if (rowCount > 0) {
// subtract column top
switch (type) {
case ColumnType.BINARY:
assert mem2 != null;
mem2.grow(rowCount * 8);
offset = mem2.getLong((rowCount - 1) * 8);
// grow data column to value offset + length, so that we can read length
mem1.grow(offset + 8);
len = mem1.getLong(offset);
if (len > 0) {
mem1.grow(offset + len + 8);
}
break;
case ColumnType.STRING:
assert mem2 != null;
mem2.grow(rowCount * 8);
offset = mem2.getLong((rowCount - 1) * 8);
mem1.grow(offset + 4);
len = mem1.getInt(offset);
if (len > 0) {
mem1.grow(offset + len * 2 + 4);
}
break;
default:
mem1.grow(rowCount << ColumnType.pow2SizeOf(type));
break;
}
}
}
private void applyTruncate() {
LOG.info().$("truncate detected").$();
for (int i = 0, n = partitionCount; i < n; i++) {
......@@ -589,32 +581,8 @@ public class TableReader implements Closeable {
}
private long findPartitionMinimum() {
long partitionMin = Long.MAX_VALUE;
try {
long p = ff.findFirst(path.$());
if (p > 0) {
try {
do {
int type = ff.findType(p);
if (type == Files.DT_DIR || type == Files.DT_LNK) {
try {
long time = dateFormat.parse(nativeLPSZ.of(ff.findName(p)), DateLocaleFactory.INSTANCE.getDefaultDateLocale());
if (time < partitionMin && time <= maxTimestamp) {
partitionMin = time;
}
} catch (NumericException ignore) {
}
}
} while (ff.findNext(p) > 0);
} finally {
ff.findClose(p);
}
}
} finally {
path.trimTo(rootLen);
}
return partitionMin;
long maintainedMin = txMem.getLong(TableUtils.TX_OFFSET_MIN_TIMESTAMP);
return maintainedMin == Long.MAX_VALUE ? maintainedMin : timestampFloorMethod.floor(maintainedMin);
}
private void freeBitmapIndexCache() {
......@@ -1183,12 +1151,12 @@ public class TableReader implements Closeable {
}
@FunctionalInterface
private interface PartitionTimestampCalculatorMethod {
interface PartitionTimestampCalculatorMethod {
long calculate(long minTimestamp, int partitionIndex);
}
@FunctionalInterface
private interface TimestampFloorMethod {
interface TimestampFloorMethod {
long floor(long timestamp);
}
......
......@@ -56,12 +56,13 @@ public final class TableUtils {
static final long TX_OFFSET_TXN = 0;
static final long TX_OFFSET_TRANSIENT_ROW_COUNT = 8;
static final long TX_OFFSET_FIXED_ROW_COUNT = 16;
static final long TX_OFFSET_MAX_TIMESTAMP = 24;
static final long TX_OFFSET_STRUCT_VERSION = 32;
static final long TX_OFFSET_DATA_VERSION = 40;
static final long TX_OFFSET_PARTITION_TABLE_VERSION = 48;
static final long TX_OFFSET_TXN_CHECK = 56;
static final long TX_OFFSET_MAP_WRITER_COUNT = 64;
static final long TX_OFFSET_MIN_TIMESTAMP = 24;
static final long TX_OFFSET_MAX_TIMESTAMP = 32;
static final long TX_OFFSET_STRUCT_VERSION = 40;
static final long TX_OFFSET_DATA_VERSION = 48;
static final long TX_OFFSET_PARTITION_TABLE_VERSION = 56;
static final long TX_OFFSET_TXN_CHECK = 64;
static final long TX_OFFSET_MAP_WRITER_COUNT = 72;
/**
* struct {
* long txn;
......@@ -214,7 +215,9 @@ public final class TableUtils {
txMem.putLong(TX_OFFSET_TRANSIENT_ROW_COUNT, 0);
// fixed row count
txMem.putLong(TX_OFFSET_FIXED_ROW_COUNT, 0);
// partition low
// min timestamp value in table
txMem.putLong(TX_OFFSET_MIN_TIMESTAMP, Long.MAX_VALUE);
// max timestamp value in table
txMem.putLong(TX_OFFSET_MAX_TIMESTAMP, Long.MIN_VALUE);
// structure version
txMem.putLong(TX_OFFSET_STRUCT_VERSION, 0);
......
......@@ -2065,31 +2065,61 @@ public class TableReaderTest extends AbstractCairoTest {
testRemovePartition(PartitionBy.DAY, "2017-12-11", 0, current -> Dates.addDays(Dates.floorDD(current), 1));
}
@Test
public void testRemoveFirstPartitionByDayTwo() throws Exception {
testRemovePartition(PartitionBy.DAY, "2017-12-11", 0, current -> Dates.addDays(Dates.floorDD(current), 2));
}
@Test
public void testRemoveFirstPartitionByDayReload() throws Exception {
testRemovePartitionReload(PartitionBy.DAY, "2017-12-11", 0, current -> Dates.addDays(Dates.floorDD(current), 1));
}
@Test
public void testRemoveFirstPartitionByDayReloadTwo() throws Exception {
testRemovePartitionReload(PartitionBy.DAY, "2017-12-11", 0, current -> Dates.addDays(Dates.floorDD(current), 2));
}
@Test
public void testRemoveFirstPartitionByMonth() throws Exception {
testRemovePartition(PartitionBy.MONTH, "2017-12", 0, current -> Dates.addMonths(Dates.floorMM(current), 1));
}
@Test
public void testRemoveFirstPartitionByMonthTwo() throws Exception {
testRemovePartition(PartitionBy.MONTH, "2017-12", 0, current -> Dates.addMonths(Dates.floorMM(current), 2));
}
@Test
public void testRemoveFirstPartitionByMonthReload() throws Exception {
testRemovePartitionReload(PartitionBy.MONTH, "2017-12", 0, current -> Dates.addMonths(Dates.floorMM(current), 1));
}
@Test
public void testRemoveFirstPartitionByMonthReloadTwo() throws Exception {
testRemovePartitionReload(PartitionBy.MONTH, "2017-12", 0, current -> Dates.addMonths(Dates.floorMM(current), 2));
}
@Test
public void testRemoveFirstPartitionByYear() throws Exception {
testRemovePartition(PartitionBy.YEAR, "2017", 0, current -> Dates.addYear(Dates.floorYYYY(current), 1));
}
@Test
public void testRemoveFirstPartitionByYearTwo() throws Exception {
testRemovePartition(PartitionBy.YEAR, "2017", 0, current -> Dates.addYear(Dates.floorYYYY(current), 2));
}
@Test
public void testRemoveFirstPartitionByYearReload() throws Exception {
testRemovePartitionReload(PartitionBy.YEAR, "2017", 0, current -> Dates.addYear(Dates.floorYYYY(current), 1));
}
@Test
public void testRemoveFirstPartitionByYearReloadTwo() throws Exception {
testRemovePartitionReload(PartitionBy.YEAR, "2017", 0, current -> Dates.addYear(Dates.floorYYYY(current), 2));
}
@Test
public void testRemovePartitionByDay() throws Exception {
testRemovePartition(PartitionBy.DAY, "2017-12-14", 3000, current -> Dates.addDays(Dates.floorDD(current), 1));
......@@ -3501,7 +3531,7 @@ public class TableReaderTest extends AbstractCairoTest {
TestUtils.assertMemoryLeak(() -> {
int N = 100;
int N_PARTITIONS = 5;
long timestampUs = DateFormatUtils.parseDateTime("2017-12-11T00:00:00.000Z");
long timestampUs = DateFormatUtils.parseDateTime("2017-12-11T10:00:00.000Z");
long stride = 100;
int bandStride = 1000;
int totalCount = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册