Unverified commit a8adc855 authored by Vlad Ilyushchenko, committed by GitHub

fix(cairo): fixed arithmetic issues calculating row counts during async O3 ingestion (#1474)

Parent ccd8cea2
......@@ -42,7 +42,7 @@ import io.questdb.std.str.StringSink;
*/
public final class ColumnType {
// column type version as written to the metadata file
public static final int VERSION = 423;
public static final int VERSION = 424;
public static final short UNDEFINED = 0;
public static final short BOOLEAN = 1;
......
......@@ -692,7 +692,7 @@ public class TableWriter implements Closeable {
}
public boolean checkMaxAndCommitLag(int commitMode) {
if (getO3RowCount() < metadata.getMaxUncommittedRows()) {
if (!hasO3() || getO3RowCount0() < metadata.getMaxUncommittedRows()) {
return false;
}
commit(commitMode, metadata.getCommitLag());
......@@ -722,6 +722,10 @@ public class TableWriter implements Closeable {
commit(defaultCommitMode, lagMicros);
}
public void commitWithLag(int commitMode) {
commit(commitMode, metadata.getCommitLag());
}
public int getColumnIndex(CharSequence name) {
int index = metadata.getColumnIndexQuiet(name);
if (index > -1) {
......@@ -747,6 +751,10 @@ public class TableWriter implements Closeable {
}
public long getO3RowCount() {
return hasO3() ? getO3RowCount0() : 0;
}
private long getO3RowCount0() {
return (masterRef - o3MasterRef + 1) / 2;
}
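// Note added for illustration (not part of the commit): the arithmetic implies each
// completed row advances masterRef by 2 relative to o3MasterRef. For example, after
// three O3 rows with o3MasterRef = 1 and masterRef = 6: (6 - 1 + 1) / 2 = 3. The
// inverse assignment below, o3MasterRef = masterRef - o3LagRowCount * 2 + 1,
// restores exactly o3LagRowCount.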
......@@ -1379,7 +1387,7 @@ public class TableWriter implements Closeable {
public long size() {
// This is the uncommitted row count
return txWriter.getRowCount() + (hasO3() ? getO3RowCount() : 0L);
return txWriter.getRowCount() + getO3RowCount();
}
public void tick() {
......@@ -1462,18 +1470,6 @@ public class TableWriter implements Closeable {
LOG.info().$("truncated [name=").$(tableName).$(']').$();
}
public void updateSymbols(int columnIndex, SymbolMapReader symReader) {
int nSourceSymbols = symReader.size();
SymbolMapWriter symWriter = getSymbolMapWriter(columnIndex);
int nDestinationSymbols = symWriter.getSymbolCount();
if (nSourceSymbols > nDestinationSymbols) {
long address = symReader.symbolCharsAddressOf(nDestinationSymbols);
long addressHi = symReader.symbolCharsAddressOf(nSourceSymbols);
symWriter.appendSymbolCharsBlock(addressHi - address, address);
}
}
/**
* Eagerly sets up the writer instance; otherwise, the writer initializes lazily. Invoking this method could improve
* performance of some applications. UDP receivers use this in order to avoid initial receive buffer contention.
......@@ -2368,10 +2364,6 @@ public class TableWriter implements Closeable {
return symbolMapWriters.getQuick(columnIndex);
}
private boolean hasO3() {
return o3MasterRef > -1 && getO3RowCount() > 0;
}
private long indexHistoricPartitions(SymbolColumnIndexer indexer, CharSequence columnName, int indexValueBlockSize) {
final long ts = this.txWriter.getMaxTimestamp();
if (ts > Numbers.LONG_NaN) {
......@@ -2476,7 +2468,7 @@ public class TableWriter implements Closeable {
* @return <i>true</i> when the commit is a NOOP, i.e. no data has been committed to disk; <i>false</i> otherwise.
*/
private boolean o3Commit(long lag) {
o3RowCount = getO3RowCount();
o3RowCount = getO3RowCount0();
o3PartitionRemoveCandidates.clear();
o3ErrorCount.set(0);
o3ColumnCounters.clear();
......@@ -2484,11 +2476,12 @@ public class TableWriter implements Closeable {
long o3LagRowCount = 0;
long maxUncommittedRows = metadata.getMaxUncommittedRows();
final int timestampIndex = metadata.getTimestampIndex();
this.lastPartitionTimestamp = timestampFloorMethod.floor(partitionTimestampHi);
long activePartitionTimestampCeil = timestampCeilMethod.ceil(partitionTimestampHi);
try {
o3RowCount += o3MoveUncommitted(timestampIndex);
final long transientRowCount = txWriter.transientRowCount;
// we may need to re-use file descriptors when this partition is the "current" one
// we cannot open the file again due to a sharing violation
......@@ -2599,6 +2592,13 @@ public class TableWriter implements Closeable {
boolean flattenTimestamp = true;
int pCount = 0;
try {
// We do not know upfront which partition is going to be last because this is
// a single pass over the data. Instead, we update the transient row count in a
// rolling manner, assuming the partition marked "last" is indeed the last; when
// a newer partition appears, we move prevTransientRowCount into the
// "fixedRowCount" sum and track the new partition's size as prevTransientRowCount.
long prevTransientRowCount = transientRowCount;
while (srcOoo < srcOooMax) {
try {
final long srcOooLo = srcOoo;
......@@ -2627,17 +2627,19 @@ public class TableWriter implements Closeable {
final int partitionIndex = txWriter.findAttachedPartitionIndexByLoTimestamp(partitionTimestamp);
if (partitionIndex > -1) {
if (last) {
srcDataMax = txWriter.transientRowCount;
srcDataMax = transientRowCount;
} else {
srcDataMax = getPartitionSizeByIndex(partitionIndex);
}
srcNameTxn = getPartitionNameTxnByIndex(partitionIndex);
} else {
srcDataMax = -1;
srcDataMax = 0;
srcNameTxn = -1;
}
final boolean append = last && (srcDataMax < 0 || o3Timestamp >= maxTimestamp);
final boolean append = last && (srcDataMax == 0 || o3Timestamp >= maxTimestamp);
final long partitionSize = srcDataMax + srcOooHi - srcOooLo + 1;
LOG.debug().
$("o3 partition task [table=").$(tableName)
.$(", srcOooLo=").$(srcOooLo)
......@@ -2651,10 +2653,23 @@ public class TableWriter implements Closeable {
.$(", srcDataMax=").$(srcDataMax)
.$(", maxTimestamp=").$ts(maxTimestamp)
.$(", last=").$(last)
.$(", partitionSize=").$(partitionSize)
.$(", append=").$(append)
.$(", memUsed=").$(Unsafe.getMemUsed())
.I$();
if (partitionTimestamp < lastPartitionTimestamp) {
// increment fixedRowCount by the number of rows added to this older partition
this.txWriter.fixedRowCount += partitionSize - srcDataMax;
} else if (partitionTimestamp == lastPartitionTimestamp) {
// this is existing "last" partition, we can set the size directly
prevTransientRowCount = partitionSize;
} else {
// this is potentially a new last partition
this.txWriter.fixedRowCount += prevTransientRowCount;
prevTransientRowCount = partitionSize;
}
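// Numeric sketch (assumed figures, illustration only): suppose the O3 batch touches
// P1 (older than lastPartitionTimestamp, srcDataMax = 100, partitionSize = 120),
// P2 (== lastPartitionTimestamp, partitionSize = 15) and a brand new P3
// (partitionSize = 7). Then: P1 adds 20 to fixedRowCount; P2 sets
// prevTransientRowCount = 15; P3 folds 15 into fixedRowCount and sets
// prevTransientRowCount = 7, which becomes transientRowCount after the loop.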
pCount++;
o3PartitionUpdRemaining.incrementAndGet();
final O3Basket o3Basket = o3BasketPool.next();
......@@ -2712,7 +2727,7 @@ public class TableWriter implements Closeable {
o3TimestampMax,
partitionTimestamp,
srcDataTop,
Math.max(0, srcDataMax),
srcDataMax,
isIndexed,
dstFixMem,
dstVarMem,
......@@ -2747,6 +2762,11 @@ public class TableWriter implements Closeable {
throw e;
}
}
// at this point we should know the last partition row count
this.txWriter.transientRowCount = prevTransientRowCount;
this.partitionTimestampHi = Math.max(this.partitionTimestampHi, o3TimestampMax);
this.txWriter.updateMaxTimestamp(Math.max(txWriter.getMaxTimestamp(), o3TimestampMax));
} finally {
// we are stealing work here; it is possible we get an exception from this method
LOG.debug()
......@@ -2784,19 +2804,16 @@ public class TableWriter implements Closeable {
// move append memory to new set of files. Otherwise, we stay on the same set but advance to append position.
avoidIndexOnCommit = o3ErrorCount.get() == 0;
if (o3LagRowCount == 0) {
this.o3MasterRef = -1;
rowActon = ROW_ACTION_SWITCH_PARTITION;
// transaction log is either not required or pending
activeColumns = columns;
activeNullSetters = nullSetters;
clearO3();
LOG.debug().$("lag segment is empty").$();
} else {
// adjust the O3 master ref so that the virtual row count becomes equal to o3LagRowCount
this.o3MasterRef = this.masterRef - o3LagRowCount * 2 + 1;
LOG.debug().$("adjusted [o3RowCount=").$(getO3RowCount()).I$();
LOG.debug().$("adjusted [o3RowCount=").$(getO3RowCount0()).I$();
}
}
if (!columns.getQuick(0).isOpen() || partitionTimestampHi < txWriter.getMaxTimestamp()) {
if (!columns.getQuick(0).isOpen() || partitionTimestampHi > activePartitionTimestampCeil) {
openPartition(txWriter.getMaxTimestamp());
}
......@@ -2814,6 +2831,14 @@ public class TableWriter implements Closeable {
return false;
}
private void clearO3() {
this.o3MasterRef = -1; // clears the O3 flag; hasO3() will return false
rowActon = ROW_ACTION_SWITCH_PARTITION;
// transaction log is either not required or pending
activeColumns = columns;
activeNullSetters = nullSetters;
}
private void o3CommitPartitionAsync(
AtomicInteger columnCounter,
long maxTimestamp,
......@@ -3224,34 +3249,9 @@ public class TableWriter implements Closeable {
long srcDataMax,
boolean partitionMutates
) {
LOG.debug().$("o3 partition update [timestampMin=").$ts(timestampMin)
.$(", timestampMax=").$ts(timestampMax)
.$(", partitionTimestamp=").$ts(partitionTimestamp)
.$(", srcOooPartitionLo=").$(srcOooPartitionLo)
.$(", srcOooPartitionHi=").$(srcOooPartitionHi)
.$(", srcOooMax=").$(srcOooMax)
.$(", srcDataMax=").$(srcDataMax)
.$(", partitionMutates=").$(partitionMutates)
.I$();
this.txWriter.minTimestamp = Math.min(timestampMin, this.txWriter.minTimestamp);
final long partitionSize = srcDataMax + srcOooPartitionHi - srcOooPartitionLo + 1;
final long rowDelta = srcOooPartitionHi - srcOooMax;
if (partitionTimestamp < lastPartitionTimestamp) {
this.txWriter.fixedRowCount += partitionSize - srcDataMax;
// when we exit here we need to rollback transientRowCount we've been incrementing
// while adding out-of-order data
} else if (rowDelta < -1) {
this.txWriter.fixedRowCount += partitionSize;
} else {
// this is last partition
if (partitionTimestamp > lastPartitionTimestamp) {
this.txWriter.fixedRowCount += this.txWriter.transientRowCount;
}
this.txWriter.transientRowCount = partitionSize;
this.txWriter.maxTimestamp = Math.max(this.txWriter.maxTimestamp, timestampMax);
}
final int partitionIndex = txWriter.findAttachedPartitionIndexByLoTimestamp(partitionTimestamp);
if (partitionTimestamp == lastPartitionTimestamp) {
if (partitionMutates) {
......@@ -3263,6 +3263,19 @@ public class TableWriter implements Closeable {
}
}
LOG.debug().$("o3 partition update [timestampMin=").$ts(timestampMin)
.$(", timestampMax=").$ts(timestampMax)
.$(", last=").$(partitionTimestamp == lastPartitionTimestamp)
.$(", partitionTimestamp=").$ts(partitionTimestamp)
.$(", srcOooPartitionLo=").$(srcOooPartitionLo)
.$(", srcOooPartitionHi=").$(srcOooPartitionHi)
.$(", srcOooMax=").$(srcOooMax)
.$(", srcDataMax=").$(srcDataMax)
.$(", partitionMutates=").$(partitionMutates)
.$(", lastPartitionTimestamp=").$(lastPartitionTimestamp)
.$(", partitionSize=").$(partitionSize)
.I$();
if (partitionMutates) {
final long srcDataTxn = txWriter.getPartitionNameTxnByIndex(partitionIndex);
LOG.info()
......@@ -3677,7 +3690,7 @@ public class TableWriter implements Closeable {
}
private void o3TimestampSetter(long timestamp) {
o3TimestampMem.putLong128(timestamp, getO3RowCount());
o3TimestampMem.putLong128(timestamp, getO3RowCount0());
}
private void openColumnFiles(CharSequence name, int i, int plen) {
......@@ -3695,13 +3708,14 @@ public class TableWriter implements Closeable {
}
private void openFirstPartition(long timestamp) {
openPartition(repairDataGaps(timestamp));
final long ts = repairDataGaps(timestamp);
openPartition(ts);
populateDenseIndexerList();
setAppendPosition(txWriter.getTransientRowCount(), true);
if (performRecovery) {
performRecovery();
}
txWriter.openFirstPartition(timestamp);
txWriter.openFirstPartition(ts);
}
private void openNewColumnFiles(CharSequence name, boolean indexFlag, int indexValueBlockCapacity) {
......@@ -4120,7 +4134,7 @@ public class TableWriter implements Closeable {
int symColIndex = denseSymbolMapWriters.remove(writer);
// Shift all subsequent symbol indexes back by 1
while (symColIndex < denseSymbolMapWriters.size()) {
SymbolMapWriter w = denseSymbolMapWriters.getQuick(symColIndex);
w.setSymbolIndexInTxWriter(symColIndex);
symColIndex++;
}
......@@ -4245,7 +4259,7 @@ public class TableWriter implements Closeable {
private long repairDataGaps(final long timestamp) {
if (txWriter.getMaxTimestamp() != Numbers.LONG_NaN && partitionBy != PartitionBy.NONE) {
long actualSize = 0;
long fixedRowCount = 0;
long lastTimestamp = -1;
long transientRowCount = this.txWriter.getTransientRowCount();
long maxTimestamp = this.txWriter.getMaxTimestamp();
......@@ -4258,7 +4272,7 @@ public class TableWriter implements Closeable {
long partitionSize = txWriter.getPartitionSizeByPartitionTimestamp(ts);
if (partitionSize >= 0 && ff.exists(path.$())) {
actualSize += partitionSize;
fixedRowCount += partitionSize;
lastTimestamp = ts;
} else {
Path other = Path.getThreadLocal2(path.trimTo(p).$());
......@@ -4298,10 +4312,12 @@ public class TableWriter implements Closeable {
int p = path.length();
transientRowCount = txWriter.getPartitionSizeByPartitionTimestamp(lastTimestamp);
// 2. read max timestamp
TableUtils.dFile(path.trimTo(p), metadata.getColumnName(metadata.getTimestampIndex()));
maxTimestamp = TableUtils.readLongAtOffset(ff, path, tempMem16b, (transientRowCount - 1) * Long.BYTES);
actualSize -= transientRowCount;
fixedRowCount -= transientRowCount;
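// Note (illustrative): the loop above accumulated every on-disk partition into
// fixedRowCount; the partition that now becomes active is carried separately as
// transientRowCount, hence the subtraction.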
txWriter.removeAttachedPartitions(txWriter.getMaxTimestamp());
LOG.info()
.$("updated active partition [name=").$(path.trimTo(p).$())
.$(", maxTimestamp=").$ts(maxTimestamp)
......@@ -4316,14 +4332,14 @@ public class TableWriter implements Closeable {
}
final long expectedSize = txWriter.readFixedRowCount();
if (expectedSize != actualSize || maxTimestamp != this.txWriter.getMaxTimestamp()) {
if (expectedSize != fixedRowCount || maxTimestamp != this.txWriter.getMaxTimestamp()) {
LOG.info()
.$("actual table size has been adjusted [name=`").utf8(tableName).$('`')
.$(", expectedFixedSize=").$(expectedSize)
.$(", actualFixedSize=").$(actualSize)
.$(", actualFixedSize=").$(fixedRowCount)
.$(']').$();
txWriter.reset(actualSize, transientRowCount, maxTimestamp);
txWriter.reset(fixedRowCount, transientRowCount, maxTimestamp);
return maxTimestamp;
}
}
......@@ -4362,7 +4378,7 @@ public class TableWriter implements Closeable {
if (partitionBy != PartitionBy.NONE) {
removePartitionDirectories();
}
txWriter.reset();
txWriter.truncate();
clearTodoLog();
}
......@@ -4452,24 +4468,26 @@ public class TableWriter implements Closeable {
}
}
private boolean hasO3() {
return o3MasterRef > -1;
}
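// Note added for illustration: hasO3() now only reports whether an O3 batch is open
// (o3MasterRef is set when the writer enters O3 mode and cleared by clearO3());
// callers that also need rows, e.g. checkMaxAndCommitLag(), pair it with
// getO3RowCount0().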
void rowCancel() {
if ((masterRef & 1) == 0) {
return;
}
if (o3MasterRef > -1) {
if (hasO3()) {
if (hasO3()) {
final long o3RowCount = getO3RowCount0();
if (o3RowCount > 0) {
// O3 mode and there are some rows.
masterRef--;
setO3AppendPosition(getO3RowCount());
setO3AppendPosition(o3RowCount);
} else {
// Cancelling first row in o3, reverting to non-o3
setO3AppendPosition(0);
masterRef--;
o3MasterRef = -1;
rowActon = ROW_ACTION_SWITCH_PARTITION;
activeColumns = columns;
activeNullSetters = nullSetters;
clearO3();
}
return;
}
......
......@@ -214,12 +214,13 @@ public final class TxWriter extends TxReader implements Closeable, SymbolValueCo
final long partitionTimestampLo = getPartitionTimestampLo(timestamp);
int index = findAttachedPartitionIndexByLoTimestamp(partitionTimestampLo);
if (index > -1) {
int size = attachedPartitions.size();
if (index + LONGS_PER_TX_ATTACHED_PARTITION < size) {
attachedPartitions.arrayCopy(index + LONGS_PER_TX_ATTACHED_PARTITION, index, size - index - LONGS_PER_TX_ATTACHED_PARTITION);
final int size = attachedPartitions.size();
final int lim = size - LONGS_PER_TX_ATTACHED_PARTITION;
if (index < lim) {
attachedPartitions.arrayCopy(index + LONGS_PER_TX_ATTACHED_PARTITION, index, lim - index);
attachedPositionDirtyIndex = Math.min(attachedPositionDirtyIndex, index);
}
attachedPartitions.setPos(size - LONGS_PER_TX_ATTACHED_PARTITION);
attachedPartitions.setPos(lim);
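// Worked example (illustration): with LONGS_PER_TX_ATTACHED_PARTITION = 4 and
// size = 12 (three partitions), removing the middle entry (index = 4) gives
// lim = 8; arrayCopy(8, 4, 4) shifts the last entry over the removed one and
// setPos(8) drops the now-unused tail.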
partitionTableVersion++;
}
}
......@@ -245,15 +246,6 @@ public final class TxWriter extends TxReader implements Closeable, SymbolValueCo
this.txn = txn;
}
public void reset() {
resetTxn(
txMem,
symbolColumnCount,
txMem.getLong(TX_OFFSET_TXN) + 1,
txMem.getLong(TX_OFFSET_DATA_VERSION) + 1,
txMem.getLong(TX_OFFSET_PARTITION_TABLE_VERSION) + 1);
}
public void resetTimestamp() {
prevMaxTimestamp = Long.MIN_VALUE;
prevMinTimestamp = Long.MAX_VALUE;
......
......@@ -218,12 +218,13 @@ public class EngineMigration {
}
static {
MIGRATIONS.put(417, MigrationActions::mig505);
MIGRATIONS.put(417, Mig505::migrate);
// there is no tagged version with _meta 418, this is something unreleased
MIGRATIONS.put(418, MigrationActions::rebuildTransactionFile);
MIGRATIONS.put(419, MigrationActions::mig600);
MIGRATIONS.put(420, MigrationActions::mig605);
MIGRATIONS.put(422, MigrationActions::mig607);
MIGRATIONS.put(423, MigrationActions::mig608);
MIGRATIONS.put(418, Mig506::migrate);
MIGRATIONS.put(419, Mig600::migrate);
MIGRATIONS.put(420, Mig605::migrate);
MIGRATIONS.put(422, Mig607::migrate);
MIGRATIONS.put(423, Mig608::migrate);
MIGRATIONS.put(424, Mig609::migrate);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.TableUtils;
import io.questdb.std.FilesFacade;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.META_OFFSET_TABLE_ID;
final class Mig505 {
static void migrate(MigrationContext migrationContext) {
MigrationActions.LOG.info().$("assigning table ID [table=").$(migrationContext.getTablePath()).I$();
final long mem = migrationContext.getTempMemory(8);
final FilesFacade ff = migrationContext.getFf();
final Path path = migrationContext.getTablePath();
final long fd = migrationContext.getMetadataFd();
MigrationActions.LOG.info().$("setting table id in [path=").$(path).I$();
TableUtils.writeIntOrFail(
ff,
fd,
META_OFFSET_TABLE_ID,
migrationContext.getNextTableId(),
mem,
path
);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.vm.api.MemoryARW;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.cairo.vm.api.MemoryR;
import io.questdb.std.FilesFacade;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.*;
final class Mig506 {
private static final long TX_STRUCT_UPDATE_1_META_OFFSET_PARTITION_BY = 4;
private static final String TX_STRUCT_UPDATE_1_ARCHIVE_FILE_NAME = "_archive";
static void migrate(MigrationContext migrationContext) {
// Update transaction file
// Before, there was 1 int per symbol and a list of removed partitions
// Now there are 2 ints per symbol and 4 longs per non-removed partition
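// Layout sketch (illustration derived from the code below):
//   old: [1 int per symbol][removed-partition count int][removed partition longs]
//   new: [2 ints per symbol][partition segment size int]
//        [per partition: timestamp, size, nameTxn (-1), reserved (0)]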
MigrationActions.LOG.info().$("rebuilding tx file [table=").$(migrationContext.getTablePath()).I$();
Path path = migrationContext.getTablePath();
final FilesFacade ff = migrationContext.getFf();
int pathDirLen = path.length();
path.concat(TXN_FILE_NAME).$();
if (!ff.exists(path)) {
MigrationActions.LOG.error().$("tx file does not exist, nothing to migrate [path=").$(path).I$();
return;
}
EngineMigration.backupFile(
ff,
path,
migrationContext.getTablePath2(),
TXN_FILE_NAME,
417
);
MigrationActions.LOG.debug().$("opening for rw [path=").$(path).I$();
try (MemoryMARW txMem = migrationContext.createRwMemoryOf(ff, path.$())) {
long tempMem8b = migrationContext.getTempMemory(8);
MemoryARW txFileUpdate = migrationContext.getTempVirtualMem();
txFileUpdate.jumpTo(0);
int symbolColumnCount = txMem.getInt(MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505);
for (int i = 0; i < symbolColumnCount; i++) {
final int symbolCount = txMem.getInt(
MigrationActions.prefixedBlockOffset(
MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505,
i + 1L,
Integer.BYTES
)
);
txFileUpdate.putInt(symbolCount);
txFileUpdate.putInt(symbolCount);
}
// Set the partition segment size to 0 for now
long partitionSegmentOffset = txFileUpdate.getAppendOffset();
txFileUpdate.putInt(0);
final int partitionBy = TableUtils.readIntOrFail(
ff,
migrationContext.getMetadataFd(),
TX_STRUCT_UPDATE_1_META_OFFSET_PARTITION_BY,
tempMem8b,
path
);
if (partitionBy != PartitionBy.NONE) {
path.trimTo(pathDirLen);
writeAttachedPartitions(ff, tempMem8b, path, txMem, partitionBy, symbolColumnCount, txFileUpdate);
}
long updateSize = txFileUpdate.getAppendOffset();
long partitionSegmentSize = updateSize - partitionSegmentOffset - Integer.BYTES;
txFileUpdate.putInt(partitionSegmentOffset, (int) partitionSegmentSize);
// Save txFileUpdate to tx file starting at LOCAL_TX_OFFSET_MAP_WRITER_COUNT + 4
long writeOffset = MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505 + Integer.BYTES;
txMem.jumpTo(writeOffset);
for (int i = 0, size = 1; i < size && updateSize > 0; i++) {
long writeSize = Math.min(updateSize, txFileUpdate.getPageSize());
txMem.putBlockOfBytes(txFileUpdate.getPageAddress(i), writeSize);
updateSize -= writeSize;
}
assert updateSize == 0;
}
}
private static void writeAttachedPartitions(
FilesFacade ff,
long tempMem8b,
Path path,
MemoryMARW txMem,
int partitionBy,
int symbolsCount,
MemoryARW writeTo
) {
int rootLen = path.length();
long minTimestamp = txMem.getLong(TX_OFFSET_MIN_TIMESTAMP);
long maxTimestamp = txMem.getLong(TX_OFFSET_MAX_TIMESTAMP);
long transientCount = txMem.getLong(TX_OFFSET_TRANSIENT_ROW_COUNT);
Timestamps.TimestampFloorMethod timestampFloorMethod = getPartitionFloor(partitionBy);
Timestamps.TimestampAddMethod timestampAddMethod = getPartitionAdd(partitionBy);
final long tsLimit = timestampFloorMethod.floor(maxTimestamp);
for (long ts = timestampFloorMethod.floor(minTimestamp); ts < tsLimit; ts = timestampAddMethod.calculate(ts, 1)) {
path.trimTo(rootLen);
setPathForPartition(path, partitionBy, ts, false);
if (ff.exists(path.concat(TX_STRUCT_UPDATE_1_ARCHIVE_FILE_NAME).$())) {
if (!removedPartitionsIncludes(ts, txMem, symbolsCount)) {
long partitionSize = TableUtils.readLongAtOffset(ff, path, tempMem8b, 0);
// Update tx file with 4 longs per partition
writeTo.putLong(ts);
writeTo.putLong(partitionSize);
writeTo.putLong(-1L);
writeTo.putLong(0L);
}
}
}
// last partition
writeTo.putLong(tsLimit);
writeTo.putLong(transientCount);
writeTo.putLong(-1);
writeTo.putLong(0);
}
private static boolean removedPartitionsIncludes(long ts, MemoryR txMem, int symbolsCount) {
long removedPartitionLo = MigrationActions.prefixedBlockOffset(
MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505,
symbolsCount + 1L,
Integer.BYTES
);
long removedPartitionCount = txMem.getInt(removedPartitionLo);
long removedPartitionsHi = MigrationActions.prefixedBlockOffset(removedPartitionLo, Long.BYTES, removedPartitionCount);
for (long offset = removedPartitionLo + Integer.BYTES; offset < removedPartitionsHi; offset += Long.BYTES) {
long removedPartition = txMem.getLong(offset);
if (removedPartition == ts) {
return true;
}
}
return false;
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.TableUtils;
import io.questdb.std.FilesFacade;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.META_OFFSET_COMMIT_LAG;
import static io.questdb.cairo.TableUtils.META_OFFSET_MAX_UNCOMMITTED_ROWS;
final class Mig600 {
static void migrate(MigrationContext migrationContext) {
MigrationActions.LOG.info().$("configuring default commit lag [table=").$(migrationContext.getTablePath()).I$();
final Path path = migrationContext.getTablePath();
final FilesFacade ff = migrationContext.getFf();
final long tempMem = migrationContext.getTempMemory(8);
final long fd = migrationContext.getMetadataFd();
TableUtils.writeIntOrFail(
ff,
fd,
META_OFFSET_MAX_UNCOMMITTED_ROWS,
migrationContext.getConfiguration().getMaxUncommittedRows(),
tempMem,
path
);
TableUtils.writeLongOrFail(
ff,
fd,
META_OFFSET_COMMIT_LAG,
migrationContext.getConfiguration().getCommitLag(),
tempMem,
path
);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.std.FilesFacade;
import io.questdb.std.MemoryTag;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.META_FILE_NAME;
final class Mig605 {
static void migrate(MigrationContext migrationContext) {
MigrationActions.LOG.info().$("updating column type IDs [table=").$(migrationContext.getTablePath()).I$();
final FilesFacade ff = migrationContext.getFf();
Path path = migrationContext.getTablePath();
path.concat(META_FILE_NAME).$();
if (!ff.exists(path)) {
MigrationActions.LOG.error().$("meta file does not exist, nothing to migrate [path=").$(path).I$();
return;
}
// Metadata file should already be backed up
try (final MemoryMARW rwMem = migrationContext.getRwMemory()) {
rwMem.of(ff, path, ff.getPageSize(), ff.length(path), MemoryTag.NATIVE_DEFAULT);
// column count
final int columnCount = rwMem.getInt(TableUtils.META_OFFSET_COUNT);
long offset = TableUtils.META_OFFSET_COLUMN_TYPES;
for (int i = 0; i < columnCount; i++) {
final byte oldTypeId = rwMem.getByte(offset);
final long oldFlags = rwMem.getLong(offset + 1);
final int blockCapacity = rwMem.getInt(offset + 1 + 8);
// the column type id is an int now
// we grabbed 3 reserved bytes for extra type info
// the extra bytes for old types are zeros
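// Layout sketch (illustration, derived from the reads/writes here; both records
// are 16 bytes):
//   old: byte type @ 0, long flags @ 1, int blockCapacity @ 9
//   new: int type @ 0, long flags @ 4, int blockCapacity @ 12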
rwMem.putInt(offset, oldTypeId == 13 ? 18 : oldTypeId + 1); // ColumnType.VERSION_420 - ColumnType.VERSION_419 = 1 except for BINARY, old 13 new 18
rwMem.putLong(offset + 4, oldFlags);
rwMem.putInt(offset + 4 + 8, blockCapacity);
offset += 16; // old TableUtils.META_COLUMN_DATA_SIZE;
}
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.*;
import io.questdb.cairo.vm.MemoryCMARWImpl;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.std.FilesFacade;
import io.questdb.std.MemoryTag;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.*;
final class Mig607 {
static void migrate(MigrationContext migrationContext) {
final FilesFacade ff = migrationContext.getFf();
Path path = migrationContext.getTablePath();
int plen = path.length();
path.trimTo(plen).concat(META_FILE_NAME).$();
try (MemoryMARW metaMem = migrationContext.getRwMemory()) {
metaMem.of(ff, path, ff.getPageSize(), ff.length(path), MemoryTag.NATIVE_DEFAULT);
final int columnCount = metaMem.getInt(0);
final int partitionBy = metaMem.getInt(4);
final long columnNameOffset = MigrationActions.prefixedBlockOffset(
MigrationActions.META_OFFSET_COLUMN_TYPES_606,
columnCount,
MigrationActions.META_COLUMN_DATA_SIZE_606
);
try (MemoryMARW txMem = new MemoryCMARWImpl(
ff,
path.trimTo(plen).concat(TXN_FILE_NAME).$(),
ff.getPageSize(),
ff.length(path),
MemoryTag.NATIVE_DEFAULT)
) {
// this is a variable length file; we need the count of symbol maps before we get to the partition
// table data
final int symbolMapCount = txMem.getInt(MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505);
final long partitionCountOffset = MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505 + 4 + symbolMapCount * 8L;
int partitionCount = txMem.getInt(partitionCountOffset) / Long.BYTES / LONGS_PER_TX_ATTACHED_PARTITION;
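// Arithmetic note (illustration): the int at partitionCountOffset holds the
// partition segment size in bytes, e.g. 96 bytes / Long.BYTES / 4 longs per
// partition = 3 attached partitions.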
final long transientRowCount = txMem.getLong(TX_OFFSET_TRANSIENT_ROW_COUNT);
if (partitionBy != PartitionBy.NONE) {
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
final long partitionDataOffset = partitionCountOffset + Integer.BYTES + partitionIndex * 8L * LONGS_PER_TX_ATTACHED_PARTITION;
setPathForPartition(
path.trimTo(plen),
partitionBy,
txMem.getLong(partitionDataOffset),
false
);
// the row count may not be stored in the _txn file for the last partition
// we need to use the transient row count instead
long rowCount = partitionIndex < partitionCount - 1 ? txMem.getLong(partitionDataOffset + Long.BYTES) : transientRowCount;
long txSuffix = txMem.getLong(MigrationActions.prefixedBlockOffset(partitionDataOffset, 2, Long.BYTES));
if (txSuffix > -1) {
txnPartition(path, txSuffix);
}
migrate(ff, path, migrationContext, metaMem, columnCount, rowCount, columnNameOffset);
}
} else {
path.trimTo(plen).concat(DEFAULT_PARTITION_NAME);
migrate(ff, path, migrationContext, metaMem, columnCount, transientRowCount, columnNameOffset);
}
// update symbol maps
long tmpMem = migrationContext.getTempMemory();
int denseSymbolCount = 0;
long currentColumnNameOffset = columnNameOffset;
for (int i = 0; i < columnCount; i++) {
final CharSequence columnName = metaMem.getStr(currentColumnNameOffset);
currentColumnNameOffset += Vm.getStorageLength(columnName.length());
if (ColumnType.tagOf(
metaMem.getInt(
MigrationActions.prefixedBlockOffset(
MigrationActions.META_OFFSET_COLUMN_TYPES_606,
i,
MigrationActions.META_COLUMN_DATA_SIZE_606
)
)) == ColumnType.SYMBOL
) {
final int symbolCount = txMem.getInt(MigrationActions.TX_OFFSET_MAP_WRITER_COUNT_505 + 8 + denseSymbolCount * 8L);
final long offset = MigrationActions.prefixedBlockOffset(SymbolMapWriter.HEADER_SIZE, symbolCount, 8L);
SymbolMapWriter.offsetFileName(path.trimTo(plen), columnName);
long fd = TableUtils.openRW(ff, path, MigrationActions.LOG);
try {
long fileLen = ff.length(fd);
if (symbolCount > 0) {
if (fileLen < offset) {
MigrationActions.LOG.error().$("file is too short [path=").$(path).I$();
} else {
TableUtils.allocateDiskSpace(ff, fd, offset + 8);
long dataOffset = TableUtils.readLongOrFail(ff, fd, offset - 8L, tmpMem, path);
// string length
SymbolMapWriter.charFileName(path.trimTo(plen), columnName);
long fd2 = TableUtils.openRO(ff, path, MigrationActions.LOG);
try {
long len = TableUtils.readIntOrFail(ff, fd2, dataOffset, tmpMem, path);
if (len == -1) {
dataOffset += 4;
} else {
dataOffset += 4 + len * 2L;
}
TableUtils.writeLongOrFail(ff, fd, offset, dataOffset, tmpMem, path);
} finally {
ff.close(fd2);
}
}
}
} finally {
Vm.bestEffortClose(ff, MigrationActions.LOG, fd, true, offset + 8);
}
denseSymbolCount++;
}
}
}
}
}
public static void migrate(
FilesFacade ff,
Path path,
MigrationContext migrationContext,
MemoryMARW metaMem,
int columnCount,
long rowCount,
long columnNameOffset
) {
final int plen2 = path.length();
if (rowCount > 0) {
long mem = migrationContext.getTempMemory();
long currentColumnNameOffset = columnNameOffset;
for (int i = 0; i < columnCount; i++) {
final int columnType = ColumnType.tagOf(
metaMem.getInt(
MigrationActions.prefixedBlockOffset(MigrationActions.META_OFFSET_COLUMN_TYPES_606, i, MigrationActions.META_COLUMN_DATA_SIZE_606)
)
);
final CharSequence columnName = metaMem.getStr(currentColumnNameOffset);
currentColumnNameOffset += Vm.getStorageLength(columnName);
if (columnType == ColumnType.STRING || columnType == ColumnType.BINARY) {
final long columnTop = readColumnTop(
ff,
path.trimTo(plen2),
columnName,
plen2,
mem,
false
);
final long columnRowCount = rowCount - columnTop;
long offset = columnRowCount * 8L;
iFile(path.trimTo(plen2), columnName);
long fd = TableUtils.openRW(ff, path, MigrationActions.LOG);
try {
long fileLen = ff.length(fd);
if (fileLen < offset) {
throw CairoException.instance(0).put("file is too short [path=").put(path).put("]");
}
TableUtils.allocateDiskSpace(ff, fd, offset + 8);
long dataOffset = TableUtils.readLongOrFail(ff, fd, offset - 8L, mem, path);
dFile(path.trimTo(plen2), columnName);
final long fd2 = TableUtils.openRO(ff, path, MigrationActions.LOG);
try {
if (columnType == ColumnType.BINARY) {
long len = TableUtils.readLongOrFail(ff, fd2, dataOffset, mem, path);
if (len == -1) {
dataOffset += 8;
} else {
dataOffset += 8 + len;
}
} else {
long len = TableUtils.readIntOrFail(ff, fd2, dataOffset, mem, path);
if (len == -1) {
dataOffset += 4;
} else {
dataOffset += MigrationActions.prefixedBlockOffset(4, 2, len);
}
}
} finally {
ff.close(fd2);
}
TableUtils.writeLongOrFail(ff, fd, offset, dataOffset, mem, path);
} finally {
Vm.bestEffortClose(ff, MigrationActions.LOG, fd, true, offset + 8);
}
}
}
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.griffin.engine.functions.rnd.SharedRandom;
import io.questdb.std.FilesFacade;
import io.questdb.std.MemoryTag;
import io.questdb.std.Rnd;
import io.questdb.std.Vect;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.META_FILE_NAME;
import static io.questdb.cairo.TableUtils.TXN_FILE_NAME;
final class Mig608 {
static void migrate(MigrationContext migrationContext) {
// META_COLUMN_DATA_SIZE = 16 -> 32;
// TX_OFFSET_MAP_WRITER_COUNT = 72 -> 128
final FilesFacade ff = migrationContext.getFf();
final Path path = migrationContext.getTablePath();
final int plen = path.length();
path.concat(META_FILE_NAME).$();
if (!ff.exists(path)) {
MigrationActions.LOG.error().$("meta file does not exist, nothing to migrate [path=").$(path).I$();
return;
}
// modify metadata
try (final MemoryMARW rwMem = migrationContext.getRwMemory()) {
final long thatMetaColumnDataSize = 16;
final long thisMetaColumnDataSize = 32;
rwMem.of(ff, path, ff.getPageSize(), ff.length(path), MemoryTag.NATIVE_DEFAULT);
// column count
final int columnCount = rwMem.getInt(TableUtils.META_OFFSET_COUNT);
long offset = TableUtils.META_OFFSET_COLUMN_TYPES;
// 32L here is TableUtils.META_COLUMN_DATA_SIZE at the time of writing this migration
long newNameOffset = offset + thisMetaColumnDataSize * columnCount;
// the intent is to resize the _meta file and move the variable length (names) segment
// to do that we need to work out size of the variable length segment first
long oldNameOffset = offset + thatMetaColumnDataSize * columnCount;
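// Worked example (assumed columnCount = 2, illustration): oldNameOffset =
// offset + 16 * 2 and newNameOffset = offset + 32 * 2, so the name segment moves
// up by 32 bytes; column records are then rewritten back-to-front so each source
// record is read before it can be overwritten.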
long o = oldNameOffset;
for (int i = 0; i < columnCount; i++) {
int len = rwMem.getStrLen(o);
o += Vm.getStorageLength(len);
}
final long nameSegmentLen = o - oldNameOffset;
// resize the file
rwMem.extend(newNameOffset + nameSegmentLen);
// move name segment
Vect.memmove(rwMem.addressOf(newNameOffset), rwMem.addressOf(oldNameOffset), nameSegmentLen);
// copy column information in reverse order
o = offset + thatMetaColumnDataSize * (columnCount - 1);
long o2 = offset + thisMetaColumnDataSize * (columnCount - 1);
final Rnd rnd = SharedRandom.getRandom(migrationContext.getConfiguration());
while (o > offset) {
rwMem.putInt(o2, rwMem.getInt(o)); // type
rwMem.putLong(o2 + 4, rwMem.getInt(o + 4)); // flags
rwMem.putInt(o2 + 12, rwMem.getInt(o + 12)); // index block capacity
rwMem.putLong(o2 + 20, rnd.nextLong()); // column hash
o -= thatMetaColumnDataSize;
o2 -= thisMetaColumnDataSize;
}
}
// update _txn file
path.trimTo(plen).concat(TXN_FILE_NAME).$();
if (!ff.exists(path)) {
MigrationActions.LOG.error().$("tx file does not exist, nothing to migrate [path=").$(path).I$();
return;
}
EngineMigration.backupFile(ff, path, migrationContext.getTablePath2(), TXN_FILE_NAME, 422);
MigrationActions.LOG.debug().$("opening for rw [path=").$(path).I$();
try (MemoryMARW txMem = migrationContext.createRwMemoryOf(ff, path.$())) {
// calculate size of the _txn file
final long thatTxOffsetMapWriterCount = 72;
final long thisTxOffsetMapWriterCount = 128;
final int longsPerAttachedPartition = 4;
int symbolCount = txMem.getInt(thatTxOffsetMapWriterCount);
int partitionTableSize = txMem.getInt(thatTxOffsetMapWriterCount + 4 + symbolCount * 8L) * 8 * longsPerAttachedPartition;
// resize existing file:
// thisTxOffsetMapWriterCount + symbolCount + symbolData + partitionTableEntryCount + partitionTableSize
long thatSize = thatTxOffsetMapWriterCount + 4 + symbolCount * 8L + 4L + partitionTableSize;
long thisSize = thisTxOffsetMapWriterCount + 4 + symbolCount * 8L + 4L + partitionTableSize;
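// Size check (illustration, assuming symbolCount = 2 and partitionTableSize = 32):
// thatSize = 72 + 4 + 16 + 4 + 32 = 128 and thisSize = 128 + 4 + 16 + 4 + 32 = 184;
// the memmove below shifts the tail up by 56 bytes and memset zeroes the gap.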
txMem.extend(thisSize);
Vect.memmove(txMem.addressOf(thisTxOffsetMapWriterCount), txMem.addressOf(thatTxOffsetMapWriterCount), thatSize - thatTxOffsetMapWriterCount);
// zero out reserved area
Vect.memset(txMem.addressOf(thatTxOffsetMapWriterCount), thisTxOffsetMapWriterCount - thatTxOffsetMapWriterCount, 0);
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo.mig;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.vm.MemoryCMARWImpl;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.FilesFacade;
import io.questdb.std.MemoryTag;
import io.questdb.std.str.Path;
import static io.questdb.cairo.TableUtils.*;
final class Mig609 {
private static final long TX_OFFSET_FIXED_ROW_COUNT_505 = 16;
private static final long TX_OFFSET_MAP_WRITER_COUNT_608 = 128;
private static final Log LOG = LogFactory.getLog(EngineMigration.class);
static void migrate(MigrationContext migrationContext) {
final FilesFacade ff = migrationContext.getFf();
final Path path = migrationContext.getTablePath();
final int plen = path.length();
path.trimTo(plen).concat(META_FILE_NAME).$();
try (MemoryMARW metaMem = migrationContext.getRwMemory()) {
metaMem.of(ff, path, ff.getPageSize(), ff.length(path), MemoryTag.NATIVE_DEFAULT);
// we read the partition-by value so we can skip non-partitioned tables
final int partitionBy = metaMem.getInt(4);
try (MemoryMARW txMem = new MemoryCMARWImpl(
ff,
path.trimTo(plen).concat(TXN_FILE_NAME).$(),
ff.getPageSize(),
ff.length(path),
MemoryTag.NATIVE_DEFAULT)
) {
// this is a variable length file; we need the count of symbol maps before we get to the partition
// table data
final int symbolMapCount = txMem.getInt(TX_OFFSET_MAP_WRITER_COUNT_608);
final long partitionCountOffset = TX_OFFSET_MAP_WRITER_COUNT_608 + 4 + symbolMapCount * 8L;
// walk only the non-active partitions to extract sizes
final int partitionCount = txMem.getInt(partitionCountOffset) / Long.BYTES / LONGS_PER_TX_ATTACHED_PARTITION - 1;
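// Arithmetic note (illustration): segment bytes / Long.BYTES / 4 longs per entry
// gives the partition count; the trailing -1 skips the active partition, whose
// size lives in the transient row count rather than the partition table.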
if (partitionBy != PartitionBy.NONE) {
long calculatedFixedRowCount = 0;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
final long partitionDataOffset = partitionCountOffset + Integer.BYTES + partitionIndex * 8L * LONGS_PER_TX_ATTACHED_PARTITION;
// sizes of the non-active partitions are stored in the partition table;
// the active partition's size is the transient row count and is excluded here
calculatedFixedRowCount += txMem.getLong(partitionDataOffset + Long.BYTES);
}
long currentFixedRowCount = txMem.getLong(TX_OFFSET_FIXED_ROW_COUNT_505);
if (currentFixedRowCount != calculatedFixedRowCount) {
txMem.putLong(TX_OFFSET_FIXED_ROW_COUNT_505, calculatedFixedRowCount);
LOG.info()
.$("fixed row count is out [table=").$(path.trimTo(plen).$())
.$(", currentFixedRowCount=").$(currentFixedRowCount)
.$(", calculatedFixedRowCount=").$(calculatedFixedRowCount)
.I$();
}
}
}
}
}
}
......@@ -186,12 +186,9 @@ class LineTcpConnectionContext implements IOContext, Mutable {
.$(byteCharSequence.of(recvBufStartOfMeasurement, protoParser.getBufferAddress())).$();
goodMeasurement = true;
}
protoParser.startNextMeasurement();
recvBufStartOfMeasurement = protoParser.getBufferAddress();
if (recvBufStartOfMeasurement == recvBufPos) {
recvBufPos = recvBufStart;
protoParser.of(recvBufStart);
}
startNewMeasurement();
continue;
}
......@@ -229,16 +226,26 @@ class LineTcpConnectionContext implements IOContext, Mutable {
}
}
private void startNewMeasurement() {
protoParser.startNextMeasurement();
recvBufStartOfMeasurement = protoParser.getBufferAddress();
// we ran out of buffer; move to the start and parse new data from the socket
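// Illustrative note: when the parser has consumed everything up to recvBufPos,
// no partial measurement remains, so both the write position and the parser can
// safely rewind to recvBufStart.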
if (recvBufStartOfMeasurement == recvBufPos) {
recvBufPos = recvBufStart;
protoParser.of(recvBufStart);
}
}
protected boolean read() {
int bufferRemaining = (int) (recvBufEnd - recvBufPos);
final int orig = bufferRemaining;
if (bufferRemaining > 0 && !peerDisconnected) {
int nRead = nf.recv(fd, recvBufPos, bufferRemaining);
if (nRead > 0) {
recvBufPos += nRead;
bufferRemaining -= nRead;
int bytesRead = nf.recv(fd, recvBufPos, bufferRemaining);
if (bytesRead > 0) {
recvBufPos += bytesRead;
bufferRemaining -= bytesRead;
} else {
peerDisconnected = nRead < 0;
peerDisconnected = bytesRead < 0;
}
return bufferRemaining < orig;
}
......
......@@ -191,7 +191,7 @@ public class CairoTextWriter implements Closeable, Mutable {
private void checkMaxAndCommitLag() {
if (writer != null && maxUncommittedRows > 0 && writer.getO3RowCount() >= maxUncommittedRows) {
writer.checkMaxAndCommitLag(durable ? CommitMode.SYNC : CommitMode.NOSYNC);
writer.commitWithLag(durable ? CommitMode.SYNC : CommitMode.NOSYNC);
}
}
......
......@@ -43,7 +43,6 @@ public class Worker extends Thread {
private final boolean haltOnError;
private final int workerId;
private volatile int running = 0;
private volatile int fence;
private final long yieldThreshold;
private final long sleepThreshold;
......@@ -106,7 +105,7 @@ public class Worker extends Thread {
boolean useful = false;
for (int i = 0; i < n; i++) {
loadFence();
Unsafe.getUnsafe().loadFence();
try {
try {
useful |= jobs.get(i).run(workerId);
......@@ -114,7 +113,7 @@ public class Worker extends Thread {
onError(i, e);
}
} finally {
storeFence();
Unsafe.getUnsafe().storeFence();
}
}
......@@ -163,28 +162,20 @@ public class Worker extends Thread {
}
}
@SuppressWarnings("UnusedReturnValue")
private int loadFence() {
return fence;
}
private void setupJobs() {
if (running == 1) {
for (int i = 0; i < jobs.size(); i++) {
loadFence();
Unsafe.getUnsafe().loadFence();
try {
Job job = jobs.get(i);
if (job instanceof EagerThreadSetup) {
((EagerThreadSetup) job).setup();
}
} finally {
storeFence();
Unsafe.getUnsafe().storeFence();
}
}
}
}
private void storeFence() {
fence = 1;
}
}
......@@ -67,7 +67,6 @@ public class TxnTest extends AbstractCairoTest {
}
}
try (Path path = new Path()) {
path.of(configuration.getRoot()).concat(tableName);
int testPartitionCount = 3000;
......@@ -77,6 +76,7 @@ public class TxnTest extends AbstractCairoTest {
txWriter.updatePartitionSizeByTimestamp(i * Timestamps.DAY_MICROS, i + 1);
}
txWriter.updateMaxTimestamp(testPartitionCount * Timestamps.DAY_MICROS + 1);
txWriter.finishPartitionSizeUpdate();
txWriter.commit(CommitMode.SYNC, new ObjList<>());
}
......@@ -108,5 +108,4 @@ public class TxnTest extends AbstractCairoTest {
});
});
}
}
......@@ -656,7 +656,7 @@ public class ReaderPoolTest extends AbstractCairoTest {
CairoTestUtils.create(model);
}
for (int k = 0; k < 10000; k++) {
for (int k = 0; k < 10; k++) {
// allocate 32 readers to get to the start race at edge of next entry
int n = 64;
TableReader[] readers = new TableReader[n];
......
......@@ -1237,8 +1237,8 @@ public class LineTcpConnectionContextTest extends BaseLineTcpContextTest {
}
waitForIOCompletion();
rebalanceNLoadCheckCycles = scheduler.getNLoadCheckCycles();
rebalanceNRebalances = scheduler.getNRebalances();
rebalanceLoadByThread = scheduler.getLoadByThread();
rebalanceNRebalances = scheduler.getReshuffleCount();
rebalanceLoadByThread = scheduler.getLoadByWriterThread();
closeContext();
LOG.info().$("Completed ")
.$(nTotalUpdates)
......
......@@ -103,9 +103,11 @@ public abstract class LineUdpInsertTest extends AbstractCairoTest {
}
Os.sleep(250L); // allow reader to hit the readout
}
if (!waitForData.await(TimeUnit.SECONDS.toNanos(30L))) {
Assert.fail();
}
assertReader(tableName, expected, expectedExtraStringColumns);
}
});
......
......@@ -1251,7 +1251,7 @@ public class TextLoaderTest extends AbstractGriffinTest {
240_000_000, // 4 minutes, precision is micro
3,
true,
setOf("2021-01-01", "2021-01-02", "2021-01-01.4", "2021-01-02.5")
setOf("2021-01-01.2", "2021-01-01", "2021-01-01.4", "2021-01-02")
);
}
......@@ -1261,7 +1261,7 @@ public class TextLoaderTest extends AbstractGriffinTest {
60_000_000, // 1 minute, precision is micro
2,
false,
setOf("2021-01-01", "2021-01-02", "2021-01-01.5", "2021-01-02.6")
setOf("2021-01-01.2", "2021-01-01", "2021-01-01.5", "2021-01-02")
);
}
......@@ -3117,7 +3117,9 @@ public class TextLoaderTest extends AbstractGriffinTest {
@Override
public int rmdir(Path name) {
rmdirCallCount.getAndIncrement();
Assert.assertTrue(expectedPartitionNames.contains(extractLast(name)));
if (!expectedPartitionNames.contains(extractLast(name))) {
Assert.fail();
}
return Files.rmdir(name);
}
......@@ -3186,7 +3188,7 @@ public class TextLoaderTest extends AbstractGriffinTest {
Assert.assertEquals(TextLoadWarning.NONE, textLoader.getWarnings());
}
);
Assert.assertEquals(6, rmdirCallCount.get());
Assert.assertEquals(4, rmdirCallCount.get());
Assert.assertTrue((durable && msyncCallCount.get() > 0) || (!durable && msyncCallCount.get() == 0));
try (TableReader reader = engine.getReader(sqlExecutionContext.getCairoSecurityContext(), "test")) {
Assert.assertEquals(maxUncommittedRows, reader.getMaxUncommittedRows());
......
......@@ -33,6 +33,7 @@ import io.questdb.mp.WorkerPool;
import io.questdb.std.FilesFacade;
import io.questdb.std.FilesFacadeImpl;
import io.questdb.std.Rnd;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils;
......@@ -147,6 +148,14 @@ public class AbstractO3Test {
TestUtils.assertSqlCursors(compiler, sqlExecutionContext, referenceSQL, assertSQL, LOG);
engine.releaseAllReaders();
TestUtils.assertSqlCursors(compiler, sqlExecutionContext, referenceSQL, assertSQL, LOG);
TestUtils.assertSqlCursors(
compiler,
sqlExecutionContext,
"select count() from " + referenceSQL,
"select count() from " + assertSQL,
LOG
);
}
protected static void assertO3DataConsistency(
......@@ -256,37 +265,37 @@ public class AbstractO3Test {
// we need to create the entire engine
final CairoConfiguration configuration = new DefaultCairoConfiguration(root) {
@Override
public FilesFacade getFilesFacade() {
return ff;
public int getO3PurgeDiscoveryQueueCapacity() {
return 0;
}
@Override
public int getO3CallbackQueueCapacity() {
public int getO3PurgeQueueCapacity() {
return 0;
}
@Override
public int getO3PartitionQueueCapacity() {
return 0;
public FilesFacade getFilesFacade() {
return ff;
}
@Override
public int getO3OpenColumnQueueCapacity() {
public int getO3CallbackQueueCapacity() {
return 0;
}
@Override
public int getO3CopyQueueCapacity() {
public int getO3PartitionQueueCapacity() {
return 0;
}
@Override
public int getO3PurgeDiscoveryQueueCapacity() {
public int getO3OpenColumnQueueCapacity() {
return 0;
}
@Override
public int getO3PurgeQueueCapacity() {
public int getO3CopyQueueCapacity() {
return 0;
}
......@@ -334,6 +343,11 @@ public class AbstractO3Test {
}
}
protected static void assertXCountY(SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
TestUtils.assertSqlCursors(compiler, sqlExecutionContext, "select count() from x", "select count() from y", LOG);
assertMaxTimestamp(compiler.getEngine(), compiler, sqlExecutionContext, "select max(ts) from y");
}
protected static void executeVanilla(O3Runnable code) throws Exception {
executeVanilla(() -> execute(null, code, new DefaultCairoConfiguration(root)));
}
......@@ -373,4 +387,46 @@ public class AbstractO3Test {
TestUtils.assertEquals(sink, sink2);
}
static void assertMaxTimestamp(
CairoEngine engine,
SqlCompiler compiler,
SqlExecutionContext executionContext,
String expectedSql
) throws SqlException {
TestUtils.printSql(
compiler,
executionContext,
expectedSql,
sink2
);
assertMaxTimestamp(engine, executionContext, sink2);
}
static void assertMaxTimestamp(
CairoEngine engine,
SqlExecutionContext executionContext,
CharSequence expected
) {
try (
final TableWriter w = engine.getWriter(
executionContext.getCairoSecurityContext(),
"x",
"test"
)
) {
sink.clear();
sink.put("max\n");
TimestampFormatUtils.appendDateTimeUSec(sink, w.getMaxTimestamp());
sink.put('\n');
TestUtils.assertEquals(expected, sink);
Assert.assertEquals(0, w.getO3RowCount());
}
}
static void assertXCount(SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
printSqlResult(compiler, sqlExecutionContext, "select count() from x");
TestUtils.assertEquals(sink2, sink);
}
}
......@@ -120,6 +120,59 @@ public class EngineMigrationTest extends AbstractGriffinTest {
}
}
@NotNull
private String appendCommonColumns() {
return " rnd_byte() a," +
" rnd_char() b," +
" rnd_short() c," +
" rnd_int(-77888, 999001, 2) d," + // ensure we have nulls
" rnd_long(-100000, 100000, 2) e," + // ensure we have nulls
" rnd_float(2) f," + // ensure we have nulls
" rnd_double(2) g," + // ensure we have nulls
" rnd_date(199999999, 399999999999, 2) h," + // ensure we have nulls
" cast(rnd_long(-7999999, 800000, 10) as timestamp) i," + // ensure we have nulls
" rnd_str(4,5,2) j," +
" rnd_symbol('newsymbol1','newsymbol12', null) k," +
" rnd_boolean() l," +
" rnd_symbol('newsymbol1','newsymbol12', null) m," +
" rnd_long256() n," +
" rnd_bin(2,10, 2) o";
}
private void appendData() throws SqlException {
engine.releaseAllReaders();
engine.releaseAllWriters();
// Insert some data
compiler.compile("insert into t_year select " +
appendCommonColumns() +
", timestamp_sequence('2021-01-01', 200000000L) ts" +
" from long_sequence(5)", sqlExecutionContext);
// Insert the same data to test O3 append
compiler.compile("insert into t_year select " +
appendCommonColumns() +
", timestamp_sequence('2020-01-01', 200000000L) ts" +
" from long_sequence(5)", sqlExecutionContext);
}
private void assertAppendedData() throws SqlException {
engine.releaseAllReaders();
engine.releaseAllWriters();
assertSql("select * FROM t_year LIMIT -10",
"a\tb\tc\td\te\tf\tg\th\ti\tj\tk\tl\tm\tn\to\tts\n" +
"109\tP\t-12455\t263934\t49960\t0.6793\t0.09831693674866282\t1977-08-08T19:44:03.856Z\t1969-12-31T23:59:55.049605Z\tHQBJP\tc\tfalse\taaa\t0xbe15104d1d36d615cac36ab298393e52b06836c8abd67a44787ce11d6fc88eab\t00000000 37 58 2c 0d b0 d0 9c 57 02 75\t2096-10-02T07:10:00.000000Z\n" +
"73\tB\t-1271\t-47644\t4999\t0.8584\t0.12392055368261845\t1975-06-03T19:26:19.012Z\t1969-12-31T23:59:55.604499Z\t\tc\ttrue\taaa\t0x4f669e76b0311ac3438ec9cc282caa7043a05a3edd41f45aa59f873d1c729128\t00000000 34 01 0e 4d 2b 00 fa 34\t2103-02-04T02:43:20.000000Z\n" +
"83\tL\t-32289\t127321\t40837\t0.1335\t0.515824820198022\t\t1969-12-31T23:59:53.582959Z\tKFMO\taaa\tfalse\t\t0x8131875cd498c4b888762e985137f4e843b8167edcd59cf345c105202f875495\t\t2109-06-06T22:16:40.000000Z\n" +
"51\tS\t-28311\tNaN\t-72973\t0.5957\t0.20897460269739654\t1973-03-28T21:58:08.545Z\t1969-12-31T23:59:54.332988Z\t\tc\tfalse\taaa\t0x50113ffcc219fb1a9bc4f6389de1764097e7bcd897ae8a54aa2883a41581608f\t00000000 83 94 b5\t2115-10-08T17:50:00.000000Z\n" +
"49\tN\t-11147\t392567\t-9830\t0.5248\t0.1095692511246914\t\t1969-12-31T23:59:56.849475Z\tIFBE\tc\tfalse\taaa\t0x36055358bd232c9d775e2e80754e5fcda2353931c7033ad5c38c294e9227895a\t\t2122-02-08T13:23:20.000000Z\n" +
"57\tI\t-22903\t874980\t-28069\tNaN\t0.016793228004843286\t1975-09-29T05:10:33.275Z\t1969-12-31T23:59:55.690794Z\tBZSM\t\ttrue\tc\t0xa2c84382c65eb07087cf6cb291b2c3e7a9ffe8560d2cec518dea50b88b87fe43\t00000000 b9 c4 18 2b aa 3d\t2128-06-11T08:56:40.000000Z\n" +
"127\tW\t-16809\t288758\t-22272\t0.0535\t0.5855510665931698\t\t1969-12-31T23:59:52.689490Z\t\taaa\tfalse\tc\t0x918ae2d78481070577c7d4c3a758a5ea3dd771714ac964ab4b350afc9b599b28\t\t2134-10-13T04:30:00.000000Z\n" +
"20\tM\t-7043\t251501\t-85499\t0.9403\t0.9135840078861264\t1977-05-12T19:20:06.113Z\t1969-12-31T23:59:54.045277Z\tHOXL\tbbbbbb\tfalse\tbbbbbb\t0xc3b0de059fff72dbd7b99af08ac0d1cddb2990725a3338e377155edb531cb644\t\t2141-02-13T00:03:20.000000Z\n" +
"26\tG\t-24830\t-56840\t-32956\t0.8282\t0.017280895313585898\t1982-07-16T03:52:53.454Z\t1969-12-31T23:59:54.115165Z\tJEJH\taaa\tfalse\tbbbbbb\t0x16f70de9c6af11071d35d9faec5d18fd1cf3bbbc825b72a92ecb8ff0286bf649\t00000000 c2 62 f8 53 7d 05 65\t2147-06-16T19:36:40.000000Z\n" +
"127\tY\t19592\t224361\t37963\t0.6930\t0.006817672510656014\t1975-11-29T09:47:45.706Z\t1969-12-31T23:59:56.186242Z\t\t\ttrue\taaa\t0x88926dd483caaf4031096402997f21c833b142e887fa119e380dc9b54493ff70\t00000000 23 c3 9d 75 26 f2 0d b5 7a 3f\t2153-10-17T15:10:00.000000Z\n");
}
private void assertData(boolean withO3) throws SqlException {
assertNoneNts();
assertNone();
......@@ -507,6 +560,15 @@ public class EngineMigrationTest extends AbstractGriffinTest {
"\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select count() from t_month",
sink,
"count\n" +
"30\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
......@@ -574,6 +636,15 @@ public class EngineMigrationTest extends AbstractGriffinTest {
"\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select count() from t_month_ooo",
sink,
"count\n" +
"30\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
......@@ -640,6 +711,16 @@ public class EngineMigrationTest extends AbstractGriffinTest {
"bbbbbb\n" +
"\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select count() from t_none",
sink,
"count\n" +
"30\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
......@@ -694,6 +775,15 @@ public class EngineMigrationTest extends AbstractGriffinTest {
"27\tJ\t-15254\t978974\t-36356\t0.7911\t0.7128505998532723\t1976-03-14T08:19:05.571Z\t1969-12-31T23:59:56.726487Z\tPZNYV\t\ttrue\t\t0x39af691594d0654567af4ec050eea188b8074532ac9f3c87c68ce6f3720e2b62\t00000000 20 13\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select count() from t_none_nts",
sink,
"count\n" +
"30\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
......@@ -770,6 +860,15 @@ public class EngineMigrationTest extends AbstractGriffinTest {
"\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select count() from t_year",
sink,
"count\n" +
"30\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
......@@ -836,6 +935,15 @@ public class EngineMigrationTest extends AbstractGriffinTest {
"\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select count() from t_year_ooo",
sink,
"count\n" +
"30\n"
);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
......@@ -894,25 +1002,6 @@ public class EngineMigrationTest extends AbstractGriffinTest {
" rnd_bin(2,10, 2) o";
}
@NotNull
private String appendCommonColumns() {
return " rnd_byte() a," +
" rnd_char() b," +
" rnd_short() c," +
" rnd_int(-77888, 999001, 2) d," + // ensure we have nulls
" rnd_long(-100000, 100000, 2) e," + // ensure we have nulls
" rnd_float(2) f," + // ensure we have nulls
" rnd_double(2) g," + // ensure we have nulls
" rnd_date(199999999, 399999999999, 2) h," + // ensure we have nulls
" cast(rnd_long(-7999999, 800000, 10) as timestamp) i," + // ensure we have nulls
" rnd_str(4,5,2) j," +
" rnd_symbol('newsymbol1','newsymbol12', null) k," +
" rnd_boolean() l," +
" rnd_symbol('newsymbol1','newsymbol12', null) m," +
" rnd_long256() n," +
" rnd_bin(2,10, 2) o";
}
private void doMigration(String dataZip, boolean freeTableId, boolean withO3) throws IOException, SqlException {
if (freeTableId) {
engine.freeTableId();
......@@ -920,45 +1009,10 @@ public class EngineMigrationTest extends AbstractGriffinTest {
replaceDbContent(dataZip);
EngineMigration.migrateEngineTo(engine, ColumnType.VERSION, true);
assertData(withO3);
appendData();
assertAppendedData();
}
private void assertAppendedData() throws SqlException {
engine.releaseAllReaders();
engine.releaseAllWriters();
assertSql("select * FROM t_year LIMIT -10",
"a\tb\tc\td\te\tf\tg\th\ti\tj\tk\tl\tm\tn\to\tts\n" +
"109\tP\t-12455\t263934\t49960\t0.6793\t0.09831693674866282\t1977-08-08T19:44:03.856Z\t1969-12-31T23:59:55.049605Z\tHQBJP\tc\tfalse\taaa\t0xbe15104d1d36d615cac36ab298393e52b06836c8abd67a44787ce11d6fc88eab\t00000000 37 58 2c 0d b0 d0 9c 57 02 75\t2096-10-02T07:10:00.000000Z\n" +
"73\tB\t-1271\t-47644\t4999\t0.8584\t0.12392055368261845\t1975-06-03T19:26:19.012Z\t1969-12-31T23:59:55.604499Z\t\tc\ttrue\taaa\t0x4f669e76b0311ac3438ec9cc282caa7043a05a3edd41f45aa59f873d1c729128\t00000000 34 01 0e 4d 2b 00 fa 34\t2103-02-04T02:43:20.000000Z\n" +
"83\tL\t-32289\t127321\t40837\t0.1335\t0.515824820198022\t\t1969-12-31T23:59:53.582959Z\tKFMO\taaa\tfalse\t\t0x8131875cd498c4b888762e985137f4e843b8167edcd59cf345c105202f875495\t\t2109-06-06T22:16:40.000000Z\n" +
"51\tS\t-28311\tNaN\t-72973\t0.5957\t0.20897460269739654\t1973-03-28T21:58:08.545Z\t1969-12-31T23:59:54.332988Z\t\tc\tfalse\taaa\t0x50113ffcc219fb1a9bc4f6389de1764097e7bcd897ae8a54aa2883a41581608f\t00000000 83 94 b5\t2115-10-08T17:50:00.000000Z\n" +
"49\tN\t-11147\t392567\t-9830\t0.5248\t0.1095692511246914\t\t1969-12-31T23:59:56.849475Z\tIFBE\tc\tfalse\taaa\t0x36055358bd232c9d775e2e80754e5fcda2353931c7033ad5c38c294e9227895a\t\t2122-02-08T13:23:20.000000Z\n" +
"57\tI\t-22903\t874980\t-28069\tNaN\t0.016793228004843286\t1975-09-29T05:10:33.275Z\t1969-12-31T23:59:55.690794Z\tBZSM\t\ttrue\tc\t0xa2c84382c65eb07087cf6cb291b2c3e7a9ffe8560d2cec518dea50b88b87fe43\t00000000 b9 c4 18 2b aa 3d\t2128-06-11T08:56:40.000000Z\n" +
"127\tW\t-16809\t288758\t-22272\t0.0535\t0.5855510665931698\t\t1969-12-31T23:59:52.689490Z\t\taaa\tfalse\tc\t0x918ae2d78481070577c7d4c3a758a5ea3dd771714ac964ab4b350afc9b599b28\t\t2134-10-13T04:30:00.000000Z\n" +
"20\tM\t-7043\t251501\t-85499\t0.9403\t0.9135840078861264\t1977-05-12T19:20:06.113Z\t1969-12-31T23:59:54.045277Z\tHOXL\tbbbbbb\tfalse\tbbbbbb\t0xc3b0de059fff72dbd7b99af08ac0d1cddb2990725a3338e377155edb531cb644\t\t2141-02-13T00:03:20.000000Z\n" +
"26\tG\t-24830\t-56840\t-32956\t0.8282\t0.017280895313585898\t1982-07-16T03:52:53.454Z\t1969-12-31T23:59:54.115165Z\tJEJH\taaa\tfalse\tbbbbbb\t0x16f70de9c6af11071d35d9faec5d18fd1cf3bbbc825b72a92ecb8ff0286bf649\t00000000 c2 62 f8 53 7d 05 65\t2147-06-16T19:36:40.000000Z\n" +
"127\tY\t19592\t224361\t37963\t0.6930\t0.006817672510656014\t1975-11-29T09:47:45.706Z\t1969-12-31T23:59:56.186242Z\t\t\ttrue\taaa\t0x88926dd483caaf4031096402997f21c833b142e887fa119e380dc9b54493ff70\t00000000 23 c3 9d 75 26 f2 0d b5 7a 3f\t2153-10-17T15:10:00.000000Z\n");
}
private void appendData() throws SqlException {
engine.releaseAllReaders();
engine.releaseAllWriters();
// Insert some data
compiler.compile("insert into t_year select " +
appendCommonColumns() +
", timestamp_sequence('2021-01-01', 200000000L) ts" +
" from long_sequence(5)", sqlExecutionContext);
// Insert same data to have O3 append tested
compiler.compile("insert into t_year select " +
appendCommonColumns() +
", timestamp_sequence('2020-01-01', 200000000L) ts" +
" from long_sequence(5)", sqlExecutionContext);
}
private void generateMigrationTables() throws SqlException, NumericException {
compiler.compile(
"create table t_none_nts as (" +
......
......@@ -297,6 +297,16 @@ public class O3CommitLagTest extends AbstractO3Test {
});
}
private void assertXY(SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
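// also compare count() output: count() goes through the writer's row-count
// bookkeeping, which is the arithmetic this change fixes for async O3 ingestion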
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from y", sink2);
TestUtils.assertEquals(sink, sink2);
}
private void insertUncommitted(
SqlCompiler compiler,
SqlExecutionContext sqlExecutionContext,
......@@ -374,7 +384,7 @@ public class O3CommitLagTest extends AbstractO3Test {
}
long start = IntervalUtils.parseFloorPartialDate("2021-04-27T08:00:00");
long[] testCounts = new long[] { 2 * 1024 * 1024, 16 * 8 * 1024 * 5, 2_000_000 };
long[] testCounts = new long[]{2 * 1024 * 1024, 16 * 8 * 1024 * 5, 2_000_000};
for (int c = 0; c < testCounts.length; c++) {
long idCount = testCounts[c];
......@@ -544,9 +554,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testCommitLagEndingAtPartitionBoundaryPlus1WithRollback0(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException, NumericException {
......@@ -610,6 +618,10 @@ public class O3CommitLagTest extends AbstractO3Test {
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x where i<=185 or i>=200", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
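// after the rollback, the surviving row count must match the reference table as well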
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from (select * from x where i<=185 or i>=200)", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from y", sink2);
TestUtils.assertEquals(sink, sink2);
}
private void testCommitLagEndingAtPartitionBoundaryWithRollback0(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException, NumericException {
......@@ -729,9 +741,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testCommitLagStaggeringPartitionsWithRollback0(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -843,9 +853,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink2, sink);
assertXY(compiler, sqlExecutionContext);
}
private void testCommitLagWithLargeO3(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -892,9 +900,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testCommitLagWithinPartition(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -940,10 +946,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testCommitLagWithinPartitionWithRollback(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -1001,6 +1004,10 @@ public class O3CommitLagTest extends AbstractO3Test {
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x where i<=375 or i>380", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from (select * from x where i<=375 or i>380)", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from y", sink2);
TestUtils.assertEquals(sink, sink2);
}
private void testContinuousBatchedCommit0(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -1088,9 +1095,7 @@ public class O3CommitLagTest extends AbstractO3Test {
}
LOG.info().$("committed final state with ").$(nCommitsWithLag).$(" commits with lag").$();
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink2, sink);
assertXY(compiler, sqlExecutionContext);
}
private void testLargeLagWithRowLimit(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -1135,9 +1140,7 @@ public class O3CommitLagTest extends AbstractO3Test {
TestUtils.assertEquals(sink, sink2);
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select i, ts from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select i, ts from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testLargeLagWithinPartition(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -1183,10 +1186,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select i, ts from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select i, ts from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testNoLag0(
......@@ -1237,9 +1237,7 @@ public class O3CommitLagTest extends AbstractO3Test {
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testNoLagEndingAtPartitionBoundary(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -1286,10 +1284,7 @@ public class O3CommitLagTest extends AbstractO3Test {
insertUncommitted(compiler, sqlExecutionContext, sql, writer);
writer.commit();
}
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
assertXY(compiler, sqlExecutionContext);
}
private void testNoLagWithRollback(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) throws SqlException {
......@@ -1347,5 +1342,9 @@ public class O3CommitLagTest extends AbstractO3Test {
TestUtils.printSql(compiler, sqlExecutionContext, "select * from x where i<=375 or i>380", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select * from y", sink2);
TestUtils.assertEquals(sink, sink2);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from (select * from x where i<=375 or i>380)", sink);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from y", sink2);
TestUtils.assertEquals(sink, sink2);
}
}
......@@ -35,7 +35,9 @@ public class TableRepairTest extends AbstractGriffinTest {
@Test
public void testDeleteActivePartition() throws Exception {
// deleting this partition removes its files directly, simulating manual intervention
assertMemoryLeak(() -> {
compiler.compile(
"create table tst as (select * from (select rnd_int() a, rnd_double() b, timestamp_sequence(0, 10000000l) t from long_sequence(100000)) timestamp (t)) timestamp(t) partition by DAY",
......@@ -57,24 +59,18 @@ public class TableRepairTest extends AbstractGriffinTest {
// repair by opening and closing writer
try (TableWriter w = new TableWriter(configuration, "tst")) {
Assert.assertTrue(reader.reload());
Assert.assertEquals(95040, reader.size());
Assert.assertEquals(950390000000L, w.getMaxTimestamp());
TableWriter.Row row = w.newRow(w.getMaxTimestamp());
row.putInt(0, 150);
row.putDouble(1, 0.67);
row.append();
w.commit();
}
Assert.assertTrue(reader.reload());
Assert.assertEquals(95041, reader.size());
}
});
}
......@@ -108,5 +104,4 @@ public class TableRepairTest extends AbstractGriffinTest {
}
});
}
}