提交 0c5a906a 编写于 作者: C cheddar

Merge branch 'master' of github.com:metamx/druid

......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes({
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class)
})
public interface InputRowParser<T>
{
public InputRow parse(T input) throws FormattedException;
......
......@@ -19,28 +19,29 @@
package com.metamx.druid.indexer.data;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
/**
*/
public class StringInputRowParser implements ByteBufferInputRowParser
{
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final MapInputRowParser inputRowCreator;
private final Parser<String, Object> parser;
private final DataSpec dataSpec;
private CharBuffer chars = null;
......@@ -50,6 +51,7 @@ public class StringInputRowParser implements ByteBufferInputRowParser
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
this.dataSpec = dataSpec;
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec.getDimensions(), dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
......@@ -116,9 +118,21 @@ public class StringInputRowParser implements ByteBufferInputRowParser
return inputRowCreator.parse(theMap);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return inputRowCreator.getTimestampSpec();
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public List<String> getDimensionExclusions()
{
return ImmutableList.copyOf(inputRowCreator.getDimensionExclusions());
}
}
package com.metamx.druid.indexer.data;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;

import java.nio.ByteBuffer;

import static org.junit.Assert.assertEquals;
/**
 * Round-trip JSON serialization/deserialization tests for the {@code InputRowParser}
 * implementations ({@link StringInputRowParser} and {@link MapInputRowParser}),
 * verifying that a parser survives serde and still parses rows identically.
 */
public class InputRowParserSerdeTest
{
  // Mapper configured with Druid's default modules; shared by both tests for serde round-trips.
  private final ObjectMapper jsonMapper = new DefaultObjectMapper();

  /**
   * Serializes a StringInputRowParser, deserializes it back through the polymorphic
   * {@link ByteBufferInputRowParser} interface (exercising the @JsonTypeInfo "type"
   * discriminator), and checks that the revived parser extracts the declared dimensions,
   * honors the exclusion list, and parses the ISO timestamp.
   */
  @Test
  public void testStringInputRowParserSerde() throws Exception
  {
    final StringInputRowParser parser = new StringInputRowParser(
        new TimestampSpec("timestamp", "iso"),
        new JSONDataSpec(
            ImmutableList.of("foo", "bar"), ImmutableList.<SpatialDimensionSchema>of()
        ),
        ImmutableList.of("baz")
    );

    // Deserialize against the interface type to exercise polymorphic type resolution.
    final ByteBufferInputRowParser parser2 = jsonMapper.readValue(
        jsonMapper.writeValueAsBytes(parser),
        ByteBufferInputRowParser.class
    );

    final InputRow parsed = parser2.parse(
        ByteBuffer.wrap(
            "{\"foo\":\"x\",\"bar\":\"y\",\"qux\":\"z\",\"timestamp\":\"2000\"}".getBytes(Charsets.UTF_8)
        )
    );

    // Only the declared dimensions survive; "qux" is undeclared and "baz" is excluded.
    // Uses org.junit.Assert (JUnit 4) rather than the deprecated junit.framework.Assert.
    assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
    assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
    assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
    assertEquals(new DateTime("2000").getMillis(), parsed.getTimestampFromEpoch());
  }

  /**
   * Same round-trip check for MapInputRowParser, parsing an in-memory map instead of
   * a serialized byte buffer.
   */
  @Test
  public void testMapInputRowParserSerde() throws Exception
  {
    final MapInputRowParser parser = new MapInputRowParser(
        new TimestampSpec("timestamp", "iso"),
        ImmutableList.of("foo", "bar"),
        ImmutableList.of("baz")
    );

    final MapInputRowParser parser2 = jsonMapper.readValue(
        jsonMapper.writeValueAsBytes(parser),
        MapInputRowParser.class
    );

    final InputRow parsed = parser2.parse(
        ImmutableMap.<String, Object>of(
            "foo", "x",
            "bar", "y",
            "qux", "z",
            "timestamp", "2000"
        )
    );

    assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
    assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
    assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
    assertEquals(new DateTime("2000").getMillis(), parsed.getTimestampFromEpoch());
  }
}
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -19,14 +19,13 @@
package com.metamx.druid.realtime.firehose;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
import com.metamx.druid.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
......@@ -34,13 +33,11 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.input.InputRow;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
......@@ -92,9 +89,9 @@ public class KafkaFirehoseFactory implements FirehoseFactory
{
private final ConsumerConnector connector;
private final Iterator<MessageAndMetadata<Message>> iter;
private final InputRowParser<ByteBuffer> parser;
private final ByteBufferInputRowParser parser;
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, InputRowParser<ByteBuffer> parser)
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, ByteBufferInputRowParser parser)
{
iter = stream.iterator();
this.connector = connector;
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.40-SNAPSHOT</version>
<version>0.5.41-SNAPSHOT</version>
</parent>
<dependencies>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册