Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
开发团队
Flowable Engine
提交
6ed56300
F
Flowable Engine
项目概览
开发团队
/
Flowable Engine
通知
9
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
Flowable Engine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
6ed56300
编写于
1月 12, 2012
作者:
M
meyerd
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ACT-34 merging branch
上级
99ee22fd
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
368 addition
and
301 deletion
+368
-301
modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/ProcessEngineConfigurationImpl.java
...iviti/engine/impl/cfg/ProcessEngineConfigurationImpl.java
+2
-1
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquireJobsRunnable.java
...activiti/engine/impl/jobexecutor/AcquireJobsRunnable.java
+133
-141
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java
...va/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java
+4
-0
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/DefaultJobExecutor.java
.../activiti/engine/impl/jobexecutor/DefaultJobExecutor.java
+138
-0
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java
...ava/org/activiti/engine/impl/jobexecutor/JobExecutor.java
+78
-146
modules/activiti-engine/src/test/java/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java
...va/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java
+13
-13
未找到文件。
modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/ProcessEngineConfigurationImpl.java
浏览文件 @
6ed56300
...
...
@@ -81,6 +81,7 @@ import org.activiti.engine.impl.interceptor.CommandInterceptor;
import
org.activiti.engine.impl.interceptor.DelegateInterceptor
;
import
org.activiti.engine.impl.interceptor.SessionFactory
;
import
org.activiti.engine.impl.jobexecutor.AsyncContinuationJobHandler
;
import
org.activiti.engine.impl.jobexecutor.DefaultJobExecutor
;
import
org.activiti.engine.impl.jobexecutor.JobExecutor
;
import
org.activiti.engine.impl.jobexecutor.JobHandler
;
import
org.activiti.engine.impl.jobexecutor.TimerCatchIntermediateEventJobHandler
;
...
...
@@ -642,7 +643,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
protected
void
initJobExecutor
()
{
if
(
jobExecutor
==
null
)
{
jobExecutor
=
new
JobExecutor
();
jobExecutor
=
new
Default
JobExecutor
();
}
jobHandlers
=
new
HashMap
<
String
,
JobHandler
>();
...
...
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/
JobAcquisitionThread
.java
→
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/
AcquireJobsRunnable
.java
浏览文件 @
6ed56300
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.activiti.engine.impl.jobexecutor
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.logging.Level
;
import
java.util.logging.Logger
;
import
org.activiti.engine.impl.Page
;
import
org.activiti.engine.impl.cmd.AcquireJobsCmd
;
import
org.activiti.engine.impl.interceptor.CommandExecutor
;
import
org.activiti.engine.impl.persistence.entity.TimerEntity
;
import
org.activiti.engine.impl.util.ClockUtil
;
/**
* Background thread responsible for retrieving the list of Jobs currently
* awaiting processing from the queue, and passing them to the
* {@link JobExecutor} to be run. There should only ever be one of these per
* {@link JobExecutor}. Note that in a clustered Environment, there can be
* multiple of these, so we need locking/transactions to ensure we don't fetch
* Jobs someone else already has.
*/
public
class
JobAcquisitionThread
extends
Thread
{
private
static
Logger
log
=
Logger
.
getLogger
(
JobAcquisitionThread
.
class
.
getName
());
private
final
AcquireJobsCmd
acquireJobsCmd
;
private
JobExecutor
jobExecutor
;
private
boolean
isActive
=
false
;
private
boolean
isJobAdded
=
false
;
public
JobAcquisitionThread
(
JobExecutor
jobExecutor
)
{
super
(
"JobAcquisitionThread"
);
this
.
jobExecutor
=
jobExecutor
;
this
.
acquireJobsCmd
=
new
AcquireJobsCmd
(
jobExecutor
);
}
public
void
run
()
{
log
.
info
(
getName
()
+
" starting to acquire jobs"
);
this
.
isActive
=
true
;
CommandExecutor
commandExecutor
=
jobExecutor
.
getCommandExecutor
();
long
millisToWait
=
0
;
float
waitIncreaseFactor
=
2
;
long
maxWait
=
60
*
1000
;
while
(
isActive
)
{
int
maxJobsPerAcquisition
=
jobExecutor
.
getMaxJobsPerAcquisition
();
try
{
AcquiredJobs
acquiredJobs
=
commandExecutor
.
execute
(
acquireJobsCmd
);
for
(
List
<
String
>
jobIds
:
acquiredJobs
.
getJobIdBatches
())
{
jobExecutor
.
executeJobs
(
jobIds
);
}
// if all jobs were executed
millisToWait
=
jobExecutor
.
getWaitTimeInMillis
();
int
jobsAcquired
=
acquiredJobs
.
getJobIdBatches
().
size
();
if
(
jobsAcquired
<
maxJobsPerAcquisition
)
{
isJobAdded
=
false
;
// check if the next timer should fire before the normal sleep time is over
Date
duedate
=
new
Date
(
ClockUtil
.
getCurrentTime
().
getTime
()
+
millisToWait
);
List
<
TimerEntity
>
nextTimers
=
commandExecutor
.
execute
(
new
GetUnlockedTimersByDuedateCmd
(
duedate
,
new
Page
(
0
,
1
)));
if
(!
nextTimers
.
isEmpty
())
{
long
millisTillNextTimer
=
nextTimers
.
get
(
0
).
getDuedate
().
getTime
()
-
ClockUtil
.
getCurrentTime
().
getTime
();
if
(
millisTillNextTimer
<
millisToWait
)
{
millisToWait
=
millisTillNextTimer
;
}
}
}
else
{
millisToWait
=
0
;
}
}
catch
(
Exception
e
)
{
log
.
log
(
Level
.
SEVERE
,
"exception during job acquisition: "
+
e
.
getMessage
(),
e
);
millisToWait
*=
waitIncreaseFactor
;
if
(
millisToWait
>
maxWait
)
{
millisToWait
=
maxWait
;
}
}
if
((
millisToWait
>
0
)
&&
(!
isJobAdded
))
{
try
{
log
.
fine
(
"job acquisition thread sleeping for "
+
millisToWait
+
" millis"
);
Thread
.
sleep
(
millisToWait
);
log
.
fine
(
"job acquisition thread woke up"
);
}
catch
(
InterruptedException
e
)
{
log
.
fine
(
"job acquisition wait interrupted"
);
}
}
}
log
.
info
(
getName
()
+
" stopped"
);
}
public
void
jobWasAdded
()
{
isJobAdded
=
true
;
log
.
fine
(
"Job was added. Interrupting "
+
this
);
interrupt
();
}
/**
* Triggers a shutdown
*/
public
void
shutdown
()
{
if
(
isActive
)
{
log
.
info
(
getName
()
+
" is shutting down"
);
isActive
=
false
;
interrupt
();
try
{
join
();
}
catch
(
InterruptedException
e
)
{
log
.
log
(
Level
.
WARNING
,
"Interruption while shutting down "
+
this
.
getClass
().
getName
(),
e
);
}
}
}
public
JobExecutor
getJobExecutor
()
{
return
jobExecutor
;
}
public
boolean
isActive
()
{
return
isActive
;
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.activiti.engine.impl.jobexecutor
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.logging.Level
;
import
java.util.logging.Logger
;
import
org.activiti.engine.impl.Page
;
import
org.activiti.engine.impl.interceptor.CommandExecutor
;
import
org.activiti.engine.impl.persistence.entity.TimerEntity
;
import
org.activiti.engine.impl.util.ClockUtil
;
/**
*
* @author Daniel Meyer
*/
public
class
AcquireJobsRunnable
implements
Runnable
{
private
static
Logger
log
=
Logger
.
getLogger
(
AcquireJobsRunnable
.
class
.
getName
());
protected
final
JobExecutor
jobExecutor
;
protected
volatile
boolean
isInterrupted
=
false
;
protected
volatile
boolean
isJobAdded
=
false
;
protected
final
Object
MONITOR
=
new
Object
();
protected
final
AtomicBoolean
isWaiting
=
new
AtomicBoolean
(
false
);
public
AcquireJobsRunnable
(
JobExecutor
jobExecutor
)
{
this
.
jobExecutor
=
jobExecutor
;
}
public
synchronized
void
run
()
{
log
.
info
(
jobExecutor
.
getName
()
+
" starting to acquire jobs"
);
final
CommandExecutor
commandExecutor
=
jobExecutor
.
getCommandExecutor
();
long
millisToWait
=
0
;
float
waitIncreaseFactor
=
2
;
long
maxWait
=
60
*
1000
;
while
(!
isInterrupted
)
{
int
maxJobsPerAcquisition
=
jobExecutor
.
getMaxJobsPerAcquisition
();
try
{
AcquiredJobs
acquiredJobs
=
commandExecutor
.
execute
(
jobExecutor
.
getAcquireJobsCmd
());
for
(
List
<
String
>
jobIds
:
acquiredJobs
.
getJobIdBatches
())
{
jobExecutor
.
executeJobs
(
jobIds
);
}
// if all jobs were executed
millisToWait
=
jobExecutor
.
getWaitTimeInMillis
();
int
jobsAcquired
=
acquiredJobs
.
getJobIdBatches
().
size
();
if
(
jobsAcquired
<
maxJobsPerAcquisition
)
{
isJobAdded
=
false
;
// check if the next timer should fire before the normal sleep time is over
Date
duedate
=
new
Date
(
ClockUtil
.
getCurrentTime
().
getTime
()
+
millisToWait
);
List
<
TimerEntity
>
nextTimers
=
commandExecutor
.
execute
(
new
GetUnlockedTimersByDuedateCmd
(
duedate
,
new
Page
(
0
,
1
)));
if
(!
nextTimers
.
isEmpty
())
{
long
millisTillNextTimer
=
nextTimers
.
get
(
0
).
getDuedate
().
getTime
()
-
ClockUtil
.
getCurrentTime
().
getTime
();
if
(
millisTillNextTimer
<
millisToWait
)
{
millisToWait
=
millisTillNextTimer
;
}
}
}
else
{
millisToWait
=
0
;
}
}
catch
(
Exception
e
)
{
log
.
log
(
Level
.
SEVERE
,
"exception during job acquisition: "
+
e
.
getMessage
(),
e
);
millisToWait
*=
waitIncreaseFactor
;
if
(
millisToWait
>
maxWait
)
{
millisToWait
=
maxWait
;
}
}
if
((
millisToWait
>
0
)
&&
(!
isJobAdded
))
{
try
{
log
.
fine
(
"job acquisition thread sleeping for "
+
millisToWait
+
" millis"
);
synchronized
(
MONITOR
)
{
if
(!
isInterrupted
)
{
isWaiting
.
set
(
true
);
MONITOR
.
wait
(
millisToWait
);
}
}
log
.
fine
(
"job acquisition thread woke up"
);
}
catch
(
InterruptedException
e
)
{
log
.
fine
(
"job acquisition wait interrupted"
);
}
finally
{
isWaiting
.
set
(
false
);
}
}
}
log
.
info
(
jobExecutor
.
getName
()
+
" stopped job acquisition"
);
}
public
void
stop
()
{
synchronized
(
MONITOR
)
{
isInterrupted
=
true
;
if
(
isWaiting
.
compareAndSet
(
true
,
false
))
{
MONITOR
.
notifyAll
();
}
}
}
public
void
jobWasAdded
()
{
isJobAdded
=
true
;
if
(
isWaiting
.
compareAndSet
(
true
,
false
))
{
// ensures we only notify once
// I am OK with the race condition
synchronized
(
MONITOR
)
{
MONITOR
.
notifyAll
();
}
}
}
}
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java
浏览文件 @
6ed56300
...
...
@@ -39,6 +39,10 @@ public class AcquiredJobs {
public
boolean
contains
(
String
jobId
)
{
return
acquiredJobs
.
contains
(
jobId
);
}
public
int
size
()
{
return
acquiredJobs
.
size
();
}
}
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/DefaultJobExecutor.java
0 → 100644
浏览文件 @
6ed56300
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.activiti.engine.impl.jobexecutor
;
import
java.util.List
;
import
java.util.concurrent.ArrayBlockingQueue
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.RejectedExecutionHandler
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
org.activiti.engine.ActivitiException
;
/**
* <p>This is a simple implementation of the {@link JobExecutor} using self-managed
* threads for performing background work.</p>
*
* <p>This implementation uses a {@link ThreadPoolExecutor} backed by a queue to which
* work is submitted.</p>
*
* <p><em>NOTE: use this class in environments in which self-management of threads
* is permitted. Consider using a different thread-management strategy in
* J(2)EE-Environments.</em></p>
*
* @author Daniel Meyer
*/
public
class
DefaultJobExecutor
extends
JobExecutor
{
protected
int
queueSize
=
3
;
protected
int
corePoolSize
=
3
;
private
int
maxPoolSize
=
10
;
protected
Thread
jobAcquisitionThread
;
protected
BlockingQueue
<
Runnable
>
threadPoolQueue
;
protected
ThreadPoolExecutor
threadPoolExecutor
;
protected
RejectedExecutionHandler
rejectedExecutionHandler
;
protected
void
startExecutingJobs
()
{
if
(
threadPoolQueue
==
null
)
{
threadPoolQueue
=
new
ArrayBlockingQueue
<
Runnable
>(
queueSize
);
}
if
(
threadPoolExecutor
==
null
)
{
threadPoolExecutor
=
new
ThreadPoolExecutor
(
corePoolSize
,
maxPoolSize
,
0L
,
TimeUnit
.
MILLISECONDS
,
threadPoolQueue
);
if
(
rejectedExecutionHandler
==
null
)
{
rejectedExecutionHandler
=
new
ThreadPoolExecutor
.
CallerRunsPolicy
();
}
threadPoolExecutor
.
setRejectedExecutionHandler
(
rejectedExecutionHandler
);
}
if
(
jobAcquisitionThread
==
null
)
{
jobAcquisitionThread
=
new
Thread
(
acquireJobsRunnable
);
jobAcquisitionThread
.
setDaemon
(
true
);
jobAcquisitionThread
.
start
();
}
}
protected
void
stopExecutingJobs
()
{
// Ask the thread pool to finish and exit
threadPoolExecutor
.
shutdown
();
// Waits for 1 minute to finish all currently executing jobs
try
{
threadPoolExecutor
.
awaitTermination
(
60L
,
TimeUnit
.
SECONDS
);
}
catch
(
InterruptedException
e
)
{
throw
new
ActivitiException
(
"Timeout during shutdown of job executor. "
+
"The current running jobs could not end withing 60 seconds after shutdown operation."
,
e
);
}
threadPoolExecutor
=
null
;
jobAcquisitionThread
=
null
;
}
public
void
executeJobs
(
List
<
String
>
jobIds
)
{
// TODO: RejectedExecutionException handling!
threadPoolExecutor
.
execute
(
new
ExecuteJobsRunnable
(
commandExecutor
,
jobIds
));
}
// getters and setters //////////////////////////////////////////////////////
public
int
getQueueSize
()
{
return
queueSize
;
}
public
void
setQueueSize
(
int
queueSize
)
{
this
.
queueSize
=
queueSize
;
}
public
int
getCorePoolSize
()
{
return
corePoolSize
;
}
public
void
setCorePoolSize
(
int
corePoolSize
)
{
this
.
corePoolSize
=
corePoolSize
;
}
public
int
getMaxPoolSize
()
{
return
maxPoolSize
;
}
public
void
setMaxPoolSize
(
int
maxPoolSize
)
{
this
.
maxPoolSize
=
maxPoolSize
;
}
public
BlockingQueue
<
Runnable
>
getThreadPoolQueue
()
{
return
threadPoolQueue
;
}
public
void
setThreadPoolQueue
(
BlockingQueue
<
Runnable
>
threadPoolQueue
)
{
this
.
threadPoolQueue
=
threadPoolQueue
;
}
public
ThreadPoolExecutor
getThreadPoolExecutor
()
{
return
threadPoolExecutor
;
}
public
void
setThreadPoolExecutor
(
ThreadPoolExecutor
threadPoolExecutor
)
{
this
.
threadPoolExecutor
=
threadPoolExecutor
;
}
protected
RejectedExecutionHandler
getRejectedExecutionHandler
()
{
return
rejectedExecutionHandler
;
}
public
void
setRejectedExecutionHandler
(
RejectedExecutionHandler
rejectedExecutionHandler
)
{
this
.
rejectedExecutionHandler
=
rejectedExecutionHandler
;
}
}
modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java
浏览文件 @
6ed56300
...
...
@@ -10,196 +10,131 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.activiti.engine.impl.jobexecutor
;
import
java.util.List
;
import
java.util.UUID
;
import
java.util.concurrent.ArrayBlockingQueue
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.logging.Logger
;
import
org.activiti.engine.ActivitiException
;
import
org.activiti.engine.impl.cmd.AcquireJobsCmd
;
import
org.activiti.engine.impl.interceptor.Command
;
import
org.activiti.engine.impl.interceptor.CommandExecutor
;
import
org.activiti.engine.runtime.Job
;
/**
* Manager class in charge of all background / asynchronous
* processing.
* You should generally only have one of these per Activiti
* instance in a JVM. In clustered situations, you can have
* multiple of these running against the same queue +
* pending job list.
* Uses a {@link ThreadPoolExecutor} internally.
* <p>Interface to the work management component of activiti.</p>
*
* <p>This component is responsible for performing all background work
* ({@link Job Jobs}) scheduled by activiti.</p>
*
* <p>You should generally only have one of these per Activiti instance (process
* engine) in a JVM.
* In clustered situations, you can have multiple of these running against the
* same queue + pending job list.</p>
*
* @author Daniel Meyer
*/
public
class
JobExecutor
{
public
abstract
class
JobExecutor
{
private
static
Logger
log
=
Logger
.
getLogger
(
JobExecutor
.
class
.
getName
());
protected
String
name
=
"JobExecutor["
+
getClass
().
getName
()+
"]"
;
protected
CommandExecutor
commandExecutor
;
protected
Command
<
AcquiredJobs
>
acquireJobsCmd
;
protected
AcquireJobsRunnable
acquireJobsRunnable
;
protected
boolean
isAutoActivate
=
false
;
protected
boolean
isActive
=
false
;
protected
int
maxJobsPerAcquisition
=
3
;
protected
int
waitTimeInMillis
=
5
*
1000
;
protected
String
lockOwner
=
UUID
.
randomUUID
().
toString
();
protected
int
lockTimeInMillis
=
5
*
60
*
1000
;
protected
int
queueSize
=
5
;
protected
int
corePoolSize
=
3
;
private
int
maxPoolSize
=
10
;
protected
JobAcquisitionThread
jobAcquisitionThread
;
protected
BlockingQueue
<
Runnable
>
threadPoolQueue
;
protected
ThreadPoolExecutor
threadPoolExecutor
;
protected
boolean
isActive
=
false
;
public
synchronized
void
start
()
{
if
(
isActive
)
{
// Already started, nothing to do
log
.
info
(
"Ignoring duplicate JobExecutor start invocation"
);
return
;
}
else
{
isActive
=
true
;
if
(
jobAcquisitionThread
==
null
)
{
jobAcquisitionThread
=
new
JobAcquisitionThread
(
this
);
}
if
(
threadPoolQueue
==
null
)
{
threadPoolQueue
=
new
ArrayBlockingQueue
<
Runnable
>(
queueSize
);
}
if
(
threadPoolExecutor
==
null
)
{
threadPoolExecutor
=
new
ThreadPoolExecutor
(
corePoolSize
,
maxPoolSize
,
0L
,
TimeUnit
.
MILLISECONDS
,
threadPoolQueue
);
threadPoolExecutor
.
setRejectedExecutionHandler
(
new
ThreadPoolExecutor
.
CallerRunsPolicy
());
}
// Create our pending jobs fetcher
log
.
fine
(
"JobExecutor is starting the JobAcquisitionThread"
);
jobAcquisitionThread
.
start
()
;
public
void
start
()
{
if
(
isActive
)
{
return
;
}
log
.
info
(
"Starting up the JobExecutor["
+
getClass
().
getName
()+
"]."
);
ensureInitialization
();
startExecutingJobs
();
isActive
=
true
;
}
public
void
shutdown
()
{
if
(!
isActive
)
{
log
.
info
(
"Ignoring request to shut down non-active JobExecutor"
);
public
synchronized
void
shutdown
()
{
if
(!
isActive
)
{
return
;
}
log
.
info
(
"Shutting down the JobExecutor"
);
// Ask the thread pool to finish and exit
threadPoolExecutor
.
shutdown
();
// Waits for 1 minute to finish all currently executing jobs
try
{
threadPoolExecutor
.
awaitTermination
(
60L
,
TimeUnit
.
SECONDS
);
}
catch
(
InterruptedException
e
)
{
throw
new
ActivitiException
(
"Timeout during shutdown of job executor. "
+
"The current running jobs could not end withing 60 seconds after shutdown operation."
,
e
);
}
// Close the pending jobs task
jobAcquisitionThread
.
shutdown
();
log
.
info
(
"Shutting down the JobExecutor["
+
getClass
().
getName
()+
"]."
);
acquireJobsRunnable
.
stop
();
stopExecutingJobs
();
ensureCleanup
();
isActive
=
false
;
// Clear references
threadPoolExecutor
=
null
;
jobAcquisitionThread
=
null
;
}
protected
void
ensureInitialization
()
{
acquireJobsCmd
=
new
AcquireJobsCmd
(
this
);
acquireJobsRunnable
=
new
AcquireJobsRunnable
(
this
);
}
protected
void
ensureCleanup
()
{
acquireJobsCmd
=
null
;
acquireJobsRunnable
=
null
;
}
/**
* Used to hint that new work exists on the
* queue, and that the {@link JobAcquisitionThread}
* should probably re-check for jobs.
*/
public
void
jobWasAdded
()
{
if
(
isActive
&&
jobAcquisitionThread
!=
null
&&
jobAcquisitionThread
.
isActive
()
)
{
jobAcquisitionThread
.
jobWasAdded
();
if
(
isActive
)
{
acquireJobsRunnable
.
jobWasAdded
();
}
}
public
void
executeJobs
(
List
<
String
>
jobIds
)
{
// TODO: RejectedExecutionException handling!
threadPoolExecutor
.
execute
(
new
ExecuteJobsRunnable
(
commandExecutor
,
jobIds
));
}
// getters and setters //////////////////////////////////////////////////////
protected
abstract
void
startExecutingJobs
();
protected
abstract
void
stopExecutingJobs
();
protected
abstract
void
executeJobs
(
List
<
String
>
jobIds
);
// getters and setters //////////////////////////////////////////////////////
public
CommandExecutor
getCommandExecutor
()
{
return
commandExecutor
;
}
public
int
getWaitTimeInMillis
()
{
return
waitTimeInMillis
;
}
public
void
setWaitTimeInMillis
(
int
waitTimeInMillis
)
{
this
.
waitTimeInMillis
=
waitTimeInMillis
;
}
public
int
getLockTimeInMillis
()
{
return
lockTimeInMillis
;
}
public
void
setLockTimeInMillis
(
int
lockTimeInMillis
)
{
this
.
lockTimeInMillis
=
lockTimeInMillis
;
}
public
int
getQueueSize
()
{
return
queueSize
;
}
public
void
setQueueSize
(
int
queueSize
)
{
this
.
queueSize
=
queueSize
;
}
public
int
getCorePoolSize
()
{
return
corePoolSize
;
}
public
void
setCorePoolSize
(
int
corePoolSize
)
{
this
.
corePoolSize
=
corePoolSize
;
}
public
int
getMaxPoolSize
()
{
return
maxPoolSize
;
public
String
getLockOwner
()
{
return
lockOwner
;
}
public
void
setMaxPoolSize
(
int
maxPoolSize
)
{
this
.
maxPoolSize
=
maxPoolSize
;
}
public
JobAcquisitionThread
getJobAcquisitionThread
()
{
return
jobAcquisitionThread
;
}
public
void
setJobAcquisitionThread
(
JobAcquisitionThread
jobAcquisitionThread
)
{
this
.
jobAcquisitionThread
=
jobAcquisitionThread
;
}
public
BlockingQueue
<
Runnable
>
getThreadPoolQueue
()
{
return
threadPoolQueue
;
public
void
setLockOwner
(
String
lockOwner
)
{
this
.
lockOwner
=
lockOwner
;
}
public
void
setThreadPoolQueue
(
BlockingQueue
<
Runnable
>
threadPoolQueue
)
{
this
.
threadPoolQueue
=
threadPoolQueu
e
;
public
boolean
isAutoActivate
(
)
{
return
isAutoActivat
e
;
}
public
ThreadPoolExecutor
getThreadPoolExecutor
()
{
return
threadPoolExecutor
;
}
public
void
setThreadPoolExecutor
(
ThreadPoolExecutor
threadPoolExecutor
)
{
this
.
threadPoolExecutor
=
threadPoolExecutor
;
public
void
setCommandExecutor
(
CommandExecutor
commandExecutor
)
{
this
.
commandExecutor
=
commandExecutor
;
}
public
boolean
isActive
(
)
{
return
isActiv
e
;
public
void
setAutoActivate
(
boolean
isAutoActivate
)
{
this
.
isAutoActivate
=
isAutoActivat
e
;
}
public
int
getMaxJobsPerAcquisition
()
{
return
maxJobsPerAcquisition
;
}
...
...
@@ -207,24 +142,21 @@ public class JobExecutor {
public
void
setMaxJobsPerAcquisition
(
int
maxJobsPerAcquisition
)
{
this
.
maxJobsPerAcquisition
=
maxJobsPerAcquisition
;
}
public
String
getLockOwner
()
{
return
lockOwner
;
}
public
void
setLockOwner
(
String
lockOwner
)
{
this
.
lockOwner
=
lockOwner
;
}
public
boolean
isAutoActivate
()
{
return
isAutoActivate
;
public
String
getName
()
{
return
name
;
}
public
void
setCommandExecutor
(
CommandExecutor
commandExecutor
)
{
this
.
commandExecutor
=
commandExecutor
;
public
Command
<
AcquiredJobs
>
getAcquireJobsCmd
(
)
{
return
acquireJobsCmd
;
}
public
void
setA
utoActivate
(
boolean
isAutoActivate
)
{
this
.
isAutoActivate
=
isAutoActivate
;
public
void
setA
cquireJobsCmd
(
Command
<
AcquiredJobs
>
acquireJobsCmd
)
{
this
.
acquireJobsCmd
=
acquireJobsCmd
;
}
public
boolean
isActive
()
{
return
isActive
;
}
}
modules/activiti-engine/src/test/java/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java
浏览文件 @
6ed56300
...
...
@@ -39,7 +39,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// the service was not invoked:
assertFalse
(
INVOCATION
);
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the service was invoked
assertTrue
(
INVOCATION
);
...
...
@@ -54,7 +54,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// the listener was not yet invoked:
assertNull
(
runtimeService
.
getVariable
(
pid
,
"listener"
));
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
assertEquals
(
0
,
managementService
.
createJobQuery
().
count
());
}
...
...
@@ -69,7 +69,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// the service was not invoked:
assertFalse
(
INVOCATION
);
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the service was invoked
assertTrue
(
INVOCATION
);
...
...
@@ -87,7 +87,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// the service was not invoked:
assertFalse
(
INVOCATION
);
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the service was invoked
assertTrue
(
INVOCATION
);
...
...
@@ -107,7 +107,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
fail
(
"the job must be a message"
);
}
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the service failed: the execution is still sitting in the service task:
Execution
execution
=
runtimeService
.
createExecutionQuery
().
singleResult
();
...
...
@@ -130,7 +130,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
assertEquals
(
2
,
managementService
.
createJobQuery
().
count
());
// let 'max-retires' on the message be reached
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the service failed: the execution is still sitting in the service task:
Execution
execution
=
runtimeService
.
createExecutionQuery
().
singleResult
();
...
...
@@ -142,7 +142,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// now the timer triggers:
ClockUtil
.
setCurrentTime
(
new
Date
(
System
.
currentTimeMillis
()+
10000
));
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// and we are done:
assertNull
(
runtimeService
.
createExecutionQuery
().
singleResult
());
...
...
@@ -161,7 +161,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// the service was not invoked:
assertFalse
(
INVOCATION
);
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the service was invoked
assertTrue
(
INVOCATION
);
...
...
@@ -177,7 +177,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
assertEquals
(
1
,
managementService
.
createJobQuery
().
count
());
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// both the timer and the message are cancelled
assertEquals
(
0
,
managementService
.
createJobQuery
().
count
());
...
...
@@ -191,7 +191,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// now there should be one job in the database:
assertEquals
(
1
,
managementService
.
createJobQuery
().
count
());
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the job is done
assertEquals
(
0
,
managementService
.
createJobQuery
().
count
());
...
...
@@ -207,7 +207,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
String
eid
=
runtimeService
.
createExecutionQuery
().
singleResult
().
getId
();
assertNull
(
runtimeService
.
getVariable
(
eid
,
"invoked"
));
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// and the job is done
assertEquals
(
0
,
managementService
.
createJobQuery
().
count
());
...
...
@@ -226,7 +226,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// now there should be one job in the database:
assertEquals
(
1
,
managementService
.
createJobQuery
().
count
());
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
assertEquals
(
0
,
managementService
.
createJobQuery
().
count
());
...
...
@@ -243,7 +243,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase {
// there is no usertask
assertNull
(
taskService
.
createTaskQuery
().
singleResult
());
waitForJobExecutorToProcessAllJobs
(
5
000L
,
25L
);
waitForJobExecutorToProcessAllJobs
(
10
000L
,
25L
);
// the listener was now invoked:
assertNotNull
(
runtimeService
.
getVariable
(
pid
,
"listener"
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录