Commit 53cd45b7 authored by F fjy

add a conversion script to convert old ingestion specs to the new format

Parent 8ff42df0
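
This commit renames the "schema" property to "spec" throughout the Hadoop indexer configuration and the indexing-service tasks (keeping "schema" as a backwards-compatible alias), and registers a new convertSpec command in the CLI "tools" group for rewriting old ingestion specs into the new format. Based on the options declared in the new ConvertIngestionSpec class further down in this diff, an invocation would presumably look like the following; the io.druid.cli.Main entry point and the classpath placeholder are assumptions, not part of this diff:

    java -cp <druid-classpath> io.druid.cli.Main tools convertSpec \
        -o old-spec.json -n new-spec.json -t cli_hadoop

where -t accepts one of standalone_realtime, cli_hadoop, index_realtime, index_hadoop, or index.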
......@@ -47,7 +47,7 @@ public class DbUpdaterJob implements Jobby
)
{
this.config = config;
this.dbConnector = new DbConnector(config.getSchema().getIOConfig().getMetadataUpdateSpec(), null);
this.dbConnector = new DbConnector(config.getSpec().getIOConfig().getMetadataUpdateSpec(), null);
this.dbi = this.dbConnector.getDBI();
}
......@@ -69,7 +69,7 @@ public class DbUpdaterJob implements Jobby
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)" :
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable()
config.getSpec().getIOConfig().getMetadataUpdateSpec().getSegmentTable()
)
);
for (final DataSegment segment : segments) {
......
......@@ -48,7 +48,6 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
......@@ -108,9 +107,9 @@ public class HadoopDruidIndexerConfig
INVALID_ROW_COUNTER
}
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSpec schema)
public static HadoopDruidIndexerConfig fromSpec(HadoopIngestionSpec spec)
{
return new HadoopDruidIndexerConfig(schema);
return new HadoopDruidIndexerConfig(spec, null);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
......@@ -123,7 +122,8 @@ public class HadoopDruidIndexerConfig
HadoopDruidIndexerConfig.jsonMapper.convertValue(
argSpec,
HadoopIngestionSpec.class
)
),
null
);
}
}
......@@ -135,8 +135,8 @@ public class HadoopDruidIndexerConfig
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
file, new TypeReference<Map<String, Object>>()
{
}
{
}
)
);
}
......@@ -152,8 +152,8 @@ public class HadoopDruidIndexerConfig
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
{
}
)
);
}
......@@ -169,97 +169,102 @@ public class HadoopDruidIndexerConfig
return retVal;
}
private volatile HadoopIngestionSpec schema;
private volatile HadoopIngestionSpec spec;
private volatile PathSpec pathSpec;
private volatile Map<DateTime,ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
private final QueryGranularity rollupGran;
@JsonCreator
public HadoopDruidIndexerConfig(
final @JsonProperty("schema") HadoopIngestionSpec schema
final @JsonProperty("spec") HadoopIngestionSpec spec,
final @JsonProperty("schema") HadoopIngestionSpec schema // backwards compat
)
{
this.schema = schema;
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : schema.getTuningConfig().getShardSpecs().entrySet()) {
if (spec != null) {
this.spec = spec;
} else {
this.spec = schema;
}
this.pathSpec = jsonMapper.convertValue(this.spec.getIOConfig().getPathSpec(), PathSpec.class);
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : spec.getTuningConfig().getShardSpecs().entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
continue;
}
final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
shardSpecLookups.put(
entry.getKey(), actualSpec.getLookup(
Lists.transform(
entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
{
@Override
public ShardSpec apply(HadoopyShardSpec input)
{
return input.getActualSpec();
}
}
Lists.transform(
entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
{
@Override
public ShardSpec apply(HadoopyShardSpec input)
{
return input.getActualSpec();
}
}
)
)
)
);
for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
}
}
this.rollupGran = schema.getDataSchema().getGranularitySpec().getQueryGranularity();
this.rollupGran = this.spec.getDataSchema().getGranularitySpec().getQueryGranularity();
}
@JsonProperty
public HadoopIngestionSpec getSchema()
public HadoopIngestionSpec getSpec()
{
return schema;
return spec;
}
public String getDataSource()
{
return schema.getDataSchema().getDataSource();
return spec.getDataSchema().getDataSource();
}
public GranularitySpec getGranularitySpec()
{
return schema.getDataSchema().getGranularitySpec();
return spec.getDataSchema().getGranularitySpec();
}
public void setGranularitySpec(GranularitySpec granularitySpec)
{
this.schema = schema.withDataSchema(schema.getDataSchema().withGranularitySpec(granularitySpec));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
this.spec = spec.withDataSchema(spec.getDataSchema().withGranularitySpec(granularitySpec));
this.pathSpec = jsonMapper.convertValue(spec.getIOConfig().getPathSpec(), PathSpec.class);
}
public PartitionsSpec getPartitionsSpec()
{
return schema.getTuningConfig().getPartitionsSpec();
return spec.getTuningConfig().getPartitionsSpec();
}
public boolean isOverwriteFiles()
{
return schema.getTuningConfig().isOverwriteFiles();
return spec.getTuningConfig().isOverwriteFiles();
}
public boolean isIgnoreInvalidRows()
{
return schema.getTuningConfig().isIgnoreInvalidRows();
return spec.getTuningConfig().isIgnoreInvalidRows();
}
public void setVersion(String version)
{
this.schema = schema.withTuningConfig(schema.getTuningConfig().withVersion(version));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
this.spec = spec.withTuningConfig(spec.getTuningConfig().withVersion(version));
this.pathSpec = jsonMapper.convertValue(spec.getIOConfig().getPathSpec(), PathSpec.class);
}
public void setShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
{
this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
this.spec = spec.withTuningConfig(spec.getTuningConfig().withShardSpecs(shardSpecs));
this.pathSpec = jsonMapper.convertValue(spec.getIOConfig().getPathSpec(), PathSpec.class);
}
public Optional<List<Interval>> getIntervals()
{
Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
Optional<SortedSet<Interval>> setOptional = spec.getDataSchema().getGranularitySpec().bucketIntervals();
if (setOptional.isPresent()) {
return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
} else {
......@@ -269,37 +274,37 @@ public class HadoopDruidIndexerConfig
public boolean isDeterminingPartitions()
{
return schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions();
return spec.getTuningConfig().getPartitionsSpec().isDeterminingPartitions();
}
public Long getTargetPartitionSize()
{
return schema.getTuningConfig().getPartitionsSpec().getTargetPartitionSize();
return spec.getTuningConfig().getPartitionsSpec().getTargetPartitionSize();
}
public long getMaxPartitionSize()
{
return schema.getTuningConfig().getPartitionsSpec().getMaxPartitionSize();
return spec.getTuningConfig().getPartitionsSpec().getMaxPartitionSize();
}
public boolean isUpdaterJobSpecSet()
{
return (schema.getIOConfig().getMetadataUpdateSpec() != null);
return (spec.getIOConfig().getMetadataUpdateSpec() != null);
}
public boolean isCombineText()
{
return schema.getTuningConfig().isCombineText();
return spec.getTuningConfig().isCombineText();
}
public StringInputRowParser getParser()
{
return (StringInputRowParser) schema.getDataSchema().getParser();
return (StringInputRowParser) spec.getDataSchema().getParser();
}
public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
return spec.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
}
public Job addInputPaths(Job job) throws IOException
......@@ -320,7 +325,7 @@ public class HadoopDruidIndexerConfig
*/
public Optional<Bucket> getBucket(InputRow inputRow)
{
final Optional<Interval> timeBucket = schema.getDataSchema().getGranularitySpec().bucketInterval(
final Optional<Interval> timeBucket = spec.getDataSchema().getGranularitySpec().bucketInterval(
new DateTime(
inputRow.getTimestampFromEpoch()
)
......@@ -329,7 +334,11 @@ public class HadoopDruidIndexerConfig
return Optional.absent();
}
final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart())
.getShardSpec(
rollupGran.truncate(inputRow.getTimestampFromEpoch()),
inputRow
);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
return Optional.of(
......@@ -345,10 +354,10 @@ public class HadoopDruidIndexerConfig
public Optional<Set<Interval>> getSegmentGranularIntervals()
{
return Optional.fromNullable(
(Set<Interval>) schema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
(Set<Interval>) spec.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
);
}
......@@ -366,7 +375,7 @@ public class HadoopDruidIndexerConfig
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime);
final List<HadoopyShardSpec> specs = spec.getTuningConfig().getShardSpecs().get(bucketTime);
if (specs == null) {
return ImmutableList.of();
}
......@@ -409,9 +418,9 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
"%s/%s/%s",
schema.getTuningConfig().getWorkingPath(),
schema.getDataSchema().getDataSource(),
schema.getTuningConfig().getVersion().replace(":", "")
spec.getTuningConfig().getWorkingPath(),
spec.getDataSchema().getDataSource(),
spec.getTuningConfig().getVersion().replace(":", "")
)
);
}
......@@ -455,16 +464,16 @@ public class HadoopDruidIndexerConfig
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
{
final Interval bucketInterval = schema.getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get();
final Interval bucketInterval = spec.getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get();
if (fileSystem instanceof DistributedFileSystem) {
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
schema.getIOConfig().getSegmentOutputPath(),
schema.getDataSchema().getDataSource(),
spec.getIOConfig().getSegmentOutputPath(),
spec.getDataSchema().getDataSource(),
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
schema.getTuningConfig().getVersion().replace(":", "_"),
spec.getTuningConfig().getVersion().replace(":", "_"),
bucket.partitionNum
)
);
......@@ -472,11 +481,11 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
schema.getIOConfig().getSegmentOutputPath(),
schema.getDataSchema().getDataSource(),
spec.getIOConfig().getSegmentOutputPath(),
spec.getDataSchema().getDataSource(),
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
schema.getTuningConfig().getVersion(),
spec.getTuningConfig().getVersion(),
bucket.partitionNum
)
);
......@@ -486,7 +495,7 @@ public class HadoopDruidIndexerConfig
{
Configuration conf = job.getConfiguration();
for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
for (final Map.Entry<String, String> entry : spec.getTuningConfig().getJobProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
}
......@@ -512,13 +521,13 @@ public class HadoopDruidIndexerConfig
throw Throwables.propagate(e);
}
Preconditions.checkNotNull(schema.getDataSchema().getDataSource(), "dataSource");
Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec(), "parseSpec");
Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
Preconditions.checkNotNull(schema.getDataSchema().getGranularitySpec(), "granularitySpec");
Preconditions.checkNotNull(spec.getDataSchema().getDataSource(), "dataSource");
Preconditions.checkNotNull(spec.getDataSchema().getParser().getParseSpec(), "parseSpec");
Preconditions.checkNotNull(spec.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
Preconditions.checkNotNull(spec.getDataSchema().getGranularitySpec(), "granularitySpec");
Preconditions.checkNotNull(pathSpec, "pathSpec");
Preconditions.checkNotNull(schema.getTuningConfig().getWorkingPath(), "workingPath");
Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
Preconditions.checkNotNull(spec.getTuningConfig().getWorkingPath(), "workingPath");
Preconditions.checkNotNull(spec.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
Preconditions.checkNotNull(spec.getTuningConfig().getVersion(), "version");
}
}
......@@ -262,7 +262,7 @@ public class IndexGeneratorJob implements Jobby
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) {
for (AggregatorFactory factory : config.getSpec().getDataSchema().getAggregators()) {
metricNames.add(factory.getName().toLowerCase());
}
......@@ -279,7 +279,7 @@ public class IndexGeneratorJob implements Jobby
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
//final DataRollupSpec rollupSpec = config.getRollupSpec();
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
final AggregatorFactory[] aggs = config.getSpec().getDataSchema().getAggregators();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
......@@ -303,7 +303,7 @@ public class IndexGeneratorJob implements Jobby
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
if (numRows >= config.getSpec().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
......@@ -457,7 +457,7 @@ public class IndexGeneratorJob implements Jobby
DataSegment segment = new DataSegment(
config.getDataSource(),
interval,
config.getSchema().getTuningConfig().getVersion(),
config.getSpec().getTuningConfig().getVersion(),
loadSpec,
dimensionNames,
metricNames,
......@@ -619,8 +619,8 @@ public class IndexGeneratorJob implements Jobby
return new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withSpatialDimensions(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withSpatialDimensions(config.getSpec().getDataSchema().getParser())
.withQueryGranularity(config.getSpec().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build()
);
......
......@@ -63,7 +63,7 @@ public class JobHelper
final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getSchema().getTuningConfig().getWorkingPath(), "classpath");
Path distributedClassPath = new Path(config.getSpec().getTuningConfig().getWorkingPath(), "classpath");
if (fs instanceof LocalFileSystem) {
return;
......@@ -138,8 +138,8 @@ public class JobHelper
}
}
if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
if (failedMessage == null || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
if (!config.getSpec().getTuningConfig().isLeaveIntermediate()) {
if (failedMessage == null || config.getSpec().getTuningConfig().isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
......
......@@ -93,7 +93,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
String bucketOutput = String.format(
"%s/%s",
config.getSchema().getIOConfig().getSegmentOutputPath(),
config.getSpec().getIOConfig().getSegmentOutputPath(),
segmentGranularity.toPath(timeBucket)
);
for (FileStatus fileStatus : FSSpideringIterator.spiderIterable(fs, new Path(bucketOutput))) {
......@@ -119,4 +119,4 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
return super.addInputPaths(config, job);
}
}
\ No newline at end of file
}
......@@ -21,12 +21,10 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
......@@ -34,7 +32,6 @@ import io.druid.granularity.QueryGranularity;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.hadoop.fs.LocalFileSystem;
......@@ -67,10 +64,10 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
HadoopIngestionSpec schema;
HadoopIngestionSpec spec;
try {
schema = jsonReadWriteRead(
spec = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"source\","
+ " \"granularitySpec\":{"
......@@ -88,12 +85,13 @@ public class HadoopDruidIndexerConfigTest
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withTuningConfig(
schema.getTuningConfig()
spec.withTuningConfig(
spec.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
)
),
null
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
......@@ -107,10 +105,10 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
final HadoopIngestionSpec schema;
final HadoopIngestionSpec spec;
try {
schema = jsonReadWriteRead(
spec = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
......@@ -128,12 +126,13 @@ public class HadoopDruidIndexerConfigTest
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withTuningConfig(
schema.getTuningConfig()
spec.withTuningConfig(
spec.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
)
),
null
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
......@@ -184,7 +183,7 @@ public class HadoopDruidIndexerConfigTest
null,
null
);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(spec);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);
final List<String> dims = Arrays.asList("diM1", "dIM2");
final ImmutableMap<String, Object> values = ImmutableMap.<String, Object>of(
"Dim1",
......
......@@ -283,7 +283,7 @@ public class HadoopIndexTask extends AbstractTask
schema,
HadoopIngestionSpec.class
);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(
theSchema
.withTuningConfig(theSchema.getTuningConfig().withVersion(version))
);
......@@ -312,7 +312,7 @@ public class HadoopIndexTask extends AbstractTask
schema,
HadoopIngestionSpec.class
);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(
theSchema
.withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath))
.withTuningConfig(theSchema.getTuningConfig().withWorkingPath(workingPath))
......@@ -322,7 +322,7 @@ public class HadoopIndexTask extends AbstractTask
log.info("Starting a hadoop determine configuration job...");
if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config.getSchema());
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config.getSpec());
}
return null;
......
......@@ -129,15 +129,16 @@ public class IndexTask extends AbstractFixedIntervalTask
}
@JsonIgnore
private final IndexIngestionSpec ingestionSchema;
private final IndexIngestionSpec ingestionSpec;
private final ObjectMapper jsonMapper;
@JsonCreator
public IndexTask(
@JsonProperty("id") String id,
@JsonProperty("schema") IndexIngestionSpec ingestionSchema,
@JsonProperty("spec") IndexIngestionSpec ingestionSpec,
// Backwards Compatible
@JsonProperty("schema") IndexIngestionSpec schema,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("granularitySpec") final GranularitySpec granularitySpec,
@JsonProperty("aggregators") final AggregatorFactory[] aggregators,
......@@ -150,15 +151,19 @@ public class IndexTask extends AbstractFixedIntervalTask
{
super(
// _not_ the version, just something uniqueish
makeId(id, ingestionSchema, dataSource),
makeDataSource(ingestionSchema, dataSource),
makeInterval(ingestionSchema, granularitySpec)
makeId(id, ingestionSpec, dataSource),
makeDataSource(ingestionSpec, dataSource),
makeInterval(ingestionSpec, granularitySpec)
);
if (ingestionSchema != null) {
this.ingestionSchema = ingestionSchema;
if (ingestionSpec != null || schema != null) {
if (ingestionSpec != null) {
this.ingestionSpec = ingestionSpec;
} else {
this.ingestionSpec = schema;
}
} else { // Backwards Compatible
this.ingestionSchema = new IndexIngestionSpec(
this.ingestionSpec = new IndexIngestionSpec(
new DataSchema(
dataSource,
firehoseFactory.getParser(),
......@@ -178,17 +183,17 @@ public class IndexTask extends AbstractFixedIntervalTask
return "index";
}
@JsonProperty("schema")
public IndexIngestionSpec getIngestionSchema()
@JsonProperty("spec")
public IndexIngestionSpec getIngestionSpec()
{
return ingestionSchema;
return ingestionSpec;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final int targetPartitionSize = ingestionSchema.getTuningConfig().getTargetPartitionSize();
final GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec();
final int targetPartitionSize = ingestionSpec.getTuningConfig().getTargetPartitionSize();
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
......@@ -203,7 +208,7 @@ public class IndexTask extends AbstractFixedIntervalTask
if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
} else {
int numShards = ingestionSchema.getTuningConfig().getNumShards();
int numShards = ingestionSpec.getTuningConfig().getNumShards();
if (numShards > 0) {
shardSpecs = Lists.newArrayList();
for (int i = 0; i < numShards; i++) {
......@@ -216,7 +221,7 @@ public class IndexTask extends AbstractFixedIntervalTask
for (final ShardSpec shardSpec : shardSpecs) {
final DataSegment segment = generateSegment(
toolbox,
ingestionSchema.getDataSchema(),
ingestionSpec.getDataSchema(),
shardSpec,
bucket,
myLock.getVersion()
......@@ -230,11 +235,11 @@ public class IndexTask extends AbstractFixedIntervalTask
private SortedSet<Interval> getDataIntervals() throws IOException
{
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final FirehoseFactory firehoseFactory = ingestionSpec.getIOConfig().getFirehoseFactory();
final GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec();
SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
try (Firehose firehose = firehoseFactory.connect(ingestionSpec.getDataSchema().getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
Interval interval = granularitySpec.getSegmentGranularity()
......@@ -254,14 +259,14 @@ public class IndexTask extends AbstractFixedIntervalTask
{
log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final FirehoseFactory firehoseFactory = ingestionSpec.getIOConfig().getFirehoseFactory();
// The implementation of this determine partitions stuff is less than optimal. Should be done better.
// Use HLL to estimate number of rows
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
// Load data
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
try (Firehose firehose = firehoseFactory.connect(ingestionSpec.getDataSchema().getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (interval.contains(inputRow.getTimestampFromEpoch())) {
......@@ -327,8 +332,8 @@ public class IndexTask extends AbstractFixedIntervalTask
)
);
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary();
final FirehoseFactory firehoseFactory = ingestionSpec.getIOConfig().getFirehoseFactory();
final int rowFlushBoundary = ingestionSpec.getTuningConfig().getRowFlushBoundary();
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
......@@ -351,7 +356,7 @@ public class IndexTask extends AbstractFixedIntervalTask
// Create firehose + plumber
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser());
final Firehose firehose = firehoseFactory.connect(ingestionSpec.getDataSchema().getParser());
final Plumber plumber = new YeOldePlumberSchool(
interval,
version,
......@@ -363,7 +368,7 @@ public class IndexTask extends AbstractFixedIntervalTask
final int myRowFlushBoundary = rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity();
final QueryGranularity rollupGran = ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity();
try {
plumber.startJob();
......
......@@ -110,7 +110,7 @@ public class IndexTaskTest
null
)
),
null, null, null, null, 0, null, 0,
null, null, null, null, null, 0, null, 0,
new DefaultObjectMapper()
);
......
......@@ -55,6 +55,7 @@ public class TaskSerdeTest
public void testIndexTaskSerde() throws Exception
{
final IndexTask task = new IndexTask(
null,
null,
null,
"foo",
......@@ -89,8 +90,8 @@ public class TaskSerdeTest
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task.getIngestionSpec().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getIngestionSpec().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
}
@Test
......
......@@ -229,6 +229,7 @@ public class TaskLifecycleTest
public void testIndexTask() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
null,
"foo",
......@@ -290,6 +291,7 @@ public class TaskLifecycleTest
public void testIndexTaskFailure() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
null,
"foo",
......
......@@ -23,6 +23,7 @@ import com.google.inject.Injector;
import io.airlift.command.Cli;
import io.airlift.command.Help;
import io.airlift.command.ParseException;
import io.druid.cli.convert.ConvertIngestionSpec;
import io.druid.cli.convert.ConvertProperties;
import io.druid.cli.validate.DruidJsonValidator;
import io.druid.guice.ExtensionsConfig;
......@@ -61,7 +62,8 @@ public class Main
builder.withGroup("tools")
.withDescription("Various tools for working with Druid")
.withDefaultCommand(Help.class)
.withCommands(ConvertProperties.class, DruidJsonValidator.class, PullDependencies.class);
.withCommands(ConvertProperties.class, DruidJsonValidator.class, PullDependencies.class,
ConvertIngestionSpec.class);
builder.withGroup("index")
.withDescription("Run indexing for druid")
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli.convert;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexing.common.task.HadoopIndexTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.realtime.FireDepartment;
import java.io.File;
/**
*/
@Command(
name = "convertSpec",
description = "Converts the old Druid ingestion spec to the new version"
)
public class ConvertIngestionSpec implements Runnable
{
@Option(name = "-o", title = "old ingestion file", description = "file with old ingestion spec", required = true)
public String oldFile;
@Option(name = "-n", title = "new ingestion file", description = "file with new ingestion spec", required = true)
public String newFile;
@Option(name = "-t", title = "type", description = "the type of ingestion spec to convert. types[standalone_realtime, cli_hadoop, index_realtime, index_hadoop, index]", required = true)
public String type;
@Override
public void run()
{
File file = new File(oldFile);
if (!file.exists()) {
System.out.printf("File[%s] does not exist.%n", file);
}
final ObjectMapper jsonMapper = new DefaultObjectMapper();
try {
String converterType = jsonMapper.writeValueAsString(ImmutableMap.of("type", type));
IngestionSchemaConverter val = jsonMapper.readValue(converterType, IngestionSchemaConverter.class);
jsonMapper.writerWithDefaultPrettyPrinter().writeValue(new File(newFile), val.convert(jsonMapper, file));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "standalone_realtime", value = StandaloneRealtimeIngestionSchemaConverter.class),
@JsonSubTypes.Type(name = "cli_hadoop", value = CliHadoopIngestionSchemaConverter.class),
@JsonSubTypes.Type(name = "index_realtime", value = IndexRealtimeIngestionSchemaConverter.class),
@JsonSubTypes.Type(name = "index_hadoop", value = IndexHadoopIngestionSchemaConverter.class),
@JsonSubTypes.Type(name = "index", value = IndexIngestionSchemaConverter.class),
})
private static interface IngestionSchemaConverter<T>
{
public T convert(ObjectMapper jsonMapper, File oldFile) throws Exception;
}
private static class StandaloneRealtimeIngestionSchemaConverter implements IngestionSchemaConverter<FireDepartment>
{
@Override
public FireDepartment convert(ObjectMapper jsonMapper, File oldFile) throws Exception
{
return jsonMapper.readValue(oldFile, FireDepartment.class);
}
}
private static class CliHadoopIngestionSchemaConverter implements IngestionSchemaConverter<HadoopDruidIndexerConfig>
{
@Override
public HadoopDruidIndexerConfig convert(ObjectMapper jsonMapper, File oldFile) throws Exception
{
return new HadoopDruidIndexerConfig(
jsonMapper.readValue(oldFile, HadoopIngestionSpec.class),
null
);
}
}
private static class IndexRealtimeIngestionSchemaConverter implements IngestionSchemaConverter<RealtimeIndexTask>
{
@Override
public RealtimeIndexTask convert(ObjectMapper jsonMapper, File oldFile) throws Exception
{
return jsonMapper.readValue(oldFile, RealtimeIndexTask.class);
}
}
private static class IndexHadoopIngestionSchemaConverter implements IngestionSchemaConverter<HadoopIndexTask>
{
@Override
public HadoopIndexTask convert(ObjectMapper jsonMapper, File oldFile) throws Exception
{
return jsonMapper.readValue(oldFile, HadoopIndexTask.class);
}
}
private static class IndexIngestionSchemaConverter implements IngestionSchemaConverter<IndexTask>
{
@Override
public IndexTask convert(ObjectMapper jsonMapper, File oldFile) throws Exception
{
return jsonMapper.readValue(oldFile, IndexTask.class);
}
}
}
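
Each converter above simply deserializes the old JSON with the updated Jackson bindings and writes it back out, so the file is re-emitted under the new property names. A minimal standalone sketch of the same round-trip for a plain index task is shown below; the class name and file names are illustrative, and it assumes Druid and its DefaultObjectMapper are on the classpath:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.indexing.common.task.IndexTask;
    import io.druid.jackson.DefaultObjectMapper;

    import java.io.File;

    public class ConvertIndexTaskSpecExample
    {
      public static void main(String[] args) throws Exception
      {
        // DefaultObjectMapper carries Druid's Jackson modules; with this commit the
        // IndexTask constructor accepts both the legacy "schema" key and the new "spec" key.
        final ObjectMapper jsonMapper = new DefaultObjectMapper();
        final IndexTask task = jsonMapper.readValue(new File("old-index-task.json"), IndexTask.class);

        // On serialization the getter annotated @JsonProperty("spec") is used, so the
        // rewritten file carries the ingestion spec under the new "spec" property.
        jsonMapper.writerWithDefaultPrettyPrinter().writeValue(new File("new-index-task.json"), task);
      }
    }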