Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
24f7fa9e
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
24f7fa9e
编写于
9月 02, 2015
作者:
H
HuangWHWHW
提交者:
Maximilian Michels
9月 08, 2015
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-2480][test] add a prefix test for PrintSinkFunction
- improve test layout This closes #1073.
上级
97fb9a47
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
41 addition
and
38 deletion
+41
-38
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
.../flink/streaming/api/functions/PrintSinkFunctionTest.java
+41
-38
未找到文件。
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
浏览文件 @
24f7fa9e
...
...
@@ -19,51 +19,30 @@ package org.apache.flink.streaming.api.functions;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
import
org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext
;
import
static
org
.
junit
.
Assert
.*;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
org.mockito.Mockito
;
import
java.io.*
;
import
java.io.ByteArrayOutputStream
;
import
java.io.PrintStream
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
*/
public
class
PrintSinkFunctionTest
<
IN
>
extends
RichSinkFunction
<
IN
>
{
private
static
final
long
serialVersionUID
=
-
7194618347883773533L
;
public
class
PrintSinkFunctionTest
{
public
PrintStream
printStreamOriginal
=
System
.
out
;
public
class
printStreamMock
extends
PrintStream
{
public
String
result
;
public
printStreamMock
(
OutputStream
out
)
{
super
(
out
);
}
@Override
public
void
println
(
String
x
)
{
this
.
result
=
x
;
}
}
public
OutputStream
out
=
new
OutputStream
()
{
@Override
public
void
write
(
int
b
)
throws
IOException
{
}
};
private
String
line
=
System
.
lineSeparator
();
@Test
public
void
testPrintSinkStdOut
(){
printStreamMock
stream
=
new
printStreamMock
(
out
);
ByteArrayOutputStream
baos
=
new
ByteArrayOutputStream
();
PrintStream
stream
=
new
PrintStream
(
baos
);
System
.
setOut
(
stream
);
final
StreamingRuntimeContext
ctx
=
Mockito
.
mock
(
StreamingRuntimeContext
.
class
);
...
...
@@ -73,21 +52,22 @@ public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
try
{
printSink
.
open
(
new
Configuration
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
Assert
.
fail
();
}
printSink
.
setTargetToStandardOut
();
printSink
.
invoke
(
"hello world!"
);
assertEquals
(
"Print to System.out"
,
printSink
.
toString
());
assertEquals
(
"hello world!"
,
stream
.
result
);
assertEquals
(
"hello world!"
+
line
,
baos
.
toString
()
);
printSink
.
close
();
stream
.
close
();
}
@Test
public
void
testPrintSinkStdErr
(){
printStreamMock
stream
=
new
printStreamMock
(
out
);
ByteArrayOutputStream
baos
=
new
ByteArrayOutputStream
();
PrintStream
stream
=
new
PrintStream
(
baos
);
System
.
setOut
(
stream
);
final
StreamingRuntimeContext
ctx
=
Mockito
.
mock
(
StreamingRuntimeContext
.
class
);
...
...
@@ -97,20 +77,43 @@ public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
try
{
printSink
.
open
(
new
Configuration
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
Assert
.
fail
();
}
printSink
.
setTargetToStandardErr
();
printSink
.
invoke
(
"hello world!"
);
assertEquals
(
"Print to System.err"
,
printSink
.
toString
());
assertEquals
(
"hello world!"
,
stream
.
result
);
assertEquals
(
"hello world!"
+
line
,
baos
.
toString
()
);
printSink
.
close
();
stream
.
close
();
}
@Override
public
void
invoke
(
IN
record
)
{
@Test
public
void
testPrintSinkWithPrefix
(){
ByteArrayOutputStream
baos
=
new
ByteArrayOutputStream
();
PrintStream
stream
=
new
PrintStream
(
baos
);
System
.
setOut
(
stream
);
final
StreamingRuntimeContext
ctx
=
Mockito
.
mock
(
StreamingRuntimeContext
.
class
);
Mockito
.
when
(
ctx
.
getNumberOfParallelSubtasks
()).
thenReturn
(
2
);
Mockito
.
when
(
ctx
.
getIndexOfThisSubtask
()).
thenReturn
(
1
);
PrintSinkFunction
<
String
>
printSink
=
new
PrintSinkFunction
<>();
printSink
.
setRuntimeContext
(
ctx
);
try
{
printSink
.
open
(
new
Configuration
());
}
catch
(
Exception
e
)
{
Assert
.
fail
();
}
printSink
.
setTargetToStandardErr
();
printSink
.
invoke
(
"hello world!"
);
assertEquals
(
"Print to System.err"
,
printSink
.
toString
());
assertEquals
(
"2> hello world!"
+
line
,
baos
.
toString
());
printSink
.
close
();
stream
.
close
();
}
@After
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录