Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Iotdb
提交
190f0090
I
Iotdb
项目概览
apache
/
Iotdb
11 个月 前同步成功
通知
25
Star
3344
Fork
916
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
I
Iotdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
190f0090
编写于
6月 28, 2023
作者:
W
Weihao Li
提交者:
GitHub
6月 28, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix code smells of UDF and Trigger in ConfigNode
上级
d0b6a661
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
303 addition
and
100 deletion
+303
-100
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java
...consensus/request/read/function/GetFunctionTablePlan.java
+3
-1
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTransferringTriggersPlan.java
...sus/request/read/trigger/GetTransferringTriggersPlan.java
+3
-1
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTriggerJarPlan.java
...ode/consensus/request/read/trigger/GetTriggerJarPlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTriggerLocationPlan.java
...onsensus/request/read/trigger/GetTriggerLocationPlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTriggerTablePlan.java
...e/consensus/request/read/trigger/GetTriggerTablePlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/udf/GetUDFJarPlan.java
.../confignode/consensus/request/read/udf/GetUDFJarPlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/CreateFunctionPlan.java
.../consensus/request/write/function/CreateFunctionPlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java
...de/consensus/request/write/function/DropFunctionPlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
...onsensus/request/write/trigger/AddTriggerInTablePlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
...ensus/request/write/trigger/DeleteTriggerInTablePlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
...nsus/request/write/trigger/UpdateTriggerLocationPlan.java
+22
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
.../request/write/trigger/UpdateTriggerStateInTablePlan.java
+21
-0
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
...uest/write/trigger/UpdateTriggersOnTransferNodesPlan.java
+22
-2
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java
...ignode/consensus/response/function/FunctionTableResp.java
+4
-14
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/udf/JarResp.java
...ache/iotdb/confignode/consensus/response/udf/JarResp.java
+1
-2
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
...a/org/apache/iotdb/confignode/manager/TriggerManager.java
+8
-16
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
.../java/org/apache/iotdb/confignode/manager/UDFManager.java
+9
-17
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
...confignode/manager/pipe/plugin/PipePluginCoordinator.java
+6
-14
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
.../org/apache/iotdb/confignode/persistence/TriggerInfo.java
+7
-21
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
...java/org/apache/iotdb/confignode/persistence/UDFInfo.java
+11
-10
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
...ignode/procedure/impl/trigger/CreateTriggerProcedure.java
+9
-1
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
...nfignode/procedure/impl/trigger/DropTriggerProcedure.java
+9
-1
未找到文件。
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java
浏览文件 @
190f0090
...
...
@@ -38,5 +38,7 @@ public class GetFunctionTablePlan extends ConfigPhysicalPlan {
}
@Override
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{}
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
// do nothing
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTransferringTriggersPlan.java
浏览文件 @
190f0090
...
...
@@ -38,5 +38,7 @@ public class GetTransferringTriggersPlan extends ConfigPhysicalPlan {
}
@Override
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{}
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
// do nothing
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTriggerJarPlan.java
浏览文件 @
190f0090
...
...
@@ -28,6 +28,7 @@ import java.io.IOException;
import
java.nio.ByteBuffer
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Objects
;
public
class
GetTriggerJarPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -65,4 +66,24 @@ public class GetTriggerJarPlan extends ConfigPhysicalPlan {
size
--;
}
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
GetTriggerJarPlan
that
=
(
GetTriggerJarPlan
)
o
;
return
Objects
.
equals
(
jarNames
,
that
.
jarNames
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
jarNames
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTriggerLocationPlan.java
浏览文件 @
190f0090
...
...
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
GetTriggerLocationPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -59,4 +60,24 @@ public class GetTriggerLocationPlan extends ConfigPhysicalPlan {
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
this
.
triggerName
=
ReadWriteIOUtils
.
readString
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
GetTriggerLocationPlan
that
=
(
GetTriggerLocationPlan
)
o
;
return
Objects
.
equals
(
triggerName
,
that
.
triggerName
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
triggerName
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/trigger/GetTriggerTablePlan.java
浏览文件 @
190f0090
...
...
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
GetTriggerTablePlan
extends
ConfigPhysicalPlan
{
...
...
@@ -59,4 +60,24 @@ public class GetTriggerTablePlan extends ConfigPhysicalPlan {
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
this
.
onlyStateful
=
ReadWriteIOUtils
.
readBool
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
GetTriggerTablePlan
that
=
(
GetTriggerTablePlan
)
o
;
return
onlyStateful
==
that
.
onlyStateful
;
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
onlyStateful
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/udf/GetUDFJarPlan.java
浏览文件 @
190f0090
...
...
@@ -28,6 +28,7 @@ import java.io.IOException;
import
java.nio.ByteBuffer
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Objects
;
public
class
GetUDFJarPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -65,4 +66,24 @@ public class GetUDFJarPlan extends ConfigPhysicalPlan {
size
--;
}
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
GetUDFJarPlan
that
=
(
GetUDFJarPlan
)
o
;
return
Objects
.
equals
(
jarNames
,
that
.
jarNames
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
jarNames
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/CreateFunctionPlan.java
浏览文件 @
190f0090
...
...
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
CreateFunctionPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -73,4 +74,24 @@ public class CreateFunctionPlan extends ConfigPhysicalPlan {
}
jarFile
=
ReadWriteIOUtils
.
readBinary
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
CreateFunctionPlan
that
=
(
CreateFunctionPlan
)
o
;
return
Objects
.
equals
(
udfInformation
,
that
.
udfInformation
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
udfInformation
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java
浏览文件 @
190f0090
...
...
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
DropFunctionPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -54,4 +55,24 @@ public class DropFunctionPlan extends ConfigPhysicalPlan {
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
functionName
=
ReadWriteIOUtils
.
readString
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
DropFunctionPlan
that
=
(
DropFunctionPlan
)
o
;
return
Objects
.
equals
(
functionName
,
that
.
functionName
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
functionName
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
浏览文件 @
190f0090
...
...
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
AddTriggerInTablePlan
extends
ConfigPhysicalPlan
{
...
...
@@ -81,4 +82,24 @@ public class AddTriggerInTablePlan extends ConfigPhysicalPlan {
}
jarFile
=
ReadWriteIOUtils
.
readBinary
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
AddTriggerInTablePlan
that
=
(
AddTriggerInTablePlan
)
o
;
return
Objects
.
equals
(
triggerInformation
,
that
.
triggerInformation
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
triggerInformation
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
浏览文件 @
190f0090
...
...
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
DeleteTriggerInTablePlan
extends
ConfigPhysicalPlan
{
...
...
@@ -59,4 +60,24 @@ public class DeleteTriggerInTablePlan extends ConfigPhysicalPlan {
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
triggerName
=
ReadWriteIOUtils
.
readString
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
DeleteTriggerInTablePlan
that
=
(
DeleteTriggerInTablePlan
)
o
;
return
Objects
.
equals
(
triggerName
,
that
.
triggerName
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
triggerName
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
浏览文件 @
190f0090
...
...
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
UpdateTriggerLocationPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -73,4 +74,25 @@ public class UpdateTriggerLocationPlan extends ConfigPhysicalPlan {
this
.
triggerName
=
ReadWriteIOUtils
.
readString
(
buffer
);
this
.
dataNodeLocation
=
ThriftCommonsSerDeUtils
.
deserializeTDataNodeLocation
(
buffer
);
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
UpdateTriggerLocationPlan
that
=
(
UpdateTriggerLocationPlan
)
o
;
return
Objects
.
equals
(
triggerName
,
that
.
triggerName
)
&&
Objects
.
equals
(
dataNodeLocation
,
that
.
dataNodeLocation
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
triggerName
,
dataNodeLocation
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
浏览文件 @
190f0090
...
...
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
public
class
UpdateTriggerStateInTablePlan
extends
ConfigPhysicalPlan
{
...
...
@@ -72,4 +73,24 @@ public class UpdateTriggerStateInTablePlan extends ConfigPhysicalPlan {
triggerName
=
ReadWriteIOUtils
.
readString
(
buffer
);
triggerState
=
TTriggerState
.
findByValue
(
ReadWriteIOUtils
.
readInt
(
buffer
));
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
UpdateTriggerStateInTablePlan
that
=
(
UpdateTriggerStateInTablePlan
)
o
;
return
Objects
.
equals
(
triggerName
,
that
.
triggerName
)
&&
triggerState
==
that
.
triggerState
;
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
triggerName
,
triggerState
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
浏览文件 @
190f0090
...
...
@@ -30,6 +30,7 @@ import java.io.IOException;
import
java.nio.ByteBuffer
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Objects
;
public
class
UpdateTriggersOnTransferNodesPlan
extends
ConfigPhysicalPlan
{
...
...
@@ -37,6 +38,7 @@ public class UpdateTriggersOnTransferNodesPlan extends ConfigPhysicalPlan {
public
UpdateTriggersOnTransferNodesPlan
()
{
super
(
ConfigPhysicalPlanType
.
UpdateTriggersOnTransferNodes
);
this
.
dataNodeLocations
=
new
ArrayList
<>();
}
public
UpdateTriggersOnTransferNodesPlan
(
List
<
TDataNodeLocation
>
dataNodeLocations
)
{
...
...
@@ -65,11 +67,29 @@ public class UpdateTriggersOnTransferNodesPlan extends ConfigPhysicalPlan {
@Override
protected
void
deserializeImpl
(
ByteBuffer
buffer
)
throws
IOException
{
int
size
=
ReadWriteIOUtils
.
readInt
(
buffer
);
List
<
TDataNodeLocation
>
dataNodeLocations
=
new
ArrayList
<>(
size
);
while
(
size
>
0
)
{
dataNodeLocations
.
add
(
ThriftCommonsSerDeUtils
.
deserializeTDataNodeLocation
(
buffer
));
size
--;
}
this
.
dataNodeLocations
=
dataNodeLocations
;
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
if
(!
super
.
equals
(
o
))
{
return
false
;
}
UpdateTriggersOnTransferNodesPlan
that
=
(
UpdateTriggersOnTransferNodesPlan
)
o
;
return
Objects
.
equals
(
dataNodeLocations
,
that
.
dataNodeLocations
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
dataNodeLocations
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java
浏览文件 @
190f0090
...
...
@@ -33,13 +33,11 @@ public class FunctionTableResp implements DataSet {
private
TSStatus
status
;
private
List
<
UDFInformation
>
allUDF
Information
;
private
final
List
<
UDFInformation
>
allUdf
Information
;
public
FunctionTableResp
()
{}
public
FunctionTableResp
(
TSStatus
status
,
List
<
UDFInformation
>
allUDFInformation
)
{
public
FunctionTableResp
(
TSStatus
status
,
List
<
UDFInformation
>
allUdfInformation
)
{
this
.
status
=
status
;
this
.
allU
DFInformation
=
allUDF
Information
;
this
.
allU
dfInformation
=
allUdf
Information
;
}
public
TSStatus
getStatus
()
{
...
...
@@ -50,18 +48,10 @@ public class FunctionTableResp implements DataSet {
this
.
status
=
status
;
}
public
List
<
UDFInformation
>
getAllUDFInformation
()
{
return
allUDFInformation
;
}
public
void
setAllUDFInformation
(
List
<
UDFInformation
>
allUDFInformation
)
{
this
.
allUDFInformation
=
allUDFInformation
;
}
public
TGetUDFTableResp
convertToThriftResponse
()
throws
IOException
{
List
<
ByteBuffer
>
udfInformationByteBuffers
=
new
ArrayList
<>();
for
(
UDFInformation
udfInformation
:
allU
DF
Information
)
{
for
(
UDFInformation
udfInformation
:
allU
df
Information
)
{
udfInformationByteBuffers
.
add
(
udfInformation
.
serialize
());
}
...
...
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/udf/JarResp.java
浏览文件 @
190f0090
...
...
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp
;
import
org.apache.iotdb.consensus.common.DataSet
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.List
;
...
...
@@ -46,7 +45,7 @@ public class JarResp implements DataSet {
this
.
status
=
status
;
}
public
TGetJarInListResp
convertToThriftResponse
()
throws
IOException
{
public
TGetJarInListResp
convertToThriftResponse
()
{
return
new
TGetJarInListResp
(
status
,
jarList
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
浏览文件 @
190f0090
...
...
@@ -107,8 +107,8 @@ public class TriggerManager {
}
}
final
String
triggerName
=
req
.
getTriggerName
();
final
boolean
isUsingURI
=
req
.
isIsUsingURI
()
,
needToSaveJar
=
isUsingURI
&&
triggerInfo
.
needToSaveJar
(
triggerName
);
final
boolean
isUsingURI
=
req
.
isIsUsingURI
()
;
final
boolean
needToSaveJar
=
isUsingURI
&&
triggerInfo
.
needToSaveJar
(
triggerName
);
TriggerInformation
triggerInformation
=
new
TriggerInformation
(
(
PartialPath
)
PathDeserializeUtil
.
deserialize
(
req
.
pathPattern
),
...
...
@@ -159,20 +159,12 @@ public class TriggerManager {
}
public
TGetJarInListResp
getTriggerJar
(
TGetJarInListReq
req
)
{
try
{
return
((
JarResp
)
configManager
.
getConsensusManager
()
.
read
(
new
GetTriggerJarPlan
(
req
.
getJarNameList
()))
.
getDataset
())
.
convertToThriftResponse
();
}
catch
(
IOException
e
)
{
LOGGER
.
error
(
"Fail to get TriggerJar"
,
e
);
return
new
TGetJarInListResp
(
new
TSStatus
(
TSStatusCode
.
EXECUTE_STATEMENT_ERROR
.
getStatusCode
())
.
setMessage
(
e
.
getMessage
()),
Collections
.
emptyList
());
}
return
((
JarResp
)
configManager
.
getConsensusManager
()
.
read
(
new
GetTriggerJarPlan
(
req
.
getJarNameList
()))
.
getDataset
())
.
convertToThriftResponse
();
}
/**
...
...
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
浏览文件 @
190f0090
...
...
@@ -78,9 +78,9 @@ public class UDFManager {
udfInfo
.
acquireUDFTableLock
();
try
{
final
boolean
isUsingURI
=
req
.
isIsUsingURI
();
final
String
udfName
=
req
.
udfName
.
toUpperCase
()
,
jarMD5
=
req
.
getJarMD5
(),
jarName
=
req
.
getJarName
();
final
String
udfName
=
req
.
udfName
.
toUpperCase
()
;
final
String
jarMD5
=
req
.
getJarMD5
();
final
String
jarName
=
req
.
getJarName
();
final
byte
[]
jarFile
=
req
.
getJarFile
();
udfInfo
.
validate
(
udfName
,
jarName
,
jarMD5
);
...
...
@@ -183,19 +183,11 @@ public class UDFManager {
}
public
TGetJarInListResp
getUDFJar
(
TGetJarInListReq
req
)
{
try
{
return
((
JarResp
)
configManager
.
getConsensusManager
()
.
read
(
new
GetUDFJarPlan
(
req
.
getJarNameList
()))
.
getDataset
())
.
convertToThriftResponse
();
}
catch
(
IOException
e
)
{
LOGGER
.
error
(
"Fail to get TriggerJar"
,
e
);
return
new
TGetJarInListResp
(
new
TSStatus
(
TSStatusCode
.
EXECUTE_STATEMENT_ERROR
.
getStatusCode
())
.
setMessage
(
e
.
getMessage
()),
Collections
.
emptyList
());
}
return
((
JarResp
)
configManager
.
getConsensusManager
()
.
read
(
new
GetUDFJarPlan
(
req
.
getJarNameList
()))
.
getDataset
())
.
convertToThriftResponse
();
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
浏览文件 @
190f0090
...
...
@@ -93,19 +93,11 @@ public class PipePluginCoordinator {
}
public
TGetJarInListResp
getPipePluginJar
(
TGetJarInListReq
req
)
{
try
{
return
((
JarResp
)
configManager
.
getConsensusManager
()
.
read
(
new
GetPipePluginJarPlan
(
req
.
getJarNameList
()))
.
getDataset
())
.
convertToThriftResponse
();
}
catch
(
IOException
e
)
{
LOGGER
.
error
(
"Fail to get PipePluginJar"
,
e
);
return
new
TGetJarInListResp
(
new
TSStatus
(
TSStatusCode
.
EXECUTE_STATEMENT_ERROR
.
getStatusCode
())
.
setMessage
(
e
.
getMessage
()),
Collections
.
emptyList
());
}
return
((
JarResp
)
configManager
.
getConsensusManager
()
.
read
(
new
GetPipePluginJarPlan
(
req
.
getJarNameList
()))
.
getDataset
())
.
convertToThriftResponse
();
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
浏览文件 @
190f0090
...
...
@@ -63,7 +63,6 @@ import java.util.Collections;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.UUID
;
import
java.util.concurrent.locks.ReentrantLock
;
public
class
TriggerInfo
implements
SnapshotProcessor
{
...
...
@@ -74,7 +73,6 @@ public class TriggerInfo implements SnapshotProcessor {
private
final
TriggerTable
triggerTable
;
private
final
Map
<
String
,
String
>
existedJarToMD5
;
// private final Map<String, AtomicInteger> jarReferenceTable;
private
final
TriggerExecutableManager
triggerExecutableManager
;
...
...
@@ -85,7 +83,6 @@ public class TriggerInfo implements SnapshotProcessor {
public
TriggerInfo
()
throws
IOException
{
triggerTable
=
new
TriggerTable
();
existedJarToMD5
=
new
HashMap
<>();
// jarReferenceTable = new ConcurrentHashMap<>();
triggerExecutableManager
=
TriggerExecutableManager
.
setupAndGetInstance
(
CONFIG_NODE_CONF
.
getTriggerTemporaryLibDir
(),
CONFIG_NODE_CONF
.
getTriggerDir
());
...
...
@@ -101,8 +98,9 @@ public class TriggerInfo implements SnapshotProcessor {
triggerTableLock
.
unlock
();
}
/** Validate whether the trigger can be created */
public
void
validate
(
String
triggerName
,
String
jarName
,
String
jarMD5
)
{
/** Validate whether the trigger can be created. */
public
void
validate
(
String
triggerName
,
String
jarName
,
String
jarMD5
)
throws
TriggerManagementException
{
if
(
triggerTable
.
containsTrigger
(
triggerName
))
{
throw
new
TriggerManagementException
(
String
.
format
(
...
...
@@ -118,8 +116,8 @@ public class TriggerInfo implements SnapshotProcessor {
}
}
/** Validate whether the trigger can be dropped */
public
void
validate
(
String
triggerName
)
{
/** Validate whether the trigger can be dropped
.
*/
public
void
validate
(
String
triggerName
)
throws
TriggerManagementException
{
if
(
triggerTable
.
containsTrigger
(
triggerName
))
{
return
;
}
...
...
@@ -248,29 +246,17 @@ public class TriggerInfo implements SnapshotProcessor {
snapshotFile
.
getAbsolutePath
());
return
false
;
}
File
tmpFile
=
new
File
(
snapshotFile
.
getAbsolutePath
()
+
"-"
+
UUID
.
randomUUID
());
acquireTriggerTableLock
();
try
(
FileOutputStream
fileOutputStream
=
new
FileOutputStream
(
tmp
File
))
{
try
(
FileOutputStream
fileOutputStream
=
new
FileOutputStream
(
snapshot
File
))
{
serializeExistedJarToMD5
(
fileOutputStream
);
triggerTable
.
serializeTriggerTable
(
fileOutputStream
);
fileOutputStream
.
flush
();
fileOutputStream
.
close
();
return
tmpFile
.
renameTo
(
snapshotFile
);
return
true
;
}
finally
{
releaseTriggerTableLock
();
for
(
int
retry
=
0
;
retry
<
5
;
retry
++)
{
if
(!
tmpFile
.
exists
()
||
tmpFile
.
delete
())
{
break
;
}
else
{
LOGGER
.
warn
(
"Can't delete temporary snapshot file: {}, retrying..."
,
tmpFile
.
getAbsolutePath
());
}
}
}
}
...
...
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
浏览文件 @
190f0090
...
...
@@ -69,7 +69,7 @@ public class UDFInfo implements SnapshotProcessor {
private
final
ReentrantLock
udfTableLock
=
new
ReentrantLock
();
private
static
final
String
snapshotFileName
=
"udf_info.bin"
;
private
static
final
String
SNAPSHOT_FILENAME
=
"udf_info.bin"
;
public
UDFInfo
()
throws
IOException
{
udfTable
=
new
UDFTable
();
...
...
@@ -89,23 +89,24 @@ public class UDFInfo implements SnapshotProcessor {
udfTableLock
.
unlock
();
}
/** Validate whether the UDF can be created */
public
void
validate
(
String
UDFName
,
String
jarName
,
String
jarMD5
)
{
if
(
udfTable
.
containsUDF
(
UDFName
))
{
/** Validate whether the UDF can be created. */
public
void
validate
(
String
udfName
,
String
jarName
,
String
jarMD5
)
throws
UDFManagementException
{
if
(
udfTable
.
containsUDF
(
udfName
))
{
throw
new
UDFManagementException
(
String
.
format
(
"Failed to create UDF [%s], the same name UDF has been created"
,
UDF
Name
));
String
.
format
(
"Failed to create UDF [%s], the same name UDF has been created"
,
udf
Name
));
}
if
(
existedJarToMD5
.
containsKey
(
jarName
)
&&
!
existedJarToMD5
.
get
(
jarName
).
equals
(
jarMD5
))
{
throw
new
UDFManagementException
(
String
.
format
(
"Failed to create UDF [%s], the same name Jar [%s] but different MD5 [%s] has existed"
,
UDF
Name
,
jarName
,
jarMD5
));
udf
Name
,
jarName
,
jarMD5
));
}
}
/** Validate whether the UDF can be dropped */
public
void
validate
(
String
udfName
)
{
/** Validate whether the UDF can be dropped
.
*/
public
void
validate
(
String
udfName
)
throws
UDFManagementException
{
if
(
udfTable
.
containsUDF
(
udfName
))
{
return
;
}
...
...
@@ -185,7 +186,7 @@ public class UDFInfo implements SnapshotProcessor {
@Override
public
boolean
processTakeSnapshot
(
File
snapshotDir
)
throws
IOException
{
File
snapshotFile
=
new
File
(
snapshotDir
,
snapshotFileName
);
File
snapshotFile
=
new
File
(
snapshotDir
,
SNAPSHOT_FILENAME
);
if
(
snapshotFile
.
exists
()
&&
snapshotFile
.
isFile
())
{
LOGGER
.
error
(
"Failed to take snapshot, because snapshot file [{}] is already exist."
,
...
...
@@ -208,7 +209,7 @@ public class UDFInfo implements SnapshotProcessor {
@Override
public
void
processLoadSnapshot
(
File
snapshotDir
)
throws
IOException
{
File
snapshotFile
=
new
File
(
snapshotDir
,
snapshotFileName
);
File
snapshotFile
=
new
File
(
snapshotDir
,
SNAPSHOT_FILENAME
);
if
(!
snapshotFile
.
exists
()
||
!
snapshotFile
.
isFile
())
{
LOGGER
.
error
(
"Failed to load snapshot,snapshot file [{}] is not exist."
,
...
...
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
浏览文件 @
190f0090
...
...
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
/** create trigger procedure */
public
class
CreateTriggerProcedure
extends
AbstractNodeProcedure
<
CreateTriggerState
>
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
CreateTriggerProcedure
.
class
);
private
static
final
int
RETRY_THRESHOLD
=
5
;
...
...
@@ -150,6 +150,9 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS
case
CONFIG_NODE_ACTIVE:
env
.
getConfigManager
().
getTriggerManager
().
getTriggerInfo
().
releaseTriggerTableLock
();
return
Flow
.
NO_MORE_STATE
;
default
:
throw
new
IllegalArgumentException
(
"Unknown CreateTriggerState: "
+
state
);
}
}
catch
(
Exception
e
)
{
if
(
isRollbackSupported
(
state
))
{
...
...
@@ -285,4 +288,9 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS
}
return
false
;
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
getProcId
(),
getState
(),
triggerInformation
);
}
}
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
浏览文件 @
190f0090
...
...
@@ -39,8 +39,8 @@ import org.slf4j.LoggerFactory;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.Objects
;
/** drop trigger procedure */
public
class
DropTriggerProcedure
extends
AbstractNodeProcedure
<
DropTriggerState
>
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
DropTriggerProcedure
.
class
);
private
static
final
int
RETRY_THRESHOLD
=
5
;
...
...
@@ -102,6 +102,9 @@ public class DropTriggerProcedure extends AbstractNodeProcedure<DropTriggerState
case
CONFIG_NODE_DROPPED:
env
.
getConfigManager
().
getTriggerManager
().
getTriggerInfo
().
releaseTriggerTableLock
();
return
Flow
.
NO_MORE_STATE
;
default
:
throw
new
IllegalArgumentException
(
"Unknown DropTriggerState: "
+
state
);
}
}
catch
(
Exception
e
)
{
if
(
isRollbackSupported
(
state
))
{
...
...
@@ -173,4 +176,9 @@ public class DropTriggerProcedure extends AbstractNodeProcedure<DropTriggerState
}
return
false
;
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
getProcId
(),
getState
(),
triggerName
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录