Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
liyuanzhong001
DolphinScheduler
提交
d371745d
DolphinScheduler
项目概览
liyuanzhong001
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
11
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d371745d
编写于
1月 17, 2020
作者:
J
Jave-Chen
提交者:
Tboy
1月 17, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix bug: Use try-with-resources or close this "Socket" in a "finally" clause. (#1716)
* #1714
上级
c522ea7e
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
30 addition
and
30 deletion
+30
-30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java
...apache/dolphinscheduler/api/utils/FourLetterWordMain.java
+7
-13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
...ache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+23
-17
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java
浏览文件 @
d371745d
...
...
@@ -59,23 +59,22 @@ public class FourLetterWordMain {
*/
public
static
String
send4LetterWord
(
String
host
,
int
port
,
String
cmd
,
int
timeout
)
throws
IOException
{
LOG
.
info
(
"connecting to "
+
host
+
" "
+
port
);
Socket
sock
=
new
Socket
();
LOG
.
info
(
"connecting to {} {}"
,
host
,
port
);
InetSocketAddress
hostaddress
=
host
!=
null
?
new
InetSocketAddress
(
host
,
port
)
:
new
InetSocketAddress
(
InetAddress
.
getByName
(
null
),
port
);
BufferedReader
reader
=
null
;
try
{
try
(
Socket
sock
=
new
Socket
();
OutputStream
outstream
=
sock
.
getOutputStream
();
BufferedReader
reader
=
new
BufferedReader
(
new
InputStreamReader
(
sock
.
getInputStream
())))
{
sock
.
setSoTimeout
(
timeout
);
sock
.
connect
(
hostaddress
,
timeout
);
OutputStream
outstream
=
sock
.
getOutputStream
();
outstream
.
write
(
cmd
.
getBytes
());
outstream
.
flush
();
// this replicates NC - close the output stream before reading
sock
.
shutdownOutput
();
reader
=
new
BufferedReader
(
new
InputStreamReader
(
sock
.
getInputStream
()));
StringBuilder
sb
=
new
StringBuilder
();
String
line
;
while
((
line
=
reader
.
readLine
())
!=
null
)
{
...
...
@@ -84,11 +83,6 @@ public class FourLetterWordMain {
return
sb
.
toString
();
}
catch
(
SocketTimeoutException
e
)
{
throw
new
IOException
(
"Exception while executing four letter word: "
+
cmd
,
e
);
}
finally
{
sock
.
close
();
if
(
reader
!=
null
)
{
reader
.
close
();
}
}
}
}
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
浏览文件 @
d371745d
...
...
@@ -105,7 +105,7 @@ public class SqlTask extends AbstractTask {
// set the name of the current thread
String
threadLoggerInfoName
=
String
.
format
(
Constants
.
TASK_LOG_INFO_FORMAT
,
taskProps
.
getTaskAppId
());
Thread
.
currentThread
().
setName
(
threadLoggerInfoName
);
logger
.
info
(
sqlParameters
.
toString
()
);
logger
.
info
(
"Full sql parameters: {}"
,
sqlParameters
);
logger
.
info
(
"sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}"
,
sqlParameters
.
getType
(),
sqlParameters
.
getDatasource
(),
...
...
@@ -289,12 +289,12 @@ public class SqlTask extends AbstractTask {
}
}
try
(
PreparedStatement
stmt
=
prepareStatementAndBind
(
connection
,
mainSqlBinds
))
{
try
(
PreparedStatement
stmt
=
prepareStatementAndBind
(
connection
,
mainSqlBinds
);
ResultSet
resultSet
=
stmt
.
executeQuery
())
{
// decide whether to executeQuery or executeUpdate based on sqlType
if
(
sqlParameters
.
getSqlType
()
==
SqlType
.
QUERY
.
ordinal
())
{
// query statements need to be convert to JsonArray and inserted into Alert to send
JSONArray
resultJSONArray
=
new
JSONArray
();
ResultSet
resultSet
=
stmt
.
executeQuery
();
ResultSetMetaData
md
=
resultSet
.
getMetaData
();
int
num
=
md
.
getColumnCount
();
...
...
@@ -305,11 +305,10 @@ public class SqlTask extends AbstractTask {
}
resultJSONArray
.
add
(
mapOfColValues
);
}
resultSet
.
close
();
logger
.
debug
(
"execute sql : {}"
,
JSONObject
.
toJSONString
(
resultJSONArray
,
SerializerFeature
.
WriteMapNullValue
));
// if there is a result set
if
(
resultJSONArray
.
size
()
>
0
)
{
if
(
!
resultJSONArray
.
isEmpty
()
)
{
if
(
StringUtils
.
isNotEmpty
(
sqlParameters
.
getTitle
()))
{
sendAttachment
(
sqlParameters
.
getTitle
(),
JSONObject
.
toJSONString
(
resultJSONArray
,
SerializerFeature
.
WriteMapNullValue
));
...
...
@@ -337,6 +336,12 @@ public class SqlTask extends AbstractTask {
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
throw
new
RuntimeException
(
e
.
getMessage
());
}
finally
{
try
{
connection
.
close
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
return
connection
;
}
...
...
@@ -349,22 +354,23 @@ public class SqlTask extends AbstractTask {
* @throws Exception
*/
private
PreparedStatement
prepareStatementAndBind
(
Connection
connection
,
SqlBinds
sqlBinds
)
throws
Exception
{
PreparedStatement
stmt
=
connection
.
prepareStatement
(
sqlBinds
.
getSql
());
// is the timeout set
boolean
timeoutFlag
=
taskProps
.
getTaskTimeoutStrategy
()
==
TaskTimeoutStrategy
.
FAILED
||
taskProps
.
getTaskTimeoutStrategy
()
==
TaskTimeoutStrategy
.
WARNFAILED
;
if
(
timeoutFlag
){
stmt
.
setQueryTimeout
(
taskProps
.
getTaskTimeout
());
}
Map
<
Integer
,
Property
>
params
=
sqlBinds
.
getParamsMap
();
if
(
params
!=
null
)
{
for
(
Map
.
Entry
<
Integer
,
Property
>
entry
:
params
.
entrySet
())
{
Property
prop
=
entry
.
getValue
();
ParameterUtils
.
setInParameter
(
entry
.
getKey
(),
stmt
,
prop
.
getType
(),
prop
.
getValue
());
try
(
PreparedStatement
stmt
=
connection
.
prepareStatement
(
sqlBinds
.
getSql
()))
{
if
(
timeoutFlag
){
stmt
.
setQueryTimeout
(
taskProps
.
getTaskTimeout
());
}
Map
<
Integer
,
Property
>
params
=
sqlBinds
.
getParamsMap
();
if
(
params
!=
null
)
{
for
(
Map
.
Entry
<
Integer
,
Property
>
entry
:
params
.
entrySet
())
{
Property
prop
=
entry
.
getValue
();
ParameterUtils
.
setInParameter
(
entry
.
getKey
(),
stmt
,
prop
.
getType
(),
prop
.
getValue
());
}
}
logger
.
info
(
"prepare statement replace sql : {} "
,
stmt
);
return
stmt
;
}
logger
.
info
(
"prepare statement replace sql : {} "
,
stmt
.
toString
());
return
stmt
;
}
/**
...
...
@@ -452,7 +458,7 @@ public class SqlTask extends AbstractTask {
for
(
int
i
=
1
;
i
<=
sqlParamsMap
.
size
();
i
++){
logPrint
.
append
(
sqlParamsMap
.
get
(
i
).
getValue
()+
"("
+
sqlParamsMap
.
get
(
i
).
getType
()+
")"
);
}
logger
.
info
(
logPrint
.
toString
()
);
logger
.
info
(
"Sql Params are {}"
,
logPrint
);
}
/**
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录