提交 f76540c1 编写于 作者: F fjy

move reference counting classes out of server manager and more tests for concurrency

上级 5d1fe507
......@@ -25,16 +25,13 @@ import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.ReferenceCountingSequence;
import com.metamx.druid.index.Segment;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
......@@ -59,12 +56,10 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
......@@ -345,8 +340,6 @@ public class ServerManager implements QuerySegmentWalker
final QuerySegmentSpec segmentSpec
)
{
adapter.increment();
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
......@@ -374,122 +367,4 @@ public class ServerManager implements QuerySegmentWalker
segmentSpec
);
}
public static class ReferenceCountingSegment implements Segment
{
private final Segment baseSegment;
private final AtomicInteger references = new AtomicInteger(0);
public ReferenceCountingSegment(Segment baseSegment)
{
this.baseSegment = baseSegment;
}
@Override
public String getIdentifier()
{
return baseSegment.getIdentifier();
}
@Override
public Interval getDataInterval()
{
return baseSegment.getDataInterval();
}
@Override
public QueryableIndex asQueryableIndex()
{
return baseSegment.asQueryableIndex();
}
@Override
public StorageAdapter asStorageAdapter()
{
return baseSegment.asStorageAdapter();
}
@Override
public void close() throws IOException
{
baseSegment.close();
}
public void increment()
{
references.getAndIncrement();
}
public void decrement()
{
references.getAndDecrement();
if (references.get() < 0) {
try {
close();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
}
}
}
private static class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final ReferenceCountingSegment segment;
public ReferenceCountingSequence(Sequence<T> baseSequence, ReferenceCountingSegment segment)
{
this.baseSequence = baseSequence;
this.segment = segment;
}
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
return new ReferenceCountingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), segment);
}
}
private static class ReferenceCountingYielder<OutType> implements Yielder<OutType>
{
private final Yielder<OutType> baseYielder;
private final ReferenceCountingSegment segment;
public ReferenceCountingYielder(Yielder<OutType> baseYielder, ReferenceCountingSegment segment)
{
this.baseYielder = baseYielder;
this.segment = segment;
}
@Override
public OutType get()
{
return baseYielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return new ReferenceCountingYielder<OutType>(baseYielder.next(initValue), segment);
}
@Override
public boolean isDone()
{
return baseYielder.isDone();
}
@Override
public void close() throws IOException
{
segment.decrement();
baseYielder.close();
}
}
}
}
\ No newline at end of file
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index;
import com.metamx.druid.StorageAdapter;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;
import java.io.IOException;
public class ReferenceCountingSegment implements Segment
{
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
private final Segment baseSegment;
private final Object lock = new Object();
private volatile int numReferences = 0;
private volatile boolean isClosed = false;
public ReferenceCountingSegment(Segment baseSegment)
{
this.baseSegment = baseSegment;
}
public Segment getBaseSegment()
{
return baseSegment;
}
public boolean isClosed()
{
return isClosed;
}
@Override
public String getIdentifier()
{
return baseSegment.getIdentifier();
}
@Override
public Interval getDataInterval()
{
return baseSegment.getDataInterval();
}
@Override
public QueryableIndex asQueryableIndex()
{
return baseSegment.asQueryableIndex();
}
@Override
public StorageAdapter asStorageAdapter()
{
return baseSegment.asStorageAdapter();
}
@Override
public void close() throws IOException
{
baseSegment.close();
}
public void increment()
{
synchronized (lock) {
if (!isClosed) {
numReferences++;
}
}
}
public void decrement()
{
synchronized (lock) {
if (!isClosed) {
if (--numReferences < 0) {
try {
close();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
finally {
isClosed = true;
}
}
}
}
}
}
\ No newline at end of file
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import java.io.IOException;
/**
*/
public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final ReferenceCountingSegment segment;
public ReferenceCountingSequence(Sequence<T> baseSequence, ReferenceCountingSegment segment)
{
this.baseSequence = baseSequence;
this.segment = segment;
}
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
segment.increment();
return new ReferenceCountingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), segment);
}
private static class ReferenceCountingYielder<OutType> implements Yielder<OutType>
{
private final Yielder<OutType> baseYielder;
private final ReferenceCountingSegment segment;
public ReferenceCountingYielder(Yielder<OutType> baseYielder, ReferenceCountingSegment segment)
{
this.baseYielder = baseYielder;
this.segment = segment;
}
@Override
public OutType get()
{
return baseYielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return new ReferenceCountingYielder<OutType>(baseYielder.next(initValue), segment);
}
@Override
public boolean isDone()
{
return baseYielder.isDone();
}
@Override
public void close() throws IOException
{
segment.decrement();
baseYielder.close();
}
}
}
\ No newline at end of file
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.Segment;
import org.joda.time.Interval;
import java.io.IOException;
/**
*/
public class SegmentForTesting implements Segment
{
private final String version;
private final Interval interval;
private final Object lock = new Object();
private volatile boolean closed = false;
SegmentForTesting(
String version,
Interval interval
)
{
this.version = version;
this.interval = interval;
}
public String getVersion()
{
return version;
}
public Interval getInterval()
{
return interval;
}
@Override
public String getIdentifier()
{
return version;
}
public boolean isClosed()
{
return closed;
}
@Override
public Interval getDataInterval()
{
return interval;
}
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
synchronized (lock) {
closed = true;
}
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.druid.Druids;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.NoopQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SearchResultValue;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*/
public class ServerManagerConcurrencyTest
{
private TestServerManager serverManager;
private ConcurrencyTestQueryRunnerFactory factory;
private CountDownLatch queryWaitLatch;
private CountDownLatch queryNotifyLatch;
@Before
public void setUp() throws IOException
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
queryWaitLatch = new CountDownLatch(1);
queryNotifyLatch = new CountDownLatch(1);
factory = new ConcurrencyTestQueryRunnerFactory(queryWaitLatch, queryNotifyLatch);
serverManager = new TestServerManager(factory);
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "2", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-03"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-04"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T01"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T02"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T03"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T06"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-02"));
}
@Test
public void testReferenceCounting() throws Exception
{
serverManager.loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
queryNotifyLatch.await();
Assert.assertTrue(factory.getAdapters().size() == 1);
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
queryWaitLatch.countDown();
future.get();
serverManager.dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertTrue(segmentForTesting.isClosed());
}
}
@Test
public void testReferenceCountingWhileQueryExecuting() throws Exception
{
serverManager.loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
queryNotifyLatch.await();
Assert.assertTrue(factory.getAdapters().size() == 1);
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
serverManager.dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
queryWaitLatch.countDown();
future.get();
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertTrue(segmentForTesting.isClosed());
}
}
private <T> Future assertQueryable(
QueryGranularity granularity,
String dataSource,
Interval interval,
List<Pair<String, Interval>> expected
)
{
final Iterator<Pair<String, Interval>> expectedIter = expected.iterator();
final List<Interval> intervals = Arrays.asList(interval);
final SearchQuery query = Druids.newSearchQueryBuilder()
.dataSource(dataSource)
.intervals(intervals)
.granularity(granularity)
.limit(10000)
.query("wow")
.build();
final QueryRunner<Result<SearchResultValue>> runner = serverManager.getQueryRunnerForIntervals(
query,
intervals
);
return Executors.newSingleThreadExecutor().submit(
new Runnable()
{
@Override
public void run()
{
Sequence<Result<SearchResultValue>> seq = runner.run(query);
Sequences.toList(seq, Lists.<Result<SearchResultValue>>newArrayList());
Iterator<SegmentForTesting> adaptersIter = factory.getAdapters().iterator();
while (expectedIter.hasNext() && adaptersIter.hasNext()) {
Pair<String, Interval> expectedVals = expectedIter.next();
SegmentForTesting value = adaptersIter.next();
Assert.assertEquals(expectedVals.lhs, value.getVersion());
Assert.assertEquals(expectedVals.rhs, value.getInterval());
}
Assert.assertFalse(expectedIter.hasNext());
Assert.assertFalse(adaptersIter.hasNext());
}
}
);
}
public static class ConcurrencyTestQueryRunnerFactory extends ServerManagerTest.MyQueryRunnerFactory
{
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
private List<SegmentForTesting> adapters = Lists.newArrayList();
public ConcurrencyTestQueryRunnerFactory(CountDownLatch waitLatch, CountDownLatch notifyLatch)
{
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public QueryRunner<Result<SearchResultValue>> createRunner(Segment adapter)
{
if (!(adapter instanceof ReferenceCountingSegment)) {
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
}
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
return new BlockingQueryRunner<Result<SearchResultValue>>(
new NoopQueryRunner<Result<SearchResultValue>>(),
waitLatch,
notifyLatch
);
}
@Override
public List<SegmentForTesting> getAdapters()
{
return adapters;
}
@Override
public void clearAdapters()
{
adapters.clear();
}
}
private static class BlockingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> runner;
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
public BlockingQueryRunner(
QueryRunner<T> runner,
CountDownLatch waitLatch,
CountDownLatch notifyLatch
)
{
this.runner = runner;
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public Sequence<T> run(Query<T> query)
{
return new BlockingSequence<T>(runner.run(query), waitLatch, notifyLatch);
}
}
private static class BlockingSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
public BlockingSequence(
Sequence<T> baseSequence,
CountDownLatch waitLatch,
CountDownLatch notifyLatch
)
{
this.baseSequence = baseSequence;
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public <OutType> Yielder<OutType> toYielder(
final OutType initValue, final YieldingAccumulator<OutType, T> accumulator
)
{
notifyLatch.countDown();
final Yielder<OutType> baseYielder = baseSequence.toYielder(initValue, accumulator);
return new Yielder<OutType>()
{
@Override
public OutType get()
{
try {
waitLatch.await();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return baseYielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return baseYielder.next(initValue);
}
@Override
public boolean isDone()
{
return baseYielder.isDone();
}
@Override
public void close() throws IOException
{
baseYielder.close();
}
};
}
}
}
\ No newline at end of file
......@@ -23,10 +23,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.MapUtils;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
......@@ -34,31 +32,20 @@ import com.metamx.common.guava.Sequences;
import com.metamx.druid.Druids;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.NoopQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SearchResultValue;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
......@@ -74,68 +61,30 @@ import java.util.concurrent.ExecutorService;
*/
public class ServerManagerTest
{
ServerManager serverManager;
TestServerManager serverManager;
MyQueryRunnerFactory factory;
private volatile boolean closed;
@Before
public void setUp() throws IOException
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
factory = new MyQueryRunnerFactory();
serverManager = new ServerManager(
new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
@Override
public Segment getSegment(final DataSegment segment)
{
return new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
(Interval) segment.getLoadSpec().get("interval")
);
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
},
new QueryRunnerFactoryConglomerate()
{
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return (QueryRunnerFactory) factory;
}
},
new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor()
);
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
loadQueryable("test", "1", new Interval("P1d/2011-04-02"));
loadQueryable("test", "2", new Interval("P1d/2011-04-02"));
loadQueryable("test", "1", new Interval("P1d/2011-04-03"));
loadQueryable("test", "1", new Interval("P1d/2011-04-04"));
loadQueryable("test", "1", new Interval("P1d/2011-04-05"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T01"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T02"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T03"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T05"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T06"));
loadQueryable("test2", "1", new Interval("P1d/2011-04-01"));
loadQueryable("test2", "1", new Interval("P1d/2011-04-02"));
closed = false;
serverManager = new TestServerManager(factory);
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "2", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-03"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-04"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T01"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T02"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T03"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T06"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-02"));
}
@Test
......@@ -174,7 +123,7 @@ public class ServerManagerTest
)
);
dropQueryable(dataSouce, "2", interval);
serverManager.dropQueryable(dataSouce, "2", interval);
assertQueryable(
QueryGranularity.DAY,
dataSouce, interval,
......@@ -187,7 +136,7 @@ public class ServerManagerTest
@Test
public void testDelete2() throws Exception
{
loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
serverManager.loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
assertQueryable(
QueryGranularity.DAY,
......@@ -197,8 +146,8 @@ public class ServerManagerTest
)
);
dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
dropQueryable("test", "1", new Interval("2011-04-04/2011-04-05"));
serverManager.dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
serverManager.dropQueryable("test", "1", new Interval("2011-04-04/2011-04-05"));
assertQueryable(
QueryGranularity.HOUR,
......@@ -232,70 +181,6 @@ public class ServerManagerTest
);
}
@Test
public void testReferenceCounting() throws Exception
{
loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
Assert.assertFalse(closed);
dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Assert.assertTrue(closed);
}
private void loadQueryable(String dataSource, String version, Interval interval) throws IOException
{
try {
serverManager.loadSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
private void dropQueryable(String dataSource, String version, Interval interval)
{
try {
serverManager.dropSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
private <T> void assertQueryable(
QueryGranularity granularity,
String dataSource,
......@@ -331,69 +216,17 @@ public class ServerManagerTest
factory.clearAdapters();
}
private class SegmentForTesting implements Segment
{
private final String version;
private final Interval interval;
SegmentForTesting(
String version,
Interval interval
)
{
this.version = version;
this.interval = interval;
}
public String getVersion()
{
return version;
}
public Interval getInterval()
{
return interval;
}
@Override
public String getIdentifier()
{
return version;
}
@Override
public Interval getDataInterval()
{
return interval;
}
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
closed = true;
}
}
public class MyQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
public static class MyQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
{
private List<SegmentForTesting> adapters = Lists.newArrayList();
@Override
public QueryRunner<Result<SearchResultValue>> createRunner(Segment adapter)
{
adapters.add(new SegmentForTesting(adapter.getIdentifier(), adapter.getDataInterval()));
if (!(adapter instanceof ReferenceCountingSegment)) {
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
}
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
return new NoopQueryRunner<Result<SearchResultValue>>();
}
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.MapUtils;
import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public class TestServerManager extends ServerManager
{
public TestServerManager(
final QueryRunnerFactory factory
)
{
super(
new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
@Override
public Segment getSegment(final DataSegment segment)
{
return new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
(Interval) segment.getLoadSpec().get("interval")
);
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
},
new QueryRunnerFactoryConglomerate()
{
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return (QueryRunnerFactory) factory;
}
},
new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor()
);
}
public void loadQueryable(String dataSource, String version, Interval interval) throws IOException
{
try {
super.loadSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
public void dropQueryable(String dataSource, String version, Interval interval)
{
try {
super.dropSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册