提交 1d80e507 编写于 作者: V Vlad Ilyushchenko

read performance tweak, refactoring query system to accomodate aggregation

上级 ca1512b8
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -46,7 +46,6 @@ import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.*;
public class Journal<T> implements Iterable<T>, Closeable {
......@@ -362,45 +361,6 @@ public class Journal<T> implements Iterable<T>, Closeable {
return (T) getMetadata().newObject();
}
public void clearObject(T obj) {
for (int i = 0; i < metadata.getColumnCount(); i++) {
com.nfsdb.journal.factory.configuration.ColumnMetadata m = metadata.getColumnMetadata(i);
switch (m.type) {
case BOOLEAN:
Unsafe.getUnsafe().putBoolean(obj, m.offset, false);
break;
case BYTE:
Unsafe.getUnsafe().putByte(obj, m.offset, (byte) 0);
break;
case DOUBLE:
Unsafe.getUnsafe().putDouble(obj, m.offset, 0d);
break;
case INT:
Unsafe.getUnsafe().putInt(obj, m.offset, 0);
break;
case SHORT:
Unsafe.getUnsafe().putShort(obj, m.offset, (short) 0);
break;
case LONG:
case DATE:
Unsafe.getUnsafe().putLong(obj, m.offset, 0L);
break;
case STRING:
case SYMBOL:
Unsafe.getUnsafe().putObject(obj, m.offset, null);
break;
case BINARY:
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, m.offset);
if (buf != null) {
buf.clear();
}
break;
default:
throw new JournalRuntimeException("Unsupported type: " + m.type);
}
}
}
/**
* Get the specified column's metadata (static information).
*
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -214,12 +214,8 @@ public class Partition<T> implements Iterable<T>, Closeable {
public void read(long localRowID, T obj) {
for (int i = 0; i < columnCount; i++) {
if (journal.getInactiveColumns().get(i)) {
continue;
}
Journal.ColumnMetadata m = journal.columnMetadata[i];
if (m.meta.offset == 0) {
Journal.ColumnMetadata m;
if (journal.getInactiveColumns().get(i) || (m = journal.columnMetadata[i]).meta.offset == 0) {
continue;
}
......@@ -247,34 +243,34 @@ public class Partition<T> implements Iterable<T>, Closeable {
Unsafe.getUnsafe().putObject(obj, m.meta.offset, ((VariableColumn) columns[i]).getStr(localRowID));
break;
case SYMBOL:
int symbolIndex = ((FixedColumn) columns[i]).getInt(localRowID);
// check if symbol was null
if (symbolIndex > SymbolTable.VALUE_IS_NULL) {
Unsafe.getUnsafe().putObject(obj, m.meta.offset, m.symbolTable.value(symbolIndex));
}
Unsafe.getUnsafe().putObject(obj, m.meta.offset, m.symbolTable.value(((FixedColumn) columns[i]).getInt(localRowID)));
break;
case BINARY:
int size = ((VariableColumn) columns[i]).getBinSize(localRowID);
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, m.meta.offset);
if (size == -1) {
if (buf != null) {
buf.clear();
}
} else {
if (buf == null || buf.capacity() < size) {
buf = ByteBuffer.allocate(size);
Unsafe.getUnsafe().putObject(obj, m.meta.offset, buf);
}
readBin(localRowID, obj, i, m);
if (buf.remaining() < size) {
buf.rewind();
}
buf.limit(size);
((VariableColumn) columns[i]).getBin(localRowID, buf);
buf.flip();
}
}
}
}
private void readBin(long localRowID, T obj, int i, Journal.ColumnMetadata m) {
int size = ((VariableColumn) columns[i]).getBinSize(localRowID);
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, m.meta.offset);
if (size == -1) {
if (buf != null) {
buf.clear();
}
} else {
if (buf == null || buf.capacity() < size) {
buf = ByteBuffer.allocate(size);
Unsafe.getUnsafe().putObject(obj, m.meta.offset, buf);
}
if (buf.remaining() < size) {
buf.rewind();
}
buf.limit(size);
((VariableColumn) columns[i]).getBin(localRowID, buf);
buf.flip();
}
}
......@@ -491,10 +487,28 @@ public class Partition<T> implements Iterable<T>, Closeable {
}
break;
case STRING:
appendStr(obj, i, meta);
String s = (String) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
long offset = ((VariableColumn) columns[i]).putStr(s);
if (meta.meta.indexed) {
sparseIndexProxies[i].getIndex().add(
s == null ? SymbolTable.VALUE_IS_NULL : Checksum.hash(s, meta.meta.distinctCountHint)
, offset
);
}
break;
case SYMBOL:
appendSym(obj, i, meta);
int key;
String sym = (String) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
if (sym == null) {
key = SymbolTable.VALUE_IS_NULL;
} else {
key = meta.symbolTable.put(sym);
}
if (meta.meta.indexed) {
sparseIndexProxies[i].getIndex().add(key, ((FixedColumn) columns[i]).putInt(key));
} else {
((FixedColumn) columns[i]).putInt(key);
}
break;
case BINARY:
appendBin(obj, i, meta);
......@@ -523,32 +537,6 @@ public class Partition<T> implements Iterable<T>, Closeable {
}
}
private void appendSym(T obj, int i, Journal.ColumnMetadata meta) throws JournalException {
int key;
String sym = (String) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
if (sym == null) {
key = SymbolTable.VALUE_IS_NULL;
} else {
key = meta.symbolTable.put(sym);
}
if (meta.meta.indexed) {
sparseIndexProxies[i].getIndex().add(key, ((FixedColumn) columns[i]).putInt(key));
} else {
((FixedColumn) columns[i]).putInt(key);
}
}
private void appendStr(T obj, int i, Journal.ColumnMetadata meta) throws JournalException {
String s = (String) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
long offset = ((VariableColumn) columns[i]).putStr(s);
if (meta.meta.indexed) {
sparseIndexProxies[i].getIndex().add(
s == null ? SymbolTable.VALUE_IS_NULL : Checksum.hash(s, meta.meta.distinctCountHint)
, offset
);
}
}
private FixedColumn getFixedWidthColumn(int i) {
checkColumnIndex(i);
return (FixedColumn) columns[i];
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -140,9 +140,14 @@ public class SymbolTable implements Closeable {
}
public String value(int key) {
if (key == VALUE_IS_NULL) {
return null;
}
if (key >= size) {
throw new JournalRuntimeException("Invalid symbol key: " + key);
}
String value = key < keyCache.size() ? keyCache.get(key) : null;
if (value == null) {
cache(key, value = data.getStr(key));
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,7 +16,6 @@
package com.nfsdb.journal.export;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import com.nfsdb.journal.lang.cst.DataRow;
import com.nfsdb.journal.utils.Dates;
import com.nfsdb.journal.utils.Numbers;
......@@ -68,8 +67,8 @@ public class JournalEntryPrinter {
case BOOLEAN:
sink.put(e.getBool(i) ? "true" : "false");
break;
default:
throw new JournalRuntimeException("Unsupported type: " + e.getColumnType(i));
// default:
// throw new JournalRuntimeException("Unsupported type: " + e.getColumnType(i));
}
sink.put('\t');
}
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -44,10 +44,10 @@ public class KVIndex implements Closeable {
*/
private static final int ENTRY_SIZE = 16;
private final IndexCursor cachedCursor = new IndexCursor();
int rowBlockSize;
int rowBlockLen;
long firstEntryOffset;
private final IndexCursor cachedCursor = new IndexCursor();
private MappedFileImpl kData;
// storage for rows
// block structure is [ rowid1, rowid2 ..., rowidn, prevBlockOffset]
......@@ -104,8 +104,6 @@ public class KVIndex implements Closeable {
}
long keyOffset = getKeyOffset(key);
long rowBlockOffset;
long rowCount;
if (keyOffset >= firstEntryOffset + keyBlockSize) {
long oldSize = keyBlockSize;
......@@ -126,8 +124,8 @@ public class KVIndex implements Closeable {
}
long address = kData.getAddress(keyOffset, ENTRY_SIZE);
rowBlockOffset = Unsafe.getUnsafe().getLong(address);
rowCount = Unsafe.getUnsafe().getLong(address + 8);
long rowBlockOffset = Unsafe.getUnsafe().getLong(address);
long rowCount = Unsafe.getUnsafe().getLong(address + 8);
int cellIndex = (int) (rowCount % rowBlockLen);
if (rowBlockOffset == 0 || cellIndex == 0) {
......
......@@ -49,7 +49,6 @@ public class JournalBufferedIterator<T> extends AbstractImmutableIterator<T> imp
@Override
public T next() {
journal.clearObject(obj);
partition.read(currentRowID, obj);
if (currentRowID < currentUpperBound) {
currentRowID++;
......@@ -64,7 +63,6 @@ public class JournalBufferedIterator<T> extends AbstractImmutableIterator<T> imp
public T peekLast() {
JournalIteratorRange w = ranges.get(ranges.size() - 1);
try {
journal.clearObject(obj);
journal.read(Rows.toRowID(w.partitionID, w.hi), obj);
return obj;
} catch (JournalException e) {
......@@ -76,7 +74,6 @@ public class JournalBufferedIterator<T> extends AbstractImmutableIterator<T> imp
public T peekFirst() {
JournalIteratorRange w = ranges.get(0);
try {
journal.clearObject(obj);
journal.read(Rows.toRowID(w.partitionID, w.lo), obj);
return obj;
} catch (JournalException e) {
......
......@@ -42,11 +42,11 @@ public class JournalConcurrentIterator<T> extends AbstractConcurrentIterator<T>
protected Runnable getRunnable() {
return new Runnable() {
boolean hasNext = true;
private int currentIndex = 0;
private long currentRowID;
private long currentUpperBound;
private int currentPartitionID;
boolean hasNext = true;
@Override
public void run() {
......@@ -57,7 +57,6 @@ public class JournalConcurrentIterator<T> extends AbstractConcurrentIterator<T>
Holder<T> holder = buffer.get(outSeq);
boolean hadNext = hasNext;
if (hadNext) {
journal.clearObject(holder.object);
journal.read(Rows.toRowID(currentPartitionID, currentRowID), holder.object);
if (currentRowID < currentUpperBound) {
currentRowID++;
......
......@@ -49,11 +49,9 @@ public class JournalRowBufferedIterator<T> extends AbstractImmutableIterator<Jou
@Override
public JournalRow<T> next() {
try {
T obj = row.getObject();
journal.clearObject(obj);
long rowID = Rows.toRowID(currentPartitionID, currentRowID);
row.setRowID(rowID);
journal.read(rowID, obj);
journal.read(rowID, row.getObject());
if (currentRowID < currentUpperBound) {
currentRowID++;
} else {
......
......@@ -72,7 +72,6 @@ public class PartitionBufferedIterator<T> extends AbstractImmutableIterator<T> i
if (!partition.isOpen()) {
partition.open();
}
partition.getJournal().clearObject(obj);
partition.read(localRowID, obj);
return obj;
} catch (JournalException e) {
......
......@@ -54,7 +54,6 @@ public class PartitionConcurrentIterator<T> extends AbstractConcurrentIterator<T
long seq = buffer.next();
Holder<T> holder = buffer.get(seq);
partition.getJournal().clearObject(holder.object);
partition.read(i, holder.object);
buffer.publish(seq);
} catch (JournalException e) {
......
......@@ -26,13 +26,11 @@ public class ResultSetBufferedIterator<T> extends AbstractImmutableIterator<T> i
private final ResultSet<T> rs;
private final T obj;
private final Journal<T> journal;
private int cursor = 0;
public ResultSetBufferedIterator(ResultSet<T> rs) {
this.rs = rs;
this.obj = rs.getJournal().newObject();
this.journal = rs.getJournal();
}
@Override
......@@ -67,7 +65,6 @@ public class ResultSetBufferedIterator<T> extends AbstractImmutableIterator<T> i
private T get(int rsIndex) {
try {
journal.clearObject(obj);
rs.read(rsIndex, obj);
return obj;
} catch (JournalException e) {
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -48,7 +48,6 @@ public class ResultSetConcurrentIterator<T> extends AbstractConcurrentIterator<T
long seq = buffer.next();
Holder<T> holder = buffer.get(seq);
getJournal().clearObject(holder.object);
rs.read(i, holder.object);
buffer.publish(seq);
} catch (JournalException e) {
......
......@@ -24,6 +24,8 @@ import java.io.OutputStream;
public abstract class AbstractDataRow implements DataRow {
private ColumnType[] types;
private DataRow slave;
@Override
public byte get(String column) {
......@@ -85,4 +87,14 @@ public abstract class AbstractDataRow implements DataRow {
}
protected abstract ColumnType getColumnTypeInternal(int x);
@Override
public DataRow getSlave() {
return slave;
}
public void setSlave(DataRow slave) {
this.slave = slave;
}
}
......@@ -26,7 +26,6 @@ import java.io.OutputStream;
public class JournalEntry extends AbstractDataRow {
public Partition<Object> partition;
public long rowid;
public JournalEntry slave;
@Override
public int getColumnIndex(String column) {
......@@ -88,11 +87,6 @@ public class JournalEntry extends AbstractDataRow {
return partition.getBin(rowid, col);
}
@Override
public DataRow getSlave() {
return slave;
}
@Override
public int getColumnCount() {
return partition.getJournal().getMetadata().getColumnCount();
......@@ -108,7 +102,7 @@ public class JournalEntry extends AbstractDataRow {
return "DataItem{" +
"partition=" + partition +
", rowid=" + rowid +
", slave=" + slave +
", slave=" + getSlave() +
'}';
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -36,7 +36,7 @@ public class InnerSkipJoin extends AbstractImmutableIterator<JournalEntry> imple
JournalEntry data;
while (delegate.hasNext()) {
if ((data = delegate.next()).slave != null) {
if ((data = delegate.next()).getSlave() != null) {
this.data = data;
return true;
}
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -52,10 +52,10 @@ public class SlaveResetOuterJoin extends AbstractImmutableIterator<JournalEntry>
}
if (nextSlave || slaveSource.hasNext()) {
joinedData.slave = slaveSource.next();
joinedData.setSlave(slaveSource.next());
nextSlave = slaveSource.hasNext();
} else {
joinedData.slave = null;
joinedData.setSlave(null);
nextSlave = false;
}
return joinedData;
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -93,10 +93,10 @@ public class SymbolToFrameOuterJoin extends AbstractImmutableIterator<JournalEnt
} catch (JournalException e) {
throw new JournalRuntimeException(e);
}
joinedData.slave = journalEntry;
joinedData.setSlave(journalEntry);
nextSlave = slaveCursor.hasNext();
} else {
joinedData.slave = null;
joinedData.setSlave(null);
nextSlave = false;
}
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -95,7 +95,7 @@ public class TimeSeriesJoin extends AbstractImmutableIterator<JournalEntry> impl
}
nextSlave = true;
joinedData.slave = data;
joinedData.setSlave(data);
nextData = joinedData;
return true;
}
......@@ -132,14 +132,14 @@ public class TimeSeriesJoin extends AbstractImmutableIterator<JournalEntry> impl
break;
}
joinedData.slave = s;
joinedData.setSlave(s);
nextData = joinedData;
return true;
}
}
if (!sl) {
joinedData.slave = null;
joinedData.setSlave(null);
nextData = joinedData;
return true;
}
......
......@@ -70,7 +70,6 @@ public class BinaryTest extends AbstractTest {
long t = System.nanoTime();
Band band = new Band();
for (int i = 0; i < count; i++) {
writer.clearObject(band);
band.setName(bands[Math.abs(r.nextInt() % bands.length)]);
band.setType(types[Math.abs(r.nextInt() % types.length)]);
band.setImage(bytes);
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,9 +19,10 @@ package com.nfsdb.journal.lang;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalConfigurationException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import com.nfsdb.journal.export.JournalEntryPrinter;
import com.nfsdb.journal.export.StringSink;
import com.nfsdb.journal.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.journal.lang.cst.EntrySource;
import com.nfsdb.journal.lang.cst.JournalEntry;
import com.nfsdb.journal.lang.cst.StatefulJournalSource;
import com.nfsdb.journal.lang.cst.impl.join.SlaveResetOuterJoin;
import com.nfsdb.journal.lang.cst.impl.jsrc.JournalSourceImpl;
......@@ -79,6 +80,11 @@ public class JoinStringToSymbolTest {
@Test
public void testOuterOneToOneHead() throws Exception {
final String expected = "band1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband1\thttp://new.band1.com\tjazz\t\t\n" +
"band1\talbum BZ\trock\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband1\thttp://new.band1.com\tjazz\t\t\n" +
"band3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -111,29 +117,10 @@ public class JoinStringToSymbolTest {
);
int count = 0;
for (JournalEntry d : src) {
Album a = (Album) d.partition.read(d.rowid);
Band b = null;
if (d.slave != null) {
b = (Band) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
case 1:
Assert.assertNotNull(b);
Assert.assertEquals(a.getBand(), b.getName());
Assert.assertEquals("http://new.band1.com", b.getUrl());
break;
case 2:
Assert.assertNotNull(b);
Assert.assertEquals(a.getBand(), b.getName());
break;
default:
Assert.fail("expected 3 rows");
}
}
StringSink sink = new StringSink();
JournalEntryPrinter p = new JournalEntryPrinter(sink, true);
p.print(src);
Assert.assertEquals(expected, sink.toString());
}
}
......@@ -20,11 +20,10 @@ import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalConfigurationException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import com.nfsdb.journal.export.FlexBufferSink;
import com.nfsdb.journal.export.JournalEntryPrinter;
import com.nfsdb.journal.export.StringSink;
import com.nfsdb.journal.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.journal.lang.cst.EntrySource;
import com.nfsdb.journal.lang.cst.JournalEntry;
import com.nfsdb.journal.lang.cst.StatefulJournalSource;
import com.nfsdb.journal.lang.cst.impl.dfrm.MapHeadDataFrameSource;
import com.nfsdb.journal.lang.cst.impl.join.InnerSkipJoin;
......@@ -47,14 +46,12 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
public class JoinSymbolOnSymbolTest {
@Rule
public final JournalTestFactory factory;
private final JournalEntryPrinter out;
private final StringSink sink = new StringSink();
private final JournalEntryPrinter out = new JournalEntryPrinter(sink, true);
private JournalWriter<Band> bw;
private JournalWriter<Album> aw;
......@@ -80,7 +77,7 @@ public class JoinSymbolOnSymbolTest {
throw new JournalRuntimeException(e);
}
out = new JournalEntryPrinter(new FlexBufferSink(new FileOutputStream(FileDescriptor.out).getChannel()), false);
// out = new JournalEntryPrinter(new FlexBufferSink(new FileOutputStream(FileDescriptor.out).getChannel()), false);
}
@Before
......@@ -91,6 +88,12 @@ public class JoinSymbolOnSymbolTest {
@Test
public void testOuterOneToOne() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -103,35 +106,18 @@ public class JoinSymbolOnSymbolTest {
aw.commit();
// from band outer join album
EntrySource src = buildSource(bw, aw);
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Band b = (Band) d.partition.read(d.rowid);
Album a = null;
if (d.slave != null) {
a = (Album) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
case 2:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
break;
case 1:
Assert.assertNull(a);
Assert.assertEquals("band2", b.getName());
break;
default:
Assert.fail("Do not expect more than 3 rows");
}
}
out.print(buildSource(bw, aw));
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testOuterOneToMany() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\trock\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -146,43 +132,17 @@ public class JoinSymbolOnSymbolTest {
// from band outer join album
// this is data-driven one to many
EntrySource src = buildSource(bw, aw);
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Band b = (Band) d.partition.read(d.rowid);
Album a = null;
if (d.slave != null) {
a = (Album) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
case 1:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("band1", b.getName());
break;
case 2:
Assert.assertNull(a);
Assert.assertEquals("band2", b.getName());
break;
case 3:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("band3", b.getName());
break;
default:
Assert.fail("expect 4 rows");
}
}
out.print(buildSource(bw, aw));
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testOuterOneToOneHead() throws Exception {
final String expected = "band1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband1\thttp://new.band1.com\tjazz\t\t\n" +
"band1\talbum BZ\trock\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband1\thttp://new.band1.com\tjazz\t\t\n" +
"band3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -213,29 +173,7 @@ public class JoinSymbolOnSymbolTest {
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Album a = (Album) d.partition.read(d.rowid);
Band b = null;
if (d.slave != null) {
b = (Band) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
case 1:
Assert.assertNotNull(b);
Assert.assertEquals(a.getBand(), b.getName());
Assert.assertEquals("http://new.band1.com", b.getUrl());
break;
case 2:
Assert.assertNotNull(b);
Assert.assertEquals(a.getBand(), b.getName());
break;
default:
Assert.fail("expected 3 rows");
}
}
Assert.assertEquals(expected, sink.toString());
}
/**
......@@ -247,6 +185,12 @@ public class JoinSymbolOnSymbolTest {
*/
@Test
public void testOuterOneToManyHead() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -282,44 +226,17 @@ public class JoinSymbolOnSymbolTest {
);
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Band b = (Band) d.partition.read(d.rowid);
Album a = null;
if (d.slave != null) {
a = (Album) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album BZ", a.getName());
Assert.assertEquals("pop", a.getGenre());
break;
case 1:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album X", a.getName());
break;
case 2:
Assert.assertNull(a);
Assert.assertEquals("band2", b.getName());
break;
case 3:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("band3", b.getName());
break;
default:
Assert.fail("expected 4 rows");
}
}
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testOuterOneToManyMapHead() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tband2\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -355,46 +272,16 @@ public class JoinSymbolOnSymbolTest {
);
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Band b = (Band) d.partition.read(d.rowid);
Album a = null;
if (d.slave != null) {
a = (Album) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album X", a.getName());
Assert.assertEquals("pop", a.getGenre());
break;
case 1:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album BZ", a.getName());
Assert.assertEquals("pop", a.getGenre());
break;
case 2:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album Y", a.getName());
Assert.assertEquals("band2", b.getName());
break;
case 3:
Assert.assertNull(a);
Assert.assertEquals("band3", b.getName());
break;
default:
Assert.fail("expected 4 rows");
}
}
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testInnerOneToManyMapHead() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tband2\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -433,42 +320,16 @@ public class JoinSymbolOnSymbolTest {
);
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Band b = (Band) d.partition.read(d.rowid);
Album a = null;
if (d.slave != null) {
a = (Album) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album X", a.getName());
Assert.assertEquals("pop", a.getGenre());
break;
case 1:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album BZ", a.getName());
Assert.assertEquals("pop", a.getGenre());
break;
case 2:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album Y", a.getName());
Assert.assertEquals("band2", b.getName());
break;
default:
Assert.fail("expected 3 rows");
}
}
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testInnerOneToManyHead() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -506,41 +367,17 @@ public class JoinSymbolOnSymbolTest {
);
out.print(src);
int count = 0;
for (JournalEntry d : src) {
Band b = (Band) d.partition.read(d.rowid);
Album a = null;
if (d.slave != null) {
a = (Album) d.slave.partition.read(d.slave.rowid);
}
switch (count++) {
case 0:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album BZ", a.getName());
Assert.assertEquals("pop", a.getGenre());
break;
case 1:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("album X", a.getName());
break;
case 2:
Assert.assertNotNull(a);
Assert.assertEquals(b.getName(), a.getBand());
Assert.assertEquals("band3", b.getName());
break;
default:
Assert.fail("expected 3 rows");
}
}
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testInnerOneToManyHeadFilter() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\trock\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
......@@ -572,6 +409,7 @@ public class JoinSymbolOnSymbolTest {
);
out.print(src);
Assert.assertEquals(expected, sink.toString());
}
private EntrySource buildSource(Journal<Band> bw, Journal<Album> aw) {
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,9 +19,10 @@ package com.nfsdb.journal.lang;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalConfigurationException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import com.nfsdb.journal.export.JournalEntryPrinter;
import com.nfsdb.journal.export.StringSink;
import com.nfsdb.journal.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.journal.lang.cst.EntrySource;
import com.nfsdb.journal.lang.cst.JournalEntry;
import com.nfsdb.journal.lang.cst.impl.join.TimeSeriesJoin;
import com.nfsdb.journal.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.journal.lang.cst.impl.psrc.JournalPartitionSource;
......@@ -29,10 +30,7 @@ import com.nfsdb.journal.lang.cst.impl.rsrc.AllRowSource;
import com.nfsdb.journal.test.tools.JournalTestFactory;
import com.nfsdb.journal.utils.Files;
import com.nfsdb.journal.utils.Rnd;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.*;
public class TimeSeriesJoinTest {
......@@ -43,9 +41,7 @@ public class TimeSeriesJoinTest {
try {
factory = new JournalTestFactory(
new JournalConfigurationBuilder() {{
$(Ts.class)
.$ts()
;
$(Ts.class);
}}.build(Files.makeTempDir())
);
} catch (JournalConfigurationException e) {
......@@ -54,10 +50,11 @@ public class TimeSeriesJoinTest {
}
private static final StringSink sink = new StringSink();
private static final JournalEntryPrinter printer = new JournalEntryPrinter(sink, true);
private static JournalWriter<Ts> w1;
private static JournalWriter<Ts> w2;
@BeforeClass
public static void setUp() throws Exception {
w1 = factory.writer(Ts.class, "1");
......@@ -70,11 +67,11 @@ public class TimeSeriesJoinTest {
long t2 = t1;
for (int i = 0; i < 10; i++) {
t1 += rnd.nextPositiveInt() % 100;
ts.timestamp = t1;
ts.ts = t1;
w1.append(ts);
t2 += rnd.nextPositiveInt() % 100;
ts.timestamp = t2;
ts.ts = t2;
w2.append(ts);
}
......@@ -82,109 +79,92 @@ public class TimeSeriesJoinTest {
w2.commit();
}
@Before
public void setUp2() throws Exception {
sink.clear();
}
@Test
public void testJoinNoNulls() throws Exception {
String expected = "20~89\n" +
"20~128\n" +
"53~89\n" +
"53~128\n" +
"53~199\n" +
"54~89\n" +
"54~128\n" +
"54~199\n" +
"96~128\n" +
"96~199\n" +
"102~128\n" +
"102~199\n" +
"102~247\n" +
"118~128\n" +
"118~199\n" +
"118~247\n" +
"132~199\n" +
"132~247\n" +
"213~247\n" +
"213~319\n" +
"213~322\n" +
"213~334\n" +
"229~247\n" +
"229~319\n" +
"229~322\n" +
"229~334\n" +
"234~247\n" +
"234~319\n" +
"234~322\n" +
"234~334\n";
String expected = "20\t89\t\n" +
"20\t128\t\n" +
"53\t89\t\n" +
"53\t128\t\n" +
"53\t199\t\n" +
"54\t89\t\n" +
"54\t128\t\n" +
"54\t199\t\n" +
"96\t128\t\n" +
"96\t199\t\n" +
"102\t128\t\n" +
"102\t199\t\n" +
"102\t247\t\n" +
"118\t128\t\n" +
"118\t199\t\n" +
"118\t247\t\n" +
"132\t199\t\n" +
"132\t247\t\n" +
"213\t247\t\n" +
"213\t319\t\n" +
"213\t322\t\n" +
"213\t334\t\n" +
"229\t247\t\n" +
"229\t319\t\n" +
"229\t322\t\n" +
"229\t334\t\n" +
"234\t247\t\n" +
"234\t319\t\n" +
"234\t322\t\n" +
"234\t334\t\n";
// from w1 tj w2 depth 150
EntrySource src = new TimeSeriesJoin(
new JournalSourceImpl(new JournalPartitionSource(w1, true), new AllRowSource())
, w1.getMetadata().getTimestampColumnIndex()
,
new JournalSourceImpl(new JournalPartitionSource(w2, true), new AllRowSource())
, w2.getMetadata().getTimestampColumnIndex()
, 150
, 2 // trigger re-sizes to test ring expand formulas
);
StringBuilder builder = new StringBuilder();
for (JournalEntry d : src) {
builder.append(d.partition.getLong(d.rowid, 0));
builder.append("~");
if (d.slave == null) {
builder.append("null");
} else {
builder.append(d.slave.partition.getLong(d.slave.rowid, 0));
}
builder.append("\n");
}
Assert.assertEquals(expected, builder.toString());
printer.print(
new TimeSeriesJoin(
new JournalSourceImpl(new JournalPartitionSource(w1, true), new AllRowSource())
, 0
,
new JournalSourceImpl(new JournalPartitionSource(w2, true), new AllRowSource())
, 0
, 150
, 2 // trigger re-sizes to test ring expand formulas
)
);
// System.out.println(sink);
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testJoinWithNulls() throws Exception {
String expected = "20~null\n" +
"53~null\n" +
"54~null\n" +
"96~null\n" +
"102~null\n" +
"118~128\n" +
"132~null\n" +
"213~null\n" +
"229~null\n" +
"234~247\n";
String expected = "20\t\n" +
"53\t\n" +
"54\t\n" +
"96\t\n" +
"102\t\n" +
"118\t128\t\n" +
"132\t\n" +
"213\t\n" +
"229\t\n" +
"234\t247\t\n";
EntrySource src = new TimeSeriesJoin(
new JournalSourceImpl(new JournalPartitionSource(w1, true), new AllRowSource())
, w1.getMetadata().getTimestampColumnIndex()
, 0
,
new JournalSourceImpl(new JournalPartitionSource(w2, true), new AllRowSource())
, w2.getMetadata().getTimestampColumnIndex()
, 0
, 15
, 2 // trigger re-sizes to test ring expand formulas
);
StringBuilder builder = new StringBuilder();
for (JournalEntry d : src) {
builder.append(d.partition.getLong(d.rowid, 0));
builder.append("~");
if (d.slave == null) {
builder.append("null");
} else {
builder.append(d.slave.partition.getLong(d.slave.rowid, 0));
}
builder.append("\n");
}
Assert.assertEquals(expected, builder.toString());
printer.print(src);
Assert.assertEquals(expected, sink.toString());
}
@SuppressWarnings("unused")
public static class Ts {
private long timestamp;
private long ts;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册