提交 38207677 编写于 作者: V Vlad Ilyushchenko

CAIRO: TableReader no longer relies on partition directories to be removed in...

CAIRO: TableReader no longer relies on partition directories being removed in order to remove a logical data chunk. This is necessary on Windows, where open files cannot be removed.
上级 ba14fb3d
......@@ -132,7 +132,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
}
private void cullIntervals() {
int intervalsLo = intervals.binarySearch(reader.getPartitionMin());
int intervalsLo = intervals.binarySearch(reader.getMinTimestamp());
// not a direct hit
if (intervalsLo < 0) {
......@@ -157,9 +157,9 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
private void cullPartitions() {
long intervalLo = reader.floorToPartitionTimestamp(intervals.getQuick(initialIntervalsLo * 2));
this.initialPartitionLo = reader.getPartitionCountBetweenTimestamps(reader.getPartitionMin(), intervalLo);
this.initialPartitionLo = reader.getPartitionCountBetweenTimestamps(reader.getMinTimestamp(), intervalLo);
long intervalHi = reader.floorToPartitionTimestamp(intervals.getQuick((initialIntervalsHi - 1) * 2 + 1));
this.initialPartitionHi = Math.min(reader.getPartitionCount(), reader.getPartitionCountBetweenTimestamps(reader.getPartitionMin(), intervalHi) + 1);
this.initialPartitionHi = Math.min(reader.getPartitionCount(), reader.getPartitionCountBetweenTimestamps(reader.getMinTimestamp(), intervalHi) + 1);
}
protected class IntervalDataFrame implements DataFrame {
......
......@@ -62,6 +62,7 @@ public class AppendMemory extends VirtualMemory {
throw CairoException.instance(ff.errno()).put("Cannot truncate fd=").put(fd).put(" to ").put(getMapPageSize()).put(" bytes");
}
updateLimits(0, pageAddress = mapPage(0));
LOG.info().$("truncated [fd=").$(fd).$(']').$();
}
public final void close(boolean truncate) {
......
......@@ -34,6 +34,8 @@ import com.questdb.std.str.Path;
import java.io.Closeable;
import java.util.concurrent.locks.LockSupport;
import static com.questdb.cairo.TableUtils.TX_OFFSET_MIN_TIMESTAMP;
public class TableReader implements Closeable {
private static final Log LOG = LogFactory.getLog(TableReader.class);
private static final PartitionPathGenerator YEAR_GEN = TableReader::pathGenYear;
......@@ -76,7 +78,8 @@ public class TableReader implements Closeable {
private long txn = TableUtils.INITIAL_TXN;
private long maxTimestamp = Numbers.LONG_NaN;
private int partitionCount;
private long partitionMin = Long.MAX_VALUE;
private long minTimestamp = Long.MAX_VALUE;
private long prevMinTimestamp = Long.MAX_VALUE;
private ReloadMethod reloadMethod;
private long tempMem8b = Unsafe.malloc(8);
......@@ -104,7 +107,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = Dates::floorDD;
intervalLengthMethod = Dates::getDaysBetween;
partitionTimestampCalculatorMethod = Dates::addDays;
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
break;
case PartitionBy.MONTH:
......@@ -113,7 +115,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = Dates::floorMM;
intervalLengthMethod = Dates::getMonthsBetween;
partitionTimestampCalculatorMethod = Dates::addMonths;
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
break;
case PartitionBy.YEAR:
......@@ -122,7 +123,6 @@ public class TableReader implements Closeable {
timestampFloorMethod = Dates::floorYYYY;
intervalLengthMethod = Dates::getYearsBetween;
partitionTimestampCalculatorMethod = Dates::addYear;
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
break;
default:
......@@ -131,7 +131,7 @@ public class TableReader implements Closeable {
timestampFloorMethod = null;
intervalLengthMethod = null;
partitionTimestampCalculatorMethod = null;
countDefaultPartitions();
checkDefaultPartitionExistsAndUpdatePartitionCount();
break;
}
......@@ -282,8 +282,8 @@ public class TableReader implements Closeable {
return (int) intervalLengthMethod.calculate(partitionTimestamp1, partitionTimestamp2);
}
public long getPartitionMin() {
return partitionMin;
public long getMinTimestamp() {
return minTimestamp;
}
public int getPartitionedBy() {
......@@ -406,7 +406,6 @@ public class TableReader implements Closeable {
}
}
reloadSymbolMapCounts();
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
if (partitionCount > 0) {
updateCapacities();
......@@ -414,10 +413,13 @@ public class TableReader implements Closeable {
}
private int calculatePartitionCount() {
if (partitionMin == Long.MAX_VALUE) {
if (minTimestamp == Long.MAX_VALUE) {
return 0;
} else {
return maxTimestamp == Numbers.LONG_NaN ? 1 : getPartitionCountBetweenTimestamps(partitionMin, floorToPartitionTimestamp(maxTimestamp)) + 1;
return maxTimestamp == Long.MIN_VALUE ? 1 : getPartitionCountBetweenTimestamps(
minTimestamp,
floorToPartitionTimestamp(maxTimestamp)
) + 1;
}
}
......@@ -431,8 +433,9 @@ public class TableReader implements Closeable {
private void closeRemovedPartitions() {
for (int i = 0, n = removedPartitions.size(); i < n; i++) {
long timestamp = removedPartitions.get(i);
int partitionIndex = getPartitionCountBetweenTimestamps(partitionMin, timestamp);
final long timestamp = removedPartitions.get(i);
int partitionIndex = getPartitionCountBetweenTimestamps(prevMinTimestamp, timestamp);
if (partitionIndex > -1) {
if (partitionIndex < partitionCount) {
if (getPartitionRowCount(partitionIndex) != -1) {
......@@ -452,6 +455,15 @@ public class TableReader implements Closeable {
.$(']').$();
}
}
// adjust columns list when leading partitions have been removed
if (prevMinTimestamp != minTimestamp) {
assert prevMinTimestamp < minTimestamp;
int delta = getPartitionCountBetweenTimestamps(prevMinTimestamp, minTimestamp);
columns.remove(0, getColumnBase(delta) - 1);
prevMinTimestamp = minTimestamp;
partitionCount -= delta;
}
}
}
......@@ -476,7 +488,7 @@ public class TableReader implements Closeable {
return symbolMapReaders.getAndSetQuick(columnIndex, reader);
}
private void countDefaultPartitions() {
private void checkDefaultPartitionExistsAndUpdatePartitionCount() {
if (maxTimestamp == Numbers.LONG_NaN) {
partitionCount = 0;
} else {
......@@ -580,11 +592,6 @@ public class TableReader implements Closeable {
tempCopyStruct.forwardReader = indexReaders.getAndSetQuick(index + 1, null);
}
private long findPartitionMinimum() {
long maintainedMin = txMem.getLong(TableUtils.TX_OFFSET_MIN_TIMESTAMP);
return maintainedMin == Long.MAX_VALUE ? maintainedMin : timestampFloorMethod.floor(maintainedMin);
}
private void freeBitmapIndexCache() {
if (bitmapIndexes != null) {
for (int i = 0, n = bitmapIndexes.size(); i < n; i++) {
......@@ -669,7 +676,14 @@ public class TableReader implements Closeable {
private long openPartition0(int partitionIndex) {
// is this table partitioned?
if (partitionTimestampCalculatorMethod != null
&& removedPartitions.contains(partitionTimestampCalculatorMethod.calculate(partitionMin, partitionIndex))) {
&& removedPartitions.contains(partitionTimestampCalculatorMethod.calculate(
minTimestamp, partitionIndex
))) {
return -1;
}
// todo: this may not be the best place to check if partition is out of range
if (maxTimestamp == Long.MIN_VALUE) {
return -1;
}
......@@ -731,7 +745,7 @@ public class TableReader implements Closeable {
private Path pathGenDay(int partitionIndex) {
TableUtils.fmtDay.format(
Dates.addDays(partitionMin, partitionIndex),
Dates.addDays(minTimestamp, partitionIndex),
DateLocaleFactory.INSTANCE.getDefaultDateLocale(),
null,
path.put(Files.SEPARATOR)
......@@ -745,7 +759,7 @@ public class TableReader implements Closeable {
private Path pathGenMonth(int partitionIndex) {
TableUtils.fmtMonth.format(
Dates.addMonths(partitionMin, partitionIndex),
Dates.addMonths(minTimestamp, partitionIndex),
DateLocaleFactory.INSTANCE.getDefaultDateLocale(),
null,
path.put(Files.SEPARATOR)
......@@ -755,7 +769,7 @@ public class TableReader implements Closeable {
private Path pathGenYear(int partitionIndex) {
TableUtils.fmtYear.format(
Dates.addYear(partitionMin, partitionIndex),
Dates.addYear(minTimestamp, partitionIndex),
DateLocaleFactory.INSTANCE.getDefaultDateLocale(),
null,
path.put(Files.SEPARATOR)
......@@ -785,6 +799,7 @@ public class TableReader implements Closeable {
Unsafe.getUnsafe().loadFence();
final long transientRowCount = txMem.getLong(TableUtils.TX_OFFSET_TRANSIENT_ROW_COUNT);
final long fixedRowCount = txMem.getLong(TableUtils.TX_OFFSET_FIXED_ROW_COUNT);
final long minTimestamp = txMem.getLong(TX_OFFSET_MIN_TIMESTAMP);
final long maxTimestamp = txMem.getLong(TableUtils.TX_OFFSET_MAX_TIMESTAMP);
final long structVersion = txMem.getLong(TableUtils.TX_OFFSET_STRUCT_VERSION);
final long dataVersion = txMem.getLong(TableUtils.TX_OFFSET_DATA_VERSION);
......@@ -817,6 +832,21 @@ public class TableReader implements Closeable {
this.txn = txn;
this.transientRowCount = transientRowCount;
this.rowCount = fixedRowCount + transientRowCount;
this.prevMinTimestamp = this.minTimestamp;
switch (getPartitionedBy()) {
case PartitionBy.DAY:
this.minTimestamp = minTimestamp == Long.MAX_VALUE ? minTimestamp : Dates.floorDD(minTimestamp);
break;
case PartitionBy.MONTH:
this.minTimestamp = minTimestamp == Long.MAX_VALUE ? minTimestamp : Dates.floorMM(minTimestamp);
break;
case PartitionBy.YEAR:
this.minTimestamp = minTimestamp == Long.MAX_VALUE ? minTimestamp : Dates.floorYYYY(minTimestamp);
break;
default:
this.minTimestamp = minTimestamp;
break;
}
this.maxTimestamp = maxTimestamp;
this.structVersion = structVersion;
this.dataVersion = dataVersion;
......@@ -936,7 +966,7 @@ public class TableReader implements Closeable {
if (readTxn()) {
reloadStruct();
reloadSymbolMapCounts();
countDefaultPartitions();
checkDefaultPartitionExistsAndUpdatePartitionCount();
if (partitionCount > 0) {
updateCapacities();
reloadMethod = NON_PARTITIONED_RELOAD_METHOD;
......@@ -956,11 +986,10 @@ public class TableReader implements Closeable {
private boolean reloadInitialPartitioned0() {
reloadSymbolMapCounts();
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
if (partitionCount > 0) {
updateCapacities();
if (maxTimestamp != Numbers.LONG_NaN) {
if (maxTimestamp != Long.MIN_VALUE) {
reloadMethod = PARTITIONED_RELOAD_METHOD;
}
}
......@@ -1006,15 +1035,20 @@ public class TableReader implements Closeable {
private boolean reloadPartitioned() {
assert timestampFloorMethod != null;
final long currentPartitionTimestamp = maxTimestamp == Numbers.LONG_NaN ? maxTimestamp : floorToPartitionTimestamp(maxTimestamp);
boolean b = readTxn();
if (b) {
final long currentPartitionTimestamp = maxTimestamp == Long.MIN_VALUE ? maxTimestamp : floorToPartitionTimestamp(maxTimestamp);
if (readTxn()) {
reloadStruct();
if (maxTimestamp == Numbers.LONG_NaN || currentPartitionTimestamp == Numbers.LONG_NaN) {
if (maxTimestamp < currentPartitionTimestamp) {
applyTruncate();
return true;
}
if (partitionCount == 0) {
// old partition count was 0
incrementPartitionCountBy(calculatePartitionCount());
return true;
}
assert intervalLengthMethod != null;
// calculate timestamp delta between before and after reload.
......
......@@ -807,13 +807,18 @@ public class TableWriter implements Closeable {
if (partitionBy != PartitionBy.NONE) {
freeColumns(false);
if (indexers != null) {
for (int i = 0, n = indexers.size(); i < n; i++) {
Misc.free(indexers.getQuick(i));
}
}
removePartitionDirectories();
rowFunction = openPartitionFunction;
}
prevMaxTimestamp = Long.MIN_VALUE;
maxTimestamp = Long.MIN_VALUE;
prevMinTimestamp = Long.MIN_VALUE;
prevMinTimestamp = Long.MAX_VALUE;
minTimestamp = Long.MAX_VALUE;
txPrevTransientRowCount = 0;
transientRowCount = 0;
......@@ -827,6 +832,8 @@ public class TableWriter implements Closeable {
} catch (CairoException err) {
throwDistressException(err);
}
LOG.info().$("truncated [name=").$(name).$(']').$();
}
/**
......@@ -1004,7 +1011,6 @@ public class TableWriter implements Closeable {
try {
long partitionTimestamp = txPendingPartitionSizes.getLong(offset + 8);
setStateForTimestamp(partitionTimestamp, false);
long fd = openAppend(path.concat(TableUtils.ARCHIVE_FILE_NAME).$());
try {
int len = 8;
......@@ -1623,7 +1629,8 @@ public class TableWriter implements Closeable {
nativeLPSZ.of(name);
if (IGNORED_FILES.excludes(nativeLPSZ)) {
if (type == Files.DT_DIR && !ff.rmdir(path)) {
throw CairoException.instance(ff.errno()).put("Cannot remove directory: ").put(path);
LOG.info().$("could not remove [path=").$(path).$(", errno=").$(ff.errno()).$(']').$();
// throw CairoException.instance(ff.errno()).put("Cannot remove directory: ").put(path);
}
}
});
......
......@@ -198,34 +198,36 @@ public final class Files {
public static boolean rmdir(Path path) {
long p = findFirst(path.address());
int len = path.length();
boolean clean = true;
if (p > 0) {
try {
do {
long lpszName = findName(p);
path.trimTo(len).concat(lpszName).$();
switch (findType(p)) {
case DT_DIR:
if (strcmp(lpszName, "..") || strcmp(lpszName, ".")) {
continue;
}
if (!rmdir(path)) {
return false;
}
break;
default:
if (!remove(path.address())) {
return false;
}
break;
if (findType(p) == DT_DIR) {
if (strcmp(lpszName, "..") || strcmp(lpszName, ".")) {
continue;
}
if (rmdir(path)) {
continue;
}
clean = false;
} else {
if (remove(path.address())) {
continue;
}
clean = false;
}
} while (findNext(p) > 0);
} finally {
findClose(p);
}
return rmdir(path.trimTo(len).$().address());
return rmdir(path.trimTo(len).$().address()) && clean;
}
return false;
......
......@@ -188,9 +188,6 @@ public class ObjList<T> implements Mutable, Sinkable {
}
}
/**
* {@inheritDoc}
*/
public void remove(int index) {
if (pos < 1 || index >= pos) {
return;
......@@ -202,6 +199,16 @@ public class ObjList<T> implements Mutable, Sinkable {
Unsafe.arrayPut(buffer, --pos, null);
}
/**
 * Removes the elements in the inclusive index range {@code [from, to]} and
 * shifts any elements stored after {@code to} down to close the gap.
 * <p>
 * Like {@code remove(int index)}, out-of-range input is tolerated rather than
 * rejected: a range that starts at or beyond the current size is a no-op, and
 * a range that ends beyond the last element is clamped to the last element.
 *
 * @param from index of the first element to remove, must not exceed {@code to}
 * @param to   index of the last element to remove, inclusive
 */
public void remove(int from, int to) {
    assert from <= to;
    final int size = pos;
    if (from >= size) {
        // nothing is stored at or beyond 'from'
        return;
    }
    // Clamp so that an over-long range cannot pull 'pos' below 'from'
    // (or below zero), which would silently drop elements before the range.
    final int last = Math.min(to, size - 1);
    final int tail = size - last - 1;
    if (tail > 0) {
        System.arraycopy(buffer, last + 1, buffer, from, tail);
    }
    pos = from + tail;
    // Arrays.fill treats toIndex as exclusive; the bound has to be buffer.length,
    // otherwise the final slot keeps a stale reference when the list was full.
    Arrays.fill(buffer, pos, buffer.length, null);
}
public int remove(Object o) {
if (pos == 0) {
return -1;
......
......@@ -27,7 +27,6 @@ import com.questdb.cairo.sql.Record;
import com.questdb.griffin.SqlCompiler;
import com.questdb.griffin.engine.functions.bind.BindVariableService;
import com.questdb.std.BinarySequence;
import com.questdb.std.Os;
import com.questdb.std.Rnd;
import com.questdb.std.Unsafe;
import com.questdb.test.tools.TestUtils;
......@@ -191,13 +190,6 @@ public class BusyPollTest extends AbstractCairoTest {
}
private void testBusyPollFromMidTable(int partitionBy, long timestampIncrement) throws Exception {
if (Os.type == Os.WINDOWS) {
// TableWriter.truncate() is unable to remove directory on Windows when
// TableReader is open.
return;
}
final int blobSize = 1024;
final int n = 1000;
TestUtils.assertMemoryLeak(() -> {
......
......@@ -1811,12 +1811,6 @@ public class FullFwdDataFrameCursorTest extends AbstractCairoTest {
}
private void testReplaceIndexedColWithIndexedWithTruncate(int partitionBy, long increment, int expectedPartitionMin, boolean testRestricted) throws Exception {
if (Os.type == Os.WINDOWS) {
// TableWriter.truncate() is unable to remove directory on Windows when
// TableReader is open.
return;
}
TestUtils.assertMemoryLeak(() -> {
final int M = 1000;
final int N = 100;
......
......@@ -1426,6 +1426,44 @@ public class TableReaderTest extends AbstractCairoTest {
}
};
/**
 * Obtains the scratch blob used by the binary-column assertions by calling
 * {@code Unsafe.malloc(blobLen)}. Release it with {@code freeBlob(long)}, which
 * passes the same {@code blobLen} back to {@code Unsafe.free}.
 *
 * @return the value returned by {@code Unsafe.malloc(blobLen)}
 *         (presumably a native memory address - confirm against Unsafe)
 */
private static long allocBlob() {
return Unsafe.malloc(blobLen);
}
/**
 * Releases a blob by calling {@code Unsafe.free(blob, blobLen)}.
 *
 * @param blob value previously obtained from {@code allocBlob()}
 */
private static void freeBlob(long blob) {
Unsafe.free(blob, blobLen);
}
/**
 * Verifies the binary column {@code index} of record {@code r} against the
 * next value drawn from the expected-data generator.
 * <p>
 * One boolean is drawn from {@code exp} first. When it is {@code false} the
 * column must report {@code TableUtils.NULL_LEN}. When it is {@code true} the
 * generator fills {@code blob} via {@code nextChars(blob, blobLen / 2)}, the
 * column must report a length of {@code blobLen}, and every byte of the column
 * must equal the byte at the same offset in {@code blob}.
 *
 * @param r     record under test
 * @param exp   generator that reproduces the expected data sequence
 * @param blob  scratch memory that receives the expected bytes
 * @param index column index of the binary value in the record
 */
private static void assertBin(Record r, Rnd exp, long blob, int index) {
    if (!exp.nextBoolean()) {
        Assert.assertEquals(TableUtils.NULL_LEN, r.getBinLen(index));
        return;
    }

    exp.nextChars(blob, blobLen / 2);
    Assert.assertEquals(blobLen, r.getBinLen(index));

    final BinarySequence actual = r.getBin(index);
    for (int offset = 0; offset < blobLen; offset++) {
        final byte actualByte = actual.byteAt(offset);
        final byte expectedByte = Unsafe.getUnsafe().getByte(blob + offset);
        if (expectedByte != actualByte) {
            Assert.fail("Error at [" + offset + "]: expected=" + expectedByte + ", actual=" + actualByte);
        }
    }
}
/**
 * Asserts that the string column {@code index} of record {@code r} holds
 * {@code expected}.
 * <p>
 * Both accessors, {@code getStr} and {@code getStrB}, must return content equal
 * to {@code expected}; the two accessors must not return the same object; and
 * {@code getStrLen} must equal {@code expected.length()}.
 *
 * @param expected expected column content
 * @param r        record under test
 * @param index    column index of the string value in the record
 */
private static void assertStrColumn(CharSequence expected, Record r, int index) {
TestUtils.assertEquals(expected, r.getStr(index));
TestUtils.assertEquals(expected, r.getStrB(index));
// the A and B accessors must hand out distinct instances
Assert.assertNotSame(r.getStr(index), r.getStrB(index));
Assert.assertEquals(expected.length(), r.getStrLen(index));
}
/**
 * Asserts that the string column {@code index} of record {@code r} is null:
 * both {@code getStr} and {@code getStrB} return {@code null} and
 * {@code getStrLen} reports {@code TableUtils.NULL_LEN}.
 *
 * @param r     record under test
 * @param index column index of the string value in the record
 */
private static void assertNullStr(Record r, int index) {
Assert.assertNull(r.getStr(index));
Assert.assertNull(r.getStrB(index));
Assert.assertEquals(TableUtils.NULL_LEN, r.getStrLen(index));
}
@Test
public void testCloseColumnNonPartitioned1() throws Exception {
testCloseColumn(PartitionBy.NONE, 2000, 6000L, "bin", BATCH1_ASSERTER_NULL_BIN);
......@@ -2896,44 +2934,6 @@ public class TableReaderTest extends AbstractCairoTest {
});
}
private static long allocBlob() {
return Unsafe.malloc(blobLen);
}
private static void freeBlob(long blob) {
Unsafe.free(blob, blobLen);
}
private static void assertBin(Record r, Rnd exp, long blob, int index) {
if (exp.nextBoolean()) {
exp.nextChars(blob, blobLen / 2);
Assert.assertEquals(blobLen, r.getBinLen(index));
BinarySequence sq = r.getBin(index);
for (int l = 0; l < blobLen; l++) {
byte b = sq.byteAt(l);
boolean result = Unsafe.getUnsafe().getByte(blob + l) != b;
if (result) {
Assert.fail("Error at [" + l + "]: expected=" + Unsafe.getUnsafe().getByte(blob + l) + ", actual=" + b);
}
}
} else {
Assert.assertEquals(TableUtils.NULL_LEN, r.getBinLen(index));
}
}
private static void assertStrColumn(CharSequence expected, Record r, int index) {
TestUtils.assertEquals(expected, r.getStr(index));
TestUtils.assertEquals(expected, r.getStrB(index));
Assert.assertNotSame(r.getStr(index), r.getStrB(index));
Assert.assertEquals(expected.length(), r.getStrLen(index));
}
private static void assertNullStr(Record r, int index) {
Assert.assertNull(r.getStr(index));
Assert.assertNull(r.getStrB(index));
Assert.assertEquals(TableUtils.NULL_LEN, r.getStrLen(index));
}
private void appendTwoSymbols(TableWriter writer, Rnd rnd) {
for (int i = 0; i < 1000; i++) {
TableWriter.Row row = writer.newRow(0);
......@@ -3628,7 +3628,6 @@ public class TableReaderTest extends AbstractCairoTest {
Assert.assertEquals(N * N_PARTITIONS, writer.size());
// now open table reader having partition gap
try (TableReader reader = new TableReader(configuration, "w")) {
......
......@@ -1518,16 +1518,6 @@ public class TableWriterTest extends AbstractCairoTest {
});
}
@Test
public void testDayPartitionRmDirError() throws Exception {
testTruncate(new CountingFilesFacade() {
@Override
public boolean rmdir(Path name) {
return --count != 0 && super.rmdir(name);
}
}, true);
}
@Test
public void testDayPartitionTruncate() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册