提交 a79119b5 编写于 作者: V Vlad

refactoring, examples, performance improvements.

上级 be9eeea9
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.journal.iterators;
import com.lmax.disruptor.*;
import com.nfsdb.journal.concurrent.NamedDaemonThreadFactory;
import com.nfsdb.journal.exceptions.JournalImmutableIteratorException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class AbstractConcurrentIterator<T> implements EventFactory<AbstractConcurrentIterator.Holder<T>>, ConcurrentIterator<T> {
RingBuffer<Holder<T>> buffer;
SequenceBarrier barrier;
private final ExecutorService service;
private int bufferSize;
private Sequence sequence;
private long nextSequence;
private long availableSequence;
private boolean started = false;
@Override
public AbstractConcurrentIterator.Holder<T> newInstance() {
Holder<T> h = new Holder<>();
h.object = getJournal().newObject();
h.hasNext = true;
return h;
}
@Override
public Iterator<T> iterator() {
return this;
}
@Override
public T next() {
hasNext();
T result = null;
if (availableSequence >= nextSequence) {
result = buffer.get(nextSequence).object;
nextSequence++;
}
sequence.set(nextSequence - 2);
return result;
}
@Override
public boolean hasNext() {
if (!started) {
start();
started = true;
}
if (availableSequence >= nextSequence) {
return buffer.get(nextSequence).hasNext;
}
try {
availableSequence = barrier.waitFor(nextSequence);
return availableSequence >= nextSequence && buffer.get(nextSequence).hasNext;
} catch (AlertException | TimeoutException | InterruptedException e) {
throw new JournalRuntimeException(e);
}
}
@Override
public ConcurrentIterator<T> buffer(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
@Override
public void remove() {
throw new JournalImmutableIteratorException();
}
@Override
public void close() {
service.shutdown();
barrier.alert();
}
protected abstract Runnable getRunnable();
void start() {
this.buffer = RingBuffer.createSingleProducer(this, bufferSize, new BusySpinWaitStrategy());
this.barrier = buffer.newBarrier();
this.sequence = new Sequence(barrier.getCursor());
this.nextSequence = sequence.get() + 1L;
this.availableSequence = -1L;
this.buffer.addGatingSequences(sequence);
service.submit(getRunnable());
}
protected final static class Holder<T> {
T object;
boolean hasNext;
}
AbstractConcurrentIterator(int bufferSize) {
this.bufferSize = bufferSize;
this.service = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("jj-iterator", false));
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.journal.iterators;
import java.io.Closeable;
public interface ConcurrentIterator<T> extends JournalIterator<T>, Closeable {
@Override
void close();
ConcurrentIterator<T> buffer(int bufferSize);
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.journal.iterators;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import com.nfsdb.journal.utils.Rows;
import java.util.List;
public class JournalConcurrentIterator<T> extends AbstractConcurrentIterator<T> {
private final Journal<T> journal;
private final List<JournalIteratorRange> ranges;
public JournalConcurrentIterator(Journal<T> journal, List<JournalIteratorRange> ranges, int bufferSize) {
super(bufferSize);
this.journal = journal;
this.ranges = ranges;
}
@Override
public Journal<T> getJournal() {
return journal;
}
@Override
protected Runnable getRunnable() {
return new Runnable() {
private int currentIndex = 0;
private long currentRowID;
private long currentUpperBound;
private int currentPartitionID;
boolean hasNext = true;
@Override
public void run() {
updateVariables();
while (!barrier.isAlerted()) {
try {
long outSeq = buffer.next();
Holder<T> holder = buffer.get(outSeq);
boolean hadNext = hasNext;
if (hadNext) {
journal.clearObject(holder.object);
journal.read(Rows.toRowID(currentPartitionID, currentRowID), holder.object);
if (currentRowID < currentUpperBound) {
currentRowID++;
} else {
currentIndex++;
updateVariables();
}
}
holder.hasNext = hadNext;
buffer.publish(outSeq);
if (!hadNext) {
break;
}
} catch (JournalException e) {
throw new JournalRuntimeException("Error in iterator [" + this + "]", e);
}
}
}
private void updateVariables() {
if (currentIndex < ranges.size()) {
JournalIteratorRange w = ranges.get(currentIndex);
currentRowID = w.lowerRowIDBound;
currentUpperBound = w.upperRowIDBound;
currentPartitionID = w.partitionID;
} else {
hasNext = false;
}
}
};
}
}
package com.nfsdb.journal.iterators;
public class JournalRow<T> {
private final T object;
private long rowID;
public JournalRow(T object) {
this.object = object;
}
public T getObject() {
return object;
}
public long getRowID() {
return rowID;
}
void setRowID(long rowID) {
this.rowID = rowID;
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.journal.iterators;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.exceptions.JournalImmutableIteratorException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
import com.nfsdb.journal.utils.Rows;
import java.util.Iterator;
import java.util.List;
public class JournalRowBufferedIterator<T> implements Iterable<JournalRow<T>>, Iterator<JournalRow<T>> {
boolean hasNext = true;
private final List<JournalIteratorRange> ranges;
private final Journal<T> journal;
private final JournalRow<T> row;
private int currentIndex = 0;
private long currentRowID;
private long currentUpperBound;
private int currentPartitionID;
public JournalRowBufferedIterator(Journal<T> journal, List<JournalIteratorRange> ranges) {
this.ranges = ranges;
this.journal = journal;
this.row = new JournalRow<>(journal.newObject());
updateVariables();
}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public JournalRow<T> next() {
try {
T obj = row.getObject();
journal.clearObject(obj);
long rowID = Rows.toRowID(currentPartitionID, currentRowID);
row.setRowID(rowID);
journal.read(rowID, obj);
if (currentRowID < currentUpperBound) {
currentRowID++;
} else {
currentIndex++;
updateVariables();
}
return row;
} catch (JournalException e) {
throw new JournalRuntimeException("Error in iterator [" + this + "]", e);
}
}
@Override
public void remove() {
throw new JournalImmutableIteratorException();
}
@Override
public Iterator<JournalRow<T>> iterator() {
return this;
}
@Override
public String toString() {
return "JournalRowBufferedIterator{" +
"currentRowID=" + currentRowID +
", currentUpperBound=" + currentUpperBound +
", currentPartitionID=" + currentPartitionID +
", currentIndex=" + currentIndex +
", journal=" + journal +
'}';
}
public Journal<T> getJournal() {
return journal;
}
private void updateVariables() {
if (currentIndex < ranges.size()) {
JournalIteratorRange w = ranges.get(currentIndex);
currentRowID = w.lowerRowIDBound;
currentUpperBound = w.upperRowIDBound;
currentPartitionID = w.partitionID;
} else {
hasNext = false;
}
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.journal.iterators;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.Partition;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
public class PartitionConcurrentIterator<T> extends AbstractConcurrentIterator<T> {
private final Partition<T> partition;
private final long lo;
private final long hi;
public PartitionConcurrentIterator(Partition<T> partition, long lo, long hi, int bufferSize) {
super(bufferSize);
this.partition = partition;
this.lo = lo;
this.hi = hi;
}
@Override
public Journal<T> getJournal() {
return partition.getJournal();
}
@Override
protected Runnable getRunnable() {
return new Runnable() {
@Override
public void run() {
for (long i = lo; i <= hi; i++) {
try {
partition.open();
if (barrier.isAlerted()) {
break;
}
long seq = buffer.next();
Holder<T> holder = buffer.get(seq);
partition.getJournal().clearObject(holder.object);
partition.read(i, holder.object);
buffer.publish(seq);
} catch (JournalException e) {
throw new JournalRuntimeException("Cannot read partition " + partition + " at " + (i - 1), e);
}
}
long seq = buffer.next();
buffer.get(seq).hasNext = false;
buffer.publish(seq);
}
};
}
}
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.journal.iterators;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.ResultSet;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.exceptions.JournalRuntimeException;
public class ResultSetConcurrentIterator<T> extends AbstractConcurrentIterator<T> {
private final ResultSet<T> rs;
public ResultSetConcurrentIterator(ResultSet<T> rs, int bufferSize) {
super(bufferSize);
this.rs = rs;
}
@Override
public Journal<T> getJournal() {
return rs.getJournal();
}
@Override
protected Runnable getRunnable() {
return new Runnable() {
@Override
public void run() {
for (int i = 0, size = rs.size(); i < size; i++) {
try {
if (barrier.isAlerted()) {
break;
}
long seq = buffer.next();
Holder<T> holder = buffer.get(seq);
getJournal().clearObject(holder.object);
rs.read(i, holder.object);
buffer.publish(seq);
} catch (JournalException e) {
throw new JournalRuntimeException("Cannot read ResultSet %s at %d", e, rs, (i - 1));
}
}
long seq = buffer.next();
buffer.get(seq).hasNext = false;
buffer.publish(seq);
}
};
}
}
<!--
~ Copyright (c) 2014. Vlad Ilyushchenko
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<version>1.0</version>
<groupId>com.nfsdb</groupId>
<artifactId>nfsdb-samples</artifactId>
<packaging>jar</packaging>
<name>NFSdb.Samples</name>
<description>NFSdb Sample Code</description>
<url>http://www.nfsdb.org</url>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
<comments>A business-friendly OSS license</comments>
</license>
</licenses>
<developers>
<developer>
<name>Vlad Ilyushchenko</name>
<email>bluestreak@gmail.com</email>
</developer>
</developers>
<scm>
<url>scm:git:https://github.com/bluestreak01/nfsdb.git</url>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.nfsdb</groupId>
<artifactId>nfsdb-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.nfsdb</groupId>
<artifactId>nfsdb-thrift</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.nfsdb</groupId>
<artifactId>nfsdb-core</artifactId>
<version>1.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package org.nfsdb.examples.append;
import com.nfsdb.journal.JournalKey;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.PartitionType;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import org.nfsdb.examples.model.Quote;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class AppendUnordered {
/**
* Appends 1 million quotes with random timestamp values. Journal doesn't enforce order.
*
* @param args factory directory
* @throws JournalException
*/
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + BasicRecordAppend.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
try (JournalFactory factory = new JournalFactory(journalLocation)) {
try (JournalWriter<Quote> writer = factory.writer(new JournalKey<>(
Quote.class // model class
, "quote-unordered" // directory name where journal is stored. This is relative to factory location.
, PartitionType.NONE // NONE - no partitioning
, false // order enforcement is turned off
))) {
final int count = 1000000;
final String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
final Random r = new Random(System.currentTimeMillis());
// reuse same same instance of Quote class to keep GC under control
final Quote q = new Quote();
long t = System.nanoTime();
for (int i = 0; i < count; i++) {
// prepare object for new set of data
q.clear();
// generate some data
q.setSym(symbols[Math.abs(r.nextInt() % (symbols.length - 1))]);
q.setAsk(Math.abs(r.nextDouble()));
q.setBid(Math.abs(r.nextDouble()));
q.setAskSize(Math.abs(r.nextInt() % 10000));
q.setBidSize(Math.abs(r.nextInt() % 10000));
q.setEx("LXE");
q.setMode("Fast trading");
q.setTimestamp(r.nextLong());
//
writer.append(q);
}
// commit is necessary
writer.commit();
System.out.println("Journal size: " + writer.size());
System.out.println("Generated " + count + " objects in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.append;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.utils.Dates;
import org.joda.time.DateTime;
import org.nfsdb.examples.model.Quote;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class AppendUnorderedToLag {
/**
* For cases where incoming data feed is not in chronological order but you would like your journal to be in chronological order.
* This is a lossy way to append data as journal would only be merging a slice of data as specified by "lagHours" attribute in nfsdb.xml.
*
* @param args factory directory
* @throws JournalException
*/
public static void main(String[] args) throws JournalException {
System.out.println(-Math.abs(Long.MIN_VALUE));
System.exit(1);
if (args.length != 1) {
System.out.println("Usage: " + BasicRecordAppend.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
try (JournalFactory factory = new JournalFactory(journalLocation)) {
try (JournalWriter<Quote> writer = factory.writer(
Quote.class // model class
, "quote-lag" // directory name where journal is stored. This is relative to factory location.
)) {
final String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
final Random r = new Random(System.currentTimeMillis());
// 20 batches of 50,000 quotes, total 1,000,000
final int batchCount = 20;
final int batchSize = 50000;
final ArrayList<Quote> batch = new ArrayList<>(batchSize);
// have pre-initialized array to reduce GC overhead
for (int i = 0; i < batchSize; i++) {
batch.add(new Quote());
}
DateTime utc = Dates.utc();
long t = System.nanoTime();
for (int i = 0; i < batchCount; i++) {
// populate batch in-memory
for (int k = 0; k < batchSize; k++) {
Quote q = batch.get(k);
q.clear();
// generate some data
q.setSym(symbols[Math.abs(r.nextInt() % (symbols.length - 1))]);
q.setAsk(Math.abs(r.nextDouble()));
q.setBid(Math.abs(r.nextDouble()));
q.setAskSize(Math.abs(r.nextInt() % 10000));
q.setBidSize(Math.abs(r.nextInt() % 10000));
q.setEx("LXE");
q.setMode("Fast trading");
long timestamp = utc.plusSeconds(i * batchSize + (batchSize - k)).getMillis();
// make batches overlap (subtract 10 seconds)
timestamp -= 100000000L;
q.setTimestamp(timestamp);
}
// batch must be sorted before being presented to writer
Collections.sort(batch, writer.getTimestampComparator());
// append batch and have journal merge data
writer.appendIrregular(batch);
}
// commit is necessary
writer.commit();
System.out.println("Journal size: " + writer.size());
System.out.println("Generated " + batchCount * batchSize + " objects in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.append;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import org.nfsdb.examples.model.Quote;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class BasicRecordAppend {
/**
* Appends 1 million quotes to journal. Timestamp values are in chronological order.
*
* @param args factory directory
* @throws JournalException
*/
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + BasicRecordAppend.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
try (JournalFactory factory = new JournalFactory(journalLocation)) {
try (JournalWriter<Quote> writer = factory.writer(Quote.class)) {
final int count = 1000000;
final String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
final Random r = new Random(System.currentTimeMillis());
// reuse same same instance of Quote class to keep GC under control
final Quote q = new Quote();
long t = System.nanoTime();
for (int i = 0; i < count; i++) {
// prepare object for new set of data
q.clear();
// generate some data
q.setSym(symbols[Math.abs(r.nextInt() % (symbols.length - 1))]);
q.setAsk(Math.abs(r.nextDouble()));
q.setBid(Math.abs(r.nextDouble()));
q.setAskSize(Math.abs(r.nextInt() % 10000));
q.setBidSize(Math.abs(r.nextInt() % 10000));
q.setEx("LXE");
q.setMode("Fast trading");
q.setTimestamp(System.currentTimeMillis());
writer.append(q);
}
// commit is necessary
writer.commit();
System.out.println("Journal size: " + writer.size());
System.out.println("Generated " + count + " objects in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.append;
import com.lmax.disruptor.*;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import org.nfsdb.examples.model.Quote;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentAppend {
/**
* Appends 2 million quotes to two journals simultaneously.
*
* @param args factory directory
* @throws com.nfsdb.journal.exceptions.JournalException
*/
public static void main(String[] args) throws JournalException, InterruptedException {
if (args.length != 1) {
System.out.println("Usage: " + ConcurrentAppend.class.getName() + " <path>");
System.exit(1);
}
final ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch latch = new CountDownLatch(2);
final RingBuffer<Quote> ringBuffer = RingBuffer.createSingleProducer(new QuoteFactory(), 1024 * 64, new YieldingWaitStrategy());
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
try (JournalFactory factory = new JournalFactory(args[0])) {
try (JournalWriter<Quote> writer1 = factory.writer(Quote.class, "quote_1")) {
try (JournalWriter<Quote> writer2 = factory.writer(Quote.class, "quote_2")) {
BatchEventProcessor<Quote> processor1 = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, new Handler(writer1, latch, 0));
BatchEventProcessor<Quote> processor2 = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, new Handler(writer2, latch, 1));
ringBuffer.addGatingSequences(processor1.getSequence(), processor2.getSequence());
service.submit(processor1);
service.submit(processor2);
final int count = 2000000;
final String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
final Random r = new Random(System.currentTimeMillis());
long t = System.nanoTime();
for (int i = 0; i < count; i++) {
long sequence = ringBuffer.next();
Quote q = ringBuffer.get(sequence);
// generate some data
String sym = symbols[Math.abs(r.nextInt() % (symbols.length - 1))];
q.setSym(sym);
q.setAsk(Math.abs(r.nextDouble()));
q.setBid(Math.abs(r.nextDouble()));
q.setAskSize(Math.abs(r.nextInt() % 10000));
q.setBidSize(Math.abs(r.nextInt() % 10000));
q.setEx("LXE");
q.setMode("Fast trading");
q.setTimestamp(System.currentTimeMillis());
ringBuffer.publish(sequence);
}
// publish special EOF event
long sequence = ringBuffer.next();
Quote q = ringBuffer.get(sequence);
q.setTimestamp(-1);
ringBuffer.publish(sequence);
// wait for handlers to flush
latch.await();
System.out.println("Published " + count + " quotes in " + (System.nanoTime() - t) / 1000000 + "ms.");
// stop the threads
processor1.halt();
processor2.halt();
//
writer1.commit();
writer2.commit();
}
}
}
service.shutdown();
}
private static class Handler implements EventHandler<Quote> {
private final JournalWriter<Quote> writer;
private final CountDownLatch latch;
private final int condition;
@Override
public void onEvent(Quote event, long sequence, boolean endOfBatch) throws Exception {
if (event.getTimestamp() == -1) {
latch.countDown();
} else {
if (event.getSym().hashCode() % 2 == condition) {
writer.append(event);
}
}
}
private Handler(JournalWriter<Quote> writer, CountDownLatch latch, int condition) {
this.writer = writer;
this.latch = latch;
this.condition = condition;
}
}
private static class QuoteFactory implements EventFactory<Quote> {
@Override
public Quote newInstance() {
return new Quote();
}
}
}
package org.nfsdb.examples.append;
import com.nfsdb.journal.JournalKey;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.PartitionType;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import org.nfsdb.examples.model.Quote;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class PartitionByDayAppend {
/**
* Appends 1 million quotes into journal partitioned by day. Journal can only be partitioned on values of timestamp column.
*
* @param args factory directory
* @throws JournalException
*/
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + PartitionByDayAppend.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
try (JournalFactory factory = new JournalFactory(journalLocation)) {
// default partition type is configured in nfsdb.xml and it is MONTH
// you can change it in runtime and also, optionally put journal in alternative location
try (JournalWriter<Quote> writer = factory.writer(new JournalKey<>(Quote.class, "quote-by-day", PartitionType.DAY))) {
final int count = 1000000;
final String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
final Random r = new Random(System.currentTimeMillis());
// reuse same same instance of Quote class to keep GC under control
final Quote q = new Quote();
long t = System.nanoTime();
for (int i = 0; i < count; i++) {
// prepare object for new set of data
q.clear();
// generate some data
q.setSym(symbols[Math.abs(r.nextInt() % (symbols.length - 1))]);
q.setAsk(Math.abs(r.nextDouble()));
q.setBid(Math.abs(r.nextDouble()));
q.setAskSize(Math.abs(r.nextInt() % 10000));
q.setBidSize(Math.abs(r.nextInt() % 10000));
q.setEx("LXE");
q.setMode("Fast trading");
q.setTimestamp(System.currentTimeMillis() + (i * 100));
writer.append(q);
}
// commit is necessary
writer.commit();
System.out.println("Journal size: " + writer.size());
System.out.println("Generated " + count + " objects in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.iterate;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalConfiguration;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.iterators.ConcurrentIterator;
import com.nfsdb.thrift.ThriftNullsAdaptorFactory;
import org.nfsdb.examples.model.Quote;
import org.nfsdb.examples.support.QuoteGenerator;
import java.io.File;
import java.util.concurrent.TimeUnit;
public class ConcurrentIteratorExample {
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + ConcurrentIteratorExample.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
// this is another way to setup JournalFactory if you would like to provide NullsAdaptor. NullsAdaptor for thrift,
// which is used in this case implements JIT-friendly object reset method, which is quite fast.
try (JournalFactory factory = new JournalFactory(new JournalConfiguration(new File(journalLocation)).setNullsAdaptorFactory(new ThriftNullsAdaptorFactory()).build())) {
// get some data in :)
try (JournalWriter<Quote> w = factory.writer(Quote.class)) {
QuoteGenerator.generateQuoteData(w, 10000000);
}
// copying journal using fast BufferedIterator
try (Journal<Quote> src = factory.reader(Quote.class)) {
try (JournalWriter<Quote> w = factory.writer(Quote.class, "quote-copy2")) {
long t = System.nanoTime();
int count = 0;
try (ConcurrentIterator<Quote> iterator = src.concurrentIterator()) {
for (Quote q : iterator) {
w.append(q);
count++;
}
}
w.commit();
System.out.println("ConcurrentIterator copied " + count + " quotes in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
// copying journal using fast BufferedIterator
try (Journal<Quote> src = factory.reader(Quote.class)) {
try (JournalWriter<Quote> w = factory.writer(Quote.class, "quote-copy")) {
long t = System.nanoTime();
int count = 0;
for (Quote q : src.bufferedIterator()) {
w.append(q);
count++;
}
w.commit();
System.out.println("BufferedIterator copied " + count + " quotes in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
}
package org.nfsdb.examples.iterate;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.utils.Dates;
import org.nfsdb.examples.model.Quote;
import org.nfsdb.examples.support.QuoteGenerator;
import java.util.concurrent.TimeUnit;
public class IntervalExample {
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + IntervalExample.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
// this is another way to setup JournalFactory if you would like to provide NullsAdaptor. NullsAdaptor for thrift,
// which is used in this case implements JIT-friendly object reset method, which is quite fast.
try (JournalFactory factory = new JournalFactory(journalLocation)) {
// get some data in :)
try (JournalWriter<Quote> w = factory.writer(Quote.class)) {
QuoteGenerator.generateQuoteData(w, 10000000, 90);
}
// basic iteration
try (Journal<Quote> journal = factory.reader(Quote.class)) {
int count = 0;
long t = System.nanoTime();
// 10 days from now
long lo = System.currentTimeMillis() + 10 * 24 * 60 * 60 * 1000;
// 20 days from now
long hi = lo + 10 * 24 * 60 * 60 * 1000;
// iterate the interval between lo and hi millis.
for (Quote q : journal.query().all().iterator(Dates.interval(lo, hi))) {
assert q != null;
count++;
}
System.out.println("Iterator read " + count + " quotes in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.iterate;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalConfiguration;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.journal.iterators.ConcurrentIterator;
import com.nfsdb.thrift.ThriftNullsAdaptorFactory;
import org.nfsdb.examples.model.Quote;
import org.nfsdb.examples.support.QuoteGenerator;
import java.io.File;
import java.util.concurrent.TimeUnit;
public class IteratorExample {
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + IteratorExample.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
// this is another way to setup JournalFactory if you would like to provide NullsAdaptor. NullsAdaptor for thrift,
// which is used in this case implements JIT-friendly object reset method, which is quite fast.
try (JournalFactory factory = new JournalFactory(new JournalConfiguration(new File(journalLocation)).setNullsAdaptorFactory(new ThriftNullsAdaptorFactory()).build())) {
// get some data in :)
try (JournalWriter<Quote> w = factory.writer(Quote.class)) {
QuoteGenerator.generateQuoteData(w, 10000000);
}
// basic iteration
try (Journal<Quote> journal = factory.reader(Quote.class)) {
int count = 0;
long t = System.nanoTime();
// regular iterator
for (Quote q : journal) {
assert q != null;
count++;
}
System.out.println("Iterator read " + count + " quotes in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
// buffered iterator, this one reuses object it produces making no impact on GC
// it you intend to throw away majority of objects in the loop it is best to use buffered iterator
count = 0;
t = System.nanoTime();
for (Quote q : journal.bufferedIterator()) {
assert q != null;
count++;
}
System.out.println("Buffered iterator read " + count + " quotes in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
// concurrent iterator just as buffered iterator reuses object instance. Main difference is that all reading is done in another thread
// e.g in parallel with execution on main loop. Parallel iterator is sensitive to buffer size.
// because parallel iterator starts a thread it has to be closed after use
//
// there is an overhead to messaging between threads, but it would pay dividends if the for loop is either CPU or IO bound as both
// read and compute operations will be done in parallel.
count = 0;
t = System.nanoTime();
try (ConcurrentIterator<Quote> iterator = journal.concurrentIterator().buffer(1024 * 64)) {
for (Quote q : iterator) {
assert q != null;
count++;
}
}
System.out.println("Parallel iterator read " + count + " quotes in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.iterate;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalConfiguration;
import com.nfsdb.journal.factory.JournalFactory;
import com.nfsdb.thrift.ThriftNullsAdaptorFactory;
import org.nfsdb.examples.model.Quote;
import org.nfsdb.examples.support.QuoteGenerator;
import java.io.File;
import java.util.concurrent.TimeUnit;
public class SelectColumnsExample {
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + SelectColumnsExample.class.getName() + " <path>");
System.exit(1);
}
String journalLocation = args[0];
// this is another way to setup JournalFactory if you would like to provide NullsAdaptor. NullsAdaptor for thrift,
// which is used in this case implements JIT-friendly object reset method, which is quite fast.
try (JournalFactory factory = new JournalFactory(new JournalConfiguration(new File(journalLocation)).setNullsAdaptorFactory(new ThriftNullsAdaptorFactory()).build())) {
// get some data in :)
try (JournalWriter<Quote> w = factory.writer(Quote.class)) {
QuoteGenerator.generateQuoteData(w, 10000000);
}
// read only sym and askSize columns
try (Journal<Quote> journal = factory.reader(Quote.class).setReadColumns("sym", "askSize")) {
int count = 0;
long t = System.nanoTime();
for (Quote q : journal.bufferedIterator()) {
assert q != null;
count++;
}
System.out.println("Two columns of " + count + " quotes read in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t) + "ms.");
}
}
}
}
package org.nfsdb.examples.misc;
import com.nfsdb.journal.Journal;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.column.SymbolTable;
import com.nfsdb.journal.exceptions.JournalException;
import com.nfsdb.journal.factory.JournalFactory;
import org.nfsdb.examples.model.Quote;
import org.nfsdb.examples.support.QuoteGenerator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class ExistsExample {
public static void main(String[] args) throws JournalException {
if (args.length != 1) {
System.out.println("Usage: " + ExistsExample.class.getName() + " <path>");
System.exit(1);
}
try (JournalFactory factory = new JournalFactory(args[0])) {
// get some data in :)
try (JournalWriter<Quote> w = factory.writer(Quote.class)) {
QuoteGenerator.generateQuoteData(w, 1000000);
}
final Set<String> values = new HashSet<String>() {{
add("TLW.L");
add("ABF.L");
add("LLOY.L");
add("TLZ.L");
add("BT-A.L");
add("KBR.L");
add("WTB.L");
}};
try (Journal<Quote> journal = factory.reader(Quote.class)) {
long t = System.nanoTime();
//
// check values against SymbolTable, if they are there they would exist in journal too.
//
SymbolTable tab = journal.getSymbolTable("sym");
for (String v : values) {
if (tab.getQuick(v) == SymbolTable.VALUE_NOT_FOUND) {
System.out.println(v + ": MISSING");
} else {
System.out.println(v + ": ok");
}
}
System.out.println("Done in " + TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - t) + "μs");
}
}
}
}
package org.nfsdb.examples.support;
import com.nfsdb.journal.JournalWriter;
import com.nfsdb.journal.exceptions.JournalException;
import org.nfsdb.examples.model.Quote;
import java.util.Random;
public class QuoteGenerator {
public static void generateQuoteData(JournalWriter<Quote> w, int count) throws JournalException {
generateQuoteData(w, count, 30);
}
public static void generateQuoteData(JournalWriter<Quote> w, int count, int days) throws JournalException {
long lo = System.currentTimeMillis();
long hi = lo + days * 24 * 60 * 60 * 1000L;
long delta = (hi - lo) / count;
String symbols[] = {"AGK.L", "BP.L", "TLW.L", "ABF.L", "LLOY.L", "BT-A.L", "WTB.L", "RRS.L", "ADM.L", "GKN.L", "HSBA.L"};
Quote q = new Quote();
Random r = new Random(System.currentTimeMillis());
for (int i = 0; i < count; i++) {
q.clear();
q.setSym(symbols[Math.abs(r.nextInt() % (symbols.length))]);
q.setAsk(Math.abs(r.nextDouble()));
q.setBid(Math.abs(r.nextDouble()));
q.setAskSize(Math.abs(r.nextInt() % 10000));
q.setBidSize(Math.abs(r.nextInt() % 10000));
q.setEx("LXE");
q.setMode("Fast trading");
q.setTimestamp(lo);
w.append(q);
lo += delta;
}
w.commit();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2014. Vlad Ilyushchenko
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<db>
<journal class="org.nfsdb.examples.model.Quote" defaultPath="quote" timestampColumn="timestamp"
partitionType="MONTH" recordHint="1000000" openPartitionTTL="180" lagHours="24" key="sym">
<sym name="sym" indexed="true" maxsize="4" hintDistinctCount="15"/>
<sym name="ex" indexed="true" maxsize="2" hintDistinctCount="30"/>
<sym name="mode" indexed="true" hintDistinctCount="30"/>
</journal>
</db>
namespace java org.nfsdb.examples.model
struct Quote {
1: required i64 timestamp;
2: required string sym;
3: required double bid;
4: required double ask;
5: required i32 bidSize;
6: required i32 askSize;
7: required string mode;
8: required string ex;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册