Commit 42cc87a2 authored by: F fjy

Merge branch 'master' into refactor-indexing

Conflicts:
	indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexTask.java
	pom.xml
......@@ -29,7 +29,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -127,15 +127,15 @@ public class BrokerServerView implements TimelineServerView
}
}
private void addServer(DruidServer server)
private QueryableDruidServer addServer(DruidServer server)
{
QueryableDruidServer exists = clients.put(
server.getName(),
new QueryableDruidServer(server, makeDirectClient(server))
);
QueryableDruidServer retVal = new QueryableDruidServer(server, makeDirectClient(server));
QueryableDruidServer exists = clients.put(server.getName(), retVal);
if (exists != null) {
log.warn("QueryRunner for server[%s] already existed!?", server);
log.warn("QueryRunner for server[%s] already existed!? Well it's getting replaced", server);
}
return retVal;
}
private DirectDruidClient makeDirectClient(DruidServer server)
......@@ -143,12 +143,12 @@ public class BrokerServerView implements TimelineServerView
return new DirectDruidClient(warehose, smileMapper, httpClient, server.getHost());
}
private void removeServer(DruidServer server)
private QueryableDruidServer removeServer(DruidServer server)
{
clients.remove(server.getName());
for (DataSegment segment : server.getSegments().values()) {
serverRemovedSegment(server, segment);
}
return clients.remove(server.getName());
}
private void serverAddedSegment(final DruidServer server, final DataSegment segment)
......@@ -171,10 +171,11 @@ public class BrokerServerView implements TimelineServerView
selectors.put(segmentId, selector);
}
if (!clients.containsKey(server.getName())) {
addServer(server);
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) {
queryableDruidServer = addServer(server);
}
selector.addServer(clients.get(server.getName()));
selector.addServer(queryableDruidServer);
}
}
......@@ -236,6 +237,7 @@ public class BrokerServerView implements TimelineServerView
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) {
log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
return null;
}
return queryableDruidServer.getClient();
}
......
......@@ -44,6 +44,7 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.selector.QueryableDruidServer;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
......@@ -55,7 +56,7 @@ import com.metamx.druid.query.segment.MultipleSpecificSegmentSpec;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.result.BySegmentResultValueClass;
import com.metamx.druid.result.Result;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval;
......@@ -73,7 +74,7 @@ import java.util.concurrent.Executors;
*/
public class CachingClusteredClient<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(CachingClusteredClient.class);
private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
private final QueryToolChestWarehouse warehouse;
private final TimelineServerView serverView;
......@@ -120,7 +121,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) && strategy != null;
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final Query<T> rewrittenQuery;
......@@ -160,22 +162,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
final byte[] queryCacheKey;
if(strategy != null) {
if (strategy != null) {
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}
// Pull cached segments from cache and remove from set of segments to query
if(useCache && queryCacheKey != null) {
if (useCache && queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
for (Pair<ServerSelector, SegmentDescriptor> e : segments) {
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
}
Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
Cache.NamedKey segmentCacheKey = entry.getValue();
......@@ -190,8 +192,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// remove cached segment from set of segments to query
segments.remove(segment);
}
else {
} else {
final String segmentIdentifier = selector.getSegment().getIdentifier();
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
......@@ -202,16 +203,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
// Compile list of all segments not pulled from cache
for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final DruidServer server = segment.lhs.pick().getServer();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final QueryableDruidServer queryableDruidServer = segment.lhs.pick();
if (queryableDruidServer == null) {
log.error("No servers found for %s?! How can this be?!", segment.rhs);
} else {
final DruidServer server = queryableDruidServer.getServer();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
if (descriptors == null) {
descriptors = Lists.newArrayList();
serverSegments.put(server, descriptors);
}
if (descriptors == null) {
descriptors = Lists.newArrayList();
serverSegments.put(server, descriptors);
descriptors.add(segment.rhs);
}
descriptors.add(segment.rhs);
}
return new LazySequence<T>(
......@@ -235,8 +242,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
if (strategy == null) {
return toolChest.mergeSequences(seq);
}
else {
} else {
return strategy.mergeSequences(seq);
}
}
......@@ -291,7 +297,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final QueryRunner clientQueryable = serverView.getQueryRunner(server);
if (clientQueryable == null) {
throw new ISE("WTF!? server[%s] doesn't have a client Queryable?", server);
log.makeAlert("WTF!? server[%s] doesn't have a client Queryable?", server).emit();
continue;
}
final Sequence<T> resultSeqToAdd;
......@@ -349,7 +356,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
private Cache.NamedKey computeSegmentCacheKey(
String segmentIdentifier,
SegmentDescriptor descriptor,
byte[] queryCacheKey
)
{
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes();
......
......@@ -124,9 +124,16 @@ public class DruidServer implements Comparable
return segments.get(segmentName);
}
public DruidServer addDataSegment(String segmentName, DataSegment segment)
public DruidServer addDataSegment(String segmentId, DataSegment segment)
{
synchronized (lock) {
DataSegment shouldNotExist = segments.get(segmentId);
if (shouldNotExist != null) {
log.warn("Asked to add data segment that already exists!? server[%s], segment[%s]", getName(), segmentId);
return this;
}
String dataSourceName = segment.getDataSource();
DruidDataSource dataSource = dataSources.get(dataSourceName);
......@@ -138,9 +145,9 @@ public class DruidServer implements Comparable
dataSources.put(dataSourceName, dataSource);
}
dataSource.addSegment(segmentName, segment);
segments.put(segmentName, segment);
dataSource.addSegment(segmentId, segment);
segments.put(segmentId, segment);
currSize += segment.getSize();
}
return this;
......@@ -156,13 +163,13 @@ public class DruidServer implements Comparable
return this;
}
public DruidServer removeDataSegment(String segmentName)
public DruidServer removeDataSegment(String segmentId)
{
synchronized (lock) {
DataSegment segment = segments.get(segmentName);
DataSegment segment = segments.get(segmentId);
if (segment == null) {
log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentName);
log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentId);
return this;
}
......@@ -172,18 +179,20 @@ public class DruidServer implements Comparable
log.warn(
"Asked to remove data segment from dataSource[%s] that doesn't exist, but the segment[%s] exists!?!?!?! wtf? server[%s]",
segment.getDataSource(),
segmentName,
segmentId,
getName()
);
return this;
}
dataSource.removePartition(segmentName);
segments.remove(segmentName);
dataSource.removePartition(segmentId);
segments.remove(segmentId);
currSize -= segment.getSize();
if (dataSource.isEmpty()) {
dataSources.remove(dataSource.getName());
}
currSize -= segment.getSize();
}
return this;
......
......@@ -148,7 +148,18 @@ public class ServerInventoryView implements ServerView, InventoryView
@Override
public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
{
log.info("Server[%s] added segment[%s]", container.getName(), inventory);
log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
if (container.getSegment(inventoryKey) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return container;
}
final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
runSegmentCallbacks(
......@@ -170,6 +181,17 @@ public class ServerInventoryView implements ServerView, InventoryView
{
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
if (segment == null) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return container;
}
final DruidServer retVal = container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
......
......@@ -83,7 +83,12 @@ public class ServerSelector
public QueryableDruidServer pick()
{
synchronized (this) {
return Collections.min(servers, comparator);
final int size = servers.size();
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return Collections.min(servers, comparator);
}
}
}
}
......@@ -44,10 +44,10 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* An InventoryManager watches updates to inventory on Zookeeper (or some other discovery-like service publishing
* system). It is built up on two object types: containers and inventory objects.
*
* <p/>
* The logic of the InventoryManager just maintains a local cache of the containers and inventory it sees on ZK. It
* provides methods for getting at the container objects, which house the actual individual pieces of inventory.
*
* <p/>
* A Strategy is provided to the constructor of an InventoryManager; this strategy provides all of the
* object-specific logic to serialize, deserialize, compose, and alter the container and inventory objects.
*/
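To make the container/inventory split described above concrete, here is a hypothetical strategy sketch. The interface shape is inferred from the calls visible later in this diff (`deserializeContainer`, `newContainer`, `deadContainer`, `updateContainer`); it is not the actual Druid strategy interface, and the stub container type is invented for illustration.

```java
// Hypothetical sketch only: shows the kind of object-specific logic a strategy
// supplies to the CuratorInventoryManager. Names mirror calls in this diff.
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

interface InventoryStrategySketch<ContainerClass>
{
  ContainerClass deserializeContainer(byte[] bytes);   // used on CHILD_ADDED / CHILD_UPDATED

  void newContainer(ContainerClass container);         // a container znode appeared

  void deadContainer(ContainerClass container);        // a container znode went away

  ContainerClass updateContainer(ContainerClass oldValue, ContainerClass newValue);
}

class JsonServerStrategySketch implements InventoryStrategySketch<ServerStub>
{
  private final ObjectMapper jsonMapper = new ObjectMapper();

  @Override
  public ServerStub deserializeContainer(byte[] bytes)
  {
    try {
      return jsonMapper.readValue(bytes, ServerStub.class);   // container payload is serialized JSON
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void newContainer(ServerStub container)
  {
    // e.g. add the server to a local view and run "server added" callbacks
  }

  @Override
  public void deadContainer(ServerStub container)
  {
    // e.g. drop the server from the local view and run "server removed" callbacks
  }

  @Override
  public ServerStub updateContainer(ServerStub oldValue, ServerStub newValue)
  {
    // simplest possible merge: take the freshly deserialized container
    return newValue;
  }
}

class ServerStub
{
  public String host;
  public long maxSize;
}
```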
......@@ -128,8 +128,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
final ContainerHolder containerHolder = containers.remove(containerKey);
if (containerHolder == null) {
log.wtf("!? Got key[%s] from keySet() but it didn't have a value!?", containerKey);
}
else {
} else {
// This close() call actually calls shutdownNow() on the executor registered with the Cache object...
containerHolder.getCache().close();
}
......@@ -202,52 +201,60 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
switch (event.getType()) {
case CHILD_ADDED:
container = strategy.deserializeContainer(child.getData());
synchronized (lock) {
container = strategy.deserializeContainer(child.getData());
// This would normally be a race condition, but the only thing that should be mutating the containers
// map is this listener, which should never run concurrently. If the same container is going to disappear
// and come back, we expect a removed event in between.
if (containers.containsKey(containerKey)) {
log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath());
} else {
final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
log.info("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath);
inventoryCache.start();
strategy.newContainer(container);
}
// This would normally be a race condition, but the only thing that should be mutating the containers
// map is this listener, which should never run concurrently. If the same container is going to disappear
// and come back, we expect a removed event in between.
if (containers.containsKey(containerKey)) {
log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath());
break;
}
case CHILD_REMOVED:
synchronized (lock) {
final ContainerHolder removed = containers.remove(containerKey);
if (removed == null) {
log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
break;
}
final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
inventoryCache.start();
strategy.newContainer(container);
// This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
// better have its own executor or ignore shutdownNow() calls...
log.info("Closing inventory cache for %s. Also removing listeners.", containerKey);
removed.getCache().getListenable().clear();
removed.getCache().close();
strategy.deadContainer(removed.getContainer());
break;
case CHILD_REMOVED:
final ContainerHolder removed = containers.remove(containerKey);
if (removed == null) {
log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
break;
}
// This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
// better have its own executor or ignore shutdownNow() calls...
removed.getCache().close();
strategy.deadContainer(removed.getContainer());
break;
case CHILD_UPDATED:
container = strategy.deserializeContainer(child.getData());
ContainerHolder oldContainer = containers.get(containerKey);
if (oldContainer == null) {
log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath());
}
else {
synchronized (oldContainer) {
oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
synchronized (lock) {
container = strategy.deserializeContainer(child.getData());
ContainerHolder oldContainer = containers.get(containerKey);
if (oldContainer == null) {
log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath());
} else {
synchronized (oldContainer) {
oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
}
}
}
break;
break;
}
}
}
......@@ -260,6 +267,8 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
{
this.containerKey = containerKey;
this.inventoryPath = inventoryPath;
log.info("Created new InventoryCacheListener for %s", inventoryPath);
}
@Override
......
......@@ -31,32 +31,36 @@ public class ColumnAnalysis
public static ColumnAnalysis error(String reason)
{
return new ColumnAnalysis(ERROR_PREFIX + reason, -1, null);
return new ColumnAnalysis(ValueType.STRING, -1, null, ERROR_PREFIX + reason);
}
private final String type;
private final long size;
private final Integer cardinality;
private final String errorMessage;
@JsonCreator
public ColumnAnalysis(
@JsonProperty("type") ValueType type,
@JsonProperty("size") long size,
@JsonProperty("cardinality") Integer cardinality
@JsonProperty("cardinality") Integer cardinality,
@JsonProperty("errorMessage") String errorMessage
)
{
this(type.name(), size, cardinality);
this(type.name(), size, cardinality, errorMessage);
}
private ColumnAnalysis(
String type,
long size,
Integer cardinality
Integer cardinality,
String errorMessage
)
{
this.type = type;
this.size = size;
this.cardinality = cardinality;
this.errorMessage = errorMessage;
}
@JsonProperty
......@@ -77,9 +81,15 @@ public class ColumnAnalysis
return cardinality;
}
@JsonProperty
public String getErrorMessage()
{
return errorMessage;
}
public boolean isError()
{
return type.startsWith(ERROR_PREFIX);
return (errorMessage != null && !errorMessage.isEmpty());
}
public ColumnAnalysis fold(ColumnAnalysis rhs)
......@@ -103,7 +113,7 @@ public class ColumnAnalysis
}
}
return new ColumnAnalysis(type, size + rhs.getSize(), cardinality);
return new ColumnAnalysis(type, size + rhs.getSize(), cardinality, null);
}
@Override
......@@ -113,6 +123,7 @@ public class ColumnAnalysis
"type='" + type + '\'' +
", size=" + size +
", cardinality=" + cardinality +
", errorMessage='" + errorMessage + '\'' +
'}';
}
}
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.partition.ImmutablePartitionHolder;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.partition.PartitionHolder;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.ArrayList;
......@@ -78,7 +79,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
this.versionComparator = versionComparator;
}
public void add(Interval interval, VersionType version, PartitionChunk<ObjectType> object)
public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
try {
lock.writeLock().lock();
......@@ -278,6 +279,13 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
addIntervalToTimeline(interval, entry, timeline);
}
/**
*
* @param timeline
* @param key
* @param entry
* @return boolean flag indicating whether or not we inserted or discarded something
*/
private boolean addAtKey(
NavigableMap<Interval, TimelineEntry> timeline,
Interval key,
......@@ -292,7 +300,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
return false;
}
while (currKey != null && currKey.overlaps(entryInterval)) {
while (entryInterval != null && currKey != null && currKey.overlaps(entryInterval)) {
Interval nextKey = timeline.higherKey(currKey);
int versionCompare = versionComparator.compare(
......@@ -311,7 +319,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
if (entryInterval.getEnd().isAfter(currKey.getEnd())) {
entryInterval = new Interval(currKey.getEnd(), entryInterval.getEnd());
} else {
entryInterval = null;
entryInterval = null; // discard this entry
}
}
} else if (versionCompare > 0) {
......@@ -491,4 +499,9 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
return partitionHolder;
}
}
public static void main(String[] args)
{
System.out.println(new Interval(new DateTime(), (DateTime) null));
}
}
......@@ -1068,6 +1068,27 @@ public class VersionedIntervalTimelineTest
);
}
// |----3---||---1---|
// |---2---|
@Test
public void testOverlapCausesNullEntries() throws Exception
{
timeline = makeStringIntegerTimeline();
add("2011-01-01T12/2011-01-02", "3", 3);
add("2011-01-02/3011-01-03", "1", 1);
add("2011-01-01/2011-01-02", "2", 2);
assertValues(
Arrays.asList(
createExpected("2011-01-01/2011-01-01T12", "2", 2),
createExpected("2011-01-01T12/2011-01-02", "3", 3),
createExpected("2011-01-02/3011-01-03", "1", 1)
),
timeline.lookup(new Interval("2011-01-01/3011-01-03"))
);
}
// 1|----| |----|
// 2|------| |------|
// 3|------------------|
......
## Introduction
Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables:
`index_storage` and `descriptor_storage`. Under the hood, the Cassandra integration leverages Astyanax. The
index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains
compressed segments for distribution to compute nodes. Since segments can be large, the Chunked Object store lets the integration multi-thread
writes to Cassandra and spread the data across all the nodes in the cluster. The descriptor storage table is a normal C* table that
stores the segment metadata.
## Schema
Below are the create statements for each:
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
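For reference, a minimal sketch of how a compressed segment could be written into `index_storage` using the Astyanax Chunked Object Store recipe linked above. The class and builder names follow the Astyanax recipe documentation and are assumptions here, not code from this commit; the caller is assumed to already hold a connected `Keyspace` for the `druid` keyspace.

```java
// Illustrative sketch only (assumed Astyanax recipe API, not this commit's pusher code).
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;

import java.io.File;
import java.io.FileInputStream;

public class CassandraSegmentWriteSketch
{
  /**
   * Writes a compressed segment file into the index_storage table in chunks,
   * so large blobs are split across columns and spread over the cluster.
   */
  public static void writeSegment(Keyspace keyspace, String segmentKey, File compressedSegment) throws Exception
  {
    ChunkedStorageProvider indexStorage =
        new CassandraChunkedStorageProvider(keyspace, "index_storage");

    ObjectMetadata meta = ChunkedStorage
        .newWriter(indexStorage, segmentKey, new FileInputStream(compressedSegment))
        .withChunkSize(0x40000) // 256 KB chunks; an assumed setting, tune as needed
        .call();

    System.out.println("Wrote " + meta.getObjectSize() + " bytes for key " + segmentKey);
  }
}
```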
## Getting Started
First, create the schema above (I use a new keyspace called `druid`).
Then, add the following properties to your properties file to enable a Cassandra
backend.
druid.pusher.cassandra=true
druid.pusher.cassandra.host=localhost:9160
druid.pusher.cassandra.keyspace=druid
Use the `druid-development@googlegroups.com` mailing list if you have questions,
or feel free to reach out directly: `bone@alumni.brown.edu`.
curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query
{
"queryType": "groupBy",
"dataSource": "randSeq",
"granularity": "all",
"dimensions": [],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
],
"postAggregations":[
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
......@@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -50,7 +50,7 @@ public class RealtimeStandaloneMain
rn.setDataSegmentPusher(new NoopDataSegmentPusher());
rn.setServerView(new NoopServerView());
rn.setInventoryView(new NoopInventoryView());
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -27,6 +27,7 @@ public interface BitmapIndex
{
public int getCardinality();
public String getValue(int index);
public boolean hasNulls();
public ImmutableConciseSet getConciseSet(String value);
public ImmutableConciseSet getConciseSet(int idx);
}
......@@ -58,6 +58,12 @@ public class BitmapIndexColumnPartSupplier implements Supplier<BitmapIndex>
return dictionary.get(index);
}
@Override
public boolean hasNulls()
{
return dictionary.indexOf(null) >= 0;
}
@Override
public ImmutableConciseSet getConciseSet(String value)
{
......
......@@ -356,6 +356,11 @@ public class IncrementalIndex implements Iterable<Row>
return spatialDimensions;
}
public SpatialDimensionRowFormatter getSpatialDimensionRowFormatter()
{
return spatialDimensionRowFormatter;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
......
......@@ -790,6 +790,9 @@ public class IndexMerger
int count = 0;
for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress();
if (dimVal == null) {
continue;
}
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
......
......@@ -22,30 +22,50 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Floats;
import com.metamx.common.ISE;
import com.metamx.druid.input.InputRow;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* We throw away all invalid spatial dimensions
*/
public class SpatialDimensionRowFormatter
{
private static final Joiner JOINER = Joiner.on(",");
private static final Splitter SPLITTER = Splitter.on(",");
private final List<SpatialDimensionSchema> spatialDimensions;
private final Set<String> spatialDimNames;
private final Set<String> spatialPartialDimNames;
public SpatialDimensionRowFormatter(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
Lists.transform(
spatialDimensions,
new Function<SpatialDimensionSchema, String>()
{
@Override
public String apply(SpatialDimensionSchema input)
{
return input.getDimName();
}
}
)
);
this.spatialPartialDimNames = Sets.newHashSet(
Iterables.concat(
Lists.transform(
spatialDimensions,
......@@ -64,7 +84,7 @@ public class SpatialDimensionRowFormatter
public InputRow formatRow(final InputRow row)
{
final Map<String, List<String>> finalDimLookup = Maps.newHashMap();
final Map<String, List<String>> spatialLookup = Maps.newHashMap();
// remove all spatial dimensions
final List<String> finalDims = Lists.newArrayList(
......@@ -86,14 +106,11 @@ public class SpatialDimensionRowFormatter
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input);
return !spatialDimNames.contains(input) && !spatialPartialDimNames.contains(input);
}
}
)
);
for (String dim : finalDims) {
finalDimLookup.put(dim, row.getDimension(dim));
}
InputRow retVal = new InputRow()
{
......@@ -112,7 +129,8 @@ public class SpatialDimensionRowFormatter
@Override
public List<String> getDimension(String dimension)
{
return finalDimLookup.get(dimension);
List<String> retVal = spatialLookup.get(dimension);
return (retVal == null) ? row.getDimension(dimension) : retVal;
}
@Override
......@@ -122,19 +140,62 @@ public class SpatialDimensionRowFormatter
}
};
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (dimVals == null || dimVals.isEmpty()) {
return retVal;
if (!spatialPartialDimNames.isEmpty()) {
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (isSpatialDimValsValid(dimVals)) {
spatialDimVals.addAll(dimVals);
}
}
if (spatialDimVals.size() == spatialPartialDimNames.size()) {
spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}
}
} else {
for (String spatialDimName : spatialDimNames) {
List<String> dimVals = row.getDimension(spatialDimName);
if (dimVals.size() != 1) {
throw new ISE("Cannot have a spatial dimension value with size[%d]", dimVals.size());
}
if (isJoinedSpatialDimValValid(dimVals.get(0))) {
spatialLookup.put(spatialDimName, dimVals);
finalDims.add(spatialDimName);
}
spatialDimVals.addAll(dimVals);
}
finalDimLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}
return retVal;
}
private boolean isSpatialDimValsValid(List<String> dimVals)
{
if (dimVals == null || dimVals.isEmpty()) {
return false;
}
for (String dimVal : dimVals) {
if (Floats.tryParse(dimVal) == null) {
return false;
}
}
return true;
}
private boolean isJoinedSpatialDimValValid(String dimVal)
{
if (dimVal == null || dimVal.isEmpty()) {
return false;
}
Iterable<String> dimVals = SPLITTER.split(dimVal);
for (String val : dimVals) {
if (Floats.tryParse(val) == null) {
return false;
}
}
return true;
}
}
......@@ -19,6 +19,7 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.JSONParser;
......@@ -34,6 +35,7 @@ public class JSONDataSpec implements DataSpec
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonCreator
public JSONDataSpec(
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -90,8 +90,8 @@ import java.util.Set;
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
* </ul>
*
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes segment size
* variance.
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
* segment size relative to the target.
*/
public class DeterminePartitionsJob implements Jobby
{
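To make the new selection criterion concrete, here is a small, self-contained illustration (with hypothetical numbers) of the `getDistanceSquaredFromTarget` metric introduced in the hunks below: the score is the mean squared distance of partition row counts from the target partition size, so a dimension whose partitions are uniformly oversized no longer scores as well as it did under the old variance metric.

```java
// Illustrative only: mirrors the distance metric added in this diff, with made-up numbers.
public class PartitionDistanceExample
{
  static long distanceSquaredFromTarget(long[] partitionRows, long target)
  {
    long distance = 0;
    for (long rows : partitionRows) {
      distance += (rows - target) * (rows - target);
    }
    return distance / partitionRows.length;
  }

  public static void main(String[] args)
  {
    long target = 1_000_000;
    long[] nearTarget = {950_000, 1_050_000, 1_000_000};
    long[] uniformlyTooBig = {3_000_000, 3_000_000, 3_000_000};

    // The old variance-based score rates uniformlyTooBig as perfect (variance 0)
    // even though every partition is 3x the target; distance-from-target penalizes it.
    System.out.println(distanceSquaredFromTarget(nearTarget, target));      // ~1.7e9
    System.out.println(distanceSquaredFromTarget(uniformlyTooBig, target)); // 4000000000000
  }
}
```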
......@@ -692,8 +692,8 @@ public class DeterminePartitionsJob implements Jobby
}
int maxCardinality = Integer.MIN_VALUE;
long minVariance = Long.MAX_VALUE;
DimPartitions minVariancePartitions = null;
long minDistance = Long.MAX_VALUE;
DimPartitions minDistancePartitions = null;
DimPartitions maxCardinalityPartitions = null;
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
......@@ -722,16 +722,16 @@ public class DeterminePartitionsJob implements Jobby
}
final int cardinality = dimPartitions.getCardinality();
final long variance = dimPartitions.getVariance();
final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize());
if(cardinality > maxCardinality) {
maxCardinality = cardinality;
maxCardinalityPartitions = dimPartitions;
}
if(variance < minVariance) {
minVariance = variance;
minVariancePartitions = dimPartitions;
if(distance < minDistance) {
minDistance = distance;
minDistancePartitions = dimPartitions;
}
}
......@@ -745,7 +745,7 @@ public class DeterminePartitionsJob implements Jobby
final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD
? maxCardinalityPartitions
: minVariancePartitions;
: minDistancePartitions;
final List<ShardSpec> chosenShardSpecs = Lists.transform(
chosenPartitions.partitions, new Function<DimPartition, ShardSpec>()
......@@ -824,17 +824,15 @@ public class DeterminePartitionsJob implements Jobby
return sum;
}
public long getVariance()
public long getDistanceSquaredFromTarget(long target)
{
final long meanRows = getRows() / partitions.size();
long variance = 0;
long distance = 0;
for(final DimPartition dimPartition : partitions) {
variance += (dimPartition.rows - meanRows) * (dimPartition.rows - meanRows);
distance += (dimPartition.rows - target) * (dimPartition.rows - target);
}
variance /= partitions.size();
return variance;
distance /= partitions.size();
return distance;
}
public int getRows()
......
......@@ -280,7 +280,7 @@ public class IndexGeneratorJob implements Jobby
for (final Text value : values) {
context.progress();
final InputRow inputRow = parser.parse(value.toString());
final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -251,6 +251,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
firehoseFactory,
new Schema(
schema.getDataSource(),
schema.getSpatialDimensions(),
schema.getAggregators(),
schema.getIndexGranularity(),
shardSpec
......
......@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.indexer.granularity.GranularitySpec;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
......@@ -34,6 +35,7 @@ import com.metamx.druid.indexing.common.actions.SpawnTasksAction;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
......@@ -45,6 +47,9 @@ public class IndexTask extends AbstractTask
@JsonIgnore
private final GranularitySpec granularitySpec;
@JsonProperty
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonIgnore
private final AggregatorFactory[] aggregators;
......@@ -67,6 +72,7 @@ public class IndexTask extends AbstractTask
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") long targetPartitionSize,
......@@ -85,6 +91,9 @@ public class IndexTask extends AbstractTask
);
this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec");
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
this.aggregators = aggregators;
this.indexGranularity = indexGranularity;
this.targetPartitionSize = targetPartitionSize;
......@@ -107,6 +116,7 @@ public class IndexTask extends AbstractTask
firehoseFactory,
new Schema(
getDataSource(),
spatialDimensions,
aggregators,
indexGranularity,
new NoneShardSpec()
......@@ -125,6 +135,7 @@ public class IndexTask extends AbstractTask
firehoseFactory,
new Schema(
getDataSource(),
spatialDimensions,
aggregators,
indexGranularity,
new NoneShardSpec()
......
......@@ -418,7 +418,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
@Override
public void run()
{
cleanup(workerId, taskId);
runningTasks.remove(taskId);
addPendingTask(taskRunnerWorkItem);
}
},
......@@ -644,15 +644,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
Set<String> tasksToRetry = Sets.newHashSet(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
List<String> tasksToRetry = cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()));
log.info("%s has %d pending tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
......
......@@ -218,6 +218,7 @@ public class TaskQueue
try {
Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkNotNull(task, "task");
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue.
......
......@@ -32,6 +32,7 @@ public class TaskSerdeTest
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
......@@ -64,6 +65,7 @@ public class TaskSerdeTest
null,
new Schema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
new NoneShardSpec()
......@@ -200,7 +202,7 @@ public class TaskSerdeTest
{
final Task task = new RealtimeIndexTask(
null,
new Schema("foo", new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
new Period("PT10M"),
......
......@@ -193,6 +193,7 @@ public class TaskLifecycleTest
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
......@@ -239,6 +240,7 @@ public class TaskLifecycleTest
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
......
......@@ -24,7 +24,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
......@@ -40,7 +40,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version>
<apache.curator.version>2.0.1-21-22</apache.curator.version>
<apache.curator.version>2.0.2-21-22</apache.curator.version>
</properties>
<modules>
......@@ -163,7 +163,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
<version>14.0.1</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -22,18 +22,22 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec;
import java.util.Arrays;
import java.util.List;
/**
*/
public class Schema
{
private final String dataSource;
private final List<SpatialDimensionSchema> spatialDimensions;
private final AggregatorFactory[] aggregators;
private final QueryGranularity indexGranularity;
private final ShardSpec shardSpec;
......@@ -41,12 +45,15 @@ public class Schema
@JsonCreator
public Schema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("shardSpec") ShardSpec shardSpec
)
{
this.dataSource = dataSource;
this.spatialDimensions = (spatialDimensions == null) ? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
this.aggregators = aggregators;
this.indexGranularity = indexGranularity;
this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
......@@ -62,6 +69,12 @@ public class Schema
return dataSource;
}
@JsonProperty("spatialDimensions")
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@JsonProperty
public AggregatorFactory[] getAggregators()
{
......@@ -85,8 +98,10 @@ public class Schema
{
return "Schema{" +
"dataSource='" + dataSource + '\'' +
", spatialDimensions=" + spatialDimensions +
", aggregators=" + (aggregators == null ? null : Arrays.asList(aggregators)) +
", indexGranularity=" + indexGranularity +
", shardSpec=" + shardSpec +
'}';
}
}
......@@ -30,6 +30,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
......@@ -42,7 +43,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*/
*/
public class Sink implements Iterable<FireHydrant>
{
private static final Logger log = new Logger(Sink.class);
......@@ -145,7 +146,8 @@ public class Sink implements Iterable<FireHydrant>
{
return input.getName();
}
}),
}
),
schema.getShardSpec(),
null,
0
......@@ -155,7 +157,12 @@ public class Sink implements Iterable<FireHydrant>
private FireHydrant makeNewCurrIndex(long minTimestamp, Schema schema)
{
IncrementalIndex newIndex = new IncrementalIndex(
minTimestamp, schema.getIndexGranularity(), schema.getAggregators()
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(schema.getIndexGranularity())
.withSpatialDimensions(schema.getSpatialDimensions())
.withMetrics(schema.getAggregators())
.build()
);
FireHydrant old;
......
......@@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
......@@ -28,7 +29,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......@@ -194,6 +195,11 @@
<artifactId>java-xmlbuilder</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<!-- Dependencies required for jets3t -->
<!-- Tests -->
......@@ -228,7 +234,6 @@
</dependency>
</dependencies>
<build>
......
......@@ -255,7 +255,8 @@ public class ServerManager implements QuerySegmentWalker
);
}
}
);
)
.filter(Predicates.<QueryRunner<T>>notNull());
}
}
)
......@@ -313,6 +314,9 @@ public class ServerManager implements QuerySegmentWalker
);
}
}
)
.filter(
Predicates.<QueryRunner<T>>notNull()
);
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
......
......@@ -141,17 +141,9 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
private void initializeSegmentLoader()
{
if (segmentLoader == null) {
final Properties props = getProps();
try {
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
setSegmentLoader(
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class))
ServerInit.makeDefaultQueryableLoader(getConfigFactory(), getProps())
);
}
catch (S3ServiceException e) {
......
......@@ -108,6 +108,7 @@ public class InfoResource
InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager,
DatabaseRuleManager databaseRuleManager,
@Nullable
IndexingServiceClient indexingServiceClient
)
{
......
......@@ -40,9 +40,9 @@ public class RegexFilter extends DimensionPredicateFilter
Pattern compiled = Pattern.compile(pattern);
@Override
public boolean apply(@Nullable String input)
public boolean apply(String input)
{
return compiled.matcher(input).find();
return (input != null) && compiled.matcher(input).find();
}
}
);
......
......@@ -58,7 +58,7 @@ public class SpatialFilter implements Filter
@Override
public boolean hasNext()
{
return dimValueIndexesIter.hasNext() || iter.hasNext();
return dimValueIndexesIter.hasNext() || (iter != null && iter.hasNext());
}
@Override
......
......@@ -119,14 +119,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public Iterable<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
{
Interval actualIntervalTmp = interval;
final Interval indexInterval = getInterval();
if (!actualIntervalTmp.overlaps(indexInterval)) {
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (!actualIntervalTmp.overlaps(dataInterval)) {
return ImmutableList.of();
}
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (actualIntervalTmp.getStart().isBefore(dataInterval.getStart())) {
actualIntervalTmp = actualIntervalTmp.withStart(dataInterval.getStart());
}
......@@ -229,7 +227,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
numAdvanced++;
}
} else {
Iterators.skip(baseIter, numAdvanced);
Iterators.advance(baseIter, numAdvanced);
if (baseIter.hasNext()) {
currEntry.set(baseIter.next());
}
......
......@@ -137,14 +137,13 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
{
Interval actualInterval = interval;
final Interval indexInterval = getInterval();
if (!actualInterval.overlaps(indexInterval)) {
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (!actualInterval.overlaps(dataInterval)) {
return ImmutableList.of();
}
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (actualInterval.getStart().isBefore(dataInterval.getStart())) {
actualInterval = actualInterval.withStart(dataInterval.getStart());
}
......@@ -238,8 +237,10 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
if (!column.getCapabilities().hasBitmapIndexes()) {
return new ImmutableConciseSet();
}
// This is a workaround given the current state of indexing, I feel shame
final int index = column.getBitmapIndex().hasNulls() ? idx + 1 : idx;
return column.getBitmapIndex().getConciseSet(idx);
return column.getBitmapIndex().getConciseSet(index);
}
public ImmutableRTree getRTreeSpatialIndex(String dimension)
......@@ -285,7 +286,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
......@@ -508,27 +509,25 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
if (holder != null) {
final ColumnCapabilities capabilities = holder.getCapabilities();
if(capabilities.hasMultipleValues()) {
if (capabilities.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
"makeObjectColumnSelector does not support multivalued columns"
);
}
if(capabilities.isDictionaryEncoded()) {
if (capabilities.isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(capabilities.getType() == ValueType.COMPLEX) {
} else if (capabilities.getType() == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
} else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) {
if (cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
......@@ -537,11 +536,11 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
if (cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
......@@ -557,7 +556,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
if(type == ValueType.LONG) {
if (type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
......@@ -573,7 +572,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
if(type == ValueType.STRING) {
if (type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
......@@ -609,7 +608,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
......@@ -647,8 +646,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
Closeables.closeQuietly(complexColumn);
}
for (Object column : complexColumnCache.values()) {
if(column instanceof Closeable) {
Closeables.closeQuietly((Closeable)column);
if (column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column);
}
}
}
......@@ -946,26 +945,24 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
if(holder.getCapabilities().hasMultipleValues()) {
if (holder != null) {
if (holder.getCapabilities().hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
"makeObjectColumnSelector does not support multivalued columns"
);
}
final ValueType type = holder.getCapabilities().getType();
if(holder.getCapabilities().isDictionaryEncoded()) {
if (holder.getCapabilities().isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(type == ValueType.COMPLEX) {
} else if (type == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
} else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) {
if (cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
......@@ -974,11 +971,11 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
if (cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
......@@ -994,7 +991,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
if(type == ValueType.LONG) {
if (type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
......@@ -1010,7 +1007,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
if(type == ValueType.STRING) {
if (type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
......@@ -1046,7 +1043,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
......@@ -1082,7 +1079,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
Closeables.closeQuietly(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column);
if (column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column);
}
}
}
}
......
......@@ -19,6 +19,18 @@
package com.metamx.druid.initialization;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
......@@ -45,6 +57,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryEngine;
......@@ -60,17 +75,6 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.utils.PropUtils;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
......@@ -79,26 +83,32 @@ public class ServerInit
private static Logger log = new Logger(ServerInit.class);
public static SegmentLoader makeDefaultQueryableLoader(
RestS3Service s3Client,
SegmentLoaderConfig config
)
final ConfigurationObjectFactory configFactory,
final Properties props
) throws S3ServiceException
{
SegmentLoaderConfig config = configFactory.build(SegmentLoaderConfig.class);
DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
props.getProperty("com.metamx.aws.accessKey", ""),
props.getProperty("com.metamx.aws.secretKey", "")
)
);
final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
final SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
delegateLoader.setLoaderTypes(
ImmutableMap.<String, SegmentLoader>builder()
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
.build()
.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("c*",new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config))
.build()
);
return delegateLoader;
}
......@@ -171,6 +181,11 @@ public class ServerInit
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) {
final CassandraDataSegmentConfig config = configFactory.build(CassandraDataSegmentConfig.class);
return new CassandraDataSegmentPusher(config, jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) {
final HdfsDataSegmentPusherConfig config = configFactory.build(HdfsDataSegmentPusherConfig.class);
......
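For orientation, here is a minimal sketch of the runtime properties this change lets a server set to select Cassandra deep storage. Only the property names come from the diff above; the values are placeholders invented for illustration.

import java.util.Properties;

public class CassandraDeepStoragePropsSketch
{
  public static Properties sketch()
  {
    Properties props = new Properties();
    // Flips the pusher selection above to CassandraDataSegmentPusher.
    props.setProperty("druid.pusher.cassandra", "true");
    // Seed node handed to the Astyanax connection pool (placeholder host/port).
    props.setProperty("druid.pusher.cassandra.host", "localhost:9160");
    // Keyspace holding the index_storage and descriptor_storage tables (placeholder name).
    props.setProperty("druid.pusher.cassandra.keyspace", "druid");
    return props;
  }
}

Segments pushed this way carry loadSpec type "c*", which makeDefaultQueryableLoader above now maps to a SingleSegmentLoader backed by CassandraDataSegmentPuller.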
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import org.skife.config.Config;
import org.skife.config.Default;
/**
* Cassandra Config
*
* @author boneill42
*/
public abstract class CassandraDataSegmentConfig
{
@Config("druid.pusher.cassandra.host")
@Default("")
public abstract String getHost();
@Config("druid.pusher.cassandra.keyspace")
@Default("")
public abstract String getKeyspace();
}
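As a rough sketch of how this abstract config class is materialized at runtime, mirroring the configFactory.build(CassandraDataSegmentConfig.class) calls in ServerInit; the property values below are made up:

import java.util.Properties;

import org.skife.config.ConfigurationObjectFactory;

import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;

public class CassandraConfigBindingSketch
{
  public static CassandraDataSegmentConfig build()
  {
    Properties props = new Properties();
    props.setProperty("druid.pusher.cassandra.host", "localhost:9160"); // placeholder value
    props.setProperty("druid.pusher.cassandra.keyspace", "druid");      // placeholder value
    // config-magic generates a concrete subclass whose getters return the bound property values.
    return new ConfigurationObjectFactory(props).build(CassandraDataSegmentConfig.class);
  }
}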
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import java.io.File;
import java.io.OutputStream;
import org.apache.commons.io.FileUtils;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;
/**
* Cassandra Segment Puller
*
* @author boneill42
*/
public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
{
private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
private static final int CONCURRENCY = 10;
private static final int BATCH_SIZE = 10;
public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
{
super(config);
}
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
if (!outDir.exists())
{
outDir.mkdirs();
}
if (!outDir.isDirectory())
{
throw new ISE("outDir[%s] must be a directory.", outDir);
}
long startTime = System.currentTimeMillis();
ObjectMetadata meta = null;
final File outFile = new File(outDir, "index.zip");
try
{
try
{
log.info("Writing to [%s]", outFile.getAbsolutePath());
OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
meta = ChunkedStorage
.newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY)
.call();
os.close();
CompressionUtils.unzip(outFile, outDir);
			} catch (Exception e)
			{
				// Clean up the partially written output, then rethrow so the failure is surfaced
				// as a SegmentLoadingException instead of continuing with meta still null.
				FileUtils.deleteDirectory(outDir);
				throw e;
			}
} catch (Exception e)
{
throw new SegmentLoadingException(e, e.getMessage());
}
log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
meta.getObjectSize());
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
OperationResult<ColumnList<String>> result;
try
{
result = this.keyspace.prepareQuery(descriptorStorage)
.getKey(key)
.execute();
ColumnList<String> children = result.getResult();
long lastModified = children.getColumnByName("lastmodified").getLongValue();
log.info("Read lastModified for [%s] as [%d]", key, lastModified);
return lastModified;
} catch (ConnectionException e)
{
throw new SegmentLoadingException(e, e.getMessage());
}
}
}
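To make the puller's contract concrete, a small illustrative sketch of the load spec it consumes. The map shape mirrors what CassandraDataSegmentPusher writes below; the key value here is invented.

import java.io.File;
import java.util.Map;

import com.google.common.collect.ImmutableMap;

public class CassandraLoadSpecSketch
{
  public static void main(String[] args)
  {
    // Real keys are "<keyspace>/<storage dir>" as assembled by the pusher; this one is a placeholder.
    Map<String, Object> loadSpec = ImmutableMap.<String, Object>of(
        "type", "c*", // matches the "c*" loader registered in ServerInit
        "key", "druid/test_datasource/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-01T00:00:00.000Z/0"
    );
    File outDir = new File("/tmp/druid/indexCache/test_segment");
    // getSegmentFiles(segment, outDir) reads the loadSpec "key", streams the chunked index.zip
    // out of the index_storage column family, and unzips it into outDir.
    System.out.println("would pull " + loadSpec.get("key") + " into " + outDir);
  }
}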
package com.metamx.druid.loading.cassandra;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DataSegmentPusherUtil;
import com.metamx.druid.utils.CompressionUtils;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
/**
* Cassandra Segment Pusher
*
* @author boneill42
*/
public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher
{
private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
private static final int CONCURRENCY = 10;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final ObjectMapper jsonMapper;
public CassandraDataSegmentPusher(
CassandraDataSegmentConfig config,
ObjectMapper jsonMapper)
{
super(config);
	this.jsonMapper = jsonMapper;
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
);
// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key);
int version = IndexIO.getVersionFromDir(indexFilesDir);
try
{
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
} catch (Exception e)
{
throw new IOException(e);
}
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object> of("type", "c*", "key", key)
)
.withBinaryVersion(version);
log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
return segment;
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading.cassandra;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
/**
* Superclass for accessing Cassandra Storage.
*
* This is the schema used to support the index and descriptor storage:
*
* CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
* CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
*/
public class CassandraStorage
{
private static final String CLUSTER_NAME = "druid_cassandra_cluster";
private static final String INDEX_TABLE_NAME = "index_storage";
private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
private AstyanaxContext<Keyspace> astyanaxContext;
final Keyspace keyspace;
final ChunkedStorageProvider indexStorage;
final ColumnFamily<String, String> descriptorStorage;
final CassandraDataSegmentConfig config;
public CassandraStorage(CassandraDataSegmentConfig config)
{
this.astyanaxContext = new AstyanaxContext.Builder()
.forCluster(CLUSTER_NAME)
.forKeyspace(config.getKeyspace())
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
.setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
this.astyanaxContext.start();
this.keyspace = this.astyanaxContext.getEntity();
this.config = config;
indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
StringSerializer.get(), StringSerializer.get());
}
}
......@@ -105,7 +105,7 @@ public class SegmentAnalyzer
return ColumnAnalysis.error("multi_value");
}
return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null);
return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null, null);
}
public ColumnAnalysis analyzeStringColumn(Column column)
......@@ -125,7 +125,7 @@ public class SegmentAnalyzer
}
}
return new ColumnAnalysis(capabilities.getType(), size, cardinality);
return new ColumnAnalysis(capabilities.getType(), size, cardinality, null);
}
return ColumnAnalysis.error("string_no_bitmap");
......@@ -153,6 +153,6 @@ public class SegmentAnalyzer
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
return new ColumnAnalysis(capabilities.getType(), size, null);
return new ColumnAnalysis(capabilities.getType(), size, null, null);
}
}
......@@ -103,11 +103,13 @@ public class DruidSetup
if ("dump".equals(cmd) && args.length == 3) {
final String zkConnect = args[1];
curator = connectToZK(zkConnect);
curator.start();
String zpathBase = args[2];
dumpFromZk(curator, zkConnect, zpathBase, System.out);
} else if ("put".equals(cmd) && args.length == 3) {
final String zkConnect = args[1];
curator = connectToZK(zkConnect);
curator.start();
final String pfile = args[2];
putToZk(curator, pfile);
} else {
......@@ -294,6 +296,7 @@ public class DruidSetup
createPath(curator, zkPaths.getMasterPath(), out);
createPath(curator, zkPaths.getLoadQueuePath(), out);
createPath(curator, zkPaths.getServedSegmentsPath(), out);
createPath(curator, zkPaths.getPropertiesPath(), out);
}
private static void createPath(CuratorFramework curator, String thePath, PrintStream out)
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.brita;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import com.metamx.druid.Druids;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.index.IncrementalIndexSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexSchema;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.filter.SpatialDimFilter;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
/**
*/
@RunWith(Parameterized.class)
public class SpatialFilterBonusTest
{
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "dim.geo");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final IncrementalIndex rtIndex = makeIncrementalIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex();
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex();
return Arrays.asList(
new Object[][]{
{
new IncrementalIndexSegment(rtIndex)
},
{
new QueryableIndexSegment(null, mMappedTestIndex)
},
{
new QueryableIndexSegment(null, mergedRealtimeIndex)
}
}
);
}
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", "0.0,0.0",
"val", 17l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", "1.0,3.0",
"val", 29l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", "4.0,2.0",
"val", 13l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", "7.0,3.0",
"val", 91l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "8.0,6.0",
"val", 47l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"dim.geo", String.format(
"%s,%s",
(float) (rand.nextFloat() * 10 + 10.0),
(float) (rand.nextFloat() * 10 + 10.0)
),
"val", i
)
)
);
}
return theIndex;
}
private static QueryableIndex makeQueryableIndex() throws IOException
{
IncrementalIndex theIndex = makeIncrementalIndex();
File tmpFile = File.createTempFile("billy", "yay");
tmpFile.delete();
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
IncrementalIndex second = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
IncrementalIndex third = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", "0.0,0.0",
"val", 17l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", "1.0,3.0",
"val", 29l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", "4.0,2.0",
"val", 13l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", "7.0,3.0",
"val", 91l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "8.0,6.0",
"val", 47l
)
)
);
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"dim.geo", String.format(
"%s,%s",
(float) (rand.nextFloat() * 10 + 10.0),
(float) (rand.nextFloat() * 10 + 10.0)
),
"val", i
)
)
);
}
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File firstFile = new File(tmpFile, "first");
File secondFile = new File(tmpFile, "second");
File thirdFile = new File(tmpFile, "third");
File mergedFile = new File(tmpFile, "merged");
firstFile.mkdirs();
firstFile.deleteOnExit();
secondFile.mkdirs();
secondFile.deleteOnExit();
thirdFile.mkdirs();
thirdFile.deleteOnExit();
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
)
);
return mergedRealtime;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
private final Segment segment;
public SpatialFilterBonusTest(Segment segment)
{
this.segment = segment;
}
@Test
public void testSpatialQuery()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.ALL)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"dim.geo",
new RadiusBound(new float[]{0.0f, 0.0f}, 5)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 3L)
.put("val", 59l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Test
public void testSpatialQueryMorePoints()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.DAY)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"dim.geo",
new RectangularBound(new float[]{0.0f, 0.0f}, new float[]{9.0f, 9.0f})
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 17l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-02T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 29l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-03T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 13l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-04T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 91l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-05T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 47l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
\ No newline at end of file
......@@ -61,7 +61,7 @@ import java.util.List;
import java.util.Random;
/**
*/
*/
@RunWith(Parameterized.class)
public class SpatialFilterTest
{
......@@ -175,6 +175,31 @@ public class SpatialFilterTest
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"lat", "_mmx.unknown",
"long", "_mmx.unknown",
"val", 101l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
// Add a bunch of random points
Random rand = new Random();
......@@ -292,6 +317,31 @@ public class SpatialFilterTest
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"lat", "_mmx.unknown",
"long", "_mmx.unknown",
"val", 101l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
......
......@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.17-SNAPSHOT</version>
<version>0.4.33-SNAPSHOT</version>
</parent>
<dependencies>
......