Commit fba2bc96 authored by cheddar

Merge pull request #204 from metamx/close-handle

add the ability to unmap mmapped files
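In short, this change makes segments closeable end to end: QueryableIndex and Segment gain a close() method, SimpleQueryableIndex delegates it to the SmooshedFileMapper that owns the mmapped buffers, and ServerManager wraps every loaded segment in a new ReferenceCountingSegment so that an unmap requested while queries are in flight is deferred until the last reference is released. A minimal sketch of that lifecycle, built only from classes added in this diff (the sketch's own class and method names are illustrative, not part of the commit):

```java
import java.io.Closeable;
import java.io.IOException;

import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.Segment;

public class UnmapLifecycleSketch
{
  // baseSegment would typically be a QueryableIndexSegment backed by mmapped files.
  public static void demonstrate(Segment baseSegment) throws IOException
  {
    final ReferenceCountingSegment segment = new ReferenceCountingSegment(baseSegment);

    // A query takes a reference before reading; ServerManager does this via
    // ReferenceCountingSequence. The count goes 0 -> 1.
    final Closeable queryRef = segment.increment();

    // A drop request arrives while the query is running: the count is positive,
    // so close() only decrements it and the mmap stays live.
    segment.close();

    // The query finishes and releases its reference; the count dips below zero,
    // which triggers the real close() and unmaps the underlying files.
    queryRef.close();
  }
}
```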
@@ -23,6 +23,9 @@ import com.metamx.druid.index.column.ColumnSelector;
import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
/**
*/
public interface QueryableIndex extends ColumnSelector
@@ -31,4 +34,11 @@ public interface QueryableIndex extends ColumnSelector
public int getNumRows();
public Indexed<String> getColumnNames();
public Indexed<String> getAvailableDimensions();
/**
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
* @throws IOException
*/
@Deprecated
public void close() throws IOException;
}
@@ -19,10 +19,12 @@
package com.metamx.druid.index;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.Map;
/**
@@ -34,13 +36,15 @@ public class SimpleQueryableIndex implements QueryableIndex
private final Indexed<String> availableDimensions;
private final Column timeColumn;
private final Map<String, Column> otherColumns;
private final SmooshedFileMapper fileMapper;
public SimpleQueryableIndex(
Interval dataInterval,
Indexed<String> columnNames,
Indexed<String> dimNames,
Column timeColumn,
Map<String, Column> otherColumns
Map<String, Column> otherColumns,
SmooshedFileMapper fileMapper
)
{
this.dataInterval = dataInterval;
@@ -48,6 +52,7 @@ public class SimpleQueryableIndex implements QueryableIndex
this.availableDimensions = dimNames;
this.timeColumn = timeColumn;
this.otherColumns = otherColumns;
this.fileMapper = fileMapper;
}
@Override
@@ -85,4 +90,10 @@ public class SimpleQueryableIndex implements QueryableIndex
{
return otherColumns.get(columnName);
}
@Override
public void close() throws IOException
{
fileMapper.close();
}
}
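For callers, the practical effect of the hunks above is that a loaded index can now release its mapped buffers deterministically instead of waiting for finalizers. A hedged usage sketch, assuming IndexIO.loadIndex(File) is the loading entry point in this era of the codebase (the wrapper class here is illustrative):

```java
import java.io.File;
import java.io.IOException;

import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;

public class CloseIndexSketch
{
  public static void inspect(File indexDir) throws IOException
  {
    final QueryableIndex index = IndexIO.loadIndex(indexDir);
    try {
      System.out.println("rows: " + index.getNumRows());
    }
    finally {
      // New in this PR: delegates to SmooshedFileMapper.close(), unmapping the files.
      index.close();
    }
  }
}
```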
@@ -375,7 +375,8 @@ public class IndexIO
dimValueLookups,
dimColumns,
invertedIndexed,
spatialIndexed
spatialIndexed,
smooshedFiles
);
log.debug("Mapped v8 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@@ -761,7 +762,8 @@ public class IndexIO
.setType(ValueType.LONG)
.setGenericColumn(new LongGenericColumnSupplier(index.timestamps))
.build(),
columns
columns,
index.getFileMapper()
);
}
}
@@ -795,7 +797,7 @@
}
final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns
dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns, smooshedFiles
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
......
@@ -24,18 +24,19 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.common.logger.Logger;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedList;
import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.Arrays;
@@ -57,6 +58,7 @@ public class MMappedIndex
final Map<String, VSizeIndexed> dimColumns;
final Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes;
final Map<String, ImmutableRTree> spatialIndexes;
final SmooshedFileMapper fileMapper;
private final Map<String, Integer> metricIndexes = Maps.newHashMap();
@@ -69,7 +71,8 @@ public class MMappedIndex
Map<String, GenericIndexed<String>> dimValueLookups,
Map<String, VSizeIndexed> dimColumns,
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes,
Map<String, ImmutableRTree> spatialIndexes
Map<String, ImmutableRTree> spatialIndexes,
SmooshedFileMapper fileMapper
)
{
this.availableDimensions = availableDimensions;
@@ -81,6 +84,7 @@ public class MMappedIndex
this.dimColumns = dimColumns;
this.invertedIndexes = invertedIndexes;
this.spatialIndexes = spatialIndexes;
this.fileMapper = fileMapper;
for (int i = 0; i < availableMetrics.size(); i++) {
metricIndexes.put(availableMetrics.get(i), i);
@@ -169,6 +173,18 @@
return (retVal == null) ? emptySet : retVal;
}
public SmooshedFileMapper getFileMapper()
{
return fileMapper;
}
public void close() throws IOException
{
if (fileMapper != null) {
fileMapper.close();
}
}
public static MMappedIndex fromIndex(Index index)
{
log.info("Converting timestamps");
@@ -273,7 +289,8 @@
dimValueLookups,
dimColumns,
invertedIndexes,
spatialIndexes
spatialIndexes,
null
);
}
}
@@ -38,7 +38,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version>
<metamx.java-util.version>0.22.6</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties>
......
@@ -24,11 +24,14 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.ReferenceCountingSequence;
import com.metamx.druid.index.Segment;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
@@ -53,6 +56,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -71,7 +75,7 @@ public class ServerManager implements QuerySegmentWalker
private final ServiceEmitter emitter;
private final ExecutorService exec;
private final Map<String, VersionedIntervalTimeline<String, Segment>> dataSources;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
@@ -88,7 +92,7 @@ public class ServerManager implements QuerySegmentWalker
this.exec = exec;
this.dataSources = new HashMap<String, VersionedIntervalTimeline<String, Segment>>();
this.dataSources = new HashMap<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>>();
}
public Map<String, Long> getDataSourceSizes()
@@ -132,14 +136,14 @@ public class ServerManager implements QuerySegmentWalker
synchronized (lock) {
String dataSource = segment.getDataSource();
VersionedIntervalTimeline<String, Segment> loadedIntervals = dataSources.get(dataSource);
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
loadedIntervals = new VersionedIntervalTimeline<String, Segment>(Ordering.natural());
loadedIntervals = new VersionedIntervalTimeline<String, ReferenceCountingSegment>(Ordering.natural());
dataSources.put(dataSource, loadedIntervals);
}
PartitionHolder<Segment> entry = loadedIntervals.findEntry(
PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
segment.getInterval(),
segment.getVersion()
);
@@ -149,7 +153,9 @@
}
loadedIntervals.add(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(adapter)
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
);
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, segment.getSize());
@@ -164,17 +170,19 @@
{
String dataSource = segment.getDataSource();
synchronized (lock) {
VersionedIntervalTimeline<String, Segment> loadedIntervals = dataSources.get(dataSource);
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
return;
}
PartitionChunk<Segment> removed = loadedIntervals.remove(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk((Segment) null)
PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk((ReferenceCountingSegment) null)
);
Segment oldQueryable = (removed == null) ? null : removed.getObject();
ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
synchronized (dataSourceSizes) {
@@ -183,6 +191,16 @@
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, -1L);
}
try {
oldQueryable.close();
}
catch (IOException e) {
log.makeAlert(e, "Exception closing segment")
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
}
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
@@ -205,7 +223,7 @@
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final VersionedIntervalTimeline<String, Segment> timeline = dataSources.get(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(query.getDataSource());
if (timeline == null) {
return new NoopQueryRunner<T>();
@@ -214,20 +232,22 @@
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, Segment>>>()
new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, Segment>> apply(Interval input)
public Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> apply(Interval input)
{
return timeline.lookup(input);
}
}
)
.transformCat(
new Function<TimelineObjectHolder<String, Segment>, Iterable<QueryRunner<T>>>()
new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
{
@Override
public Iterable<QueryRunner<T>> apply(@Nullable final TimelineObjectHolder<String, Segment> holder)
public Iterable<QueryRunner<T>> apply(
@Nullable final TimelineObjectHolder<String, ReferenceCountingSegment> holder
)
{
if (holder == null) {
return null;
@@ -236,10 +256,10 @@
return FunctionalIterable
.create(holder.getObject())
.transform(
new Function<PartitionChunk<Segment>, QueryRunner<T>>()
new Function<PartitionChunk<ReferenceCountingSegment>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(PartitionChunk<Segment> input)
public QueryRunner<T> apply(PartitionChunk<ReferenceCountingSegment> input)
{
return buildAndDecorateQueryRunner(
factory,
@@ -280,7 +300,7 @@
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final VersionedIntervalTimeline<String, Segment> timeline = dataSources.get(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(query.getDataSource());
if (timeline == null) {
return new NoopQueryRunner<T>();
@@ -293,9 +313,9 @@
{
@Override
@SuppressWarnings("unchecked")
public Iterable<QueryRunner<T>> apply(@Nullable SegmentDescriptor input)
public Iterable<QueryRunner<T>> apply(SegmentDescriptor input)
{
final PartitionHolder<Segment> entry = timeline.findEntry(
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
input.getInterval(), input.getVersion()
);
@@ -303,12 +323,12 @@
return null;
}
final PartitionChunk<Segment> chunk = entry.getChunk(input.getPartitionNumber());
final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(input.getPartitionNumber());
if (chunk == null) {
return null;
}
final Segment adapter = chunk.getObject();
final ReferenceCountingSegment adapter = chunk.getObject();
return Arrays.asList(
buildAndDecorateQueryRunner(factory, toolChest, adapter, new SpecificSegmentSpec(input))
);
@@ -323,10 +343,10 @@
}
private <T> QueryRunner<T> buildAndDecorateQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
Segment adapter,
QuerySegmentSpec segmentSpec
final ReferenceCountingSegment adapter,
final QuerySegmentSpec segmentSpec
)
{
return new SpecificSegmentQueryRunner<T>(
@@ -335,7 +355,7 @@
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
@@ -343,10 +363,17 @@
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query)
{
return new ReferenceCountingSequence<T>(factory.createRunner(adapter).run(query), adapter);
}
}
)
).withWaitMeasuredFromNow(),
segmentSpec
);
}
}
}
\ No newline at end of file
@@ -24,6 +24,8 @@ import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
import java.io.IOException;
/**
*/
public class IncrementalIndexSegment implements Segment
@@ -60,4 +62,10 @@ public class IncrementalIndexSegment implements Segment
{
return new IncrementalIndexStorageAdapter(index);
}
@Override
public void close() throws IOException
{
// do nothing
}
}
@@ -23,6 +23,8 @@ import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.v1.QueryableIndexStorageAdapter;
import org.joda.time.Interval;
import java.io.IOException;
/**
*/
public class QueryableIndexSegment implements Segment
@@ -59,4 +61,11 @@ public class QueryableIndexSegment implements Segment
{
return new QueryableIndexStorageAdapter(index);
}
@Override
public void close() throws IOException
{
// this is kinda nasty
index.close();
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index;
import com.metamx.druid.StorageAdapter;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReferenceCountingSegment implements Segment
{
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
private final Segment baseSegment;
private final Object lock = new Object();
private volatile int numReferences = 0;
private volatile boolean isClosed = false;
public ReferenceCountingSegment(Segment baseSegment)
{
this.baseSegment = baseSegment;
}
public Segment getBaseSegment()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment;
}
return null;
}
}
public int getNumReferences()
{
return numReferences;
}
public boolean isClosed()
{
return isClosed;
}
@Override
public String getIdentifier()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.getIdentifier();
}
return null;
}
}
@Override
public Interval getDataInterval()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.getDataInterval();
}
return null;
}
}
@Override
public QueryableIndex asQueryableIndex()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.asQueryableIndex();
}
return null;
}
}
@Override
public StorageAdapter asStorageAdapter()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.asStorageAdapter();
}
return null;
}
}
@Override
public void close() throws IOException
{
synchronized (lock) {
if (!isClosed) {
if (numReferences > 0) {
decrement();
} else {
baseSegment.close();
isClosed = true;
}
}
}
}
public Closeable increment()
{
synchronized (lock) {
if (!isClosed) {
numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
}
return null;
}
}
private void decrement()
{
synchronized (lock) {
if (!isClosed) {
if (--numReferences < 0) {
try {
close();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
}
}
}
}
}
\ No newline at end of file
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index;
import com.metamx.common.guava.ResourceClosingYielder;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
/**
*/
public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final ReferenceCountingSegment segment;
public ReferenceCountingSequence(Sequence<T> baseSequence, ReferenceCountingSegment segment)
{
this.baseSequence = baseSequence;
this.segment = segment;
}
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
return new ResourceClosingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), segment.increment());
}
}
\ No newline at end of file
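ReferenceCountingSequence is what ties a reference's lifetime to query execution: toYielder() takes a reference via segment.increment(), and the ResourceClosingYielder releases it when the consumer closes the yielder. A sketch of a consumer under those semantics, using the java-util Yielder API that the class above builds on (the consume method and its names are illustrative, not part of the commit):

```java
import java.io.IOException;

import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.ReferenceCountingSequence;

public class ConsumeSketch
{
  public static <T> void consume(Sequence<T> results, ReferenceCountingSegment segment) throws IOException
  {
    final Sequence<T> guarded = new ReferenceCountingSequence<T>(results, segment);

    // toYielder() increments the segment's reference count; yielding after each
    // element lets the caller walk the results one row at a time.
    Yielder<T> yielder = guarded.toYielder(
        null,
        new YieldingAccumulator<T, T>()
        {
          @Override
          public T accumulate(T accumulated, T in)
          {
            yield();
            return in;
          }
        }
    );

    try {
      while (!yielder.isDone()) {
        final T row = yielder.get();
        // ... process row ...
        yielder = yielder.next(null);
      }
    }
    finally {
      // ResourceClosingYielder closes the segment reference together with the
      // yielder, letting a pending unmap proceed once the last query finishes.
      yielder.close();
    }
  }
}
```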
@@ -22,9 +22,11 @@ package com.metamx.druid.index;
import com.metamx.druid.StorageAdapter;
import org.joda.time.Interval;
import java.io.Closeable;
/**
*/
public interface Segment
public interface Segment extends Closeable
{
public String getIdentifier();
public Interval getDataInterval();
......
@@ -22,6 +22,7 @@ package com.metamx.druid.loading;
import com.metamx.druid.index.QueryableIndex;
import java.io.File;
import java.io.IOException;
/**
*/
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index;
import com.google.common.base.Throwables;
import com.metamx.druid.StorageAdapter;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*/
public class ReferenceCountingSegmentTest
{
private ReferenceCountingSegment segment;
private ExecutorService exec;
@Before
public void setUp() throws Exception
{
segment = new ReferenceCountingSegment(
new Segment()
{
@Override
public String getIdentifier()
{
throw new UnsupportedOperationException();
}
@Override
public Interval getDataInterval()
{
throw new UnsupportedOperationException();
}
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
}
}
);
exec = Executors.newSingleThreadExecutor();
}
@Test
public void testMultipleClose() throws Exception
{
Assert.assertFalse(segment.isClosed());
final Closeable closeable = segment.increment();
Assert.assertTrue(segment.getNumReferences() == 1);
closeable.close();
closeable.close();
exec.submit(
new Runnable()
{
@Override
public void run()
{
try {
closeable.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertFalse(segment.isClosed());
segment.close();
segment.close();
exec.submit(
new Runnable()
{
@Override
public void run()
{
try {
segment.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertTrue(segment.isClosed());
segment.increment();
segment.increment();
segment.increment();
Assert.assertTrue(segment.getNumReferences() == 0);
segment.close();
Assert.assertTrue(segment.getNumReferences() == 0);
}
}
@@ -27,6 +27,7 @@ import com.metamx.druid.index.Segment;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
@@ -68,6 +69,11 @@ public class CacheTestSegmentLoader implements SegmentLoader
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
}
};
}
......