提交 9febaa04 编写于 作者: V Vlad

durable commit implementation

上级 8c46910f
......@@ -48,7 +48,7 @@
</developers>
<scm>
<url>scm:git:https://github.com/bluestreak01/nfsdb.git</url>
<url>scm:git:https://github.com/NFSdb/nfsdb.git</url>
</scm>
<build>
......@@ -74,49 +74,65 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release-sign-artifacts</id>
<activation>
<property>
<name>release</name>
<value>true</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
<dependency>
......
......@@ -56,6 +56,8 @@ public class Journal<T> implements Iterable<T>, Closeable {
public static final long TX_LIMIT_EVAL = -1L;
private static final Logger LOGGER = Logger.getLogger(Journal.class);
protected final List<Partition<T>> partitions = new ArrayList<>();
// empty container for current transaction
protected final Tx tx = new Tx();
protected TxLog txLog;
private final JournalMetadata<T> metadata;
private final File location;
......@@ -126,8 +128,8 @@ public class Journal<T> implements Iterable<T>, Closeable {
public boolean refresh() throws JournalException {
if (txLog.hasNext()) {
Tx tx = txLog.get();
refresh(tx);
txLog.head(tx);
refreshInternal();
for (int i = 0; i < symbolTables.size(); i++) {
symbolTables.get(i).applyTx(tx.symbolTableSizes[i], tx.symbolTableIndexPointers[i]);
}
......@@ -559,10 +561,10 @@ public class Journal<T> implements Iterable<T>, Closeable {
}
protected void configure() throws JournalException {
Tx tx = txLog.get();
configureColumns(tx);
txLog.head(tx);
configureColumns();
configureSymbolTableSynonyms();
configurePartitions(tx);
configurePartitions();
}
protected void removeIrregularPartitionInternal() {
......@@ -582,7 +584,7 @@ public class Journal<T> implements Iterable<T>, Closeable {
return timerCache;
}
private void configureColumns(Tx tx) throws JournalException {
private void configureColumns() throws JournalException {
int columnCount = getMetadata().getColumnCount();
columnMetadata = new ColumnMetadata[columnCount];
for (int i = 0; i < columnCount; i++) {
......@@ -612,7 +614,7 @@ public class Journal<T> implements Iterable<T>, Closeable {
}
}
private void configurePartitions(Tx tx) throws JournalException {
private void configurePartitions() throws JournalException {
File[] files = getLocation().listFiles(new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() && !f.getName().startsWith(JournalConfiguration.TEMP_DIRECTORY_PREFIX);
......@@ -658,15 +660,15 @@ public class Journal<T> implements Iterable<T>, Closeable {
}
}
}
configureIrregularPartition(tx);
configureIrregularPartition();
}
private void configureIrregularPartition(Tx tx) throws JournalException {
private void configureIrregularPartition() throws JournalException {
// if journal is under intense write activity in another process
// lag partition can keep changing
// so we will be trying to pin lag partition
while (true) {
String lagPartitionName = tx != null ? tx.lagName : null;
String lagPartitionName = tx.lagName;
if (lagPartitionName != null && (irregularPartition == null || !lagPartitionName.equals(irregularPartition.getName()))) {
// new lag partition
// try to lock lag directory before any activity
......@@ -706,21 +708,21 @@ public class Journal<T> implements Iterable<T>, Closeable {
* Replaces current Lag partition, which is cached in this instance of Partition Manager with Lag partition,
* which was written to _lag file by another process.
*/
void refresh(Tx tx) throws JournalException {
void refreshInternal() throws JournalException {
assert tx != null;
assert tx.address > 0;
int txPartitionIndex = Rows.toPartitionIndex(tx.journalMaxRowID);
if (partitions.size() != txPartitionIndex + 1 || tx.journalMaxRowID == 0) {
if (tx.journalMaxRowID == 0 || partitions.size() > txPartitionIndex + 1) {
closePartitions();
}
configurePartitions(tx);
configurePartitions();
} else {
long txPartitionSize = Rows.toLocalRowID(tx.journalMaxRowID);
Partition<T> partition = partitions.get(txPartitionIndex);
partition.applyTx(txPartitionSize, tx.indexPointers);
configureIrregularPartition(tx);
configureIrregularPartition();
}
}
......
......@@ -121,37 +121,47 @@ public class JournalWriter<T> extends Journal<T> {
public void rollback() throws JournalException {
if (txActive) {
Tx tx = txLog.get();
// partitions need to be dealt with first to make sure new lag is assigned a correct partitionIndex
rollbackPartitions(tx);
Partition<T> lag = getIrregularPartition();
if (tx.lagName != null && tx.lagName.length() > 0 && (lag == null || !tx.lagName.equals(lag.getName()))) {
Partition<T> newLag = createTempPartition(tx.lagName);
setIrregularPartition(newLag);
newLag.applyTx(tx.lagSize, tx.lagIndexPointers);
} else if (lag != null && tx.lagName == null) {
removeIrregularPartitionInternal();
} else if (lag != null) {
lag.truncate(tx.lagSize);
}
rollback(txLog.headAddress());
txActive = false;
}
}
public void rollback(long address) throws JournalException {
if (tx.symbolTableSizes.length == 0) {
for (int i = 0; i < getSymbolTableCount(); i++) {
getSymbolTable(i).truncate();
}
} else {
for (int i = 0; i < getSymbolTableCount(); i++) {
getSymbolTable(i).truncate(tx.symbolTableSizes[i]);
}
txLog.get(address, tx);
if (tx.address == 0) {
throw new JournalException("Invalid transaction address");
}
// partitions need to be dealt with first to make sure new lag is assigned a correct partitionIndex
rollbackPartitions(tx);
Partition<T> lag = getIrregularPartition();
if (tx.lagName != null && tx.lagName.length() > 0 && (lag == null || !tx.lagName.equals(lag.getName()))) {
Partition<T> newLag = createTempPartition(tx.lagName);
setIrregularPartition(newLag);
newLag.applyTx(tx.lagSize, tx.lagIndexPointers);
} else if (lag != null && tx.lagName == null) {
removeIrregularPartitionInternal();
} else if (lag != null) {
lag.truncate(tx.lagSize);
}
if (tx.symbolTableSizes.length == 0) {
for (int i = 0; i < getSymbolTableCount(); i++) {
getSymbolTable(i).truncate();
}
} else {
for (int i = 0; i < getSymbolTableCount(); i++) {
getSymbolTable(i).truncate(tx.symbolTableSizes[i]);
}
appendTimestampLo = -1;
appendTimestampHi = -1;
appendPartition = null;
txActive = false;
}
appendTimestampLo = -1;
appendTimestampHi = -1;
appendPartition = null;
txLog.setTxAddress(tx.address);
txActive = false;
}
public void setTxListener(TxListener txListener) {
......@@ -202,8 +212,10 @@ public class JournalWriter<T> extends Journal<T> {
}
public void purgeUnusedTempPartitions(TxLog txLog) throws JournalException {
final Tx tx = new Tx();
txLog.head(tx);
final String lagPartitionName = hasIrregularPartition() ? getIrregularPartition().getName() : null;
final String txLagName = txLog != null ? txLog.get().lagName : null;
final String txLagName = tx.lagName;
File[] files = getLocation().listFiles(new FileFilter() {
public boolean accept(File f) {
......@@ -261,7 +273,7 @@ public class JournalWriter<T> extends Journal<T> {
getSymbolTable(i).truncate();
}
appendTimestampLo = -1;
commit(true);
commitDurable();
}
public void commit() throws JournalException {
......@@ -621,7 +633,7 @@ public class JournalWriter<T> extends Journal<T> {
if (txLog.isEmpty()) {
commit(Tx.TX_NORMAL);
}
Tx tx = txLog.get();
txLog.head(tx);
File meta = new File(getLocation(), JournalConfiguration.JOURNAL_META_FILE);
if (!meta.exists()) {
......@@ -679,6 +691,7 @@ public class JournalWriter<T> extends Journal<T> {
Tx tx = new Tx();
tx.command = command;
tx.prevTxAddress = txLog.getTxAddress();
tx.journalMaxRowID = partition == null ? 0 : Rows.toRowID(partition.getPartitionIndex(), partition.size());
tx.lastPartitionTimestamp = partition == null || partition.getInterval() == null ? 0 : partition.getInterval().getStartMillis();
tx.lagSize = lag == null ? 0 : lag.open().size();
......@@ -719,7 +732,9 @@ public class JournalWriter<T> extends Journal<T> {
}
txLog.create(tx);
txLog.force();
if (force) {
txLog.force();
}
}
private void rollbackPartitionDirs() throws JournalException {
......
......@@ -20,7 +20,7 @@ import com.nfsdb.journal.utils.ByteBuffers;
import java.nio.MappedByteBuffer;
public class ByteBufferWrapper {
class ByteBufferWrapper {
private final long offset;
private MappedByteBuffer byteBuffer;
......
......@@ -269,6 +269,7 @@ public class MappedFileImpl implements MappedFile {
} else {
this.offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 8);
}
offsetBuffer.order(ByteOrder.LITTLE_ENDIAN);
} catch (FileNotFoundException e) {
throw new JournalNoSuchFileException(e);
} catch (IOException e) {
......
......@@ -61,8 +61,8 @@ public class JournalConfiguration {
private final File journalBase;
private final String configurationFile;
private final int globalRecordHint;
private final NullsAdaptorFactory nullsAdaptorFactory;
private boolean configured = false;
private NullsAdaptorFactory nullsAdaptorFactory;
public JournalConfiguration(File journalBase) {
this(DEFAULT_CONFIG_FILE, journalBase);
......@@ -73,14 +73,14 @@ public class JournalConfiguration {
}
public JournalConfiguration(String configurationFile, File journalBase, int globalRecordHint) {
this(configurationFile, journalBase, globalRecordHint, null);
}
public JournalConfiguration(String configurationFile, File journalBase, int globalRecordHint, NullsAdaptorFactory nullsAdaptorFactory) {
this.globalRecordHint = globalRecordHint;
this.journalBase = journalBase;
this.configurationFile = configurationFile;
}
public JournalConfiguration setNullsAdaptorFactory(NullsAdaptorFactory nullsAdaptorFactory) {
this.nullsAdaptorFactory = nullsAdaptorFactory;
return this;
}
public JournalConfiguration build() throws JournalConfigurationException {
......
......@@ -87,11 +87,6 @@ public final class Lock {
}
Lock(File location, boolean shared) throws JournalException {
if (!location.exists()) {
if (!location.mkdirs()) {
throw new JournalException("Could not create directory: %s", location);
}
}
try {
this.location = location;
......
......@@ -18,22 +18,31 @@ package com.nfsdb.journal.printer.appender;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
public class OutputStreamAppender implements Appender {
private final PrintWriter w;
private final OutputStream out;
private byte[] lineSeparator;
public OutputStreamAppender(OutputStream out) {
w = new PrintWriter(out);
this.out = out;
String separator = System.getProperty("line.separator");
lineSeparator = new byte[separator.length()];
for (int i = 0; i < separator.length(); i++) {
lineSeparator[i] = (byte) separator.charAt(i);
}
}
@Override
public void append(StringBuilder stringBuilder) throws IOException {
w.println(stringBuilder.toString());
for (int i = 0; i < stringBuilder.length(); i++) {
out.write(stringBuilder.charAt(i));
}
out.write(this.lineSeparator);
}
@Override
public void close() throws IOException {
w.flush();
out.flush();
}
}
......@@ -22,6 +22,8 @@ public class Tx {
public static final byte TX_NORMAL = 0;
public static final byte TX_FORCE = 1;
// transient
public long address;
// 8
public long prevTxAddress;
// 1
......@@ -48,7 +50,8 @@ public class Tx {
@Override
public String toString() {
return "Tx{" +
"prevTxAddress=" + prevTxAddress +
"address=" + address +
", prevTxAddress=" + prevTxAddress +
", command=" + command +
", timestamp=" + timestamp +
", journalMaxRowID=" + journalMaxRowID +
......
......@@ -26,6 +26,7 @@ import com.nfsdb.journal.utils.ByteBuffers;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
public class TxLog {
......@@ -45,66 +46,17 @@ public class TxLog {
return mf.getAppendOffset() <= 9;
}
public Tx get() {
long offset = getTxAddress();
assert offset > 0 : "zero offset";
Tx tx = new Tx();
ByteBuffer buffer = mf.getBuffer(offset, 4);
int txSize = buffer.getInt();
buffer = mf.getBuffer(offset + 4, txSize);
tx.prevTxAddress = buffer.getLong();
tx.command = buffer.get();
tx.timestamp = buffer.getLong();
tx.journalMaxRowID = buffer.getLong();
tx.lastPartitionTimestamp = buffer.getLong();
tx.lagSize = buffer.getLong();
int sz = buffer.get();
if (sz == 0) {
tx.lagName = null;
} else {
// lagName
sz = buffer.get();
if (buf == null || buf.length < sz) {
buf = new char[sz];
}
for (int i = 0; i < sz; i++) {
buf[i] = buffer.getChar();
}
tx.lagName = new String(buf, 0, sz);
}
// symbolTableSizes
sz = buffer.getChar();
tx.symbolTableSizes = new int[sz];
for (int i = 0; i < sz; i++) {
tx.symbolTableSizes[i] = buffer.getInt();
}
//symbolTableIndexPointers
sz = buffer.getChar();
tx.symbolTableIndexPointers = new long[sz];
for (int i = 0; i < sz; i++) {
tx.symbolTableIndexPointers[i] = buffer.getLong();
}
//indexPointers
sz = buffer.getChar();
tx.indexPointers = new long[sz];
for (int i = 0; i < sz; i++) {
tx.indexPointers[i] = buffer.getLong();
}
public void head(Tx tx) {
get(headAddress(), tx);
}
//lagIndexPointers
sz = buffer.getChar();
tx.lagIndexPointers = new long[sz];
for (int i = 0; i < sz; i++) {
tx.lagIndexPointers[i] = buffer.getLong();
}
public long headAddress() {
return this.address = getTxAddress();
}
this.address = offset;
return tx;
public long prevAddress(long address) {
ByteBuffer buffer = mf.getBuffer(address, 12);
return buffer.getLong(buffer.position() + 4);
}
public void create(Tx tx) {
......@@ -149,12 +101,7 @@ public class TxLog {
ByteBuffers.putLongW(buffer, tx.lagIndexPointers);
// write out tx address
buffer = mf.getBuffer(0, 9);
buffer.mark();
buffer.put((byte) 0);
buffer.putLong(offset);
buffer.reset();
buffer.put((byte) 1);
setTxAddress(offset);
address = offset + tx.size();
mf.setAppendOffset(address);
}
......@@ -167,22 +114,107 @@ public class TxLog {
mf.force();
}
private long getTxAddress() {
if (isEmpty()) {
return 0;
public long getTxAddress() {
ByteBuffer buf = mf.getBuffer(0, 9);
long address;
while (true) {
address = buf.getLong();
byte checksum = buf.get();
byte b0 = (byte) address;
byte b1 = (byte) (address >> 8);
byte b2 = (byte) (address >> 16);
byte b3 = (byte) (address >> 24);
byte b4 = (byte) (address >> 32);
byte b5 = (byte) (address >> 40);
byte b6 = (byte) (address >> 48);
byte b7 = (byte) (address >> 56);
if ((b0 ^ b1 ^ b2 ^ b3 ^ b4 ^ b5 ^ b6 ^ b7) == checksum) {
break;
}
}
return address;
}
ByteBuffer buffer = mf.getBuffer(0, 9);
buffer.mark();
long limit = 100;
while (limit > 0 && buffer.get() == 0) {
Thread.yield();
buffer.reset();
limit--;
public void setTxAddress(long address) {
MappedByteBuffer buffer = mf.getBuffer(0, 8);
buffer.putLong(address);
// checksum
byte b0 = (byte) address;
byte b1 = (byte) (address >> 8);
byte b2 = (byte) (address >> 16);
byte b3 = (byte) (address >> 24);
byte b4 = (byte) (address >> 32);
byte b5 = (byte) (address >> 40);
byte b6 = (byte) (address >> 48);
byte b7 = (byte) (address >> 56);
buffer.put((byte) (b0 ^ b1 ^ b2 ^ b3 ^ b4 ^ b5 ^ b6 ^ b7));
}
public void get(long address, Tx tx) {
assert address > 0 : "zero address";
tx.address = address;
ByteBuffer buffer = mf.getBuffer(address, 4);
int txSize = buffer.getInt();
buffer = mf.getBuffer(address + 4, txSize);
tx.prevTxAddress = buffer.getLong();
tx.command = buffer.get();
tx.timestamp = buffer.getLong();
tx.journalMaxRowID = buffer.getLong();
tx.lastPartitionTimestamp = buffer.getLong();
tx.lagSize = buffer.getLong();
int sz = buffer.get();
if (sz == 0) {
tx.lagName = null;
} else {
// lagName
sz = buffer.get();
if (buf == null || buf.length < sz) {
buf = new char[sz];
}
for (int i = 0; i < sz; i++) {
buf[i] = buffer.getChar();
}
tx.lagName = new String(buf, 0, sz);
}
// symbolTableSizes
sz = buffer.getChar();
if (tx.symbolTableSizes == null || tx.symbolTableSizes.length < sz) {
tx.symbolTableSizes = new int[sz];
}
if (limit == 0) {
throw new JournalRuntimeException("Could not get spin-lock on txLog");
for (int i = 0; i < sz; i++) {
tx.symbolTableSizes[i] = buffer.getInt();
}
//symbolTableIndexPointers
sz = buffer.getChar();
if (tx.symbolTableIndexPointers == null || tx.symbolTableIndexPointers.length < sz) {
tx.symbolTableIndexPointers = new long[sz];
}
for (int i = 0; i < sz; i++) {
tx.symbolTableIndexPointers[i] = buffer.getLong();
}
//indexPointers
sz = buffer.getChar();
if (tx.indexPointers == null || tx.indexPointers.length < sz) {
tx.indexPointers = new long[sz];
}
for (int i = 0; i < sz; i++) {
tx.indexPointers[i] = buffer.getLong();
}
//lagIndexPointers
sz = buffer.getChar();
if (tx.lagIndexPointers == null || tx.lagIndexPointers.length < sz) {
tx.lagIndexPointers = new long[sz];
}
for (int i = 0; i < sz; i++) {
tx.lagIndexPointers[i] = buffer.getLong();
}
return buffer.getLong();
}
}
package com.nfsdb.journal;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.test.model.Quote;
import com.nfsdb.journal.test.tools.AbstractTest;
import com.nfsdb.journal.utils.Dates;
import org.junit.Assert;
import org.junit.Test;
public class NullTest extends AbstractTest {
@Test
public void tumbleTryNullTest() throws JournalException {
final int TEST_DATA_SIZE = (int) 1E3;
JournalWriter<Quote> w = factory.writer(Quote.class, "quote", 1000);
long timestamp = Dates.toMillis("2013-10-05T10:00:00.000Z");
String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
Quote q = new Quote();
int increment = 6000;
for (int i = 0; i < TEST_DATA_SIZE; i++) {
q.clear();
if (i % 7 != 0) {
q.setSym(symbols[i % symbols.length]);
}
if (i % 11 != 0) {
q.setAsk(i * 22.98007E8);
}
if (i % 13 != 0) {
q.setBid(i * 22.98007E-8);
}
if (i % 3 != 0) {
q.setAskSize(i);
}
if (i % 5 != 0) {
q.setBidSize(i * 7);
}
if (i % 2 != 0) {
q.setEx("LXE");
}
if (i % 17 != 0) {
q.setMode("Some interesting string with киррилица and special char" + (char) (i % Character.MAX_VALUE));
}
q.setTimestamp(timestamp);
timestamp += increment;
w.append(q);
}
w.commit();
w.close();
Journal<Quote> r = factory.reader(Quote.class, "quote");
int i = 0;
for (Quote qr : r.bufferedIterator()) {
if (i % 7 != 0) {
Assert.assertEquals(symbols[i % symbols.length], qr.getSym());
}
if (i % 11 != 0) {
Assert.assertEquals(i * 22.98007E8, qr.getAsk(), 1E-9);
}
if (i % 13 != 0) {
Assert.assertEquals(i * 22.98007E-8, qr.getBid(), 1E-9);
}
if (i % 3 != 0) {
Assert.assertEquals(i, qr.getAskSize());
}
if (i % 5 != 0) {
Assert.assertEquals(i * 7, qr.getBidSize());
}
if (i % 2 != 0) {
Assert.assertEquals("LXE", qr.getEx());
}
if (i % 17 != 0) {
Assert.assertEquals("Some interesting string with киррилица and special char" + (char) (i % Character.MAX_VALUE), qr.getMode());
}
i++;
}
}
}
......@@ -16,6 +16,9 @@
package com.nfsdb.journal;
import com.nfsdb.journal.test.model.Quote;
import com.nfsdb.journal.test.tools.AbstractTest;
import com.nfsdb.journal.test.tools.TestUtils;
import com.nfsdb.journal.tx.Tx;
import com.nfsdb.journal.tx.TxLog;
import org.junit.Assert;
......@@ -25,7 +28,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
public class TxLogTest {
public class TxLogTest extends AbstractTest {
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
......@@ -54,7 +57,9 @@ public class TxLogTest {
TxLog r = new TxLog(dir, JournalMode.READ);
Assert.assertTrue(r.hasNext());
Tx tx1 = r.get();
Tx tx1 = new Tx();
r.head(tx1);
Assert.assertEquals(0, tx1.command);
Assert.assertEquals(10, tx1.journalMaxRowID);
Assert.assertEquals(12, tx1.lagSize);
......@@ -73,4 +78,31 @@ public class TxLogTest {
txLog.close();
r.close();
}
@Test
public void testTxLogWalk() throws Exception {
JournalWriter<Quote> writer = factory.writer(Quote.class);
// tx1
TestUtils.generateQuoteData(writer, 100, System.currentTimeMillis());
writer.commit();
// tx2
TestUtils.generateQuoteData(writer, 100, System.currentTimeMillis());
writer.commit();
// tx3
TestUtils.generateQuoteData(writer, 100, System.currentTimeMillis());
writer.commit();
// tx4
TestUtils.generateQuoteData(writer, 100, System.currentTimeMillis());
writer.commit();
Assert.assertEquals(400, writer.size());
writer.rollback(writer.txLog.prevAddress(writer.txLog.headAddress()));
Assert.assertEquals(300, writer.size());
Tx tx = new Tx();
long address = writer.txLog.headAddress();
writer.txLog.get(address, tx);
Assert.assertEquals(300, tx.journalMaxRowID);
}
}
......@@ -39,7 +39,7 @@ public class JournalTestFactory extends JournalFactory implements TestRule, Jour
}
public <B> JournalTestFactory(NullsAdaptorFactory<B> factory) {
super(new JournalConfiguration(Files.makeTempDir()).setNullsAdaptorFactory(factory));
super(new JournalConfiguration(JournalConfiguration.DEFAULT_CONFIG_FILE, Files.makeTempDir(), 10000, factory));
}
@Override
......
......@@ -186,7 +186,7 @@ public final class TestUtils {
}
}
public static void generateQuoteData(int count, long timetamp, int increment) {
public static void generateQuoteData(int count, long timestamp, int increment) {
String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
String exchanges[] = {"LXE", "GR", "SK", "LN"};
Random r = new Random(System.currentTimeMillis());
......@@ -199,8 +199,8 @@ public final class TestUtils {
q.setBidSize(Math.abs(r.nextInt()));
q.setEx(exchanges[Math.abs(r.nextInt() % (exchanges.length - 1))]);
q.setMode("Fast trading");
q.setTimestamp(timetamp);
timetamp += increment;
q.setTimestamp(timestamp);
timestamp += increment;
print(q);
}
}
......
......@@ -48,7 +48,7 @@
</developers>
<scm>
<url>scm:git:https://github.com/bluestreak01/nfsdb.git</url>
<url>scm:git:https://github.com/NFSdb/nfsdb.git</url>
</scm>
<build>
......@@ -74,49 +74,65 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release-sign-artifacts</id>
<activation>
<property>
<name>release</name>
<value>true</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
<dependency>
......
......@@ -49,7 +49,7 @@
</developers>
<scm>
<url>scm:git:https://github.com/bluestreak01/nfsdb.git</url>
<url>scm:git:https://github.com/NFSdb/nfsdb.git</url>
</scm>
<build>
......
package org.nfsdb.examples.append;
import com.nfsdb.journal.JournalKey;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.utils.Files;
import org.nfsdb.examples.model.Price;
import java.io.File;
import java.util.concurrent.TimeUnit;
public class SimplestAppend {
/**
* Appends 1 million rows.
*
* @param args factory directory
* @throws com.nfsdb.journal.exceptions.JournalException
*/
public static void main(String[] args) throws JournalException {
try (JournalFactory factory = new JournalFactory("c:\\temp\\nfsdb")) {
// delete existing price journal
Files.delete(new File(factory.getConfiguration().getJournalBase(), "price"));
final int count = 1000000;
try (JournalWriter<Price> writer = factory.writer(new JournalKey<>(Price.class))) {
long tZero = System.nanoTime();
Price p = new Price();
for (int i = 0; i < count; i++) {
p.setTimestamp(tZero + i);
p.setSym(String.valueOf(i % 20));
p.setPrice(i * 1.04598 + i);
writer.append(p);
}
// commit is necessary
writer.commit();
System.out.println("Persisted " + count + " objects in " +
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - tZero) + "ms.");
}
}
}
}
package org.nfsdb.examples.iterate;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import org.nfsdb.examples.model.Price;
import java.util.concurrent.TimeUnit;
public class SimplestIterate {
public static void main(String[] args) throws JournalException {
try (JournalFactory factory = new JournalFactory("c:\\temp\\nfsdb")) {
try (Journal<Price> journal = factory.reader(Price.class)) {
long tZero = System.nanoTime();
int count = 0;
for (Price p : journal) {
assert p != null;
count++;
}
System.out.println("Read " + count + " objects in " +
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - tZero) + "ms.");
}
}
}
}
package org.nfsdb.examples.query;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.query.api.QueryHeadBuilder;
import org.nfsdb.examples.model.Price;
import java.util.concurrent.TimeUnit;
public class SimplestLatestByKeyQuery {
public static void main(String[] args) throws JournalException {
try (JournalFactory factory = new JournalFactory("c:\\temp\\nfsdb")) {
try (Journal<Price> journal = factory.reader(Price.class)) {
long tZero = System.nanoTime();
int count = 0;
QueryHeadBuilder<Price> builder = journal.query().head().withKeys();
for (Price p : builder.asResultSet()) {
assert p != null;
count++;
}
System.out.println("Read " + count + " objects in " +
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - tZero) + "ms.");
}
}
}
}
package org.nfsdb.examples.query;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.query.api.QueryAllBuilder;
import org.nfsdb.examples.model.Price;
import java.util.concurrent.TimeUnit;
/**
* Created by Alexander on 01/06/2014.
*/
public class SimplestSymbolQuery {
public static void main(String[] args) throws JournalException {
try (JournalFactory factory = new JournalFactory("c:\\temp\\nfsdb")) {
try (Journal<Price> journal = factory.reader(Price.class)) {
long tZero = System.nanoTime();
int count = 0;
QueryAllBuilder<Price> builder = journal.query().all().withSymValues("sym", "17");
for (Price p : builder.asResultSet().bufferedIterator()) {
assert p != null;
count++;
}
System.out.println("Read " + count + " objects in " +
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - tZero) + "ms.");
}
}
}
}
......@@ -19,14 +19,13 @@ package org.nfsdb.examples.reporting;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalConfiguration;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.printer.JournalPrinter;
import com.nfsdb.journal.printer.appender.StdOutAppender;
import com.nfsdb.journal.query.api.QueryAllBuilder;
import com.nfsdb.journal.utils.Dates;
import com.nfsdb.journal.utils.Files;
import com.nfsdb.thrift.ThriftNullsAdaptorFactory;
import com.nfsdb.thrift.JournalThriftFactory;
import org.joda.time.DateTime;
import org.joda.time.DateTimeField;
import org.joda.time.chrono.ISOChronology;
......@@ -47,7 +46,7 @@ public class DailyPriceAverageExample {
String journalLocation = args[0];
// this is another way to setup JournalFactory if you would like to provide NullsAdaptor. NullsAdaptor for thrift,
// which is used in this case implements JIT-friendly object reset method, which is quite fast.
try (JournalFactory factory = new JournalFactory(new JournalConfiguration(new File(journalLocation)).setNullsAdaptorFactory(new ThriftNullsAdaptorFactory()).build())) {
try (JournalFactory factory = new JournalThriftFactory(journalLocation)) {
// delete existing quote journal
Files.delete(new File(factory.getConfiguration().getJournalBase(), "quote"));
......
......@@ -22,4 +22,9 @@
<sym name="ex" indexed="true" maxsize="2" hintDistinctCount="1"/>
<sym name="mode" indexed="true" hintDistinctCount="1"/>
</journal>
<journal class="org.nfsdb.examples.model.Price" defaultPath="price" timestampColumn="timestamp"
partitionType="MONTH" recordCountHint="1000000" openPartitionTTL="180" lagHours="24" key="sym">
<sym name="sym" indexed="true" maxsize="4" hintDistinctCount="15"/>
</journal>
</db>
......@@ -49,7 +49,7 @@
</developers>
<scm>
<url>scm:git:https://github.com/bluestreak01/nfsdb.git</url>
<url>scm:git:https://github.com/NFSdb/nfsdb.git</url>
</scm>
<build>
......@@ -91,49 +91,65 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release-sign-artifacts</id>
<activation>
<property>
<name>release</name>
<value>true</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.thrift</groupId>
......
namespace java com.nfsdb.journal.test.model
struct Price {
1: required i64 timestamp;
2: required string sym;
3: required double price;
}
......@@ -49,7 +49,7 @@
</developers>
<scm>
<url>scm:git:https://github.com/bluestreak01/nfsdb.git</url>
<url>scm:git:https://github.com/NFSdb/nfsdb.git</url>
</scm>
<build>
......@@ -75,49 +75,65 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release-sign-artifacts</id>
<activation>
<property>
<name>release</name>
<value>true</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
<dependency>
......
......@@ -25,6 +25,10 @@ import java.io.File;
public class JournalThriftFactory extends JournalFactory {
public JournalThriftFactory(String journalBase) throws JournalConfigurationException {
super(new JournalConfiguration(new File(journalBase)).setNullsAdaptorFactory(new ThriftNullsAdaptorFactory()).build());
this(new File(journalBase));
}
public JournalThriftFactory(File journalBase) throws JournalConfigurationException {
super(new JournalConfiguration(JournalConfiguration.DEFAULT_CONFIG_FILE, journalBase, -1, new ThriftNullsAdaptorFactory()).build());
}
}
......@@ -16,17 +16,27 @@
package com.nfsdb.thrift;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.test.model.Quote;
import com.nfsdb.journal.test.tools.JournalTestFactory;
import com.nfsdb.journal.utils.Dates;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TProtocol;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.util.BitSet;
public class ThriftNullsTest {
@Rule
public JournalTestFactory factory = new JournalTestFactory(new ThriftNullsAdaptorFactory());
@Test
public void testByteBitField() throws Exception {
ThriftNullsAdaptor<ByteFieldSample> adaptor = new ThriftNullsAdaptor<>(ByteFieldSample.class);
......@@ -147,6 +157,35 @@ public class ThriftNullsTest {
Assert.assertEquals("{1}", dst.toString());
}
@Test
public void testFirstSymbolNull() throws JournalException {
JournalWriter<Quote> w = factory.writer(Quote.class, "quote", 1000);
long timestamp = Dates.toMillis("2013-10-05T10:00:00.000Z");
Quote q = new Quote();
for (int i = 0; i < 3; i++) {
w.clearObject(q);
if (i == 0) {
q.setAsk(123);
}
q.setTimestamp(timestamp);
w.append(q);
}
w.commit();
w.close();
Journal<Quote> r = factory.reader(Quote.class, "quote");
q = r.read(0);
Quote q1 = r.read(1);
Assert.assertNull(q.getSym());
Assert.assertTrue(q.isSetAsk());
Assert.assertFalse(q1.isSetAsk());
}
private static abstract class AbstractSample implements TBase {
@Override
public void read(TProtocol tProtocol) throws TException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册