提交 9b9a2a9f 编写于 作者: G Gian Merlino

Merge branch 'master' into num-shard-spec

Conflicts:
	common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -320,12 +320,9 @@ public class Announcer ...@@ -320,12 +320,9 @@ public class Announcer
final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath); final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null) { if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path); log.error("Path[%s] not announced, cannot unannounce.", path);
} return;
if (subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path);
} }
try { try {
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -96,13 +96,7 @@ public class IntegerPartitionChunk<T> implements PartitionChunk<T> ...@@ -96,13 +96,7 @@ public class IntegerPartitionChunk<T> implements PartitionChunk<T>
{ {
if (chunk instanceof IntegerPartitionChunk) { if (chunk instanceof IntegerPartitionChunk) {
IntegerPartitionChunk<T> intChunk = (IntegerPartitionChunk<T>) chunk; IntegerPartitionChunk<T> intChunk = (IntegerPartitionChunk<T>) chunk;
return comparator.compare(chunkNumber, intChunk.chunkNumber);
int retVal = comparator.compare(start, intChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, intChunk.end);
}
return retVal;
} else { } else {
throw new IllegalArgumentException("Cannot compare against something that is not an IntegerPartitionChunk."); throw new IllegalArgumentException("Cannot compare against something that is not an IntegerPartitionChunk.");
} }
......
package com.metamx.druid.partition; package com.metamx.druid.partition;
import com.google.common.collect.Ordering;
import java.util.Comparator;
public class LinearPartitionChunk <T> implements PartitionChunk<T> public class LinearPartitionChunk <T> implements PartitionChunk<T>
{ {
Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final int chunkNumber; private final int chunkNumber;
private final T object; private final T object;
...@@ -56,7 +62,7 @@ public class LinearPartitionChunk <T> implements PartitionChunk<T> ...@@ -56,7 +62,7 @@ public class LinearPartitionChunk <T> implements PartitionChunk<T>
if (chunk instanceof LinearPartitionChunk) { if (chunk instanceof LinearPartitionChunk) {
LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk; LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk;
return chunkNumber - chunk.getChunkNumber(); return comparator.compare(chunkNumber, linearChunk.chunkNumber);
} }
throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk."); throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
} }
......
...@@ -27,6 +27,7 @@ import com.google.common.collect.Sets; ...@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
/** /**
...@@ -68,11 +69,16 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>> ...@@ -68,11 +69,16 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
public PartitionChunk<T> remove(PartitionChunk<T> chunk) public PartitionChunk<T> remove(PartitionChunk<T> chunk)
{ {
// Somewhat funky implementation in order to return the removed object as it exists in the set if (!holderSet.isEmpty()) {
PartitionChunk<T> element = holderSet.tailSet(chunk, true).first(); // Somewhat funky implementation in order to return the removed object as it exists in the set
if (chunk.equals(element)) { SortedSet<PartitionChunk<T>> tailSet = holderSet.tailSet(chunk, true);
holderSet.remove(element); if (!tailSet.isEmpty()) {
return element; PartitionChunk<T> element = tailSet.first();
if (chunk.equals(element)) {
holderSet.remove(element);
return element;
}
}
} }
return null; return null;
} }
...@@ -110,16 +116,17 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>> ...@@ -110,16 +116,17 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
return true; return true;
} }
public PartitionChunk<T> getChunk(final int partitionNum) { public PartitionChunk<T> getChunk(final int partitionNum)
{
final Iterator<PartitionChunk<T>> retVal = Iterators.filter( final Iterator<PartitionChunk<T>> retVal = Iterators.filter(
holderSet.iterator(), new Predicate<PartitionChunk<T>>() holderSet.iterator(), new Predicate<PartitionChunk<T>>()
{ {
@Override @Override
public boolean apply(PartitionChunk<T> input) public boolean apply(PartitionChunk<T> input)
{ {
return input.getChunkNumber() == partitionNum; return input.getChunkNumber() == partitionNum;
} }
} }
); );
return retVal.hasNext() ? retVal.next() : null; return retVal.hasNext() ? retVal.next() : null;
......
...@@ -27,7 +27,7 @@ import java.util.Comparator; ...@@ -27,7 +27,7 @@ import java.util.Comparator;
*/ */
public class StringPartitionChunk<T> implements PartitionChunk<T> public class StringPartitionChunk<T> implements PartitionChunk<T>
{ {
private static final Comparator<String> comparator = Ordering.<String>natural().nullsFirst(); private static final Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final String start; private final String start;
private final String end; private final String end;
...@@ -95,12 +95,7 @@ public class StringPartitionChunk<T> implements PartitionChunk<T> ...@@ -95,12 +95,7 @@ public class StringPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof StringPartitionChunk) { if (chunk instanceof StringPartitionChunk) {
StringPartitionChunk<T> stringChunk = (StringPartitionChunk<T>) chunk; StringPartitionChunk<T> stringChunk = (StringPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, stringChunk.start); return comparator.compare(chunkNumber, stringChunk.chunkNumber);
if (retVal == 0) {
retVal = comparator.compare(end, stringChunk.end);
}
return retVal;
} }
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk."); throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
} }
......
...@@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest ...@@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest
public void testCompareTo() throws Exception public void testCompareTo() throws Exception
{ {
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1))); Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 1, 2))); Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 0, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 1, 2))); Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 0, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 1, 2))); Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 0, 2)));
Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2))); Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 0, 1))); Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 1, 1)));
Assert.assertEquals(1, make(20, 33, 0, 1).compareTo(make(11, 20, 0, 1))); Assert.assertEquals(1, make(20, 33, 1, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 0, 1).compareTo(make(null, 10, 0, 1))); Assert.assertEquals(1, make(10, null, 1, 1).compareTo(make(null, 10, 0, 1)));
} }
@Test @Test
......
...@@ -61,14 +61,14 @@ public class StringPartitionChunkTest ...@@ -61,14 +61,14 @@ public class StringPartitionChunkTest
@Test @Test
public void testCompareTo() throws Exception public void testCompareTo() throws Exception
{ {
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 1, 2))); Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 1, 2))); Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 0, 2)));
Assert.assertEquals(0, make(null, "10", 0, 1).compareTo(make(null, "10", 1, 2))); Assert.assertEquals(0, make(null, "10", 1, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 0, 1).compareTo(make("10", "11", 1, 2))); Assert.assertEquals(0, make("10", "11", 1, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2))); Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 0, 1))); Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 1, 1)));
Assert.assertEquals(1, make("20", "33", 0, 1).compareTo(make("11", "20", 0, 1))); Assert.assertEquals(1, make("20", "33", 1, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 0, 1).compareTo(make(null, "10", 0, 1))); Assert.assertEquals(1, make("10", null, 1, 1).compareTo(make(null, "10", 0, 1)));
} }
@Test @Test
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -521,7 +521,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -521,7 +521,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
); );
} }
runningTasks.put(task.getId(), pendingTasks.remove(task.getId())); RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
if (workItem == null) {
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return;
}
runningTasks.put(task.getId(), workItem.withWorker(theWorker));
log.info("Task %s switched from pending to running", task.getId()); log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
...@@ -613,8 +621,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -613,8 +621,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId); taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId); log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId);
} }
break; break;
} }
...@@ -653,20 +663,33 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -653,20 +663,33 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{ {
log.info("Kaboom! Worker[%s] removed!", worker.getHost()); log.info("Kaboom! Worker[%s] removed!", worker.getHost());
ZkWorker zkWorker = zkWorkers.get(worker.getHost()); final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) { if (zkWorker != null) {
try { try {
for (String assignedTask : cf.getChildren() List<String> tasksToFail = Lists.newArrayList(
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) { cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask); );
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}
}
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask); String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) { if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath); cf.delete().guaranteed().forPath(taskPath);
} }
log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else { } else {
log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask); log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
} }
} }
} }
...@@ -678,7 +701,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -678,7 +701,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
zkWorker.close(); zkWorker.close();
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Exception closing worker %s!", worker.getHost()); log.error(e, "Exception closing worker[%s]!", worker.getHost());
} }
zkWorkers.remove(worker.getHost()); zkWorkers.remove(worker.getHost());
} }
......
...@@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator; ...@@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker;
import org.joda.time.DateTime; import org.joda.time.DateTime;
/** /**
...@@ -29,6 +30,7 @@ import org.joda.time.DateTime; ...@@ -29,6 +30,7 @@ import org.joda.time.DateTime;
public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{ {
private final SettableFuture<TaskStatus> result; private final SettableFuture<TaskStatus> result;
private final Worker worker;
public RemoteTaskRunnerWorkItem( public RemoteTaskRunnerWorkItem(
Task task, Task task,
...@@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem ...@@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{ {
super(task, result); super(task, result);
this.result = result; this.result = result;
this.worker = null;
} }
public RemoteTaskRunnerWorkItem( public RemoteTaskRunnerWorkItem(
Task task, Task task,
SettableFuture<TaskStatus> result, SettableFuture<TaskStatus> result,
DateTime createdTime, DateTime createdTime,
DateTime queueInsertionTime DateTime queueInsertionTime,
Worker worker
) )
{ {
super(task, result, createdTime, queueInsertionTime); super(task, result, createdTime, queueInsertionTime);
this.result = result; this.result = result;
this.worker = worker;
}
public Worker getWorker()
{
return worker;
} }
public void setResult(TaskStatus status) public void setResult(TaskStatus status)
...@@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem ...@@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
@Override @Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{ {
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time); return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
}
public RemoteTaskRunnerWorkItem withWorker(Worker worker)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker);
} }
} }
...@@ -50,10 +50,13 @@ import java.io.File; ...@@ -50,10 +50,13 @@ import java.io.File;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
* Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
* class as well as integration tests in the very near future.
*/ */
public class RemoteTaskRunnerTest public class RemoteTaskRunnerTest
{ {
...@@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest ...@@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode()); Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
} }
@Test
public void testWorkerRemoved() throws Exception
{
doSetup();
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
Future<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
workerCuratorCoordinator.stop();
TaskStatus status = future.get();
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
private void doSetup() throws Exception private void doSetup() throws Exception
{ {
makeWorker(); makeWorker();
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>
...@@ -182,37 +182,37 @@ ...@@ -182,37 +182,37 @@
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId> <artifactId>jackson-annotations</artifactId>
<version>2.1.4</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId> <artifactId>jackson-core</artifactId>
<version>2.1.4</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>2.1.4</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.datatype</groupId> <groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId> <artifactId>jackson-datatype-guava</artifactId>
<version>2.1.2</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.datatype</groupId> <groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId> <artifactId>jackson-datatype-joda</artifactId>
<version>2.1.2</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId> <artifactId>jackson-dataformat-smile</artifactId>
<version>2.1.4</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId> <groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId> <artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.1.4</version> <version>2.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>javax.inject</groupId> <groupId>javax.inject</groupId>
...@@ -365,6 +365,16 @@ ...@@ -365,6 +365,16 @@
<version>${apache.curator.version}</version> <version>${apache.curator.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
<version>1.0-0011</version>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>0.4.0</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
...@@ -147,6 +147,14 @@ ...@@ -147,6 +147,14 @@
<artifactId>java-xmlbuilder</artifactId> <artifactId>java-xmlbuilder</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</dependency>
<!-- Dependencies required for jets3t --> <!-- Dependencies required for jets3t -->
<!-- Tests --> <!-- Tests -->
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime; package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime; package com.metamx.druid.realtime;
import org.skife.config.Config; import org.skife.config.Config;
......
...@@ -94,6 +94,14 @@ public class RealtimeManager implements QuerySegmentWalker ...@@ -94,6 +94,14 @@ public class RealtimeManager implements QuerySegmentWalker
Closeables.closeQuietly(chief); Closeables.closeQuietly(chief);
} }
} }
public FireDepartmentMetrics getMetrics(String datasource)
{
FireChief chief = chiefs.get(datasource);
if (chief == null) {
return null;
}
return chief.getMetrics();
}
@Override @Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
...@@ -149,6 +157,11 @@ public class RealtimeManager implements QuerySegmentWalker ...@@ -149,6 +157,11 @@ public class RealtimeManager implements QuerySegmentWalker
} }
} }
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
@Override @Override
public void run() public void run()
{ {
...@@ -186,11 +199,11 @@ public class RealtimeManager implements QuerySegmentWalker ...@@ -186,11 +199,11 @@ public class RealtimeManager implements QuerySegmentWalker
} }
int currCount = sink.add(inputRow); int currCount = sink.add(inputRow);
metrics.incrementProcessed();
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit()); plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
} }
metrics.incrementProcessed();
} }
catch (FormattedException e) { catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails()); log.info(e, "unparseable line: %s", e.getDetails());
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime; package com.metamx.druid.realtime;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
......
...@@ -29,7 +29,8 @@ import java.io.IOException; ...@@ -29,7 +29,8 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class), @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class), @JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class), @JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class) @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class),
@JsonSubTypes.Type(name = "irc", value = IrcFirehoseFactory.class)
}) })
public interface FirehoseFactory public interface FirehoseFactory
{ {
......
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class)
})
public interface IrcDecoder
{
public InputRow decodeMessage(DateTime timestamp, String channel, String msg);
}
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.ircclouds.irc.api.Callback;
import com.ircclouds.irc.api.IRCApi;
import com.ircclouds.irc.api.IRCApiImpl;
import com.ircclouds.irc.api.IServerParameters;
import com.ircclouds.irc.api.domain.IRCServer;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
import com.ircclouds.irc.api.state.IIRCState;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <p><b>Example Usage</b></p>
*
* <p>Decoder definition: <code>wikipedia-decoder.json</code></p>
* <pre>{@code
*
* {
* "type": "wikipedia",
* "namespaces": {
* "#en.wikipedia": {
* "": "main",
* "Category": "category",
* "Template talk": "template talk",
* "Help talk": "help talk",
* "Media": "media",
* "MediaWiki talk": "mediawiki talk",
* "File talk": "file talk",
* "MediaWiki": "mediawiki",
* "User": "user",
* "File": "file",
* "User talk": "user talk",
* "Template": "template",
* "Help": "help",
* "Special": "special",
* "Talk": "talk",
* "Category talk": "category talk"
* }
* },
* "geoIpDatabase": "path/to/GeoLite2-City.mmdb"
* }
* }</pre>
*
* <p><b>Example code:</b></p>
* <pre>{@code
* IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
* new File("wikipedia-decoder.json"),
* IrcDecoder.class
* );
*
* IrcFirehoseFactory factory = new IrcFirehoseFactory(
* "wiki123",
* "irc.wikimedia.org",
* Lists.newArrayList(
* "#en.wikipedia",
* "#fr.wikipedia",
* "#de.wikipedia",
* "#ja.wikipedia"
* ),
* wikipediaDecoder
* );
* }</pre>
*/
public class IrcFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(IrcFirehoseFactory.class);
private final String nick;
private final String host;
private final List<String> channels;
private final IrcDecoder decoder;
@JsonCreator
public IrcFirehoseFactory(
@JsonProperty String nick,
@JsonProperty String host,
@JsonProperty List<String> channels,
@JsonProperty IrcDecoder decoder
)
{
this.nick = nick;
this.host = host;
this.channels = channels;
this.decoder = decoder;
}
@Override
public Firehose connect() throws IOException
{
final IRCApi irc = new IRCApiImpl(false);
final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue = new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>();
irc.addListener(new VariousMessageListenerAdapter() {
@Override
public void onChannelMessage(ChannelPrivMsg aMsg)
{
try {
queue.put(Pair.of(DateTime.now(), aMsg));
} catch(InterruptedException e) {
throw new RuntimeException("interrupted adding message to queue", e);
}
}
});
log.info("connecting to irc server [%s]", host);
irc.connect(
new IServerParameters()
{
@Override
public String getNickname()
{
return nick;
}
@Override
public List<String> getAlternativeNicknames()
{
return Lists.newArrayList(nick + "_",
nick + "__",
nick + "___");
}
@Override
public String getIdent()
{
return "druid";
}
@Override
public String getRealname()
{
return nick;
}
@Override
public IRCServer getServer()
{
return new IRCServer(host, false);
}
},
new Callback<IIRCState>()
{
@Override
public void onSuccess(IIRCState aObject)
{
log.info("irc connection to server [%s] established", host);
for(String chan : channels) {
log.info("Joining channel %s", chan);
irc.joinChannel(chan);
}
}
@Override
public void onFailure(Exception e)
{
log.error(e, "Unable to connect to irc server [%s]", host);
throw new RuntimeException("Unable to connect to server", e);
}
});
return new Firehose()
{
InputRow nextRow = null;
@Override
public boolean hasMore()
{
try {
while(true) {
Pair<DateTime, ChannelPrivMsg> nextMsg = queue.take();
try {
nextRow = decoder.decodeMessage(nextMsg.lhs, nextMsg.rhs.getChannelName(), nextMsg.rhs.getText());
if(nextRow != null) return true;
}
catch (IllegalArgumentException iae) {
log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
}
}
}
catch(InterruptedException e) {
Thread.interrupted();
throw new RuntimeException("interrupted retrieving elements from queue", e);
}
}
@Override
public InputRow nextRow()
{
return nextRow;
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// nothing to see here
}
};
}
@Override
public void close() throws IOException
{
log.info("disconnecting from irc server [%s]", host);
irc.disconnect("");
}
};
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.Omni;
import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
class WikipediaIrcDecoder implements IrcDecoder
{
static final Logger log = new Logger(WikipediaIrcDecoder.class);
final DatabaseReader geoLookup;
static final Pattern pattern = Pattern.compile(
"\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03"
);
static final Pattern ipPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+");
static final Pattern shortnamePattern = Pattern.compile("#(\\w\\w)\\..*");
static final List<String> dimensionList = Lists.newArrayList(
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
);
final Map<String, Map<String, String>> namespaces;
public WikipediaIrcDecoder( Map<String, Map<String, String>> namespaces) {
this(namespaces, null);
}
@JsonCreator
public WikipediaIrcDecoder(@JsonProperty("namespaces") Map<String, Map<String, String>> namespaces,
@JsonProperty("geoIpDatabase") String geoIpDatabase)
{
if(namespaces == null) {
namespaces = Maps.newHashMap();
}
this.namespaces = namespaces;
File geoDb;
if(geoIpDatabase != null) {
geoDb = new File(geoIpDatabase);
} else {
try {
String tmpDir = System.getProperty("java.io.tmpdir");
geoDb = new File(tmpDir, this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb");
if(!geoDb.exists()) {
log.info("Downloading geo ip database to [%s]", geoDb);
FileUtils.copyInputStreamToFile(
new GZIPInputStream(
new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
),
geoDb
);
}
} catch(IOException e) {
throw new RuntimeException("Unable to download geo ip database [%s]", e);
}
}
try {
geoLookup = new DatabaseReader(geoDb);
} catch(IOException e) {
throw new RuntimeException("Unable to open geo ip lookup database", e);
}
}
@Override
public InputRow decodeMessage(final DateTime timestamp, String channel, String msg)
{
final Map<String, String> dimensions = Maps.newHashMap();
final Map<String, Float> metrics = Maps.newHashMap();
Matcher m = pattern.matcher(msg);
if(!m.matches()) {
throw new IllegalArgumentException("Invalid input format");
}
Matcher shortname = shortnamePattern.matcher(channel);
if(shortname.matches()) {
dimensions.put("language", shortname.group(1));
}
String page = m.group(1);
String pageUrl = page.replaceAll("\\s", "_");
dimensions.put("page", pageUrl);
String user = m.group(4);
Matcher ipMatch = ipPattern.matcher(user);
boolean anonymous = ipMatch.matches();
if(anonymous) {
try {
final InetAddress ip = InetAddress.getByName(ipMatch.group());
final Omni lookup = geoLookup.omni(ip);
dimensions.put("continent", lookup.getContinent().getName());
dimensions.put("country", lookup.getCountry().getName());
dimensions.put("region", lookup.getMostSpecificSubdivision().getName());
dimensions.put("city", lookup.getCity().getName());
} catch(UnknownHostException e) {
log.error(e, "invalid ip [%s]", ipMatch.group());
} catch(IOException e) {
log.error(e, "error looking up geo ip");
} catch(GeoIp2Exception e) {
log.error(e, "error looking up geo ip");
}
}
dimensions.put("user", user);
final String flags = m.group(2);
dimensions.put("unpatrolled", Boolean.toString(flags.contains("!")));
dimensions.put("newPage", Boolean.toString(flags.contains("N")));
dimensions.put("robot", Boolean.toString(flags.contains("B")));
dimensions.put("anonymous", Boolean.toString(anonymous));
String[] parts = page.split(":");
if(parts.length > 1 && !parts[1].startsWith(" ")) {
Map<String, String> channelNamespaces = namespaces.get(channel);
if(channelNamespaces != null && channelNamespaces.containsKey(parts[0])) {
dimensions.put("namespace", channelNamespaces.get(parts[0]));
} else {
dimensions.put("namespace", "wikipedia");
}
}
else {
dimensions.put("namespace", "article");
}
float delta = m.group(6) != null ? Float.parseFloat(m.group(6)) : 0;
metrics.put("delta", delta);
metrics.put("added", Math.max(delta, 0));
metrics.put("deleted", Math.min(delta, 0));
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return dimensionList;
}
@Override
public long getTimestampFromEpoch()
{
return timestamp.getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return ImmutableList.of(dimensions.get(dimension));
}
@Override
public float getFloatMetric(String metric)
{
return metrics.get(metric);
}
@Override
public String toString()
{
return "WikipediaRow{" +
"timestamp=" + timestamp +
", dimensions=" + dimensions +
", metrics=" + metrics +
'}';
}
};
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.Interval; import org.joda.time.Interval;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;
......
...@@ -100,6 +100,11 @@ public class Sink implements Iterable<FireHydrant> ...@@ -100,6 +100,11 @@ public class Sink implements Iterable<FireHydrant>
return interval; return interval;
} }
public FireHydrant getCurrIndex()
{
return currIndex;
}
public int add(InputRow row) public int add(InputRow row)
{ {
if (currIndex == null) { if (currIndex == null) {
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.guava.Runnables;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimeManagerTest
{
private RealtimeManager realtimeManager;
private Schema schema;
private TestPlumber plumber;
@Before
public void setUp() throws Exception
{
schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.NONE,
new NoneShardSpec()
);
final List<InputRow> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FirehoseFactory()
{
@Override
public Firehose connect() throws IOException
{
return new TestFirehose(rows.iterator());
}
},
new PlumberSchool()
{
@Override
public Plumber findPlumber(
Schema schema, FireDepartmentMetrics metrics
)
{
return plumber;
}
}
)
),
null
);
}
@Test
public void testRun() throws Exception
{
realtimeManager.start();
Stopwatch stopwatch = new Stopwatch().start();
while (realtimeManager.getMetrics("test").processed() != 1) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Realtime manager should have completed processing 2 events!");
}
}
Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount());
}
private InputRow makeRow(final long timestamp)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
};
}
private static class TestFirehose implements Firehose
{
private final Iterator<InputRow> rows;
private TestFirehose(Iterator<InputRow> rows)
{
this.rows = rows;
}
@Override
public boolean hasMore()
{
return rows.hasNext();
}
@Override
public InputRow nextRow()
{
return rows.next();
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
}
}
private static class TestPlumber implements Plumber
{
private final Sink sink;
private volatile boolean startedJob = false;
private volatile boolean finishedJob = false;
private volatile int persistCount = 0;
private TestPlumber(Sink sink)
{
this.sink = sink;
}
private boolean isStartedJob()
{
return startedJob;
}
private boolean isFinishedJob()
{
return finishedJob;
}
private int getPersistCount()
{
return persistCount;
}
@Override
public void startJob()
{
startedJob = true;
}
@Override
public Sink getSink(long timestamp)
{
if (sink.getInterval().contains(timestamp)) {
return sink;
}
return null;
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
throw new UnsupportedOperationException();
}
@Override
public void persist(Runnable commitRunnable)
{
persistCount++;
}
@Override
public void finishJob()
{
finishedJob = true;
}
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
/**
*/
public class IntervalStartVersioningPolicyTest
{
@Test
public void testGetVersion() throws Exception
{
IntervalStartVersioningPolicy policy = new IntervalStartVersioningPolicy();
String version = policy.getVersion(new Interval("2013-01-01/2013-01-02"));
Assert.assertEquals("2013-01-01T00:00:00.000Z", version);
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Test;
/**
*/
public class MessageTimeRejectionPolicyFactoryTest
{
@Test
public void testAccept() throws Exception
{
Period period = new Period("PT10M");
RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(period);
DateTime now = new DateTime();
DateTime past = now.minus(period).minus(1);
DateTime future = now.plus(period).plus(1);
Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
Assert.assertTrue(rejectionPolicy.accept(future.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(now.getMillis()));
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.service.ServiceEmitter;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimePlumberSchoolTest
{
private Plumber plumber;
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
private ServerView serverView;
private ServiceEmitter emitter;
@Before
public void setUp() throws Exception
{
final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
final Schema schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.NONE,
new NoneShardSpec()
);
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(ServerView.class);
serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
emitter = EasyMock.createMock(ServiceEmitter.class);
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps.<Class<? extends Query>, QueryRunnerFactory>newHashMap()));
realtimePlumberSchool.setSegmentAnnouncer(announcer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy());
realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
realtimePlumberSchool.setServerView(serverView);
realtimePlumberSchool.setServiceEmitter(emitter);
plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
}
@Test
public void testGetSink() throws Exception
{
final DateTime theTime = new DateTime("2013-01-01");
Sink sink = plumber.getSink(theTime.getMillis());
Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval());
Assert.assertEquals(theTime.toString(), sink.getVersion());
}
@Test
public void testPersist() throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
plumber.startJob();
plumber.persist(
new Runnable()
{
@Override
public void run()
{
committed.setValue(true);
}
}
);
Stopwatch stopwatch = new Stopwatch().start();
while (!committed.booleanValue()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Taking too long to set perist value");
}
}
plumber.finishJob();
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Test;
/**
*/
public class ServerTimeRejectionPolicyFactoryTest
{
@Test
public void testAccept() throws Exception
{
Period period = new Period("PT10M");
RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(period);
DateTime now = new DateTime();
DateTime past = now.minus(period).minus(1);
Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.List;
/**
*/
public class SinkTest
{
@Test
public void testSwap() throws Exception
{
final Schema schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.MINUTE,
new NoneShardSpec()
);
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
final Sink sink = new Sink(interval, schema, version);
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
});
FireHydrant currHydrant = sink.getCurrIndex();
Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
FireHydrant swapHydrant = sink.swap();
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
});
Assert.assertEquals(currHydrant, swapHydrant);
Assert.assertNotSame(currHydrant, sink.getCurrIndex());
Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval());
Assert.assertEquals(2, Iterators.size(sink.iterator()));
}
}
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
...@@ -24,11 +24,11 @@ ...@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId> <artifactId>druid-services</artifactId>
<name>druid-services</name> <name>druid-services</name>
<description>druid-services</description> <description>druid-services</description>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version> <version>0.5.45-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册