提交 e71e30a0 编写于 作者: V Vlad Ilyushchenko

metadata refactoring

上级 e7d78c26
......@@ -24,6 +24,7 @@ import com.nfsdb.concurrent.TimerCache;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.factory.JournalClosingListener;
import com.nfsdb.factory.configuration.ColumnMetaWithSymTab;
import com.nfsdb.factory.configuration.Constants;
import com.nfsdb.factory.configuration.JournalMetadata;
import com.nfsdb.iterators.ConcurrentIterator;
......@@ -73,7 +74,7 @@ public class Journal<T> implements Iterable<T>, Closeable {
private final BitSet inactiveColumns;
TxLog txLog;
boolean open;
ColumnMetadata[] columnMetadata;
ColumnMetaWithSymTab[] columnMetadata;
private Partition<T> irregularPartition;
private JournalClosingListener closeListener;
......@@ -85,7 +86,7 @@ public class Journal<T> implements Iterable<T>, Closeable {
this.timerCache = timerCache;
this.txLog = new TxLog(location, getMode());
this.open = true;
this.timestampOffset = getMetadata().getTimestampColumnMetadata() == null ? -1 : getMetadata().getTimestampColumnMetadata().offset;
this.timestampOffset = getMetadata().getTimestampMetadata() == null ? -1 : getMetadata().getTimestampMetadata().offset;
this.inactiveColumns = new BitSet(metadata.getColumnCount());
configure();
......@@ -367,7 +368,7 @@ public class Journal<T> implements Iterable<T>, Closeable {
* @param columnIndex the column index (0-indexed)
* @return the specified column's metadata
*/
public ColumnMetadata getColumnMetadata(int columnIndex) {
public ColumnMetaWithSymTab getColumnMetadata(int columnIndex) {
return columnMetadata[columnIndex];
}
......@@ -594,10 +595,10 @@ public class Journal<T> implements Iterable<T>, Closeable {
private void configureColumns() throws JournalException {
int columnCount = getMetadata().getColumnCount();
columnMetadata = new ColumnMetadata[columnCount];
columnMetadata = new ColumnMetaWithSymTab[columnCount];
for (int i = 0; i < columnCount; i++) {
columnMetadata[i] = new ColumnMetadata();
com.nfsdb.factory.configuration.ColumnMetadata meta = metadata.getColumnMetadata(i);
columnMetadata[i] = new ColumnMetaWithSymTab(meta);
if (meta.type == ColumnType.SYMBOL && meta.sameAs == null) {
int tabIndex = symbolTables.size();
int tabSize = tx.symbolTableSizes.length > tabIndex ? tx.symbolTableSizes[tabIndex] : 0;
......@@ -607,7 +608,6 @@ public class Journal<T> implements Iterable<T>, Closeable {
symbolTableMap.put(meta.name, tab);
columnMetadata[i].symbolTable = tab;
}
columnMetadata[i].meta = meta;
}
}
......@@ -709,9 +709,4 @@ public class Journal<T> implements Iterable<T>, Closeable {
configureIrregularPartition();
}
}
public static class ColumnMetadata {
public SymbolTable symbolTable;
public com.nfsdb.factory.configuration.ColumnMetadata meta;
}
}
......@@ -19,6 +19,7 @@ package com.nfsdb;
import com.nfsdb.column.*;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.factory.configuration.ColumnMetaWithSymTab;
import com.nfsdb.utils.Checksum;
import java.io.InputStream;
......@@ -27,7 +28,7 @@ import java.util.BitSet;
public class JournalEntryWriterImpl implements JournalEntryWriter {
private final JournalWriter journal;
private final Journal.ColumnMetadata meta[];
private final ColumnMetaWithSymTab meta[];
private final int timestampIndex;
private final BitSet updated = new BitSet();
private final long[] koTuple;
......@@ -39,7 +40,7 @@ public class JournalEntryWriterImpl implements JournalEntryWriter {
public JournalEntryWriterImpl(JournalWriter journal) {
this.journal = journal;
this.meta = journal.columnMetadata;
this.timestampIndex = journal.getMetadata().getTimestampColumnIndex();
this.timestampIndex = journal.getMetadata().getTimestampIndex();
koTuple = new long[meta.length * 2];
}
......@@ -140,7 +141,7 @@ public class JournalEntryWriterImpl implements JournalEntryWriter {
}
columns[i].commit();
if (meta[i].meta.indexed) {
if (meta[i].indexed) {
indexProxies[i].getIndex().add((int) koTuple[i * 2], koTuple[i * 2 + 1]);
}
}
......@@ -162,14 +163,14 @@ public class JournalEntryWriterImpl implements JournalEntryWriter {
}
private void assertType(int index, ColumnType t) {
if (meta[index].meta.type != t) {
throw new JournalRuntimeException("Expected type: " + meta[index].meta.type);
if (meta[index].type != t) {
throw new JournalRuntimeException("Expected type: " + meta[index].type);
}
}
private void putString0(int index, CharSequence value) {
if (meta[index].meta.indexed) {
koTuple[index * 2] = value == null ? SymbolTable.VALUE_IS_NULL : Checksum.hash(value, meta[index].meta.distinctCountHint);
if (meta[index].indexed) {
koTuple[index * 2] = value == null ? SymbolTable.VALUE_IS_NULL : Checksum.hash(value, meta[index].distinctCountHint);
koTuple[index * 2 + 1] = ((VariableColumn) columns[index]).putStr(value);
} else {
((VariableColumn) columns[index]).putStr(value);
......@@ -183,7 +184,7 @@ public class JournalEntryWriterImpl implements JournalEntryWriter {
} else {
key = meta[index].symbolTable.put(value);
}
if (meta[index].meta.indexed) {
if (meta[index].indexed) {
koTuple[index * 2] = key;
koTuple[index * 2 + 1] = ((FixedColumn) columns[index]).putInt(key);
} else {
......@@ -192,7 +193,7 @@ public class JournalEntryWriterImpl implements JournalEntryWriter {
}
private void putNull0(int index) {
switch (meta[index].meta.type) {
switch (meta[index].type) {
case STRING:
putString0(index, null);
break;
......@@ -211,8 +212,8 @@ public class JournalEntryWriterImpl implements JournalEntryWriter {
}
private void putInt0(int index, int value) {
if (meta[index].meta.indexed) {
koTuple[index * 2] = value % meta[index].meta.distinctCountHint;
if (meta[index].indexed) {
koTuple[index * 2] = value % meta[index].distinctCountHint;
koTuple[index * 2 + 1] = ((FixedColumn) columns[index]).putInt(value);
} else {
((FixedColumn) columns[index]).putInt(value);
......
......@@ -70,7 +70,7 @@ public class JournalWriter<T> extends Journal<T> {
public JournalWriter(JournalMetadata<T> metadata, JournalKey<T> key, TimerCache timerCache) throws JournalException {
super(metadata, key, timerCache);
if (metadata.isPartialMapping()) {
if (metadata.isPartialMapped()) {
close();
throw new JournalException("Metadata is unusable for writer. Partially mapped?");
}
......
......@@ -30,7 +30,7 @@ public class OrderedResultSet<T> extends ResultSet<T> {
return 0;
}
long rowID = getRowID(size() - 1);
int timestampColumnIndex = getJournal().getMetadata().getTimestampColumnIndex();
int timestampColumnIndex = getJournal().getMetadata().getTimestampIndex();
return getJournal().getPartition(Rows.toPartitionIndex(rowID), true).getLong(Rows.toLocalRowID(rowID), timestampColumnIndex);
}
}
......@@ -19,6 +19,7 @@ package com.nfsdb;
import com.nfsdb.column.*;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.factory.configuration.ColumnMetaWithSymTab;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.factory.configuration.JournalMetadata;
import com.nfsdb.index.KVIndex;
......@@ -75,7 +76,7 @@ public class Partition<T> implements Iterable<T>, Closeable {
open(i);
}
int tsIndex = journal.getMetadata().getTimestampColumnIndex();
int tsIndex = journal.getMetadata().getTimestampIndex();
if (tsIndex >= 0) {
timestampColumn = getFixedWidthColumn(tsIndex);
}
......@@ -214,47 +215,46 @@ public class Partition<T> implements Iterable<T>, Closeable {
public void read(long localRowID, T obj) {
for (int i = 0; i < columnCount; i++) {
Journal.ColumnMetadata m;
if (journal.getInactiveColumns().get(i) || (m = journal.columnMetadata[i]).meta.offset == 0) {
ColumnMetaWithSymTab m;
if (journal.getInactiveColumns().get(i) || (m = journal.columnMetadata[i]).offset == 0) {
continue;
}
switch (m.meta.type) {
switch (m.type) {
case BOOLEAN:
Unsafe.getUnsafe().putBoolean(obj, m.meta.offset, ((FixedColumn) columns[i]).getBool(localRowID));
Unsafe.getUnsafe().putBoolean(obj, m.offset, ((FixedColumn) columns[i]).getBool(localRowID));
break;
case BYTE:
Unsafe.getUnsafe().putByte(obj, m.meta.offset, ((FixedColumn) columns[i]).getByte(localRowID));
Unsafe.getUnsafe().putByte(obj, m.offset, ((FixedColumn) columns[i]).getByte(localRowID));
break;
case DOUBLE:
Unsafe.getUnsafe().putDouble(obj, m.meta.offset, ((FixedColumn) columns[i]).getDouble(localRowID));
Unsafe.getUnsafe().putDouble(obj, m.offset, ((FixedColumn) columns[i]).getDouble(localRowID));
break;
case INT:
Unsafe.getUnsafe().putInt(obj, m.meta.offset, ((FixedColumn) columns[i]).getInt(localRowID));
Unsafe.getUnsafe().putInt(obj, m.offset, ((FixedColumn) columns[i]).getInt(localRowID));
break;
case LONG:
case DATE:
Unsafe.getUnsafe().putLong(obj, m.meta.offset, ((FixedColumn) columns[i]).getLong(localRowID));
Unsafe.getUnsafe().putLong(obj, m.offset, ((FixedColumn) columns[i]).getLong(localRowID));
break;
case SHORT:
Unsafe.getUnsafe().putShort(obj, m.meta.offset, ((FixedColumn) columns[i]).getShort(localRowID));
Unsafe.getUnsafe().putShort(obj, m.offset, ((FixedColumn) columns[i]).getShort(localRowID));
break;
case STRING:
Unsafe.getUnsafe().putObject(obj, m.meta.offset, ((VariableColumn) columns[i]).getStr(localRowID));
Unsafe.getUnsafe().putObject(obj, m.offset, ((VariableColumn) columns[i]).getStr(localRowID));
break;
case SYMBOL:
Unsafe.getUnsafe().putObject(obj, m.meta.offset, m.symbolTable.value(((FixedColumn) columns[i]).getInt(localRowID)));
Unsafe.getUnsafe().putObject(obj, m.offset, m.symbolTable.value(((FixedColumn) columns[i]).getInt(localRowID)));
break;
case BINARY:
readBin(localRowID, obj, i, m);
}
}
}
private void readBin(long localRowID, T obj, int i, Journal.ColumnMetadata m) {
private void readBin(long localRowID, T obj, int i, ColumnMetadata m) {
int size = ((VariableColumn) columns[i]).getBinSize(localRowID);
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, m.meta.offset);
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, m.offset);
if (size == -1) {
if (buf != null) {
buf.clear();
......@@ -262,7 +262,7 @@ public class Partition<T> implements Iterable<T>, Closeable {
} else {
if (buf == null || buf.capacity() < size) {
buf = ByteBuffer.allocate(size);
Unsafe.getUnsafe().putObject(obj, m.meta.offset, buf);
Unsafe.getUnsafe().putObject(obj, m.offset, buf);
}
if (buf.remaining() < size) {
......@@ -309,7 +309,7 @@ public class Partition<T> implements Iterable<T>, Closeable {
}
public FixedColumn getTimestampColumn() {
return getFixedWidthColumn(journal.getMetadata().getTimestampColumnIndex());
return getFixedWidthColumn(journal.getMetadata().getTimestampIndex());
}
public void rebuildIndexes() throws JournalException {
......@@ -351,7 +351,7 @@ public class Partition<T> implements Iterable<T>, Closeable {
getIndexForColumn(columnIndex).close();
File base = journal.getMetadata().getColumnIndexBase(partitionDir, columnIndex);
File base = new File(partitionDir, journal.getMetadata().getColumnMetadata(columnIndex).name);
KVIndex.delete(base);
try (KVIndex index = new KVIndex(base, keyCountHint, recordCountHint, txCountHint, JournalMode.APPEND, 0)) {
......@@ -474,36 +474,36 @@ public class Partition<T> implements Iterable<T>, Closeable {
try {
for (int i = 0; i < columnCount; i++) {
Journal.ColumnMetadata meta = journal.getColumnMetadata(i);
ColumnMetaWithSymTab meta = journal.getColumnMetadata(i);
switch (meta.meta.type) {
switch (meta.type) {
case INT:
int v = Unsafe.getUnsafe().getInt(obj, meta.meta.offset);
if (meta.meta.indexed) {
sparseIndexProxies[i].getIndex().add(v % meta.meta.distinctCountHint, ((FixedColumn) columns[i]).putInt(v));
int v = Unsafe.getUnsafe().getInt(obj, meta.offset);
if (meta.indexed) {
sparseIndexProxies[i].getIndex().add(v % meta.distinctCountHint, ((FixedColumn) columns[i]).putInt(v));
} else {
((FixedColumn) columns[i]).putInt(v);
}
break;
case STRING:
String s = (String) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
String s = (String) Unsafe.getUnsafe().getObject(obj, meta.offset);
long offset = ((VariableColumn) columns[i]).putStr(s);
if (meta.meta.indexed) {
if (meta.indexed) {
sparseIndexProxies[i].getIndex().add(
s == null ? SymbolTable.VALUE_IS_NULL : Checksum.hash(s, meta.meta.distinctCountHint)
s == null ? SymbolTable.VALUE_IS_NULL : Checksum.hash(s, meta.distinctCountHint)
, offset
);
}
break;
case SYMBOL:
int key;
String sym = (String) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
String sym = (String) Unsafe.getUnsafe().getObject(obj, meta.offset);
if (sym == null) {
key = SymbolTable.VALUE_IS_NULL;
} else {
key = meta.symbolTable.put(sym);
}
if (meta.meta.indexed) {
if (meta.indexed) {
sparseIndexProxies[i].getIndex().add(key, ((FixedColumn) columns[i]).putInt(key));
} else {
((FixedColumn) columns[i]).putInt(key);
......@@ -513,7 +513,7 @@ public class Partition<T> implements Iterable<T>, Closeable {
appendBin(obj, i, meta);
break;
default:
((FixedColumn) columns[i]).copy(obj, meta.meta.offset);
((FixedColumn) columns[i]).copy(obj, meta.offset);
break;
}
......@@ -527,8 +527,8 @@ public class Partition<T> implements Iterable<T>, Closeable {
}
}
private void appendBin(T obj, int i, Journal.ColumnMetadata meta) {
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, meta.meta.offset);
private void appendBin(T obj, int i, ColumnMetadata meta) {
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, meta.offset);
if (buf == null || buf.remaining() == 0) {
((VariableColumn) columns[i]).putNull();
} else {
......
......@@ -19,6 +19,7 @@ package com.nfsdb;
import com.nfsdb.collections.DirectLongList;
import com.nfsdb.column.SymbolTable;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.factory.configuration.ColumnMetaWithSymTab;
import com.nfsdb.iterators.ConcurrentIterator;
import com.nfsdb.iterators.ResultSetBufferedIterator;
import com.nfsdb.iterators.ResultSetConcurrentIterator;
......@@ -46,12 +47,12 @@ public class ResultSet<T> implements Iterable<T> {
Partition<T> rightPart = journal.getPartition(Rows.toPartitionIndex(rightRowID), true);
for (int column : columns) {
Journal.ColumnMetadata meta = journal.getColumnMetadata(column);
ColumnMetaWithSymTab meta = journal.getColumnMetadata(column);
String leftStr;
String rightStr;
switch (meta.meta.type) {
switch (meta.type) {
case STRING:
leftStr = leftPart.getStr(leftLocalRowID, column);
rightStr = rightPart.getStr(rightLocalRowID, column);
......@@ -67,7 +68,7 @@ public class ResultSet<T> implements Iterable<T> {
}
break;
default:
switch (meta.meta.type) {
switch (meta.type) {
case INT:
result = compare(rightPart.getInt(rightLocalRowID, column), leftPart.getInt(leftLocalRowID, column));
break;
......@@ -100,7 +101,7 @@ public class ResultSet<T> implements Iterable<T> {
}
break;
default:
throw new JournalException("Unsupported type: " + meta.meta.type);
throw new JournalException("Unsupported type: " + meta.type);
}
}
......@@ -179,7 +180,7 @@ public class ResultSet<T> implements Iterable<T> {
}
public long[] readTimestamps() throws JournalException {
int timestampColIndex = journal.getMetadata().getTimestampColumnIndex();
int timestampColIndex = journal.getMetadata().getTimestampIndex();
long[] result = new long[size()];
for (int i = 0, rowIDsLength = rowIDs.size(); i < rowIDsLength; i++) {
......
......@@ -18,12 +18,14 @@ package com.nfsdb;
import com.nfsdb.concurrent.TimerCache;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.factory.configuration.ColumnMetadata;
import com.nfsdb.factory.configuration.JournalMetadata;
import com.nfsdb.index.KVIndex;
import com.nfsdb.logging.Logger;
import com.nfsdb.utils.Dates;
import java.io.Closeable;
import java.io.File;
class SymbolIndexProxy<T> implements Closeable {
......@@ -78,9 +80,15 @@ class SymbolIndexProxy<T> implements Closeable {
lastAccessed = timerCache.getCachedMillis();
if (index == null) {
JournalMetadata<T> meta = partition.getJournal().getMetadata();
ColumnMetadata columnMetadata = meta.getColumnMetadata(columnIndex);
if (!columnMetadata.indexed) {
throw new JournalException("There is no index for column: %s", columnMetadata.name);
}
index = new KVIndex(
meta.getColumnIndexBase(partition.getPartitionDir(), columnIndex),
meta.getColumnMetadata(columnIndex).distinctCountHint,
new File(partition.getPartitionDir(), columnMetadata.name),
columnMetadata.distinctCountHint,
meta.getRecordHint(),
meta.getTxCountHint(),
partition.getJournal().getMode(),
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.factory.configuration;
import com.nfsdb.column.SymbolTable;
public class ColumnMetaWithSymTab extends ColumnMetadata {
public SymbolTable symbolTable;
public ColumnMetaWithSymTab(ColumnMetadata meta) {
copy(meta);
}
}
......@@ -20,35 +20,63 @@ import com.nfsdb.JournalKey;
import com.nfsdb.PartitionType;
import com.nfsdb.column.HugeBuffer;
import com.nfsdb.exceptions.JournalRuntimeException;
import java.io.File;
import org.jetbrains.annotations.NotNull;
public interface JournalMetadata<T> {
JournalKey<T> deriveKey();
ColumnMetadata getColumnMetadata(String name);
/**
* Number of columns in Journal.
*
* @return column count
*/
int getColumnCount();
/**
* Lookup column metadata by name. This method is slower than
* simple array de-reference of index lookup. Column names are case-sensitive.
* <p/>
* This method cannot return null. An exception is thrown if column name is invalid.
*
* @param name of column
* @return column metadata
*/
@NotNull
ColumnMetadata getColumnMetadata(CharSequence name);
/**
* Lookup column metadata by index. This method does unchecked exception and
* will throw ArrayIndexOutOfBoundsException.
* <p/>
* To obtain column index and validate column name {@see #getColumnIndex}
*
* @param columnIndex index of column
* @return column metadata
*/
ColumnMetadata getColumnMetadata(int columnIndex);
/**
* Lookup column index and validate column name. Column names are case-sensitive and {#JournalRuntimeException} is
* thrown if column name is invalid.
*
* @param columnName the name
* @return 0-based column index
*/
int getColumnIndex(CharSequence columnName);
String getLocation();
ColumnMetadata getTimestampMetadata();
PartitionType getPartitionType();
int getTimestampIndex();
int getColumnCount();
ColumnMetadata getTimestampColumnMetadata();
String getLocation();
int getTimestampColumnIndex();
PartitionType getPartitionType();
Object newObject() throws JournalRuntimeException;
Class<T> getModelClass();
File getColumnIndexBase(File partitionDir, int columnIndex);
long getOpenFileTTL();
int getLag();
......@@ -65,5 +93,5 @@ public interface JournalMetadata<T> {
void write(HugeBuffer buf);
boolean isPartialMapping();
boolean isPartialMapped();
}
\ No newline at end of file
......@@ -53,7 +53,7 @@ public class JournalMetadataBuilder<T> implements JMetadataBuilder<T> {
this.modelClass = model.getModelClass();
parseClass();
this.location = model.getLocation();
this.tsColumnIndex = model.getTimestampColumnIndex();
this.tsColumnIndex = model.getTimestampIndex();
this.partitionBy = model.getPartitionType();
this.recordCountHint = model.getRecordHint();
this.txCountHint = model.getTxCountHint();
......
......@@ -24,8 +24,8 @@ import com.nfsdb.exceptions.JournalConfigurationException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.utils.Base64;
import com.nfsdb.utils.Checksum;
import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.lang.reflect.Constructor;
import java.util.Arrays;
......@@ -141,7 +141,8 @@ public class JournalMetadataImpl<T> implements JournalMetadata<T> {
}
@Override
public ColumnMetadata getColumnMetadata(String name) {
@NotNull
public ColumnMetadata getColumnMetadata(CharSequence name) {
return getColumnMetadata(getColumnIndex(name));
}
......@@ -175,12 +176,12 @@ public class JournalMetadataImpl<T> implements JournalMetadata<T> {
}
@Override
public ColumnMetadata getTimestampColumnMetadata() {
public ColumnMetadata getTimestampMetadata() {
return timestampMetadata;
}
@Override
public int getTimestampColumnIndex() {
public int getTimestampIndex() {
return timestampColumnIndex;
}
......@@ -198,15 +199,6 @@ public class JournalMetadataImpl<T> implements JournalMetadata<T> {
return modelClass;
}
@Override
public File getColumnIndexBase(File partitionDir, int columnIndex) {
ColumnMetadata meta = getColumnMetadata(columnIndex);
if (!meta.indexed) {
throw new JournalRuntimeException("There is no index for column: %s", meta.name);
}
return new File(partitionDir, meta.name);
}
@Override
public long getOpenFileTTL() {
return openFileTTL;
......@@ -245,7 +237,7 @@ public class JournalMetadataImpl<T> implements JournalMetadata<T> {
}
@Override
public boolean isPartialMapping() {
public boolean isPartialMapped() {
return partialMapping;
}
......
......@@ -55,7 +55,7 @@ public class JournalStructure implements JMetadataBuilder<Object> {
public JournalStructure(JournalMetadata model) {
this.location = model.getLocation();
this.tsColumnIndex = model.getTimestampColumnIndex();
this.tsColumnIndex = model.getTimestampIndex();
this.partitionBy = model.getPartitionType();
this.recordCountHint = model.getRecordHint();
this.txCountHint = model.getTxCountHint();
......
......@@ -43,7 +43,7 @@ public class ReplayIterator<T> extends AbstractImmutableIterator<T> {
this.clock = MilliClock.INSTANCE;
this.speed = speed;
this.timeSource = new TimeSource<T>() {
private final long timestampOffset = underlying.getJournal().getMetadata().getTimestampColumnMetadata().offset;
private final long timestampOffset = underlying.getJournal().getMetadata().getTimestampMetadata().offset;
@Override
public long getTicks(T object) {
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -237,7 +237,7 @@ public class ResamplerTest extends AbstractTest {
add(new FirstDoubleAggregationFunction(r.getMetadata().getColumnMetadata("ask")));
add(new LastDoubleAggregationFunction(r.getMetadata().getColumnMetadata("ask")));
}}
, r.getMetadata().getTimestampColumnMetadata()
, r.getMetadata().getTimestampMetadata()
, Resampler.SampleBy.MINUTE
);
......
......@@ -162,7 +162,7 @@ public final class TestUtils {
}
public static <T> void assertOrder(JournalIterator<T> rs) {
ColumnMetadata meta = rs.getJournal().getMetadata().getTimestampColumnMetadata();
ColumnMetadata meta = rs.getJournal().getMetadata().getTimestampMetadata();
long max = 0;
for (T obj : rs) {
long timestamp = Unsafe.getUnsafe().getLong(obj, meta.offset);
......@@ -174,7 +174,7 @@ public final class TestUtils {
}
public static <T> void assertOrderDesc(JournalIterator<T> rs) {
ColumnMetadata meta = rs.getJournal().getMetadata().getTimestampColumnMetadata();
ColumnMetadata meta = rs.getJournal().getMetadata().getTimestampMetadata();
long max = Long.MAX_VALUE;
for (T obj : rs) {
long timestamp = Unsafe.getUnsafe().getLong(obj, meta.offset);
......@@ -302,7 +302,7 @@ public final class TestUtils {
}
for (int k = 0; k < expected.getMetadata().getColumnCount(); k++) {
if (expected.getColumnMetadata(k).meta.indexed) {
if (expected.getColumnMetadata(k).indexed) {
KVIndex ei = ep.getIndexForColumn(k);
KVIndex ai = ap.getIndexForColumn(k);
......@@ -314,7 +314,7 @@ public final class TestUtils {
ei.getValues(j, ev);
ai.getValues(j, av);
Assert.assertEquals("Values mismatch. partition=" + i + ",column=" + expected.getColumnMetadata(k).meta.name + ", key=" + j + ": ", ev.size(), av.size());
Assert.assertEquals("Values mismatch. partition=" + i + ",column=" + expected.getColumnMetadata(k).name + ", key=" + j + ": ", ev.size(), av.size());
for (int l = 0; l < ev.size(); l++) {
Assert.assertEquals(ev.get(l), av.get(l));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册