提交 df65c81d 编写于 作者: V Vlad Ilyushchenko

#6 single pass aggregation

上级 d712ed8d
......@@ -178,8 +178,10 @@ public class DirectLongList extends AbstractDirectList {
// Check if the array is nearly sorted
for (int k = left; k < right; run[count] = k) {
if (get(k) < get(k + 1)) { // ascending
//noinspection StatementWithEmptyBody
while (++k <= right && get(k - 1) <= get(k)) ;
} else if (get(k) > get(k + 1)) { // descending
//noinspection StatementWithEmptyBody
while (++k <= right && get(k - 1) >= get(k)) ;
for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) {
swap(lo, hi);
......@@ -220,12 +222,14 @@ public class DirectLongList extends AbstractDirectList {
//long[] b;
byte odd = 0;
//noinspection StatementWithEmptyBody
for (int n = 1; (n <<= 1) < count; odd ^= 1) ;
if (odd == 0) {
b = this;
a = new DirectLongList(this.size());
//noinspection StatementWithEmptyBody
for (int i = left - 1; ++i < right; a.set(i, b.get(i))) ;
} else {
a = this;
......@@ -246,6 +250,7 @@ public class DirectLongList extends AbstractDirectList {
run[++last] = hi;
}
if ((count & 1) != 0) {
//noinspection StatementWithEmptyBody
for (int i = right, lo = run[count - 1]; --i >= lo; b.set(i, a.get(i))) ;
run[++last] = right;
}
......@@ -414,7 +419,9 @@ public class DirectLongList extends AbstractDirectList {
/*
* Skip elements, which are less or greater than pivot values.
*/
//noinspection StatementWithEmptyBody
while (get(++less) < pivot1) ;
//noinspection StatementWithEmptyBody
while (get(--great) > pivot2) ;
/*
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -25,7 +25,7 @@ import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import java.util.List;
public class MapMetadata implements RecordMetadata {
public final class MapMetadata implements RecordMetadata {
private final ObjIntHashMap<CharSequence> nameCache;
private final int columnCount;
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -26,7 +26,7 @@ import com.nfsdb.utils.Unsafe;
import java.io.InputStream;
import java.io.OutputStream;
public class MapRecord extends AbstractRecord {
public final class MapRecord extends AbstractRecord {
private final int split;
private final int keyDataOffset;
private final int keyBlockOffset;
......
......@@ -22,15 +22,23 @@ import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
import com.nfsdb.utils.Unsafe;
public class MapRecordSource extends AbstractImmutableIterator<Record> implements RecordSource<Record> {
import java.util.List;
public final class MapRecordSource extends AbstractImmutableIterator<Record> implements RecordSource<Record> {
private final MapRecord record;
private final RecordMetadata metadata;
private final MapValues values;
private final List<MapRecordValueInterceptor> interceptors;
private final int interceptorsLen;
private int count;
private long address;
public MapRecordSource(MapRecord record, RecordMetadata metadata) {
MapRecordSource(MapRecord record, RecordMetadata metadata, MapValues values, List<MapRecordValueInterceptor> interceptors) {
this.record = record;
this.metadata = metadata;
this.values = values;
this.interceptors = interceptors;
this.interceptorsLen = interceptors != null ? interceptors.size() : 0;
}
MapRecordSource init(long address, int count) {
......@@ -58,6 +66,15 @@ public class MapRecordSource extends AbstractImmutableIterator<Record> implement
long address = this.address;
this.address = address + Unsafe.getUnsafe().getInt(address);
count--;
if (interceptorsLen > 0) {
notifyInterceptors(address);
}
return record.init(address);
}
private void notifyInterceptors(long address) {
for (int i = 0; i < interceptorsLen; i++) {
interceptors.get(i).beforeRecord(values.init(address, false));
}
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.collections.mmap;
public interface MapRecordValueInterceptor {
void beforeRecord(MapValues values);
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,7 +20,7 @@ import com.nfsdb.utils.Unsafe;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@SuppressFBWarnings({"EI_EXPOSE_REP2"})
public class MapValues {
public final class MapValues {
private final int valueOffsets[];
public long address;
private boolean _new;
......@@ -37,6 +37,14 @@ public class MapValues {
return Unsafe.getUnsafe().getDouble(address0(index));
}
public void putLong(int index, long value) {
Unsafe.getUnsafe().putLong(address0(index), value);
}
public long getLong(int index) {
return Unsafe.getUnsafe().getLong(address0(index));
}
public void putInt(int index, int value) {
Unsafe.getUnsafe().putInt(address0(index), value);
}
......@@ -49,7 +57,7 @@ public class MapValues {
return address + valueOffsets[index];
}
MapValues beginRead(long address, boolean _new) {
MapValues init(long address, boolean _new) {
this.address = address;
this._new = _new;
return this;
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -35,10 +35,10 @@ public class MultiMap implements Closeable {
private final float loadFactor;
private final Key key = new Key();
private final MapRecordSource recordSource;
private final MapValues mapValues0;
private final MapValues values;
private int keyBlockOffset;
private int keyDataOffset;
private DirectLongList keyOffsets;
private DirectLongList offsets;
private long kAddress;
private long kStart;
private long kLimit;
......@@ -47,7 +47,7 @@ public class MultiMap implements Closeable {
private int keyCapacity;
private int size = 0;
private MultiMap(int capacity, long dataSize, float loadFactor, List<ColumnMetadata> valueColumns, List<ColumnMetadata> keyColumns) {
private MultiMap(int capacity, long dataSize, float loadFactor, List<ColumnMetadata> valueColumns, List<ColumnMetadata> keyColumns, List<MapRecordValueInterceptor> interceptors) {
this.loadFactor = loadFactor;
this.kAddress = Unsafe.getUnsafe().allocateMemory(dataSize + AbstractDirectList.CACHE_LINE_SIZE);
this.kStart = kPos = this.kAddress + (this.kAddress & (AbstractDirectList.CACHE_LINE_SIZE - 1));
......@@ -55,9 +55,9 @@ public class MultiMap implements Closeable {
this.keyCapacity = Primes.next((int) (capacity / loadFactor));
this.free = (int) (keyCapacity * loadFactor);
this.keyOffsets = new DirectLongList(keyCapacity);
this.keyOffsets.zero((byte) -1);
this.keyOffsets.setPos(keyCapacity);
this.offsets = new DirectLongList(keyCapacity);
this.offsets.zero((byte) -1);
this.offsets.setPos(keyCapacity);
int columnSplit = valueColumns.size();
int[] valueOffsets = new int[columnSplit];
......@@ -78,12 +78,12 @@ public class MultiMap implements Closeable {
}
}
this.mapValues0 = new MapValues(valueOffsets);
this.values = new MapValues(valueOffsets);
MapMetadata metadata = new MapMetadata(valueColumns, keyColumns);
this.keyBlockOffset = offset;
this.keyDataOffset = this.keyBlockOffset + 4 * keyColumns.size();
MapRecord record = new MapRecord(metadata, valueOffsets, keyDataOffset, keyBlockOffset);
this.recordSource = new MapRecordSource(record, metadata);
this.recordSource = new MapRecordSource(record, metadata, this.values, interceptors);
}
......@@ -91,19 +91,19 @@ public class MultiMap implements Closeable {
// calculate hash remembering "key" structure
// [ len | value block | key offset block | key data block ]
int index = Hash.hashXX(key.startAddr + keyBlockOffset, key.len - keyBlockOffset, seed) % keyCapacity;
long offset = keyOffsets.get(index);
long offset = offsets.get(index);
if (offset == -1) {
keyOffsets.set(index, key.startAddr - kStart);
offsets.set(index, key.startAddr - kStart);
if (--free == 0) {
rehash();
}
size++;
return mapValues0.beginRead(key.startAddr, true);
return values.init(key.startAddr, true);
} else if (eq(key, offset)) {
// rollback added key
kPos = key.startAddr;
return mapValues0.beginRead(kStart + offset, false);
return values.init(kStart + offset, false);
} else {
return probe0(key, index);
}
......@@ -111,20 +111,20 @@ public class MultiMap implements Closeable {
private MapValues probe0(Key key, int index) {
long offset;
while ((offset = keyOffsets.get(index = (++index % keyCapacity))) != -1) {
while ((offset = offsets.get(index = (++index % keyCapacity))) != -1) {
if (eq(key, offset)) {
kPos = key.startAddr;
return mapValues0.beginRead(kStart + offset, false);
return values.init(kStart + offset, false);
}
}
keyOffsets.set(index, key.startAddr - kStart);
offsets.set(index, key.startAddr - kStart);
free--;
if (free == 0) {
rehash();
}
size++;
return mapValues0.beginRead(key.startAddr, true);
return values.init(key.startAddr, true);
}
private boolean eq(Key key, long offset) {
......@@ -159,7 +159,7 @@ public class MultiMap implements Closeable {
}
public Key claimKey() {
return key.beginWrite();
return key.init();
}
private void resize() {
......@@ -170,9 +170,10 @@ public class MultiMap implements Closeable {
Unsafe.getUnsafe().copyMemory(this.kStart, kStart, kCapacity >> 1);
Unsafe.getUnsafe().freeMemory(this.kAddress);
key.startAddr = kStart + (key.startAddr - this.kStart);
key.appendAddr = kStart + (key.appendAddr - this.kStart);
key.nextColOffset = kStart + (key.nextColOffset - this.kStart);
long d = kStart - this.kStart;
key.startAddr += d;
key.appendAddr += d;
key.nextColOffset += d;
this.kAddress = kAddress;
......@@ -186,8 +187,8 @@ public class MultiMap implements Closeable {
pointers.zero((byte) -1);
pointers.setPos(capacity);
for (int i = 0, sz = this.keyOffsets.size(); i < sz; i++) {
long offset = this.keyOffsets.get(i);
for (int i = 0, sz = this.offsets.size(); i < sz; i++) {
long offset = this.offsets.get(i);
if (offset == -1) {
continue;
}
......@@ -197,8 +198,8 @@ public class MultiMap implements Closeable {
}
pointers.set(index, offset);
}
this.keyOffsets.free();
this.keyOffsets = pointers;
this.offsets.free();
this.offsets = pointers;
this.free += (capacity - keyCapacity) * loadFactor;
this.keyCapacity = capacity;
}
......@@ -208,7 +209,7 @@ public class MultiMap implements Closeable {
Unsafe.getUnsafe().freeMemory(kAddress);
kAddress = 0;
}
keyOffsets.free();
offsets.free();
}
public MapRecordSource getRecordSource() {
......@@ -232,12 +233,13 @@ public class MultiMap implements Closeable {
kPos = kStart;
free = (int) (keyCapacity * loadFactor);
size = 0;
keyOffsets.clear((byte) -1);
offsets.clear((byte) -1);
}
public static class Builder {
private final List<ColumnMetadata> valueColumns = new ArrayList<>();
private final List<ColumnMetadata> keyColumns = new ArrayList<>();
private final List<MapRecordValueInterceptor> interceptors = new ArrayList<>();
private int capacity = 67;
private long dataSize = 4096;
private float loadFactor = 0.5f;
......@@ -267,8 +269,13 @@ public class MultiMap implements Closeable {
return this;
}
public Builder interceptor(MapRecordValueInterceptor interceptor) {
interceptors.add(interceptor);
return this;
}
public MultiMap build() {
return new MultiMap(capacity, dataSize, loadFactor, valueColumns, keyColumns);
return new MultiMap(capacity, dataSize, loadFactor, valueColumns, keyColumns, interceptors);
}
}
......@@ -324,13 +331,13 @@ public class MultiMap implements Closeable {
return this;
}
public Key $() {
public Key commit() {
Unsafe.getUnsafe().putInt(startAddr, len = (int) (appendAddr - startAddr));
kPos = appendAddr;
return this;
}
public Key beginWrite() {
public Key init() {
startAddr = kPos;
appendAddr = startAddr + keyDataOffset;
nextColOffset = startAddr + keyBlockOffset;
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -51,7 +51,7 @@ public class ColumnMetadata {
'}';
}
public void copy(ColumnMetadata from) {
public ColumnMetadata copy(ColumnMetadata from) {
this.name = from.name;
this.type = from.type;
this.offset = from.offset;
......@@ -63,6 +63,17 @@ public class ColumnMetadata {
this.distinctCountHint = from.distinctCountHint;
this.sameAs = from.sameAs;
this.noCache = from.noCache;
return this;
}
public ColumnMetadata setType(ColumnType type) {
this.type = type;
return this;
}
public ColumnMetadata setName(String name) {
this.name = name;
return this;
}
public void write(HugeBuffer buf) {
......
......@@ -62,8 +62,7 @@ public class JournalMetadataBuilder<T> implements JMetadataBuilder<T> {
this.lag = model.getLag();
for (int i = 0; i < model.getColumnCount(); i++) {
ColumnMetadata from = model.getColumnMetadata(i);
ColumnMetadata to = columnMetadata.get(from.name);
to.copy(from);
columnMetadata.get(from.name).copy(from);
}
}
......
......@@ -63,10 +63,8 @@ public class JournalStructure implements JMetadataBuilder<Object> {
this.openFileTTL = model.getOpenFileTTL();
this.lag = model.getLag();
for (int i = 0; i < model.getColumnCount(); i++) {
ColumnMetadata from = model.getColumnMetadata(i);
ColumnMetadata to = new ColumnMetadata();
to.copy(from);
metadata.add(to);
metadata.add(to.copy(model.getColumnMetadata(i)));
nameToIndexMap.put(to.name, i);
}
}
......@@ -273,8 +271,7 @@ public class JournalStructure implements JMetadataBuilder<Object> {
private ColumnMetadata newMeta(String name) {
int index = nameToIndexMap.get(name);
if (index == -1) {
ColumnMetadata meta = new ColumnMetadata();
meta.name = name;
ColumnMetadata meta = new ColumnMetadata().setName(name);
metadata.add(meta);
nameToIndexMap.put(name, metadata.size() - 1);
return meta;
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -22,10 +22,46 @@ import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
public interface AggregatorFunction {
/**
* Columns that aggregation function writes out. Out of {#ColumnMetadata} objects
* returned significant fields are "name" and "type". All other fields are ignored.
* <p/>
* Complex functions, such as average, may need to write three columns, "count", "sum" and
* possibly "average" itself.
* <p/>
* Returned array and its objects are immutable by convention. There is no need to
* be defensive and copy arrays.
* <p/>
* Also it is possible that this method is called multiple times, so expensive
* operations must be cached by implementation.
*
* @return array of required columns
*/
ColumnMetadata[] getColumns();
/**
* Callback to give implementation opportunity to resolve column names to their indexes.
*
* @param source record source
*/
void prepareSource(RecordSource<? extends Record> source);
/**
* When calculating values implementing classes will be sharing columns of {#Record}. Each
* aggregator columns indexes need to be mapped into space of "record" column indexes. This
* is done once when aggregator is prepared. Implementing class must maintain column mapping
* in an int[] array. For example:
* <p/>
* <pre>
* private int map[] = new int[columnCount];
* ...
* map[k] = i;
* </pre>
*
* @param k column index in space of aggregator function
* @param i column index in space of "record"
*/
void mapColumn(int k, int i);
void calculate(Record rec, MapValues values);
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.mmap.MapRecordValueInterceptor;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.column.ColumnType;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
public class AvgDoubleAggregationFunction implements AggregatorFunction, MapRecordValueInterceptor {
private final ColumnMetadata sourceColumn;
private int columnIndex;
private int countIdx;
private int sumIdx;
private int avgIdx;
public AvgDoubleAggregationFunction(ColumnMetadata sourceColumn) {
this.sourceColumn = sourceColumn;
}
@Override
public ColumnMetadata[] getColumns() {
return new ColumnMetadata[]{
new ColumnMetadata().setName("$count").setType(ColumnType.LONG)
, new ColumnMetadata().setName("$sum").setType(ColumnType.DOUBLE)
, new ColumnMetadata().setName("avg").setType(ColumnType.DOUBLE)
};
}
@Override
public void prepareSource(RecordSource<? extends Record> source) {
this.columnIndex = source.getMetadata().getColumnIndex(sourceColumn.name);
}
@Override
public void mapColumn(int k, int i) {
switch (k) {
case 0:
countIdx = i;
break;
case 1:
sumIdx = i;
break;
case 2:
avgIdx = i;
}
}
@Override
public void calculate(Record rec, MapValues values) {
if (values.isNew()) {
values.putLong(countIdx, 1);
values.putDouble(sumIdx, rec.getDouble(columnIndex));
} else {
values.putLong(countIdx, values.getLong(countIdx) + 1);
values.putDouble(sumIdx, values.getDouble(sumIdx) + rec.getDouble(columnIndex));
}
}
@Override
public void beforeRecord(MapValues values) {
values.putDouble(avgIdx, values.getDouble(sumIdx) / values.getLong(countIdx));
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -27,9 +27,7 @@ public class CountIntAggregatorFunction extends AbstractAggregatorFunction {
private final ColumnMetadata[] meta = new ColumnMetadata[1];
public CountIntAggregatorFunction(String name) {
this.meta[0] = new ColumnMetadata();
this.meta[0].name = name;
this.meta[0].type = ColumnType.INT;
this.meta[0] = new ColumnMetadata().setName(name).setType(ColumnType.INT);
}
@Override
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.column.ColumnType;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
public class CountLongAggregatorFunction extends AbstractAggregatorFunction {
private final ColumnMetadata[] meta = new ColumnMetadata[1];
public CountLongAggregatorFunction(String name) {
this.meta[0] = new ColumnMetadata();
this.meta[0].name = name;
this.meta[0].type = ColumnType.LONG;
}
@Override
protected ColumnMetadata[] getColumnsInternal() {
return meta;
}
@Override
public void calculate(Record rec, MapValues values) {
if (values.isNew()) {
values.putLong(map(0), 1);
} else {
values.putLong(map(0), values.getLong(map(0)) + 1);
}
}
@Override
public void prepareSource(RecordSource<? extends Record> source) {
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.Record;
public class FirstLongAggregationFunction extends AbstractSingleColumnAggregatorFunction {
public FirstLongAggregationFunction(ColumnMetadata meta) {
super(meta);
}
@Override
public void calculate(Record rec, MapValues values) {
if (values.isNew()) {
values.putLong(map(0), rec.getLong(getColumnIndex()));
}
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -14,16 +14,16 @@
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl;
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.AbstractImmutableIterator;
import com.nfsdb.collections.mmap.MapRecordSource;
import com.nfsdb.collections.mmap.MapRecordValueInterceptor;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.collections.mmap.MultiMap;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.agg.AggregatorFunction;
import com.nfsdb.lang.cst.impl.qry.GenericRecordSource;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordMetadata;
......@@ -32,7 +32,7 @@ import com.nfsdb.utils.Dates;
import java.util.List;
public class Resampler extends AbstractImmutableIterator<Record> implements GenericRecordSource {
public class ResampledSource extends AbstractImmutableIterator<Record> implements GenericRecordSource {
private final MultiMap map;
private final RecordSource<? extends Record> rowSource;
......@@ -43,7 +43,7 @@ public class Resampler extends AbstractImmutableIterator<Record> implements Gene
private MapRecordSource mapRecordSource;
private Record nextRecord = null;
public Resampler(RecordSource<? extends Record> rowSource, List<ColumnMetadata> keyColumns, List<AggregatorFunction> aggregators, ColumnMetadata timestampMetadata, SampleBy sampleBy) {
public ResampledSource(RecordSource<? extends Record> rowSource, List<ColumnMetadata> keyColumns, List<AggregatorFunction> aggregators, ColumnMetadata timestampMetadata, SampleBy sampleBy) {
MultiMap.Builder builder = new MultiMap.Builder();
int keyColumnsSize = keyColumns.size();
......@@ -71,6 +71,10 @@ public class Resampler extends AbstractImmutableIterator<Record> implements Gene
builder.valueColumn(columns[k]);
func.mapColumn(k, index++);
}
if (func instanceof MapRecordValueInterceptor) {
builder.interceptor((MapRecordValueInterceptor) func);
}
}
this.map = builder.build();
......@@ -163,7 +167,7 @@ public class Resampler extends AbstractImmutableIterator<Record> implements Gene
throw new JournalRuntimeException("Unsupported type: " + rowSource.getMetadata().getColumnType(i + 1));
}
}
key.$();
key.commit();
MapValues values = map.claimSlot(key);
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.Record;
public class SumDoubleAggregationFunction extends AbstractSingleColumnAggregatorFunction {
public SumDoubleAggregationFunction(ColumnMetadata meta) {
super(meta);
}
@Override
public void calculate(Record rec, MapValues values) {
if (values.isNew()) {
values.putDouble(map(0), rec.getDouble(getColumnIndex()));
} else {
int c = map(0);
values.putDouble(c, values.getDouble(c) + rec.getDouble(getColumnIndex()));
}
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.Record;
public class SumIntAggregationFunction extends AbstractSingleColumnAggregatorFunction {
public SumIntAggregationFunction(ColumnMetadata meta) {
super(meta);
}
@Override
public void calculate(Record rec, MapValues values) {
if (values.isNew()) {
values.putInt(map(0), rec.getInt(getColumnIndex()));
} else {
int c = map(0);
values.putInt(c, values.getInt(c) + rec.getInt(getColumnIndex()));
}
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.lang.cst.impl.agg;
import com.nfsdb.collections.mmap.MapValues;
import com.nfsdb.column.ColumnType;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.qry.Record;
public class SumIntToLongAggregationFunction extends AbstractSingleColumnAggregatorFunction {
public SumIntToLongAggregationFunction(ColumnMetadata meta) {
super(new ColumnMetadata().copy(meta).setType(ColumnType.LONG));
}
@Override
public void calculate(Record rec, MapValues values) {
if (values.isNew()) {
values.putLong(map(0), rec.getInt(getColumnIndex()));
} else {
int c = map(0);
values.putLong(c, values.getLong(c) + rec.getInt(getColumnIndex()));
}
}
}
......@@ -51,6 +51,7 @@ public class JournalEventBridge {
private final BatchEventProcessor<JournalEvent> batchEventProcessor;
private final RingBuffer<JournalEvent> outRingBuffer;
private final SequenceBarrier outBarrier;
@SuppressWarnings("CanBeFinal")
private volatile AgentBarrierHolder agentBarrierHolder = new AgentBarrierHolder();
......
......@@ -119,6 +119,7 @@ public class QueryHeadBuilderImpl<T> implements QueryHeadBuilder<T> {
zone2Keys.reset(symbolKeys.size());
zone1Keys.add(symbolKeys);
//noinspection ConstantConditions
return journal.iteratePartitionsDesc(
new UnorderedResultSetBuilder<T>(interval) {
private final KVIndex filterKVIndexes[] = new KVIndex[filterSymbolKeys.size()];
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -126,7 +126,7 @@ public class MultiMapTest extends AbstractTest {
map.claimKey()
.putLong(Dates.floorMI(ts))
.putInt(e.getInt(symIndex))
.$()
.commit()
);
val.putInt(0, val.isNew() ? 1 : val.getInt(0) + 1);
......
......@@ -21,11 +21,7 @@ import com.nfsdb.JournalWriter;
import com.nfsdb.export.RecordSourcePrinter;
import com.nfsdb.export.StringSink;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.lang.cst.impl.Resampler;
import com.nfsdb.lang.cst.impl.agg.AggregatorFunction;
import com.nfsdb.lang.cst.impl.agg.CountIntAggregatorFunction;
import com.nfsdb.lang.cst.impl.agg.FirstDoubleAggregationFunction;
import com.nfsdb.lang.cst.impl.agg.LastDoubleAggregationFunction;
import com.nfsdb.lang.cst.impl.agg.*;
import com.nfsdb.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.lang.cst.impl.psrc.JournalPartitionSource;
import com.nfsdb.lang.cst.impl.rsrc.AllRowSource;
......@@ -38,7 +34,7 @@ import org.junit.Test;
import java.util.ArrayList;
public class ResamplerTest extends AbstractTest {
public class ResampledSourceTest extends AbstractTest {
@Test
public void testResampleWithCount() throws Exception {
......@@ -222,7 +218,7 @@ public class ResamplerTest extends AbstractTest {
final Journal r = factory.reader(Quote.class.getName());
Resampler resampler = new Resampler(
ResampledSource resampledSource = new ResampledSource(
new JournalSourceImpl(
new JournalPartitionSource(r, false)
, new AllRowSource()
......@@ -238,12 +234,12 @@ public class ResamplerTest extends AbstractTest {
add(new LastDoubleAggregationFunction(r.getMetadata().getColumnMetadata("ask")));
}}
, r.getMetadata().getTimestampMetadata()
, Resampler.SampleBy.MINUTE
, ResampledSource.SampleBy.MINUTE
);
StringSink sink = new StringSink();
RecordSourcePrinter out = new RecordSourcePrinter(sink);
out.print(resampler);
out.print(resampledSource);
Assert.assertEquals(expected, sink.toString());
}
}
\ No newline at end of file
......@@ -269,7 +269,7 @@ public class CstTest {
map.claimKey()
.putLong(ts)
.putInt(e.getInt(symIndex))
.$()
.commit()
);
val.putInt(0, val.isNew() ? 1 : val.getInt(0) + 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册