diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index e6402221e45b104dde461881b3164d1be716c136..335c8c1858b7ba744f762a21e0ed32389ec5e48a 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -41,6 +41,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.granularity.QueryGranularity;
 import io.druid.guice.GuiceInjectors;
 import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.annotations.Self;
@@ -172,6 +173,7 @@ public class HadoopDruidIndexerConfig
   private volatile PathSpec pathSpec;
   private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
   private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
+  private final QueryGranularity rollupGran;

   @JsonCreator
   public HadoopDruidIndexerConfig(
@@ -203,6 +205,7 @@
         hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
       }
     }
+    this.rollupGran = schema.getDataSchema().getGranularitySpec().getQueryGranularity();
   }

   @JsonProperty
@@ -326,7 +329,7 @@
       return Optional.absent();
     }

-    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow);
+    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
     final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);

     return Optional.of(
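The HadoopDruidIndexerConfig change above resolves a row's shard from the rollup-granularity-truncated timestamp rather than from the raw event time, so rows that will be rolled up into the same output row always map to the same shard. A minimal sketch of that contract, using only the APIs visible in this diff; the helper class and method names are hypothetical and not part of the patch:

import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;

// Hypothetical helper: rows whose timestamps fall into the same rollup bucket and
// that carry identical dimensions must resolve to the same ShardSpec, otherwise
// they could land in different segments and never be rolled up together.
class RollupAwareRouter
{
  static ShardSpec route(ShardSpecLookup lookup, QueryGranularity rollupGran, InputRow row)
  {
    // Truncate first, as the hunk above now does, then consult the lookup.
    final long rollupTime = rollupGran.truncate(row.getTimestampFromEpoch());
    return lookup.getShardSpec(rollupTime, row);
  }
}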
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index b7496c7078280cf5c275d54fc104b07308b1b594..f04736a66e3d4d3f9f4bbc2c81555d531fef7232 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -407,14 +407,14 @@ public class IndexTask extends AbstractFixedIntervalTask
     final int myRowFlushBoundary = rowFlushBoundary > 0 ?
                                    rowFlushBoundary : toolbox.getConfig().getDefaultRowFlushBoundary();
-
+    final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity();
     try {
       plumber.startJob();

       while (firehose.hasMore()) {
         final InputRow inputRow = firehose.nextRow();

-        if (shouldIndex(shardSpec, interval, inputRow)) {
+        if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) {
           int numRows = plumber.add(inputRow);
           if (numRows == -1) {
             throw new ISE(
@@ -469,13 +469,15 @@ public class IndexTask extends AbstractFixedIntervalTask
    *
    * @return true or false
    */
-  private boolean shouldIndex(
+  private static boolean shouldIndex(
       final ShardSpec shardSpec,
       final Interval interval,
-      final InputRow inputRow
+      final InputRow inputRow,
+      final QueryGranularity rollupGran
   )
   {
-    return interval.contains(inputRow.getTimestampFromEpoch()) && shardSpec.isInChunk(inputRow);
+    return interval.contains(inputRow.getTimestampFromEpoch())
+           && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
   }

   public static class IndexIngestionSpec extends IngestionSpec
diff --git a/pom.xml b/pom.xml
index 8c1202d7b7943dc62d37f65b4310fd814d7d691a..c8eca1e87388e7047bc3030e27adb06a4bbab23e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <metamx.java-util.version>0.26.9</metamx.java-util.version>
         <apache.curator.version>2.6.0</apache.curator.version>
-        <druid.api.version>0.2.14</druid.api.version>
+        <druid.api.version>0.2.14.1</druid.api.version>
diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
index be640a03545ad397e324fdec4ba0ec15748f2ff6..56ff5159c62787a02ed6d6cc1231f51219fc12cf 100644
--- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -49,14 +49,14 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
   }

   @Override
-  public boolean isInChunk(InputRow inputRow)
+  public boolean isInChunk(long timestamp, InputRow inputRow)
   {
-    return (((long) hash(inputRow)) - getPartitionNum()) % getPartitions() == 0;
+    return (((long) hash(timestamp, inputRow)) - getPartitionNum()) % getPartitions() == 0;
   }

-  protected int hash(InputRow inputRow)
+  protected int hash(long timestamp, InputRow inputRow)
   {
-    final List<Object> groupKey = Rows.toGroupKey(inputRow.getTimestampFromEpoch(), inputRow);
+    final List<Object> groupKey = Rows.toGroupKey(timestamp, inputRow);
     try {
       return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
     }
@@ -80,9 +80,9 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
     return new ShardSpecLookup()
     {
       @Override
-      public ShardSpec getShardSpec(InputRow row)
+      public ShardSpec getShardSpec(long timestamp, InputRow row)
       {
-        int index = Math.abs(hash(row) % getPartitions());
+        int index = Math.abs(hash(timestamp, row) % getPartitions());
         return shardSpecs.get(index);
       }
     };
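HashBasedNumberedShardSpec now hashes the supplied timestamp, which callers have already truncated to the rollup granularity, instead of the raw event time. The effect is that rows differing only within a rollup bucket produce the same group key, hence the same hash and the same partition. A small illustrative check, not part of the patch; it assumes MapBasedInputRow's (long, dimensions, event) constructor from the same codebase and stands in for rollupGran.truncate() with plain hour-bucket arithmetic:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Rows;

class GroupKeyRollupCheck
{
  public static void main(String[] args)
  {
    // Two rows with identical dimensions, one minute apart, inside the same hour bucket.
    final MapBasedInputRow row1 = new MapBasedInputRow(
        1407000000000L, ImmutableList.of("dim1"), ImmutableMap.<String, Object>of("dim1", "a")
    );
    final MapBasedInputRow row2 = new MapBasedInputRow(
        1407000060000L, ImmutableList.of("dim1"), ImmutableMap.<String, Object>of("dim1", "a")
    );

    // Hour truncation written out by hand here; the patch uses rollupGran.truncate(...).
    final long bucket1 = row1.getTimestampFromEpoch() - row1.getTimestampFromEpoch() % 3600000L;
    final long bucket2 = row2.getTimestampFromEpoch() - row2.getTimestampFromEpoch() % 3600000L;

    // Same truncated timestamp + same dimensions => same group key => same hash and partition.
    // With the raw timestamps (the pre-patch behavior) the two keys would differ.
    System.out.println(Rows.toGroupKey(bucket1, row1).equals(Rows.toGroupKey(bucket2, row2)));
  }
}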
diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java
index d3ab608a328f202610d0c0b7f6e9a93afcec3eac..b095f7135c4b1988c681a01aa5eb3c652928bab2 100644
--- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java
@@ -50,7 +50,7 @@ public class LinearShardSpec implements ShardSpec
     return new ShardSpecLookup()
     {
       @Override
-      public ShardSpec getShardSpec(InputRow row)
+      public ShardSpec getShardSpec(long timestamp, InputRow row)
       {
         return shardSpecs.get(0);
       }
@@ -63,7 +63,7 @@ public class LinearShardSpec implements ShardSpec
   }

   @Override
-  public boolean isInChunk(InputRow inputRow) {
+  public boolean isInChunk(long timestamp, InputRow inputRow) {
     return true;
   }

diff --git a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java
index 65399f1009f059aebf0f32332776a5bfb6ca8fb4..7c90e66c7117cad2b161869eb263c46356af3221 100644
--- a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java
@@ -60,7 +60,7 @@ public class NumberedShardSpec implements ShardSpec
     return new ShardSpecLookup()
     {
       @Override
-      public ShardSpec getShardSpec(InputRow row)
+      public ShardSpec getShardSpec(long timestamp, InputRow row)
       {
         return shardSpecs.get(0);
       }
@@ -80,7 +80,7 @@ public class NumberedShardSpec implements ShardSpec
   }

   @Override
-  public boolean isInChunk(InputRow inputRow)
+  public boolean isInChunk(long timestamp, InputRow inputRow)
   {
     return true;
   }
diff --git a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java
index 1cf232a4b29246124c3d6d099fac36f4431016af..2350dc559efb3fbe6dd9276d0414e9506e36d6da 100644
--- a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -100,10 +100,10 @@ public class SingleDimensionShardSpec implements ShardSpec
     return new ShardSpecLookup()
     {
       @Override
-      public ShardSpec getShardSpec(InputRow row)
+      public ShardSpec getShardSpec(long timestamp, InputRow row)
       {
         for (ShardSpec spec : shardSpecs) {
-          if (spec.isInChunk(row)) {
+          if (spec.isInChunk(timestamp, row)) {
             return spec;
           }
         }
@@ -124,7 +124,7 @@ public class SingleDimensionShardSpec implements ShardSpec
   }

   @Override
-  public boolean isInChunk(InputRow inputRow)
+  public boolean isInChunk(long timestamp, InputRow inputRow)
   {
     final List<String> values = inputRow.getDimension(dimension);
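LinearShardSpec, NumberedShardSpec, and SingleDimensionShardSpec do not partition on time, so they accept the new timestamp argument and simply ignore it. A sketch of the updated single-method ShardSpecLookup contract, assuming it is a plain interface as the anonymous implementations above suggest; the class name is hypothetical:

import io.druid.data.input.InputRow;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;

import java.util.List;

// Hypothetical named equivalent of the anonymous lookups above: the (truncated)
// timestamp is now part of the contract, but a time-insensitive spec can ignore it.
class FirstSpecLookup implements ShardSpecLookup
{
  private final List<ShardSpec> shardSpecs;

  FirstSpecLookup(List<ShardSpec> shardSpecs)
  {
    this.shardSpecs = shardSpecs;
  }

  @Override
  public ShardSpec getShardSpec(long timestamp, InputRow row)
  {
    // Always route to the first chunk, mirroring LinearShardSpec and NumberedShardSpec.
    return shardSpecs.get(0);
  }
}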
diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
index a5e3ff1181d19fe7972d6e66ba434ae114b67fc8..47c78724010669988470b18c0700d76637df0127 100644
--- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
+++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
@@ -127,7 +127,7 @@
   public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
   {
     for (ShardSpec spec : specs) {
-      if (spec.isInChunk(row)) {
+      if (spec.isInChunk(row.getTimestampFromEpoch(), row)) {
         return true;
       }
     }
@@ -145,7 +145,7 @@
     }

     @Override
-    protected int hash(InputRow inputRow)
+    protected int hash(long timestamp, InputRow inputRow)
     {
       return inputRow.hashCode();
     }
@@ -208,4 +208,5 @@
       return 0;
     }
   }
+
 }
diff --git a/server/src/test/java/io/druid/server/shard/SingleDimensionShardSpecTest.java b/server/src/test/java/io/druid/server/shard/SingleDimensionShardSpecTest.java
index d07b52d9fcbd236f08cf5fa94c94a4f1af3da756..5bb49b38e5cbcaf83038bff9373cbee2c076352e 100644
--- a/server/src/test/java/io/druid/server/shard/SingleDimensionShardSpecTest.java
+++ b/server/src/test/java/io/druid/server/shard/SingleDimensionShardSpecTest.java
@@ -111,7 +111,7 @@ public class SingleDimensionShardSpecTest
               }
           )
       );
-      Assert.assertEquals(String.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow));
+      Assert.assertEquals(String.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow.getTimestampFromEpoch(), inputRow));
     }
   }
 }
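The test updates show the caller-side convention for the new signature: where no rollup granularity is in scope, the raw epoch timestamp is passed straight through, and for QueryGranularity.NONE truncation leaves the timestamp unchanged, so behavior matches the old single-argument call. A sketch of that pattern as a reusable helper; the class and method names are illustrative only:

import io.druid.data.input.InputRow;
import io.druid.timeline.partition.ShardSpec;

// Illustrative utility mirroring assertExistsInOneSpec() in the test above.
class ShardSpecMatcher
{
  static boolean isInAnyChunk(Iterable<ShardSpec> specs, InputRow row)
  {
    for (ShardSpec spec : specs) {
      // No rollup granularity here, so the raw event timestamp is used as-is.
      if (spec.isInChunk(row.getTimestampFromEpoch(), row)) {
        return true;
      }
    }
    return false;
  }
}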