提交 cd7bfd31 编写于 作者: J Jonas Traub (powibol) 提交者: mbalassi

[streaming] Introduced grouped windowing invokable, added required cloneable...

[streaming] Introduced grouped windowing invokable, added required cloneable policy interfaces, and adjusted existing windowing invokable to make it usable together with the new grouped windowing invokable.
上级 3d11d5ba
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This invokable allows windowing based on {@link TriggerPolicy} and
* {@link EvictionPolicy} instances including their active and cloneable
* versions. It is additionally aware of the creation of windows per group.
* A {@link KeySelector} is used to specify the key position or key extraction.
* The {@link ReduceFunction} will be executed on each group separately. Trigger
* policies might either be centralized or distributed. Eviction policies are
* always distributed. A distributed policy have to be a
* {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it will
* be cloned to have separated instances for each group. At the startup time the
* distributed policies will be stored as sample, and only clones of them will
* be used to maintain the groups. Therefore, each group starts with the initial
* policy states.
* While a distributed policy only gets notified with the elements belonging to
* the respective group, a centralized policy get notified with all arriving
* elements. When a centralized trigger occurred, all groups get triggered. This
* is done by submitting the element which caused the trigger as real element to
* the groups it belongs to and as fake element to all other groups. Within the
* groups the element might be further processed, causing more triggered,
* prenotifications of active distributed policies and evictions like usual.
* Central policies can be instance of {@link ActiveTriggerPolicy} and also
* implement the
* {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
* method. Fake elements created on prenotification will be forwarded to all
* groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
* it forwards/distributed calls all groups.
* @param <IN>
* The type of input elements handled by this operator invokable.
public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
* Auto-generated serial version UID
private static final long serialVersionUID = -3469545957144404137L;
private static final Logger LOG = LoggerFactory.getLogger(GroupedWindowingInvokable.class);
private KeySelector<IN, ?> keySelector;
private Configuration parameters;
private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
private Map<Object, WindowingInvokable<IN>> windowingGroups = new HashMap<Object, WindowingInvokable<IN>>();
private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
* This constructor creates an instance of the grouped windowing invokable.
* A {@link KeySelector} is used to specify the key position or key
* extraction. The {@link ReduceFunction} will be executed on each group
* separately. Trigger policies might either be centralized or distributed.
* Eviction policies are always distributed. A distributed policy have to be
* a {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it
* will be cloned to have separated instances for each group. At the startup
* time the distributed policies will be stored as sample, and only clones
* of them will be used to maintain the groups. Therefore, each group starts
* with the initial policy states.
* While a distributed policy only gets notified with the elements belonging
* to the respective group, a centralized policy get notified with all
* arriving elements. When a centralized trigger occurred, all groups get
* triggered. This is done by submitting the element which caused the
* trigger as real element to the groups it belongs to and as fake element
* to all other groups. Within the groups the element might be further
* processed, causing more triggered, prenotifications of active distributed
* policies and evictions like usual.
* Central policies can be instance of {@link ActiveTriggerPolicy} and also
* implement the
* {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
* method. Fake elements created on prenotification will be forwarded to all
* groups. The {@link ActiveTriggerCallback} is also implemented in a way,
* that it forwards/distributed calls all groups.
* @param userFunction
* The user defined {@link ReduceFunction}.
* @param keySelector
* A key selector to extract the key for the groups from the
* input data.
* @param distributedTriggerPolicies
* Trigger policies to be distributed and maintained individually
* within each group.
* @param distributedEvictionPolicies
* Eviction policies to be distributed and maintained
* individually within each group. There are no central eviction
* policies because there is no central element buffer but only a
* buffer per group. Therefore evictions might always be done per
* group.
* @param centralTriggerPolicies
* Trigger policies which will only exist once at a central
* place. In case a central policy triggers, it will cause all
* groups to be emitted. (Remark: Empty groups cannot be emitted.
* If only one element is contained a group, this element itself
* is returned as aggregated result.)
public GroupedWindowingInvokable(ReduceFunction<IN> userFunction,
KeySelector<IN, ?> keySelector,
LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
this.keySelector = keySelector;
this.centralTriggerPolicies = centralTriggerPolicies;
this.distributedTriggerPolicies = distributedTriggerPolicies;
this.distributedEvictionPolicies = distributedEvictionPolicies;
for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
if (trigger instanceof ActiveTriggerPolicy) {
this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
protected void immutableInvoke() throws Exception {
// Prevent empty data streams
if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty");
// Continuously run
while (reuse != null) {
WindowingInvokable<IN> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
if (groupInvokable == null) {
groupInvokable = makeNewGroup(reuse);
// Run the precalls for central active triggers
for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
IN[] result = trigger.preNotifyTrigger(reuse.getObject());
for (IN in : result) {
for (WindowingInvokable<IN> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
// Process non-active central triggers
for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
if (triggerPolicy.notifyTrigger(reuse.getObject())) {
if (currentTriggerPolicies.isEmpty()) {
// only add the element to its group
} else {
// call user function for all groups
for (WindowingInvokable<IN> group : windowingGroups.values()) {
if (group == groupInvokable) {
// process real with initialized policies
group.processRealElement(reuse.getObject(), currentTriggerPolicies);
} else {
// process like a fake but also initialized with
// policies
group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
// clear current trigger list
// Recreate the reuse-StremRecord object and load next StreamRecord
reuse = recordIterator.next(reuse);
// Stop all remaining threads from policies
for (Thread t : activePolicyThreads) {
// finally trigger the buffer.
for (WindowingInvokable<IN> group : windowingGroups.values()) {
* This method creates a new group. The method gets called in case an
* element arrives which has a key which was not seen before. The method
* created a nested {@link WindowingInvokable} and therefore created clones
* of all distributed trigger and eviction policies.
* @param element
* The element which leads to the generation of a new group
* (previously unseen key)
* @throws Exception
* In case the {@link KeySelector} throws an exception in
* {@link KeySelector#getKey(Object)}, the exception is not
* catched by this method.
private WindowingInvokable<IN> makeNewGroup(StreamRecord<IN> element) throws Exception {
// clone the policies
LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) {
for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) {
WindowingInvokable<IN> groupInvokable = new WindowingInvokable<IN>(
(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
return groupInvokable;
protected void mutableInvoke() throws Exception {
if (LOG.isInfoEnabled()) {
LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
protected void callUserFunction() throws Exception {
// This method gets never called directly. The user function calls are
// all delegated to the invokable instanced which handle/represent the
// groups.
public void open(Configuration parameters) throws Exception {
this.parameters = parameters;
for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) {
Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
if (target != null) {
Thread thread = new Thread(target);
* This callback class allows to handle the the callbacks done by threads
* defined in active trigger policies
* @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
private class WindowingCallback implements ActiveTriggerCallback<IN> {
private ActiveTriggerPolicy<IN> policy;
public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
this.policy = policy;
public void sendFakeElement(IN datapoint) {
for (WindowingInvokable<IN> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
......@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
......@@ -50,6 +51,19 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
private ReduceFunction<IN> reducer;
* This constructor created a windowing invokable using trigger and eviction
* policies.
* @param userFunction
* The user defined {@link ReduceFunction}
* @param triggerPolicies
* A list of {@link TriggerPolicy}s and/or
* {@link ActiveTriggerPolicy}s
* @param evictionPolicies
* A list of {@link EvictionPolicy}s and/or
* {@link ActiveEvictionPolicy}s
public WindowingInvokable(ReduceFunction<IN> userFunction,
LinkedList<TriggerPolicy<IN>> triggerPolicies,
LinkedList<EvictionPolicy<IN>> evictionPolicies) {
......@@ -78,8 +92,8 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
Runnable target=tp.createActiveTriggerRunnable(new WindowingCallback(tp));
if (target!=null){
Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
if (target != null) {
Thread thread = new Thread(target);
......@@ -87,6 +101,10 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* This class allows the active trigger threads to call back and push fake
* elements at any time.
private class WindowingCallback implements ActiveTriggerCallback<IN> {
private ActiveTriggerPolicy<IN> policy;
......@@ -103,6 +121,7 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
protected void immutableInvoke() throws Exception {
// Prevent empty data streams
if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty");
......@@ -123,13 +142,7 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
// finally trigger the buffer.
if (!buffer.isEmpty()) {
for (TriggerPolicy<IN> policy : triggerPolicies) {
......@@ -141,6 +154,61 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* This method gets called in case of an grouped windowing in case central
* trigger occurred and the arriving element causing the trigger is not part
* of this group.
* Remark: This is NOT the same as
* {@link WindowingInvokable#processFakeElement(Object, TriggerPolicy)}!
* Here the eviction using active policies takes place after the call to the
* UDF. Usually it is done before when fake elements get submitted. This
* special behaviour is needed to allow the
* {@link GroupedWindowingInvokable} to send central triggers to all groups,
* even if the current element does not belong to the group.
* @param input
* a fake input element
* @param policies
* the list of policies which caused the call with this fake
* element
protected synchronized void externalTriggerFakeElement(IN input,
List<TriggerPolicy<IN>> policies) {
// Set the current triggers
// emit
// clear the flag collection
// Process the evictions and take care of double evictions
// In case there are multiple eviction policies present,
// only the one with the highest return value is recognized.
int currentMaxEviction = 0;
for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
// use temporary variable to prevent multiple calls to
// notifyEviction
int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
if (tmp > currentMaxEviction) {
currentMaxEviction = tmp;
for (int i = 0; i < currentMaxEviction; i++) {
try {
} catch (NoSuchElementException e) {
// In case no more elements are in the buffer:
// Prevent failure and stop deleting.
* This method processed an arrived fake element The method is synchronized
* to ensure that it cannot interleave with
......@@ -151,7 +219,8 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* @param currentPolicy
* the policy which produced this fake element
private synchronized void processFakeElement(IN input, TriggerPolicy<IN> currentPolicy) {
protected synchronized void processFakeElement(IN input, TriggerPolicy<IN> currentPolicy) {
// Process the evictions and take care of double evictions
// In case there are multiple eviction policies present,
// only the one with the highest return value is recognized.
......@@ -185,6 +254,23 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* This method processed an arrived real element The method is synchronized
* to ensure that it cannot interleave with
* {@link WindowingInvokable#processFakeElement(Object)}.
* @param input
* a real input element
* @param triggerPolicies
* Allows to set trigger policies which are maintained
* externally. This is the case for central policies in
* {@link GroupedWindowingInvokable}.
protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>> triggerPolicies) {
* This method processed an arrived real element The method is synchronized
* to ensure that it cannot interleave with
......@@ -193,7 +279,8 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* @param input
* a real input element
private synchronized void processRealElement(IN input) {
protected synchronized void processRealElement(IN input) {
// Run the precalls to detect missed windows
for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
// Remark: In case multiple active triggers are present the ordering
......@@ -257,6 +344,31 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* This method does the final reduce at the end of the stream and emits the
* result.
* @param centralTriggerPolicies
* Allows to set trigger policies which are maintained
* externally. This is the case for central policies in
* {@link GroupedWindowingInvokable}.
protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies) {
if (!buffer.isEmpty()) {
if (centralTriggerPolicies != null) {
for (TriggerPolicy<IN> policy : triggerPolicies) {
protected void callUserFunction() throws Exception {
Iterator<IN> reducedIterator = buffer.iterator();
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.api.windowing.policy;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
* When used in {@link GroupedWindowingInvokable}, eviction policies must
* provide a clone method. Eviction policies get cloned to provide an own
* instance for each group and respectively each individual element buffer as
* groups maintain their own buffers with the elements belonging to the
* respective group.
* This interface extends {@link EvictionPolicy} with such a clone method. It
* also adds the Java {@link Cloneable} interface as flag.
* @param <DATA>
* The data type handled by this policy
public interface CloneableEvictionPolicy<DATA> extends EvictionPolicy<DATA>, Cloneable {
* This method should return an exact copy of the object it belongs to
* including the current object state.
* @return a copy of this object
public CloneableEvictionPolicy<DATA> clone();
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.api.windowing.policy;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
* When used in {@link GroupedWindowingInvokable}, trigger policies can provide
* a clone method. Cloneable triggers can can be used in a distributed manner,
* which means they get cloned to provide an own instance for each group. This
* allows each group to trigger individually and only based on the elements
* belonging to the respective group.
* This interface extends {@link TriggerPolicy} with such a clone method. It
* also adds the Java {@link Cloneable} interface as flag.
* @param <DATA>
* The data type handled by this policy
public interface CloneableTriggerPolicy<DATA> extends TriggerPolicy<DATA>, Cloneable {
* This method should return an exact copy of the object it belongs to
* including the current object state.
* @return a copy of this object
public CloneableTriggerPolicy<DATA> clone();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册