diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ae2d61a9a93ea5de08118356d95afc1ad7c2b5fc..e3782916902e09274b167978d423b106d0434456 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -100,6 +100,8 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
+      } else {
+        groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
       }
       JobHelper.setupClasspath(config, groupByJob);
 
@@ -124,9 +126,6 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
-        if (!fileSystem.exists(intervalInfoPath)) {
-          throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
-        }
         List<Interval> intervals = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
             {
             }
@@ -144,37 +143,33 @@ public class DetermineHashedPartitionsJob implements Jobby
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
         }
-        if (fileSystem.exists(partitionInfoPath)) {
-          Long cardinality = config.jsonMapper.readValue(
-              Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
-              {
-              }
-          );
-          int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
-          if (numberOfShards > MAX_SHARDS) {
-            throw new ISE(
-                "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
-                numberOfShards,
-                MAX_SHARDS
-            );
-          }
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
-          if (numberOfShards == 1) {
-            actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
-          } else {
-            for (int i = 0; i < numberOfShards; ++i) {
-              actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
-              log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
-            }
-          }
-
-          shardSpecs.put(bucket, actualSpecs);
-
-        } else {
-          log.info("Path[%s] didn't exist!?", partitionInfoPath);
-        }
+        Long cardinality = config.jsonMapper.readValue(
+            Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
+            {
+            }
+        );
+        int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
+
+        if (numberOfShards > MAX_SHARDS) {
+          throw new ISE(
+              "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
+              numberOfShards,
+              MAX_SHARDS
+          );
+        }
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
+        if (numberOfShards == 1) {
+          actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
+        } else {
+          for (int i = 0; i < numberOfShards; ++i) {
+            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+          }
+        }
+
+        shardSpecs.put(bucket, actualSpecs);
       }
       config.setShardSpecs(shardSpecs);
       log.info(
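As a side note, the shard math applied above is a ceiling division of the cardinality reported by the groupBy job by targetPartitionSize, capped at MAX_SHARDS. A minimal standalone sketch of that arithmetic follows; the class name and the input values are invented for illustration and are not part of this change:

```java
// Standalone sketch of the shard-count arithmetic; the input values are made up.
public class ShardMathSketch
{
  private static final int MAX_SHARDS = 128; // illustrative cap; the job defines its own MAX_SHARDS constant

  public static void main(String[] args)
  {
    long cardinality = 12_500_000L;        // unique-row estimate written by the groupBy job
    long targetPartitionSize = 5_000_000L; // rows per shard requested in the partitionsSpec

    int numberOfShards = (int) Math.ceil((double) cardinality / targetPartitionSize);
    System.out.println(numberOfShards);    // prints 3

    if (numberOfShards > MAX_SHARDS) {
      throw new IllegalStateException("Number of shards " + numberOfShards + " exceeds " + MAX_SHARDS);
    }
  }
}
```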
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 890a351618930bbe736f35ca3aab850d7a0d9c49..ddcb691ef09de40f4afedb6a0abba50eeb239bc7 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,23 +215,20 @@ public class DeterminePartitionsJob implements Jobby
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
         }
-        if (fileSystem.exists(partitionInfoPath)) {
-          List<ShardSpec> specs = config.jsonMapper.readValue(
-              Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
-              {
-              }
-          );
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
-          for (int i = 0; i < specs.size(); ++i) {
-            actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
-            log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
-          }
-
-          shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
-        } else {
-          log.info("Path[%s] didn't exist!?", partitionInfoPath);
-        }
+        List<ShardSpec> specs = config.jsonMapper.readValue(
+            Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
+            {
+            }
+        );
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
+        for (int i = 0; i < specs.size(); ++i) {
+          actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
+          log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+        }
+
+        shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+
       }
 
       config.setShardSpecs(shardSpecs);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 2076292260de58977feef87dae028dfd3c3ff6f6..7a8c25dd1370ffb53d05e365a81d36594550a721 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -56,13 +57,28 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
    if (config.isDeterminingPartitions()) {
      jobs.add(config.getPartitionsSpec().getPartitionJob(config));
    } else {
+      int shardsPerInterval = config.getPartitionsSpec().getShardCount();
      Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
      int shardCount = 0;
      for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
        DateTime bucket = segmentGranularity.getStart();
-        final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
-        shardSpecs.put(bucket, Lists.newArrayList(spec));
-        log.info("DateTime[%s], spec[%s]", bucket, spec);
+        if (shardsPerInterval > 0) {
+          final List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
+          for (int i = 0; i < shardsPerInterval; i++) {
+            specs.add(
+                new HadoopyShardSpec(
+                    new HashBasedNumberedShardSpec(i, shardsPerInterval),
+                    shardCount++
+                )
+            );
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, specs.get(i));
+          }
+          shardSpecs.put(bucket, specs);
+        } else {
+          final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
+          shardSpecs.put(bucket, Lists.newArrayList(spec));
+          log.info("DateTime[%s], spec[%s]", bucket, spec);
+        }
      }
      config.setShardSpecs(shardSpecs);
    }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
index 90fab3e0435ba500ad9db9aa3b0fa202fa3de3c4..47498e0f0ef43cac084535413f885ed78b59f593 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
@@ -20,19 +20,23 @@
 package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 
 public abstract class AbstractPartitionsSpec implements PartitionsSpec
 {
   private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
+  private static final int MAX_SHARDS = 128;
   private final long targetPartitionSize;
   private final long maxPartitionSize;
   private final boolean assumeGrouped;
+  private final int shardCount;
 
   public AbstractPartitionsSpec(
       Long targetPartitionSize,
       Long maxPartitionSize,
-      Boolean assumeGrouped
+      Boolean assumeGrouped,
+      Integer shardCount
   )
   {
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -40,6 +44,15 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
                             ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
                             : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+    this.shardCount = shardCount == null ? -1 : shardCount;
+    Preconditions.checkArgument(
+        this.targetPartitionSize == -1 || this.shardCount == -1,
+        "targetPartitionSize and shardCount cannot both be set"
+    );
+    Preconditions.checkArgument(
+        this.shardCount < MAX_SHARDS,
+        "shardCount cannot be more than MAX_SHARDS[%s]", MAX_SHARDS
+    );
   }
 
   @JsonProperty
@@ -65,4 +78,10 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
   {
     return targetPartitionSize > 0;
   }
+
+  @Override
+  public int getShardCount()
+  {
+    return shardCount;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index d164cef16389cff7ef768cfaa9720770ce2ad75c..b67b61d2b5a557dd9c2f18059ab0300c240a2169 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -21,6 +21,7 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import io.druid.indexer.DetermineHashedPartitionsJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.Jobby;
@@ -33,10 +34,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
   public HashedPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("shardCount") @Nullable Integer shardCount
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
   }
 
   @Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index cce5de8becf6ced3476e2ffa19c786b8f283fa48..1aa1495662cd628cf4f69c5bff61d1d714227ea7 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -49,4 +49,7 @@ public interface PartitionsSpec
   @JsonIgnore
   public boolean isDeterminingPartitions();
 
+  @JsonProperty
+  public int getShardCount();
+
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
index 30f13f49478ac8aaf0accf4ed27c2731d2bfba74..6f0d66d4da557dbbde7448f46d4e5accc6b8a595 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
@@ -21,9 +21,6 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DetermineHashedPartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
 
@@ -35,9 +32,10 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
   public RandomPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("shardCount") @Nullable Integer shardCount
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
   }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 118d13559141a82bc989d9168f9635bcb05240ef..7964c1cbe6f209014650abc10c3f53a79ddab7b0 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -41,7 +41,7 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
     this.partitionDimension = partitionDimension;
   }
 
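For reference, a minimal sketch of how the new shardCount option is meant to be used, based only on the constructors and checks added in this patch. The class name and literal values below are illustrative and not part of the change:

```java
import io.druid.indexer.partitions.HashedPartitionsSpec;

public class ShardCountUsage
{
  public static void main(String[] args)
  {
    // Pre-assign three hashed shards per segment interval and skip the
    // determine-partitions jobs entirely.
    HashedPartitionsSpec spec = new HashedPartitionsSpec(null, null, null, 3);

    System.out.println(spec.isDeterminingPartitions()); // false: HadoopDruidDetermineConfigurationJob
                                                        // builds HashBasedNumberedShardSpecs directly
    System.out.println(spec.getShardCount());           // 3

    // targetPartitionSize and shardCount are mutually exclusive; this would fail the
    // Preconditions check in AbstractPartitionsSpec with an IllegalArgumentException:
    // new HashedPartitionsSpec(5000000L, null, null, 3);
  }
}
```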