Commit 77ec4df7 authored by F fjy

update guava, java-util, and druid-api

Parent f7c4d6a2
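The dependency bumps in this commit (Guava 14.0.1 → 17.0, java-util 0.25.6 → 0.26.0-SNAPSHOT, druid-api 0.2.3 → 0.2.4-SNAPSHOT) drive a handful of mechanical migrations that repeat throughout the diff: Guava deprecated `Closeables.closeQuietly(Closeable)` in 14.0 and later removed it, so call sites move to java-util's `com.metamx.common.guava.CloseQuietly.close`; manual try/finally close idioms become try-with-resources; `com.metamx.common.exception.FormattedException` is dropped in favor of plain exceptions plus logging; and the renamed `Stopwatch` and `Hasher` APIs are updated. As a minimal sketch of what the replacement close helper does (the actual `CloseQuietly` in java-util may differ in detail):

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of a close-quietly helper in the style of
// com.metamx.common.guava.CloseQuietly; the real implementation may differ.
public final class CloseQuietlySketch
{
  private CloseQuietlySketch() {}

  public static void close(Closeable closeable)
  {
    if (closeable == null) {
      return; // Guava's closeQuietly also tolerated nulls
    }
    try {
      closeable.close();
    }
    catch (IOException e) {
      // Swallowed: callers invoke this from finally blocks, where a failure
      // in close() must not mask the exception already in flight.
    }
  }
}
```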
......@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.Yielders;
......@@ -70,7 +71,7 @@ public class OrderedMergeSequence<T> implements Sequence<T>
return yielder.get();
}
finally {
Closeables.closeQuietly(yielder);
CloseQuietly.close(yielder);
}
}
......
......@@ -46,14 +46,9 @@ public class SerializerUtils
public void writeString(OutputSupplier<? extends OutputStream> supplier, String name) throws IOException
{
-    OutputStream out = null;
-    try {
-      out = supplier.getOutput();
+    try (OutputStream out = supplier.getOutput()) {
       writeString(out, name);
     }
-    finally {
-      Closeables.closeQuietly(out);
-    }
}
public void writeString(WritableByteChannel out, String name) throws IOException
......
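The `writeString` change above shows the try-with-resources refactor this commit applies wherever a stream was opened inside `try` and closed quietly in `finally`. A self-contained sketch of the shape, with a hypothetical `StreamFactory` standing in for Guava's `OutputSupplier`:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Guava's OutputSupplier<? extends OutputStream>.
interface StreamFactory
{
  OutputStream open() throws IOException;
}

class WriteStringSketch
{
  // try-with-resources closes the stream on every exit path; if both the
  // body and close() throw, the close() exception is attached as suppressed
  // rather than silently discarded the way closeQuietly discarded it.
  static void writeString(StreamFactory supplier, String name) throws IOException
  {
    try (OutputStream out = supplier.open()) {
      out.write(name.getBytes(StandardCharsets.UTF_8));
    }
  }
}
```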
......@@ -21,6 +21,7 @@ package io.druid.storage.hdfs;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
......@@ -52,22 +53,17 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
final FileSystem fs = checkPathAndGetFilesystem(path);
-    FSDataInputStream in = null;
-    try {
-      if (path.getName().endsWith(".zip")) {
-        in = fs.open(path);
-        CompressionUtils.unzip(in, dir);
-        in.close();
-      }
-      else {
-        throw new SegmentLoadingException("Unknown file type[%s]", path);
-      }
-    }
-    catch (IOException e) {
-      throw new SegmentLoadingException(e, "Some IOException");
-    }
-    finally {
-      Closeables.closeQuietly(in);
+    if (path.getName().endsWith(".zip")) {
+      try {
+        try (FSDataInputStream in = fs.open(path)) {
+          CompressionUtils.unzip(in, dir);
+        }
+      }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "Some IOException");
+      }
+    } else {
+      throw new SegmentLoadingException("Unknown file type[%s]", path);
     }
}
......@@ -85,7 +81,8 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
}
}
private Path getPath(DataSegment segment) {
private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get("path")));
}
......
......@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
......@@ -78,17 +79,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
fs.mkdirs(outFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, outFile);
-    FSDataOutputStream out = null;
-    long size;
-    try {
-      out = fs.create(outFile);
+    long size;
+    try (FSDataOutputStream out = fs.create(outFile)) {
       size = CompressionUtils.zip(inDir, out);
-      out.close();
-    }
-    finally {
-      Closeables.closeQuietly(out);
     }
return createDescriptorFile(
......
......@@ -31,6 +31,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
......@@ -420,7 +421,7 @@ public class IndexGeneratorJob implements Jobby
if (caughtException == null) {
Closeables.close(out, false);
} else {
Closeables.closeQuietly(out);
CloseQuietly.close(out);
throw Throwables.propagate(caughtException);
}
}
......@@ -600,7 +601,7 @@ public class IndexGeneratorJob implements Jobby
}
}
finally {
Closeables.closeQuietly(in);
CloseQuietly.close(in);
}
out.closeEntry();
context.progress();
......
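One asymmetry worth noting in the `IndexGeneratorJob` hunk above: Guava 17 still ships `Closeables.close(Closeable, boolean swallowIOException)`, so the success path keeps it and lets a failed close() propagate; only the removed `closeQuietly` on the error path is swapped for `CloseQuietly.close`, where a close failure must not mask `caughtException`. A condensed sketch of the two paths (simplified types; `RuntimeException` stands in for `Throwables.propagate`):

```java
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;

import java.io.Closeable;
import java.io.IOException;

class ClosePathsSketch
{
  static void finish(Closeable out, Throwable caughtException) throws IOException
  {
    if (caughtException == null) {
      // Success: a failure to close is a real error, so let it propagate.
      Closeables.close(out, false);
    } else {
      // Failure: close quietly so the original exception stays visible.
      CloseQuietly.close(out);
      throw new RuntimeException(caughtException); // ~ Throwables.propagate
    }
  }
}
```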
......@@ -24,9 +24,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closeables;
import com.metamx.common.Granularity;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
......@@ -44,8 +43,8 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentConfig;
......@@ -353,7 +352,7 @@ public class RealtimeIndexTask extends AbstractTask
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
}
catch (FormattedException e) {
catch (Exception e) {
log.warn(e, "unparseable line");
fireDepartment.getMetrics().incrementUnparseable();
}
......@@ -375,7 +374,7 @@ public class RealtimeIndexTask extends AbstractTask
log.makeAlert(e, "Failed to finish realtime task").emit();
}
finally {
Closeables.closeQuietly(firehose);
CloseQuietly.close(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}
......
......@@ -585,7 +585,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = new Stopwatch();
Stopwatch timeoutStopwatch = Stopwatch.createUnstarted();
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task.getId())) {
......
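The `Stopwatch` edits here and in the test classes below also follow from the Guava bump: Guava 15 deprecated the public `Stopwatch` constructor in favor of static factories, and by the 17.0 target of this commit `new Stopwatch()` no longer compiles. A small sketch of the migration:

```java
import com.google.common.base.Stopwatch;

import java.util.concurrent.TimeUnit;

class StopwatchMigration
{
  static void example() throws InterruptedException
  {
    // Before (Guava 14): Stopwatch watch = new Stopwatch(); watch.start();
    Stopwatch timeout = Stopwatch.createUnstarted();
    timeout.start();

    // When the watch should be running immediately, as in the tests below:
    Stopwatch running = Stopwatch.createStarted();

    Thread.sleep(100);
    if (running.elapsed(TimeUnit.MILLISECONDS) > 50) {
      // ... timeout handling ...
    }
  }
}
```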
......@@ -31,7 +31,7 @@ public class TestUtils
public static boolean conditionValid(IndexingServiceCondition condition)
{
try {
Stopwatch stopwatch = new Stopwatch();
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
while (!condition.isValid()) {
Thread.sleep(100);
......
......@@ -22,9 +22,9 @@ package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
......@@ -115,7 +115,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
}
@Override
public InputRow nextRow() throws FormattedException
public InputRow nextRow()
{
final byte[] message = iter.next().message();
......@@ -127,10 +127,8 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
return theParser.parse(ByteBuffer.wrap(message));
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString()))
.build();
log.error("Unparseable row! Error parsing[%s]", ByteBuffer.wrap(message));
throw Throwables.propagate(e);
}
}
......
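The Kafka firehose changes above (mirrored in `KafkaSevenFirehoseFactory` below) are the `FormattedException` removal in action: `nextRow()` no longer declares the checked exception, and a parse failure is logged and rethrown unchecked via `Throwables.propagate`. A hedged sketch of the new shape, with a stand-in parser interface and plain stderr instead of the Metamx `Logger`:

```java
import java.nio.ByteBuffer;

// Stand-in for the ByteBufferInputRowParser used by the real factory.
interface RowParser<T>
{
  T parse(ByteBuffer buf);
}

class NextRowSketch
{
  static <T> T nextRow(RowParser<T> parser, byte[] message)
  {
    try {
      return parser.parse(ByteBuffer.wrap(message));
    }
    catch (Exception e) {
      // Before: throw new FormattedException.Builder()
      //     .withErrorCode(UNPARSABLE_ROW)...build();
      System.err.printf("Unparseable row! Error parsing[%s]%n", ByteBuffer.wrap(message));
      // Throwables.propagate(e) rethrows unchecked exceptions as-is and
      // wraps checked ones; RuntimeException wrapping approximates it here.
      throw new RuntimeException(e);
    }
  }
}
```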
......@@ -21,9 +21,9 @@ package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
......@@ -123,7 +123,7 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
}
@Override
public InputRow nextRow() throws FormattedException
public InputRow nextRow()
{
final Message message = iter.next().message();
......@@ -134,16 +134,14 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
return parseMessage(message);
}
public InputRow parseMessage(Message message) throws FormattedException
public InputRow parseMessage(Message message)
{
try {
return theParser.parse(message.payload());
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString()))
.build();
log.error("Unparseable row! Error parsing[%s]", message.payload());
throw Throwables.propagate(e);
}
}
......
......@@ -30,7 +30,7 @@
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.107-SNAPSHOT</tag>
<tag>druid-0.6.117-SNAPSHOT</tag>
</scm>
<prerequisites>
......@@ -39,9 +39,9 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.6</metamx.java-util.version>
<metamx.java-util.version>0.26.0-SNAPSHOT</metamx.java-util.version>
<apache.curator.version>2.4.0</apache.curator.version>
<druid.api.version>0.2.3</druid.api.version>
<druid.api.version>0.2.4-SNAPSHOT</druid.api.version>
</properties>
<modules>
......@@ -198,7 +198,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
<version>17.0</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
......
......@@ -28,10 +28,8 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec;
......@@ -94,7 +92,7 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
public InputRow parse(ByteBuffer input)
{
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly...
......
......@@ -19,7 +19,7 @@
package io.druid.query;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import io.druid.segment.ReferenceCountingSegment;
......@@ -52,7 +52,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
Closeables.closeQuietly(closeable);
CloseQuietly.close(closeable);
throw e;
}
}
......
......@@ -54,7 +54,7 @@ public class CardinalityAggregator implements Aggregator
// nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases.
if (size == 1) {
final String value = selector.lookupName(row.get(0));
hasher.putString(value != null ? value : NULL_STRING);
hasher.putUnencodedChars(value != null ? value : NULL_STRING);
} else if (size != 0) {
final String[] values = new String[size];
for (int i = 0; i < size; ++i) {
......@@ -67,7 +67,7 @@ public class CardinalityAggregator implements Aggregator
if (i != 0) {
hasher.putChar(SEPARATOR);
}
hasher.putString(values[i]);
hasher.putUnencodedChars(values[i]);
}
}
}
......@@ -79,7 +79,7 @@ public class CardinalityAggregator implements Aggregator
for (final DimensionSelector selector : selectors) {
for (final Integer index : selector.getRow()) {
final String value = selector.lookupName(index);
collector.add(hashFn.hashString(value == null ? NULL_STRING : value).asBytes());
collector.add(hashFn.hashUnencodedChars(value == null ? NULL_STRING : value).asBytes());
}
}
}
......
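The `CardinalityAggregator` edits track another Guava rename: version 15 replaced `Hasher.putString(CharSequence)` with `putUnencodedChars` and `HashFunction.hashString(CharSequence)` with `hashUnencodedChars` (the old names were later removed), keeping the same behavior of hashing the raw UTF-16 code units. A short sketch, using `murmur3_128` as an illustrative hash function:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

class HashingMigration
{
  static byte[] example(String value)
  {
    HashFunction hashFn = Hashing.murmur3_128(); // illustrative choice

    // Before (Guava 14): hashFn.hashString(value).asBytes()
    byte[] oneShot = hashFn.hashUnencodedChars(value).asBytes();

    // Before (Guava 14): hasher.putString(value)
    Hasher hasher = hashFn.newHasher();
    hasher.putUnencodedChars(value);

    // Both feed the same UTF-16 code units to the hash, so values computed
    // before and after the upgrade stay compatible.
    return oneShot;
  }
}
```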
......@@ -25,12 +25,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
......@@ -123,7 +123,7 @@ public class GroupByQueryEngine
@Override
public void cleanup(RowIterator iterFromMake)
{
Closeables.closeQuietly(iterFromMake);
CloseQuietly.close(iterFromMake);
}
}
);
......@@ -135,7 +135,7 @@ public class GroupByQueryEngine
@Override
public void close() throws IOException
{
Closeables.closeQuietly(bufferHolder);
CloseQuietly.close(bufferHolder);
}
}
)
......
......@@ -19,8 +19,8 @@
package io.druid.query.topn;
import com.google.common.io.Closeables;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.query.aggregation.BufferAggregator;
......@@ -233,7 +233,7 @@ public class PooledTopNAlgorithm
if (resultsBufHolder != null) {
resultsBufHolder.get().clear();
}
Closeables.closeQuietly(resultsBufHolder);
CloseQuietly.close(resultsBufHolder);
}
public static class PooledTopNParams extends TopNParams
......
......@@ -19,8 +19,8 @@
package io.druid.segment;
import com.google.common.io.Closeables;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.guava.CloseQuietly;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.DictionaryEncodedColumn;
......@@ -95,7 +95,7 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
return column.length();
}
finally {
Closeables.closeQuietly(column);
CloseQuietly.close(column);
}
}
......
......@@ -165,15 +165,10 @@ public class IndexIO
}
final File indexFile = new File(inDir, "index.drd");
-    InputStream in = null;
     int version;
-    try {
-      in = new FileInputStream(indexFile);
+    try (InputStream in = new FileInputStream(indexFile)) {
       version = in.read();
     }
-    finally {
-      Closeables.closeQuietly(in);
-    }
return version;
}
......@@ -194,8 +189,8 @@ public class IndexIO
case 2:
case 3:
log.makeAlert("Attempt to load segment of version <= 3.")
-           .addData("version", version)
-           .emit();
+            .addData("version", version)
+            .emit();
return false;
case 4:
case 5:
......
......@@ -38,6 +38,7 @@ import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.nary.BinaryFn;
......@@ -438,9 +439,9 @@ public class IndexMerger
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
}
finally {
Closeables.closeQuietly(channel);
CloseQuietly.close(channel);
channel = null;
Closeables.closeQuietly(fileOutputStream);
CloseQuietly.close(fileOutputStream);
fileOutputStream = null;
}
IndexIO.checkFileSize(indexFile);
......@@ -881,7 +882,7 @@ public class IndexMerger
);
}
finally {
Closeables.closeQuietly(channel);
CloseQuietly.close(channel);
channel = null;
}
IndexIO.checkFileSize(indexFile);
......
......@@ -20,7 +20,7 @@
package io.druid.segment;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
......@@ -118,9 +118,9 @@ public class MMappedIndexAdapter implements IndexableAdapter
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
CloseQuietly.close(timestamps);
for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric);
CloseQuietly.close(floatMetric);
}
done = true;
}
......
......@@ -20,11 +20,11 @@
package io.druid.segment;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.InputSupplier;
import com.google.common.io.OutputSupplier;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.common.utils.SerializerUtils;
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.CompressedFloatsSupplierSerializer;
......@@ -84,8 +84,8 @@ public class MetricHolder
ByteStreams.copy(in, out);
}
finally {
Closeables.closeQuietly(out);
Closeables.closeQuietly(in);
CloseQuietly.close(out);
CloseQuietly.close(in);
}
}
......
......@@ -22,8 +22,8 @@ package io.druid.segment;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
......@@ -208,10 +208,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
CloseQuietly.close(timestamps);
for (Object metric : metrics) {
if (metric instanceof Closeable) {
Closeables.closeQuietly((Closeable) metric);
CloseQuietly.close((Closeable) metric);
}
}
done = true;
......
......@@ -23,7 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
......@@ -108,7 +108,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return new DateTime(column.getLongSingleValueRow(0));
}
finally {
Closeables.closeQuietly(column);
CloseQuietly.close(column);
}
}
......@@ -121,7 +121,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return new DateTime(column.getLongSingleValueRow(column.length() - 1));
}
finally {
Closeables.closeQuietly(column);
CloseQuietly.close(column);
}
}
......@@ -531,16 +531,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void close() throws IOException
{
Closeables.closeQuietly(timestamps);
CloseQuietly.close(timestamps);
for (GenericColumn column : genericColumnCache.values()) {
Closeables.closeQuietly(column);
CloseQuietly.close(column);
}
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
CloseQuietly.close(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column);
CloseQuietly.close((Closeable) column);
}
}
}
......@@ -955,16 +955,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void close() throws IOException
{
Closeables.closeQuietly(timestamps);
CloseQuietly.close(timestamps);
for (GenericColumn column : genericColumnCache.values()) {
Closeables.closeQuietly(column);
CloseQuietly.close(column);
}
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
CloseQuietly.close(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if (column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column);
CloseQuietly.close((Closeable) column);
}
}
}
......
......@@ -20,7 +20,7 @@
package io.druid.segment.column;
import com.google.common.base.Supplier;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
/**
*/
......@@ -68,7 +68,7 @@ class SimpleColumn implements Column
return column.length();
}
finally {
Closeables.closeQuietly(column);
CloseQuietly.close(column);
}
}
......
......@@ -25,6 +25,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
......@@ -123,7 +124,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
private void loadBuffer(int bufferNum)
{
Closeables.closeQuietly(holder);
CloseQuietly.close(holder);
holder = baseFloatBuffers.get(bufferNum);
buffer = holder.get();
currIndex = bufferNum;
......
......@@ -106,17 +106,11 @@ public class CompressedFloatsSupplierSerializer
flattener.close();
-    OutputStream out = null;
-    try {
-      out = consolidatedOut.getOutput();
+    try (OutputStream out = consolidatedOut.getOutput()) {
       out.write(CompressedFloatsIndexedSupplier.version);
       out.write(Ints.toByteArray(numInserted));
       out.write(Ints.toByteArray(sizePer));
       ByteStreams.copy(flattener.combineStreams(), out);
     }
-    finally {
-      Closeables.closeQuietly(out);
-    }
}
}
......@@ -25,6 +25,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
......@@ -122,7 +123,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
private void loadBuffer(int bufferNum)
{
Closeables.closeQuietly(holder);
CloseQuietly.close(holder);
holder = baseLongBuffers.get(bufferNum);
buffer = holder.get();
currIndex = bufferNum;
......
......@@ -100,17 +100,11 @@ public class CompressedLongsSupplierSerializer
flattener.close();
-    OutputStream out = null;
-    try {
-      out = consolidatedOut.getOutput();
+    try (OutputStream out = consolidatedOut.getOutput()) {
       out.write(CompressedLongsIndexedSupplier.version);
       out.write(Ints.toByteArray(numInserted));
       out.write(Ints.toByteArray(sizePer));
       ByteStreams.copy(flattener.combineStreams(), out);
     }
-    finally {
-      Closeables.closeQuietly(out);
-    }
}
}
......@@ -20,7 +20,7 @@
package io.druid.segment.data;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFDecoder;
......@@ -74,7 +74,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
buf.put(outputBytes, 0, numDecompressedBytes);
buf.flip();
Closeables.closeQuietly(outputBytesHolder);
CloseQuietly.close(outputBytesHolder);
return new ResourceHolder<T>()
{
......@@ -105,7 +105,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length);
Closeables.closeQuietly(encoder);
CloseQuietly.close(encoder);
return chunk.getData();
}
......
......@@ -21,9 +21,9 @@ package io.druid.segment.data;
import com.google.common.base.Charsets;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
......@@ -73,14 +73,14 @@ public class GenericIndexed<T> implements Indexed<T>
allowReverseLookup = false;
}
if (prevVal instanceof Closeable) {
Closeables.closeQuietly((Closeable) prevVal);
CloseQuietly.close((Closeable) prevVal);
}
prevVal = next;
++count;
}
if (prevVal instanceof Closeable) {
Closeables.closeQuietly((Closeable) prevVal);
CloseQuietly.close((Closeable) prevVal);
}
ByteArrayOutputStream headerBytes = new ByteArrayOutputStream(4 + (count * 4));
......@@ -98,7 +98,7 @@ public class GenericIndexed<T> implements Indexed<T>
valueBytes.write(bytes);
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
CloseQuietly.close((Closeable) object);
}
}
}
......
......@@ -22,7 +22,7 @@ package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
......@@ -153,7 +153,7 @@ public class InMemoryCompressedFloats implements IndexedFloats
private void loadBuffer(int bufferNum)
{
loadBuffer = null;
Closeables.closeQuietly(holder);
CloseQuietly.close(holder);
final byte[] compressedBytes = compressedBuffers.get(bufferNum);
holder = strategy.fromByteBuffer(ByteBuffer.wrap(compressedBytes), compressedBytes.length);
loadBuffer = holder.get();
......@@ -191,6 +191,6 @@ public class InMemoryCompressedFloats implements IndexedFloats
@Override
public void close() throws IOException
{
Closeables.closeQuietly(holder);
CloseQuietly.close(holder);
}
}
......@@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
......@@ -163,7 +164,7 @@ public class InMemoryCompressedLongs implements IndexedLongs
private void loadBuffer(int bufferNum)
{
loadBuffer = null;
Closeables.closeQuietly(holder);
CloseQuietly.close(holder);
final byte[] compressedBytes = compressedBuffers.get(bufferNum);
holder = strategy.fromByteBuffer(ByteBuffer.wrap(compressedBytes), compressedBytes.length);
loadBuffer = holder.get();
......
......@@ -180,6 +180,7 @@ public class TestIndex
new TimestampSpec("ts", "iso"),
new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
"\t",
"\u0001",
Arrays.asList(COLUMNS)
),
null, null, null, null
......
......@@ -19,8 +19,8 @@
package io.druid.segment.data;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import com.metamx.common.guava.CloseQuietly;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
......@@ -47,7 +47,7 @@ public class CompressedLongsIndexedSupplierTest
@Before
public void setUp() throws Exception
{
Closeables.closeQuietly(indexed);
CloseQuietly.close(indexed);
indexed = null;
supplier = null;
vals = null;
......@@ -56,7 +56,7 @@ public class CompressedLongsIndexedSupplierTest
@After
public void tearDown() throws Exception
{
Closeables.closeQuietly(indexed);
CloseQuietly.close(indexed);
}
private void setupSimple()
......@@ -247,7 +247,7 @@ public class CompressedLongsIndexedSupplierTest
stopLatch.await();
}
finally {
Closeables.closeQuietly(indexed2);
CloseQuietly.close(indexed2);
}
if (failureHappened.get()) {
......
......@@ -21,7 +21,6 @@ package io.druid.storage.s3;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.ISE;
......@@ -95,9 +94,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
try {
s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
-        InputStream in = null;
-        try {
-          in = s3Obj.getDataInputStream();
+        try (InputStream in = s3Obj.getDataInputStream()) {
final String key = s3Obj.getKey();
if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir);
......@@ -113,9 +110,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller
catch (IOException e) {
throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
}
-        finally {
-          Closeables.closeQuietly(in);
-        }
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
......@@ -127,7 +121,8 @@ public class S3DataSegmentPuller implements DataSegmentPuller
catch (Exception e) {
try {
FileUtils.deleteDirectory(outDir);
-      } catch (IOException ioe) {
+      }
+      catch (IOException ioe) {
log.warn(
ioe,
"Failed to remove output directory for segment[%s] after exception: %s",
......
......@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
......@@ -36,6 +35,7 @@ import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
......@@ -208,7 +208,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override
public void cleanup(JsonParserIterator<T> iterFromMake)
{
Closeables.closeQuietly(iterFromMake);
CloseQuietly.close(iterFromMake);
}
}
);
......@@ -251,7 +251,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
Closeables.closeQuietly(jp);
CloseQuietly.close(jp);
return false;
}
......
......@@ -23,10 +23,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
......@@ -112,7 +112,7 @@ public class Announcer
started = false;
for (Map.Entry<String, PathChildrenCache> entry : listeners.entrySet()) {
Closeables.closeQuietly(entry.getValue());
CloseQuietly.close(entry.getValue());
}
for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry : announcements.entrySet()) {
......@@ -353,7 +353,7 @@ public class Announcer
cache.start();
}
catch (Exception e) {
Closeables.closeQuietly(cache);
CloseQuietly.close(cache);
throw Throwables.propagate(e);
}
}
......
......@@ -23,9 +23,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
......@@ -95,7 +94,7 @@ public class RealtimeManager implements QuerySegmentWalker
public void stop()
{
for (FireChief chief : chiefs.values()) {
Closeables.closeQuietly(chief);
CloseQuietly.close(chief);
}
}
......@@ -185,7 +184,7 @@ public class RealtimeManager implements QuerySegmentWalker
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
final InputRow inputRow;
InputRow inputRow = null;
try {
try {
inputRow = firehose.nextRow();
......@@ -214,8 +213,10 @@ public class RealtimeManager implements QuerySegmentWalker
}
metrics.incrementProcessed();
}
catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails());
catch (Exception e) {
if (inputRow != null) {
log.error(e, "unparseable line: %s", inputRow);
}
metrics.incrementUnparseable();
continue;
}
......@@ -237,7 +238,7 @@ public class RealtimeManager implements QuerySegmentWalker
throw e;
}
finally {
Closeables.closeQuietly(firehose);
CloseQuietly.close(firehose);
if (normalExit) {
plumber.finishJob();
plumber = null;
......
......@@ -31,18 +31,13 @@ import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
import com.ircclouds.irc.api.state.IIRCState;
import com.metamx.common.Pair;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import org.joda.time.DateTime;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
......
......@@ -24,12 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.metamx.common.Pair;
import com.metamx.common.exception.FormattedException;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;
/**
......@@ -52,7 +49,7 @@ public class IrcParser implements InputRowParser<Pair<DateTime, ChannelPrivMsg>>
}
@Override
public InputRow parse(Pair<DateTime, ChannelPrivMsg> msg) throws FormattedException
public InputRow parse(Pair<DateTime, ChannelPrivMsg> msg)
{
return decoder.decodeMessage(msg.lhs, msg.rhs.getChannelName(), msg.rhs.getText());
}
......
......@@ -25,8 +25,8 @@ import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
......@@ -200,7 +200,7 @@ public class QueryResource
}
finally {
resp.flushBuffer();
Closeables.closeQuietly(out);
CloseQuietly.close(out);
}
}
}
......@@ -23,11 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
......@@ -338,7 +338,7 @@ public class DruidClusterBridge
log.makeAlert(e, "Exception becoming leader")
.emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch);
CloseQuietly.close(oldLatch);
try {
leaderLatch.get().start();
}
......
......@@ -27,12 +27,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
......@@ -591,7 +591,7 @@ public class DruidCoordinator
log.makeAlert(e, "Unable to become leader")
.emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch);
CloseQuietly.close(oldLatch);
try {
leaderLatch.get().start();
}
......
......@@ -21,9 +21,9 @@ package io.druid.server.initialization;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import java.io.BufferedInputStream;
......@@ -80,7 +80,7 @@ public class PropertiesModule implements Module
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
}
finally {
Closeables.closeQuietly(stream);
CloseQuietly.close(stream);
}
binder.bind(Properties.class).toInstance(props);
......
......@@ -21,8 +21,8 @@ package io.druid.server.log;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import io.druid.server.RequestLogLine;
......@@ -83,7 +83,7 @@ public class FileRequestLogger implements RequestLogger
try {
synchronized (lock) {
Closeables.closeQuietly(fileWriter);
CloseQuietly.close(fileWriter);
fileWriter = new FileWriter(new File(baseDir, currentDay.toString()), true);
}
}
......@@ -105,7 +105,7 @@ public class FileRequestLogger implements RequestLogger
public void stop()
{
synchronized (lock) {
Closeables.closeQuietly(fileWriter);
CloseQuietly.close(fileWriter);
}
}
......
......@@ -27,7 +27,7 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import io.druid.data.input.Row;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
......@@ -222,6 +222,6 @@ public class SQLRunner
}
}
Closeables.closeQuietly(stdInput);
CloseQuietly.close(stdInput);
}
}
......@@ -335,7 +335,7 @@ public class BatchServerInventoryViewTest
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception
{
Stopwatch stopwatch = new Stopwatch().start();
Stopwatch stopwatch = Stopwatch.createStarted();
while (Iterables.isEmpty(batchServerInventoryView.getInventory())
|| Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
Thread.sleep(500);
......
......@@ -19,7 +19,7 @@
package io.druid.curator;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
......@@ -51,7 +51,7 @@ public class CuratorTestBase
protected void tearDownServerAndCurator()
{
Closeables.closeQuietly(curator);
Closeables.closeQuietly(server);
CloseQuietly.close(curator);
CloseQuietly.close(server);
}
}
......@@ -137,7 +137,7 @@ public class RealtimeManagerTest
{
realtimeManager.start();
Stopwatch stopwatch = new Stopwatch().start();
Stopwatch stopwatch = Stopwatch.createStarted();
while (realtimeManager.getMetrics("test").processed() != 1) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
......
......@@ -26,7 +26,6 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
......@@ -85,7 +84,7 @@ public class RealtimePlumberSchoolTest
new InputRowParser()
{
@Override
public InputRow parse(Object input) throws FormattedException
public InputRow parse(Object input)
{
return null;
}
......@@ -177,7 +176,7 @@ public class RealtimePlumberSchoolTest
}
);
Stopwatch stopwatch = new Stopwatch().start();
Stopwatch stopwatch = Stopwatch.createStarted();
while (!committed.booleanValue()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
......
......@@ -71,7 +71,7 @@ public class DruidCoordinatorBalancerProfiler
public void bigProfiler()
{
Stopwatch watch = new Stopwatch();
Stopwatch watch = Stopwatch.createUnstarted();
int numSegments = 55000;
int numServers = 50;
EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
......@@ -184,7 +184,7 @@ public class DruidCoordinatorBalancerProfiler
public void profileRun()
{
Stopwatch watch = new Stopwatch();
Stopwatch watch = Stopwatch.createUnstarted();
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
......
......@@ -22,7 +22,7 @@ package io.druid.cli.convert;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.airlift.command.Option;
......@@ -196,7 +196,7 @@ public class ConvertProperties implements Runnable
}
finally {
if (out != null) {
Closeables.closeQuietly(out);
CloseQuietly.close(out);
}
}
......