Commit e0865673 authored by hequn8128, committed by Chesnay Schepler

[FLINK-10691][e2e] Remove dependency on hadoop for StreamSQL E2E test

Parent 2c929d6c
@@ -47,11 +47,6 @@
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 	</dependencies>
 	<build>
......
@@ -20,18 +20,23 @@ package org.apache.flink.sql.tests;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
@@ -44,6 +49,7 @@ import org.apache.flink.table.sources.tsextractors.ExistingField;
 import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
 import org.apache.flink.types.Row;

+import java.io.PrintStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -139,9 +145,14 @@ public class StreamSQLTestProgram {
 		DataStream<Row> resultStream =
 			tEnv.toAppendStream(result, Types.ROW(Types.INT, Types.SQL_TIMESTAMP));

-		// define bucketing sink to emit the result
-		BucketingSink<Row> sink = new BucketingSink<Row>(outputPath)
-			.setBucketer(new BasePathBucketer<>());
+		final StreamingFileSink<Row> sink = StreamingFileSink
+			.forRowFormat(new Path(outputPath), (Encoder<Row>) (element, stream) -> {
+				PrintStream out = new PrintStream(stream);
+				out.println(element.toString());
+			})
+			.withBucketAssigner(new KeyBucketAssigner())
+			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.build();

 		resultStream
 			// inject a KillMapper that forwards all records but terminates the first execution attempt
@@ -152,6 +163,24 @@
 		sEnv.execute();
 	}

+	/**
+	 * Use first field for buckets.
+	 */
+	public static final class KeyBucketAssigner implements BucketAssigner<Row, String> {
+
+		private static final long serialVersionUID = 987325769970523326L;
+
+		@Override
+		public String getBucketId(final Row element, final Context context) {
+			return String.valueOf(element.getField(0));
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+
 	/**
 	 * TableSource for generated data.
 	 */
......
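Reviewer note, not part of the commit: the sink built above is the standard `StreamingFileSink` row-format pattern. Below is a minimal, self-contained sketch of that pattern under the assumption of Flink 1.7-era APIs, reusing the `KeyBucketAssigner` introduced by this commit; the class name, sample data, and the `/tmp/result` path are hypothetical. Checkpointing is enabled because `OnCheckpointRollingPolicy` only finalizes part files on checkpoints.

```java
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.Path;
import org.apache.flink.sql.tests.StreamSQLTestProgram.KeyBucketAssigner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.types.Row;

import java.io.PrintStream;
import java.util.Arrays;

/** Hypothetical standalone job illustrating the sink pattern used by the test program. */
public class StreamingFileSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// OnCheckpointRollingPolicy only rolls (and finalizes) part files on checkpoints.
		env.enableCheckpointing(1000L);

		// Two sample rows; field 0 is the key that KeyBucketAssigner turns into the bucket id.
		DataStream<Row> rows = env.fromCollection(
			Arrays.asList(Row.of(20, "a"), Row.of(20, "b")),
			Types.ROW(Types.INT, Types.STRING));

		// Same construction as in the commit: row format, one line per record,
		// buckets named after the first field, rolling on every checkpoint.
		StreamingFileSink<Row> sink = StreamingFileSink
			.forRowFormat(new Path("/tmp/result"), (Encoder<Row>) (element, stream) -> {
				PrintStream out = new PrintStream(stream);
				out.println(element.toString());
			})
			.withBucketAssigner(new KeyBucketAssigner())
			.withRollingPolicy(OnCheckpointRollingPolicy.build())
			.build();

		rows.addSink(sink);
		env.execute("StreamingFileSink sketch");
	}
}
```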
@@ -48,7 +48,7 @@ trap sql_cleanup INT
 trap sql_cleanup EXIT

 # collect results from files
-cat $TEST_DATA_DIR/out/result/part-0-0 $TEST_DATA_DIR/out/result/_part-0-1.pending > $TEST_DATA_DIR/out/result-complete
+cat $TEST_DATA_DIR/out/result/20/.part-* $TEST_DATA_DIR/out/result/20/part-* | sort > $TEST_DATA_DIR/out/result-complete

 # check result:
 # 20,1970-01-01 00:00:00.0
......
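Why the collection step changed (a sketch, not part of the commit): `KeyBucketAssigner` derives the bucket id from the first field of each row, so the single expected result group (key 20) now lands in a `result/20/` subdirectory, and part files that are still in-progress or pending are hidden behind a leading dot, hence the `.part-*` glob alongside `part-*`. The snippet below, with a hypothetical class name, only shows the bucket id the assigner produces for the expected row; passing `null` for the context works here because this particular assigner never reads it.

```java
import org.apache.flink.sql.tests.StreamSQLTestProgram.KeyBucketAssigner;
import org.apache.flink.types.Row;

import java.sql.Timestamp;

/** Hypothetical check of the bucket id used for the expected result row. */
public class BucketIdSketch {

	public static void main(String[] args) {
		// The test expects a single result row with key 20 and rowtime 1970-01-01 00:00:00.0.
		Row expected = Row.of(20, new Timestamp(0L));

		// KeyBucketAssigner only looks at field 0, so the context argument is unused here.
		String bucketId = new KeyBucketAssigner().getBucketId(expected, null);

		// Prints "20" -- the sink therefore writes under $TEST_DATA_DIR/out/result/20/.
		System.out.println(bucketId);
	}
}
```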