Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jeecg
jeecg-boot
提交
cf0d2955
J
jeecg-boot
项目概览
jeecg
/
jeecg-boot
上一次同步 3 年多
通知
863
Star
24375
Fork
84
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
J
jeecg-boot
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cf0d2955
编写于
3月 09, 2021
作者:
JEECG低代码平台
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
重写xxljob避免默认 9999端口冲突
上级
bb4c3c86
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
228 addition
and
0 deletion
+228
-0
jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
...c/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
+228
-0
未找到文件。
jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
0 → 100644
浏览文件 @
cf0d2955
package
com.xxl.job.core.executor
;
import
com.xxl.job.core.biz.AdminBiz
;
import
com.xxl.job.core.biz.client.AdminBizClient
;
import
com.xxl.job.core.handler.IJobHandler
;
import
com.xxl.job.core.log.XxlJobFileAppender
;
import
com.xxl.job.core.server.EmbedServer
;
import
com.xxl.job.core.thread.JobLogFileCleanThread
;
import
com.xxl.job.core.thread.JobThread
;
import
com.xxl.job.core.thread.TriggerCallbackThread
;
import
com.xxl.job.core.util.IpUtil
;
import
com.xxl.job.core.util.NetUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
/**
* 重写目的修改默认端口9999为10000避免和网关冲突
*/
public
class
XxlJobExecutor
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
XxlJobExecutor
.
class
);
// ---------------------- param ----------------------
private
String
adminAddresses
;
private
String
accessToken
;
private
String
appname
;
private
String
address
;
private
String
ip
;
private
int
port
;
private
String
logPath
;
private
int
logRetentionDays
;
public
void
setAdminAddresses
(
String
adminAddresses
)
{
this
.
adminAddresses
=
adminAddresses
;
}
public
void
setAccessToken
(
String
accessToken
)
{
this
.
accessToken
=
accessToken
;
}
public
void
setAppname
(
String
appname
)
{
this
.
appname
=
appname
;
}
public
void
setAddress
(
String
address
)
{
this
.
address
=
address
;
}
public
void
setIp
(
String
ip
)
{
this
.
ip
=
ip
;
}
public
void
setPort
(
int
port
)
{
this
.
port
=
port
;
}
public
void
setLogPath
(
String
logPath
)
{
this
.
logPath
=
logPath
;
}
public
void
setLogRetentionDays
(
int
logRetentionDays
)
{
this
.
logRetentionDays
=
logRetentionDays
;
}
// ---------------------- start + stop ----------------------
public
void
start
()
throws
Exception
{
// init logpath
XxlJobFileAppender
.
initLogPath
(
logPath
);
// init invoker, admin-client
initAdminBizList
(
adminAddresses
,
accessToken
);
// init JobLogFileCleanThread
JobLogFileCleanThread
.
getInstance
().
start
(
logRetentionDays
);
// init TriggerCallbackThread
TriggerCallbackThread
.
getInstance
().
start
();
// init executor-server
initEmbedServer
(
address
,
ip
,
port
,
appname
,
accessToken
);
}
public
void
destroy
()
{
// destory executor-server
stopEmbedServer
();
// destory jobThreadRepository
if
(
jobThreadRepository
.
size
()
>
0
)
{
for
(
Map
.
Entry
<
Integer
,
JobThread
>
item
:
jobThreadRepository
.
entrySet
())
{
JobThread
oldJobThread
=
removeJobThread
(
item
.
getKey
(),
"web container destroy and kill the job."
);
// wait for job thread push result to callback queue
if
(
oldJobThread
!=
null
)
{
try
{
oldJobThread
.
join
();
}
catch
(
InterruptedException
e
)
{
logger
.
error
(
">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}"
,
item
.
getKey
(),
e
);
}
}
}
jobThreadRepository
.
clear
();
}
jobHandlerRepository
.
clear
();
// destory JobLogFileCleanThread
JobLogFileCleanThread
.
getInstance
().
toStop
();
// destory TriggerCallbackThread
TriggerCallbackThread
.
getInstance
().
toStop
();
}
// ---------------------- admin-client (rpc invoker) ----------------------
private
static
List
<
AdminBiz
>
adminBizList
;
private
void
initAdminBizList
(
String
adminAddresses
,
String
accessToken
)
throws
Exception
{
if
(
adminAddresses
!=
null
&&
adminAddresses
.
trim
().
length
()
>
0
)
{
for
(
String
address
:
adminAddresses
.
trim
().
split
(
","
))
{
if
(
address
!=
null
&&
address
.
trim
().
length
()
>
0
)
{
AdminBiz
adminBiz
=
new
AdminBizClient
(
address
.
trim
(),
accessToken
);
if
(
adminBizList
==
null
)
{
adminBizList
=
new
ArrayList
<
AdminBiz
>();
}
adminBizList
.
add
(
adminBiz
);
}
}
}
}
public
static
List
<
AdminBiz
>
getAdminBizList
()
{
return
adminBizList
;
}
// ---------------------- executor-server (rpc provider) ----------------------
private
EmbedServer
embedServer
=
null
;
private
void
initEmbedServer
(
String
address
,
String
ip
,
int
port
,
String
appname
,
String
accessToken
)
throws
Exception
{
// fill ip port 修改默认端口
port
=
port
>
0
?
port
:
NetUtil
.
findAvailablePort
(
10000
);
ip
=
(
ip
!=
null
&&
ip
.
trim
().
length
()
>
0
)
?
ip
:
IpUtil
.
getIp
();
// generate address
if
(
address
==
null
||
address
.
trim
().
length
()
==
0
)
{
String
ip_port_address
=
IpUtil
.
getIpPort
(
ip
,
port
);
// registry-address:default use address to registry , otherwise use ip:port if address is null
address
=
"http://{ip_port}/"
.
replace
(
"{ip_port}"
,
ip_port_address
);
}
// accessToken
if
(
accessToken
==
null
||
accessToken
.
trim
().
length
()
==
0
)
{
logger
.
warn
(
">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."
);
}
// start
embedServer
=
new
EmbedServer
();
embedServer
.
start
(
address
,
port
,
appname
,
accessToken
);
}
private
void
stopEmbedServer
()
{
// stop provider factory
if
(
embedServer
!=
null
)
{
try
{
embedServer
.
stop
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
}
// ---------------------- job handler repository ----------------------
private
static
ConcurrentMap
<
String
,
IJobHandler
>
jobHandlerRepository
=
new
ConcurrentHashMap
<
String
,
IJobHandler
>();
public
static
IJobHandler
loadJobHandler
(
String
name
)
{
return
jobHandlerRepository
.
get
(
name
);
}
public
static
IJobHandler
registJobHandler
(
String
name
,
IJobHandler
jobHandler
)
{
logger
.
info
(
">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}"
,
name
,
jobHandler
);
return
jobHandlerRepository
.
put
(
name
,
jobHandler
);
}
// ---------------------- job thread repository ----------------------
private
static
ConcurrentMap
<
Integer
,
JobThread
>
jobThreadRepository
=
new
ConcurrentHashMap
<
Integer
,
JobThread
>();
public
static
JobThread
registJobThread
(
int
jobId
,
IJobHandler
handler
,
String
removeOldReason
)
{
JobThread
newJobThread
=
new
JobThread
(
jobId
,
handler
);
newJobThread
.
start
();
logger
.
info
(
">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}"
,
new
Object
[]{
jobId
,
handler
});
JobThread
oldJobThread
=
jobThreadRepository
.
put
(
jobId
,
newJobThread
);
// putIfAbsent | oh my god, map's put method return the old value!!!
if
(
oldJobThread
!=
null
)
{
oldJobThread
.
toStop
(
removeOldReason
);
oldJobThread
.
interrupt
();
}
return
newJobThread
;
}
public
static
JobThread
removeJobThread
(
int
jobId
,
String
removeOldReason
)
{
JobThread
oldJobThread
=
jobThreadRepository
.
remove
(
jobId
);
if
(
oldJobThread
!=
null
)
{
oldJobThread
.
toStop
(
removeOldReason
);
oldJobThread
.
interrupt
();
return
oldJobThread
;
}
return
null
;
}
public
static
JobThread
loadJobThread
(
int
jobId
)
{
JobThread
jobThread
=
jobThreadRepository
.
get
(
jobId
);
return
jobThread
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录