diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java index 60bbe095b4f5efd2554a636a811d099289f5178f..f79b6d11f621373fdd57f88e671bd0087b07875b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java @@ -90,7 +90,7 @@ public class IoTDBPipeDataSyncIT { extractorAttributes.put("extractor.realtime.mode", "log"); connectorAttributes.put("connector", "iotdb-thrift-connector"); - connectorAttributes.put("connector.batch.enabled", "false"); + connectorAttributes.put("connector.batch.enable", "false"); connectorAttributes.put("connector.ip", receiverIp); connectorAttributes.put("connector.port", Integer.toString(receiverPort)); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java new file mode 100644 index 0000000000000000000000000000000000000000..a906cf61abfe1da8578c6217d29a56d4e806837e --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2.class}) +public class IoTDBPipeLifeCycleIT { + + private BaseEnv senderEnv; + private BaseEnv receiverEnv; + + @Before + public void setUp() throws Exception { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + @After + public void tearDown() { + senderEnv.cleanClusterEnvironment(); + receiverEnv.cleanClusterEnvironment(); + } + + @Test + public void testLifeCycleWithHistoryEnabled() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + expectedResSet.add("2,2.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertDataOnReceiver(receiverEnv, expectedResSet); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + expectedResSet.add("3,3.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + } + } + + @Test + public void testLifeCycleWithHistoryDisabled() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.history.enable", "false"); + // start-time and end-time should not work + extractorAttributes.put("extractor.history.start-time", "0001.01.01T00:00:00"); + extractorAttributes.put("extractor.history.end-time", "2100.01.01T00:00:00"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Set expectedResSet = new HashSet<>(); + expectedResSet.add("2,2.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertDataOnReceiver(receiverEnv, expectedResSet); + } + } + + @Test + public void testLifeCycleLogMode() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.mode", "log"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + expectedResSet.add("2,2.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertDataOnReceiver(receiverEnv, expectedResSet); + } + } + + @Test + public void testLifeCycleFileMode() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.mode", "file"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + expectedResSet.add("2,2.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertDataOnReceiver(receiverEnv, expectedResSet); + } + } + + @Test + public void testLifeCycleHybridMode() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.mode", "hybrid"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + expectedResSet.add("2,2.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertDataOnReceiver(receiverEnv, expectedResSet); + } + } + + @Test + public void testLifeCycleWithClusterRestart() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + Set expectedResSet = new HashSet<>(); + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + expectedResSet.add("1,1.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + expectedResSet.add("2,2.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + } + + restartCluster(senderEnv); + restartCluster(receiverEnv); + + try (SyncConfigNodeIServiceClient ignored = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + expectedResSet.add("3,3.0,"); + assertDataOnReceiver(receiverEnv, expectedResSet); + } + } + + private void assertDataOnReceiver(BaseEnv receiverEnv, Set expectedResSet) { + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from root.**"), + "Time,root.sg1.d1.at1,", + expectedResSet)); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void restartCluster(BaseEnv env) { + for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) { + env.shutdownConfigNode(i); + } + for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) { + env.shutdownDataNode(i); + } + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ignored) { + } + for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) { + env.startConfigNode(i); + } + for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) { + env.startDataNode(i); + } + ((AbstractEnv) env).testWorkingNoUnknown(); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java new file mode 100644 index 0000000000000000000000000000000000000000..b4c9e5c53fa8706791f5c48e5350a1643b8e4319 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2.class}) +public class IoTDBPipeSwitchStatusIT { + + private BaseEnv senderEnv; + private BaseEnv receiverEnv; + + @Before + public void setUp() throws Exception { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + @After + public void tearDown() { + senderEnv.cleanClusterEnvironment(); + receiverEnv.cleanClusterEnvironment(); + } + + @Test + public void testPipeSwitchStatus() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + status = + client.createPipe( + new TCreatePipeReq("p3", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p2") && o.state.equals("STOPPED"))); + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p3") && o.state.equals("STOPPED"))); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p3").getCode()); + + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING"))); + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p2") && o.state.equals("RUNNING"))); + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p3"))); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p2").getCode()); + + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p2"))); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p1").getCode()); + + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p1"))); + } + } + + @Test + public void testPipeIllegallySwitchStatus() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.stopPipe("p1").getCode()); + status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), status.getCode()); + + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.stream().filter((o) -> o.id.equals("p1")).count()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING"))); + + Assert.assertEquals( + TSStatusCode.PIPE_ERROR.getStatusCode(), client.startPipe("p1").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING"))); + Assert.assertEquals(1, showPipeResult.stream().filter((o) -> o.id.equals("p1")).count()); + } + } + + @Test + public void testDropPipeAndCreateAgain() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING"))); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("drop database root.**"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p1").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + + status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.stream().filter((o) -> o.id.equals("p1")).count()); + } + } + + @Test + public void testWrongPipeName() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.startPipe("").getCode()); + Assert.assertEquals( + TSStatusCode.PIPE_ERROR.getStatusCode(), client.startPipe("p0").getCode()); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.startPipe("p").getCode()); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.startPipe("*").getCode()); + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING"))); + + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.stopPipe("").getCode()); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.stopPipe("p0").getCode()); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.stopPipe("p").getCode()); + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), client.stopPipe("*").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING"))); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + + Assert.assertEquals( + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client.dropPipe("").getCode()); + Assert.assertEquals( + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client.dropPipe("p0").getCode()); + Assert.assertEquals( + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client.dropPipe("p").getCode()); + Assert.assertEquals( + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client.dropPipe("*").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p1").getCode()); + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p1"))); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java new file mode 100644 index 0000000000000000000000000000000000000000..d3c679d13cdba9bf02221470f12bf5775f7fb8d7 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2.class}) +public class IoTDBPipeSyntaxIT { + + private BaseEnv senderEnv; + private BaseEnv receiverEnv; + + @Before + public void setUp() throws Exception { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + @After + public void tearDown() { + senderEnv.cleanClusterEnvironment(); + receiverEnv.cleanClusterEnvironment(); + } + + @Test + public void testValidPipeName() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + List validPipeNames = Arrays.asList("Pipe_1", "null", "`33`", "`root`", "中文", "with"); + for (String pipeName : validPipeNames) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe %s" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + pipeName, receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + for (String pipeName : validPipeNames) { + Assert.assertTrue( + showPipeResult.stream() + .anyMatch((o) -> o.id.equals(pipeName) && o.state.equals("STOPPED"))); + } + + for (String pipeName : validPipeNames) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute(String.format("drop pipe %s", pipeName)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + } + + @Test + public void testRevertParameterOrder() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p1" + + " with extractor (" + + "'extractor.realtime.mode'='hybrid'," + + "'extractor.history.enable'='false') " + + " with connector (" + + "'connector.batch.enable'='false', " + + "'connector.port'='%s'," + + "'connector.ip'='%s'," + + "'connector'='iotdb-thrift-connector')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertTrue( + showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("STOPPED"))); + } + } + + @Test + public void testRevertStageOrder() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p1" + + " with connector (" + + "'connector.batch.enable'='false', " + + "'connector.port'='%s'," + + "'connector.ip'='%s'," + + "'connector'='iotdb-thrift-connector') " + + " with extractor (" + + "'extractor.realtime.mode'='hybrid'," + + "'extractor.history.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + } + + @Test + public void testMissingStage() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("create pipe p1"); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("create pipe p2 with extractor ('extractor'='iotdb-extractor')"); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "create pipe p3" + + " with extractor ('extractor'='iotdb-extractor')" + + " with processor ('processor'='do-nothing-processor')"); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p4" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p5" + + " with extractor ('extractor'='iotdb-extractor')" + + " with processor ('processor'='do-nothing-processor')" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(2, showPipeResult.size()); + } + } + + @Test + public void testInvalidParameter() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p1" + + " with extractor ()" + + " with processor ()" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p2" + + " with extractor ('extractor'='invalid-param')" + + " with processor ()" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p3" + + " with extractor ()" + + " with processor ('processor'='invalid-param')" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe p4" + + " with extractor ()" + + " with processor ()" + + " with connector (" + + "'connector'='invalid-param'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + } + } + + @Test + public void testBrackets() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe extractor1" + + " with extractor ('extractor'='iotdb-extractor')" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe extractor2" + + " with extractor (\"extractor\"=\"iotdb-extractor\")" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe extractor3" + + " with extractor ('extractor'=\"iotdb-extractor\")" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe extractor4" + + " with extractor (extractor=iotdb-extractor)" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe extractor5" + + " with extractor ('extractor'=`iotdb-extractor`)" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe processor1" + + " with processor ('processor'='do-nothing-processor')" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe processor2" + + " with processor (\"processor\"=\"do-nothing-processor\")" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe processor3" + + " with processor ('processor'=\"do-nothing-processor\")" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe processor4" + + " with processor (processor=do-nothing-processor)" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe processor5" + + " with processor ('processor'=`do-nothing-processor`)" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe connector1" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe connector2" + + " with connector (" + + "\"connector\"=\"iotdb-thrift-connector\"," + + "\"connector.ip\"=\"%s\"," + + "\"connector.port\"=\"%s\"," + + "\"connector.batch.enable\"=\"false\")", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe connector3" + + " with connector (" + + "'connector'=\"iotdb-thrift-connector\"," + + "\"connector.ip\"='%s'," + + "'connector.port'=\"%s\"," + + "\"connector.batch.enable\"='false')", + receiverIp, receiverPort)); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe connector4" + + " with connector (" + + "connector=iotdb-thrift-connector," + + "connector.ip=%s," + + "connector.port=%s," + + "connector.batch.enable=false)", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe connector5" + + " with connector (" + + "'connector'=`iotdb-thrift-connector`," + + "'connector.ip'=`%s`," + + "'connector.port'=`%s`," + + "'connector.batch.enable'=`false`)", + receiverIp, receiverPort)); + fail(); + } catch (SQLException ignored) { + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(9, showPipeResult.size()); + } + } + + @Test + public void testShowPipeWithWrongPipeName() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + connectorAttributes.replace("connector.batch.enable", "true"); + + status = + client.createPipe( + new TCreatePipeReq("p3", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(3, showPipeResult.size()); + + showPipeResult = client.showPipe(new TShowPipeReq().setPipeName("p1")).pipeInfoList; + Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> o.id.equals("p1"))); + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p2"))); + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p3"))); + + // Show all pipes whose connector is also used by p1. + // p1 and p2 share the same connector parameters, so they have the same connector. + showPipeResult = + client.showPipe(new TShowPipeReq().setPipeName("p1").setWhereClause(true)).pipeInfoList; + Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> o.id.equals("p1"))); + Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> o.id.equals("p2"))); + Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> o.id.equals("p3"))); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java new file mode 100644 index 0000000000000000000000000000000000000000..2c2ac03252644085586655c59d452088d8d94243 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.extractor; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2.class}) +public class IoTDBPipeExtractorIT { + + private BaseEnv senderEnv; + private BaseEnv receiverEnv; + + @Before + public void setUp() throws Exception { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + @After + public void tearDown() { + senderEnv.cleanClusterEnvironment(); + receiverEnv.cleanClusterEnvironment(); + } + + @Test + public void testExtractorValidParameter() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor", "iotdb-extractor"); + extractorAttributes.put("extractor.history.enabled", "true"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + // test when start-time and end-time are not set + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // test when only extractor.history.start-time is set + extractorAttributes.put("extractor.history.start-time", "2000.01.01T08:00:00"); + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // test when only extractor.history.end-time is set + extractorAttributes.remove("extractor.history.start-time"); + extractorAttributes.put("extractor.history.end-time", "2000.01.01T08:00:00"); + status = + client.createPipe( + new TCreatePipeReq("p3", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // test when extractor.history.start-time equals extractor.history.end-time + extractorAttributes.put("extractor.history.start-time", "2000.01.01T08:00:00"); + status = + client.createPipe( + new TCreatePipeReq("p4", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // test when extractor.history.start-time is greater than extractor.history.end-time + extractorAttributes.replace("extractor.history.start-time", "2001.01.01T08:00:00"); + status = + client.createPipe( + new TCreatePipeReq("p5", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // test when extractor.history.end-time is future time + extractorAttributes.replace("extractor.history.end-time", "2100.01.01T08:00:00"); + status = + client.createPipe( + new TCreatePipeReq("p6", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + } + + @Test + public void testExtractorInvalidParameter() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + // insert data to create data region + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + String formatString = + String.format( + "create pipe p1" + + " with extractor (" + + "'extractor.history.enabled'='true'," + + "'extractor.history.start-time'=%s)" + + " with connector (" + + "'connector'='iotdb-thrift-connector'," + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.batch.enable'='false')", + "%s", receiverIp, receiverPort); + + List invalidStartTimes = + Arrays.asList( + "''", "null", "'null'", "'1'", "'-1000-01-01T00:00:00'", "'2000-01-01T00:00:0'"); + for (String invalidStartTime : invalidStartTimes) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute(String.format(formatString, invalidStartTime)); + fail(); + } catch (SQLException ignored) { + } + } + + List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + } + + @Test + public void testExtractorPatternMatch() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)"); + statement.execute("insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)"); + statement.execute("insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)"); + statement.execute( + "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)"); + statement.execute( + "insert into root.nonAligned.6TS.`6` (" + + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) " + + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)"); + statement.execute( + "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)"); + statement.execute( + "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)"); + statement.execute( + "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)"); + statement.execute( + "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)"); + statement.execute( + "insert into root.aligned.6TS.`6` (" + + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) " + + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.pattern", null); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + List patterns = + Arrays.asList( + "root.db.nonExistPath", // match nothing + "root.nonAligned.1TS.s_float", + "root.nonAligned.6TS.`6`.`s_text(1)`", + "root.aligned.1TS", + "root.aligned.6TS.`6`.`s_int(1)`", + "root.aligned.`1(TS)`", + "root.nonAligned.`1(TS)`", + "root.aligned.6TS.`6`", // match device root.aligned.6TS.`6` + "root.nonAligned.6TS.`6`", // match device root.nonAligned.6TS.`6` + "root.nonAligned.`1`", // match nothing + "root.nonAligned.1", // match device root.nonAligned.1TS, 100TS and 100TS + "root.aligned.1" // match device root.aligned.1TS, 100TS and 100TS + ); + + List expectedTimeseriesCount = + Arrays.asList(0, 1, 2, 3, 4, 5, 6, 11, 16, 16, 18, 20); + + for (int i = 0; i < patterns.size(); ++i) { + extractorAttributes.replace("extractor.pattern", patterns.get(i)); + TSStatus status = + client.createPipe( + new TCreatePipeReq("p" + i, connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p" + i).getCode()); + assertTimeseriesCountOnReceiver(receiverEnv, expectedTimeseriesCount.get(i)); + } + + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + Set expectedDevices = new HashSet<>(); + expectedDevices.add("root.nonAligned.1TS,false,"); + expectedDevices.add("root.nonAligned.100TS,false,"); + expectedDevices.add("root.nonAligned.1000TS,false,"); + expectedDevices.add("root.nonAligned.`1(TS)`,false,"); + expectedDevices.add("root.nonAligned.6TS.`6`,false,"); + expectedDevices.add("root.aligned.1TS,true,"); + expectedDevices.add("root.aligned.100TS,true,"); + expectedDevices.add("root.aligned.1000TS,true,"); + expectedDevices.add("root.aligned.`1(TS)`,true,"); + expectedDevices.add("root.aligned.6TS.`6`,true,"); + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("show devices"), + "Device,IsAligned,", + expectedDevices)); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + + @Test + public void testMatchingMultipleDatabases() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.pattern", "root.db1"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + assertTimeseriesCountOnReceiver(receiverEnv, 0); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.db1.d1 (time, at1) values (1, 10)"); + statement.execute("insert into root.db2.d1 (time, at1) values (1, 20)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + extractorAttributes.replace("extractor.pattern", "root.db2"); + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); + assertTimeseriesCountOnReceiver(receiverEnv, 2); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p1").getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p2").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.db1.d1 (time, at1) values (2, 11)"); + statement.execute("insert into root.db2.d1 (time, at1) values (2, 21)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + extractorAttributes.remove("extractor.pattern"); // no pattern, will match all databases + status = + client.createPipe( + new TCreatePipeReq("p3", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p3").getCode()); + + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select count(*) from root.**"), + "count(root.db1.d1.at1),count(root.db2.d1.at1),", + Collections.singleton("2,2,"))); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + + @Test + public void testHistoryAndRealtime() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.db.d1 (time, at1) values (1, 10)"); + statement.execute("insert into root.db.d2 (time, at1) values (1, 20)"); + statement.execute("insert into root.db.d3 (time, at1) values (1, 30)"); + statement.execute("insert into root.db.d4 (time, at1) values (1, 40)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + extractorAttributes.put("extractor.pattern", "root.db.d1"); + extractorAttributes.put("extractor.history.enable", "false"); + extractorAttributes.put("extractor.realtime.enable", "false"); + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + // can not set both to false + Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), status.getCode()); + + extractorAttributes.replace("extractor.pattern", "root.db.d2"); + extractorAttributes.replace("extractor.history.enable", "false"); + extractorAttributes.replace("extractor.realtime.enable", "true"); + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); + + extractorAttributes.replace("extractor.pattern", "root.db.d3"); + extractorAttributes.replace("extractor.history.enable", "true"); + extractorAttributes.replace("extractor.realtime.enable", "false"); + status = + client.createPipe( + new TCreatePipeReq("p3", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p3").getCode()); + + extractorAttributes.replace("extractor.pattern", "root.db.d4"); + extractorAttributes.replace("extractor.history.enable", "true"); + extractorAttributes.replace("extractor.realtime.enable", "true"); + status = + client.createPipe( + new TCreatePipeReq("p4", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p4").getCode()); + + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.db.d1 (time, at1) values (2, 11)"); + statement.execute("insert into root.db.d2 (time, at1) values (2, 21)"); + statement.execute("insert into root.db.d3 (time, at1) values (2, 31)"); + statement.execute("insert into root.db.d4 (time, at1) values (2, 41)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select count(*) from root.** where time <= 1"), + "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),", + Collections.singleton("1,0,1,"))); + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select count(*) from root.** where time >= 2"), + "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),", + Collections.singleton("1,1,0,"))); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + + @Test + public void testStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + try (Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "insert into root.db.d1 (time, at1)" + + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)"); + statement.execute( + "insert into root.db.d2 (time, at1)" + + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)"); + statement.execute("flush"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.pattern", "root.db.d1"); + extractorAttributes.put("extractor.history.enable", "true"); + extractorAttributes.put("extractor.history.start-time", "1970-01-01T08:00:02+08:00"); + extractorAttributes.put("extractor.history.end-time", "1970-01-01T08:00:04+08:00"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select count(*) from root.**"), + "count(root.db.d1.at1),", + Collections.singleton("3,"))); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + extractorAttributes.remove("extractor.pattern"); + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); + + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select count(*) from root.**"), + "count(root.db.d1.at1),count(root.db.d2.at1),", + Collections.singleton("3,3,"))); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + + private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) { + try (Connection connection = receiverEnv.getConnection(); + Statement statement = connection.createStatement()) { + await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("count timeseries"), + "count(timeseries),", + Collections.singleton(count + ","))); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +}