Initial code drop, includes Ant buildfile and QueueFile

# Retrofit
Reusable Java and Android code from Square, Inc.
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright 2010 Square, Inc. -->
- Builds the Retrofit project, relying on Apache Ivy to download external
- dependencies.
- Retrofit is divided into modules, all of which are built from this single
- Ant buildfile. The directory structure is as follows:
- [project root]
- modules
- [module]
- src
- src-tests
name="Retrofit" basedir="." default="dist">
- The Retrofit version number, becomes part of the JAR file names.
<property name="retrofit.revision" value="0.1"/>
<property name="ivy.install.version" value="2.2.0"/>
<condition property="ivy.home" value="${env.IVY_HOME}">
<isset property="env.IVY_HOME"/>
<property name="ivy.home" value="${user.home}/.ant"/>
<property name="ivy.jar.dir" value="${ivy.home}/lib"/>
<property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar"/>
<property name="build.dir" location="build"/>
<property name="testreports.dir" location="${build.dir}/testreports"/>
- Gets Ivy from the public Maven repo so we don't need a custom Ant
- installation.
<target name="download-ivy" unless="offline">
<mkdir dir="${ivy.jar.dir}"/>
dest="${ivy.jar.file}" usetimestamp="true"/>
<target name="init-ivy" depends="download-ivy">
- Try to load ivy here from ivy home, in case the user has not already
- dropped it into ant's lib dir (note that the latter copy will always
- take precedence). We will not fail as long as local lib dir exists (it
- may be empty) and ivy is in at least one of ant's lib dir or the local
- lib dir.
<path id="ivy.lib.path">
<fileset dir="${ivy.jar.dir}" includes="*.jar"/>
<taskdef resource="org/apache/ivy/ant/antlib.xml"
uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
<target name="prepare" depends="init-ivy">
<!-- ISO 8601 format: 2010-06-02T15:25:45Z -->
<format property="build.time" timezone="GMT"
<format property="build.year" timezone="GMT"
- Assigns the git commit hash to the 'commit.hash' property, sending
- errors to the Ant log.
<exec executable="git"
<arg value="log"/>
<arg value="-n1"/>
<arg value="--pretty=format:%H"/>
<!-- Downloads dependencies and places JARs under lib. -->
<target name="clean">
<delete dir="${build.dir}"/>
<target name="compile" depends="prepare">
<build.module module="util"/>
- Reusable macro to compile a module and create its JARs. Assumes each
- module has this directory structure:
- modules
- \
- [modulename]
- \src
- \src-tests
<macrodef name="build.module">
<attribute name="module"
description="The directory name under 'modules'."/>
<element name="compile.main.classpath" optional="true"/>
<element name="compile.tests.classpath" optional="true"/>
<mkdir dir="${build.dir}/@{module}/main"/>
<!-- Compile src/* and place classes in build/[module]/main. -->
<javac srcdir="modules/@{module}/src"
<fileset dir="lib" includes="*.jar"/>
<!-- Compile src-tests/* and place classes in build/[module]/tests. -->
<mkdir dir="${build.dir}/@{module}/tests"/>
<javac srcdir="modules/@{module}/src-tests"
<fileset dir="lib" includes="*.jar"/>
<pathelement location="${build.dir}/@{module}/main"/>
<!-- Create the JAR files. -->
<jar jarfile="${build.dir}/retrofit-@{module}-${retrofit.revision}.jar">
<attribute name="Build-Time" value="${build.time}"/>
<attribute name="Copyright"
value="${build.year}, Square, Inc."/>
<attribute name="Commit-Hash" value="${commit.hash}"/>
<fileset dir="${build.dir}/@{module}/main"/>
<jar jarfile="${build.dir}/retrofit-@{module}-${retrofit.revision}-src.jar">
<attribute name="Build-Time" value="${build.time}"/>
<attribute name="Copyright"
value="${build.year}, Square, Inc."/>
<attribute name="Commit-Hash" value="${commit.hash}"/>
<fileset dir="modules/@{module}/src"/>
<target name="require.tests"
description="Sets a property so test failures abort the build.">
<property name="require.tests" value="true"/>
<target name="test" depends="compile" description="Runs tests.">
<mkdir dir="${testreports.dir}"/>
- If this property isn't already set, assume broken tests should not
- abort the build.
<property name="require.tests" value="false"/>
<junit printsummary="true" haltonfailure="${require.tests}">
<pathelement location="${build.dir}/util/main"/>
<pathelement location="${build.dir}/util/tests"/>
<fileset dir="lib" includes="*.jar"/>
<formatter type="xml"/>
<batchtest todir="${testreports.dir}">
<fileset dir="modules/util/src-tests">
<include name="**/*Test.java"/>
<junitreport todir="${testreports.dir}">
<fileset dir="${testreports.dir}" includes="TEST-*.xml"/>
<report todir="${testreports.dir}"/>
<echo message="${line.separator}Test report written to:${line.separator}${testreports.dir}/index.html"/>
<target name="dist" depends="require.tests,test"
description="Compiles, ensures tests pass, and creates all deliverables.">
<?xml version="1.0" encoding="utf-8"?>
<!-- Copyright 2010 Square, Inc. -->
- Ivy module file, describing dependencies on other projects.
- @author Eric Burke (eric@squareup.com)
<ivy-module version="2.0">
<info organisation="com.squareup" module="retrofit"/>
<!-- Note: find these at http://mvnrepository.xml/ -->
<dependency org="junit" name="junit" rev="4.8.1"/>
\ No newline at end of file
// Copyright 2010 Square, Inc.
package retrofit.util;
import junit.framework.ComparisonFailure;
import junit.framework.TestCase;
import java.io.*;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Logger;
* Tests for QueueFile.
* @author Bob Lee (bob@squareup.com)
public class QueueFileTest extends TestCase {
private static final Logger logger =
* Takes up 33401 bytes in the queue (N*(N+1)/2+4*N). Picked 254 instead of
* 255 so that the number of bytes isn't a multiple of 4.
private static int N = 254; //
private static byte[][] values = new byte[N][];
static {
for (int i = 0; i < N; i++) {
byte[] value = new byte[i];
// Example: values[3] = { 3, 2, 1 }
for (int ii = 0; ii < i; ii++) value[ii] = (byte) (i - ii);
values[i] = value;
private File file;
@Override protected void setUp() throws Exception {
file = File.createTempFile("test.queue", null);
@Override protected void tearDown() throws Exception {
public void testAddOneElement() throws IOException {
// This test ensures that we update 'first' correctly.
QueueFile queue = new QueueFile(file);
byte[] expected = values[253];
assertEquals(expected, queue.peek());
queue = new QueueFile(file);
assertEquals(expected, queue.peek());
public void testAddAndRemoveElements() throws IOException {
long start = System.nanoTime();
Queue<byte[]> expected = new LinkedList<byte[]>();
for (int round = 0; round < 5; round++) {
QueueFile queue = new QueueFile(file);
for (int i = 0; i < N; i++) {
// Leave N elements in round N, 15 total for 5 rounds. Removing all the
// elements would be like starting with an empty queue.
for (int i = 0; i < N - round - 1; i++) {
assertEquals(expected.remove(), queue.peek());
// Remove and validate remaining 15 elements.
QueueFile queue = new QueueFile(file);
assertEquals(15, queue.size());
assertEquals(expected.size(), queue.size());
while (!expected.isEmpty()) {
assertEquals(expected.remove(), queue.peek());
// length() returns 0, but I checked the size w/ 'ls', and it is correct.
// assertEquals(65536, file.length());
logger.info("Ran in " + ((System.nanoTime() - start) / 1000000) + "ms.");
/** Tests queue expansion when the data crosses EOF. */
public void testSplitExpansion() throws IOException {
// This should result in 3560 bytes.
int max = 80;
Queue<byte[]> expected = new LinkedList<byte[]>();
QueueFile queue = new QueueFile(file);
for (int i = 0; i < max; i++) {
// Remove all but 1.
for (int i = 1; i < max; i++) {
assertEquals(expected.remove(), queue.peek());
// This should wrap around before expanding.
for (int i = 0; i < N; i++) {
while (!expected.isEmpty()) {
assertEquals(expected.remove(), queue.peek());
public void testFailedAdd() throws IOException {
QueueFile queueFile = new QueueFile(file);
final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
queueFile = new QueueFile(braf);
try {
} catch (IOException e) { /* expected */ }
braf.rejectCommit = false;
// Allow a subsequent add to succeed.
queueFile = new QueueFile(file);
assertEquals(2, queueFile.size());
assertEquals(values[253], queueFile.peek());
assertEquals(values[251], queueFile.peek());
public void testFailedRemoval() throws IOException {
QueueFile queueFile = new QueueFile(file);
final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
queueFile = new QueueFile(braf);
try {
} catch (IOException e) { /* expected */ }
queueFile = new QueueFile(file);
assertEquals(1, queueFile.size());
assertEquals(values[253], queueFile.peek());
assertEquals(values[99], queueFile.peek());
public void testFailedExpansion() throws IOException {
QueueFile queueFile = new QueueFile(file);
final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
queueFile = new QueueFile(braf);
try {
// This should trigger an expansion which should fail.
queueFile.add(new byte[8000]);
} catch (IOException e) { /* expected */ }
queueFile = new QueueFile(file);
assertEquals(1, queueFile.size());
assertEquals(values[253], queueFile.peek());
assertEquals(4096, queueFile.fileLength);
assertEquals(values[99], queueFile.peek());
public void testPeakWithElementReader() throws IOException {
QueueFile queueFile = new QueueFile(file);
final byte[] a = {1, 2};
final byte[] b = {3, 4, 5};
queueFile.peek(new QueueFile.ElementReader() {
public void read(InputStream in, int length) throws IOException {
assertEquals(length, 2);
byte[] actual = new byte[length];
assertEquals(a, actual);
queueFile.peek(new QueueFile.ElementReader() {
public void read(InputStream in, int length) throws IOException {
assertEquals(length, 2);
assertEquals(1, in.read());
assertEquals(2, in.read());
assertEquals(-1, in.read());
queueFile.peek(new QueueFile.ElementReader() {
public void read(InputStream in, int length) throws IOException {
assertEquals(length, 3);
byte[] actual = new byte[length];
assertEquals(b, actual);
assertEquals(b, queueFile.peek());
assertEquals(1, queueFile.size());
public void testForEach() throws IOException {
QueueFile queueFile = new QueueFile(file);
final byte[] a = {1, 2};
final byte[] b = {3, 4, 5};
final int[] iteration = new int[]{0};
QueueFile.ElementReader elementReader = new QueueFile.ElementReader() {
public void read(InputStream in, int length) throws IOException {
if (iteration[0] == 0) {
assertEquals(length, 2);
byte[] actual = new byte[length];
assertEquals(a, actual);
} else if (iteration[0] == 1) {
assertEquals(length, 3);
byte[] actual = new byte[length];
assertEquals(b, actual);
} else {
assertEquals(a, queueFile.peek());
assertEquals(2, iteration[0]);
/** Compares two byte[]s for equality. */
private static void assertEquals(byte[] expected, byte[] actual) {
if (!Arrays.equals(expected, actual)) {
throw new ComparisonFailure(null, Arrays.toString(expected),
* Exercise a bug where wrapped elements were getting corrupted when the
* QueueFile was forced to expand in size and a portion of the final Element
* had been wrapped into space at the beginning of the file.
public void testFileExpansionDoesntCorruptWrappedElements()
throws IOException {
QueueFile queue = new QueueFile(file);
// Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5.
byte[][] values = new byte[5][];
for (int blockNum = 0; blockNum < values.length; blockNum++) {
values[blockNum] = new byte[1024];
for (int i = 0; i < values[blockNum].length; i++) {
values[blockNum][i] = (byte) (blockNum + 1);
// First, add the first two blocks to the queue, remove one leaving a
// 1K space at the beginning of the buffer.
// The trailing end of block "4" will be wrapped to the start of the buffer.
// Cause buffer to expand as there isn't space between the end of block "4"
// and the start of block "2". Internally the queue should cause block "4"
// to be contiguous, but there was a bug where that wasn't happening.
// Make sure values are not corrupted, specifically block "4" that wasn't
// being made contiguous in the version with the bug.
for (int blockNum = 1; blockNum < values.length; blockNum++) {
byte[] value = queue.peek();
for (int i = 0; i < value.length; i++) {
"Block " + (blockNum + 1) + " corrupted at byte index " + i,
(byte) (blockNum + 1), value[i]);
* Exercise a bug where wrapped elements were getting corrupted when the
* QueueFile was forced to expand in size and a portion of the final Element
* had been wrapped into space at the beginning of the file - if multiple
* Elements have been written to empty buffer space at the start does the
* expansion correctly update all their positions?
public void testFileExpansionCorrectlyMovesElements() throws IOException {
QueueFile queue = new QueueFile(file);
// Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5.
byte[][] values = new byte[5][];
for (int blockNum = 0; blockNum < values.length; blockNum++) {
values[blockNum] = new byte[1024];
for (int i = 0; i < values[blockNum].length; i++) {
values[blockNum][i] = (byte) (blockNum + 1);
// smaller data elements
byte[][] smaller = new byte[3][];
for (int blockNum = 0; blockNum < smaller.length; blockNum++) {
smaller[blockNum] = new byte[256];
for (int i = 0; i < smaller[blockNum].length; i++) {
smaller[blockNum][i] = (byte) (blockNum + 6);
// First, add the first two blocks to the queue, remove one leaving a
// 1K space at the beginning of the buffer.
// The trailing end of block "4" will be wrapped to the start of the buffer.
// Now fill in some space with smaller blocks, none of which will cause
// an expansion.
// Cause buffer to expand as there isn't space between the end of the
// smaller block "8" and the start of block "2". Internally the queue
// should cause all of tbe smaller blocks, and the trailing end of
// block "5" to be moved to the end of the file.
byte[] expectedBlockNumbers = {2, 3, 4, 6, 7, 8,};
// Make sure values are not corrupted, specifically block "4" that wasn't
// being made contiguous in the version with the bug.
for (byte expectedBlockNumber : expectedBlockNumbers) {
byte[] value = queue.peek();
for (int i = 0; i < value.length; i++) {
assertEquals("Block " + (expectedBlockNumber) +
" corrupted at byte index " + i,
expectedBlockNumber, value[i]);
* A RandomAccessFile that can break when you go to write the COMMITTED
* status.
static class BrokenRandomAccessFile extends RandomAccessFile {
boolean rejectCommit = true;
BrokenRandomAccessFile(File file, String mode)
throws FileNotFoundException {
super(file, mode);
@Override public void write(byte[] buffer) throws IOException {
if (rejectCommit && getFilePointer() == 0) {
throw new IOException("No commit for you!");
package retrofit.util;
* Object utility methods.
* @author Bob Lee (bob@squareup.com)
public class Objects {
* Returns t unless it's null.
* @throws NullPointerException if t is null
public static <T> T nonNull(T t, String name) {
if (t == null) throw new NullPointerException(name);
return t;
/** Returns true if the two possibly objects are equal. */
public static <T> boolean equal(T a, T b) {
return a == b || a != null && a.equals(b);
* Copyright (C) 2010 Square, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package retrofit.util;
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.logging.Logger;
* A reliable, efficient, file-based, FIFO queue. Additions and removals are
* O(1). All operations are atomic. Writes are synchronous; data will be written
* to disk before an operation returns. The underlying file is structured to
* survive process and even system crashes. If an I/O exception is thrown during
* a mutating change, the change is aborted. It is safe to continue to use a
* {@code QueueFile} instance after an exception.
* <p/>
* <p>All operations are synchronized. In a traditional queue, the remove
* operation returns an element. In this queue, {@link #peek} and {@link
* #remove} are used in conjunction. Use {@code peek} to retrieve the first
* element, and then {@code remove} to remove it after successful processing. If
* the system crashes after {@code peek} and during processing, the element will
* remain in the queue, to be processed when the system restarts.
* <p/>
* <p><b><font color="red">NOTE:</font></b> The current implementation is built
* for file systems that support atomic segment writes (like YAFFS). Most
* conventional file systems don't support this; if the power goes out while
* writing a segment, the segment will contain garbage and the file will be
* corrupt. We'll add journaling support so this class can be used with more
* file systems later.
* @author Bob Lee (bob@squareup.com)
public class QueueFile {
private static final Logger logger =
/** Initial file size in bytes. */
private static final int INITIAL_LENGTH = 4096; // one file system block
/** Length of header in bytes. */
static final int HEADER_LENGTH = 16;
* The underlying file. Uses a ring buffer to store entries. Designed so that
* a modification isn't committed or visible until we write the header. The
* header is much smaller than a segment. So long as the underlying file
* system supports atomic segment writes, changes to the queue are atomic.
* Storing the file length ensures we can recover from a failed expansion
* (i.e. if setting the file length succeeds but the process dies before the
* data can be copied).
* <p/>
* <pre>
* Format:
* Header (16 bytes)
* Element Ring Buffer (File Length - 16 bytes)
* <p/>
* Header:
* File Length (4 bytes)
* Element Count (4 bytes)
* First Element Position (4 bytes, =0 if null)
* Last Element Position (4 bytes, =0 if null)
* <p/>
* Element:
* Length (4 bytes)
* Data (Length bytes)
* </pre>
private final RandomAccessFile raf;
/** Cached file length. Always a power of 2. */
int fileLength;
/** Number of elements. */
private int elementCount;
/** Pointer to first (or eldest) element. */
private Element first;
/** Pointer to last (or newest) element. */
private Element last;
/** In-memory buffer. Big enough to hold the header. */
private final byte[] buffer = new byte[16];
* Constructs a new queue backed by the given file. Only one {@code QueueFile}
* instance should access a given file at a time.
public QueueFile(File file) throws IOException {
if (!file.exists()) initialize(file);
raf = open(file);
/** For testing. */
QueueFile(RandomAccessFile raf) throws IOException {
this.raf = raf;
* Stores int in buffer. The behavior is equivalent to calling {@link
* RandomAccessFile#writeInt}.
private static void writeInt(byte[] buffer, int offset, int value) {
buffer[offset] = (byte) (value >> 24);
buffer[offset + 1] = (byte) (value >> 16);
buffer[offset + 2] = (byte) (value >> 8);
buffer[offset + 3] = (byte) value;
* Stores int values in buffer. The behavior is equivalent to calling {@link
* RandomAccessFile#writeInt} for each value.
private static void writeInts(byte[] buffer, int... values) {
int offset = 0;
for (int value : values) {
writeInt(buffer, offset, value);
offset += 4;
/** Reads an int from a byte[]. */
private static int readInt(byte[] buffer, int offset) {
return ((buffer[offset] & 0xff) << 24)
+ ((buffer[offset + 1] & 0xff) << 16)
+ ((buffer[offset + 2] & 0xff) << 8)
+ (buffer[offset + 3] & 0xff);
/** Reads the header. */
private void readHeader() throws IOException {
fileLength = readInt(buffer, 0);
elementCount = readInt(buffer, 4);
int firstOffset = readInt(buffer, 8);
int lastOffset = readInt(buffer, 12);
first = readElement(firstOffset);
last = readElement(lastOffset);
* Writes header atomically. The arguments contain the updated values. The
* class member fields should not have changed yet. This only updates the
* state in the file. It's up to the caller to update the class member
* variables *after* this call succeeds. Assumes segment writes are atomic in
* the underlying file system.
private void writeHeader(int fileLength, int elementCount, int firstPosition,
int lastPosition) throws IOException {
writeInts(buffer, fileLength, elementCount, firstPosition, lastPosition);
/** Returns the Element for the given offset. */
private Element readElement(int position) throws IOException {
if (position == 0) return Element.NULL;
return new Element(position, raf.readInt());
/** Atomically initializes a new file. */
private static void initialize(File file) throws IOException {
// Use a temp file so we don't leave a partially-initialized file.
File tempFile = new File(file.getPath() + ".tmp");
RandomAccessFile raf = open(tempFile);
try {
byte[] headerBuffer = new byte[16];
writeInts(headerBuffer, INITIAL_LENGTH, 0, 0, 0);
} finally {
// A rename is atomic.
if (!tempFile.renameTo(file)) throw new IOException("Rename failed!");
/** Opens a random access file that writes synchronously. */
private static RandomAccessFile open(File file) throws FileNotFoundException {
return new RandomAccessFile(file, "rwd");
/** Wraps the position if it exceeds the end of the file. */
private int wrapPosition(int position) {
return position < fileLength ? position
: HEADER_LENGTH + position - fileLength;
* Writes count bytes from buffer to position in file. Automatically wraps
* write if position is past the end of the file or if buffer overlaps it.
* @param position in file to write to
* @param buffer to write from
* @param count # of bytes to write
private void ringWrite(int position, byte[] buffer, int offset, int count)
throws IOException {
position = wrapPosition(position);
if (position + count <= fileLength) {
raf.write(buffer, offset, count);
} else {
// The write overlaps the EOF.
// # of bytes to write before the EOF.
int beforeEof = fileLength - position;
raf.write(buffer, offset, beforeEof);
raf.write(buffer, offset + beforeEof, count - beforeEof);
* Reads count bytes into buffer from file. Wraps if necessary.
* @param position in file to read from
* @param buffer to read into
* @param count # of bytes to read
private void ringRead(int position, byte[] buffer, int offset, int count)
throws IOException {
position = wrapPosition(position);
if (position + count <= fileLength) {
raf.readFully(buffer, 0, count);
} else {
// The read overlaps the EOF.
// # of bytes to read before the EOF.
int beforeEof = fileLength - position;
raf.readFully(buffer, offset, beforeEof);
raf.readFully(buffer, offset + beforeEof, count - beforeEof);
* Adds an element to the end of the queue.
* @param data to copy bytes from
public void add(byte[] data) throws IOException {
add(data, 0, data.length);
* Adds an element to the end of the queue.
* @param data to copy bytes from
* @param offset to start from in buffer
* @param count number of bytes to copy
* @throws IndexOutOfBoundsException if {@code offset < 0} or {@code count <
* 0}, or if {@code offset + count} is
* bigger than the length of {@code
* buffer}.
public synchronized void add(byte[] data, int offset, int count)
throws IOException {
Objects.nonNull(data, "buffer");
if ((offset | count) < 0 || count > data.length - offset) {
throw new IndexOutOfBoundsException();
// Insert a new element after the current last element.
boolean wasEmpty = isEmpty();
int position = wasEmpty ? HEADER_LENGTH : wrapPosition(
last.position + Element.HEADER_LENGTH + last.length);
Element newLast = new Element(position, count);
// Write length.
writeInt(buffer, 0, count);
ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);
// Write data.
ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);
// Commit the addition. If wasEmpty, first == last.
int firstPosition = wasEmpty ? newLast.position : first.position;
writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);
last = newLast;
if (wasEmpty) first = last; // first element
/** Returns the number of used bytes. */
private int usedBytes() {
if (elementCount == 0) return HEADER_LENGTH;
if (last.position >= first.position) {
// Contiguous queue.
return (last.position - first.position) // all but last entry
+ Element.HEADER_LENGTH + last.length // last entry
} else {
// tail < head. The queue wraps.
return last.position // buffer front + header
+ Element.HEADER_LENGTH + last.length // last entry
+ fileLength - first.position; // buffer end
/** Returns number of unused bytes. */
private int remainingBytes() {
return fileLength - usedBytes();
/** Returns true if this queue contains no entries. */
public synchronized boolean isEmpty() {
return elementCount == 0;
* If necessary, expands the file to accommodate an additional element of the
* given length.
* @param dataLength length of data being added
private void expandIfNecessary(int dataLength) throws IOException {
int elementLength = Element.HEADER_LENGTH + dataLength;
int remainingBytes = remainingBytes();
if (remainingBytes >= elementLength) return;
// Expand.
int previousLength = fileLength;
int newLength;
// Double the length until we can fit the new data.
do {
remainingBytes += previousLength;
newLength = previousLength << 1;
previousLength = newLength;
} while (remainingBytes < elementLength);
// Calculate the position of the tail end of the data in the ring buffer
int endOfLastElement = wrapPosition(
last.position + Element.HEADER_LENGTH + last.length);
// If the buffer is split, we need to make it contiguous
if (endOfLastElement < first.position) {
FileChannel channel = raf.getChannel();
channel.position(fileLength); // destination position
int count = endOfLastElement - Element.HEADER_LENGTH;
if (channel.transferTo(HEADER_LENGTH, count, channel) != count) {
throw new AssertionError("Copied insufficient number of bytes!");
// Commit the expansion.
if (last.position < first.position) {
int newLastPosition = fileLength + last.position - HEADER_LENGTH;
writeHeader(newLength, elementCount, first.position, newLastPosition);
last = new Element(newLastPosition, last.length);
} else {
writeHeader(newLength, elementCount, first.position, last.position);
fileLength = newLength;
/** Reads the eldest element. Returns null if the queue is empty. */
public synchronized byte[] peek() throws IOException {
if (isEmpty()) return null;
int length = first.length;
byte[] data = new byte[length];
ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
return data;
/** Invokes reader with the eldest element, if an element is available. */
public synchronized void peek(ElementReader reader) throws IOException {
if (elementCount > 0) {
reader.read(new ElementInputStream(first), first.length);
* Invokes the given reader once for each element in the queue, from eldest to
* most recently added.
public synchronized void forEach(ElementReader reader) throws IOException {
int position = first.position;
for (int i = 0; i < elementCount; i++) {
Element current = readElement(position);
reader.read(new ElementInputStream(current), current.length);
position = wrapPosition(current.position + Element.HEADER_LENGTH
+ current.length);
/** Reads a single element. */
private class ElementInputStream extends InputStream {
private int position;
private int remaining;
private ElementInputStream(Element element) {
position = wrapPosition(element.position + Element.HEADER_LENGTH);
remaining = element.length;
@Override public int read(byte[] buffer, int offset, int length)
throws IOException {
Objects.nonNull(buffer, "buffer");
if ((offset | length) < 0 || length > buffer.length - offset) {
throw new ArrayIndexOutOfBoundsException();
if (length > remaining) length = remaining;
ringRead(position, buffer, offset, length);
position = wrapPosition(position + length);
remaining -= length;
return length;
@Override public int read() throws IOException {
if (remaining == 0) return -1;
int b = raf.read();
position = wrapPosition(position + 1);
return b;
/** Returns the number of elements in this queue. */
public synchronized int size() {
return elementCount;
* Removes the eldest element.
* @throw NoSuchElementException if the queue is empty
public synchronized void remove() throws IOException {
if (isEmpty()) throw new NoSuchElementException();
if (elementCount == 1) {
} else {
// assert elementCount > 1
int newFirstPosition = wrapPosition(first.position
+ Element.HEADER_LENGTH + first.length);
ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
int length = readInt(buffer, 0);
writeHeader(fileLength, elementCount - 1, newFirstPosition,
first = new Element(newFirstPosition, length);
/** Clears this queue. Truncates the file to the initial size. */
public synchronized void clear() throws IOException {
if (fileLength > INITIAL_LENGTH) raf.setLength(INITIAL_LENGTH);
writeHeader(INITIAL_LENGTH, 0, 0, 0);
elementCount = 0;
first = last = Element.NULL;
fileLength = INITIAL_LENGTH;
/** Closes the underlying file. */
public synchronized void close() throws IOException {
@Override public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append(", size=").append(elementCount);
builder.append(", first=").append(first);
builder.append(", last=").append(last);
builder.append(", element lengths=[");
try {
forEach(new ElementReader() {
boolean first = true;
public void read(InputStream in, int length) throws IOException {
if (first) {
first = false;
} else {
builder.append(", ");
} catch (IOException e) {
logger.log(Level.WARNING, "read error", e);
return builder.toString();
/** A pointer to an element. */
static class Element {
/** Length of element header in bytes. */
static final int HEADER_LENGTH = 4;
/** Null element. */
static final Element NULL = new Element(0, 0);
/** Position in file. */
final int position;
/** The length of the data. */
final int length;
* Constructs a new element.
* @param position within file
* @param length of data
Element(int position, int length) {
this.position = position;
this.length = length;
@Override public String toString() {
return getClass().getSimpleName() + "["
+ "position = " + position
+ ", length = " + length + "]";
* Reads queue elements. Enables partial reads as opposed to reading all of
* the bytes into a byte[].
public interface ElementReader {
* TODO: Support remove() call from read().
* Called once per element.
* @param in stream of element data. Reads as many bytes as requested,
* unless fewer than the request number of bytes remains, in
* which case it reads all the remaining bytes. Not buffered.
* @param length of element data in bytes
public void read(InputStream in, int length) throws IOException;
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src-tests" isTestSource="true" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="JARs" level="project" />
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/modules/util/src" isTestSource="false" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
