Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
0073204b
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0073204b
编写于
9月 04, 2017
作者:
S
Stefan Richter
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry.
上级
1ebd44a6
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
449 addition
and
192 deletion
+449
-192
flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
...main/java/org/apache/flink/core/fs/CloseableRegistry.java
+5
-4
flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
.../org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+16
-19
flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
...java/org/apache/flink/util/AbstractCloseableRegistry.java
+77
-15
flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
...rc/main/java/org/apache/flink/util/WrappingProxyUtil.java
+7
-1
flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
...g/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+223
-0
flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
.../java/org/apache/flink/core/fs/CloseableRegistryTest.java
+59
-0
flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
.../apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+60
-151
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
...pi/operators/StateSnapshotContextSynchronousImplTest.java
+2
-2
未找到文件。
flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
浏览文件 @
0073204b
...
...
@@ -21,8 +21,9 @@ package org.apache.flink.core.fs;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.util.AbstractCloseableRegistry
;
import
javax.annotation.Nonnull
;
import
java.io.Closeable
;
import
java.io.IOException
;
import
java.util.HashMap
;
import
java.util.Map
;
...
...
@@ -39,16 +40,16 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
private
static
final
Object
DUMMY
=
new
Object
();
public
CloseableRegistry
()
{
super
(
new
HashMap
<
Closeable
,
Object
>());
super
(
new
HashMap
<>());
}
@Override
protected
void
doRegister
(
Closeable
closeable
,
Map
<
Closeable
,
Object
>
closeableMap
)
throws
IOException
{
protected
void
doRegister
(
@Nonnull
Closeable
closeable
,
@Nonnull
Map
<
Closeable
,
Object
>
closeableMap
)
{
closeableMap
.
put
(
closeable
,
DUMMY
);
}
@Override
protected
void
doUnRegister
(
Closeable
closeable
,
Map
<
Closeable
,
Object
>
closeableMap
)
{
protected
void
doUnRegister
(
@Nonnull
Closeable
closeable
,
@Nonnull
Map
<
Closeable
,
Object
>
closeableMap
)
{
closeableMap
.
remove
(
closeable
);
}
}
flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
浏览文件 @
0073204b
...
...
@@ -27,7 +27,8 @@ import org.apache.flink.util.WrappingProxyUtil;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.annotation.concurrent.GuardedBy
;
import
javax.annotation.Nonnull
;
import
java.io.Closeable
;
import
java.io.IOException
;
import
java.lang.ref.PhantomReference
;
...
...
@@ -53,19 +54,17 @@ public class SafetyNetCloseableRegistry extends
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
SafetyNetCloseableRegistry
.
class
);
/** Lock for a
ccessing
reaper thread and registry count */
/** Lock for a
tomic modifications to
reaper thread and registry count */
private
static
final
Object
REAPER_THREAD_LOCK
=
new
Object
();
/** Singleton reaper thread takes care of all registries in VM */
@GuardedBy
(
"REAPER_THREAD_LOCK"
)
private
static
CloseableReaperThread
REAPER_THREAD
=
null
;
/** Global count of all instances of SafetyNetCloseableRegistry */
@GuardedBy
(
"REAPER_THREAD_LOCK"
)
private
static
int
GLOBAL_SAFETY_NET_REGISTRY_COUNT
=
0
;
public
SafetyNetCloseableRegistry
()
{
super
(
new
IdentityHashMap
<
Closeable
,
PhantomDelegatingCloseableRef
>());
SafetyNetCloseableRegistry
()
{
super
(
new
IdentityHashMap
<>());
synchronized
(
REAPER_THREAD_LOCK
)
{
if
(
0
==
GLOBAL_SAFETY_NET_REGISTRY_COUNT
)
{
...
...
@@ -79,8 +78,8 @@ public class SafetyNetCloseableRegistry extends
@Override
protected
void
doRegister
(
WrappingProxyCloseable
<?
extends
Closeable
>
wrappingProxyCloseable
,
Map
<
Closeable
,
PhantomDelegatingCloseableRef
>
closeableMap
)
throws
IOException
{
@Nonnull
WrappingProxyCloseable
<?
extends
Closeable
>
wrappingProxyCloseable
,
@Nonnull
Map
<
Closeable
,
PhantomDelegatingCloseableRef
>
closeableMap
)
{
assert
Thread
.
holdsLock
(
getSynchronizationLock
());
...
...
@@ -100,8 +99,8 @@ public class SafetyNetCloseableRegistry extends
@Override
protected
void
doUnRegister
(
WrappingProxyCloseable
<?
extends
Closeable
>
closeable
,
Map
<
Closeable
,
PhantomDelegatingCloseableRef
>
closeableMap
)
{
@Nonnull
WrappingProxyCloseable
<?
extends
Closeable
>
closeable
,
@Nonnull
Map
<
Closeable
,
PhantomDelegatingCloseableRef
>
closeableMap
)
{
assert
Thread
.
holdsLock
(
getSynchronizationLock
());
...
...
@@ -131,7 +130,7 @@ public class SafetyNetCloseableRegistry extends
}
@VisibleForTesting
public
static
boolean
isReaperThreadRunning
()
{
static
boolean
isReaperThreadRunning
()
{
synchronized
(
REAPER_THREAD_LOCK
)
{
return
null
!=
REAPER_THREAD
&&
REAPER_THREAD
.
isAlive
();
}
...
...
@@ -148,10 +147,10 @@ public class SafetyNetCloseableRegistry extends
private
final
SafetyNetCloseableRegistry
closeableRegistry
;
private
final
String
debugString
;
public
PhantomDelegatingCloseableRef
(
WrappingProxyCloseable
<?
extends
Closeable
>
referent
,
SafetyNetCloseableRegistry
closeableRegistry
,
ReferenceQueue
<?
super
WrappingProxyCloseable
<?
extends
Closeable
>>
q
)
{
PhantomDelegatingCloseableRef
(
WrappingProxyCloseable
<?
extends
Closeable
>
referent
,
SafetyNetCloseableRegistry
closeableRegistry
,
ReferenceQueue
<?
super
WrappingProxyCloseable
<?
extends
Closeable
>>
q
)
{
super
(
referent
,
q
);
this
.
innerCloseable
=
Preconditions
.
checkNotNull
(
WrappingProxyUtil
.
stripProxy
(
referent
));
...
...
@@ -159,15 +158,13 @@ public class SafetyNetCloseableRegistry extends
this
.
debugString
=
referent
.
toString
();
}
public
String
getDebugString
()
{
String
getDebugString
()
{
return
debugString
;
}
@Override
public
void
close
()
throws
IOException
{
synchronized
(
closeableRegistry
.
getSynchronizationLock
())
{
closeableRegistry
.
closeableToRef
.
remove
(
innerCloseable
);
}
closeableRegistry
.
removeCloseableInternal
(
innerCloseable
);
innerCloseable
.
close
();
}
}
...
...
flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
浏览文件 @
0073204b
...
...
@@ -19,9 +19,15 @@
package
org.apache.flink.util
;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.annotation.VisibleForTesting
;
import
javax.annotation.Nonnull
;
import
javax.annotation.concurrent.GuardedBy
;
import
java.io.Closeable
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.Map
;
/**
...
...
@@ -38,11 +44,20 @@ import java.util.Map;
@Internal
public
abstract
class
AbstractCloseableRegistry
<
C
extends
Closeable
,
T
>
implements
Closeable
{
protected
final
Map
<
Closeable
,
T
>
closeableToRef
;
/** Lock that guards state of this registry. **/
private
final
Object
lock
;
/** Map from tracked Closeables to some associated meta data. */
@GuardedBy
(
"lock"
)
private
final
Map
<
Closeable
,
T
>
closeableToRef
;
/** Indicates if this registry is closed. */
@GuardedBy
(
"lock"
)
private
boolean
closed
;
public
AbstractCloseableRegistry
(
Map
<
Closeable
,
T
>
closeableToRef
)
{
this
.
closeableToRef
=
closeableToRef
;
public
AbstractCloseableRegistry
(
@Nonnull
Map
<
Closeable
,
T
>
closeableToRef
)
{
this
.
lock
=
new
Object
();
this
.
closeableToRef
=
Preconditions
.
checkNotNull
(
closeableToRef
);
this
.
closed
=
false
;
}
...
...
@@ -51,7 +66,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
* {@link IllegalStateException} and closes the passed {@link Closeable}.
*
* @param closeable Closeable tor register
*
* @throws IOException exception when the registry was closed before
*/
public
final
void
registerClosable
(
C
closeable
)
throws
IOException
{
...
...
@@ -61,13 +75,14 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
}
synchronized
(
getSynchronizationLock
())
{
if
(
closed
)
{
IOUtils
.
closeQuietly
(
closeable
);
throw
new
IOException
(
"Cannot register Closeable, registry is already closed. Closing argument."
)
;
if
(
!
closed
)
{
doRegister
(
closeable
,
closeableToRef
);
return
;
}
doRegister
(
closeable
,
closeableToRef
);
}
IOUtils
.
closeQuietly
(
closeable
);
throw
new
IOException
(
"Cannot register Closeable, registry is already closed. Closing argument."
);
}
/**
...
...
@@ -88,18 +103,22 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
@Override
public
void
close
()
throws
IOException
{
Collection
<
Closeable
>
toCloseCopy
;
synchronized
(
getSynchronizationLock
())
{
if
(
closed
)
{
return
;
}
IOUtils
.
closeAllQuietly
(
closeableToRef
.
keySet
())
;
closed
=
true
;
closeableToRef
.
clear
(
);
toCloseCopy
=
new
ArrayList
<>(
closeableToRef
.
keySet
()
);
close
d
=
true
;
close
ableToRef
.
clear
()
;
}
IOUtils
.
closeAllQuietly
(
toCloseCopy
);
}
public
boolean
isClosed
()
{
...
...
@@ -108,11 +127,54 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
}
}
/**
* Does the actual registration of the closeable with the registry map. This should not do any long running or
* potentially blocking operations as is is executed under the registry's lock.
*/
protected
abstract
void
doRegister
(
@Nonnull
C
closeable
,
@Nonnull
Map
<
Closeable
,
T
>
closeableMap
);
/**
* Does the actual un-registration of the closeable from the registry map. This should not do any long running or
* potentially blocking operations as is is executed under the registry's lock.
*/
protected
abstract
void
doUnRegister
(
@Nonnull
C
closeable
,
@Nonnull
Map
<
Closeable
,
T
>
closeableMap
);
/**
* Returns the lock on which manipulations to members closeableToRef and closeable must be synchronized.
*/
protected
final
Object
getSynchronizationLock
()
{
return
closeableToRef
;
return
lock
;
}
protected
abstract
void
doUnRegister
(
C
closeable
,
Map
<
Closeable
,
T
>
closeableMap
);
/**
* Adds a mapping to the registry map, respecting locking.
*/
protected
final
void
addCloseableInternal
(
Closeable
closeable
,
T
metaData
)
{
synchronized
(
getSynchronizationLock
())
{
closeableToRef
.
put
(
closeable
,
metaData
);
}
}
protected
abstract
void
doRegister
(
C
closeable
,
Map
<
Closeable
,
T
>
closeableMap
)
throws
IOException
;
/**
* Removes a mapping from the registry map, respecting locking.
*/
protected
final
void
removeCloseableInternal
(
Closeable
closeable
)
{
synchronized
(
getSynchronizationLock
())
{
closeableToRef
.
remove
(
closeable
);
}
}
@VisibleForTesting
public
final
int
getNumberOfRegisteredCloseables
()
{
synchronized
(
getSynchronizationLock
())
{
return
closeableToRef
.
size
();
}
}
@VisibleForTesting
public
final
boolean
isCloseableRegistered
(
Closeable
c
)
{
synchronized
(
getSynchronizationLock
())
{
return
closeableToRef
.
containsKey
(
c
);
}
}
}
flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
浏览文件 @
0073204b
...
...
@@ -27,10 +27,16 @@ public final class WrappingProxyUtil {
throw
new
AssertionError
();
}
@SuppressWarnings
(
"unchecked"
)
public
static
<
T
>
T
stripProxy
(
T
object
)
{
while
(
object
instanceof
WrappingProxy
)
{
T
previous
=
null
;
while
(
object
instanceof
WrappingProxy
&&
previous
!=
object
)
{
previous
=
object
;
object
=
((
WrappingProxy
<
T
>)
object
).
getWrappedDelegate
();
}
return
object
;
}
}
flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
0 → 100644
浏览文件 @
0073204b
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.core.fs
;
import
org.apache.flink.core.testutils.OneShotLatch
;
import
org.apache.flink.util.AbstractCloseableRegistry
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.io.Closeable
;
import
java.io.IOException
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
static
org
.
mockito
.
Mockito
.
doAnswer
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
spy
;
public
abstract
class
AbstractCloseableRegistryTest
<
C
extends
Closeable
,
T
>
{
protected
ProducerThread
[]
streamOpenThreads
;
protected
AbstractCloseableRegistry
<
C
,
T
>
closeableRegistry
;
protected
AtomicInteger
unclosedCounter
;
protected
abstract
C
createCloseable
();
protected
abstract
AbstractCloseableRegistry
<
C
,
T
>
createRegistry
();
protected
abstract
ProducerThread
<
C
,
T
>
createProducerThread
(
AbstractCloseableRegistry
<
C
,
T
>
registry
,
AtomicInteger
unclosedCounter
,
int
maxStreams
);
public
void
setup
(
int
maxStreams
)
{
Assert
.
assertFalse
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
this
.
closeableRegistry
=
createRegistry
();
this
.
unclosedCounter
=
new
AtomicInteger
(
0
);
this
.
streamOpenThreads
=
new
ProducerThread
[
10
];
for
(
int
i
=
0
;
i
<
streamOpenThreads
.
length
;
++
i
)
{
streamOpenThreads
[
i
]
=
createProducerThread
(
closeableRegistry
,
unclosedCounter
,
maxStreams
);
}
}
protected
void
startThreads
()
{
for
(
ProducerThread
t
:
streamOpenThreads
)
{
t
.
start
();
}
}
protected
void
joinThreads
()
throws
InterruptedException
{
for
(
Thread
t
:
streamOpenThreads
)
{
t
.
join
();
}
}
@Test
public
void
testClose
()
throws
Exception
{
setup
(
Integer
.
MAX_VALUE
);
startThreads
();
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
System
.
gc
();
Thread
.
sleep
(
40
);
}
closeableRegistry
.
close
();
joinThreads
();
Assert
.
assertEquals
(
0
,
unclosedCounter
.
get
());
Assert
.
assertEquals
(
0
,
closeableRegistry
.
getNumberOfRegisteredCloseables
());
final
C
testCloseable
=
spy
(
createCloseable
());
try
{
closeableRegistry
.
registerClosable
(
testCloseable
);
Assert
.
fail
(
"Closed registry should not accept closeables!"
);
}
catch
(
IOException
expected
)
{
//expected
}
Assert
.
assertEquals
(
0
,
unclosedCounter
.
get
());
Assert
.
assertEquals
(
0
,
closeableRegistry
.
getNumberOfRegisteredCloseables
());
verify
(
testCloseable
).
close
();
}
@Test
public
void
testNonBlockingClose
()
throws
Exception
{
setup
(
Integer
.
MAX_VALUE
);
final
OneShotLatch
waitRegistryClosedLatch
=
new
OneShotLatch
();
final
OneShotLatch
blockCloseLatch
=
new
OneShotLatch
();
final
C
spyCloseable
=
spy
(
createCloseable
());
doAnswer
(
invocationOnMock
->
{
invocationOnMock
.
callRealMethod
();
waitRegistryClosedLatch
.
trigger
();
blockCloseLatch
.
await
();
return
null
;
}).
when
(
spyCloseable
).
close
();
closeableRegistry
.
registerClosable
(
spyCloseable
);
Assert
.
assertEquals
(
1
,
closeableRegistry
.
getNumberOfRegisteredCloseables
());
Thread
closer
=
new
Thread
(()
->
{
try
{
closeableRegistry
.
close
();
}
catch
(
IOException
ignore
)
{
}
});
closer
.
start
();
waitRegistryClosedLatch
.
await
();
final
C
testCloseable
=
spy
(
createCloseable
());
try
{
closeableRegistry
.
registerClosable
(
testCloseable
);
Assert
.
fail
(
"Closed registry should not accept closeables!"
);
}
catch
(
IOException
ignore
)
{
}
blockCloseLatch
.
trigger
();
closer
.
join
();
verify
(
spyCloseable
).
close
();
verify
(
testCloseable
).
close
();
Assert
.
assertEquals
(
0
,
closeableRegistry
.
getNumberOfRegisteredCloseables
());
}
protected
static
abstract
class
ProducerThread
<
C
extends
Closeable
,
T
>
extends
Thread
{
protected
final
AbstractCloseableRegistry
<
C
,
T
>
registry
;
protected
final
AtomicInteger
refCount
;
protected
final
int
maxStreams
;
protected
int
numStreams
;
public
ProducerThread
(
AbstractCloseableRegistry
<
C
,
T
>
registry
,
AtomicInteger
refCount
,
int
maxStreams
)
{
this
.
registry
=
registry
;
this
.
refCount
=
refCount
;
this
.
maxStreams
=
maxStreams
;
this
.
numStreams
=
0
;
}
protected
abstract
void
createAndRegisterStream
()
throws
IOException
;
@Override
public
void
run
()
{
try
{
while
(
numStreams
<
maxStreams
)
{
createAndRegisterStream
();
try
{
Thread
.
sleep
(
2
);
}
catch
(
InterruptedException
ignored
)
{}
if
(
maxStreams
!=
Integer
.
MAX_VALUE
)
{
++
numStreams
;
}
}
}
catch
(
Exception
ex
)
{
// ignored
}
}
}
protected
static
final
class
TestStream
extends
FSDataInputStream
{
protected
AtomicInteger
refCount
;
public
TestStream
(
AtomicInteger
refCount
)
{
this
.
refCount
=
refCount
;
refCount
.
incrementAndGet
();
}
@Override
public
void
seek
(
long
desired
)
throws
IOException
{
}
@Override
public
long
getPos
()
throws
IOException
{
return
0
;
}
@Override
public
int
read
()
throws
IOException
{
return
0
;
}
@Override
public
synchronized
void
close
()
throws
IOException
{
if
(
refCount
!=
null
)
{
refCount
.
decrementAndGet
();
refCount
=
null
;
}
}
}
}
flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
0 → 100644
浏览文件 @
0073204b
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.core.fs
;
import
org.apache.flink.util.AbstractCloseableRegistry
;
import
java.io.Closeable
;
import
java.io.IOException
;
import
java.util.concurrent.atomic.AtomicInteger
;
public
class
CloseableRegistryTest
extends
AbstractCloseableRegistryTest
<
Closeable
,
Object
>
{
@Override
protected
Closeable
createCloseable
()
{
return
new
Closeable
()
{
@Override
public
void
close
()
throws
IOException
{
}
};
}
@Override
protected
AbstractCloseableRegistry
<
Closeable
,
Object
>
createRegistry
()
{
return
new
CloseableRegistry
();
}
@Override
protected
ProducerThread
<
Closeable
,
Object
>
createProducerThread
(
AbstractCloseableRegistry
<
Closeable
,
Object
>
registry
,
AtomicInteger
unclosedCounter
,
int
maxStreams
)
{
return
new
ProducerThread
<
Closeable
,
Object
>(
registry
,
unclosedCounter
,
maxStreams
)
{
@Override
protected
void
createAndRegisterStream
()
throws
IOException
{
TestStream
testStream
=
new
TestStream
(
unclosedCounter
);
registry
.
registerClosable
(
testStream
);
}
};
}
}
flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
浏览文件 @
0073204b
...
...
@@ -19,7 +19,9 @@
package
org.apache.flink.core.fs
;
import
org.apache.flink.core.testutils.CheckedThread
;
import
org.apache.flink.util.AbstractCloseableRegistry
;
import
org.apache.flink.util.ExceptionUtils
;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Rule
;
...
...
@@ -30,41 +32,71 @@ import java.io.Closeable;
import
java.io.IOException
;
import
java.util.concurrent.atomic.AtomicInteger
;
public
class
SafetyNetCloseableRegistryTest
{
public
class
SafetyNetCloseableRegistryTest
extends
AbstractCloseableRegistryTest
<
WrappingProxyCloseable
<?
extends
Closeable
>,
SafetyNetCloseableRegistry
.
PhantomDelegatingCloseableRef
>
{
@Rule
public
final
TemporaryFolder
tmpFolder
=
new
TemporaryFolder
();
private
ProducerThread
[]
streamOpenThreads
;
pr
ivate
SafetyNetCloseableRegistry
closeableRegistry
;
private
AtomicInteger
unclosedCounter
;
@Override
pr
otected
WrappingProxyCloseable
<?
extends
Closeable
>
createCloseable
()
{
return
new
WrappingProxyCloseable
<
Closeable
>()
{
public
void
setup
()
{
Assert
.
assertFalse
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
this
.
closeableRegistry
=
new
SafetyNetCloseableRegistry
();
this
.
unclosedCounter
=
new
AtomicInteger
(
0
);
this
.
streamOpenThreads
=
new
ProducerThread
[
10
];
for
(
int
i
=
0
;
i
<
streamOpenThreads
.
length
;
++
i
)
{
streamOpenThreads
[
i
]
=
new
ProducerThread
(
closeableRegistry
,
unclosedCounter
,
Integer
.
MAX_VALUE
);
}
@Override
public
void
close
()
throws
IOException
{
}
@Override
public
Closeable
getWrappedDelegate
()
{
return
this
;
}
};
}
@After
public
void
tearDown
()
{
Assert
.
assertFalse
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
@Override
protected
AbstractCloseableRegistry
<
WrappingProxyCloseable
<?
extends
Closeable
>,
SafetyNetCloseableRegistry
.
PhantomDelegatingCloseableRef
>
createRegistry
()
{
return
new
SafetyNetCloseableRegistry
();
}
private
void
startThreads
(
int
maxStreams
)
{
for
(
ProducerThread
t
:
streamOpenThreads
)
{
t
.
setMaxStreams
(
maxStreams
);
t
.
start
();
}
@Override
protected
AbstractCloseableRegistryTest
.
ProducerThread
<
WrappingProxyCloseable
<?
extends
Closeable
>,
SafetyNetCloseableRegistry
.
PhantomDelegatingCloseableRef
>
createProducerThread
(
AbstractCloseableRegistry
<
WrappingProxyCloseable
<?
extends
Closeable
>,
SafetyNetCloseableRegistry
.
PhantomDelegatingCloseableRef
>
registry
,
AtomicInteger
unclosedCounter
,
int
maxStreams
)
{
return
new
AbstractCloseableRegistryTest
.
ProducerThread
<
WrappingProxyCloseable
<?
extends
Closeable
>,
SafetyNetCloseableRegistry
.
PhantomDelegatingCloseableRef
>(
registry
,
unclosedCounter
,
maxStreams
)
{
int
count
=
0
;
@Override
protected
void
createAndRegisterStream
()
throws
IOException
{
String
debug
=
Thread
.
currentThread
().
getName
()
+
" "
+
count
;
TestStream
testStream
=
new
TestStream
(
refCount
);
// this method automatically registers the stream with the given registry.
@SuppressWarnings
(
"unused"
)
ClosingFSDataInputStream
pis
=
ClosingFSDataInputStream
.
wrapSafe
(
testStream
,
(
SafetyNetCloseableRegistry
)
registry
,
debug
);
//reference dies here
++
count
;
}
};
}
private
void
joinThreads
()
throws
InterruptedException
{
for
(
Thread
t
:
streamOpenThreads
)
{
t
.
join
();
}
@After
public
void
tearDown
()
{
Assert
.
assertFalse
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
}
@Test
...
...
@@ -132,52 +164,10 @@ public class SafetyNetCloseableRegistryTest {
t1
.
sync
();
}
@Test
public
void
testClose
()
throws
Exception
{
setup
();
startThreads
(
Integer
.
MAX_VALUE
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
System
.
gc
();
Thread
.
sleep
(
40
);
}
closeableRegistry
.
close
();
joinThreads
();
Assert
.
assertEquals
(
0
,
unclosedCounter
.
get
());
try
{
WrappingProxyCloseable
<
Closeable
>
testCloseable
=
new
WrappingProxyCloseable
<
Closeable
>()
{
@Override
public
Closeable
getWrappedDelegate
()
{
return
this
;
}
@Override
public
void
close
()
throws
IOException
{
unclosedCounter
.
incrementAndGet
();
}
};
closeableRegistry
.
registerClosable
(
testCloseable
);
Assert
.
fail
(
"Closed registry should not accept closeables!"
);
}
catch
(
IOException
expected
)
{
//expected
}
Assert
.
assertEquals
(
1
,
unclosedCounter
.
get
());
}
@Test
public
void
testSafetyNetClose
()
throws
Exception
{
setup
();
startThreads
(
20
);
setup
(
20
);
startThreads
();
joinThreads
();
...
...
@@ -194,95 +184,14 @@ public class SafetyNetCloseableRegistryTest {
public
void
testReaperThreadSpawnAndStop
()
throws
Exception
{
Assert
.
assertFalse
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
try
(
SafetyNetCloseableRegistry
r1
=
new
SafetyNetCloseableRegistry
())
{
try
(
SafetyNetCloseableRegistry
ignored
=
new
SafetyNetCloseableRegistry
())
{
Assert
.
assertTrue
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
try
(
SafetyNetCloseableRegistry
r
2
=
new
SafetyNetCloseableRegistry
())
{
try
(
SafetyNetCloseableRegistry
ignored
2
=
new
SafetyNetCloseableRegistry
())
{
Assert
.
assertTrue
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
}
Assert
.
assertTrue
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
}
Assert
.
assertFalse
(
SafetyNetCloseableRegistry
.
isReaperThreadRunning
());
}
//------------------------------------------------------------------------------------------------------------------
private
static
final
class
ProducerThread
extends
Thread
{
private
final
SafetyNetCloseableRegistry
registry
;
private
final
AtomicInteger
refCount
;
private
int
maxStreams
;
public
ProducerThread
(
SafetyNetCloseableRegistry
registry
,
AtomicInteger
refCount
,
int
maxStreams
)
{
this
.
registry
=
registry
;
this
.
refCount
=
refCount
;
this
.
maxStreams
=
maxStreams
;
}
public
int
getMaxStreams
()
{
return
maxStreams
;
}
public
void
setMaxStreams
(
int
maxStreams
)
{
this
.
maxStreams
=
maxStreams
;
}
@Override
public
void
run
()
{
try
{
int
count
=
0
;
while
(
maxStreams
>
0
)
{
String
debug
=
Thread
.
currentThread
().
getName
()
+
" "
+
count
;
TestStream
testStream
=
new
TestStream
(
refCount
);
refCount
.
incrementAndGet
();
@SuppressWarnings
(
"unused"
)
ClosingFSDataInputStream
pis
=
ClosingFSDataInputStream
.
wrapSafe
(
testStream
,
registry
,
debug
);
//reference dies here
try
{
Thread
.
sleep
(
2
);
}
catch
(
InterruptedException
ignored
)
{}
if
(
maxStreams
!=
Integer
.
MAX_VALUE
)
{
--
maxStreams
;
}
++
count
;
}
}
catch
(
Exception
ex
)
{
// ignored
}
}
}
private
static
final
class
TestStream
extends
FSDataInputStream
{
private
AtomicInteger
refCount
;
public
TestStream
(
AtomicInteger
refCount
)
{
this
.
refCount
=
refCount
;
}
@Override
public
void
seek
(
long
desired
)
throws
IOException
{
}
@Override
public
long
getPos
()
throws
IOException
{
return
0
;
}
@Override
public
int
read
()
throws
IOException
{
return
0
;
}
@Override
public
void
close
()
throws
IOException
{
if
(
refCount
!=
null
)
{
refCount
.
decrementAndGet
();
refCount
=
null
;
}
}
}
}
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
浏览文件 @
0073204b
...
...
@@ -120,11 +120,11 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
static
final
class
InsightCloseableRegistry
extends
CloseableRegistry
{
public
int
size
()
{
return
closeableToRef
.
size
();
return
getNumberOfRegisteredCloseables
();
}
public
boolean
contains
(
Closeable
closeable
)
{
return
closeableToRef
.
containsKey
(
closeable
);
return
isCloseableRegistered
(
closeable
);
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录