Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Iotdb
提交
cd57fefb
I
Iotdb
项目概览
apache
/
Iotdb
大约 1 年 前同步成功
通知
25
Star
3344
Fork
916
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
I
Iotdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
cd57fefb
编写于
9月 03, 2023
作者:
马
马子坤
提交者:
GitHub
9月 03, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Pipe: Add IT for pipe lifecycle, pipe syntax, pipe extractor parameters (#11019)
上级
ed87567e
变更
5
展开全部
显示空白变更内容
内联
并排
Showing
5 changed file
with
2183 addition
and
1 deletion
+2183
-1
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
...st/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+1
-1
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
...t/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+546
-0
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
...ava/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+343
-0
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
...test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
+667
-0
integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
.../apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
+626
-0
未找到文件。
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
浏览文件 @
cd57fefb
...
...
@@ -90,7 +90,7 @@ public class IoTDBPipeDataSyncIT {
extractorAttributes
.
put
(
"extractor.realtime.mode"
,
"log"
);
connectorAttributes
.
put
(
"connector"
,
"iotdb-thrift-connector"
);
connectorAttributes
.
put
(
"connector.batch.enable
d
"
,
"false"
);
connectorAttributes
.
put
(
"connector.batch.enable"
,
"false"
);
connectorAttributes
.
put
(
"connector.ip"
,
receiverIp
);
connectorAttributes
.
put
(
"connector.port"
,
Integer
.
toString
(
receiverPort
));
...
...
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
0 → 100644
浏览文件 @
cd57fefb
此差异已折叠。
点击以展开。
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
0 → 100644
浏览文件 @
cd57fefb
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package
org.apache.iotdb.pipe.it
;
import
org.apache.iotdb.common.rpc.thrift.TSStatus
;
import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient
;
import
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq
;
import
org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo
;
import
org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq
;
import
org.apache.iotdb.it.env.MultiEnvFactory
;
import
org.apache.iotdb.it.env.cluster.node.DataNodeWrapper
;
import
org.apache.iotdb.it.framework.IoTDBTestRunner
;
import
org.apache.iotdb.itbase.category.MultiClusterIT2
;
import
org.apache.iotdb.itbase.env.BaseEnv
;
import
org.apache.iotdb.rpc.TSStatusCode
;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.experimental.categories.Category
;
import
org.junit.runner.RunWith
;
import
java.sql.Connection
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
static
org
.
junit
.
Assert
.
fail
;
@RunWith
(
IoTDBTestRunner
.
class
)
@Category
({
MultiClusterIT2
.
class
})
public
class
IoTDBPipeSwitchStatusIT
{
private
BaseEnv
senderEnv
;
private
BaseEnv
receiverEnv
;
@Before
public
void
setUp
()
throws
Exception
{
MultiEnvFactory
.
createEnv
(
2
);
senderEnv
=
MultiEnvFactory
.
getEnv
(
0
);
receiverEnv
=
MultiEnvFactory
.
getEnv
(
1
);
senderEnv
.
getConfig
().
getCommonConfig
().
setAutoCreateSchemaEnabled
(
true
);
receiverEnv
.
getConfig
().
getCommonConfig
().
setAutoCreateSchemaEnabled
(
true
);
senderEnv
.
initClusterEnvironment
();
receiverEnv
.
initClusterEnvironment
();
}
@After
public
void
tearDown
()
{
senderEnv
.
cleanClusterEnvironment
();
receiverEnv
.
cleanClusterEnvironment
();
}
@Test
public
void
testPipeSwitchStatus
()
throws
Exception
{
DataNodeWrapper
receiverDataNode
=
receiverEnv
.
getDataNodeWrapper
(
0
);
String
receiverIp
=
receiverDataNode
.
getIp
();
int
receiverPort
=
receiverDataNode
.
getPort
();
try
(
SyncConfigNodeIServiceClient
client
=
(
SyncConfigNodeIServiceClient
)
senderEnv
.
getLeaderConfigNodeConnection
())
{
Map
<
String
,
String
>
extractorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
processorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
connectorAttributes
=
new
HashMap
<>();
connectorAttributes
.
put
(
"connector"
,
"iotdb-thrift-connector"
);
connectorAttributes
.
put
(
"connector.batch.enable"
,
"false"
);
connectorAttributes
.
put
(
"connector.ip"
,
receiverIp
);
connectorAttributes
.
put
(
"connector.port"
,
Integer
.
toString
(
receiverPort
));
TSStatus
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p1"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p2"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p3"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
List
<
TShowPipeInfo
>
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p2"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p3"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
startPipe
(
"p1"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
startPipe
(
"p2"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
dropPipe
(
"p3"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p2"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
Assert
.
assertFalse
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p3"
)));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
stopPipe
(
"p1"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
dropPipe
(
"p2"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertFalse
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p2"
)));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
dropPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertFalse
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)));
}
}
@Test
public
void
testPipeIllegallySwitchStatus
()
throws
Exception
{
DataNodeWrapper
receiverDataNode
=
receiverEnv
.
getDataNodeWrapper
(
0
);
String
receiverIp
=
receiverDataNode
.
getIp
();
int
receiverPort
=
receiverDataNode
.
getPort
();
try
(
SyncConfigNodeIServiceClient
client
=
(
SyncConfigNodeIServiceClient
)
senderEnv
.
getLeaderConfigNodeConnection
())
{
Map
<
String
,
String
>
extractorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
processorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
connectorAttributes
=
new
HashMap
<>();
connectorAttributes
.
put
(
"connector"
,
"iotdb-thrift-connector"
);
connectorAttributes
.
put
(
"connector.batch.enable"
,
"false"
);
connectorAttributes
.
put
(
"connector.ip"
,
receiverIp
);
connectorAttributes
.
put
(
"connector.port"
,
Integer
.
toString
(
receiverPort
));
TSStatus
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p1"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
List
<
TShowPipeInfo
>
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
stopPipe
(
"p1"
).
getCode
());
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p1"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
status
.
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertEquals
(
1
,
showPipeResult
.
stream
().
filter
((
o
)
->
o
.
id
.
equals
(
"p1"
)).
count
());
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
startPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
startPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
Assert
.
assertEquals
(
1
,
showPipeResult
.
stream
().
filter
((
o
)
->
o
.
id
.
equals
(
"p1"
)).
count
());
}
}
@Test
public
void
testDropPipeAndCreateAgain
()
throws
Exception
{
DataNodeWrapper
receiverDataNode
=
receiverEnv
.
getDataNodeWrapper
(
0
);
String
receiverIp
=
receiverDataNode
.
getIp
();
int
receiverPort
=
receiverDataNode
.
getPort
();
try
(
SyncConfigNodeIServiceClient
client
=
(
SyncConfigNodeIServiceClient
)
senderEnv
.
getLeaderConfigNodeConnection
())
{
try
(
Connection
connection
=
senderEnv
.
getConnection
();
Statement
statement
=
connection
.
createStatement
())
{
statement
.
execute
(
"insert into root.sg1.d1(time, at1) values (1, 1)"
);
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
fail
(
e
.
getMessage
());
}
Map
<
String
,
String
>
extractorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
processorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
connectorAttributes
=
new
HashMap
<>();
connectorAttributes
.
put
(
"connector"
,
"iotdb-thrift-connector"
);
connectorAttributes
.
put
(
"connector.batch.enable"
,
"false"
);
connectorAttributes
.
put
(
"connector.ip"
,
receiverIp
);
connectorAttributes
.
put
(
"connector.port"
,
Integer
.
toString
(
receiverPort
));
TSStatus
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p1"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
startPipe
(
"p1"
).
getCode
());
List
<
TShowPipeInfo
>
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
try
(
Connection
connection
=
senderEnv
.
getConnection
();
Statement
statement
=
connection
.
createStatement
())
{
statement
.
execute
(
"drop database root.**"
);
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
fail
(
e
.
getMessage
());
}
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
dropPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertEquals
(
0
,
showPipeResult
.
size
());
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p1"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertEquals
(
1
,
showPipeResult
.
stream
().
filter
((
o
)
->
o
.
id
.
equals
(
"p1"
)).
count
());
}
}
@Test
public
void
testWrongPipeName
()
throws
Exception
{
DataNodeWrapper
receiverDataNode
=
receiverEnv
.
getDataNodeWrapper
(
0
);
String
receiverIp
=
receiverDataNode
.
getIp
();
int
receiverPort
=
receiverDataNode
.
getPort
();
try
(
SyncConfigNodeIServiceClient
client
=
(
SyncConfigNodeIServiceClient
)
senderEnv
.
getLeaderConfigNodeConnection
())
{
Map
<
String
,
String
>
extractorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
processorAttributes
=
new
HashMap
<>();
Map
<
String
,
String
>
connectorAttributes
=
new
HashMap
<>();
connectorAttributes
.
put
(
"connector"
,
"iotdb-thrift-connector"
);
connectorAttributes
.
put
(
"connector.batch.enable"
,
"false"
);
connectorAttributes
.
put
(
"connector.ip"
,
receiverIp
);
connectorAttributes
.
put
(
"connector.port"
,
Integer
.
toString
(
receiverPort
));
TSStatus
status
=
client
.
createPipe
(
new
TCreatePipeReq
(
"p1"
,
connectorAttributes
)
.
setExtractorAttributes
(
extractorAttributes
)
.
setProcessorAttributes
(
processorAttributes
));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
status
.
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
startPipe
(
""
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
startPipe
(
"p0"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
startPipe
(
"p"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
startPipe
(
"*"
).
getCode
());
List
<
TShowPipeInfo
>
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
startPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
stopPipe
(
""
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
stopPipe
(
"p0"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
stopPipe
(
"p"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_ERROR
.
getStatusCode
(),
client
.
stopPipe
(
"*"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"RUNNING"
)));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
stopPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_NOT_EXIST_ERROR
.
getStatusCode
(),
client
.
dropPipe
(
""
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_NOT_EXIST_ERROR
.
getStatusCode
(),
client
.
dropPipe
(
"p0"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_NOT_EXIST_ERROR
.
getStatusCode
(),
client
.
dropPipe
(
"p"
).
getCode
());
Assert
.
assertEquals
(
TSStatusCode
.
PIPE_NOT_EXIST_ERROR
.
getStatusCode
(),
client
.
dropPipe
(
"*"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertTrue
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)
&&
o
.
state
.
equals
(
"STOPPED"
)));
Assert
.
assertEquals
(
TSStatusCode
.
SUCCESS_STATUS
.
getStatusCode
(),
client
.
dropPipe
(
"p1"
).
getCode
());
showPipeResult
=
client
.
showPipe
(
new
TShowPipeReq
()).
pipeInfoList
;
Assert
.
assertFalse
(
showPipeResult
.
stream
().
anyMatch
((
o
)
->
o
.
id
.
equals
(
"p1"
)));
}
}
}
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
0 → 100644
浏览文件 @
cd57fefb
此差异已折叠。
点击以展开。
integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
0 → 100644
浏览文件 @
cd57fefb
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录