Commit de06d958 authored by Gyula Fora

[scala] [streaming] Finished scala StreamExecutionEnvironment functionality + DataStream sinks + docs

Parent c123e11a
......@@ -212,7 +212,8 @@ public abstract class StreamExecutionEnvironment {
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line-wise multiple times (infinitely). The file will be read with
* the system's default character set.
* the system's default character set. This functionality can be used for
* testing a topology.
*
* @param filePath
* The path of the file, as a URI (e.g.,
......@@ -350,8 +351,17 @@ public abstract class StreamExecutionEnvironment {
return addSource(new GenSequenceFunction(from, to));
}
private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function);
jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
/**
* Adds a data source, thus opening a {@link DataStream}.
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
*
* @param function
* the user defined function
......@@ -371,11 +381,27 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function);
jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
/**
* Adds a data source with custom type information, thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* supply type information explicitly. Otherwise use
* {@link #addSource(SourceFunction)}
*
* @param function
* the user defined function
* @param outTypeInfo
* the user defined type information for the stream
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo) {
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
null, outTypeInfo, "source", 1);
return returnStream;
}
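For illustration, the Scala API added later in this commit resolves the TypeInformation implicitly and forwards it to this overload. A minimal Scala sketch of calling it with explicit type information; the names javaEnv and mySource are assumptions, not part of this change:

// Hedged sketch: `javaEnv` is the Java StreamExecutionEnvironment above and
// `mySource` a SourceFunction[String]; STRING_TYPE_INFO supplies the explicit type information.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo

val explicitlyTyped = javaEnv.addSource(mySource, BasicTypeInfo.STRING_TYPE_INFO)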
......
......@@ -25,7 +25,7 @@ import java.util.ArrayList;
/**
* Writes tuples in csv format.
*
*
* @param <IN>
* Input tuple type
*/
......@@ -37,8 +37,8 @@ public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
try {
PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
for (IN tupleToWrite : tupleList) {
outStream.println(tupleToWrite.toString().substring(1,
tupleToWrite.toString().length() - 1));
String strTuple = tupleToWrite.toString();
outStream.println(strTuple.substring(1, strTuple.length() - 1));
}
outStream.close();
} catch (IOException e) {
......
......@@ -35,13 +35,11 @@ public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
protected final String path;
protected ArrayList<IN> tupleList = new ArrayList<IN>();
protected final IN endTuple;
protected WriteFormat<IN> format;
public WriteSinkFunction(String path, WriteFormat<IN> format, IN endTuple) {
public WriteSinkFunction(String path, WriteFormat<IN> format) {
this.path = path;
this.format = format;
this.endTuple = endTuple;
cleanFile(path);
}
......@@ -82,16 +80,13 @@ public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
*/
@Override
public void invoke(IN tuple) {
if (!tuple.equals(endTuple)) {
tupleList.add(tuple);
if (updateCondition()) {
format.write(path, tupleList);
resetParameters();
}
} else {
tupleList.add(tuple);
if (updateCondition()) {
format.write(path, tupleList);
resetParameters();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.sink;
/**
* Implementation of WriteSinkFunction. Writes tuples to a file in equally sized
* batches.
*
* @param <IN>
* Input tuple type
*/
public class WriteSinkFunctionByBatches<IN> extends WriteSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private final int batchSize;
public WriteSinkFunctionByBatches(String path, WriteFormat<IN> format, int batchSize,
IN endTuple) {
super(path, format, endTuple);
this.batchSize = batchSize;
}
@Override
protected boolean updateCondition() {
return tupleList.size() >= batchSize;
}
@Override
protected void resetParameters() {
tupleList.clear();
}
}
......@@ -17,11 +17,10 @@
package org.apache.flink.streaming.api.function.sink;
/**
* Implementation of WriteSinkFunction. Writes tuples to a file every millis
* milliseconds.
*
*
* @param <IN>
* Input tuple type
*/
......@@ -31,8 +30,8 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
private final long millis;
private long lastTime;
public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis, IN endTuple) {
super(path, format, endTuple);
public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
super(path, format);
this.millis = millis;
lastTime = System.currentTimeMillis();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
public class WriteAsCsvTest {
private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsCsvTest.class.getSimpleName() + "_";
private static final long MEMORYSIZE = 32;
private static List<String> result1 = new ArrayList<String>();
private static List<String> result2 = new ArrayList<String>();
private static List<String> result3 = new ArrayList<String>();
private static List<String> result4 = new ArrayList<String>();
private static List<String> result5 = new ArrayList<String>();
private static List<String> expected1 = new ArrayList<String>();
private static List<String> expected2 = new ArrayList<String>();
private static List<String> expected3 = new ArrayList<String>();
private static List<String> expected4 = new ArrayList<String>();
private static List<String> expected5 = new ArrayList<String>();
public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 27; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
}
private static void readFile(String path, List<String> result) {
try {
BufferedReader br = new BufferedReader(new FileReader(path));
String line;
line = br.readLine();
while (line != null) {
result.add(line);
line = br.readLine();
}
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void fillExpected1() {
for (int i = 0; i < 27; i++) {
expected1.add(i + "");
}
}
private static void fillExpected2() {
for (int i = 0; i < 25; i++) {
expected2.add(i + "");
}
}
private static void fillExpected3() {
for (int i = 0; i < 20; i++) {
expected3.add(i + "");
}
}
private static void fillExpected4() {
for (int i = 0; i < 26; i++) {
expected4.add(i + "");
}
}
private static void fillExpected5() {
for (int i = 0; i < 14; i++) {
expected5.add(i + "");
}
for (int i = 15; i < 25; i++) {
expected5.add(i + "");
}
}
@BeforeClass
public static void createFileCleanup() {
Runnable r = new Runnable() {
@Override
public void run() {
try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
}
};
Runtime.getRuntime().addShutdownHook(new Thread(r));
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
fillExpected1();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test2.txt", 5);
fillExpected2();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test3.txt", 10);
fillExpected3();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
fillExpected4();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
fillExpected5();
env.execute();
readFile(PREFIX + "test1.txt", result1);
readFile(PREFIX + "test2.txt", result2);
readFile(PREFIX + "test3.txt", result3);
readFile(PREFIX + "test4.txt", result4);
readFile(PREFIX + "test5.txt", result5);
assertTrue(expected1.equals(result1));
assertTrue(expected2.equals(result2));
assertTrue(expected3.equals(result3));
assertTrue(expected4.equals(result4));
assertTrue(expected5.equals(result5));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
public class WriteAsTextTest {
private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsTextTest.class.getSimpleName() + "_";
private static final long MEMORYSIZE = 32;
private static List<String> result1 = new ArrayList<String>();
private static List<String> result2 = new ArrayList<String>();
private static List<String> result3 = new ArrayList<String>();
private static List<String> result4 = new ArrayList<String>();
private static List<String> result5 = new ArrayList<String>();
private static List<String> expected1 = new ArrayList<String>();
private static List<String> expected2 = new ArrayList<String>();
private static List<String> expected3 = new ArrayList<String>();
private static List<String> expected4 = new ArrayList<String>();
private static List<String> expected5 = new ArrayList<String>();
public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 27; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
}
private static void readFile(String path, List<String> result) {
try {
BufferedReader br = new BufferedReader(new FileReader(path));
String line;
line = br.readLine();
while (line != null) {
result.add(line);
line = br.readLine();
}
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void fillExpected1() {
for (int i = 0; i < 27; i++) {
expected1.add("(" + i + ")");
}
}
private static void fillExpected2() {
for (int i = 0; i < 25; i++) {
expected2.add("(" + i + ")");
}
}
private static void fillExpected3() {
for (int i = 0; i < 20; i++) {
expected3.add("(" + i + ")");
}
}
private static void fillExpected4() {
for (int i = 0; i < 26; i++) {
expected4.add("(" + i + ")");
}
}
private static void fillExpected5() {
for (int i = 0; i < 14; i++) {
expected5.add("(" + i + ")");
}
for (int i = 15; i < 25; i++) {
expected5.add("(" + i + ")");
}
}
@BeforeClass
public static void createFileCleanup() {
Runnable r = new Runnable() {
@Override
public void run() {
try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
}
};
Runtime.getRuntime().addShutdownHook(new Thread(r));
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
fillExpected1();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsText(PREFIX + "test2.txt", 5);
fillExpected2();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsText(PREFIX + "test3.txt", 10);
fillExpected3();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
fillExpected4();
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
fillExpected5();
env.execute();
readFile(PREFIX + "test1.txt", result1);
readFile(PREFIX + "test2.txt", result2);
readFile(PREFIX + "test3.txt", result3);
readFile(PREFIX + "test4.txt", result4);
readFile(PREFIX + "test5.txt", result5);
assertEquals(expected1,result1);
assertEquals(expected2,result2);
assertEquals(expected3,result3);
assertEquals(expected4,result4);
assertEquals(expected5,result5);
}
}
......@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.GroupedDataStream
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.function.sink.SinkFunction
class DataStream[T](javaStream: JavaStream[T]) {
......@@ -86,15 +87,36 @@ class DataStream[T](javaStream: JavaStream[T]) {
"parallelism.")
}
/**
* Creates a new DataStream by merging DataStream outputs of
* the same type with each other. The DataStreams merged using this operator
* will be transformed simultaneously.
*
*/
def merge(dataStreams: DataStream[T]*): DataStream[T] =
new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
/**
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations
*
*/
def groupBy(fields: Int*): DataStream[T] =
new DataStream[T](javaStream.groupBy(fields: _*))
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations
*
*/
def groupBy(firstField: String, otherFields: String*): DataStream[T] =
new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*))
/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations
*
*/
def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
val keyExtractor = new KeySelector[T, K] {
......@@ -104,12 +126,27 @@ class DataStream[T](javaStream: JavaStream[T]) {
new DataStream[T](javaStream.groupBy(keyExtractor))
}
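A minimal, hedged sketch of grouping followed by a grouped aggregation; the stream `pairs` and its element type are assumptions, not part of this change:

// `pairs` is assumed to be a DataStream[org.apache.flink.api.java.tuple.Tuple2[String, java.lang.Integer]].
val grouped = pairs.groupBy(0)   // group by the first tuple field
val perKeySum = grouped.sum(1)   // grouped aggregation: running sum of field 1 within each group
// The key-extractor variant (pairs.groupBy(p => p.f0)) additionally needs an implicit
// TypeInformation for the key type in scope.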
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the selected fields. This setting only affects how the outputs will be distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy(fields: Int*): DataStream[T] =
new DataStream[T](javaStream.partitionBy(fields: _*))
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the selected fields. This setting only affects how the outputs will be distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*))
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the given key. This setting only affects how the outputs will be distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
val keyExtractor = new KeySelector[T, K] {
......@@ -119,56 +156,124 @@ class DataStream[T](javaStream: JavaStream[T]) {
new DataStream[T](javaStream.partitionBy(keyExtractor))
}
/**
* Sets the partitioning of the DataStream so that the output tuples
* are broadcast to every parallel instance of the next component. This
* setting only affects how the outputs will be distributed between the
* parallel instances of the next processing operator.
*
*/
def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
/**
* Sets the partitioning of the DataStream so that the output tuples
* are shuffled to the next component. This setting only affects how the
* outputs will be distributed between the parallel instances of the next
* processing operator.
*
*/
def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
/**
* Sets the partitioning of the DataStream so that the output tuples
* are forwarded to the local subtask of the next component (whenever
* possible). This is the default partitioner setting. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
*/
def forward: DataStream[T] = new DataStream[T](javaStream.forward())
/**
* Sets the partitioning of the DataStream so that the output tuples
* are distributed evenly to the next component. This setting only affects
* how the outputs will be distributed between the parallel instances of
* the next processing operator.
*
*/
def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
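A hedged sketch contrasting the partitioning strategies above; `stream` is an assumed existing DataStream[String]:

val keyed = stream.partitionBy(s => s.hashCode) // same key -> same parallel instance (needs an implicit TypeInformation for the key type)
val everyInstance = stream.broadcast            // every parallel instance of the next operator receives every element
val shuffled = stream.shuffle                   // elements shuffled to the next operator
val evenly = stream.distribute                  // elements distributed evenly to the next operator
val local = stream.forward                      // default: stay on the local subtask whenever possible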
/**
* Applies an aggregation that gives the current maximum of the data stream at
* the given position.
*
*/
def max(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.max(field))
case field: String => return new DataStream[T](javaStream.max(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that gives the current minimum of the data stream at
* the given position.
*
*/
def min(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.min(field))
case field: String => return new DataStream[T](javaStream.min(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that sums the data stream at the given position.
*
*/
def sum(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.sum(field))
case field: String => return new DataStream[T](javaStream.sum(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that gives the current maximum element of the data stream by
* the given position. In case of equal values, the first element is returned.
*
*/
def maxBy(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.maxBy(field))
case field: String => return new DataStream[T](javaStream.maxBy(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that gives the current minimum element of the data stream by
* the given position. In case of equal values, the first element is returned.
*
*/
def minBy(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.minBy(field))
case field: String => return new DataStream[T](javaStream.minBy(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that gives the current minimum element of the data stream by
* the given position. In case of equal values, the user can choose whether the first or the last element with the minimal value is returned.
*
*/
def minBy(field: Any, first: Boolean): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.minBy(field, first))
case field: String => return new DataStream[T](javaStream.minBy(field, first))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that gives the current maximum element of the data stream by
* the given position. In case of equal values, the user can choose whether the first or the last element with the maximal value is returned.
*
*/
def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
case field: String => return new DataStream[T](javaStream.maxBy(field, first))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Creates a new DataStream containing the current number (count) of
* received records.
*
*/
def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count())
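A hedged sketch of the aggregations above on a stream of two-field tuples; `readings` and its element type are assumptions:

// `readings` is assumed to be a DataStream[org.apache.flink.api.java.tuple.Tuple2[String, java.lang.Double]]
// of (sensorId, value) pairs.
val runningMax = readings.max(1)           // running maximum of field 1
val hottest = readings.maxBy(1)            // element holding the current maximum; ties return the first element
val hottestLast = readings.maxBy(1, false) // ties return the last element instead
val received = readings.count              // running count of received records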
/**
......@@ -239,7 +344,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
* Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce
* Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
* function.
*/
def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
......@@ -253,7 +358,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
* Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce
* Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
* function.
*/
def reduce(fun: (T, T) => T): DataStream[T] = {
......@@ -268,7 +373,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
* Creates a new DataSet that contains only the elements satisfying the given filter predicate.
* Creates a new DataStream that contains only the elements satisfying the given filter predicate.
*/
def filter(filter: FilterFunction[T]): DataStream[T] = {
if (filter == null) {
......@@ -277,6 +382,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
new DataStream[T](javaStream.filter(filter))
}
/**
* Creates a new DataStream that contains only the elements satisfying the given filter predicate.
*/
def filter(fun: T => Boolean): DataStream[T] = {
if (fun == null) {
throw new NullPointerException("Filter function must not be null.")
......@@ -288,6 +396,71 @@ class DataStream[T](javaStream: JavaStream[T]) {
this.filter(filter)
}
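A hedged sketch of the functional filter and reduce variants; `lines` is an assumed DataStream[String]:

val nonEmpty = lines.filter(line => !line.isEmpty)        // keep only non-empty lines
val concatenated = nonEmpty.reduce((a, b) => a + " " + b) // rolling concatenation of the elements seen so far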
def print() = javaStream.print()
/**
* Writes a DataStream to the standard output stream (stdout). For each
* element of the DataStream the result of .toString is
* written.
*
*/
def print(): DataStream[T] = new DataStream[T](javaStream.print())
/**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, every millis milliseconds. For
* every element of the DataStream the result of .toString
* is written.
*
*/
def writeAsText(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis))
/**
* Writes a DataStream to the file specified by path in text format.
* For every element of the DataStream the result of .toString
* is written.
*
*/
def writeAsText(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsText(path))
/**
* Writes a DataStream to the file specified by path in csv format. The
* writing is performed periodically, every millis milliseconds. For
* every element of the DataStream the result of .toString
* is written.
*
*/
def writeAsCsv(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis))
/**
* Writes a DataStream to the file specified by path in csv format.
* For every element of the DataStream the result of .toString
* is written.
*
*/
def writeAsCsv(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path))
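A hedged sketch of the sinks above; `stream` and the paths are illustrative:

stream.print()                                  // each element's toString goes to stdout
stream.writeAsText("/tmp/stream-out.txt")       // one toString per line
stream.writeAsCsv("/tmp/stream-out.csv", 1000)  // csv output, flushed roughly every 1000 ms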
/**
* Adds the given sink to this DataStream. Only streams with sinks added
* will be executed once the StreamExecutionEnvironment.execute(...)
* method is called.
*
*/
def addSink(sinkFunction: SinkFunction[T]): DataStream[T] = new DataStream[T](javaStream.addSink(sinkFunction))
/**
* Adds the given sink to this DataStream. Only streams with sinks added
* will be executed once the StreamExecutionEnvironment.execute(...)
* method is called.
*
*/
def addSink(fun: T => Unit): DataStream[T] = {
if (fun == null) {
throw new NullPointerException("Sink function must not be null.")
}
val sinkFunction = new SinkFunction[T] {
val cleanFun = clean(fun)
def invoke(in: T) = cleanFun(in)
}
this.addSink(sinkFunction)
}
}
\ No newline at end of file
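A hedged sketch of attaching a lambda sink and running the job; `stream` and `env` are assumed to exist:

stream.addSink(value => println("received: " + value)) // only streams with sinks attached are executed
env.execute("sink example")                             // triggers execution with the given job name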
......@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.invokable.SourceInvokable
import org.apache.flink.streaming.api.function.source.FromElementsFunction
import org.apache.flink.streaming.api.function.source.SourceFunction
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
......@@ -44,10 +45,78 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
/**
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. By default the output buffers flush frequently to provide
* low latency and to aid smooth developer experience. Setting the parameter
* can result in three logical modes:
*
* <ul>
* <li>
* A positive integer triggers flushing periodically, using the value as the interval in milliseconds</li>
* <li>
* 0 triggers flushing after every record thus minimizing latency</li>
* <li>
* -1 triggers flushing only when the output buffer is full thus maximizing
* throughput</li>
* </ul>
*
*/
def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
javaEnv.setBufferTimeout(timeoutMillis)
this
}
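A hedged sketch of the three buffer-timeout modes; getExecutionEnvironment on the companion object mirrors the Java factory and is assumed here, as it is not part of this hunk:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setBufferTimeout(100)   // flush output buffers at least every 100 ms
// env.setBufferTimeout(0)  // flush after every record: minimal latency
// env.setBufferTimeout(-1) // flush only when buffers are full: maximal throughput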
/**
* Gets the default buffer timeout set for this environment
*/
def getBufferTimout: Long = javaEnv.getBufferTimeout()
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line-wise. The file will be read with the system's default
* character set.
*
*/
def readTextFile(filePath: String): DataStream[String] =
new DataStream[String](javaEnv.readTextFile(filePath))
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line-wise multiple times (infinitely). The file will be read with
* the system's default character set. This functionality can be used for
* testing a topology.
*
*/
def readTextStream(StreamPath: String): DataStream[String] =
new DataStream[String](javaEnv.readTextStream(StreamPath))
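A hedged sketch of the two file sources; `env` is the surrounding environment and the path is illustrative:

val once = env.readTextFile("/tmp/words.txt")     // each line is emitted exactly once
val looped = env.readTextStream("/tmp/words.txt") // the file is re-read indefinitely, useful for testing a topology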
/**
* Creates a new DataStream that contains the strings received infinitely
* from a socket. Received strings are decoded by the system's default
* character set.
*
*/
def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter))
/**
* Creates a new DataStream that contains the strings received infinitely
* from a socket. Received strings are decoded by the system's default
* character set, using '\n' as the delimiter.
*
*/
def socketTextStream(hostname: String, port: Int): DataStream[String] =
new DataStream[String](javaEnv.socketTextStream(hostname, port))
/**
* Creates a new DataStream that contains a sequence of numbers.
*
*/
def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new DataStream(javaEnv.generateSequence(from, to))
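A hedged sketch of the socket and sequence sources; host, port and delimiter are illustrative:

val newlineDelimited = env.socketTextStream("localhost", 9999)    // '\n' is the default delimiter
val pipeDelimited = env.socketTextStream("localhost", 9999, '|')  // custom record delimiter
val numbers = env.generateSequence(1, 1000)                       // DataStream[java.lang.Long] with the values 1 to 1000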
/**
* Creates a new data stream that contains the given elements. The elements must all be of the
* Creates a DataStream that contains the given elements. The elements must all be of the
* same type and must be serializable.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
......@@ -78,8 +147,38 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
new DataStream(returnStream)
}
/**
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
*
*/
def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
Validate.notNull(function, "Function must not be null.")
val typeInfo = implicitly[TypeInformation[T]]
new DataStream[T](javaEnv.addSource(function, typeInfo))
}
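A hedged sketch of a user-defined source; an implicit TypeInformation[String] (and ClassTag) must be available for the context bounds, and the Collector import is not part of this hunk:

import org.apache.flink.util.Collector

val custom = env.addSource(new SourceFunction[String] {
  override def invoke(out: Collector[String]): Unit = {
    for (i <- 0 until 10) out.collect("record-" + i)
  }
})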
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
* <p>
* The program execution will be logged and displayed with a generated
* default name.
*
*/
def execute() = javaEnv.execute()
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
* <p>
* The program execution will be logged and displayed with the provided name.
*
*/
def execute(jobName: String) = javaEnv.execute(jobName)
}
object StreamExecutionEnvironment {
......@@ -108,7 +207,7 @@ object StreamExecutionEnvironment {
* Creates a remote execution environment. The remote environment sends (parts of) the program to
* a cluster for execution. Note that all file paths used in the program must be accessible from
* the cluster. The execution will use the cluster's default degree of parallelism, unless the
* parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]].
* parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
......