diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index ac10c297dd40617dc9ca0da5b4e5dfc578a12d53..472fa2ff8faff4633f22a73b3f9a2dfb4e62cb47 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -26,6 +26,7 @@ import com.google.inject.Binder; import io.druid.data.input.ProtoBufInputRowParser; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; @@ -53,7 +54,8 @@ public class FirehoseModule implements DruidModule new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(LocalFirehoseFactory.class, "local"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(CombiningFirehoseFactory.class, "combining") ) ); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..f4a5c8bba6dade3ae57a5fd86d40dd8bcf531cb0 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -0,0 +1,125 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +/** + * Creates firehose that combines data from different Firehoses. Useful for ingesting data from multiple sources. + */ +public class CombiningFirehoseFactory implements FirehoseFactory +{ + private final List delegateFactoryList; + + @JsonCreator + public CombiningFirehoseFactory( + @JsonProperty("delegates") List delegateFactoryList + ) + { + Preconditions.checkArgument(!delegateFactoryList.isEmpty()); + this.delegateFactoryList = delegateFactoryList; + } + + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + return new CombiningFirehose(parser); + } + + @Override + public InputRowParser getParser() + { + return delegateFactoryList.get(0).getParser(); + } + + @JsonProperty("delegates") + public List getDelegateFactoryList() + { + return delegateFactoryList; + } + + public class CombiningFirehose implements Firehose + { + private final InputRowParser parser; + private final Iterator firehoseFactoryIterator; + private volatile Firehose currentFirehose; + + public CombiningFirehose(InputRowParser parser) throws IOException + { + this.firehoseFactoryIterator = delegateFactoryList.iterator(); + this.parser = parser; + nextFirehose(); + } + + private void nextFirehose() + { + if (firehoseFactoryIterator.hasNext()) { + try { + if (currentFirehose != null) { + currentFirehose.close(); + } + currentFirehose = firehoseFactoryIterator.next().connect(parser); + } + catch (IOException e) { + Throwables.propagate(e); + } + } + } + + @Override + public boolean hasMore() + { + return currentFirehose.hasMore(); + } + + @Override + public InputRow nextRow() + { + InputRow rv = currentFirehose.nextRow(); + if (!currentFirehose.hasMore()) { + nextFirehose(); + } + return rv; + } + + @Override + public Runnable commit() + { + return currentFirehose.commit(); + } + + @Override + public void close() throws IOException + { + currentFirehose.close(); + } + } +} diff --git a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a69327b86be4db0b0c231a60484fe00971b2b757 --- /dev/null +++ b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java @@ -0,0 +1,147 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.realtime.firehose; + +import com.google.common.collect.Lists; +import com.metamx.common.parsers.ParseException; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; +import io.druid.utils.Runnables; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +public class CombiningFirehoseFactoryTest +{ + + @Test + public void testCombiningfirehose() throws IOException + { + List list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2)); + List list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4)); + FirehoseFactory combiningFactory = new CombiningFirehoseFactory( + Arrays.asList( + new ListFirehoseFactory( + list1 + ), new ListFirehoseFactory(list2) + ) + ); + final Firehose firehose = combiningFactory.connect(null); + for (int i = 1; i < 5; i++) { + Assert.assertTrue(firehose.hasMore()); + final InputRow inputRow = firehose.nextRow(); + Assert.assertEquals(i, inputRow.getTimestampFromEpoch()); + Assert.assertEquals(i, inputRow.getFloatMetric("test"), 0); + } + Assert.assertFalse(firehose.hasMore()); + } + + private InputRow makeRow(final long timestamp, final float metricValue) + { + return new InputRow() + { + @Override + public List getDimensions() + { + return Arrays.asList("testDim"); + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp; + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } + + @Override + public float getFloatMetric(String metric) + { + return metricValue; + } + + @Override + public Object getRaw(String dimension) + { + return null; + } + + }; + } + + public static class ListFirehoseFactory implements FirehoseFactory + { + private final List rows; + + ListFirehoseFactory(List rows) + { + this.rows = rows; + } + + @Override + public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException + { + final Iterator iterator = rows.iterator(); + return new Firehose() + { + @Override + public boolean hasMore() + { + return iterator.hasNext(); + } + + @Override + public InputRow nextRow() + { + return iterator.next(); + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + // + } + }; + } + + @Override + public InputRowParser getParser() + { + return null; + } + } +}