From 4ce12470a198bcebaee6db31cae93fed675c9e93 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Fri, 18 Jul 2014 18:52:15 +0530
Subject: [PATCH] Add way to skip determine partitions for index task

Add a way to skip determinePartitions for IndexTask by manually
specifying numShards.
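For reference, a tuning config that skips determinePartitions by fixing the
shard count could be built like this (an illustrative sketch against the
constructor added in this patch; the values are made up, and it assumes
IndexTuningConfig remains a public static inner class of IndexTask):

    // numShards and targetPartitionSize are mutually exclusive, so
    // targetPartitionSize is passed as -1 to satisfy the new precondition.
    IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
        -1,     // targetPartitionSize: disable size-based partitioning
        500000, // rowFlushBoundary
        3       // numShards: pre-create three HashBasedNumberedShardSpecs
    );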
---
 .../indexer/DetermineHashedPartitionsJob.java | 11 ++++-
 .../HadoopDruidDetermineConfigurationJob.java |  2 +-
 .../druid/indexing/common/task/IndexTask.java | 43 ++++++++++++++++---
 .../indexing/common/task/TaskSerdeTest.java   |  9 +++-
 .../indexing/overlord/TaskLifecycleTest.java  |  7 ++-
 .../partition/HashBasedNumberedShardSpec.java |  8 ++--
 .../shard/HashBasedNumberedShardSpecTest.java | 11 ++---
 7 files changed, 70 insertions(+), 21 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 4bc9cb9f35..77466b8c0d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -179,7 +179,16 @@ public class DetermineHashedPartitionsJob implements Jobby
           actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
         } else {
           for (int i = 0; i < numberOfShards; ++i) {
-            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+            actualSpecs.add(
+                new HadoopyShardSpec(
+                    new HashBasedNumberedShardSpec(
+                        i,
+                        numberOfShards,
+                        HadoopDruidIndexerConfig.jsonMapper
+                    ),
+                    shardCount++
+                )
+            );
             log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
           }
         }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 311eec6248..51ba40abc4 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -67,7 +67,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
         for (int i = 0; i < shardsPerInterval; i++) {
           specs.add(
               new HadoopyShardSpec(
-                  new HashBasedNumberedShardSpec(i, shardsPerInterval),
+                  new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.jsonMapper),
                  shardCount++
               )
           );
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 182a156990..b7496c7078 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -19,10 +19,13 @@
 
 package io.druid.indexing.common.task;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -53,12 +56,14 @@ import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -107,6 +112,8 @@ public class IndexTask extends AbstractFixedIntervalTask
   @JsonIgnore
   private final IndexIngestionSpec ingestionSchema;
 
+  private final ObjectMapper jsonMapper;
+
   @JsonCreator
   public IndexTask(
       @JsonProperty("id") String id,
@@ -118,7 +125,8 @@ public class IndexTask extends AbstractFixedIntervalTask
       @JsonProperty("indexGranularity") final QueryGranularity indexGranularity,
       @JsonProperty("targetPartitionSize") final int targetPartitionSize,
       @JsonProperty("firehose") final FirehoseFactory firehoseFactory,
-      @JsonProperty("rowFlushBoundary") final int rowFlushBoundary
+      @JsonProperty("rowFlushBoundary") final int rowFlushBoundary,
+      @JacksonInject ObjectMapper jsonMapper
   )
   {
     super(
@@ -139,9 +147,10 @@ public class IndexTask extends AbstractFixedIntervalTask
           granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
         ),
         new IndexIOConfig(firehoseFactory),
-        new IndexTuningConfig(targetPartitionSize, rowFlushBoundary)
+        new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
       );
     }
+    this.jsonMapper = jsonMapper;
   }
 
   @Override
@@ -175,7 +184,15 @@ public class IndexTask extends AbstractFixedIntervalTask
     if (targetPartitionSize > 0) {
       shardSpecs = determinePartitions(bucket, targetPartitionSize);
     } else {
-      shardSpecs = ImmutableList.of(new NoneShardSpec());
+      int numShards = ingestionSchema.getTuningConfig().getNumShards();
+      if (numShards > 0) {
+        shardSpecs = Lists.newArrayList();
+        for (int i = 0; i < numShards; i++) {
+          shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
+        }
+      } else {
+        shardSpecs = ImmutableList.of(new NoneShardSpec());
+      }
     }
     for (final ShardSpec shardSpec : shardSpecs) {
       final DataSegment segment = generateSegment(
@@ -206,6 +223,7 @@ public class IndexTask extends AbstractFixedIntervalTask
         retVal.add(interval);
       }
     }
+
     return retVal;
   }
 
@@ -477,7 +495,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 
       this.dataSchema = dataSchema;
       this.ioConfig = ioConfig;
-      this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0) : tuningConfig;
+      this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null) : tuningConfig;
     }
 
     @Override
@@ -530,15 +548,22 @@ public class IndexTask extends AbstractFixedIntervalTask
 
     private final int targetPartitionSize;
     private final int rowFlushBoundary;
+    private final int numShards;
 
     @JsonCreator
     public IndexTuningConfig(
         @JsonProperty("targetPartitionSize") int targetPartitionSize,
-        @JsonProperty("rowFlushBoundary") int rowFlushBoundary
-    )
+        @JsonProperty("rowFlushBoundary") int rowFlushBoundary,
+        @JsonProperty("numShards") @Nullable Integer numShards
+    )
     {
       this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
       this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
+      this.numShards = numShards == null ? -1 : numShards;
+      Preconditions.checkArgument(
+          this.targetPartitionSize == -1 || this.numShards == -1,
+          "targetPartitionsSize and shardCount both cannot be set"
+      );
     }
 
     @JsonProperty
@@ -552,5 +577,11 @@ public class IndexTask extends AbstractFixedIntervalTask
     {
       return rowFlushBoundary;
     }
+
+    @JsonProperty
+    public int getNumShards()
+    {
+      return numShards;
+    }
   }
 }
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 0cd0fde932..516ea98430 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -19,6 +19,8 @@
 
 package io.druid.indexing.common.task;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
@@ -67,16 +69,19 @@ public class TaskSerdeTest
         QueryGranularity.NONE,
         10000,
         new LocalFirehoseFactory(new File("lol"), "rofl", null),
-        -1
+        -1,
+        jsonMapper
     );
 
     for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
       jsonMapper.registerModule(jacksonModule);
     }
+    InjectableValues inject = new InjectableValues.Std()
+        .addValue(ObjectMapper.class, jsonMapper);
     final String json = jsonMapper.writeValueAsString(task);
 
     Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
 
-    final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class);
+    final IndexTask task2 = jsonMapper.reader(Task.class).with(inject).readValue(json);
 
     Assert.assertEquals("foo", task.getDataSource());
     Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 55c00a58d9..33411036e1 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -43,6 +43,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.granularity.QueryGranularity;
+import io.druid.indexing.common.TestUtils;
 import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.indexing.common.SegmentLoaderFactory;
@@ -249,7 +250,8 @@ public class TaskLifecycleTest
                 IR("2010-01-02T01", "a", "c", 1)
             )
         ),
-        -1
+        -1,
+        TestUtils.MAPPER
     );
 
     final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
@@ -297,7 +299,8 @@ public class TaskLifecycleTest
         QueryGranularity.NONE,
         10000,
         newMockExceptionalFirehoseFactory(),
-        -1
+        -1,
+        TestUtils.MAPPER
     );
 
     final TaskStatus status = runTask(indexTask);
diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 8f347ee6cf..be640a0354 100644
--- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -34,18 +34,18 @@ import java.util.List;
 
 public class HashBasedNumberedShardSpec extends NumberedShardSpec
 {
-  private static final HashFunction hashFunction = Hashing.murmur3_32();
-  @JacksonInject
-  private ObjectMapper jsonMapper;
+  private final ObjectMapper jsonMapper;
 
   @JsonCreator
   public HashBasedNumberedShardSpec(
       @JsonProperty("partitionNum") int partitionNum,
-      @JsonProperty("partitions") int partitions
+      @JsonProperty("partitions") int partitions,
+      @JacksonInject ObjectMapper jsonMapper
   )
   {
     super(partitionNum, partitions);
+    this.jsonMapper = jsonMapper;
   }
 
   @Override
diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
index afab880ff8..97b9fefc30 100644
--- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
+++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
@@ -26,6 +26,7 @@ import com.metamx.common.ISE;
 import io.druid.TestUtil;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Row;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.ShardSpec;
@@ -43,7 +44,7 @@ public class HashBasedNumberedShardSpecTest
   {
     final ShardSpec spec = TestUtil.MAPPER.readValue(
-        TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2)),
+        TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, TestUtil.MAPPER)),
         ShardSpec.class
     );
     Assert.assertEquals(1, spec.getPartitionNum());
@@ -65,9 +66,9 @@ public class HashBasedNumberedShardSpecTest
   public void testPartitionChunks()
   {
     final List<ShardSpec> specs = ImmutableList.of(
-        new HashBasedNumberedShardSpec(0, 3),
-        new HashBasedNumberedShardSpec(1, 3),
-        new HashBasedNumberedShardSpec(2, 3)
+        new HashBasedNumberedShardSpec(0, 3, TestUtil.MAPPER),
+        new HashBasedNumberedShardSpec(1, 3, TestUtil.MAPPER),
+        new HashBasedNumberedShardSpec(2, 3, TestUtil.MAPPER)
     );
 
     final List<PartitionChunk<String>> chunks = Lists.transform(
@@ -141,7 +142,7 @@ public class HashBasedNumberedShardSpecTest
         int partitions
     )
     {
-      super(partitionNum, partitions);
+      super(partitionNum, partitions, TestUtil.MAPPER);
     }
 
     @Override
-- 
GitLab