提交 4ce12470 编写于 作者: N nishantmonu51

Add way to skip determine partitions for index task

Add a way to skip determinePartitions for IndexTask by manually
specifying numShards.
上级 64307766
...@@ -179,7 +179,16 @@ public class DetermineHashedPartitionsJob implements Jobby ...@@ -179,7 +179,16 @@ public class DetermineHashedPartitionsJob implements Jobby
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++)); actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else { } else {
for (int i = 0; i < numberOfShards; ++i) { for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++)); actualSpecs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(
i,
numberOfShards,
HadoopDruidIndexerConfig.jsonMapper
),
shardCount++
)
);
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i)); log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
} }
} }
......
...@@ -67,7 +67,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby ...@@ -67,7 +67,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
for (int i = 0; i < shardsPerInterval; i++) { for (int i = 0; i < shardsPerInterval; i++) {
specs.add( specs.add(
new HadoopyShardSpec( new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, shardsPerInterval), new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.jsonMapper),
shardCount++ shardCount++
) )
); );
......
...@@ -19,10 +19,13 @@ ...@@ -19,10 +19,13 @@
package io.druid.indexing.common.task; package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
...@@ -53,12 +56,14 @@ import io.druid.segment.loading.DataSegmentPusher; ...@@ -53,12 +56,14 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
...@@ -107,6 +112,8 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -107,6 +112,8 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonIgnore @JsonIgnore
private final IndexIngestionSpec ingestionSchema; private final IndexIngestionSpec ingestionSchema;
private final ObjectMapper jsonMapper;
@JsonCreator @JsonCreator
public IndexTask( public IndexTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
...@@ -118,7 +125,8 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -118,7 +125,8 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("indexGranularity") final QueryGranularity indexGranularity, @JsonProperty("indexGranularity") final QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") final int targetPartitionSize, @JsonProperty("targetPartitionSize") final int targetPartitionSize,
@JsonProperty("firehose") final FirehoseFactory firehoseFactory, @JsonProperty("firehose") final FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary @JsonProperty("rowFlushBoundary") final int rowFlushBoundary,
@JacksonInject ObjectMapper jsonMapper
) )
{ {
super( super(
...@@ -139,9 +147,10 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -139,9 +147,10 @@ public class IndexTask extends AbstractFixedIntervalTask
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity) granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
), ),
new IndexIOConfig(firehoseFactory), new IndexIOConfig(firehoseFactory),
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary) new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
); );
} }
this.jsonMapper = jsonMapper;
} }
@Override @Override
...@@ -175,7 +184,15 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -175,7 +184,15 @@ public class IndexTask extends AbstractFixedIntervalTask
if (targetPartitionSize > 0) { if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize); shardSpecs = determinePartitions(bucket, targetPartitionSize);
} else { } else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec()); int numShards = ingestionSchema.getTuningConfig().getNumShards();
if (numShards > 0) {
shardSpecs = Lists.newArrayList();
for (int i = 0; i < numShards; i++) {
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
}
} else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
}
} }
for (final ShardSpec shardSpec : shardSpecs) { for (final ShardSpec shardSpec : shardSpecs) {
final DataSegment segment = generateSegment( final DataSegment segment = generateSegment(
...@@ -206,6 +223,7 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -206,6 +223,7 @@ public class IndexTask extends AbstractFixedIntervalTask
retVal.add(interval); retVal.add(interval);
} }
} }
return retVal; return retVal;
} }
...@@ -477,7 +495,7 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -477,7 +495,7 @@ public class IndexTask extends AbstractFixedIntervalTask
this.dataSchema = dataSchema; this.dataSchema = dataSchema;
this.ioConfig = ioConfig; this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0) : tuningConfig; this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null) : tuningConfig;
} }
@Override @Override
...@@ -530,15 +548,22 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -530,15 +548,22 @@ public class IndexTask extends AbstractFixedIntervalTask
private final int targetPartitionSize; private final int targetPartitionSize;
private final int rowFlushBoundary; private final int rowFlushBoundary;
private final int numShards;
@JsonCreator @JsonCreator
public IndexTuningConfig( public IndexTuningConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize, @JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary @JsonProperty("rowFlushBoundary") int rowFlushBoundary,
) @JsonProperty("numShards") @Nullable Integer numShards
)
{ {
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary; this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
this.numShards = numShards == null ? -1 : numShards;
Preconditions.checkArgument(
this.targetPartitionSize == -1 || this.numShards == -1,
"targetPartitionsSize and shardCount both cannot be set"
);
} }
@JsonProperty @JsonProperty
...@@ -552,5 +577,11 @@ public class IndexTask extends AbstractFixedIntervalTask ...@@ -552,5 +577,11 @@ public class IndexTask extends AbstractFixedIntervalTask
{ {
return rowFlushBoundary; return rowFlushBoundary;
} }
@JsonProperty
public int getNumShards()
{
return numShards;
}
} }
} }
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package io.druid.indexing.common.task; package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
...@@ -67,16 +69,19 @@ public class TaskSerdeTest ...@@ -67,16 +69,19 @@ public class TaskSerdeTest
QueryGranularity.NONE, QueryGranularity.NONE,
10000, 10000,
new LocalFirehoseFactory(new File("lol"), "rofl", null), new LocalFirehoseFactory(new File("lol"), "rofl", null),
-1 -1,
jsonMapper
); );
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule); jsonMapper.registerModule(jacksonModule);
} }
InjectableValues inject = new InjectableValues.Std()
.addValue(ObjectMapper.class, jsonMapper);
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class); final IndexTask task2 = jsonMapper.reader(Task.class).with(inject).readValue(json);
Assert.assertEquals("foo", task.getDataSource()); Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval()); Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
......
...@@ -43,6 +43,7 @@ import io.druid.data.input.InputRow; ...@@ -43,6 +43,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TestUtils;
import io.druid.segment.column.ColumnConfig; import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.SegmentLoaderFactory;
...@@ -249,7 +250,8 @@ public class TaskLifecycleTest ...@@ -249,7 +250,8 @@ public class TaskLifecycleTest
IR("2010-01-02T01", "a", "c", 1) IR("2010-01-02T01", "a", "c", 1)
) )
), ),
-1 -1,
TestUtils.MAPPER
); );
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId()); final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
...@@ -297,7 +299,8 @@ public class TaskLifecycleTest ...@@ -297,7 +299,8 @@ public class TaskLifecycleTest
QueryGranularity.NONE, QueryGranularity.NONE,
10000, 10000,
newMockExceptionalFirehoseFactory(), newMockExceptionalFirehoseFactory(),
-1 -1,
TestUtils.MAPPER
); );
final TaskStatus status = runTask(indexTask); final TaskStatus status = runTask(indexTask);
......
...@@ -34,18 +34,18 @@ import java.util.List; ...@@ -34,18 +34,18 @@ import java.util.List;
public class HashBasedNumberedShardSpec extends NumberedShardSpec public class HashBasedNumberedShardSpec extends NumberedShardSpec
{ {
private static final HashFunction hashFunction = Hashing.murmur3_32(); private static final HashFunction hashFunction = Hashing.murmur3_32();
@JacksonInject private final ObjectMapper jsonMapper;
private ObjectMapper jsonMapper;
@JsonCreator @JsonCreator
public HashBasedNumberedShardSpec( public HashBasedNumberedShardSpec(
@JsonProperty("partitionNum") int partitionNum, @JsonProperty("partitionNum") int partitionNum,
@JsonProperty("partitions") int partitions @JsonProperty("partitions") int partitions,
@JacksonInject ObjectMapper jsonMapper
) )
{ {
super(partitionNum, partitions); super(partitionNum, partitions);
this.jsonMapper = jsonMapper;
} }
@Override @Override
......
...@@ -26,6 +26,7 @@ import com.metamx.common.ISE; ...@@ -26,6 +26,7 @@ import com.metamx.common.ISE;
import io.druid.TestUtil; import io.druid.TestUtil;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
...@@ -43,7 +44,7 @@ public class HashBasedNumberedShardSpecTest ...@@ -43,7 +44,7 @@ public class HashBasedNumberedShardSpecTest
{ {
final ShardSpec spec = TestUtil.MAPPER.readValue( final ShardSpec spec = TestUtil.MAPPER.readValue(
TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2)), TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, TestUtil.MAPPER)),
ShardSpec.class ShardSpec.class
); );
Assert.assertEquals(1, spec.getPartitionNum()); Assert.assertEquals(1, spec.getPartitionNum());
...@@ -65,9 +66,9 @@ public class HashBasedNumberedShardSpecTest ...@@ -65,9 +66,9 @@ public class HashBasedNumberedShardSpecTest
public void testPartitionChunks() public void testPartitionChunks()
{ {
final List<ShardSpec> specs = ImmutableList.<ShardSpec>of( final List<ShardSpec> specs = ImmutableList.<ShardSpec>of(
new HashBasedNumberedShardSpec(0, 3), new HashBasedNumberedShardSpec(0, 3, TestUtil.MAPPER),
new HashBasedNumberedShardSpec(1, 3), new HashBasedNumberedShardSpec(1, 3, TestUtil.MAPPER),
new HashBasedNumberedShardSpec(2, 3) new HashBasedNumberedShardSpec(2, 3, TestUtil.MAPPER)
); );
final List<PartitionChunk<String>> chunks = Lists.transform( final List<PartitionChunk<String>> chunks = Lists.transform(
...@@ -141,7 +142,7 @@ public class HashBasedNumberedShardSpecTest ...@@ -141,7 +142,7 @@ public class HashBasedNumberedShardSpecTest
int partitions int partitions
) )
{ {
super(partitionNum, partitions); super(partitionNum, partitions, TestUtil.MAPPER);
} }
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册