提交 7e20299c 编写于 作者: S Stephan Ewen 提交者: Aljoscha Krettek

[FLINK-2753] [streaming] [api breaking] Add first parts of new window API for key grouped windows

This follows the API design outlined in https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

This is API breaking because it adds new generic type parameters to Java and Scala classes, breaking binary compatibility.
上级 501a9b08
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api;
/**
* The time characteristic defines how the system determines time for time-dependent
* order and operations that depend on time (such as time windows).
*/
public enum TimeCharacteristic {
/**
* Processing time for operators means that the operator uses the system clock of the machine
* to determine the current time of the data stream. Processing-time windows trigger based
* on wall-clock time and include whatever elements happen to have arrived at the operator at
* that point in time.
* <p>
* Using processing time for window operations results in general in quite non-deterministic results,
* because the contents of the windows depends on the speed in which elements arrive. It is, however,
* the cheapest method of forming windows and the method that introduces the least latency.
*/
ProcessingTime,
/**
* Ingestion time means that the time of each individual element in the stream is determined
* when the element enters the Flink streaming data flow. Operations like windows group the
* elements based on that time, meaning that processing speed within the streaming dataflow
* does not affect windowing, but only the speed at which sources receive elements.
* <p>
* Ingestion time is often a good compromise between more processing time and event time.
* It does not need and special manual form of watermark generation, and events are typically
* not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
* only be introduced by streaming shuffles or split/join/union operations. The fact that elements
* are not very much out-of-order means that the latency increase is moderate, compared to event
* time.
*/
IngestionTime,
/**
* Event time means that the time of each individual element in the stream (also called event)
* is determined by the event's individual custom timestamp. These timestamps either exist in the
* elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
* The big implication of this is that elements arrive in the sources and in all operators generally
* out of order, meaning that elements with earlier timestamps may arrive after elements with
* later timestamps.
* <p>
* Operators that window or order data with respect to event time must buffer data until they can
* be sure that all timestamps for a certain time interval have been received. This is handled by
* the so called "time watermarks".
* <p>
* Operations based on event time are very predictable - the result of windowing operations
* is typically identical no matter when the window is executed and how fast the streams operate.
* At the same time, the buffering and tracking of event time is also costlier than operating
* with processing time, and typically also introduces more latency. The amount of extra
* cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
* between the arrival of early and late elements is. With respect to the "time watermarks", this
* means that teh cost typically depends on how early or late the watermarks for can be generated
* for their timestamp.
* <p>
* In relation to {@link #IngestionTime}, the event time is similar, but refers the the event's
* original time, rather than the time assigned at the data source. Practically, that means that
* event time has generally more meaning, but also that it takes longer to determine that all
* elements for a certain time have arrived.
*/
EventTime
}
......@@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> {
if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
this.isGrouped = true;
this.keySelector1 = ((GroupedDataStream<IN1>) input1).keySelector;
this.keySelector2 = ((GroupedDataStream<IN2>) input2).keySelector;
this.keySelector1 = ((GroupedDataStream<IN1, ?>) input1).keySelector;
this.keySelector2 = ((GroupedDataStream<IN2, ?>) input2).keySelector;
} else {
this.isGrouped = false;
this.keySelector1 = null;
......
......@@ -229,8 +229,8 @@ public class DataStream<T> {
* The KeySelector to be used for extracting the key for partitioning
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
*/
public KeyedDataStream<T> keyBy(KeySelector<T,?> key){
return new KeyedDataStream<T>(this, clean(key));
public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){
return new KeyedDataStream<T, K>(this, clean(key));
}
/**
......@@ -241,7 +241,7 @@ public class DataStream<T> {
* will be grouped.
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
*/
public KeyedDataStream<T> keyBy(int... fields) {
public KeyedDataStream<T, Tuple> keyBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
......@@ -260,12 +260,12 @@ public class DataStream<T> {
* partitioned.
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
**/
public KeyedDataStream<T> keyBy(String... fields) {
public KeyedDataStream<T, Tuple> keyBy(String... fields) {
return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
private KeyedDataStream<T> keyBy(Keys<T> keys) {
return new KeyedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) {
return new KeyedDataStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
......@@ -279,7 +279,7 @@ public class DataStream<T> {
* will be partitioned.
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
*/
public GroupedDataStream<T> groupBy(int... fields) {
public GroupedDataStream<T, Tuple> groupBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
......@@ -304,7 +304,7 @@ public class DataStream<T> {
* grouped.
* @return The grouped {@link DataStream}
**/
public GroupedDataStream<T> groupBy(String... fields) {
public GroupedDataStream<T, Tuple> groupBy(String... fields) {
return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
......@@ -322,13 +322,13 @@ public class DataStream<T> {
* the values
* @return The grouped {@link DataStream}
*/
public GroupedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
return new GroupedDataStream<T>(this, clean(keySelector));
public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> keySelector) {
return new GroupedDataStream<T, K>(this, clean(keySelector));
}
private GroupedDataStream<T> groupBy(Keys<T> keys) {
return new GroupedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) {
return new GroupedDataStream<T, Tuple>(this,
clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
}
/**
......
......@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
* @param <OUT>
* The output type of the {@link GroupedDataStream}.
*/
public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
/**
* Creates a new {@link GroupedDataStream}, group inclusion is determined using
......@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
* @param dataStream Base stream of data
* @param keySelector Function for determining group inclusion
*/
public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) {
public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) {
super(dataStream, keySelector);
}
......@@ -324,8 +324,6 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
getType(), operator);
return returnStream;
return transform("Grouped Aggregation", getType(), operator);
}
}
......@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
......@@ -32,11 +33,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
* are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy.
*
*
* @param <T> The type of the elements in the Keyed Stream
* @param <T> The type of the elements in the Keyed Stream.
* @param <K> The type of the key in the Keyed Stream.
*/
public class KeyedDataStream<T> extends DataStream<T> {
public class KeyedDataStream<T, K> extends DataStream<T> {
protected final KeySelector<T, ?> keySelector;
protected final KeySelector<T, K> keySelector;
/**
* Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
......@@ -47,35 +49,70 @@ public class KeyedDataStream<T> extends DataStream<T> {
* @param keySelector
* Function for determining state partitions
*/
public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, ?> keySelector) {
public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
this.keySelector = keySelector;
}
public KeySelector<T, ?> getKeySelector() {
public KeySelector<T, K> getKeySelector() {
return this.keySelector;
}
@Override
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream.");
}
@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(
keySelector);
((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
return returnStream;
}
@Override
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
DataStreamSink<T> result = super.addSink(sinkFunction);
result.getTransformation().setStateKeySelector(keySelector);
return result;
}
// ------------------------------------------------------------------------
// Windowing
// ------------------------------------------------------------------------
/**
* Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
* grouped stream. The window is defined by a single policy.
* <p>
* For time windows, these single-policy windows result in tumbling time windows.
*
* @param policy The policy that defines the window.
* @return The windows data stream.
*/
public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
return new KeyedWindowDataStream<T, K>(this, policy);
}
/**
* Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
* grouped stream. The window is defined by a window policy, plus a slide policy.
* <p>
* For time windows, these slide policy windows result in sliding time windows.
*
* @param window The policy that defines the window.
* @param slide The additional policy defining the slide of the window.
* @return The windows data stream.
*/
public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) {
return new KeyedWindowDataStream<T, K>(this, window, slide);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
/**
* A KeyedWindowDataStream represents a data stream where elements are grouped by key, and
* for each key, the stream of elements is split into windows. The windows are conceptually
* evaluated for each key individually, meaning windows and trigger at different points
* for each key.
* <p>
* In many cases, however, the windows are "aligned", meaning they trigger at the
* same time for all keys. The most common example for that are the regular time windows.
* <p>
* Note that the KeyedWindowDataStream is purely and API construct, during runtime the
* KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
* over the window into one single operation.
*
* @param <Type> The type of elements in the stream.
* @param <Key> The type of the key by which elements are grouped.
*/
public class KeyedWindowDataStream<Type, Key> {
/** The keyed data stream that is windowed by this stream */
private final KeyedDataStream<Type, Key> input;
/** The core window policy */
private final WindowPolicy windowPolicy;
/** The optional additional slide policy */
private final WindowPolicy slidePolicy;
public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) {
this(input, windowPolicy, null);
}
public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
WindowPolicy windowPolicy, WindowPolicy slidePolicy)
{
TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
this.input = input;
this.windowPolicy = windowPolicy.makeSpecificBasedOnTimeCharacteristic(time);
this.slidePolicy = slidePolicy == null ? null : slidePolicy.makeSpecificBasedOnTimeCharacteristic(time);
}
// ------------------------------------------------------------------------
// Operations on the keyed windows
// ------------------------------------------------------------------------
/**
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
* as a regular non-windowed stream.
* <p>
* This window will try and pre-aggregate data as much as the window policies permit. For example,
* tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
* key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
* so a few elements are stored per key (one per slide interval).
* Custom windows may not be able to pre-aggregate, or may need to store extra values in an
* aggregation tree.
*
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
String callLocation = Utils.getCallLocationName();
return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
}
/**
* Applies a window function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the window function is interpreted
* as a regular non-windowed stream.
* <p>
* Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means od pre-aggregation.
*
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) {
String callLocation = Utils.getCallLocationName();
TypeInformation<Type> inType = input.getType();
TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, KeyedWindowFunction.class, true, true, inType, null, false);
return createWindowOperator(function, resultType, "KeyedWindowFunction at " + callLocation);
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private <Result> DataStream<Result> createWindowOperator(
Function function, TypeInformation<Result> resultType, String functionName) {
String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
KeySelector<Type, Key> keySel = input.getKeySelector();
OneInputStreamOperator<Type, Result> operator =
PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
return input.transform(opName, resultType, operator);
}
}
......@@ -103,7 +103,7 @@ public class WindowedDataStream<OUT> {
this.triggerHelper = policyHelper;
if (dataStream instanceof GroupedDataStream) {
this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
}
}
......@@ -115,7 +115,7 @@ public class WindowedDataStream<OUT> {
this.userEvicter = evicter;
if (dataStream instanceof GroupedDataStream) {
this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
}
}
......
......@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.environment;
import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
......@@ -49,6 +49,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
......@@ -72,10 +73,12 @@ import org.apache.flink.util.SplittableIterator;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
/**
* {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
......@@ -83,25 +86,33 @@ import java.util.List;
*/
public abstract class StreamExecutionEnvironment {
public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
/** The time characteristic that is used if none other is set */
private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
// ------------------------------------------------------------------------
private long bufferTimeout = 100;
private ExecutionConfig config = new ExecutionConfig();
private final ExecutionConfig config = new ExecutionConfig();
protected List<StreamTransformation<?>> transformations = Lists.newArrayList();
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
protected boolean isChainingEnabled = true;
protected long checkpointInterval = -1; // disabled
protected CheckpointingMode checkpointingMode = null;
protected CheckpointingMode checkpointingMode;
protected boolean forceCheckpointing = false;
protected StateHandleProvider<?> stateHandleProvider;
/** The time characteristic used by the data streams */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
/** The environment of the context (local by default, cluster if invoked through command line) */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
......@@ -515,6 +526,30 @@ public abstract class StreamExecutionEnvironment {
}
}
// --------------------------------------------------------------------------------------------
// Time characteristic
// --------------------------------------------------------------------------------------------
/**
* Sets the time characteristic for the stream, e.g., processing time, event time,
* or ingestion time.
*
* @param characteristic The time characteristic.
*/
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Objects.requireNonNull(characteristic);
}
/**
* Gets the time characteristic for the stream, e.g., processing time, event time,
* or ingestion time.
*
* @return The time characteristic.
*/
public TimeCharacteristic getStreamTimeCharacteristic() {
return timeCharacteristic;
}
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
......
......@@ -25,12 +25,12 @@ import java.io.Serializable;
/**
* Base interface for functions that are evaluated over keyed (grouped) windows.
*
* @param <KEY> The type of the key.
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
*/
public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, Serializable {
public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, Serializable {
/**
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowpolicy;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
public class AbstractTimePolicy extends WindowPolicy {
private static final long serialVersionUID = 6593098375698927728L;
/** the time unit for this policy's time interval */
private final TimeUnit unit;
/** the length of this policy's time interval */
private final long num;
protected AbstractTimePolicy(long num, TimeUnit unit) {
this.unit = checkNotNull(unit, "time unit may not be null");
this.num = num;
}
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
/**
* Gets the time unit for this policy's time interval.
* @return The time unit for this policy's time interval.
*/
public TimeUnit getUnit() {
return unit;
}
/**
* Gets the length of this policy's time interval.
* @return The length of this policy's time interval.
*/
public long getNum() {
return num;
}
/**
* Converts the time interval to milliseconds.
* @return The time interval in milliseconds.
*/
public long toMilliseconds() {
return unit.toMillis(num);
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@Override
public String toString(WindowPolicy slidePolicy) {
if (slidePolicy == null) {
return "Tumbling Window (" + getClass().getSimpleName() + ") (" + num + ' ' + unit.name() + ')';
}
else if (slidePolicy.getClass() == getClass()) {
AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy;
return "Sliding Window (" + getClass().getSimpleName() + ") (length="
+ num + ' ' + unit.name() + ", slide=" + timeSlide.num + ' ' + timeSlide.unit.name() + ')';
}
else {
return super.toString(slidePolicy);
}
}
@Override
public int hashCode() {
return 31 * (int) (num ^ (num >>> 32)) + unit.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj.getClass() == getClass()) {
AbstractTimePolicy that = (AbstractTimePolicy) obj;
return this.num == that.num && this.unit.equals(that.unit);
}
else {
return false;
}
}
@Override
public String toString() {
return getClass().getSimpleName() + " (" + num + ' ' + unit.name() + ')';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowpolicy;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import java.util.concurrent.TimeUnit;
/**
* The definition of an event time interval for windowing. See
* {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
* of event time.
*/
public final class EventTime extends AbstractTimePolicy {
private static final long serialVersionUID = 8333566691833596747L;
/** Instantiation only via factory method */
private EventTime(long num, TimeUnit unit) {
super(num, unit);
}
@Override
public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
return this;
}
else {
throw new InvalidProgramException(
"Cannot use EventTime policy in a dataflow that runs on " + characteristic);
}
}
// ------------------------------------------------------------------------
// Factory
// ------------------------------------------------------------------------
/**
* Creates an event time policy describing an event time interval.
*
* @param num The length of the time interval.
* @param unit The init (seconds, milliseconds) of the time interval.
* @return The event time policy.
*/
public static EventTime of(long num, TimeUnit unit) {
return new EventTime(num, unit);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowpolicy;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import java.util.concurrent.TimeUnit;
/**
* The definition of a processing time interval for windowing. See
* {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
* of processing time.
*/
public final class ProcessingTime extends AbstractTimePolicy {
private static final long serialVersionUID = 7546166721132583007L;
/** Instantiation only via factory method */
private ProcessingTime(long num, TimeUnit unit) {
super(num, unit);
}
@Override
public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
if (characteristic == TimeCharacteristic.ProcessingTime) {
return this;
}
else {
throw new InvalidProgramException(
"Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
}
}
// ------------------------------------------------------------------------
// Factory
// ------------------------------------------------------------------------
/**
* Creates a processing time policy describing a processing time interval.
*
* @param num The length of the time interval.
* @param unit The init (seconds, milliseconds) of the time interval.
* @return The processing time policy.
*/
public static ProcessingTime of(long num, TimeUnit unit) {
return new ProcessingTime(num, unit);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowpolicy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import java.util.concurrent.TimeUnit;
/**
* The definition of a time interval for windowing. The time characteristic referred
* to is the default time characteristic set on the execution environment.
*/
public final class Time extends AbstractTimePolicy {
private static final long serialVersionUID = 3197290738634320211L;
/** Instantiation only via factory method */
private Time(long num, TimeUnit unit) {
super(num, unit);
}
@Override
public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
switch (timeCharacteristic) {
case ProcessingTime:
return ProcessingTime.of(getNum(), getUnit());
case IngestionTime:
case EventTime:
return EventTime.of(getNum(), getUnit());
default:
throw new IllegalArgumentException("Unknown time characteristic");
}
}
// ------------------------------------------------------------------------
// Factory
// ------------------------------------------------------------------------
/**
* Creates a time policy describing a processing time interval. The policy refers to the
* time characteristic that is set on the dataflow via
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
* setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
*
* @param num The length of the time interval.
* @param unit The init (seconds, milliseconds) of the time interval.
* @return The time policy.
*/
public static Time of(long num, TimeUnit unit) {
return new Time(num, unit);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowpolicy;
import org.apache.flink.streaming.api.TimeCharacteristic;
/**
* The base class of all window policies. Window policies define how windows
* are formed over the data stream.
*/
public abstract class WindowPolicy implements java.io.Serializable {
private static final long serialVersionUID = -8696529489282723113L;
/**
* If this window policies concrete instantiation depends on the time characteristic of the
* dataflow (processing time, event time), then this method must be overridden to convert this
* policy to the respective specific instantiation.
* <p>
* The {@link Time} policy for example, will convert itself to an {@link ProcessingTime} policy,
* if the time characteristic is set to {@link TimeCharacteristic#ProcessingTime}.
* <p>
* By default, this method does nothing and simply returns this object itself.
*
* @param characteristic The time characteristic of the dataflow.
* @return The specific instantiation of this policy, or the policy itself.
*/
public WindowPolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
return this;
}
public String toString(WindowPolicy slidePolicy) {
if (slidePolicy != null) {
return "Window [" + toString() + ", slide=" + slidePolicy + ']';
}
else {
return "Window [" + toString() + ']';
}
}
}
......@@ -32,13 +32,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
private final KeyedWindowFunction<Key, Type, Result> function;
private final KeyedWindowFunction<Type, Result, Key> function;
private long evaluationPass;
// ------------------------------------------------------------------------
public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Key, Type, Result> function) {
public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
this.keySelector = keySelector;
this.function = function;
}
......@@ -75,7 +75,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
private final KeyedWindowFunction<Key, Type, Result> function;
private final KeyedWindowFunction<Type, Result, Key> function;
private final UnionIterator<Type> unionIterator;
......@@ -83,7 +83,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private Key currentKey;
WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> function, Collector<Result> out) {
WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
......
......@@ -30,7 +30,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
public AccumulatingProcessingTimeWindowOperator(
KeyedWindowFunction<KEY, IN, OUT> function,
KeyedWindowFunction<IN, OUT, KEY> function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
......@@ -41,7 +41,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
@SuppressWarnings("unchecked")
KeyedWindowFunction<KEY, IN, OUT> windowFunction = (KeyedWindowFunction<KEY, IN, OUT>) function;
KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators.windows;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
/**
* This class implements the conversion from window policies to concrete operator
* implementations.
*/
public class PolicyToOperator {
/**
* Entry point to create an operator for the given window policies and the window function.
*/
public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
{
if (window == null || function == null) {
throw new NullPointerException();
}
// -- case 1: both policies are processing time policies
if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
final long windowLength = ((ProcessingTime) window).toMilliseconds();
final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
if (function instanceof ReduceFunction) {
@SuppressWarnings("unchecked")
ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
@SuppressWarnings("unchecked")
OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
new AggregatingProcessingTimeWindowOperator<KEY, IN>(
reducer, keySelector, windowLength, windowSlide);
return op;
}
else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked")
KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
wf, keySelector, windowLength, windowSlide);
}
}
// -- case 2: both policies are event time policies
if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
// add event time implementation
}
throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
}
// ------------------------------------------------------------------------
/** Don't instantiate */
private PolicyToOperator() {}
}
......@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple;
public class KeySelectorUtil {
public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
if (!(typeInfo instanceof CompositeType)) {
throw new InvalidTypesException(
"This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
......@@ -93,9 +93,15 @@ public class KeySelectorUtil {
comparator.extractKeys(value, keyArray, 0);
return (K) keyArray[0];
}
}
// ------------------------------------------------------------------------
/**
* A key selector for selecting key fields via a TypeComparator.
*
* @param <IN> The type from which the key is extracted.
*/
public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
private static final long serialVersionUID = 1L;
......@@ -126,6 +132,13 @@ public class KeySelectorUtil {
}
// ------------------------------------------------------------------------
/**
* A key selector for selecting individual array fields as keys and returns them as a Tuple.
*
* @param <IN> The type from which the key is extracted, i.e., the array type.
*/
public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
private static final long serialVersionUID = 1L;
......
......@@ -113,7 +113,9 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
KeyedDataStream<Integer, Integer> keyedStream = env
.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
.keyBy(new ModKey(4));
keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
private static final long serialVersionUID = 1L;
......@@ -163,7 +165,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
@SuppressWarnings("unchecked")
private StreamMap<Integer, String> createOperatorWithContext(List<String> output,
KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception {
KeySelector<Integer, ? extends Serializable> partitioner, byte[] serializedState) throws Exception {
final List<String> outputList = output;
StreamingRuntimeContext context = new StreamingRuntimeContext(
......@@ -355,7 +357,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
}
public static class ModKey implements KeySelector<Integer, Serializable> {
public static class ModKey implements KeySelector<Integer, Integer> {
private static final long serialVersionUID = 4193026742083046736L;
......
......@@ -22,15 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
import org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
import org.apache.flink.util.Collector;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@SuppressWarnings("serial")
public class GroupedProcessingTimeWindowExample {
......@@ -75,31 +76,20 @@ public class GroupedProcessingTimeWindowExample {
});
stream
.groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS))
// .reduceWindow(new SummingReducer())
// .flatten()
// .partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .transform(
// "Aligned time window",
// TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
// new AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long, Long>>(
// new SummingWindowFunction<Long>(),
// new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
// 2500, 500))
.transform(
"Aligned time window",
TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
new AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>(
new SummingReducer(),
new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
2500, 500))
.keyBy(0)
.window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.reduceWindow(new SummingReducer())
// alternative: use a mapWindow function which does not pre-aggregate
// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
// .mapWindow(new SummingWindowFunction())
.addSink(new SinkFunction<Tuple2<Long, Long>>() {
@Override
public void invoke(Tuple2<Long, Long> value) {
}
});
@Override
public void invoke(Tuple2<Long, Long> value) {
}
});
env.execute();
}
......@@ -113,47 +103,16 @@ public class GroupedProcessingTimeWindowExample {
}
}
public static class IdentityKeyExtractor<T> implements KeySelector<T, T> {
@Override
public T getKey(T value) {
return value;
}
}
public static class IdentityWindowFunction<K, T> implements KeyedWindowFunction<K, T, T> {
@Override
public void evaluate(K k, Iterable<T> values, Collector<T> out) throws Exception {
for (T v : values) {
out.collect(v);
}
}
}
public static class CountingWindowFunction<K, T> implements KeyedWindowFunction<K, T, Long> {
@Override
public void evaluate(K k, Iterable<T> values, Collector<Long> out) throws Exception {
long count = 0;
for (T ignored : values) {
count++;
}
out.collect(count);
}
}
public static class SummingWindowFunction<K> implements KeyedWindowFunction<K, Tuple2<K, Long>, Tuple2<K, Long>> {
public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
@Override
public void evaluate(K key, Iterable<Tuple2<K, Long>> values, Collector<Tuple2<K, Long>> out) throws Exception {
public void evaluate(Long key, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
long sum = 0L;
for (Tuple2<K, Long> value : values) {
for (Tuple2<Long, Long> value : values) {
sum += value.f1;
}
out.collect(new Tuple2<K, Long>(key, sum));
out.collect(new Tuple2<>(key, sum));
}
}
......
......@@ -21,8 +21,8 @@ package org.apache.flink.streaming.api.scala
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, MapFunction,
Partitioner, FoldFunction, FilterFunction}
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
......@@ -30,17 +30,12 @@ import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
import org.apache.flink.api.common.state.OperatorState
import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.KeyedDataStream
import org.apache.flink.streaming.api.scala.function.StatefulFunction
......@@ -244,20 +239,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def groupBy(fields: Int*): GroupedDataStream[T] = javaStream.groupBy(fields: _*)
def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = javaStream.groupBy(fields: _*)
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T] =
def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
javaStream.groupBy(firstField +: otherFields.toArray: _*)
/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T] = {
def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
val cleanFun = clean(fun)
val keyExtractor = new KeySelector[T, K] {
......@@ -605,7 +600,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
private[flink] def isStatePartitioned: Boolean = {
javaStream.isInstanceOf[KeyedDataStream[T]]
javaStream.isInstanceOf[KeyedDataStream[_, _]]
}
/**
......
......@@ -29,7 +29,8 @@ import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.common.functions.ReduceFunction
class GroupedDataStream[T](javaStream: GroupedJavaStream[T]) extends DataStream[T](javaStream){
class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K])
extends DataStream[T](javaStream) {
/**
* Creates a new [[DataStream]] by reducing the elements of this DataStream
......
......@@ -18,13 +18,15 @@
package org.apache.flink.streaming.api.scala
import java.util.Objects
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.runtime.state.StateHandleProvider
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
......@@ -293,6 +295,27 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.registerType(typeClass)
}
// --------------------------------------------------------------------------------------------
// Time characteristic
// --------------------------------------------------------------------------------------------
/**
* Sets the time characteristic for the stream, e.g., processing time, event time,
* or ingestion time.
*
* @param characteristic The time characteristic.
*/
def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
javaEnv.setStreamTimeCharacteristic(characteristic)
}
/**
* Gets the time characteristic for the stream, e.g., processing time, event time,
* or ingestion time.
*
* @return The time characteristic.
*/
def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
......
......@@ -38,8 +38,8 @@ package object scala {
implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
new DataStream[R](javaStream)
implicit def javaToScalaGroupedStream[R](javaStream: GroupedJavaStream[R]):
GroupedDataStream[R] = new GroupedDataStream[R](javaStream)
implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, K]):
GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)
implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
new WindowedDataStream[R](javaWStream)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册