Commit 0888bb62

Authored Dec 06, 2017 by Piotr Nowojski
Committed by Stefan Richter on Jan 08, 2018

[FLINK-8214][streaming-tests] Collect results into proper mock in StreamMockEnvironment

Parent: af6bdb60

2 changed files with 111 additions and 100 deletions (+111 -100):

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java (+103 -0)
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java (+8 -100)
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java (new file, mode 100644)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.api.writer;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;

import java.io.IOException;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * {@link ResultPartitionWriter} that collects records or events into the given collection.
 */
public class RecordOrEventCollectingResultPartitionWriter<T> implements ResultPartitionWriter {
	private final Collection<Object> output;
	private final BufferProvider bufferProvider;
	private final NonReusingDeserializationDelegate<T> delegate;
	private final RecordDeserializer<DeserializationDelegate<T>> deserializer =
		new AdaptiveSpanningRecordDeserializer<>();

	public RecordOrEventCollectingResultPartitionWriter(
			Collection<Object> output,
			BufferProvider bufferProvider,
			TypeSerializer<T> serializer) {
		this.output = checkNotNull(output);
		this.bufferProvider = checkNotNull(bufferProvider);
		this.delegate = new NonReusingDeserializationDelegate<>(checkNotNull(serializer));
	}

	@Override
	public BufferProvider getBufferProvider() {
		return bufferProvider;
	}

	@Override
	public ResultPartitionID getPartitionId() {
		return new ResultPartitionID();
	}

	@Override
	public int getNumberOfSubpartitions() {
		return 1;
	}

	@Override
	public int getNumTargetKeyGroups() {
		return 1;
	}

	@Override
	public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
		checkState(targetChannel < getNumberOfSubpartitions());

		if (buffer.isBuffer()) {
			deserializer.setNextBuffer(buffer);

			while (deserializer.hasUnfinishedData()) {
				RecordDeserializer.DeserializationResult result = deserializer.getNextRecord(delegate);

				if (result.isFullRecord()) {
					output.add(delegate.getInstance());
				}

				if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
						|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
					break;
				}
			}
		}
		else {
			// is event
			AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
			output.add(event);
		}
	}
}
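The writeBuffer loop above exists because a serialized record can span buffer boundaries: the deserializer reports PARTIAL_RECORD and resumes when the next buffer is supplied. A minimal self-contained sketch of that spanning behavior, using one-byte-length-prefixed UTF-8 strings as a stand-in format (ToyDeserializer and the prefix scheme are illustrative inventions, not Flink's actual serialization):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy spanning-record deserializer: records may be split across buffers. */
public class SpanningRecordDemo {

	public static class ToyDeserializer {
		private final List<String> output;
		private byte[] pending = new byte[0]; // tail carried over from the previous buffer

		public ToyDeserializer(List<String> output) {
			this.output = output;
		}

		/** Consume one buffer: emit every complete record, keep the partial tail. */
		public void setNextBuffer(byte[] buffer) {
			byte[] data = new byte[pending.length + buffer.length];
			System.arraycopy(pending, 0, data, 0, pending.length);
			System.arraycopy(buffer, 0, data, pending.length, buffer.length);

			int pos = 0;
			while (pos < data.length) {
				int len = data[pos] & 0xFF; // one-byte length prefix
				if (pos + 1 + len > data.length) {
					break; // partial record: wait for the next buffer
				}
				output.add(new String(data, pos + 1, len, StandardCharsets.UTF_8));
				pos += 1 + len;
			}
			pending = Arrays.copyOfRange(data, pos, data.length);
		}
	}

	public static void main(String[] args) {
		List<String> out = new ArrayList<>();
		ToyDeserializer deserializer = new ToyDeserializer(out);
		// "hello" and "flink", length-prefixed, split mid-record after byte 8:
		deserializer.setNextBuffer(new byte[] {5, 'h', 'e', 'l', 'l', 'o', 5, 'f'});
		deserializer.setNextBuffer(new byte[] {'l', 'i', 'n', 'k'});
		System.out.println(out); // [hello, flink]
	}
}
```

The second record is cut after its length byte and first payload byte, yet both records come out whole; this mirrors why the real writer keeps one stateful deserializer across writeBuffer calls.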
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java

@@ -24,54 +24,39 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.Future;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 /**
  * Mock {@link Environment}.
...

@@ -144,51 +129,12 @@ public class StreamMockEnvironment implements Environment {
 		inputs.add(gate);
 	}

-	public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
+	public <T> void addOutput(final Collection<Object> outputList, final TypeSerializer<T> serializer) {
 		try {
-			// The record-oriented writers wrap the buffer writer. We mock it
-			// to collect the returned buffers and deserialize the content to
-			// the output list
-			BufferProvider mockBufferProvider = mock(BufferProvider.class);
-			when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
-				@Override
-				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return new Buffer(
-						MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-						mock(BufferRecycler.class));
-				}
-			});
-
-			ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
-			when(mockWriter.getNumberOfSubpartitions()).thenReturn(1);
-			when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
-
-			final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer =
-				new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
-			final NonReusingDeserializationDelegate<T> delegate =
-				new NonReusingDeserializationDelegate<T>(serializer);
-
-			// Add records and events from the buffer to the output list
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-					addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
-					return null;
-				}
-			}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
-
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-					addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
-					return null;
-				}
-			}).when(mockWriter).writeBufferToAllSubpartitions(any(Buffer.class));
-
-			outputs.add(mockWriter);
+			outputs.add(new RecordOrEventCollectingResultPartitionWriter<T>(
+				outputList,
+				new TestPooledBufferProvider(Integer.MAX_VALUE),
+				serializer));
 		}
 		catch (Throwable t) {
 			t.printStackTrace();
...

@@ -196,44 +142,6 @@ public class StreamMockEnvironment implements Environment {
 		}
 	}
-	/**
-	 * Adds the object behind the given <tt>buffer</tt> to the <tt>outputList</tt>.
-	 *
-	 * @param recordDeserializer de-serializer to use for the buffer
-	 * @param delegate de-serialization delegate to use for non-event buffers
-	 * @param buffer the buffer to add
-	 * @param outputList the output list to add the object to
-	 * @param <T> type of the objects behind the non-event buffers
-	 *
-	 * @throws java.io.IOException
-	 */
-	private <T> void addBufferToOutputList(
-			RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
-			NonReusingDeserializationDelegate<T> delegate,
-			Buffer buffer,
-			final Queue<Object> outputList) throws java.io.IOException {
-
-		if (buffer.isBuffer()) {
-			recordDeserializer.setNextBuffer(buffer);
-
-			while (recordDeserializer.hasUnfinishedData()) {
-				RecordDeserializer.DeserializationResult result =
-					recordDeserializer.getNextRecord(delegate);
-
-				if (result.isFullRecord()) {
-					outputList.add(delegate.getInstance());
-				}
-
-				if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-						|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
-					break;
-				}
-			}
-		}
-		else {
-			// is event
-			AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
-			outputList.add(event);
-		}
-	}
 	@Override
 	public Configuration getTaskConfiguration() {
 		return this.taskConfiguration;
...
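Taken together, the change replaces roughly forty lines of Mockito doAnswer plumbing per call site with one real test double that implements the production interface and collects decoded output. Reduced to plain Java, the pattern looks like this (Sink and CollectingSink are hypothetical names for illustration, not Flink API):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;

public class CollectingSinkDemo {

	/** Stand-in for the production output interface (analogous to ResultPartitionWriter). */
	public interface Sink {
		void write(byte[] serializedRecord);
	}

	/**
	 * Test-only implementation: rather than stubbing every method with a mocking
	 * framework, it decodes each written record into a caller-supplied collection,
	 * so test assertions run against plain Java objects.
	 */
	public static class CollectingSink implements Sink {
		private final Collection<Object> output; // any Collection works, not just Queue

		public CollectingSink(Collection<Object> output) {
			this.output = output;
		}

		@Override
		public void write(byte[] serializedRecord) {
			output.add(new String(serializedRecord, StandardCharsets.UTF_8));
		}
	}

	public static void main(String[] args) {
		Collection<Object> output = new ArrayList<>();
		Sink sink = new CollectingSink(output);
		sink.write("hello".getBytes(StandardCharsets.UTF_8));
		System.out.println(output); // [hello]
	}
}
```

Accepting Collection<Object> instead of Queue<Object> mirrors the widened addOutput signature in the diff: callers may now collect into any collection type they want to assert against.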