提交 68a99d7a 编写于 作者: A Aljoscha Krettek

[hotfix] Remove leftover KeyedTimePanes

Recently, the aligned window operators were removes, these classes where
leftover after that removal.
上级 d8ed58b6
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
/**
* Base class for a multiple key/value maps organized in panes.
*/
@Internal
public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
/** The latest time pane. */
protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
/** The previous time panes, ordered by time (early to late). */
protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
// ------------------------------------------------------------------------
public abstract void addElementToLatestPane(Type element) throws Exception;
public abstract void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception;
public void dispose() {
// since all is heap data, there is no need to clean up anything
latestPane = null;
previousPanes.clear();
}
public int getNumPanes() {
return previousPanes.size() + 1;
}
public void slidePanes(int panesToKeep) {
if (panesToKeep > 1) {
// the current pane becomes the latest previous pane
previousPanes.addLast(latestPane);
// truncate the history
while (previousPanes.size() >= panesToKeep) {
previousPanes.removeFirst();
}
}
// we need a new latest pane
latestPane = new KeyMap<>();
}
public void truncatePanes(int numToRetain) {
while (previousPanes.size() >= numToRetain) {
previousPanes.removeFirst();
}
}
protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
// gather all panes in an array (faster iterations)
@SuppressWarnings({"unchecked", "rawtypes"})
KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
panes[panes.length - 1] = latestPane;
// let the maps make a coordinated traversal and evaluate the window function per contained key
KeyMap.traverseMaps(panes, traversal, traversalPass);
}
// ------------------------------------------------------------------------
// Serialization and de-serialization
// ------------------------------------------------------------------------
public void writeToOutput(
final DataOutputView output,
final TypeSerializer<Key> keySerializer,
final TypeSerializer<Aggregate> aggSerializer) throws IOException {
output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
int numPanes = getNumPanes();
output.writeInt(numPanes);
// write from the past
Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator();
for (int paneNum = 0; paneNum < numPanes; paneNum++) {
output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next();
output.writeInt(pane.size());
for (KeyMap.Entry<Key, Aggregate> entry : pane) {
keySerializer.serialize(entry.getKey(), output);
aggSerializer.serialize(entry.getValue(), output);
}
}
}
public void readFromInput(
final DataInputView input,
final TypeSerializer<Key> keySerializer,
final TypeSerializer<Aggregate> aggSerializer) throws IOException {
validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
int numPanes = input.readInt();
// read from the past towards the presence
while (numPanes > 0) {
validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>();
final int numElementsInPane = input.readInt();
for (int i = numElementsInPane - 1; i >= 0; i--) {
Key k = keySerializer.deserialize(input);
Aggregate a = aggSerializer.deserialize(input);
pane.put(k, a);
}
if (numPanes > 1) {
previousPanes.addLast(pane);
}
numPanes--;
}
}
private static void validateMagicNumber(int expected, int found) throws IOException {
if (expected != found) {
throw new IOException("Corrupt state stream - wrong magic number. " +
"Expected '" + Integer.toHexString(expected) +
"', found '" + Integer.toHexString(found) + '\'');
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.UnionIterator;
import java.util.ArrayList;
/**
* Key/value map organized in panes for accumulating windows (with a window function).
*/
@Internal
public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
private final KeySelector<Type, Key> keySelector;
private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
private final AccumulatingKeyedTimePanesContext context;
/**
* IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries
* have (zero).
*/
private long evaluationPass = 1L;
// ------------------------------------------------------------------------
public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
this.keySelector = keySelector;
this.function = function;
this.context = new AccumulatingKeyedTimePanesContext();
}
// ------------------------------------------------------------------------
@Override
public void addElementToLatestPane(Type element) throws Exception {
Key k = keySelector.getKey(element);
ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
elements.add(element);
}
@Override
public void evaluateWindow(Collector<Result> out, final TimeWindow window,
AbstractStreamOperator<Result> operator) throws Exception {
if (previousPanes.isEmpty()) {
// optimized path for single pane case (tumbling window)
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
Key key = entry.getKey();
operator.setCurrentKey(key);
context.globalState = operator.getKeyedStateStore();
function.process(entry.getKey(), window, context, entry.getValue(), out);
}
}
else {
// general code path for multi-pane case
WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
function, window, out, operator, context);
traverseAllPanes(evaluator, evaluationPass);
}
evaluationPass++;
}
// ------------------------------------------------------------------------
// Running a window function in a map traversal
// ------------------------------------------------------------------------
static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
private final UnionIterator<Type> unionIterator;
private final Collector<Result> out;
private final TimeWindow window;
private final AbstractStreamOperator<Result> contextOperator;
private Key currentKey;
private AccumulatingKeyedTimePanesContext context;
WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
this.window = window;
this.contextOperator = contextOperator;
this.context = context;
}
@Override
public void startNewKey(Key key) {
unionIterator.clear();
currentKey = key;
}
@Override
public void nextValue(ArrayList<Type> value) {
unionIterator.addList(value);
}
@Override
public void keyDone() throws Exception {
contextOperator.setCurrentKey(currentKey);
context.globalState = contextOperator.getKeyedStateStore();
function.process(currentKey, window, context, unionIterator, out);
}
}
// ------------------------------------------------------------------------
// Lazy factory for lists (put if absent)
// ------------------------------------------------------------------------
@SuppressWarnings("unchecked")
private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
}
private static class ThrowingKeyedStateStore implements KeyedStateStore {
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
}
@Override
public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
}
@Override
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
}
@Override
public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
}
@Override
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
}
}
private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext {
KeyedStateStore globalState;
KeyedStateStore throwingStore;
public AccumulatingKeyedTimePanesContext() {
this.throwingStore = new ThrowingKeyedStateStore();
}
@Override
public long currentProcessingTime() {
throw new UnsupportedOperationException("current processing time is not supported in this context");
}
@Override
public long currentWatermark() {
throw new UnsupportedOperationException("current watermark is not supported in this context");
}
@Override
public KeyedStateStore windowState() {
return throwingStore;
}
@Override
public KeyedStateStore globalState() {
return globalState;
}
}
private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
@Override
public ArrayList<?> create() {
return new ArrayList<>(4);
}
};
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* Key/value map organized in panes for aggregating windows (with a reduce function).
*/
@Internal
public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
private final KeySelector<Type, Key> keySelector;
private final ReduceFunction<Type> reducer;
/**
* IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries
* have (zero).
*/
private long evaluationPass = 1L;
// ------------------------------------------------------------------------
public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
this.keySelector = keySelector;
this.reducer = reducer;
}
// ------------------------------------------------------------------------
@Override
public void addElementToLatestPane(Type element) throws Exception {
Key k = keySelector.getKey(element);
latestPane.putOrAggregate(k, element, reducer);
}
@Override
public void evaluateWindow(Collector<Type> out, TimeWindow window,
AbstractStreamOperator<Type> operator) throws Exception {
if (previousPanes.isEmpty()) {
// optimized path for single pane case
for (KeyMap.Entry<Key, Type> entry : latestPane) {
out.collect(entry.getValue());
}
}
else {
// general code path for multi-pane case
AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out, operator);
traverseAllPanes(evaluator, evaluationPass);
}
evaluationPass++;
}
// ------------------------------------------------------------------------
// The maps traversal that performs the final aggregation
// ------------------------------------------------------------------------
static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
private final ReduceFunction<Type> function;
private final Collector<Type> out;
private final AbstractStreamOperator<Type> operator;
private Type currentValue;
AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out,
AbstractStreamOperator<Type> operator) {
this.function = function;
this.out = out;
this.operator = operator;
}
@Override
public void startNewKey(Key key) {
currentValue = null;
operator.setCurrentKey(key);
}
@Override
public void nextValue(Type value) throws Exception {
if (currentValue != null) {
currentValue = function.reduce(currentValue, value);
}
else {
currentValue = value;
}
}
@Override
public void keyDone() throws Exception {
out.collect(currentValue);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册