diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java index 82a3c947bc38ef5addb585b4a0135ca9fabff9be..231cbd4410242e228ee3fc22fdac07fa7314d8b3 100644 --- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java +++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java @@ -1,9 +1,10 @@ package com.metamx.druid.indexer.data; +import com.metamx.common.exception.FormattedException; import com.metamx.druid.input.InputRow; public interface InputRowParser { - public InputRow parse(T input); + public InputRow parse(T input) throws FormattedException; public void addDimensionExclusion(String dimension); } diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java index 60a97c131bf2c5c1674a5295791100c4e3a3923e..b2d9586f272c2e49322927dbb3cf431d9ffc64be 100644 --- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java +++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.metamx.common.exception.FormattedException; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; import org.joda.time.DateTime; @@ -37,21 +38,30 @@ public class MapInputRowParser implements InputRowParser> } @Override - public InputRow parse(Map theMap) + public InputRow parse(Map theMap) throws FormattedException { final List dimensions = dataSpec.hasCustomDimensions() ? dataSpec.getDimensions() : Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions)); - final DateTime timestamp = timestampSpec.extractTimestamp(theMap); - if (timestamp == null) { - final String input = theMap.toString(); - throw new NullPointerException( - String.format( - "Null timestamp in input: %s", - input.length() < 100 ? input : input.substring(0, 100) + "..." - ) - ); + final DateTime timestamp; + try { + timestamp = timestampSpec.extractTimestamp(theMap); + if (timestamp == null) { + final String input = theMap.toString(); + throw new NullPointerException( + String.format( + "Null timestamp in input: %s", + input.length() < 100 ? input : input.substring(0, 100) + "..." + ) + ); + } + } + catch (Exception e) { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_TIMESTAMP) + .withMessage(e.toString()) + .build(); } return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index 0874a900f4283ae94fedb1a9758565b852244d0b..f40fd75998c4571f02c79689dde7b2c450ea6229 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -164,32 +164,37 @@ public class RealtimeManager implements QuerySegmentWalker final InputRow inputRow; try { inputRow = firehose.nextRow(); + } + catch (FormattedException e) { + log.info(e, "unparseable line: %s", e.getDetails()); + metrics.incrementUnparseable(); + continue; + } + catch (Exception e) { + log.info(e, "thrown away line due to exception"); + metrics.incrementThrownAway(); + continue; + } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink == null) { - metrics.incrementThrownAway(); - log.debug("Throwing away event[%s]", inputRow); - - if (System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - - continue; - } + final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if (sink == null) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); - int currCount = sink.add(inputRow); - metrics.incrementProcessed(); - if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { + if (System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } - } - catch (FormattedException e) { - log.info(e, "unparseable line: %s", e.getDetails()); - metrics.incrementUnparseable(); + continue; } + + int currCount = sink.add(inputRow); + metrics.incrementProcessed(); + if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } } } catch (RuntimeException e) { log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())