提交 e9aff645 编写于 作者: V Vlad Ilyushchenko

(issue #6) refactored joins, replaced hash function with faster variant and...

(issue #6) refactored joins, replaced hash function with faster variant and improved release of direct memory.
上级 dfa90cbc
......@@ -18,16 +18,13 @@ package com.nfsdb.collections;
import com.nfsdb.utils.Unsafe;
import java.io.Closeable;
public class AbstractDirectList implements Closeable {
public class AbstractDirectList extends DirectMemory {
public static final int CACHE_LINE_SIZE = 64;
private final int pow2;
private final int onePow2;
protected long pos;
protected long start;
protected long limit;
private long address;
public AbstractDirectList(int pow2, long capacity) {
this.pow2 = pow2;
......@@ -100,12 +97,6 @@ public class AbstractDirectList implements Closeable {
return (int) ((pos - start) >> pow2);
}
public void free() {
if (address != 0) {
Unsafe.getUnsafe().freeMemory(address);
address = 0;
}
}
public void clear() {
clear((byte) 0);
......@@ -119,14 +110,4 @@ public class AbstractDirectList implements Closeable {
public void zero(byte v) {
Unsafe.getUnsafe().setMemory(start, limit - start + onePow2, v);
}
public void close() {
free();
}
@Override
protected void finalize() throws Throwable {
free();
super.finalize();
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.collections;
import com.nfsdb.utils.Unsafe;
import sun.misc.Cleaner;
import java.io.Closeable;
public class DirectMemory implements Closeable {
private final Cleaner cleaner = Cleaner.create(this, new Runnable() {
@Override
public void run() {
free0();
}
});
protected long address;
private void free0() {
if (address != 0) {
Unsafe.getUnsafe().freeMemory(address);
address = 0;
freeInternal();
}
}
protected void freeInternal() {
}
public void free() {
cleaner.clean();
}
@Override
public void close() {
free();
}
}
......@@ -14,90 +14,23 @@
* limitations under the License.
*/
/**
* This code is a port of xxHash algorithm by Yann Collet.
*
* Original code can be found here:
*
* https://code.google.com/p/lz4/source/browse/trunk/xxhash.c
*
*/
package com.nfsdb.collections;
import com.nfsdb.utils.Unsafe;
public class Hash {
private static final int PRIME32_1 = (int) 2654435761L;
private static final int PRIME32_2 = (int) 2246822519L;
private static final int PRIME32_3 = (int) 3266489917L;
private static final int PRIME32_4 = (int) 668265263L;
private static final int PRIME32_5 = (int) 374761393L;
public static int rotl(int value, int n) {
return (value >>> n) | (value << (32 - n));
}
public static int hashXX(long address, int len, int seed) {
int i32;
long p = address;
long l = address + len;
if (len >= 16) {
long limit = l - 16;
int v1 = seed + PRIME32_1 + PRIME32_2;
int v2 = seed + PRIME32_2;
int v3 = seed;
int v4 = seed - PRIME32_1;
do {
v1 += Unsafe.getUnsafe().getInt(p) * PRIME32_2;
v1 = rotl(v1, 13);
v1 *= PRIME32_1;
p += 4;
v2 += Unsafe.getUnsafe().getInt(p) * PRIME32_2;
v2 = rotl(v2, 13);
v2 *= PRIME32_1;
p += 4;
v3 += Unsafe.getUnsafe().getInt(p) * PRIME32_2;
v3 = rotl(v3, 13);
v3 *= PRIME32_1;
p += 4;
v4 += Unsafe.getUnsafe().getInt(p) * PRIME32_2;
v4 = rotl(v4, 13);
v4 *= PRIME32_1;
p += 4;
}
while (p <= limit);
i32 = rotl(v1, 1) + rotl(v2, 7) + rotl(v3, 12) + rotl(v4, 18);
} else {
i32 = seed + PRIME32_5;
public static int hash(long address, int len) {
int hash = 0;
long end = address + len;
while (end - address > 1) {
hash = (hash << 5) - hash + Unsafe.getUnsafe().getChar(address);
address += 2;
}
i32 += len;
while (p + 4 <= l) {
i32 += Unsafe.getUnsafe().getInt(p) * PRIME32_3;
i32 = rotl(i32, 17) * PRIME32_4;
p += 4;
}
while (p < l) {
i32 += Unsafe.getUnsafe().getByte(p) * PRIME32_5;
i32 = rotl(i32, 11) * PRIME32_1;
p++;
if (address < end) {
hash = (hash << 5) - hash + Unsafe.getUnsafe().getByte(address);
}
i32 ^= i32 >> 15;
i32 *= PRIME32_2;
i32 ^= i32 >> 13;
i32 *= PRIME32_3;
i32 ^= i32 >> 16;
return i32;
return hash < 0 ? -hash : hash;
}
}
......@@ -31,6 +31,7 @@ public final class MapMetadata implements RecordMetadata {
private final int columnCount;
private final ColumnType[] types;
private final SymbolTable[] symbolTables;
private final String[] columnNames;
public MapMetadata(List<ColumnMetadata> valueColumns, List<ColumnMetadata> keyColumns) {
......@@ -38,26 +39,24 @@ public final class MapMetadata implements RecordMetadata {
this.types = new ColumnType[columnCount];
this.nameCache = new ObjIntHashMap<>(columnCount);
this.symbolTables = new SymbolTable[columnCount];
this.columnNames = new String[columnCount];
int split = valueColumns.size();
for (int i = 0; i < split; i++) {
types[i] = valueColumns.get(i).type;
symbolTables[i] = valueColumns.get(i).symbolTable;
nameCache.put(valueColumns.get(i).name, i);
ColumnMetadata m = valueColumns.get(i);
types[i] = m.type;
symbolTables[i] = m.symbolTable;
nameCache.put(columnNames[i] = m.name, i);
}
for (int i = 0, sz = keyColumns.size(); i < sz; i++) {
types[split + i] = keyColumns.get(i).type;
symbolTables[split + i] = keyColumns.get(i).symbolTable;
nameCache.put(keyColumns.get(i).name, split + i);
ColumnMetadata m = keyColumns.get(i);
types[split + i] = m.type;
symbolTables[split + i] = m.symbolTable;
nameCache.put(columnNames[split + i] = m.name, split + i);
}
}
@Override
public RecordMetadata nextMetadata() {
return null;
}
@Override
public int getColumnCount() {
return columnCount;
......@@ -70,7 +69,6 @@ public final class MapMetadata implements RecordMetadata {
@Override
public int getColumnIndex(CharSequence name) {
int index = nameCache.get(name);
if (index == -1) {
throw new JournalRuntimeException("No such column: " + name);
......@@ -82,4 +80,9 @@ public final class MapMetadata implements RecordMetadata {
public SymbolTable getSymbolTable(int index) {
return symbolTables[index];
}
@Override
public String getColumnName(int index) {
return columnNames[index];
}
}
......@@ -19,7 +19,6 @@ package com.nfsdb.collections.mmap;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.export.CharSink;
import com.nfsdb.lang.cst.impl.qry.AbstractRecord;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.utils.Unsafe;
......@@ -115,11 +114,6 @@ public final class MapRecord extends AbstractRecord {
return Unsafe.getUnsafe().getShort(address0(index));
}
@Override
public Record getSlave() {
return null;
}
@Override
public byte get(int index) {
return Unsafe.getUnsafe().getByte(address0(index));
......
......@@ -16,22 +16,17 @@
package com.nfsdb.collections.mmap;
import com.nfsdb.collections.AbstractDirectList;
import com.nfsdb.collections.DirectLongList;
import com.nfsdb.collections.Hash;
import com.nfsdb.collections.Primes;
import com.nfsdb.collections.*;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.utils.Unsafe;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
public class MultiMap implements Closeable {
public class MultiMap extends DirectMemory {
private static final int seed = 0xdeadbeef;
private final float loadFactor;
private final Key key = new Key();
private final MapRecordSource recordSource;
......@@ -39,7 +34,6 @@ public class MultiMap implements Closeable {
private int keyBlockOffset;
private int keyDataOffset;
private DirectLongList offsets;
private long kAddress;
private long kStart;
private long kLimit;
private long kPos;
......@@ -49,8 +43,8 @@ public class MultiMap implements Closeable {
private MultiMap(int capacity, long dataSize, float loadFactor, List<ColumnMetadata> valueColumns, List<ColumnMetadata> keyColumns, List<MapRecordValueInterceptor> interceptors) {
this.loadFactor = loadFactor;
this.kAddress = Unsafe.getUnsafe().allocateMemory(dataSize + AbstractDirectList.CACHE_LINE_SIZE);
this.kStart = kPos = this.kAddress + (this.kAddress & (AbstractDirectList.CACHE_LINE_SIZE - 1));
this.address = Unsafe.getUnsafe().allocateMemory(dataSize + AbstractDirectList.CACHE_LINE_SIZE);
this.kStart = kPos = this.address + (this.address & (AbstractDirectList.CACHE_LINE_SIZE - 1));
this.kLimit = kStart + dataSize;
this.keyCapacity = Primes.next((int) (capacity / loadFactor));
......@@ -90,7 +84,7 @@ public class MultiMap implements Closeable {
public MapValues claimSlot(Key key) {
// calculate hash remembering "key" structure
// [ len | value block | key offset block | key data block ]
int index = Hash.hashXX(key.startAddr + keyBlockOffset, key.len - keyBlockOffset, seed) % keyCapacity;
int index = Hash.hash(key.startAddr + keyBlockOffset, key.len - keyBlockOffset) % keyCapacity;
long offset = offsets.get(index);
if (offset == -1) {
......@@ -168,7 +162,7 @@ public class MultiMap implements Closeable {
long kStart = kAddress + (kAddress & (AbstractDirectList.CACHE_LINE_SIZE - 1));
Unsafe.getUnsafe().copyMemory(this.kStart, kStart, kCapacity >> 1);
Unsafe.getUnsafe().freeMemory(this.kAddress);
Unsafe.getUnsafe().freeMemory(this.address);
long d = kStart - this.kStart;
key.startAddr += d;
......@@ -176,7 +170,7 @@ public class MultiMap implements Closeable {
key.nextColOffset += d;
this.kAddress = kAddress;
this.address = kAddress;
this.kStart = kStart;
this.kLimit = kStart + kCapacity;
}
......@@ -192,7 +186,7 @@ public class MultiMap implements Closeable {
if (offset == -1) {
continue;
}
long index = Hash.hashXX(kStart + offset + keyBlockOffset, Unsafe.getUnsafe().getInt(kStart + offset) - keyBlockOffset, seed) % capacity;
long index = Hash.hash(kStart + offset + keyBlockOffset, Unsafe.getUnsafe().getInt(kStart + offset) - keyBlockOffset) % capacity;
while (pointers.get(index) != -1) {
index = (index + 1) % capacity;
}
......@@ -204,11 +198,8 @@ public class MultiMap implements Closeable {
this.keyCapacity = capacity;
}
public void free() {
if (kAddress != 0) {
Unsafe.getUnsafe().freeMemory(kAddress);
kAddress = 0;
}
@Override
protected void freeInternal() {
offsets.free();
}
......@@ -220,11 +211,6 @@ public class MultiMap implements Closeable {
return recordSource.getMetadata();
}
@Override
public void close() {
free();
}
public int size() {
return size;
}
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -32,11 +32,6 @@ public class RecordSourcePrinter {
}
public void print(Record r, RecordMetadata m) {
if (r == null) {
sink.put("\n");
sink.flush();
return;
}
for (int i = 0, sz = m.getColumnCount(); i < sz; i++) {
switch (m.getColumnType(i)) {
......@@ -72,7 +67,8 @@ public class RecordSourcePrinter {
}
sink.put('\t');
}
print(r.getSlave(), m.nextMetadata());
sink.put("\n");
sink.flush();
}
public void print(RecordSource<? extends Record> src) {
......
......@@ -18,4 +18,6 @@ package com.nfsdb.lang.cst;
public interface IntVariableSource {
IntVariable getVariable(PartitionSlice slice);
void reset();
}
......@@ -16,9 +16,9 @@
package com.nfsdb.lang.cst;
import com.nfsdb.lang.cst.impl.qry.JournalRecord;
import com.nfsdb.lang.cst.impl.qry.JournalRecordSource;
import com.nfsdb.lang.cst.impl.qry.GenericRecordSource;
import com.nfsdb.lang.cst.impl.qry.Record;
public interface StatefulJournalSource extends JournalRecordSource {
JournalRecord current();
public interface StatefulJournalSource extends GenericRecordSource {
Record current();
}
......@@ -17,19 +17,14 @@
package com.nfsdb.lang.cst.impl.join;
import com.nfsdb.collections.AbstractImmutableIterator;
import com.nfsdb.lang.cst.impl.qry.GenericRecordSource;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
import java.util.NoSuchElementException;
import com.nfsdb.lang.cst.impl.qry.*;
public class InnerSkipJoin extends AbstractImmutableIterator<Record> implements GenericRecordSource {
private final RecordSource<? extends Record> delegate;
private final RecordSource<? extends SplitRecord> delegate;
private Record data;
public InnerSkipJoin(RecordSource<? extends Record> delegate) {
public InnerSkipJoin(RecordSource<? extends SplitRecord> delegate) {
this.delegate = delegate;
}
......@@ -40,10 +35,10 @@ public class InnerSkipJoin extends AbstractImmutableIterator<Record> implements
@Override
public boolean hasNext() {
Record data;
SplitRecord data;
while (delegate.hasNext()) {
if ((data = delegate.next()).getSlave() != null) {
if ((data = delegate.next()).hasB()) {
this.data = data;
return true;
}
......@@ -54,9 +49,6 @@ public class InnerSkipJoin extends AbstractImmutableIterator<Record> implements
@Override
public Record next() {
if (data == null) {
throw new NoSuchElementException();
}
return data;
}
......
......@@ -19,17 +19,18 @@ package com.nfsdb.lang.cst.impl.join;
import com.nfsdb.collections.AbstractImmutableIterator;
import com.nfsdb.lang.cst.impl.qry.*;
public class SlaveResetOuterJoin extends AbstractImmutableIterator<Record> implements GenericRecordSource {
public class NestedLoopLeftOuterJoin extends AbstractImmutableIterator<SplitRecord> implements RecordSource<SplitRecord> {
private final RecordSource<? extends Record> masterSource;
private final RecordSource<? extends Record> slaveSource;
private final JoinedRecordMetadata metadata;
private Record joinedData;
private final SplitRecordMetadata metadata;
private final SplitRecord record;
private boolean nextSlave = false;
public SlaveResetOuterJoin(RecordSource<? extends Record> masterSource, RecordSource<? extends Record> slaveSource) {
public NestedLoopLeftOuterJoin(RecordSource<? extends Record> masterSource, RecordSource<? extends Record> slaveSource) {
this.masterSource = masterSource;
this.slaveSource = slaveSource;
this.metadata = new JoinedRecordMetadata(masterSource, slaveSource);
this.metadata = new SplitRecordMetadata(masterSource.getMetadata(), slaveSource.getMetadata());
this.record = new SplitRecord(metadata, masterSource.getMetadata().getColumnCount());
}
@Override
......@@ -50,19 +51,19 @@ public class SlaveResetOuterJoin extends AbstractImmutableIterator<Record> imple
}
@Override
public Record next() {
public SplitRecord next() {
if (!nextSlave) {
joinedData = masterSource.next();
record.setA(masterSource.next());
slaveSource.reset();
}
if (nextSlave || slaveSource.hasNext()) {
joinedData.setSlave(slaveSource.next());
record.setB(slaveSource.next());
nextSlave = slaveSource.hasNext();
} else {
joinedData.setSlave(null);
record.setB(null);
nextSlave = false;
}
return joinedData;
return record;
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.join;
import com.nfsdb.Journal;
import com.nfsdb.Partition;
import com.nfsdb.column.FixedColumn;
import com.nfsdb.column.SymbolTable;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.lang.cst.RowCursor;
import com.nfsdb.lang.cst.impl.dfrm.JournalRowSourceHash;
import com.nfsdb.lang.cst.impl.qry.AbstractJournalSource;
import com.nfsdb.lang.cst.impl.qry.JournalRecord;
import com.nfsdb.lang.cst.impl.qry.JournalRecordSource;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.lang.cst.impl.ref.StringRef;
import com.nfsdb.utils.Rows;
import java.util.Arrays;
public class SymbolOuterHashJoin extends AbstractJournalSource implements JournalRecordSource {
private final JournalRecordSource masterSource;
private final JournalRowSourceHash hash;
private final StringRef masterSymbol;
private final StringRef slaveSymbol;
private final JournalRecord journalRecord = new JournalRecord(this);
private JournalRecord joinedData;
private int columnIndex;
private SymbolTable masterTab;
private SymbolTable slaveTab;
private Partition lastMasterPartition;
private int lastSlavePartIndex = -1;
private FixedColumn column;
private boolean nextSlave = false;
private int[] map;
private RowCursor slaveCursor;
public SymbolOuterHashJoin(JournalRecordSource masterSource, StringRef masterSymbol, JournalRowSourceHash hash, StringRef slaveSymbol) {
this.masterSource = masterSource;
this.hash = hash;
this.masterSymbol = masterSymbol;
this.slaveSymbol = slaveSymbol;
init();
}
@Override
public void reset() {
masterSource.reset();
hash.reset();
nextSlave = false;
init();
}
private void init() {
this.columnIndex = masterSource.getJournal().getMetadata().getColumnIndex(masterSymbol.value);
this.masterTab = masterSource.getJournal().getSymbolTable(masterSymbol.value);
this.slaveTab = hash.getJournal().getSymbolTable(slaveSymbol.value);
int sz = masterTab.size();
if (map == null || map.length < sz) {
map = new int[sz];
}
Arrays.fill(map, -1);
lastSlavePartIndex = -1;
lastMasterPartition = null;
}
@Override
public Journal getJournal() {
return masterSource.getJournal();
}
@Override
public boolean hasNext() {
return nextSlave || masterSource.hasNext();
}
@Override
@SuppressWarnings("unchecked")
public JournalRecord next() {
if (!nextSlave) {
nextMaster();
}
if (nextSlave || slaveCursor.hasNext()) {
long rowid = slaveCursor.next();
int pind = Rows.toPartitionIndex(rowid);
if (lastSlavePartIndex != pind) {
try {
journalRecord.partition = hash.getJournal().getPartition(pind, false);
lastSlavePartIndex = pind;
} catch (JournalException e) {
throw new JournalRuntimeException(e);
}
}
journalRecord.rowid = Rows.toLocalRowID(rowid);
joinedData.setSlave(journalRecord);
nextSlave = slaveCursor.hasNext();
} else {
joinedData.setSlave(null);
nextSlave = false;
}
return joinedData;
}
private void nextMaster() {
JournalRecord m = masterSource.next();
if (lastMasterPartition != m.partition) {
lastMasterPartition = m.partition;
column = (FixedColumn) m.partition.getAbstractColumn(columnIndex);
}
int masterKey = column.getInt(m.rowid);
if (map[masterKey] == -1) {
map[masterKey] = slaveTab.getQuick(masterTab.value(masterKey));
}
slaveCursor = hash.cursor(map[masterKey]);
joinedData = m;
}
@Override
public RecordMetadata nextMetadata() {
return hash.getMetadata();
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.join;
import com.nfsdb.Partition;
import com.nfsdb.collections.AbstractImmutableIterator;
import com.nfsdb.collections.RingQueue;
import com.nfsdb.column.FixedColumn;
import com.nfsdb.lang.cst.impl.qry.JoinedRecordMetadata;
import com.nfsdb.lang.cst.impl.qry.JournalRecord;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
import java.util.NoSuchElementException;
public class TimeSeriesJoin extends AbstractImmutableIterator<JournalRecord> implements RecordSource<JournalRecord> {
private final RecordSource<JournalRecord> masterSource;
private final RecordSource<JournalRecord> slaveSource;
private final long depth;
private final RingQueue<CachedJournalRecord> ringQueue;
private final int masterTimestampIndex;
private final int slaveTimestampIndex;
private final RecordMetadata metadata;
private JournalRecord joinedData;
private Partition lastMasterPartition;
private Partition lastSlavePartition;
private boolean nextSlave = false;
private FixedColumn masterColumn;
private FixedColumn slaveColumn;
private long masterTimestamp;
private JournalRecord nextData;
private boolean useQueue;
private boolean queueMarked = false;
public TimeSeriesJoin(RecordSource<JournalRecord> masterSource, int masterTsIndex, RecordSource<JournalRecord> slaveSource, int slaveTsIndex, long depth, int cacheSize) {
this.masterSource = masterSource;
this.slaveSource = slaveSource;
this.depth = depth;
this.masterTimestampIndex = masterTsIndex;
this.slaveTimestampIndex = slaveTsIndex;
this.ringQueue = new RingQueue<>(CachedJournalRecord.class, cacheSize);
this.metadata = new JoinedRecordMetadata(masterSource, slaveSource);
}
@Override
public void reset() {
}
@Override
public boolean hasNext() {
if (nextData != null) {
return true;
}
while (nextSlave || masterSource.hasNext()) {
boolean sl = nextSlave;
if (!nextSlave) {
nextMaster();
useQueue = true;
queueMarked = false;
ringQueue.toMark();
}
if (useQueue) {
while ((useQueue = ringQueue.hasNext())) {
CachedJournalRecord data = ringQueue.next();
if (data.timestamp < masterTimestamp) {
continue;
}
if (!queueMarked) {
ringQueue.mark();
queueMarked = true;
}
if (data.timestamp > masterTimestamp + depth) {
nextSlave = false;
break;
}
nextSlave = true;
joinedData.setSlave(data);
nextData = joinedData;
return true;
}
}
if (!useQueue) {
while ((nextSlave = slaveSource.hasNext())) {
JournalRecord s = slaveSource.next();
if (lastSlavePartition != s.partition) {
lastSlavePartition = s.partition;
slaveColumn = (FixedColumn) s.partition.getAbstractColumn(slaveTimestampIndex);
}
long slaveTimestamp = slaveColumn.getLong(s.rowid);
if (slaveTimestamp < masterTimestamp) {
continue;
} else {
long pos = ringQueue.nextWritePos();
CachedJournalRecord data = ringQueue.get(pos);
if (data == null) {
data = new CachedJournalRecord(metadata);
ringQueue.put(pos, data);
}
data.timestamp = slaveTimestamp;
data.partition = s.partition;
data.rowid = s.rowid;
}
if (slaveTimestamp > masterTimestamp + depth) {
nextSlave = false;
break;
}
joinedData.setSlave(s);
nextData = joinedData;
return true;
}
}
if (!sl) {
joinedData.setSlave(null);
nextData = joinedData;
return true;
}
}
nextData = null;
return false;
}
@Override
public JournalRecord next() {
if (nextData == null) {
throw new NoSuchElementException();
}
JournalRecord d = nextData;
nextData = null;
return d;
}
private void nextMaster() {
JournalRecord m = masterSource.next();
if (lastMasterPartition != m.partition) {
lastMasterPartition = m.partition;
masterColumn = (FixedColumn) m.partition.getAbstractColumn(masterTimestampIndex);
}
joinedData = m;
masterTimestamp = masterColumn.getLong(m.rowid);
}
@Override
public RecordMetadata getMetadata() {
return metadata;
}
public static class CachedJournalRecord extends JournalRecord {
private long timestamp;
public CachedJournalRecord(RecordMetadata metadata) {
super(metadata);
}
@Override
public String toString() {
return "CachedDataItem{" +
"timestamp=" + timestamp +
'}';
}
}
}
......@@ -31,6 +31,7 @@ public class JournalSourceImpl extends AbstractJournalSource {
private RowCursor cursor;
public JournalSourceImpl(PartitionSource partitionSource, RowSource rowSource) {
super(partitionSource.getJournal().getMetadata());
this.partitionSource = partitionSource;
this.rowSource = rowSource;
}
......
......@@ -16,23 +16,22 @@
package com.nfsdb.lang.cst.impl.jsrc;
import com.nfsdb.Journal;
import com.nfsdb.collections.AbstractImmutableIterator;
import com.nfsdb.lang.cst.StatefulJournalSource;
import com.nfsdb.lang.cst.impl.qry.JournalRecord;
import com.nfsdb.lang.cst.impl.qry.JournalRecordSource;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
public class StatefulJournalSourceImpl extends AbstractImmutableIterator<JournalRecord> implements StatefulJournalSource {
private final JournalRecordSource delegate;
private JournalRecord current;
public class StatefulJournalSourceImpl extends AbstractImmutableIterator<Record> implements StatefulJournalSource {
private final RecordSource<? extends Record> delegate;
private Record current;
public StatefulJournalSourceImpl(JournalRecordSource delegate) {
public StatefulJournalSourceImpl(RecordSource<? extends Record> delegate) {
this.delegate = delegate;
}
@Override
public JournalRecord current() {
public Record current() {
return current;
}
......@@ -41,18 +40,13 @@ public class StatefulJournalSourceImpl extends AbstractImmutableIterator<Journal
delegate.reset();
}
@Override
public Journal getJournal() {
return delegate.getJournal();
}
@Override
public boolean hasNext() {
return delegate.hasNext();
}
@Override
public JournalRecord next() {
public Record next() {
return current = delegate.next();
}
......
......@@ -52,6 +52,7 @@ public class SingleKeySource implements KeySource, KeyCursor {
@Override
public void reset() {
this.variableSource.reset();
hasNext = true;
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,27 +19,29 @@ package com.nfsdb.lang.cst.impl.qry;
import com.nfsdb.collections.AbstractImmutableIterator;
import com.nfsdb.column.ColumnType;
import com.nfsdb.column.SymbolTable;
import com.nfsdb.factory.configuration.JournalMetadata;
public abstract class AbstractJournalSource extends AbstractImmutableIterator<JournalRecord> implements JournalRecordSource, RecordMetadata {
@Override
public RecordMetadata nextMetadata() {
return null;
private final JournalMetadata metadata;
public AbstractJournalSource(JournalMetadata metadata) {
this.metadata = metadata;
}
@Override
public int getColumnCount() {
return getJournal().getMetadata().getColumnCount();
return metadata.getColumnCount();
}
@Override
public ColumnType getColumnType(int index) {
return getJournal().getMetadata().getColumnMetadata(index).type;
return metadata.getColumnMetadata(index).type;
}
@Override
public int getColumnIndex(CharSequence name) {
return getJournal().getMetadata().getColumnIndex(name);
return metadata.getColumnIndex(name);
}
@Override
......@@ -49,6 +51,11 @@ public abstract class AbstractJournalSource extends AbstractImmutableIterator<Jo
@Override
public SymbolTable getSymbolTable(int index) {
return getJournal().getSymbolTable(index);
return getJournal().getSymbolTable(getJournal().getMetadata().getColumnMetadata(index).name);
}
@Override
public String getColumnName(int index) {
return metadata.getColumnMetadata(index).name;
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -22,7 +22,6 @@ import java.io.OutputStream;
public abstract class AbstractRecord implements Record {
protected final RecordMetadata metadata;
private Record slave;
public AbstractRecord(RecordMetadata metadata) {
this.metadata = metadata;
......@@ -74,11 +73,7 @@ public abstract class AbstractRecord implements Record {
}
@Override
public Record getSlave() {
return slave;
}
public void setSlave(Record slave) {
this.slave = slave;
public RecordMetadata getMetadata() {
return metadata;
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -96,7 +96,6 @@ public class JournalRecord extends AbstractRecord {
return "DataItem{" +
"partition=" + partition +
", rowid=" + rowid +
", slave=" + getSlave() +
'}';
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -23,6 +23,8 @@ import java.io.OutputStream;
public interface Record {
RecordMetadata getMetadata();
byte get(String column);
byte get(int col);
......@@ -64,8 +66,4 @@ public interface Record {
InputStream getBin(String column);
InputStream getBin(int col);
Record getSlave();
void setSlave(Record slave);
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,8 +20,6 @@ import com.nfsdb.column.ColumnType;
import com.nfsdb.column.SymbolTable;
public interface RecordMetadata {
RecordMetadata nextMetadata();
int getColumnCount();
ColumnType getColumnType(int index);
......@@ -29,4 +27,6 @@ public interface RecordMetadata {
SymbolTable getSymbolTable(int index);
int getColumnIndex(CharSequence name);
String getColumnName(int index);
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.qry;
import com.nfsdb.export.CharSink;
import java.io.InputStream;
import java.io.OutputStream;
public class SplitRecord extends AbstractRecord {
private final int split;
private Record a;
private Record b;
public SplitRecord(RecordMetadata metadata, int split) {
super(metadata);
this.split = split;
}
public void setA(Record a) {
this.a = a;
}
public void setB(Record b) {
this.b = b;
}
@Override
public byte get(int col) {
if (col < split) {
return a.get(col);
} else {
return b == null ? 0 : b.get(col - split);
}
}
@Override
public int getInt(int col) {
if (col < split) {
return a.getInt(col);
} else {
return b == null ? 0 : b.getInt(col - split);
}
}
@Override
public long getLong(int col) {
if (col < split) {
return a.getLong(col);
} else {
return b == null ? 0L : b.getLong(col - split);
}
}
@Override
public long getDate(int col) {
if (col < split) {
return a.getDate(col);
} else {
return b == null ? 0L : b.getDate(col - split);
}
}
@Override
public double getDouble(int col) {
if (col < split) {
return a.getDouble(col);
} else {
return b == null ? 0d : b.getDouble(col - split);
}
}
@Override
public CharSequence getStr(int col) {
if (col < split) {
return a.getStr(col);
} else {
return b == null ? null : b.getStr(col - split);
}
}
@Override
public void getStr(int col, CharSink sink) {
if (col < split) {
a.getStr(col, sink);
} else if (b != null) {
b.getStr(col - split, sink);
}
}
@Override
public String getSym(int col) {
if (col < split) {
return a.getSym(col);
} else {
return b == null ? null : b.getSym(col - split);
}
}
@Override
public boolean getBool(int col) {
if (col < split) {
return a.getBool(col);
} else {
return b != null && b.getBool(col - split);
}
}
@Override
public void getBin(int col, OutputStream s) {
if (col < split) {
a.getBin(col, s);
} else if (b != null) {
b.getBin(col - split, s);
}
}
@Override
public short getShort(int col) {
if (col < split) {
return a.getShort(col);
} else {
return b == null ? 0 : b.getShort(col - split);
}
}
@Override
public InputStream getBin(int col) {
if (col < split) {
return a.getBin(col);
} else {
return b == null ? null : b.getBin(col - split);
}
}
public boolean hasB() {
return b != null;
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,40 +16,66 @@
package com.nfsdb.lang.cst.impl.qry;
import com.nfsdb.collections.ObjIntHashMap;
import com.nfsdb.column.ColumnType;
import com.nfsdb.column.SymbolTable;
public class JoinedRecordMetadata implements RecordMetadata {
private final RecordSource<? extends Record> master;
private final RecordSource<? extends Record> slave;
public class SplitRecordMetadata implements RecordMetadata {
private final RecordMetadata a;
private final RecordMetadata b;
private final int split;
private final int columnCount;
private final ObjIntHashMap<CharSequence> columnIndices;
private final String[] columnNames;
public JoinedRecordMetadata(RecordSource<? extends Record> master, RecordSource<? extends Record> slave) {
this.master = master;
this.slave = slave;
}
public SplitRecordMetadata(RecordMetadata a, RecordMetadata b) {
this.a = a;
this.b = b;
this.split = a.getColumnCount();
this.columnCount = this.split + b.getColumnCount();
this.columnIndices = new ObjIntHashMap<>(columnCount);
this.columnNames = new String[columnCount];
@Override
public RecordMetadata nextMetadata() {
return slave.getMetadata();
for (int i = 0; i < split; i++) {
columnIndices.put(columnNames[i] = a.getColumnName(i), i);
}
for (int i = 0, c = columnCount - split; c < i; i++) {
columnNames[i + split] = b.getColumnName(i);
columnIndices.put(columnNames[i + split] = b.getColumnName(i), i + split);
}
}
@Override
public int getColumnCount() {
return master.getMetadata().getColumnCount();
return columnCount;
}
@Override
public ColumnType getColumnType(int index) {
return master.getMetadata().getColumnType(index);
if (index < split) {
return a.getColumnType(index);
} else {
return b.getColumnType(index - split);
}
}
@Override
public SymbolTable getSymbolTable(int index) {
if (index < split) {
return a.getSymbolTable(index);
} else {
return b.getSymbolTable(index - split);
}
}
@Override
public int getColumnIndex(CharSequence name) {
return master.getMetadata().getColumnIndex(name);
return columnIndices.get(name);
}
@Override
public SymbolTable getSymbolTable(int index) {
return master.getMetadata().getSymbolTable(index);
public String getColumnName(int index) {
return columnNames[index];
}
}
......@@ -37,4 +37,8 @@ public class MutableIntVariableSource implements IntVariableSource, IntVariable
public void setValue(int value) {
this.value = value;
}
@Override
public void reset() {
}
}
......@@ -16,9 +16,7 @@
package com.nfsdb.lang.cst.impl.ref;
import com.nfsdb.Partition;
import com.nfsdb.column.SymbolTable;
import com.nfsdb.column.VariableColumn;
import com.nfsdb.lang.cst.IntVariable;
import com.nfsdb.lang.cst.IntVariableSource;
import com.nfsdb.lang.cst.PartitionSlice;
......@@ -32,15 +30,12 @@ public class StringXTabVariableSource implements IntVariableSource, IntVariable
private final String slaveSymbol;
private final int masterColumnIndex;
private SymbolTable slaveTab;
private VariableColumn column;
private Partition partition;
private int slaveKey;
private long rowid = -1;
public StringXTabVariableSource(StatefulJournalSource masterSource, String masterSymbol, String slaveSymbol) {
this.masterSource = masterSource;
this.slaveSymbol = slaveSymbol;
this.masterColumnIndex = masterSource.getJournal().getMetadata().getColumnIndex(masterSymbol);
this.masterColumnIndex = masterSource.getMetadata().getColumnIndex(masterSymbol);
}
@Override
......@@ -53,19 +48,14 @@ public class StringXTabVariableSource implements IntVariableSource, IntVariable
@Override
public int getValue() {
if (switchPartition() || masterSource.current().rowid != rowid) {
rowid = masterSource.current().rowid;
slaveKey = slaveTab.getQuick(column.getStr(rowid));
if (slaveKey == -3) {
slaveKey = slaveTab.getQuick(masterSource.current().getStr(masterColumnIndex).toString());
}
return slaveKey;
}
private boolean switchPartition() {
if (masterSource.current().partition == partition) {
return false;
}
partition = masterSource.current().partition;
column = (VariableColumn) partition.getAbstractColumn(masterColumnIndex);
return true;
@Override
public void reset() {
slaveKey = -3;
}
}
......@@ -16,8 +16,6 @@
package com.nfsdb.lang.cst.impl.ref;
import com.nfsdb.Partition;
import com.nfsdb.column.FixedColumn;
import com.nfsdb.column.SymbolTable;
import com.nfsdb.lang.cst.IntVariable;
import com.nfsdb.lang.cst.IntVariableSource;
......@@ -36,16 +34,13 @@ public class SymbolXTabVariableSource implements IntVariableSource, IntVariable
private final int map[];
private final SymbolTable masterTab;
private SymbolTable slaveTab;
private FixedColumn column;
private Partition partition;
private long rowid = -1;
private int slaveKey;
public SymbolXTabVariableSource(StatefulJournalSource masterSource, String masterSymbol, String slaveSymbol) {
this.masterSource = masterSource;
this.slaveSymbol = slaveSymbol;
this.masterColumnIndex = masterSource.getJournal().getMetadata().getColumnIndex(masterSymbol);
this.masterTab = masterSource.getJournal().getSymbolTable(masterSymbol);
this.masterColumnIndex = masterSource.getMetadata().getColumnIndex(masterSymbol);
this.masterTab = masterSource.getMetadata().getSymbolTable(masterColumnIndex);
map = new int[masterTab.size()];
Arrays.fill(map, -1);
}
......@@ -60,9 +55,8 @@ public class SymbolXTabVariableSource implements IntVariableSource, IntVariable
@Override
public int getValue() {
if (switchPartition() || masterSource.current().rowid != rowid) {
rowid = masterSource.current().rowid;
int masterKey = column.getInt(rowid);
if (slaveKey == -3) {
int masterKey = masterSource.current().getInt(masterColumnIndex);
if (map[masterKey] == -1) {
map[masterKey] = slaveTab.getQuick(masterTab.value(masterKey));
}
......@@ -71,12 +65,8 @@ public class SymbolXTabVariableSource implements IntVariableSource, IntVariable
return slaveKey;
}
private boolean switchPartition() {
if (masterSource.current().partition == partition) {
return false;
}
partition = masterSource.current().partition;
column = (FixedColumn) partition.getAbstractColumn(masterColumnIndex);
return true;
@Override
public void reset() {
slaveKey = -3;
}
}
......@@ -39,8 +39,9 @@ public class FixedColumnDeltaProducer implements ColumnDeltaProducer {
}
public void configure(long localRowID, long limit) {
this.offset = localRowID >= column.size() ? column.getOffset() : column.getOffset(localRowID);
this.targetOffset = limit >= column.size() ? column.getOffset() : column.getOffset(limit);
long sz = column.size();
this.offset = localRowID >= sz ? column.getOffset() : column.getOffset(localRowID);
this.targetOffset = limit >= sz ? column.getOffset() : column.getOffset(limit);
this.header.rewind();
this.header.putLong(targetOffset - offset);
this.header.flip();
......
......@@ -16,8 +16,8 @@
package com.nfsdb.net.producer;
import com.nfsdb.Journal;
import com.nfsdb.Partition;
import com.nfsdb.column.SymbolTable;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.net.AbstractObjectProducer;
......@@ -42,31 +42,31 @@ public class JournalClientStateProducer extends AbstractObjectProducer<IndexedJo
@Override
protected void write(IndexedJournal value, ByteBuffer buffer) {
try {
Partition p;
Journal j = value.getJournal();
// journal index
buffer.putInt(value.getIndex());
// max rowid for non-lag partitions
Partition p = value.getJournal().lastNonEmptyNonLag();
if (p == null) {
if ((p = j.lastNonEmptyNonLag()) == null) {
buffer.putLong(-1);
} else {
buffer.putLong(Rows.toRowID(p.getPartitionIndex(), p.size() - 1));
}
// size and name of lag partition
Partition lag = value.getJournal().getIrregularPartition();
if (lag != null) {
buffer.putLong(lag.size());
ByteBuffers.putStringW(buffer, lag.getName());
if ((p = j.getIrregularPartition()) != null) {
buffer.putLong(p.size());
ByteBuffers.putStringW(buffer, p.getName());
} else {
buffer.putLong(-1L);
ByteBuffers.putStringW(buffer, null);
}
// symbol table count and their indexes and sizes
int c;
buffer.putChar((char) (c = value.getJournal().getSymbolTableCount()));
buffer.putChar((char) (c = j.getSymbolTableCount()));
for (int i = 0; i < c; i++) {
SymbolTable tab = value.getJournal().getSymbolTable(i);
buffer.putChar((char) i);
buffer.putInt(tab.size());
buffer.putInt(j.getSymbolTable(i).size());
}
} catch (JournalException e) {
throw new JournalRuntimeException(e);
......
......@@ -35,7 +35,7 @@ public class HashTest {
for (int i = 0; i < 100000; i++) {
rnd.nextChars(address, LEN);
hashes.add(Hash.hashXX(address, LEN, rnd.nextInt()));
hashes.add(Hash.hash(address, LEN));
}
Assert.assertTrue("Hash function distribution dropped", hashes.size() > 99990);
}
......
......@@ -23,7 +23,7 @@ import com.nfsdb.export.RecordSourcePrinter;
import com.nfsdb.export.StringSink;
import com.nfsdb.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.lang.cst.StatefulJournalSource;
import com.nfsdb.lang.cst.impl.join.SlaveResetOuterJoin;
import com.nfsdb.lang.cst.impl.join.NestedLoopLeftOuterJoin;
import com.nfsdb.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.lang.cst.impl.jsrc.StatefulJournalSourceImpl;
import com.nfsdb.lang.cst.impl.ksrc.SingleKeySource;
......@@ -104,7 +104,7 @@ public class JoinStringToSymbolTest {
StringSink sink = new StringSink();
RecordSourcePrinter p = new RecordSourcePrinter(sink);
p.print(
new SlaveResetOuterJoin(
new NestedLoopLeftOuterJoin(
master = new StatefulJournalSourceImpl(
new JournalSourceImpl(
new JournalPartitionSource(aw, false), new AllRowSource()
......
......@@ -24,19 +24,20 @@ import com.nfsdb.export.RecordSourcePrinter;
import com.nfsdb.export.StringSink;
import com.nfsdb.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.lang.cst.StatefulJournalSource;
import com.nfsdb.lang.cst.impl.dfrm.JournalRowSourceHash;
import com.nfsdb.lang.cst.impl.join.InnerSkipJoin;
import com.nfsdb.lang.cst.impl.join.SlaveResetOuterJoin;
import com.nfsdb.lang.cst.impl.join.SymbolOuterHashJoin;
import com.nfsdb.lang.cst.impl.join.NestedLoopLeftOuterJoin;
import com.nfsdb.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.lang.cst.impl.jsrc.StatefulJournalSourceImpl;
import com.nfsdb.lang.cst.impl.ksrc.SingleKeySource;
import com.nfsdb.lang.cst.impl.ksrc.SymbolKeySource;
import com.nfsdb.lang.cst.impl.psrc.JournalPartitionSource;
import com.nfsdb.lang.cst.impl.qry.GenericRecordSource;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
import com.nfsdb.lang.cst.impl.ref.StringRef;
import com.nfsdb.lang.cst.impl.ref.SymbolXTabVariableSource;
import com.nfsdb.lang.cst.impl.rsrc.*;
import com.nfsdb.lang.cst.impl.rsrc.AllRowSource;
import com.nfsdb.lang.cst.impl.rsrc.KvIndexRowSource;
import com.nfsdb.lang.cst.impl.rsrc.KvIndexTopRowSource;
import com.nfsdb.lang.cst.impl.rsrc.SkipSymbolRowSource;
import com.nfsdb.model.Album;
import com.nfsdb.model.Band;
import com.nfsdb.test.tools.JournalTestFactory;
......@@ -90,7 +91,7 @@ public class JoinSymbolOnSymbolTest {
public void testOuterOneToOne() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tnull\tnull\t\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
......@@ -115,7 +116,7 @@ public class JoinSymbolOnSymbolTest {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\trock\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tnull\tnull\t\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
......@@ -160,7 +161,7 @@ public class JoinSymbolOnSymbolTest {
StringRef name = new StringRef("name");
StatefulJournalSource master;
out.print(new SlaveResetOuterJoin(
out.print(new NestedLoopLeftOuterJoin(
master = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(aw, false), new AllRowSource())
)
......@@ -187,7 +188,7 @@ public class JoinSymbolOnSymbolTest {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tnull\tnull\t\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\tband3\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
......@@ -212,7 +213,7 @@ public class JoinSymbolOnSymbolTest {
StatefulJournalSource master;
out.print(
new SlaveResetOuterJoin(
new NestedLoopLeftOuterJoin(
master = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(bw, false), new AllRowSource())
)
......@@ -229,99 +230,6 @@ public class JoinSymbolOnSymbolTest {
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testOuterOneToManyMapHead() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tband2\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband3\thttp://band3.com\tjazz\t\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
bw.commit();
aw.append(new Album().setName("album X").setBand("band1").setGenre("pop"));
aw.append(new Album().setName("album BZ").setBand("band1").setGenre("rock"));
aw.append(new Album().setName("album BZ").setBand("band1").setGenre("pop"));
aw.append(new Album().setName("album Y").setBand("band3").setGenre("metal"));
aw.append(new Album().setName("album Y").setBand("band2").setGenre("metal"));
aw.commit();
StringRef band = new StringRef("band");
StringRef name = new StringRef("name");
// from band outer join album +head by name
// **this is "head by name" first is joined to band
// **here a variation possible to specify head count and offset
// **generally this query can be presented as:
//
// from band outer join album +[1:0]head by name
out.print(new SymbolOuterHashJoin(
new JournalSourceImpl(new JournalPartitionSource(bw, false), new AllRowSource())
, name
,
new JournalRowSourceHash(
new JournalSourceImpl(new JournalPartitionSource(aw, false), new KvIndexHeadRowSource(name, new SymbolKeySource(name), 1, 0, null))
, band
)
, band
));
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testInnerOneToManyMapHead() throws Exception {
final String expected = "1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\tband1\talbum BZ\tpop\t1970-01-01T00:00:00.000Z\t\n" +
"1970-01-01T00:00:00.000Z\tband2\thttp://band2.com\thiphop\t\tband2\talbum Y\tmetal\t1970-01-01T00:00:00.000Z\t\n";
bw.append(new Band().setName("band1").setType("rock").setUrl("http://band1.com"));
bw.append(new Band().setName("band2").setType("hiphop").setUrl("http://band2.com"));
bw.append(new Band().setName("band3").setType("jazz").setUrl("http://band3.com"));
bw.commit();
aw.append(new Album().setName("album X").setBand("band1").setGenre("pop"));
aw.append(new Album().setName("album BZ").setBand("band1").setGenre("rock"));
aw.append(new Album().setName("album BZ").setBand("band1").setGenre("pop"));
aw.append(new Album().setName("album Y").setBand("band3").setGenre("metal"));
aw.append(new Album().setName("album Y").setBand("band2").setGenre("metal"));
aw.commit();
// from band join album +head by name
// **inner join
// **this is "head by name" first is joined to band
// **here a variation possible to specify head count and offset
// **generally this query can be presented as:
//
// from band join album +[1:0]head by name
StringRef band = new StringRef("band");
StringRef name = new StringRef("name");
out.print(
new InnerSkipJoin(
new SymbolOuterHashJoin(
new JournalSourceImpl(new JournalPartitionSource(bw, false), new AllRowSource())
, name
,
new JournalRowSourceHash(
new JournalSourceImpl(new JournalPartitionSource(aw, false), new KvIndexHeadRowSource(name, new SymbolKeySource(name), 1, 0, null))
, band
)
, band
)
)
);
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testInnerOneToManyHead() throws Exception {
......@@ -352,7 +260,7 @@ public class JoinSymbolOnSymbolTest {
out.print(
new InnerSkipJoin(
new SlaveResetOuterJoin(
new NestedLoopLeftOuterJoin(
master = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(bw, false), new AllRowSource())
)
......@@ -398,7 +306,7 @@ public class JoinSymbolOnSymbolTest {
out.print(
new InnerSkipJoin(
new SlaveResetOuterJoin(
new NestedLoopLeftOuterJoin(
master = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(bw, false), new AllRowSource())
)
......@@ -413,10 +321,10 @@ public class JoinSymbolOnSymbolTest {
Assert.assertEquals(expected, sink.toString());
}
private GenericRecordSource buildSource(Journal<Band> bw, Journal<Album> aw) {
private RecordSource<? extends Record> buildSource(Journal<Band> bw, Journal<Album> aw) {
StringRef band = new StringRef("band");
StatefulJournalSource master;
return new SlaveResetOuterJoin(
return new NestedLoopLeftOuterJoin(
master = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(bw, false), new AllRowSource())
)
......@@ -426,5 +334,4 @@ public class JoinSymbolOnSymbolTest {
))
);
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang;
import com.nfsdb.JournalWriter;
import com.nfsdb.exceptions.JournalConfigurationException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.export.RecordSourcePrinter;
import com.nfsdb.export.StringSink;
import com.nfsdb.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.lang.cst.impl.join.TimeSeriesJoin;
import com.nfsdb.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.lang.cst.impl.psrc.JournalPartitionSource;
import com.nfsdb.lang.cst.impl.rsrc.AllRowSource;
import com.nfsdb.test.tools.JournalTestFactory;
import com.nfsdb.utils.Files;
import com.nfsdb.utils.Rnd;
import org.junit.*;
public class TimeSeriesJoinTest {
@ClassRule
public static final JournalTestFactory factory;
static {
try {
factory = new JournalTestFactory(
new JournalConfigurationBuilder() {{
$(Ts.class);
}}.build(Files.makeTempDir())
);
} catch (JournalConfigurationException e) {
throw new JournalRuntimeException(e);
}
}
private static final StringSink sink = new StringSink();
private static final RecordSourcePrinter printer = new RecordSourcePrinter(sink);
private static JournalWriter<Ts> w1;
private static JournalWriter<Ts> w2;
@BeforeClass
public static void setUp() throws Exception {
w1 = factory.writer(Ts.class, "1");
w2 = factory.writer(Ts.class, "2");
Ts ts = new Ts();
Rnd rnd = new Rnd();
long t1 = 0;
long t2 = t1;
for (int i = 0; i < 10; i++) {
t1 += rnd.nextPositiveInt() % 100;
ts.ts = t1;
w1.append(ts);
t2 += rnd.nextPositiveInt() % 100;
ts.ts = t2;
w2.append(ts);
}
w1.commit();
w2.commit();
}
@Before
public void setUp2() throws Exception {
sink.clear();
}
@Test
public void testJoinNoNulls() throws Exception {
String expected = "20\t89\t\n" +
"20\t128\t\n" +
"53\t89\t\n" +
"53\t128\t\n" +
"53\t199\t\n" +
"54\t89\t\n" +
"54\t128\t\n" +
"54\t199\t\n" +
"96\t128\t\n" +
"96\t199\t\n" +
"102\t128\t\n" +
"102\t199\t\n" +
"102\t247\t\n" +
"118\t128\t\n" +
"118\t199\t\n" +
"118\t247\t\n" +
"132\t199\t\n" +
"132\t247\t\n" +
"213\t247\t\n" +
"213\t319\t\n" +
"213\t322\t\n" +
"213\t334\t\n" +
"229\t247\t\n" +
"229\t319\t\n" +
"229\t322\t\n" +
"229\t334\t\n" +
"234\t247\t\n" +
"234\t319\t\n" +
"234\t322\t\n" +
"234\t334\t\n";
// from w1 tj w2 depth 150
printer.print(
new TimeSeriesJoin(
new JournalSourceImpl(new JournalPartitionSource(w1, true), new AllRowSource())
, 0
,
new JournalSourceImpl(new JournalPartitionSource(w2, true), new AllRowSource())
, 0
, 150
, 2 // trigger re-sizes to test ring expand formulas
)
);
// System.out.println(sink);
Assert.assertEquals(expected, sink.toString());
}
@Test
public void testJoinWithNulls() throws Exception {
String expected = "20\t\n" +
"53\t\n" +
"54\t\n" +
"96\t\n" +
"102\t\n" +
"118\t128\t\n" +
"132\t\n" +
"213\t\n" +
"229\t\n" +
"234\t247\t\n";
printer.print(new TimeSeriesJoin(
new JournalSourceImpl(new JournalPartitionSource(w1, true), new AllRowSource())
, 0
,
new JournalSourceImpl(new JournalPartitionSource(w2, true), new AllRowSource())
, 0
, 15
, 2 // trigger re-sizes to test ring expand formulas
));
Assert.assertEquals(expected, sink.toString());
}
@SuppressWarnings("unused")
public static class Ts {
private long ts;
}
}
......@@ -28,15 +28,15 @@ import com.nfsdb.factory.JournalFactory;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.factory.configuration.JournalConfigurationBuilder;
import com.nfsdb.lang.cst.StatefulJournalSource;
import com.nfsdb.lang.cst.impl.join.SlaveResetOuterJoin;
import com.nfsdb.lang.cst.impl.join.NestedLoopLeftOuterJoin;
import com.nfsdb.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.lang.cst.impl.jsrc.StatefulJournalSourceImpl;
import com.nfsdb.lang.cst.impl.ksrc.SingleKeySource;
import com.nfsdb.lang.cst.impl.psrc.JournalPartitionSource;
import com.nfsdb.lang.cst.impl.qry.GenericRecordSource;
import com.nfsdb.lang.cst.impl.qry.JournalRecord;
import com.nfsdb.lang.cst.impl.qry.JournalRecordSource;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
import com.nfsdb.lang.cst.impl.ref.MutableIntVariableSource;
import com.nfsdb.lang.cst.impl.ref.StringRef;
import com.nfsdb.lang.cst.impl.ref.SymbolXTabVariableSource;
......@@ -165,7 +165,7 @@ public class CstTest {
StringRef sym = new StringRef("sym");
StatefulJournalSource m;
GenericRecordSource src = new SlaveResetOuterJoin(
RecordSource<? extends Record> src = new NestedLoopLeftOuterJoin(
m = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(master, false), new AllRowSource())
)
......@@ -203,7 +203,7 @@ public class CstTest {
StringRef sym = new StringRef("sym");
StatefulJournalSource m;
GenericRecordSource src = new SlaveResetOuterJoin(
RecordSource<? extends Record> src = new NestedLoopLeftOuterJoin(
m = new StatefulJournalSourceImpl(
new JournalSourceImpl(new JournalPartitionSource(master, false), new AllRowSource())
)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册