From e2703627676366655001900091282c83d31315a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20G=C3=B3mez?= Date: Wed, 1 Aug 2018 19:52:54 +0200 Subject: [PATCH] Add stringLast and stringFirst aggregators extension (#5789) * Add lastString and firstString aggregators extension * Remove duplicated class * Move first-last-string doc page to extensions-contrib * Fix ObjectStrategy compare method * Fix doc bad aggregatos type name * Create FoldingAggregatorFactory classes to fix SegmentMetadataQuery * Add getMaxStringBytes() method to support JSON serialization * Fix null pointer exception at segment creation phase when the string value is null * Control the valueSelector object class on BufferAggregators * Perform all improvements * Add java doc on SerializablePairLongStringSerde * Refactor ObjectStraty compare method * Remove unused ; * Add aggregateCombiner unit tests. Rename BufferAggregators unit tests * Remove unused imports * Add license header * Add class name to java doc class serde * Throw exception if value is unsupported class type * Move first-last-string extension into druid core * Update druid core docs * Fix null pointer exception when pair->string is null * Add null control unit tests * Remove unused imports * Add first/last string folding aggregator on AggregatorsModule to support segment metadata query * Change SerializablePairLongString to extend SerializablePair * Change vars from public to private * Convert vars to primitive type * Clarify compare comment * Change IllegalStateException to ISE * Remove TODO comments * Control possible null pointer exception * Add @Nullable annotation * Remove empty line * Remove unused parameter type * Improve AggregatorCombiner javadocs * Add filterNullValues option at StringLast and StringFirst aggregators * Add filterNullValues option at agg documentation * Fix checkstyle * Update header license * Fix StringFirstAggregatorFactory.VALUE_COMPARATOR * Fix StringFirstAggregatorCombiner * Fix if condition at 
StringFirstAggregateCombiner * Remove filterNullValues from string first/last aggregators * Add isReset flag in FirstAggregatorCombiner * Change Arrays.asList to Collections.singletonList --- docs/content/querying/aggregations.md | 32 ++- .../io/druid/jackson/AggregatorsModule.java | 20 +- .../query/aggregation/AggregatorUtil.java | 4 + .../SerializablePairLongString.java | 35 +++ .../SerializablePairLongStringSerde.java | 146 +++++++++++ .../first/LongFirstAggregatorFactory.java | 2 +- .../first/StringFirstAggregateCombiner.java | 60 +++++ .../first/StringFirstAggregator.java | 110 ++++++++ .../first/StringFirstAggregatorFactory.java | 248 ++++++++++++++++++ .../first/StringFirstBufferAggregator.java | 157 +++++++++++ .../StringFirstFoldingAggregatorFactory.java | 105 ++++++++ .../last/LongLastAggregatorFactory.java | 2 +- .../last/StringLastAggregateCombiner.java | 55 ++++ .../last/StringLastAggregator.java | 110 ++++++++ .../last/StringLastAggregatorFactory.java | 207 +++++++++++++++ .../last/StringLastBufferAggregator.java | 157 +++++++++++ .../StringLastFoldingAggregatorFactory.java | 102 +++++++ .../first/StringFirstAggregationTest.java | 194 ++++++++++++++ .../StringFirstBufferAggregatorTest.java | 171 ++++++++++++ .../first/StringFirstTimeseriesQueryTest.java | 123 +++++++++ .../last/StringLastAggregationTest.java | 194 ++++++++++++++ .../last/StringLastBufferAggregatorTest.java | 171 ++++++++++++ .../last/StringLastTimeseriesQueryTest.java | 126 +++++++++ 23 files changed, 2526 insertions(+), 5 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java create mode 100644 
processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java diff --git a/docs/content/querying/aggregations.md b/docs/content/querying/aggregations.md index b0ce5cc24c..3f6b5e7c09 100644 --- a/docs/content/querying/aggregations.md +++ b/docs/content/querying/aggregations.md @@ -102,7 +102,7 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to ### First / Last aggregator -First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries. 
+(Double/Float/Long) First and Last aggregators cannot be used in an ingestion spec, and should only be specified as part of queries. Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data. @@ -178,6 +178,36 @@ Note that queries with first/last aggregators on a segment created with rollup e } ``` +#### `stringFirst` aggregator + +`stringFirst` computes the metric value with the minimum timestamp, or `null` if no rows exist + +```json +{ + "type" : "stringFirst", + "name" : <output_name>, + "fieldName" : <metric_name>, + "maxStringBytes" : <integer> # (optional, defaults to 1024), + "filterNullValues" : <boolean> # (optional, defaults to false) +} +``` + + + +#### `stringLast` aggregator + +`stringLast` computes the metric value with the maximum timestamp, or `null` if no rows exist + +```json +{ + "type" : "stringLast", + "name" : <output_name>, + "fieldName" : <metric_name>, + "maxStringBytes" : <integer> # (optional, defaults to 1024), + "filterNullValues" : <boolean> # (optional, defaults to false) +} +``` + ### JavaScript aggregator Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed).
Your diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java index 94deda097a..d3cc9a7365 100644 --- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java @@ -38,10 +38,13 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory; import io.druid.query.aggregation.LongMinAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.SerializablePairLongStringSerde; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.first.FloatFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; +import io.druid.query.aggregation.first.StringFirstAggregatorFactory; +import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; @@ -49,6 +52,8 @@ import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde; import io.druid.query.aggregation.last.DoubleLastAggregatorFactory; import io.druid.query.aggregation.last.FloatLastAggregatorFactory; import io.druid.query.aggregation.last.LongLastAggregatorFactory; +import io.druid.query.aggregation.last.StringLastAggregatorFactory; +import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.DoubleGreatestPostAggregator; @@ -74,7 +79,14 @@ public class AggregatorsModule extends 
SimpleModule } if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) { - ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())); + ComplexMetrics.registerSerde( + "preComputedHyperUnique", + new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()) + ); + } + + if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) { + ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde()); } setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); @@ -101,9 +113,13 @@ public class AggregatorsModule extends SimpleModule @JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class), @JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class), @JsonSubTypes.Type(name = "floatFirst", value = FloatFirstAggregatorFactory.class), + @JsonSubTypes.Type(name = "stringFirst", value = StringFirstAggregatorFactory.class), + @JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class), @JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class), @JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class), - @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class) + @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class), + @JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class), + @JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class) }) public interface AggregatorFactoryMixin { diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java index 610e6ba663..eedd09580d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java +++ 
b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java @@ -94,6 +94,10 @@ public class AggregatorUtil public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29; public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x2A; + // StringFirst, StringLast aggregator + public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B; + public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C; + /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg * diff --git a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java new file mode 100644 index 0000000000..91f9b2622a --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.collections.SerializablePair; + +public class SerializablePairLongString extends SerializablePair +{ + @JsonCreator + public SerializablePairLongString(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") String rhs) + { + super(lhs, rhs); + } +} + + diff --git a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java new file mode 100644 index 0000000000..ca245fa133 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import io.druid.data.input.InputRow; +import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.first.StringFirstAggregatorFactory; +import io.druid.segment.GenericColumnSerializer; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.serde.ComplexColumnPartSupplier; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; +import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; +import io.druid.segment.writeout.SegmentWriteOutMedium; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString). + * The serialization structure is: Long:Integer:String + *

+ * The class is used on first/last String aggregators to store the time and the first/last string. + * Long:Integer:String -> Timestamp:StringSize:StringData + */ +public class SerializablePairLongStringSerde extends ComplexMetricSerde +{ + + private static final String TYPE_NAME = "serializablePairLongString"; + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return SerializablePairLongString.class; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + return inputRow.getRaw(metricName); + } + }; + } + + @Override + public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) + { + final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); + columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2) + { + return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongString.class; + } + + @Override + public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + + long lhs = readOnlyBuffer.getLong(); + int stringSize = readOnlyBuffer.getInt(); + + String lastString = null; + if (stringSize > 0) { + byte[] stringBytes = new byte[stringSize]; + readOnlyBuffer.get(stringBytes, 0, stringSize); + lastString = StringUtils.fromUtf8(stringBytes); + } + + return new SerializablePairLongString(lhs, lastString); + } + + @Override + public byte[] toBytes(SerializablePairLongString val) + { 
+ String rhsString = val.rhs; + ByteBuffer bbuf; + + if (rhsString != null) { + byte[] rhsBytes = StringUtils.toUtf8(rhsString); + bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length); + bbuf.putLong(val.lhs); + bbuf.putInt(Long.BYTES, rhsBytes.length); + bbuf.position(Long.BYTES + Integer.BYTES); + bbuf.put(rhsBytes); + } else { + bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + bbuf.putLong(val.lhs); + bbuf.putInt(Long.BYTES, 0); + } + + return bbuf.array(); + } + }; + } + + @Override + public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) + { + return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy()); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 56d8aede8a..32b575f455 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; -import io.druid.java.util.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.UOE; import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java new file mode 100644 index 0000000000..20487f6590 --- /dev/null +++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import io.druid.query.aggregation.ObjectAggregateCombiner; +import io.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class StringFirstAggregateCombiner extends ObjectAggregateCombiner +{ + private String firstString; + private boolean isReset = false; + + @Override + public void reset(ColumnValueSelector selector) + { + firstString = (String) selector.getObject(); + isReset = true; + } + + @Override + public void fold(ColumnValueSelector selector) + { + if (!isReset) { + firstString = (String) selector.getObject(); + isReset = true; + } + } + + @Nullable + @Override + public String getObject() + { + return firstString; + } + + @Override + public Class classOfObject() + { + return String.class; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java new file mode 100644 index 0000000000..5710a61072 --- /dev/null +++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import io.druid.java.util.common.ISE; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.segment.BaseLongColumnValueSelector; +import io.druid.segment.BaseObjectColumnValueSelector; + +public class StringFirstAggregator implements Aggregator +{ + + private final BaseObjectColumnValueSelector valueSelector; + private final BaseLongColumnValueSelector timeSelector; + private final int maxStringBytes; + + protected long firstTime; + protected String firstValue; + + public StringFirstAggregator( + BaseLongColumnValueSelector timeSelector, + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes + ) + { + this.valueSelector = valueSelector; + this.timeSelector = timeSelector; + this.maxStringBytes = maxStringBytes; + + firstTime = Long.MAX_VALUE; + firstValue = null; + } + + @Override + public void aggregate() + { + long time = timeSelector.getLong(); + if (time < firstTime) { + firstTime = time; + Object value = 
valueSelector.getObject(); + + if (value != null) { + if (value instanceof String) { + firstValue = (String) value; + } else if (value instanceof SerializablePairLongString) { + firstValue = ((SerializablePairLongString) value).rhs; + } else { + throw new ISE( + "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", + value.getClass().getCanonicalName() + ); + } + + if (firstValue != null && firstValue.length() > maxStringBytes) { + firstValue = firstValue.substring(0, maxStringBytes); + } + } else { + firstValue = null; + } + } + } + + @Override + public Object get() + { + return new SerializablePairLongString(firstTime, firstValue); + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()"); + } + + @Override + public double getDouble() + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java new file mode 100644 index 0000000000..187e891548 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import io.druid.query.aggregation.AggregateCombiner; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.query.cache.CacheKeyBuilder; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@JsonTypeName("stringFirst") +public class StringFirstAggregatorFactory extends AggregatorFactory +{ + public static final int DEFAULT_MAX_STRING_SIZE = 1024; + + public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare( + ((SerializablePairLongString) o1).lhs, + ((SerializablePairLongString) o2).lhs + ); + + public static final Comparator VALUE_COMPARATOR = (o1, o2) -> { + int comparation; + + // First we check if the objects are null + if (o1 == null && o2 == null) { + comparation = 0; + } else if (o1 == null) { + comparation = -1; + } else if (o2 == null) { + comparation = 1; + } else { + + // If the objects are not 
null, we will try to compare using timestamp + comparation = o1.lhs.compareTo(o2.lhs); + + // If both timestamp are the same, we try to compare the Strings + if (comparation == 0) { + + // First we check if the strings are null + if (o1.rhs == null && o2.rhs == null) { + comparation = 0; + } else if (o1.rhs == null) { + comparation = -1; + } else if (o2.rhs == null) { + comparation = 1; + } else { + + // If the strings are not null, we will compare them + // Note: This comparation maybe doesn't make sense to first/last aggregators + comparation = o1.rhs.compareTo(o2.rhs); + } + } + } + + return comparation; + }; + + private final String fieldName; + private final String name; + protected final int maxStringBytes; + + @JsonCreator + public StringFirstAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("maxStringBytes") Integer maxStringBytes + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + this.name = name; + this.fieldName = fieldName; + this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : maxStringBytes; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new StringFirstAggregator( + metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeColumnValueSelector(fieldName), + maxStringBytes + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new StringFirstBufferAggregator( + metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeColumnValueSelector(fieldName), + maxStringBytes + ); + } + + @Override + public Comparator getComparator() + { + return VALUE_COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ?
lhs : rhs; + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new StringFirstAggregateCombiner(); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes); + } + + @Override + public List getRequiredColumns() + { + return Collections.singletonList(new StringFirstAggregatorFactory(fieldName, fieldName, maxStringBytes)); + } + + @Override + public Object deserialize(Object object) + { + Map map = (Map) object; + return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs"))); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((SerializablePairLongString) object).rhs; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @JsonProperty + public Integer getMaxStringBytes() + { + return maxStringBytes; + } + + @Override + public List requiredFields() + { + return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.STRING_FIRST_CACHE_TYPE_ID) + .appendString(fieldName) + .appendInt(maxStringBytes) + .build(); + } + + @Override + public String getTypeName() + { + return "serializablePairLongString"; + } + + @Override + public int getMaxIntermediateSize() + { + return Long.BYTES + Integer.BYTES + maxStringBytes; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o; + + return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes; + } + + @Override + public int hashCode() + { + return Objects.hash(name, fieldName, maxStringBytes); + } + + @Override + public 
String toString() + { + return "StringFirstAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", maxStringBytes=" + maxStringBytes + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java new file mode 100644 index 0000000000..c71cfbfc2d --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package io.druid.query.aggregation.first; + +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.BaseLongColumnValueSelector; +import io.druid.segment.BaseObjectColumnValueSelector; + +import java.nio.ByteBuffer; + +public class StringFirstBufferAggregator implements BufferAggregator +{ + private final BaseLongColumnValueSelector timeSelector; + private final BaseObjectColumnValueSelector valueSelector; + private final int maxStringBytes; + + public StringFirstBufferAggregator( + BaseLongColumnValueSelector timeSelector, + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes + ) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + this.maxStringBytes = maxStringBytes; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.putInt(position + Long.BYTES, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + Object value = valueSelector.getObject(); + + long time = timeSelector.getLong(); + String firstString = null; + + if (value != null) { + if (value instanceof SerializablePairLongString) { + SerializablePairLongString serializablePair = (SerializablePairLongString) value; + time = serializablePair.lhs; + firstString = serializablePair.rhs; + } else if (value instanceof String) { + firstString = (String) value; + } else { + throw new ISE( + "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", + value.getClass().getCanonicalName() + ); + } + } + + long lastTime = mutationBuffer.getLong(position); + + if (time < lastTime) { + if (firstString != null) { + if 
(firstString.length() > maxStringBytes) { + firstString = firstString.substring(0, maxStringBytes); + } + + byte[] valueBytes = StringUtils.toUtf8(firstString); + + mutationBuffer.putLong(position, time); + mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); + + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.put(valueBytes); + } else { + mutationBuffer.putLong(position, time); + mutationBuffer.putInt(position + Long.BYTES, 0); + } + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + Long timeValue = mutationBuffer.getLong(position); + int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES); + + SerializablePairLongString serializablePair; + + if (stringSizeBytes > 0) { + byte[] valueBytes = new byte[stringSizeBytes]; + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.get(valueBytes, 0, stringSizeBytes); + serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); + } else { + serializablePair = new SerializablePairLongString(timeValue, null); + } + + return serializablePair; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); + } + + @Override + public void close() + { + // no resources to cleanup + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("timeSelector", timeSelector); + inspector.visit("valueSelector", 
valueSelector); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java new file mode 100644 index 0000000000..b268bafa5a --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation.first; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.BaseObjectColumnValueSelector; +import io.druid.segment.ColumnSelectorFactory; + +import java.nio.ByteBuffer; + +@JsonTypeName("stringFirstFold") +public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory +{ + public StringFirstFoldingAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("maxStringBytes") Integer maxStringBytes + ) + { + super(name, fieldName, maxStringBytes); + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName()); + return new StringFirstAggregator(null, null, maxStringBytes) + { + @Override + public void aggregate() + { + SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); + if (pair != null && pair.lhs < firstTime) { + firstTime = pair.lhs; + firstValue = pair.rhs; + } + } + }; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName()); + return new StringFirstBufferAggregator(null, null, maxStringBytes) + { + @Override + public void aggregate(ByteBuffer buf, int position) + { + SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); + + if (pair != null && pair.lhs != null) { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + 
long lastTime = mutationBuffer.getLong(position); + + if (pair.lhs < lastTime) { + mutationBuffer.putLong(position, pair.lhs); + + if (pair.rhs != null) { + byte[] valueBytes = StringUtils.toUtf8(pair.rhs); + + mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.put(valueBytes); + } else { + mutationBuffer.putInt(position + Long.BYTES, 0); + } + } + } + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } + }; + } + +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java index ff33195038..dc186a9f61 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -22,8 +22,8 @@ package io.druid.query.aggregation.last; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import io.druid.java.util.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.UOE; import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java new file mode 100644 index 0000000000..6625f084e7 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.query.aggregation.ObjectAggregateCombiner; +import io.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class StringLastAggregateCombiner extends ObjectAggregateCombiner +{ + private String lastString; + + @Override + public void reset(ColumnValueSelector selector) + { + lastString = (String) selector.getObject(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + lastString = (String) selector.getObject(); + } + + @Nullable + @Override + public String getObject() + { + return lastString; + } + + @Override + public Class classOfObject() + { + return String.class; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java new file mode 100644 index 0000000000..85cd0dddb3 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.java.util.common.ISE; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.segment.BaseLongColumnValueSelector; +import io.druid.segment.BaseObjectColumnValueSelector; + +public class StringLastAggregator implements Aggregator +{ + + private final BaseObjectColumnValueSelector valueSelector; + private final BaseLongColumnValueSelector timeSelector; + private final int maxStringBytes; + + protected long lastTime; + protected String lastValue; + + public StringLastAggregator( + BaseLongColumnValueSelector timeSelector, + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes + ) + { + this.valueSelector = valueSelector; + this.timeSelector = timeSelector; + this.maxStringBytes = maxStringBytes; + + lastTime = Long.MIN_VALUE; + lastValue = null; + } + + @Override + public void aggregate() + { + long time = timeSelector.getLong(); + if (time >= lastTime) { + lastTime = time; + Object value = valueSelector.getObject(); + + if (value != null) { + if (value instanceof String) { + lastValue = (String) value; + } else if (value instanceof SerializablePairLongString) { + lastValue = ((SerializablePairLongString) value).rhs; + } else { + throw new ISE( + "Try to aggregate unsuported class type [%s].Supported class types: String or 
SerializablePairLongString", + value.getClass().getCanonicalName() + ); + } + + if (lastValue != null && lastValue.length() > maxStringBytes) { + lastValue = lastValue.substring(0, maxStringBytes); + } + } else { + lastValue = null; + } + } + } + + @Override + public Object get() + { + return new SerializablePairLongString(lastTime, lastValue); + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()"); + } + + @Override + public double getDouble() + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java new file mode 100644 index 0000000000..cb4f363661 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import io.druid.query.aggregation.AggregateCombiner; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.query.aggregation.first.StringFirstAggregatorFactory; +import io.druid.query.cache.CacheKeyBuilder; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + + +@JsonTypeName("stringLast") +public class StringLastAggregatorFactory extends AggregatorFactory +{ + private final String fieldName; + private final String name; + protected final int maxStringBytes; + + @JsonCreator + public StringLastAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("maxStringBytes") Integer maxStringBytes + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + this.name = name; + this.fieldName = fieldName; + this.maxStringBytes = maxStringBytes == null + ? 
StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE + : maxStringBytes; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new StringLastAggregator( + metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeColumnValueSelector(fieldName), + maxStringBytes + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new StringLastBufferAggregator( + metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeColumnValueSelector(fieldName), + maxStringBytes + ); + } + + @Override + public Comparator getComparator() + { + return StringFirstAggregatorFactory.VALUE_COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new StringLastAggregateCombiner(); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes); + } + + @Override + public List getRequiredColumns() + { + return Collections.singletonList(new StringLastAggregatorFactory(fieldName, fieldName, maxStringBytes)); + } + + @Override + public Object deserialize(Object object) + { + Map map = (Map) object; + return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs"))); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((SerializablePairLongString) object).rhs; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @JsonProperty + public Integer getMaxStringBytes() + { + return maxStringBytes; + } + + @Override + public List requiredFields() + { + return Arrays.asList(Column.TIME_COLUMN_NAME, 
fieldName); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.STRING_LAST_CACHE_TYPE_ID) + .appendString(fieldName) + .appendInt(maxStringBytes) + .build(); + } + + @Override + public String getTypeName() + { + return "serializablePairLongString"; + } + + @Override + public int getMaxIntermediateSize() + { + return Long.BYTES + Integer.BYTES + maxStringBytes; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StringLastAggregatorFactory that = (StringLastAggregatorFactory) o; + + return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes; + } + + @Override + public int hashCode() + { + return Objects.hash(name, fieldName, maxStringBytes); + } + + @Override + public String toString() + { + return "StringFirstAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", maxStringBytes=" + maxStringBytes + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java new file mode 100644 index 0000000000..12c99483a6 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.BaseLongColumnValueSelector; +import io.druid.segment.BaseObjectColumnValueSelector; + +import java.nio.ByteBuffer; + +public class StringLastBufferAggregator implements BufferAggregator +{ + private final BaseLongColumnValueSelector timeSelector; + private final BaseObjectColumnValueSelector valueSelector; + private final int maxStringBytes; + + public StringLastBufferAggregator( + BaseLongColumnValueSelector timeSelector, + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes + ) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + this.maxStringBytes = maxStringBytes; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MIN_VALUE); + buf.putInt(position + Long.BYTES, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + Object value = valueSelector.getObject(); + + long time = timeSelector.getLong(); + String lastString = null; + + if (value != null) { + if (value instanceof SerializablePairLongString) { + SerializablePairLongString serializablePair = (SerializablePairLongString) value; + time = 
serializablePair.lhs; + lastString = serializablePair.rhs; + } else if (value instanceof String) { + lastString = (String) value; + } else { + throw new ISE( + "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", + value.getClass().getCanonicalName() + ); + } + } + + long lastTime = mutationBuffer.getLong(position); + + if (time >= lastTime) { + if (lastString != null) { + if (lastString.length() > maxStringBytes) { + lastString = lastString.substring(0, maxStringBytes); + } + + byte[] valueBytes = StringUtils.toUtf8(lastString); + + mutationBuffer.putLong(position, time); + mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); + + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.put(valueBytes); + } else { + mutationBuffer.putLong(position, time); + mutationBuffer.putInt(position + Long.BYTES, 0); + } + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + Long timeValue = mutationBuffer.getLong(position); + Integer stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES); + + SerializablePairLongString serializablePair; + + if (stringSizeBytes > 0) { + byte[] valueBytes = new byte[stringSizeBytes]; + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.get(valueBytes, 0, stringSizeBytes); + serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); + } else { + serializablePair = new SerializablePairLongString(timeValue, null); + } + + return serializablePair; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("StringFirstAggregator does not support 
getLong()"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); + } + + @Override + public void close() + { + // no resources to cleanup + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("timeSelector", timeSelector); + inspector.visit("valueSelector", valueSelector); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java new file mode 100644 index 0000000000..9bd6a64488 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation.last; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SerializablePairLongString; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.BaseObjectColumnValueSelector; +import io.druid.segment.ColumnSelectorFactory; + +import java.nio.ByteBuffer; + +@JsonTypeName("stringLastFold") +public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory +{ + public StringLastFoldingAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("maxStringBytes") Integer maxStringBytes + ) + { + super(name, fieldName, maxStringBytes); + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName()); + return new StringLastAggregator(null, null, maxStringBytes) + { + @Override + public void aggregate() + { + SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); + if (pair != null && pair.lhs >= lastTime) { + lastTime = pair.lhs; + lastValue = pair.rhs; + } + } + }; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName()); + return new StringLastBufferAggregator(null, null, maxStringBytes) + { + @Override + public void aggregate(ByteBuffer buf, int position) + { + SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); + if (pair != null && pair.lhs != null) { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + long 
lastTime = mutationBuffer.getLong(position); + + if (pair.lhs >= lastTime) { + mutationBuffer.putLong(position, pair.lhs); + if (pair.rhs != null) { + byte[] valueBytes = StringUtils.toUtf8(pair.rhs); + + mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.put(valueBytes); + } else { + mutationBuffer.putInt(position + Long.BYTES, 0); + } + } + } + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } + }; + } +} diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java new file mode 100644 index 0000000000..8f523c02ee --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Unit tests for the "stringFirst" aggregator family: on-heap and buffer aggregation of raw
+ * strings, the combining variants that fold (timestamp, string) pairs, and the aggregate combiner.
+ */
+public class StringFirstAggregationTest
+{
+  private static final int MAX_STRING_SIZE = 1024;
+  private StringFirstAggregatorFactory stringFirstAggFactory;
+  private StringFirstAggregatorFactory combiningAggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestLongColumnSelector timeSelector;
+  private TestObjectColumnSelector<String> valueSelector;
+  private TestObjectColumnSelector<SerializablePairLongString> objectSelector;
+
+  private String[] strings = {"1111", "2222", "3333", null, "4444"};
+  private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private SerializablePairLongString[] pairs = {
+      new SerializablePairLongString(52782L, "AAAA"),
+      new SerializablePairLongString(65492L, "BBBB"),
+      new SerializablePairLongString(69134L, "CCCC"),
+      new SerializablePairLongString(11111L, "DDDD"),
+      new SerializablePairLongString(51223L, null)
+  };
+
+  @Before
+  public void setup()
+  {
+    stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+    combiningAggFactory = (StringFirstAggregatorFactory) stringFirstAggFactory.getCombiningFactory();
+    timeSelector = new TestLongColumnSelector(times);
+    valueSelector = new TestObjectColumnSelector<>(strings);
+    objectSelector = new TestObjectColumnSelector<>(pairs);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testStringFirstAggregator()
+  {
+    StringFirstAggregator agg = (StringFirstAggregator) stringFirstAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[2], result.rhs); // times[2] is the smallest of the four timestamps consumed
+  }
+
+  @Test
+  public void testStringFirstBufferAggregator()
+  {
+    StringFirstBufferAggregator agg = (StringFirstBufferAggregator) stringFirstAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[2], result.rhs); // times[2] is the smallest of the four timestamps consumed
+  }
+
+  @Test
+  public void testCombine()
+  {
+    SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
+    SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
+    // NOTE(review): pair2 carries the later timestamp; for a "first" aggregator pair1 would be the
+    // intuitive winner -- confirm this expectation against StringFirstAggregatorFactory.combine().
+    Assert.assertEquals(pair2, stringFirstAggFactory.combine(pair1, pair2));
+  }
+
+  @Test
+  public void testStringFirstCombiningAggregator()
+  {
+    StringFirstAggregator agg = (StringFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+    Pair<Long, String> expected = pairs[3]; // smallest lhs (11111L) among the four pairs consumed
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringFirstCombiningBufferAggregator()
+  {
+    StringFirstBufferAggregator agg = (StringFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+    Pair<Long, String> expected = pairs[3]; // smallest lhs (11111L) among the four pairs consumed
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringFirstAggregateCombiner()
+  {
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    TestObjectColumnSelector<String> columnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregateCombiner stringFirstAggregateCombiner =
+        (StringFirstAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
+
+    stringFirstAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+    columnSelector.increment();
+    stringFirstAggregateCombiner.fold(columnSelector);
+
+    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+    stringFirstAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+  }
+
+  private void aggregate(StringFirstAggregator agg)
+  {
+    agg.aggregate();
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+
+  private void aggregate(StringFirstBufferAggregator agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
new file mode 100644
index 0000000000..8a4a0de986
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -0,0
+1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Unit tests for {@link StringFirstBufferAggregator}: the row with the smallest timestamp wins,
+ * null strings are tolerated, and non-String values are rejected.
+ */
+public class StringFirstBufferAggregatorTest
+{
+  /** Feeds the selectors' current row into {@code agg}, then advances both selectors by one row. */
+  private void aggregateBuffer(
+      TestLongColumnSelector timeSelector,
+      TestObjectColumnSelector<?> valueSelector,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int position
+  )
+  {
+    agg.aggregate(buf, position);
+    timeSelector.increment();
+    valueSelector.increment();
+  }
+
+  @Test
+  public void testBufferAggregate() throws Exception
+  {
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    final int maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    // Pre-fill the buffer with an unrelated entry before init() is called on it.
+    byte[] staleBytes = "ZZZZ".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(staleBytes.length);
+    buf.put(staleBytes);
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+    Assert.assertEquals("expected first string value", strings[0], sp.rhs);
+    Assert.assertEquals("first string timestamp is the smallest", Long.valueOf(timestamps[0]), sp.lhs);
+  }
+
+  @Test
+  public void testNullBufferAggregate() throws Exception
+  {
+    // The first row has a null string; the row with the smallest timestamp (1111L) must still win.
+    final long[] timestamps = {2222L, 1111L, 3333L, 4444L, 5555L};
+    final String[] strings = {null, "AAAA", "BBBB", "DDDD", "EEEE"};
+    final int maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    // Pre-fill the buffer with an unrelated entry before init() is called on it.
+    byte[] staleBytes = "ZZZZ".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(staleBytes.length);
+    buf.put(staleBytes);
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+    Assert.assertEquals("expected first string value", strings[1], sp.rhs);
+    Assert.assertEquals("first string timestamp is the smallest", Long.valueOf(timestamps[1]), sp.lhs);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoStringValue()
+  {
+    // The selector yields Doubles instead of Strings, which the aggregator is expected to reject.
+    final long[] timestamps = {1526724000L, 1526724600L};
+    final Double[] doubles = {null, 2.00};
+    final int maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    // Pre-fill the buffer with an unrelated entry before init() is called on it.
+    byte[] staleBytes = "ZZZZ".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(staleBytes.length);
+    buf.put(staleBytes);
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
new file mode 100644
index 0000000000..bac9a6dfcb
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation
(ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * End-to-end test of the "stringFirst" aggregator: rows are ingested into an on-heap
+ * {@link IncrementalIndex} and read back through a timeseries query.
+ */
+public class StringFirstTimeseriesQueryTest
+{
+  @Test
+  public void testTimeseriesQuery() throws Exception
+  {
+    TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+    String visitorId = "visitor_id";
+    String clientType = "client_type";
+
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.SECOND)
+                .withMetrics(
+                    new CountAggregatorFactory("cnt"),
+                    new StringFirstAggregatorFactory("last_client_type", clientType, 1024)
+                )
+                .build()
+        )
+        .setMaxRowCount(1000)
+        .buildOnheap();
+
+    DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+    long timestamp = time.getMillis();
+
+    DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+    long timestamp1 = time1.getMillis();
+    // Two rows share the earlier timestamp; the third row arrives one hour later.
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.of(visitorId, "0", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.of(visitorId, "1", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp1,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.of(visitorId, "0", clientType, "android")
+        )
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .aggregators(
+            Lists.newArrayList(new StringFirstAggregatorFactory("last_client_type", clientType, 1024))
+        )
+        .build();
+
+    final Iterable<Result<TimeseriesResultValue>> results =
+        engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+
+    // Both rows at the earlier timestamp carry "iphone", so the later "android" row must not win.
+    List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            time,
+            new TimeseriesResultValue(
+                ImmutableMap.of("last_client_type", new SerializablePairLongString(timestamp, "iphone"))
+            )
+        )
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java new file mode 100644 index 0000000000..1f2ecc4815 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Unit tests for the "stringLast" aggregator family: on-heap and buffer aggregation of raw
+ * strings, the combining variants that fold (timestamp, string) pairs, and the aggregate combiner.
+ */
+public class StringLastAggregationTest
+{
+  private static final int MAX_STRING_SIZE = 1024;
+  private StringLastAggregatorFactory stringLastAggFactory;
+  private StringLastAggregatorFactory combiningAggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestLongColumnSelector timeSelector;
+  private TestObjectColumnSelector<String> valueSelector;
+  private TestObjectColumnSelector<SerializablePairLongString> objectSelector;
+
+  private String[] strings = {"1111", "2222", "3333", null, "4444"};
+  private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private SerializablePairLongString[] pairs = {
+      new SerializablePairLongString(52782L, "AAAA"),
+      new SerializablePairLongString(65492L, "BBBB"),
+      new SerializablePairLongString(69134L, "CCCC"),
+      new SerializablePairLongString(11111L, "DDDD"),
+      new SerializablePairLongString(51223L, null)
+  };
+
+  @Before
+  public void setup()
+  {
+    stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+    combiningAggFactory = (StringLastAggregatorFactory) stringLastAggFactory.getCombiningFactory();
+    timeSelector = new TestLongColumnSelector(times);
+    valueSelector = new TestObjectColumnSelector<>(strings);
+    objectSelector = new TestObjectColumnSelector<>(pairs);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testStringLastAggregator()
+  {
+    StringLastAggregator agg = (StringLastAggregator) stringLastAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[0], result.rhs); // times[0] is the largest of the four timestamps consumed
+  }
+
+  @Test
+  public void testStringLastBufferAggregator()
+  {
+    StringLastBufferAggregator agg = (StringLastBufferAggregator) stringLastAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[0], result.rhs); // times[0] is the largest of the four timestamps consumed
+  }
+
+  @Test
+  public void testCombine()
+  {
+    SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
+    SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
+    Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2)); // pair2 has the later timestamp
+  }
+
+  @Test
+  public void testStringLastCombiningAggregator()
+  {
+    StringLastAggregator agg = (StringLastAggregator) combiningAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+    Pair<Long, String> expected = pairs[2]; // largest lhs (69134L) among the four pairs consumed
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringLastCombiningBufferAggregator()
+  {
+    StringLastBufferAggregator agg = (StringLastBufferAggregator) combiningAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+    Pair<Long, String> expected = pairs[2]; // largest lhs (69134L) among the four pairs consumed
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringLastAggregateCombiner()
+  {
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    TestObjectColumnSelector<String> columnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregateCombiner stringLastAggregateCombiner =
+        (StringLastAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
+
+    stringLastAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[0], stringLastAggregateCombiner.getObject());
+
+    columnSelector.increment();
+    stringLastAggregateCombiner.fold(columnSelector);
+
+    Assert.assertEquals(strings[1], stringLastAggregateCombiner.getObject());
+
+    stringLastAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[1], stringLastAggregateCombiner.getObject());
+  }
+
+  // Feeds the current row to the on-heap aggregator, then advances every selector by one row.
+  private void aggregate(StringLastAggregator agg)
+  {
+    agg.aggregate();
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+
+  // Feeds the current row to the buffer aggregator, then advances every selector by one row.
+  private void aggregate(StringLastBufferAggregator agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
new file mode 100644
index 0000000000..c7c125b67d
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Unit tests for {@link StringLastBufferAggregator}: the row with the largest timestamp wins,
+ * null strings are tolerated, and non-String values are rejected.
+ */
+public class StringLastBufferAggregatorTest
+{
+  /** Feeds the selectors' current row into {@code agg}, then advances both selectors by one row. */
+  private void aggregateBuffer(
+      TestLongColumnSelector timeSelector,
+      TestObjectColumnSelector<?> valueSelector,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int position
+  )
+  {
+    agg.aggregate(buf, position);
+    timeSelector.increment();
+    valueSelector.increment();
+  }
+
+  @Test
+  public void testBufferAggregate() throws Exception
+  {
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    final int maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    // Pre-fill the buffer with an unrelated entry before init() is called on it.
+    byte[] staleBytes = "ZZZZ".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(staleBytes.length);
+    buf.put(staleBytes);
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+    Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", Long.valueOf(1526725900L), sp.lhs);
+  }
+
+  @Test
+  public void testNullBufferAggregate() throws Exception
+  {
+    // The null string sits at a non-winning timestamp; the largest timestamp (6666L) must win.
+    final long[] timestamps = {1111L, 2222L, 6666L, 4444L, 5555L};
+    final String[] strings = {"CCCC", "AAAA", "BBBB", null, "EEEE"};
+    final int maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    // Pre-fill the buffer with an unrelated entry before init() is called on it.
+    byte[] staleBytes = "ZZZZ".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(staleBytes.length);
+    buf.put(staleBytes);
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+    Assert.assertEquals("expected last string value", strings[2], sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", Long.valueOf(timestamps[2]), sp.lhs);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoStringValue()
+  {
+    // The selector yields Doubles instead of Strings, which the aggregator is expected to reject.
+    final long[] timestamps = {1526724000L, 1526724600L};
+    final Double[] doubles = {null, 2.00};
+    final int maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    // Pre-fill the buffer with an unrelated entry before init() is called on it.
+    byte[] staleBytes = "ZZZZ".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(staleBytes.length);
+    buf.put(staleBytes);
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
new file mode 100644
index 0000000000..a68798e64b
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * End-to-end test of the "stringLast" aggregator: rows are ingested into an on-heap
+ * {@link IncrementalIndex} and read back through a timeseries query.
+ */
+public class StringLastTimeseriesQueryTest
+{
+  @Test
+  public void testTimeseriesQuery() throws Exception
+  {
+    TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+    String visitorId = "visitor_id";
+    String clientType = "client_type";
+
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.SECOND)
+                .withMetrics(
+                    new CountAggregatorFactory("cnt"),
+                    new StringLastAggregatorFactory("last_client_type", clientType, 1024)
+                )
+                .build()
+        )
+        .setMaxRowCount(1000)
+        .buildOnheap();
+
+    DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+    long timestamp = time.getMillis();
+
+    DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+    long timestamp1 = time1.getMillis();
+    // Two rows share the earlier timestamp; the third row arrives one hour later.
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.of(visitorId, "0", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.of(visitorId, "1", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp1,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.of(visitorId, "0", clientType, "android")
+        )
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .aggregators(
+            Lists.newArrayList(new StringLastAggregatorFactory("last_client_type", clientType, 1024))
+        )
+        .build();
+
+    final Iterable<Result<TimeseriesResultValue>> results =
+        engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+
+    // The "android" row has the later timestamp, so it must be the reported value.
+    List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            time,
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "last_client_type",
+                    new SerializablePairLongString(timestamp1, "android")
+                )
+            )
+        )
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}
-- GitLab