Commit 43ba137d authored by Vlad Ilyushchenko

CAIRO: fixed transactional behaviour of TableWriter.truncate(). TableReader will now process this event correctly.
Parent 50e53f4d
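
For orientation, the reader-side contract this commit introduces is: TableWriter.truncate() bumps a data version stored in the _txn file, and a TableReaderIncrementalRecordCursor that observes a changed data version on reload() restarts from the top of the table instead of resuming from its bookmark. Below is a minimal busy-poll sketch, not part of the commit; it assumes a CairoEngine instance named engine and the "xyz" table from BusyPollTest, with import paths inferred from the surrounding diff.

import com.questdb.cairo.CairoEngine;
import com.questdb.cairo.TableReader;
import com.questdb.cairo.TableReaderIncrementalRecordCursor;
import com.questdb.cairo.TableUtils;
import com.questdb.cairo.sql.Record;

public class BusyPollSketch {

    // Poll the table once: returns the number of rows observed in this pass.
    // After a truncate the reader's data version changes and the cursor is
    // repositioned at the top, so rows are replayed from the start of the table.
    static long pollOnce(TableReaderIncrementalRecordCursor cursor) {
        long rows = 0;
        if (cursor.reload()) {
            Record record = cursor.getRecord();
            while (cursor.hasNext()) {
                record.getLong(2); // consume the "ts" column; real code would process the row here
                rows++;
            }
        }
        return rows;
    }

    static void pollLoop(CairoEngine engine) throws InterruptedException {
        try (
                TableReader reader = engine.getReader("xyz", TableUtils.ANY_TABLE_VERSION);
                TableReaderIncrementalRecordCursor cursor = new TableReaderIncrementalRecordCursor()
        ) {
            cursor.of(reader);
            long dataVersion = reader.getDataVersion();
            while (!Thread.currentThread().isInterrupted()) {
                pollOnce(cursor);
                if (reader.getDataVersion() != dataVersion) {
                    // TableWriter.truncate() happened since the last poll
                    dataVersion = reader.getDataVersion();
                }
                Thread.sleep(10); // back off between polls
            }
        }
    }
}

Tracking reader.getDataVersion() outside the cursor is only needed if the consumer wants to know that a truncate occurred; the cursor itself already repositions to the top when the version changes.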
......@@ -69,10 +69,6 @@ public class BaseRecordMetadata implements RecordMetadata {
return getColumnQuick(columnIndex).isIndexed();
}
public TableColumnMetadata getColumnMetadata(int columnIndex) {
return columnMetadata.getQuick(columnIndex);
}
public TableColumnMetadata getColumnQuick(int index) {
return columnMetadata.getQuick(index);
}
......
......@@ -47,10 +47,6 @@ public final class PartitionBy {
private PartitionBy() {
}
public static int count() {
return nameToIndexMap.size();
}
public static int fromString(CharSequence name) {
return nameToIndexMap.get(name);
}
......
......@@ -42,11 +42,10 @@ public class TableReader implements Closeable {
private static final PartitionPathGenerator MONTH_GEN = TableReader::pathGenMonth;
private static final PartitionPathGenerator DAY_GEN = TableReader::pathGenDay;
private static final PartitionPathGenerator DEFAULT_GEN = (reader, partitionIndex) -> reader.pathGenDefault();
private static final ReloadMethod PARTITIONED_RELOAD_METHOD = TableReader::reloadPartitioned;
private static final ReloadMethod NON_PARTITIONED_RELOAD_METHOD = TableReader::reloadNonPartitioned;
private static final ReloadMethod FIRST_TIME_PARTITIONED_RELOAD_METHOD = TableReader::reloadInitialPartitioned;
private static final ReloadMethod FIRST_TIME_NON_PARTITIONED_RELOAD_METHOD = TableReader::reloadInitialNonPartitioned;
private static final ReloadMethod FIRST_TIME_PARTITIONED_RELOAD_METHOD = TableReader::reloadInitialPartitioned;
private static final ReloadMethod PARTITIONED_RELOAD_METHOD = TableReader::reloadPartitioned;
private final ColumnCopyStruct tempCopyStruct = new ColumnCopyStruct();
private final FilesFacade ff;
private final Path path;
......@@ -73,6 +72,7 @@ public class TableReader implements Closeable {
private int columnCountBits;
private long transientRowCount;
private long structVersion;
private long dataVersion;
private long prevStructVersion;
private long partitionTableVersion;
private long prevPartitionTableVersion;
......@@ -220,6 +220,10 @@ public class TableReader implements Closeable {
return recordCursor;
}
public long getDataVersion() {
return dataVersion;
}
public long getMaxTimestamp() {
return maxTimestamp;
}
......@@ -393,6 +397,30 @@ public class TableReader implements Closeable {
}
}
private void applyTruncate() {
LOG.info().$("truncate detected").$();
for (int i = 0, n = partitionCount; i < n; i++) {
long size = openPartition0(i);
if (size == -1) {
int base = getColumnBase(i);
for (int k = 0; k < columnCount; k++) {
final int index = getPrimaryColumnIndex(base, k);
Misc.free(columns.getAndSetQuick(index, null));
Misc.free(columns.getAndSetQuick(index + 1, null));
Misc.free(bitmapIndexes.getAndSetQuick(index, null));
Misc.free(bitmapIndexes.getAndSetQuick(index + 1, null));
}
partitionRowCounts.setQuick(i, -1);
}
}
reloadSymbolMapCounts();
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
if (partitionCount > 0) {
updateCapacities();
}
}
private int calculatePartitionCount() {
if (partitionMin == Long.MAX_VALUE) {
return 0;
......@@ -592,7 +620,7 @@ public class TableReader implements Closeable {
private void freeBitmapIndexCache() {
if (bitmapIndexes != null) {
for (int i = 0, n = bitmapIndexes.size(); i < n; i++) {
Misc.free(bitmapIndexes.getQuick(i));
bitmapIndexes.setQuick(i, Misc.free(bitmapIndexes.getQuick(i)));
}
}
}
......@@ -600,7 +628,7 @@ public class TableReader implements Closeable {
private void freeColumns() {
if (columns != null) {
for (int i = 0, n = columns.size(); i < n; i++) {
Misc.free(columns.getQuick(i));
columns.setQuick(i, Misc.free(columns.getQuick(i)));
}
}
}
......@@ -709,9 +737,7 @@ public class TableReader implements Closeable {
private void openPartitionColumns(Path path, int columnBase, long partitionRowCount) {
for (int i = 0; i < columnCount; i++) {
if (columns.getQuick(getPrimaryColumnIndex(columnBase, i)) == null) {
reloadColumnAt(path, this.columns, this.columnTops, this.bitmapIndexes, columnBase, i, partitionRowCount);
}
reloadColumnAt(path, this.columns, this.columnTops, this.bitmapIndexes, columnBase, i, partitionRowCount);
}
}
......@@ -793,6 +819,7 @@ public class TableReader implements Closeable {
final long fixedRowCount = txMem.getLong(TableUtils.TX_OFFSET_FIXED_ROW_COUNT);
final long maxTimestamp = txMem.getLong(TableUtils.TX_OFFSET_MAX_TIMESTAMP);
final long structVersion = txMem.getLong(TableUtils.TX_OFFSET_STRUCT_VERSION);
final long dataVersion = txMem.getLong(TableUtils.TX_OFFSET_DATA_VERSION);
final long partitionTableVersion = txMem.getLong(TableUtils.TX_OFFSET_PARTITION_TABLE_VERSION);
this.symbolCountSnapshot.clear();
......@@ -824,6 +851,7 @@ public class TableReader implements Closeable {
this.rowCount = fixedRowCount + transientRowCount;
this.maxTimestamp = maxTimestamp;
this.structVersion = structVersion;
this.dataVersion = dataVersion;
this.partitionTableVersion = partitionTableVersion;
LOG.info().$("new transaction [txn=").$(txn).$(", transientRowCount=").$(transientRowCount).$(", fixedRowCount=").$(fixedRowCount).$(", maxTimestamp=").$(maxTimestamp).$(", attempts=").$(count).$(']').$();
return true;
......@@ -852,7 +880,6 @@ public class TableReader implements Closeable {
if (ff.exists(TableUtils.dFile(path.trimTo(plen), name))) {
if (mem1 instanceof ReadOnlyMemory) {
((ReadOnlyMemory) mem1).of(ff, path, ff.getMapPageSize(), 0);
} else {
......@@ -937,6 +964,7 @@ public class TableReader implements Closeable {
}
private boolean reloadInitialNonPartitioned() {
long dataVersion = this.dataVersion;
if (readTxn()) {
reloadStruct();
reloadSymbolMapCounts();
......@@ -947,26 +975,30 @@ public class TableReader implements Closeable {
return true;
}
}
return false;
return dataVersion != this.dataVersion;
}
private boolean reloadInitialPartitioned() {
if (readTxn()) {
reloadStruct();
reloadSymbolMapCounts();
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
if (partitionCount > 0) {
updateCapacities();
if (maxTimestamp != Numbers.LONG_NaN) {
reloadMethod = PARTITIONED_RELOAD_METHOD;
}
}
return true;
return reloadInitialPartitioned0();
}
return false;
}
private boolean reloadInitialPartitioned0() {
reloadSymbolMapCounts();
partitionMin = findPartitionMinimum();
partitionCount = calculatePartitionCount();
if (partitionCount > 0) {
updateCapacities();
if (maxTimestamp != Numbers.LONG_NaN) {
reloadMethod = PARTITIONED_RELOAD_METHOD;
}
}
return true;
}
private boolean reloadNonPartitioned() {
// calling readTxn will set the "rowCount" member variable
if (readTxn()) {
......@@ -1006,11 +1038,17 @@ public class TableReader implements Closeable {
private boolean reloadPartitioned() {
assert timestampFloorMethod != null;
long currentPartitionTimestamp = floorToPartitionTimestamp(maxTimestamp);
final long currentPartitionTimestamp = maxTimestamp == Numbers.LONG_NaN ? maxTimestamp : floorToPartitionTimestamp(maxTimestamp);
boolean b = readTxn();
if (b) {
reloadStruct();
if (maxTimestamp == Numbers.LONG_NaN || currentPartitionTimestamp == Numbers.LONG_NaN) {
applyTruncate();
return true;
}
assert intervalLengthMethod != null;
// calculate how many partitions lie between the timestamps before and after reload
int delta = getPartitionCountBetweenTimestamps(currentPartitionTimestamp, floorToPartitionTimestamp(maxTimestamp));
int partitionIndex = partitionCount - 1;
......@@ -1058,7 +1096,6 @@ public class TableReader implements Closeable {
}
private SymbolMapReader reloadSymbolMapReader(int columnIndex, SymbolMapReader reader) {
// RecordColumnMetadata m = metadata.getColumnQuick(columnIndex);
if (metadata.getColumnType(columnIndex) == ColumnType.SYMBOL) {
if (reader instanceof SymbolMapReaderImpl) {
((SymbolMapReaderImpl) reader).of(configuration, path, metadata.getColumnName(columnIndex), 0);
......
......@@ -27,6 +27,7 @@ public class TableReaderIncrementalRecordCursor extends TableReaderRecordCursor
private long txn = TableUtils.INITIAL_TXN;
private long lastRowId = -1;
private long dataVersion = -1;
public void bookmark() {
lastRowId = record.getRowId();
......@@ -45,7 +46,13 @@ public class TableReaderIncrementalRecordCursor extends TableReaderRecordCursor
public boolean reload() {
long txn;
if (reader.reload()) {
seekToLast();
if (reader.getDataVersion() != this.dataVersion) {
lastRowId = -1;
dataVersion = reader.getDataVersion();
toTop();
} else {
seekToLast();
}
this.txn = reader.getTxn();
return true;
}
......@@ -67,7 +74,9 @@ public class TableReaderIncrementalRecordCursor extends TableReaderRecordCursor
if (lastRowId > -1) {
startFrom(lastRowId);
} else {
// this is the first time this cursor opens
toTop();
this.dataVersion = reader.getDataVersion();
}
}
}
......@@ -58,9 +58,10 @@ public final class TableUtils {
static final long TX_OFFSET_FIXED_ROW_COUNT = 16;
static final long TX_OFFSET_MAX_TIMESTAMP = 24;
static final long TX_OFFSET_STRUCT_VERSION = 32;
static final long TX_OFFSET_PARTITION_TABLE_VERSION = 40;
static final long TX_OFFSET_TXN_CHECK = 48;
static final long TX_OFFSET_MAP_WRITER_COUNT = 56;
static final long TX_OFFSET_DATA_VERSION = 40;
static final long TX_OFFSET_PARTITION_TABLE_VERSION = 48;
static final long TX_OFFSET_TXN_CHECK = 56;
static final long TX_OFFSET_MAP_WRITER_COUNT = 64;
/**
* struct {
* long txn;
......@@ -142,7 +143,7 @@ public final class TableUtils {
}
}
mem.of(ff, path.trimTo(rootLen).concat(TXN_FILE_NAME).$(), ff.getPageSize());
TableUtils.resetTxn(mem, symbolMapCount);
TableUtils.resetTxn(mem, symbolMapCount, 0L, INITIAL_TXN);
}
}
......@@ -204,9 +205,9 @@ public final class TableUtils {
path.put(".lock").$();
}
public static void resetTxn(VirtualMemory txMem, int symbolMapCount) {
public static void resetTxn(VirtualMemory txMem, int symbolMapCount, long txn, long dataVersion) {
// txn to let readers know table is being reset
txMem.putLong(TX_OFFSET_TXN, INITIAL_TXN);
txMem.putLong(TX_OFFSET_TXN, txn);
Unsafe.getUnsafe().storeFence();
// transient row count
......@@ -217,6 +218,8 @@ public final class TableUtils {
txMem.putLong(TX_OFFSET_MAX_TIMESTAMP, Long.MIN_VALUE);
// structure version
txMem.putLong(TX_OFFSET_STRUCT_VERSION, 0);
// data version
txMem.putLong(TX_OFFSET_DATA_VERSION, dataVersion);
txMem.putInt(TX_OFFSET_MAP_WRITER_COUNT, symbolMapCount);
for (int i = 0; i < symbolMapCount; i++) {
......@@ -225,7 +228,7 @@ public final class TableUtils {
Unsafe.getUnsafe().storeFence();
// txn check
txMem.putLong(TX_OFFSET_TXN_CHECK, INITIAL_TXN);
txMem.putLong(TX_OFFSET_TXN_CHECK, txn);
// partition update count
txMem.putInt(getPartitionTableSizeOffset(symbolMapCount), 0);
......
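
For reference, the resulting _txn header layout after this change (byte offsets into the file); the data-version slot is inserted at offset 40 and every later field shifts down by 8. The first two offsets are not visible in this hunk and are assumed here:

// _txn header layout after this commit (byte offsets)
//    0  TX_OFFSET_TXN                      (assumed, not shown in this hunk)
//    8  TX_OFFSET_TRANSIENT_ROW_COUNT      (assumed, not shown in this hunk)
//   16  TX_OFFSET_FIXED_ROW_COUNT
//   24  TX_OFFSET_MAX_TIMESTAMP
//   32  TX_OFFSET_STRUCT_VERSION
//   40  TX_OFFSET_DATA_VERSION             (new in this commit)
//   48  TX_OFFSET_PARTITION_TABLE_VERSION  (previously 40)
//   56  TX_OFFSET_TXN_CHECK                (previously 48)
//   64  TX_OFFSET_MAP_WRITER_COUNT         (previously 56)

Note that the offsets of pre-existing fields change, so a _txn file written before this commit is laid out differently from one written after it.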
......@@ -95,6 +95,7 @@ public class TableWriter implements Closeable {
private long fixedRowCount = 0;
private long txn;
private long structVersion;
private long dataVersion;
private RowFunction rowFunction = openPartitionFunction;
private long prevTimestamp;
private long txPrevTransientRowCount;
......@@ -614,10 +615,10 @@ public class TableWriter implements Closeable {
txPrevTransientRowCount = 0;
transientRowCount = 0;
fixedRowCount = 0;
txn = 0;
txn++;
txPartitionCount = 1;
TableUtils.resetTxn(txMem, metadata.getSymbolMapCount());
TableUtils.resetTxn(txMem, metadata.getSymbolMapCount(), txn, ++dataVersion);
try {
removeTodoFile();
} catch (CairoException err) {
......@@ -927,6 +928,7 @@ public class TableWriter implements Closeable {
this.txPrevTransientRowCount = this.transientRowCount;
this.fixedRowCount = txMem.getLong(TableUtils.TX_OFFSET_FIXED_ROW_COUNT);
this.maxTimestamp = txMem.getLong(TableUtils.TX_OFFSET_MAX_TIMESTAMP);
this.dataVersion = txMem.getLong(TX_OFFSET_DATA_VERSION);
this.structVersion = txMem.getLong(TableUtils.TX_OFFSET_STRUCT_VERSION);
this.prevTimestamp = this.maxTimestamp;
if (this.maxTimestamp > Long.MIN_VALUE || partitionBy == PartitionBy.NONE) {
......@@ -1669,7 +1671,11 @@ public class TableWriter implements Closeable {
if (partitionBy != PartitionBy.NONE) {
removePartitionDirectories();
}
TableUtils.resetTxn(txMem, metadata.getSymbolMapCount());
TableUtils.resetTxn(
txMem,
metadata.getSymbolMapCount(),
txMem.getLong(TX_OFFSET_TXN) + 1,
txMem.getLong(TX_OFFSET_DATA_VERSION) + 1);
removeTodoFile();
}
......
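
On the writer side, truncate() now advances txn (txn++ instead of txn = 0) and passes an incremented dataVersion to TableUtils.resetTxn(), so an open reader's next readTxn() observes both a newer txn and a changed data version and can tell a truncate apart from an ordinary commit. A minimal sketch, not part of the commit, assuming a CairoEngine named engine and an existing table "xyz"; the asserts only spell out the expected observations:

try (
        TableWriter writer = engine.getWriter("xyz");
        TableReader reader = engine.getReader("xyz", TableUtils.ANY_TABLE_VERSION)
) {
    long versionBefore = reader.getDataVersion();
    writer.truncate();                             // bumps txn and data version in _txn
    if (reader.reload()) {                         // reader picks up the truncate...
        assert reader.getDataVersion() != versionBefore;
        assert !reader.getCursor().hasNext();      // ...and now sees an empty table
    }
}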
......@@ -44,66 +44,30 @@ public class BusyPollTest extends AbstractCairoTest {
private final static BindVariableService bindVariableService = new BindVariableService();
@Test
public void testBusyPollFromMidTable() throws Exception {
final int blobSize = 1024;
final int n = 1000;
final long timestampIncrement = 10000;
TestUtils.assertMemoryLeak(() -> {
try {
compiler.compile("create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by NONE", null);
try (TableWriter writer = engine.getWriter("xyz")) {
long ts = 0;
long addr = Unsafe.malloc(blobSize);
try {
Rnd rnd = new Rnd();
appendRecords(0, n, timestampIncrement, writer, ts, addr, rnd);
ts = n * timestampIncrement;
try (
TableReader reader = engine.getReader("xyz", TableUtils.ANY_TABLE_VERSION);
TableReaderIncrementalRecordCursor cursor = new TableReaderIncrementalRecordCursor()
) {
cursor.of(reader);
Assert.assertTrue(cursor.reload());
int count = 0;
Record record = cursor.getRecord();
while (cursor.hasNext()) {
Assert.assertEquals(n - count, record.getLong(2));
count++;
}
public void testBusyPollByDay() throws Exception {
testBusyPollFromMidTable(PartitionBy.DAY, 3000000000L);
}
Assert.assertFalse(cursor.reload());
Assert.assertFalse(cursor.hasNext());
@Test
public void testBusyPollByMonth() throws Exception {
testBusyPollFromMidTable(PartitionBy.MONTH, 50000000000L);
}
appendRecords(n, n, timestampIncrement, writer, ts, addr, rnd);
Assert.assertTrue(cursor.reload());
@Test
public void testBusyPollByNone() throws Exception {
testBusyPollFromMidTable(PartitionBy.NONE, 10000L);
}
count = 0;
while (cursor.hasNext()) {
Assert.assertEquals(n + n - count, record.getLong(2));
count++;
}
}
} finally {
Unsafe.free(addr, blobSize);
}
}
} finally {
engine.releaseAllReaders();
engine.releaseAllWriters();
}
});
@Test
public void testBusyPollByYear() throws Exception {
testBusyPollFromMidTable(PartitionBy.YEAR, 365 * 50000000000L);
}
@Test
public void testByDay() throws Exception {
testBusyPoll(
0L,
10000000,
300_000,
128,
"create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by DAY"
);
}
......@@ -111,10 +75,8 @@ public class BusyPollTest extends AbstractCairoTest {
@Test
public void testByMonth() throws Exception {
testBusyPoll(
0L,
40000000,
300_000,
128,
"create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by MONTH"
);
}
......@@ -122,10 +84,8 @@ public class BusyPollTest extends AbstractCairoTest {
@Test
public void testByYear() throws Exception {
testBusyPoll(
0L,
480000000,
300_000,
128,
"create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by YEAR"
);
}
......@@ -133,10 +93,8 @@ public class BusyPollTest extends AbstractCairoTest {
@Test
public void testNonPartitioned() throws Exception {
testBusyPoll(
0L,
10000,
3_000_000,
128,
"create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by NONE"
);
}
......@@ -156,7 +114,7 @@ public class BusyPollTest extends AbstractCairoTest {
}
}
private void testBusyPoll(long timestamp, long timestampIncrement, int n, int blobSize, String createStatement) throws Exception {
private void testBusyPoll(long timestampIncrement, int n, String createStatement) throws Exception {
TestUtils.assertMemoryLeak(() -> {
compiler.compile(createStatement, bindVariableService);
final AtomicInteger errorCount = new AtomicInteger();
......@@ -166,24 +124,24 @@ public class BusyPollTest extends AbstractCairoTest {
new Thread(() -> {
try (TableWriter writer = engine.getWriter("xyz")) {
barrier.await();
long ts = timestamp;
long addr = Unsafe.malloc(blobSize);
long ts = 0;
long addr = Unsafe.malloc(128);
try {
Rnd rnd = new Rnd();
for (int i = 0; i < n; i++) {
TableWriter.Row row = writer.newRow(ts);
row.putInt(0, i);
for (int k = 0; k < blobSize; k++) {
for (int k = 0; k < 128; k++) {
Unsafe.getUnsafe().putByte(addr + k, rnd.nextByte());
}
row.putBin(1, addr, blobSize);
row.putBin(1, addr, 128);
row.putLong(2, rnd.nextLong());
row.append();
writer.commit();
ts += timestampIncrement;
}
} finally {
Unsafe.free(addr, blobSize);
Unsafe.free(addr, 128);
}
} catch (Throwable e) {
e.printStackTrace();
......@@ -206,7 +164,7 @@ public class BusyPollTest extends AbstractCairoTest {
while (cursor.hasNext()) {
Assert.assertEquals(count, record.getInt(0));
BinarySequence binarySequence = record.getBin(1);
for (int i = 0; i < blobSize; i++) {
for (int i = 0; i < 128; i++) {
Assert.assertEquals(rnd.nextByte(), binarySequence.byteAt(i));
}
Assert.assertEquals(rnd.nextLong(), record.getLong(2));
......@@ -230,4 +188,70 @@ public class BusyPollTest extends AbstractCairoTest {
}
});
}
private void testBusyPollFromMidTable(int partitionBy, long timestampIncrement) throws Exception {
final int blobSize = 1024;
final int n = 1000;
TestUtils.assertMemoryLeak(() -> {
try {
compiler.compile("create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by " + PartitionBy.toString(partitionBy), null);
try (TableWriter writer = engine.getWriter("xyz")) {
long ts = 0;
long addr = Unsafe.malloc(blobSize);
try {
Rnd rnd = new Rnd();
appendRecords(0, n, timestampIncrement, writer, ts, addr, rnd);
ts = n * timestampIncrement;
try (
TableReader reader = engine.getReader("xyz", TableUtils.ANY_TABLE_VERSION);
TableReaderIncrementalRecordCursor cursor = new TableReaderIncrementalRecordCursor()
) {
cursor.of(reader);
Assert.assertTrue(cursor.reload());
int count = 0;
Record record = cursor.getRecord();
while (cursor.hasNext()) {
Assert.assertEquals(n - count, record.getLong(2));
count++;
}
Assert.assertFalse(cursor.reload());
Assert.assertFalse(cursor.hasNext());
appendRecords(n, n, timestampIncrement, writer, ts, addr, rnd);
Assert.assertTrue(cursor.reload());
count = 0;
while (cursor.hasNext()) {
Assert.assertEquals(n + n - count, record.getLong(2));
count++;
}
writer.truncate();
Assert.assertTrue(cursor.reload());
Assert.assertFalse(cursor.hasNext());
appendRecords(n * 2, n / 2, timestampIncrement, writer, ts, addr, rnd);
Assert.assertTrue(cursor.reload());
count = 0;
while (cursor.hasNext()) {
Assert.assertEquals(n * 2 + n / 2 - count, record.getLong(2));
count++;
}
Assert.assertEquals(n / 2, count);
}
} finally {
Unsafe.free(addr, blobSize);
}
}
} finally {
engine.releaseAllReaders();
engine.releaseAllWriters();
}
});
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cairo;
import com.questdb.cairo.sql.Record;
import com.questdb.cairo.sql.RecordCursor;
import com.questdb.std.Rnd;
import com.questdb.std.Unsafe;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Test;
public class TableReaderReloadTest extends AbstractCairoTest {
@Test
public void testReloadTruncateByDay() {
testReloadAfterTruncate(PartitionBy.DAY, 3000000000L);
}
@Test
public void testReloadTruncateByMonth() {
testReloadAfterTruncate(PartitionBy.MONTH, 50000000000L);
}
@Test
public void testReloadTruncateByNone() {
testReloadAfterTruncate(PartitionBy.NONE, 1000000);
}
@Test
public void testReloadTruncateByYear() {
testReloadAfterTruncate(PartitionBy.YEAR, 365 * 50000000000L);
}
private void assertTable(Rnd rnd, long buffer, RecordCursor cursor, Record record) {
while (cursor.hasNext()) {
Assert.assertEquals(rnd.nextInt(), record.getInt(0));
Assert.assertEquals(rnd.nextShort(), record.getShort(1));
Assert.assertEquals(rnd.nextByte(), record.getByte(2));
Assert.assertEquals(rnd.nextDouble2(), record.getDouble(3), 0.00001);
Assert.assertEquals(rnd.nextFloat2(), record.getFloat(4), 0.00001);
Assert.assertEquals(rnd.nextLong(), record.getLong(5));
TestUtils.assertEquals(rnd.nextChars(3), record.getStr(6));
TestUtils.assertEquals(rnd.nextChars(2), record.getSym(7));
Assert.assertEquals(rnd.nextBoolean(), record.getBool(8));
rnd.nextChars(buffer, 1024 / 2);
Assert.assertEquals(rnd.nextLong(), record.getDate(10));
}
}
private void populateTable(Rnd rnd, long buffer, long timestamp, long increment, TableWriter writer) {
for (int i = 0; i < 100; i++) {
TableWriter.Row row = writer.newRow(timestamp);
row.putInt(0, rnd.nextInt());
row.putShort(1, rnd.nextShort());
row.putByte(2, rnd.nextByte());
row.putDouble(3, rnd.nextDouble2());
row.putFloat(4, rnd.nextFloat2());
row.putLong(5, rnd.nextLong());
row.putStr(6, rnd.nextChars(3));
row.putSym(7, rnd.nextChars(2));
row.putBool(8, rnd.nextBoolean());
rnd.nextChars(buffer, 1024 / 2);
row.putBin(9, buffer, 1024);
row.putDate(10, rnd.nextLong());
row.append();
timestamp += increment;
}
writer.commit();
}
private void testReloadAfterTruncate(int partitionBy, long increment) {
final Rnd rnd = new Rnd();
final int bufferSize = 1024;
long buffer = Unsafe.malloc(bufferSize);
try (TableModel model = CairoTestUtils.getAllTypesModel(configuration, partitionBy)) {
model.timestamp();
CairoTestUtils.create(model);
}
long timestamp = 0;
try (TableWriter writer = new TableWriter(configuration, "all")) {
try (TableReader reader = new TableReader(configuration, "all")) {
Assert.assertFalse(reader.reload());
}
populateTable(rnd, buffer, timestamp, increment, writer);
rnd.reset();
try (TableReader reader = new TableReader(configuration, "all")) {
RecordCursor cursor = reader.getCursor();
final Record record = cursor.getRecord();
assertTable(rnd, buffer, cursor, record);
writer.truncate();
Assert.assertTrue(reader.reload());
cursor = reader.getCursor();
Assert.assertFalse(cursor.hasNext());
rnd.reset();
populateTable(rnd, buffer, timestamp, increment, writer);
Assert.assertTrue(reader.reload());
rnd.reset();
cursor = reader.getCursor();
assertTable(rnd, buffer, cursor, record);
}
}
}
}
......@@ -61,7 +61,7 @@ public class JsonLexerTest {
lexer.parseLast();
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("String is too long", e.getMessage());
TestUtils.assertEquals("String is too long", e.getFlyweightMessage());
Assert.assertEquals(41, e.getPosition());
}
} finally {
......@@ -134,9 +134,9 @@ public class JsonLexerTest {
@Test
public void testInvalidUtf8Value() {
byte bytesA[] = "{\"x\":\"违法违,控网站漏洞风\", \"y\":\"站漏洞风".getBytes(StandardCharsets.UTF_8);
byte bytesB[] = {-116, -76, -55, 55, -34, 0, -11, 15, 13};
byte bytesC[] = "\"}".getBytes(StandardCharsets.UTF_8);
byte[] bytesA = "{\"x\":\"违法违,控网站漏洞风\", \"y\":\"站漏洞风".getBytes(StandardCharsets.UTF_8);
byte[] bytesB = {-116, -76, -55, 55, -34, 0, -11, 15, 13};
byte[] bytesC = "\"}".getBytes(StandardCharsets.UTF_8);
byte[] bytes = new byte[bytesA.length + bytesB.length + bytesC.length];
System.arraycopy(bytesA, 0, bytes, 0, bytesA.length);
......@@ -159,7 +159,7 @@ public class JsonLexerTest {
LEXER.parseLast();
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Unsupported encoding", e.getMessage());
TestUtils.assertEquals("Unsupported encoding", e.getFlyweightMessage());
Assert.assertEquals(43, e.getPosition());
}
}
......@@ -679,7 +679,7 @@ public class JsonLexerTest {
lexer.parseLast();
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals(expected, e.getMessage());
TestUtils.assertEquals(expected, e.getFlyweightMessage());
Assert.assertEquals(expectedPosition, e.getPosition());
}
}
......@@ -693,7 +693,7 @@ public class JsonLexerTest {
}
private void assertThat(String expected, String input, boolean recordPositions) throws Exception {
byte bytes[] = input.getBytes(StandardCharsets.UTF_8);
byte[] bytes = input.getBytes(StandardCharsets.UTF_8);
int len = bytes.length;
long address = Unsafe.malloc(len);
for (int i = 0; i < len; i++) {
......
......@@ -61,16 +61,13 @@ public class JsonSchemaParserTest {
@Test
public void testArrayProperty() {
String in = "[\n" +
"{\"name\": \"x\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\", \"locale\": []}\n" +
"]";
try {
parseMetadata(in);
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Unexpected array", e.getMessage());
Assert.assertEquals(62, e.getPosition());
}
assertFailure(
"[\n" +
"{\"name\": \"x\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\", \"locale\": []}\n" +
"]",
62,
"Unexpected array"
);
}
@Test
......@@ -103,85 +100,77 @@ public class JsonSchemaParserTest {
@Test
public void testMissingName() {
String in = "[\n" +
"{\"name\": \"x\", \"type\": \"INT\", \"pattern\":\"xyz\", \"locale\": \"en-GB\"},\n" +
"{\"type\": \"DOUBLE\", \"pattern\":\"xyz\"}\n" +
"]";
try {
parseMetadata(in);
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Missing 'name' property", e.getMessage());
Assert.assertEquals(103, e.getPosition());
}
assertFailure(
"[\n" +
"{\"name\": \"x\", \"type\": \"INT\", \"pattern\":\"xyz\", \"locale\": \"en-GB\"},\n" +
"{\"type\": \"DOUBLE\", \"pattern\":\"xyz\"}\n" +
"]",
103,
"Missing 'name' property"
);
}
@Test
public void testMissingType() {
String in = "[\n" +
"{\"name\": \"x\", \"pattern\":\"xyz\", \"locale\": \"en-GB\"},\n" +
"{\"name\": \"y\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\"}\n" +
"]";
try {
parseMetadata(in);
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Missing 'type' property", e.getMessage());
Assert.assertEquals(51, e.getPosition());
}
assertFailure(
"[\n" +
"{\"name\": \"x\", \"pattern\":\"xyz\", \"locale\": \"en-GB\"},\n" +
"{\"name\": \"y\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\"}\n" +
"]",
51,
"Missing 'type' property"
);
}
@Test
public void testNonArray() {
try {
parseMetadata("{}");
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Unexpected object", e.getMessage());
Assert.assertEquals(1, e.getPosition());
}
assertFailure(
"{}",
1,
"Unexpected object"
);
}
@Test
public void testNonObjectArrayMember() {
String in = "[2,\n" +
"{\"name\": \"x\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\"}\n" +
"]";
try {
parseMetadata(in);
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Must be an object", e.getMessage());
Assert.assertEquals(2, e.getPosition());
}
assertFailure(
"[2,\n" +
"{\"name\": \"x\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\"}\n" +
"]",
2,
"Must be an object"
);
}
@Test
public void testWrongDateLocale() {
String in = "[\n" +
"{\"name\": \"x\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\", \"locale\": \"enk\"}\n" +
"]";
try {
parseMetadata(in);
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Invalid date locale", e.getMessage());
Assert.assertEquals(63, e.getPosition());
}
assertFailure(
"[\n" +
"{\"name\": \"x\", \"type\": \"DOUBLE\", \"pattern\":\"xyz\", \"locale\": \"enk\"}\n" +
"]",
63,
"Invalid date locale"
);
}
@Test
public void testWrongType() {
String in = "[\n" +
"{\"name\": \"y\", \"type\": \"ABC\", \"pattern\":\"xyz\"}\n" +
"]";
assertFailure(
"[\n" +
"{\"name\": \"y\", \"type\": \"ABC\", \"pattern\":\"xyz\"}\n" +
"]",
26,
"Invalid type"
);
}
private void assertFailure(String in, int position, CharSequence text) {
try {
parseMetadata(in);
Assert.fail();
} catch (JsonException e) {
Assert.assertEquals("Invalid type", e.getMessage());
Assert.assertEquals(26, e.getPosition());
TestUtils.assertEquals(text, e.getFlyweightMessage());
Assert.assertEquals(position, e.getPosition());
}
}
......