diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java index 64ebac1a8bcdbeef0332425c679f73b4883984e3..632a0d2f5cc5cdefcf334c29ed9ce30dbbccf7d6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java @@ -19,9 +19,10 @@ package org.apache.flink.api.common.functions; /** - * An base interface for all user-defined functions. This interface is empty in order - * to enable functions that are SAM (single abstract method) interfaces, so that they - * can be called as Java 8 lambdas + * The base interface for all user-defined functions. + * + *

This interface is empty in order to allow extending interfaces to + * be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.

*/ public interface Function { } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java new file mode 100644 index 0000000000000000000000000000000000000000..a95b540ae6edfa36432bcb93ee86c5426ddea6bb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.checkpoint; + +/** + * This interface must be implemented by functions/operations that want to receive + * a commit notification once a checkpoint has been completely acknowledged by all + * participants. + */ +public interface CheckpointCommitter { + + /** + * This method is called as a notification once a distributed checkpoint has been completed. + * + * Note that any exception during this method will not cause the checkpoint to + * fail any more. + * + * @param checkpointId The ID of the checkpoint that has been completed. + */ + void commitCheckpoint(long checkpointId); +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java new file mode 100644 index 0000000000000000000000000000000000000000..f491dd3efe5609f243f36aa3e2c7e64914d3fe4c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.checkpoint; + +import org.apache.flink.runtime.state.OperatorState; + +/** + * This method must be implemented by functions that have state that needs to be + * checkpointed. The functions get a call whenever a checkpoint should take place + * and return a snapshot of their state, which will be checkpointed. + * + *

This interface marks a function as synchronously checkpointed. While the + * state is written, the function is not called, so the function needs not return a + * copy of its state, but may return a reference to its state. Functions that can + * continue to work and mutate the state, even while the state snapshot is being accessed, + * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously} + * interface.

+ */ +public interface Checkpointed { + + /** + * Gets the current operator state as a checkpoint. The state must reflect all operations + * from all prior operations if this function. + * + * @param checkpointId The ID of the checkpoint. + * @param checkpointTimestamp The timestamp of the checkpoint, as derived by + * System.currentTimeMillis() on the JobManager. + * + * @return A snapshot of the operator state. + * + * @throws Exception Thrown if the creation of the state object failed. This causes the + * checkpoint to fail. The system may decide to fail the operation (and trigger + * recovery), or to discard this checkpoint attempt and to continue running + * and to try again with the next checkpoint attempt. + */ + OperatorState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception; +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java new file mode 100644 index 0000000000000000000000000000000000000000..196f7ece53442dac8f5dcae08d7c9753e3c5a9ce --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.checkpoint; + +/** + * This interface marks a function/operator as asynchronously checkpointed. + * Similar to the {@link Checkpointed} interface, the function must produce a + * snapshot of its state. However, the function must be able to continue working + * and mutating its state without mutating the returned state snapshot. + * + *

Asynchronous checkpoints are desirable, because they allow the data streams at the + * point of the checkpointed function/operator to continue running while the checkpoint + * is in progress.

+ * + *

To be able to support asynchronous snapshots, the state returned by the + * {@link #snapshotState(long, long)} method is typically a copy or shadow copy + * of the actual state.

+ */ +public interface CheckpointedAsynchronously extends Checkpointed {} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java new file mode 100644 index 0000000000000000000000000000000000000000..94529301c0a8453851fb475feebfb07c4d14a7a1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +/** + * Base interface for all stream data sources in Flink. The contract of a stream source + * is similar to an iterator - it is consumed as in the following pseudo code: + * + *
{@code
+ * StreamSource source = ...;
+ * Collector out = ...;
+ * while (!source.reachedEnd()) {
+ *   out.collect(source.next());
+ * }
+ * }
+ * 
+ * + * Note about blocking behavior + *

This implementations of the methods in the stream sources must have certain guarantees about + * blocking behavior. One of the two characteristics must be fulfilled.

+ * + * + * @param The type of the records produced by this source. + */ +public interface StreamSource { + + /** + * Checks whether the stream has reached its end. + * + *

This method must obey the contract about blocking behavior declared in the + * description of this class.

+ * + * @return True, if the end of the stream has been reached, false if more data is available. + * + * @throws InterruptedException The calling thread may be interrupted to pull the function out of this + * method during checkpoints. + * @throws Exception Any other exception that is thrown causes the source to fail and results in failure of + * the streaming program, or triggers recovery, depending on the program setup. + */ + boolean reachedEnd() throws Exception; + + + /** + * Produces the next record. + * + *

This method must obey the contract about blocking behavior declared in the + * description of this class.

+ * + * @return The next record produced by this stream source. + * + * @throws InterruptedException The calling thread may be interrupted to pull the function out of this + * method during checkpoints. + * @throws Exception Any other exception that is thrown causes the source to fail and results in failure of + * the streaming program, or triggers recovery, depending on the program setup. + */ + T next() throws Exception; +}