Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
c4430e67
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c4430e67
编写于
10月 10, 2017
作者:
Z
zentol
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[refactor] [tests] Generalize gateway mocking in ClusterClientTest
上级
593ce53f
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
47 addition
and
24 deletion
+47
-24
flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
...va/org/apache/flink/client/program/ClusterClientTest.java
+47
-24
未找到文件。
flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
浏览文件 @
c4430e67
...
...
@@ -133,50 +133,38 @@ public class ClusterClientTest extends TestLogger {
}
}
private
static
class
TestCancelActorGateway
extends
DummyActorGateway
{
private
static
class
TestCancelActorGateway
extends
TestActorGateway
<
JobManagerMessages
.
CancelJob
,
JobManagerMessages
.
CancellationSuccess
>
{
private
final
JobID
expectedJobID
;
private
volatile
boolean
messageArrived
=
false
;
TestCancelActorGateway
(
JobID
expectedJobID
)
{
super
(
JobManagerMessages
.
CancelJob
.
class
);
this
.
expectedJobID
=
expectedJobID
;
}
@Override
public
Future
<
Object
>
ask
(
Object
message
,
FiniteDuration
timeout
)
{
messageArrived
=
true
;
if
(
message
instanceof
JobManagerMessages
.
CancelJob
)
{
JobManagerMessages
.
CancelJob
cancelJob
=
(
JobManagerMessages
.
CancelJob
)
message
;
Assert
.
assertEquals
(
expectedJobID
,
cancelJob
.
jobID
());
return
Future
$
.
MODULE
$
.
successful
(
new
JobManagerMessages
.
CancellationSuccess
(
cancelJob
.
jobID
(),
null
));
}
Assert
.
fail
(
"Expected CancelJob message, got: "
+
message
.
getClass
());
return
null
;
public
JobManagerMessages
.
CancellationSuccess
process
(
JobManagerMessages
.
CancelJob
message
)
{
Assert
.
assertEquals
(
expectedJobID
,
message
.
jobID
());
return
new
JobManagerMessages
.
CancellationSuccess
(
message
.
jobID
(),
null
);
}
}
private
static
class
TestCancelWithSavepointActorGateway
extends
DummyActorGateway
{
private
static
class
TestCancelWithSavepointActorGateway
extends
TestActorGateway
<
JobManagerMessages
.
CancelJobWithSavepoint
,
JobManagerMessages
.
CancellationSuccess
>
{
private
final
JobID
expectedJobID
;
private
final
String
expectedTargetDirectory
;
private
volatile
boolean
messageArrived
=
false
;
TestCancelWithSavepointActorGateway
(
JobID
expectedJobID
,
String
expectedTargetDirectory
)
{
super
(
JobManagerMessages
.
CancelJobWithSavepoint
.
class
);
this
.
expectedJobID
=
expectedJobID
;
this
.
expectedTargetDirectory
=
expectedTargetDirectory
;
}
@Override
public
Future
<
Object
>
ask
(
Object
message
,
FiniteDuration
timeout
)
{
messageArrived
=
true
;
if
(
message
instanceof
JobManagerMessages
.
CancelJobWithSavepoint
)
{
JobManagerMessages
.
CancelJobWithSavepoint
cancelJob
=
(
JobManagerMessages
.
CancelJobWithSavepoint
)
message
;
Assert
.
assertEquals
(
expectedJobID
,
cancelJob
.
jobID
());
Assert
.
assertEquals
(
expectedTargetDirectory
,
cancelJob
.
savepointDirectory
());
return
Future
$
.
MODULE
$
.
successful
(
new
JobManagerMessages
.
CancellationSuccess
(
cancelJob
.
jobID
(),
null
));
}
Assert
.
fail
(
"Expected CancelJobWithSavepoint message, got: "
+
message
.
getClass
());
return
null
;
public
JobManagerMessages
.
CancellationSuccess
process
(
JobManagerMessages
.
CancelJobWithSavepoint
message
)
{
Assert
.
assertEquals
(
expectedJobID
,
message
.
jobID
());
Assert
.
assertEquals
(
expectedTargetDirectory
,
message
.
savepointDirectory
());
return
new
JobManagerMessages
.
CancellationSuccess
(
message
.
jobID
(),
null
);
}
}
...
...
@@ -184,7 +172,7 @@ public class ClusterClientTest extends TestLogger {
private
final
ActorGateway
jobmanagerGateway
;
public
TestClusterClient
(
Configuration
config
,
ActorGateway
jobmanagerGateway
)
throws
Exception
{
TestClusterClient
(
Configuration
config
,
ActorGateway
jobmanagerGateway
)
throws
Exception
{
super
(
config
);
this
.
jobmanagerGateway
=
jobmanagerGateway
;
}
...
...
@@ -194,4 +182,39 @@ public class ClusterClientTest extends TestLogger {
return
jobmanagerGateway
;
}
}
/**
* Utility class for hiding akka/scala details.
*
* @param <M> expected type of incoming requests
* @param <R> type of outgoing requests
*/
private
abstract
static
class
TestActorGateway
<
M
,
R
>
extends
DummyActorGateway
{
private
final
Class
<
M
>
messageClass
;
volatile
boolean
messageArrived
=
false
;
TestActorGateway
(
Class
<
M
>
messageClass
)
{
this
.
messageClass
=
messageClass
;
}
@Override
@SuppressWarnings
(
"unchecked"
)
public
Future
<
Object
>
ask
(
Object
message
,
FiniteDuration
timeout
)
{
messageArrived
=
true
;
if
(
message
.
getClass
().
isAssignableFrom
(
messageClass
))
{
return
Future
$
.
MODULE
$
.
successful
(
process
((
M
)
message
));
}
Assert
.
fail
(
"Expected TriggerSavepoint message, got: "
+
message
.
getClass
());
return
null
;
}
/**
* Processes the incoming message and verifies it's correctness. Implementations may directly throw unchecked
* exceptions (like JUnit asserts) in case of errors or faulty behaviors.
*
* @param message incoming message
* @return response in case of success
*/
public
abstract
R
process
(
M
message
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录