Commit 36fc8573 authored by nishantmonu51

Add ShardSpec Lookup

Optimize choosing shardSpec for Hash Partitions

Parent: b8347cf4
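This commit adds a getLookup(List&lt;ShardSpec&gt;) method to each ShardSpec implementation, each returning a ShardSpecLookup. The interface definition itself is not part of this page; judging from the call sites below, it is a single-method callback along these lines (a sketch inferred from usage, not a verbatim excerpt):

    package io.druid.timeline.partition;

    import io.druid.data.input.InputRow;

    // Inferred from the call sites in this diff: maps an input row to the
    // ShardSpec whose partition chunk should receive it.
    public interface ShardSpecLookup
    {
      ShardSpec getShardSpec(InputRow row);
    }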
HadoopDruidIndexerConfig.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.util.Maps;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -30,6 +31,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
@@ -51,6 +53,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.server.DruidNode;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.ShardSpec;
+import io.druid.timeline.partition.ShardSpecLookup;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +63,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.format.ISODateTimeFormat;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -169,6 +173,8 @@ public class HadoopDruidIndexerConfig
 
   private volatile HadoopIngestionSpec schema;
   private volatile PathSpec pathSpec;
   private volatile ColumnConfig columnConfig;
+  private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
+  private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
 
   @JsonCreator
   public HadoopDruidIndexerConfig(
@@ -178,6 +184,30 @@ public class HadoopDruidIndexerConfig
     this.columnConfig = columnConfig;
     this.schema = schema;
     this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
+
+    for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : schema.getTuningConfig().getShardSpecs().entrySet()) {
+      if (entry.getValue() == null || entry.getValue().isEmpty()) {
+        continue;
+      }
+      final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
+      shardSpecLookups.put(
+          entry.getKey(),
+          actualSpec.getLookup(
+              Lists.transform(
+                  entry.getValue(),
+                  new Function<HadoopyShardSpec, ShardSpec>()
+                  {
+                    @Nullable
+                    @Override
+                    public ShardSpec apply(@Nullable HadoopyShardSpec input)
+                    {
+                      return input.getActualSpec();
+                    }
+                  }
+              )
+          )
+      );
+      for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
+        hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
+      }
+    }
   }
 
   @JsonProperty
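The anonymous Function in the loop above is a plain per-element mapping; Lists.transform returns a lazy view, which each getLookup implementation consumes once. In Java 8+ syntax (which this codebase predates) the same transform would be a one-liner, shown here only for readability:

    // Equivalent modern form of the Lists.transform(...) call above:
    List<ShardSpec> actualSpecs = Lists.transform(entry.getValue(), HadoopyShardSpec::getActualSpec);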
@@ -306,14 +336,9 @@ public class HadoopDruidIndexerConfig
       return Optional.absent();
     }
 
-    final List<HadoopyShardSpec> shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart());
-    if (shards == null || shards.isEmpty()) {
-      return Optional.absent();
-    }
-    for (final HadoopyShardSpec hadoopyShardSpec : shards) {
-      final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
-      if (actualSpec.isInChunk(inputRow)) {
+    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow);
+    final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
     return Optional.of(
         new Bucket(
             hadoopyShardSpec.getShardNum(),
@@ -321,10 +346,7 @@ public class HadoopDruidIndexerConfig
             actualSpec.getPartitionNum()
         )
     );
-      }
-    }
-    throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
   }
 
   public Optional<Set<Interval>> getSegmentGranularIntervals()
......
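Before this change, getBucket() probed every HadoopyShardSpec in the row's time bucket with isInChunk(), so hash partitioning paid one hash computation per candidate shard per row. With the precomputed maps, resolution is two lookups. A minimal sketch of the new path (bucketStart stands in for timeBucket.get().getStart()):

    // Sketch of the two-step resolution inside getBucket(), not a verbatim excerpt:
    final ShardSpecLookup lookup = shardSpecLookups.get(bucketStart);       // built once in the constructor
    final ShardSpec actualSpec = lookup.getShardSpec(inputRow);             // one hash + one map get for hash partitions
    final HadoopyShardSpec hadoopy = hadoopShardSpecLookup.get(actualSpec); // recovers the global shardNum

Keying hadoopShardSpecLookup on ShardSpec works without a value-based equals(): the constructor stores the very same getActualSpec() instances that the lookups later return, so default identity semantics suffice.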
HashBasedNumberedShardSpec.java

@@ -25,12 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
 
 import java.util.List;
+import java.util.Map;
 
 public class HashBasedNumberedShardSpec extends NumberedShardSpec
 {
@@ -74,4 +76,22 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
            '}';
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    final ImmutableMap.Builder<Integer, ShardSpec> shardSpecsMapBuilder = ImmutableMap.builder();
+    for (ShardSpec spec : shardSpecs) {
+      shardSpecsMapBuilder.put(spec.getPartitionNum(), spec);
+    }
+    final Map<Integer, ShardSpec> shardSpecMap = shardSpecsMapBuilder.build();
+
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        // Cast back to int before the map lookup: a long key would autobox to
+        // Long and never equal the Integer keys. Mirrors isInChunk()'s math.
+        return shardSpecMap.get((int) ((long) hash(row) % getPartitions()));
+      }
+    };
+  }
 }
\ No newline at end of file
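As flattened on this page, the lookup read shardSpecMap.get((long) hash(row) % getPartitions()); that expression is a long, autoboxes to Long, and can never equal an Integer key, so every call would return null. The hunk above carries the int cast instead. A self-contained illustration of the pitfall:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    public class BoxingPitfall
    {
      public static void main(String[] args)
      {
        Map<Integer, String> m = ImmutableMap.of(2, "shard-2");
        long k = 5L % 3;                    // 2, but as a long
        System.out.println(m.get(k));       // null: k autoboxes to Long(2)
        System.out.println(m.get((int) k)); // shard-2
      }
    }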
LinearShardSpec.java

@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import io.druid.data.input.InputRow;
 
+import java.util.List;
+import java.util.Set;
+
 public class LinearShardSpec implements ShardSpec
 {
   private int partitionNum;
@@ -42,6 +45,19 @@ public class LinearShardSpec implements ShardSpec
     return partitionNum;
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        return shardSpecs.get(0);
+      }
+    };
+  }
+
   @Override
   public <T> PartitionChunk<T> createChunk(T obj) {
     return new LinearPartitionChunk<T>(partitionNum, obj);
......
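LinearShardSpec (and NumberedShardSpec, next) can return the first spec unconditionally because in these classes every chunk accepts every row. For reference, their isInChunk is a constant, paraphrased here from the existing classes (it is not touched by this commit):

    // Pre-existing behavior the lookup above relies on:
    @Override
    public boolean isInChunk(InputRow inputRow)
    {
      return true; // linear/numbered partitions impose no per-row constraint
    }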
NumberedShardSpec.java

@@ -25,6 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import io.druid.data.input.InputRow;
 
+import java.util.List;
+import java.util.Set;
+
 public class NumberedShardSpec implements ShardSpec
 {
   @JsonIgnore
@@ -52,6 +55,19 @@ public class NumberedShardSpec implements ShardSpec
     return partitionNum;
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        return shardSpecs.get(0);
+      }
+    };
+  }
+
   @JsonProperty("partitions")
   public int getPartitions()
   {
......
SingleDimensionShardSpec.java

@@ -20,6 +20,7 @@
 package io.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.metamx.common.ISE;
 import io.druid.data.input.InputRow;
 
 import java.util.List;
@@ -94,6 +95,24 @@ public class SingleDimensionShardSpec implements ShardSpec
     return partitionNum;
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        for (ShardSpec spec : shardSpecs) {
+          if (spec.isInChunk(row)) {
+            return spec;
+          }
+        }
+        throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
+      }
+    };
+  }
+
   public void setPartitionNum(int partitionNum)
   {
     this.partitionNum = partitionNum;
......
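SingleDimensionShardSpec keeps the per-row scan, since membership in a range partition depends on the row's dimension value rather than an arithmetic rule, and the ISE that previously lived in HadoopDruidIndexerConfig.getBucket() moves into the lookup. A hypothetical usage sketch; the four-argument constructor shape (dimension, start, end, partitionNum) is assumed, not shown in this diff:

    // Hypothetical: two range partitions on dimension "host", split at "m".
    List<ShardSpec> specs = ImmutableList.<ShardSpec>of(
        new SingleDimensionShardSpec("host", null, "m", 0),  // (-inf, "m")
        new SingleDimensionShardSpec("host", "m", null, 1)   // ["m", +inf)
    );
    ShardSpecLookup lookup = specs.get(0).getLookup(specs);
    ShardSpec spec = lookup.getShardSpec(row); // scans in order until isInChunk(row) matches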