提交 0748eabe 编写于 作者: N nishantmonu51

batch ingestion fixes

1) Fix path when mapped output is compressed
2) Add number of reducers to the determine hashed partitions job
manually
3) Add a way to disable determine partitions and specify shardCount in
HashedPartitionsSpec
上级 4e1c159f
......@@ -100,6 +100,8 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
if (!config.getSegmentGranularIntervals().isPresent()) {
groupByJob.setNumReduceTasks(1);
} else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
}
JobHelper.setupClasspath(config, groupByJob);
......@@ -124,9 +126,6 @@ public class DetermineHashedPartitionsJob implements Jobby
if (!config.getSegmentGranularIntervals().isPresent()) {
final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!fileSystem.exists(intervalInfoPath)) {
throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
}
List<Interval> intervals = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
{
......@@ -144,37 +143,33 @@ public class DetermineHashedPartitionsJob implements Jobby
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
Long cardinality = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{
}
Long cardinality = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{
}
);
int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
if (numberOfShards > MAX_SHARDS) {
throw new ISE(
"Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
numberOfShards,
MAX_SHARDS
);
int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
if (numberOfShards > MAX_SHARDS) {
throw new ISE(
"Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
numberOfShards,
MAX_SHARDS
);
}
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
if (numberOfShards == 1) {
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else {
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
if (numberOfShards == 1) {
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else {
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
}
shardSpecs.put(bucket, actualSpecs);
shardSpecs.put(bucket, actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
}
config.setShardSpecs(shardSpecs);
log.info(
......
......@@ -215,23 +215,20 @@ public class DeterminePartitionsJob implements Jobby
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{
}
);
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (int i = 0; i < specs.size(); ++i) {
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
}
List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{
}
);
shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (int i = 0; i < specs.size(); ++i) {
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
}
shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
}
config.setShardSpecs(shardSpecs);
......
......@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
......@@ -56,13 +57,25 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
if (config.isDeterminingPartitions()) {
jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else {
int shardsPerInterval = config.getPartitionsSpec().getShardCount();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
if (shardsPerInterval > 0) {
for (int i = 0; i < shardsPerInterval; i++) {
final HadoopyShardSpec spec = new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, shardsPerInterval),
shardCount++
);
shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
} else {
final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
}
config.setShardSpecs(shardSpecs);
}
......
......@@ -20,19 +20,23 @@
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
public abstract class AbstractPartitionsSpec implements PartitionsSpec
{
private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
private static final int MAX_SHARDS = 128;
private final long targetPartitionSize;
private final long maxPartitionSize;
private final boolean assumeGrouped;
private final int shardCount;
public AbstractPartitionsSpec(
Long targetPartitionSize,
Long maxPartitionSize,
Boolean assumeGrouped
Boolean assumeGrouped,
Integer shardCount
)
{
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
......@@ -40,6 +44,15 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
: maxPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
this.shardCount = shardCount == null ? -1 : shardCount;
Preconditions.checkArgument(
targetPartitionSize == -1 || shardCount == -1,
"targetPartitionsSize and shardCount both cannot be set"
);
Preconditions.checkArgument(
shardCount < MAX_SHARDS,
"shardCount cannot be more than MAX_SHARD_COUNT[%d] ", MAX_SHARDS
);
}
@JsonProperty
......@@ -65,4 +78,10 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
{
return targetPartitionSize > 0;
}
@Override
public int getShardCount()
{
return shardCount;
}
}
......@@ -21,6 +21,7 @@ package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
......@@ -33,10 +34,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("shardCount") @Nullable Integer shardCount
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
}
@Override
......
......@@ -49,4 +49,7 @@ public interface PartitionsSpec
@JsonIgnore
public boolean isDeterminingPartitions();
@JsonProperty
public int getShardCount();
}
......@@ -21,9 +21,6 @@ package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
......@@ -35,9 +32,10 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
public RandomPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("shardCount") @Nullable Integer shardCount
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
}
}
......@@ -41,7 +41,7 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
this.partitionDimension = partitionDimension;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册