Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openanolis
dragonwell8_jdk
提交
d9898853
D
dragonwell8_jdk
项目概览
openanolis
/
dragonwell8_jdk
通知
4
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
dragonwell8_jdk
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d9898853
编写于
3月 10, 2008
作者:
M
martin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
6602600: Fast removal of cancelled scheduled thread pool tasks
Reviewed-by: alanb Contributed-by:
N
Doug Lea
<
dl@cs.oswego.edu
>
上级
37541d41
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
683 addition
and
48 deletion
+683
-48
src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java
...ses/java/util/concurrent/ScheduledThreadPoolExecutor.java
+516
-48
test/java/util/concurrent/ScheduledThreadPoolExecutor/BasicCancelTest.java
...ncurrent/ScheduledThreadPoolExecutor/BasicCancelTest.java
+113
-0
test/java/util/concurrent/ScheduledThreadPoolExecutor/Stress.java
...a/util/concurrent/ScheduledThreadPoolExecutor/Stress.java
+54
-0
未找到文件。
src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java
浏览文件 @
d9898853
...
@@ -35,6 +35,7 @@
...
@@ -35,6 +35,7 @@
package
java.util.concurrent
;
package
java.util.concurrent
;
import
java.util.concurrent.atomic.*
;
import
java.util.concurrent.atomic.*
;
import
java.util.concurrent.locks.*
;
import
java.util.*
;
import
java.util.*
;
/**
/**
...
@@ -45,12 +46,21 @@ import java.util.*;
...
@@ -45,12 +46,21 @@ import java.util.*;
* flexibility or capabilities of {@link ThreadPoolExecutor} (which
* flexibility or capabilities of {@link ThreadPoolExecutor} (which
* this class extends) are required.
* this class extends) are required.
*
*
* <p>
Delayed tasks execute no sooner than they are enabled, but
* <p>Delayed tasks execute no sooner than they are enabled, but
* without any real-time guarantees about when, after they are
* without any real-time guarantees about when, after they are
* enabled, they will commence. Tasks scheduled for exactly the same
* enabled, they will commence. Tasks scheduled for exactly the same
* execution time are enabled in first-in-first-out (FIFO) order of
* execution time are enabled in first-in-first-out (FIFO) order of
* submission.
* submission.
*
*
* <p>When a submitted task is cancelled before it is run, execution
* is suppressed. By default, such a cancelled task is not
* automatically removed from the work queue until its delay
* elapses. While this enables further inspection and monitoring, it
* may also cause unbounded retention of cancelled tasks. To avoid
* this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
* causes tasks to be immediately removed from the work queue at
* time of cancellation.
*
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
* of the inherited tuning methods are not useful for it. In
* of the inherited tuning methods are not useful for it. In
* particular, because it acts as a fixed-sized pool using
* particular, because it acts as a fixed-sized pool using
...
@@ -111,21 +121,11 @@ public class ScheduledThreadPoolExecutor
...
@@ -111,21 +121,11 @@ public class ScheduledThreadPoolExecutor
* ScheduledExecutorService methods) which are treated as
* ScheduledExecutorService methods) which are treated as
* delayed tasks with a delay of zero.
* delayed tasks with a delay of zero.
*
*
* 2. Using a custom queue (DelayedWorkQueue)
based on an
* 2. Using a custom queue (DelayedWorkQueue)
, a variant of
* unbounded DelayQueue. The lack of capacity constraint and
* unbounded DelayQueue. The lack of capacity constraint and
* the fact that corePoolSize and maximumPoolSize are
* the fact that corePoolSize and maximumPoolSize are
* effectively identical simplifies some execution mechanics
* effectively identical simplifies some execution mechanics
* (see delayedExecute) compared to ThreadPoolExecutor
* (see delayedExecute) compared to ThreadPoolExecutor.
* version.
*
* The DelayedWorkQueue class is defined below for the sake of
* ensuring that all elements are instances of
* RunnableScheduledFuture. Since DelayQueue otherwise
* requires type be Delayed, but not necessarily Runnable, and
* the workQueue requires the opposite, we need to explicitly
* define a class that requires both to ensure that users don't
* add objects that aren't RunnableScheduledFutures via
* getQueue().add() etc.
*
*
* 3. Supporting optional run-after-shutdown parameters, which
* 3. Supporting optional run-after-shutdown parameters, which
* leads to overrides of shutdown methods to remove and cancel
* leads to overrides of shutdown methods to remove and cancel
...
@@ -149,6 +149,11 @@ public class ScheduledThreadPoolExecutor
...
@@ -149,6 +149,11 @@ public class ScheduledThreadPoolExecutor
*/
*/
private
volatile
boolean
executeExistingDelayedTasksAfterShutdown
=
true
;
private
volatile
boolean
executeExistingDelayedTasksAfterShutdown
=
true
;
/**
* True if ScheduledFutureTask.cancel should remove from queue
*/
private
volatile
boolean
removeOnCancel
=
false
;
/**
/**
* Sequence number to break scheduling ties, and in turn to
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
* guarantee FIFO order among tied entries.
...
@@ -167,8 +172,10 @@ public class ScheduledThreadPoolExecutor
...
@@ -167,8 +172,10 @@ public class ScheduledThreadPoolExecutor
/** Sequence number to break ties FIFO */
/** Sequence number to break ties FIFO */
private
final
long
sequenceNumber
;
private
final
long
sequenceNumber
;
/** The time the task is enabled to execute in nanoTime units */
/** The time the task is enabled to execute in nanoTime units */
private
long
time
;
private
long
time
;
/**
/**
* Period in nanoseconds for repeating tasks. A positive
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* value indicates fixed-rate execution. A negative value
...
@@ -180,6 +187,11 @@ public class ScheduledThreadPoolExecutor
...
@@ -180,6 +187,11 @@ public class ScheduledThreadPoolExecutor
/** The actual task to be re-enqueued by reExecutePeriodic */
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture
<
V
>
outerTask
=
this
;
RunnableScheduledFuture
<
V
>
outerTask
=
this
;
/**
* Index into delay queue, to support faster cancellation.
*/
int
heapIndex
;
/**
/**
* Creates a one-shot action with given nanoTime-based trigger time.
* Creates a one-shot action with given nanoTime-based trigger time.
*/
*/
...
@@ -255,6 +267,13 @@ public class ScheduledThreadPoolExecutor
...
@@ -255,6 +267,13 @@ public class ScheduledThreadPoolExecutor
time
=
now
()
-
p
;
time
=
now
()
-
p
;
}
}
public
boolean
cancel
(
boolean
mayInterruptIfRunning
)
{
boolean
cancelled
=
super
.
cancel
(
mayInterruptIfRunning
);
if
(
cancelled
&&
removeOnCancel
&&
heapIndex
>=
0
)
remove
(
this
);
return
cancelled
;
}
/**
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
*/
...
@@ -654,6 +673,33 @@ public class ScheduledThreadPoolExecutor
...
@@ -654,6 +673,33 @@ public class ScheduledThreadPoolExecutor
return
executeExistingDelayedTasksAfterShutdown
;
return
executeExistingDelayedTasksAfterShutdown
;
}
}
/**
* Sets the policy on whether cancelled tasks should be immediately
* removed from the work queue at time of cancellation. This value is
* by default {@code false}.
*
* @param value if {@code true}, remove on cancellation, else don't
* @see #getRemoveOnCancelPolicy
* @since 1.7
*/
public
void
setRemoveOnCancelPolicy
(
boolean
value
)
{
removeOnCancel
=
value
;
}
/**
* Gets the policy on whether cancelled tasks should be immediately
* removed from the work queue at time of cancellation. This value is
* by default {@code false}.
*
* @return {@code true} if cancelled tasks are immediately removed
* from the queue
* @see #setRemoveOnCancelPolicy
* @since 1.7
*/
public
boolean
getRemoveOnCancelPolicy
()
{
return
removeOnCancel
;
}
/**
/**
* Initiates an orderly shutdown in which previously submitted
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted. If the
* tasks are executed, but no new tasks will be accepted. If the
...
@@ -707,56 +753,478 @@ public class ScheduledThreadPoolExecutor
...
@@ -707,56 +753,478 @@ public class ScheduledThreadPoolExecutor
}
}
/**
/**
* An annoying wrapper class to convince javac to use a
* Specialized delay queue. To mesh with TPE declarations, this
* DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
* class must be declared as a BlockingQueue<Runnable> even though
* it can only hold RunnableScheduledFutures.
*/
*/
private
static
class
DelayedWorkQueue
static
class
DelayedWorkQueue
extends
AbstractQueue
<
Runnable
>
extends
AbstractCollection
<
Runnable
>
implements
BlockingQueue
<
Runnable
>
{
implements
BlockingQueue
<
Runnable
>
{
private
final
DelayQueue
<
RunnableScheduledFuture
>
dq
=
new
DelayQueue
<
RunnableScheduledFuture
>();
/*
public
Runnable
poll
()
{
return
dq
.
poll
();
}
* A DelayedWorkQueue is based on a heap-based data structure
public
Runnable
peek
()
{
return
dq
.
peek
();
}
* like those in DelayQueue and PriorityQueue, except that
public
Runnable
take
()
throws
InterruptedException
{
return
dq
.
take
();
}
* every ScheduledFutureTask also records its index into the
public
Runnable
poll
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
{
* heap array. This eliminates the need to find a task upon
return
dq
.
poll
(
timeout
,
unit
);
* cancellation, greatly speeding up removal (down from O(n)
* to O(log n)), and reducing garbage retention that would
* otherwise occur by waiting for the element to rise to top
* before clearing. But because the queue may also hold
* RunnableScheduledFutures that are not ScheduledFutureTasks,
* we are not guaranteed to have such indices available, in
* which case we fall back to linear search. (We expect that
* most tasks will not be decorated, and that the faster cases
* will be much more common.)
*
* All heap operations must record index changes -- mainly
* within siftUp and siftDown. Upon removal, a task's
* heapIndex is set to -1. Note that ScheduledFutureTasks can
* appear at most once in the queue (this need not be true for
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
private
static
final
int
INITIAL_CAPACITY
=
16
;
private
RunnableScheduledFuture
[]
queue
=
new
RunnableScheduledFuture
[
INITIAL_CAPACITY
];
private
final
ReentrantLock
lock
=
new
ReentrantLock
();
private
int
size
=
0
;
/**
* Thread designated to wait for the task at the head of the
* queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with a
* task with an earlier expiration time, the leader field is
* invalidated by being reset to null, and some waiting
* thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private
Thread
leader
=
null
;
/**
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
private
final
Condition
available
=
lock
.
newCondition
();
/**
* Set f's heapIndex if it is a ScheduledFutureTask.
*/
private
void
setIndex
(
RunnableScheduledFuture
f
,
int
idx
)
{
if
(
f
instanceof
ScheduledFutureTask
)
((
ScheduledFutureTask
)
f
).
heapIndex
=
idx
;
}
}
public
boolean
add
(
Runnable
x
)
{
/**
return
dq
.
add
((
RunnableScheduledFuture
)
x
);
* Sift element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
*/
private
void
siftUp
(
int
k
,
RunnableScheduledFuture
key
)
{
while
(
k
>
0
)
{
int
parent
=
(
k
-
1
)
>>>
1
;
RunnableScheduledFuture
e
=
queue
[
parent
];
if
(
key
.
compareTo
(
e
)
>=
0
)
break
;
queue
[
k
]
=
e
;
setIndex
(
e
,
k
);
k
=
parent
;
}
queue
[
k
]
=
key
;
setIndex
(
key
,
k
);
}
}
/**
* Sift element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
private
void
siftDown
(
int
k
,
RunnableScheduledFuture
key
)
{
int
half
=
size
>>>
1
;
while
(
k
<
half
)
{
int
child
=
(
k
<<
1
)
+
1
;
RunnableScheduledFuture
c
=
queue
[
child
];
int
right
=
child
+
1
;
if
(
right
<
size
&&
c
.
compareTo
(
queue
[
right
])
>
0
)
c
=
queue
[
child
=
right
];
if
(
key
.
compareTo
(
c
)
<=
0
)
break
;
queue
[
k
]
=
c
;
setIndex
(
c
,
k
);
k
=
child
;
}
queue
[
k
]
=
key
;
setIndex
(
key
,
k
);
}
/**
* Resize the heap array. Call only when holding lock.
*/
private
void
grow
()
{
int
oldCapacity
=
queue
.
length
;
int
newCapacity
=
oldCapacity
+
(
oldCapacity
>>
1
);
// grow 50%
if
(
newCapacity
<
0
)
// overflow
newCapacity
=
Integer
.
MAX_VALUE
;
queue
=
Arrays
.
copyOf
(
queue
,
newCapacity
);
}
/**
* Find index of given object, or -1 if absent
*/
private
int
indexOf
(
Object
x
)
{
if
(
x
!=
null
)
{
if
(
x
instanceof
ScheduledFutureTask
)
{
int
i
=
((
ScheduledFutureTask
)
x
).
heapIndex
;
// Sanity check; x could conceivably be a
// ScheduledFutureTask from some other pool.
if
(
i
>=
0
&&
i
<
size
&&
queue
[
i
]
==
x
)
return
i
;
}
else
{
for
(
int
i
=
0
;
i
<
size
;
i
++)
if
(
x
.
equals
(
queue
[
i
]))
return
i
;
}
}
return
-
1
;
}
public
boolean
contains
(
Object
x
)
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
return
indexOf
(
x
)
!=
-
1
;
}
finally
{
lock
.
unlock
();
}
}
public
boolean
remove
(
Object
x
)
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
int
i
=
indexOf
(
x
);
if
(
i
<
0
)
return
false
;
setIndex
(
queue
[
i
],
-
1
);
int
s
=
--
size
;
RunnableScheduledFuture
replacement
=
queue
[
s
];
queue
[
s
]
=
null
;
if
(
s
!=
i
)
{
siftDown
(
i
,
replacement
);
if
(
queue
[
i
]
==
replacement
)
siftUp
(
i
,
replacement
);
}
return
true
;
}
finally
{
lock
.
unlock
();
}
}
public
int
size
()
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
return
size
;
}
finally
{
lock
.
unlock
();
}
}
public
boolean
isEmpty
()
{
return
size
()
==
0
;
}
public
int
remainingCapacity
()
{
return
Integer
.
MAX_VALUE
;
}
public
RunnableScheduledFuture
peek
()
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
return
queue
[
0
];
}
finally
{
lock
.
unlock
();
}
}
public
boolean
offer
(
Runnable
x
)
{
public
boolean
offer
(
Runnable
x
)
{
return
dq
.
offer
((
RunnableScheduledFuture
)
x
);
if
(
x
==
null
)
throw
new
NullPointerException
();
RunnableScheduledFuture
e
=
(
RunnableScheduledFuture
)
x
;
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
int
i
=
size
;
if
(
i
>=
queue
.
length
)
grow
();
size
=
i
+
1
;
if
(
i
==
0
)
{
queue
[
0
]
=
e
;
setIndex
(
e
,
0
);
}
else
{
siftUp
(
i
,
e
);
}
}
public
void
put
(
Runnable
x
)
{
if
(
queue
[
0
]
==
e
)
{
dq
.
put
((
RunnableScheduledFuture
)
x
);
leader
=
null
;
available
.
signal
();
}
}
finally
{
lock
.
unlock
();
}
return
true
;
}
public
void
put
(
Runnable
e
)
{
offer
(
e
);
}
public
boolean
add
(
Runnable
e
)
{
return
offer
(
e
);
}
public
boolean
offer
(
Runnable
e
,
long
timeout
,
TimeUnit
unit
)
{
return
offer
(
e
);
}
/**
* Performs common bookkeeping for poll and take: Replaces
* first element with last and sifts it down. Call only when
* holding lock.
* @param f the task to remove and return
*/
private
RunnableScheduledFuture
finishPoll
(
RunnableScheduledFuture
f
)
{
int
s
=
--
size
;
RunnableScheduledFuture
x
=
queue
[
s
];
queue
[
s
]
=
null
;
if
(
s
!=
0
)
siftDown
(
0
,
x
);
setIndex
(
f
,
-
1
);
return
f
;
}
public
RunnableScheduledFuture
poll
()
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
RunnableScheduledFuture
first
=
queue
[
0
];
if
(
first
==
null
||
first
.
getDelay
(
TimeUnit
.
NANOSECONDS
)
>
0
)
return
null
;
else
return
finishPoll
(
first
);
}
finally
{
lock
.
unlock
();
}
}
public
RunnableScheduledFuture
take
()
throws
InterruptedException
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lockInterruptibly
();
try
{
for
(;;)
{
RunnableScheduledFuture
first
=
queue
[
0
];
if
(
first
==
null
)
available
.
await
();
else
{
long
delay
=
first
.
getDelay
(
TimeUnit
.
NANOSECONDS
);
if
(
delay
<=
0
)
return
finishPoll
(
first
);
else
if
(
leader
!=
null
)
available
.
await
();
else
{
Thread
thisThread
=
Thread
.
currentThread
();
leader
=
thisThread
;
try
{
available
.
awaitNanos
(
delay
);
}
finally
{
if
(
leader
==
thisThread
)
leader
=
null
;
}
}
}
}
}
finally
{
if
(
leader
==
null
&&
queue
[
0
]
!=
null
)
available
.
signal
();
lock
.
unlock
();
}
}
public
RunnableScheduledFuture
poll
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
{
long
nanos
=
unit
.
toNanos
(
timeout
);
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lockInterruptibly
();
try
{
for
(;;)
{
RunnableScheduledFuture
first
=
queue
[
0
];
if
(
first
==
null
)
{
if
(
nanos
<=
0
)
return
null
;
else
nanos
=
available
.
awaitNanos
(
nanos
);
}
else
{
long
delay
=
first
.
getDelay
(
TimeUnit
.
NANOSECONDS
);
if
(
delay
<=
0
)
return
finishPoll
(
first
);
if
(
nanos
<=
0
)
return
null
;
if
(
nanos
<
delay
||
leader
!=
null
)
nanos
=
available
.
awaitNanos
(
nanos
);
else
{
Thread
thisThread
=
Thread
.
currentThread
();
leader
=
thisThread
;
try
{
long
timeLeft
=
available
.
awaitNanos
(
delay
);
nanos
-=
delay
-
timeLeft
;
}
finally
{
if
(
leader
==
thisThread
)
leader
=
null
;
}
}
}
}
}
finally
{
if
(
leader
==
null
&&
queue
[
0
]
!=
null
)
available
.
signal
();
lock
.
unlock
();
}
}
public
void
clear
()
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
RunnableScheduledFuture
t
=
queue
[
i
];
if
(
t
!=
null
)
{
queue
[
i
]
=
null
;
setIndex
(
t
,
-
1
);
}
}
size
=
0
;
}
finally
{
lock
.
unlock
();
}
}
/**
* Return and remove first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
private
RunnableScheduledFuture
pollExpired
()
{
RunnableScheduledFuture
first
=
queue
[
0
];
if
(
first
==
null
||
first
.
getDelay
(
TimeUnit
.
NANOSECONDS
)
>
0
)
return
null
;
return
finishPoll
(
first
);
}
public
int
drainTo
(
Collection
<?
super
Runnable
>
c
)
{
if
(
c
==
null
)
throw
new
NullPointerException
();
if
(
c
==
this
)
throw
new
IllegalArgumentException
();
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
RunnableScheduledFuture
first
;
int
n
=
0
;
while
((
first
=
pollExpired
())
!=
null
)
{
c
.
add
(
first
);
++
n
;
}
return
n
;
}
finally
{
lock
.
unlock
();
}
}
public
boolean
offer
(
Runnable
x
,
long
timeout
,
TimeUnit
unit
)
{
return
dq
.
offer
((
RunnableScheduledFuture
)
x
,
timeout
,
unit
);
}
}
public
Runnable
remove
()
{
return
dq
.
remove
();
}
public
Runnable
element
()
{
return
dq
.
element
();
}
public
void
clear
()
{
dq
.
clear
();
}
public
int
drainTo
(
Collection
<?
super
Runnable
>
c
)
{
return
dq
.
drainTo
(
c
);
}
public
int
drainTo
(
Collection
<?
super
Runnable
>
c
,
int
maxElements
)
{
public
int
drainTo
(
Collection
<?
super
Runnable
>
c
,
int
maxElements
)
{
return
dq
.
drainTo
(
c
,
maxElements
);
if
(
c
==
null
)
throw
new
NullPointerException
();
if
(
c
==
this
)
throw
new
IllegalArgumentException
();
if
(
maxElements
<=
0
)
return
0
;
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
RunnableScheduledFuture
first
;
int
n
=
0
;
while
(
n
<
maxElements
&&
(
first
=
pollExpired
())
!=
null
)
{
c
.
add
(
first
);
++
n
;
}
return
n
;
}
finally
{
lock
.
unlock
();
}
}
public
Object
[]
toArray
()
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
return
Arrays
.
copyOf
(
queue
,
size
,
Object
[].
class
);
}
finally
{
lock
.
unlock
();
}
}
@SuppressWarnings
(
"unchecked"
)
public
<
T
>
T
[]
toArray
(
T
[]
a
)
{
final
ReentrantLock
lock
=
this
.
lock
;
lock
.
lock
();
try
{
if
(
a
.
length
<
size
)
return
(
T
[])
Arrays
.
copyOf
(
queue
,
size
,
a
.
getClass
());
System
.
arraycopy
(
queue
,
0
,
a
,
0
,
size
);
if
(
a
.
length
>
size
)
a
[
size
]
=
null
;
return
a
;
}
finally
{
lock
.
unlock
();
}
}
}
public
int
remainingCapacity
()
{
return
dq
.
remainingCapacity
();
}
public
boolean
remove
(
Object
x
)
{
return
dq
.
remove
(
x
);
}
public
boolean
contains
(
Object
x
)
{
return
dq
.
contains
(
x
);
}
public
int
size
()
{
return
dq
.
size
();
}
public
boolean
isEmpty
()
{
return
dq
.
isEmpty
();
}
public
Object
[]
toArray
()
{
return
dq
.
toArray
();
}
public
<
T
>
T
[]
toArray
(
T
[]
array
)
{
return
dq
.
toArray
(
array
);
}
public
Iterator
<
Runnable
>
iterator
()
{
public
Iterator
<
Runnable
>
iterator
()
{
return
new
Iterator
<
Runnable
>()
{
return
new
Itr
(
Arrays
.
copyOf
(
queue
,
size
));
private
Iterator
<
RunnableScheduledFuture
>
it
=
dq
.
iterator
();
}
public
boolean
hasNext
()
{
return
it
.
hasNext
();
}
public
Runnable
next
()
{
return
it
.
next
();
}
/**
public
void
remove
()
{
it
.
remove
();
}
* Snapshot iterator that works off copy of underlying q array.
};
*/
private
class
Itr
implements
Iterator
<
Runnable
>
{
final
RunnableScheduledFuture
[]
array
;
int
cursor
=
0
;
// index of next element to return
int
lastRet
=
-
1
;
// index of last element, or -1 if no such
Itr
(
RunnableScheduledFuture
[]
array
)
{
this
.
array
=
array
;
}
public
boolean
hasNext
()
{
return
cursor
<
array
.
length
;
}
public
Runnable
next
()
{
if
(
cursor
>=
array
.
length
)
throw
new
NoSuchElementException
();
lastRet
=
cursor
;
return
array
[
cursor
++];
}
public
void
remove
()
{
if
(
lastRet
<
0
)
throw
new
IllegalStateException
();
DelayedWorkQueue
.
this
.
remove
(
array
[
lastRet
]);
lastRet
=
-
1
;
}
}
}
}
}
}
}
test/java/util/concurrent/ScheduledThreadPoolExecutor/BasicCancelTest.java
0 → 100644
浏览文件 @
d9898853
/*
* Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/*
* @test
* @bug 6602600
* @run main/othervm -Xmx8m BasicCancelTest
* @summary Check effectiveness of RemoveOnCancelPolicy
*/
import
java.util.concurrent.*
;
import
java.util.Random
;
/**
* Simple timer cancellation test. Submits tasks to a scheduled executor
* service and immediately cancels them.
*/
public
class
BasicCancelTest
{
void
checkShutdown
(
final
ExecutorService
es
)
{
final
Runnable
nop
=
new
Runnable
()
{
public
void
run
()
{}};
try
{
if
(
new
Random
().
nextBoolean
())
{
check
(
es
.
isShutdown
());
if
(
es
instanceof
ThreadPoolExecutor
)
check
(((
ThreadPoolExecutor
)
es
).
isTerminating
()
||
es
.
isTerminated
());
THROWS
(
RejectedExecutionException
.
class
,
new
F
(){
void
f
(){
es
.
execute
(
nop
);}});
}
}
catch
(
Throwable
t
)
{
unexpected
(
t
);
}
}
void
checkTerminated
(
final
ThreadPoolExecutor
tpe
)
{
try
{
checkShutdown
(
tpe
);
check
(
tpe
.
getQueue
().
isEmpty
());
check
(
tpe
.
isTerminated
());
check
(!
tpe
.
isTerminating
());
equal
(
tpe
.
getActiveCount
(),
0
);
equal
(
tpe
.
getPoolSize
(),
0
);
equal
(
tpe
.
getTaskCount
(),
tpe
.
getCompletedTaskCount
());
check
(
tpe
.
awaitTermination
(
0
,
TimeUnit
.
SECONDS
));
}
catch
(
Throwable
t
)
{
unexpected
(
t
);
}
}
void
test
(
String
[]
args
)
throws
Throwable
{
final
ScheduledThreadPoolExecutor
pool
=
new
ScheduledThreadPoolExecutor
(
1
);
// Needed to avoid OOME
pool
.
setRemoveOnCancelPolicy
(
true
);
final
long
moreThanYouCanChew
=
Runtime
.
getRuntime
().
freeMemory
()
/
4
;
System
.
out
.
printf
(
"moreThanYouCanChew=%d%n"
,
moreThanYouCanChew
);
Runnable
noopTask
=
new
Runnable
()
{
public
void
run
()
{}};
for
(
long
i
=
0
;
i
<
moreThanYouCanChew
;
i
++)
pool
.
schedule
(
noopTask
,
10
,
TimeUnit
.
MINUTES
).
cancel
(
true
);
pool
.
shutdown
();
check
(
pool
.
awaitTermination
(
1L
,
TimeUnit
.
DAYS
));
checkTerminated
(
pool
);
equal
(
pool
.
getTaskCount
(),
0L
);
equal
(
pool
.
getCompletedTaskCount
(),
0L
);
}
//--------------------- Infrastructure ---------------------------
volatile
int
passed
=
0
,
failed
=
0
;
void
pass
()
{
passed
++;}
void
fail
()
{
failed
++;
Thread
.
dumpStack
();}
void
fail
(
String
msg
)
{
System
.
err
.
println
(
msg
);
fail
();}
void
unexpected
(
Throwable
t
)
{
failed
++;
t
.
printStackTrace
();}
void
check
(
boolean
cond
)
{
if
(
cond
)
pass
();
else
fail
();}
void
equal
(
Object
x
,
Object
y
)
{
if
(
x
==
null
?
y
==
null
:
x
.
equals
(
y
))
pass
();
else
fail
(
x
+
" not equal to "
+
y
);}
public
static
void
main
(
String
[]
args
)
throws
Throwable
{
new
BasicCancelTest
().
instanceMain
(
args
);}
void
instanceMain
(
String
[]
args
)
throws
Throwable
{
try
{
test
(
args
);}
catch
(
Throwable
t
)
{
unexpected
(
t
);}
System
.
out
.
printf
(
"%nPassed = %d, failed = %d%n%n"
,
passed
,
failed
);
if
(
failed
>
0
)
throw
new
AssertionError
(
"Some tests failed"
);}
abstract
class
F
{
abstract
void
f
()
throws
Throwable
;}
void
THROWS
(
Class
<?
extends
Throwable
>
k
,
F
...
fs
)
{
for
(
F
f
:
fs
)
try
{
f
.
f
();
fail
(
"Expected "
+
k
.
getName
()
+
" not thrown"
);}
catch
(
Throwable
t
)
{
if
(
k
.
isAssignableFrom
(
t
.
getClass
()))
pass
();
else
unexpected
(
t
);}}
}
test/java/util/concurrent/ScheduledThreadPoolExecutor/Stress.java
0 → 100644
浏览文件 @
d9898853
/*
* Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
import
java.util.concurrent.*
;
/**
* This is not a regression test, but a stress benchmark test for
* 6602600: Fast removal of cancelled scheduled thread pool tasks
*
* This runs in the same wall clock time, but much reduced cpu time,
* with the changes for 6602600.
*/
public
class
Stress
{
public
static
void
main
(
String
[]
args
)
throws
Throwable
{
final
CountDownLatch
count
=
new
CountDownLatch
(
1000
);
final
ScheduledThreadPoolExecutor
pool
=
new
ScheduledThreadPoolExecutor
(
100
);
pool
.
prestartAllCoreThreads
();
final
Runnable
incTask
=
new
Runnable
()
{
public
void
run
()
{
count
.
countDown
();
}};
pool
.
scheduleAtFixedRate
(
incTask
,
0
,
10
,
TimeUnit
.
MILLISECONDS
);
count
.
await
();
pool
.
shutdown
();
pool
.
awaitTermination
(
1L
,
TimeUnit
.
DAYS
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录