diff --git a/iotdb-connector/flink-sql-iotdb-connector/pom.xml b/iotdb-connector/flink-sql-iotdb-connector/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..0a23c56989536c8df40eed193233ddf983741c1d
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/pom.xml
@@ -0,0 +1,68 @@
+
+
+
+ 4.0.0
+
+ org.apache.iotdb
+ iotdb-parent
+ 1.3.0-SNAPSHOT
+ ../../pom.xml
+
+ flink-sql-iotdb-connector
+ IoTDB: Connector: Apache Flink SQL Connector
+ 1.3.0-SNAPSHOT
+
+ UTF-8
+ 1.17.0
+
+
+
+
+ org.apache.iotdb
+ iotdb-session
+ ${project.version}
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+
+
+ org.java-websocket
+ Java-WebSocket
+ ${websocket.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..eb57749bc3935662293ffa41820e75308a66822b
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.client;
+
+import org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction;
+import org.apache.iotdb.flink.sql.wrapper.TabletWrapper;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+
+public class IoTDBWebSocketClient extends WebSocketClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBWebSocketClient.class);
+ private final IoTDBCDCSourceFunction function;
+
+ public IoTDBWebSocketClient(URI uri, IoTDBCDCSourceFunction function) {
+ super(uri);
+ this.function = function;
+ }
+
+ @Override
+ public void onOpen(ServerHandshake serverHandshake) {
+ String log =
+ String.format("The connection with %s:%d has been created.", uri.getHost(), uri.getPort());
+ LOGGER.info(log);
+ }
+
+ @Override
+ public void onMessage(String s) {
+ // Do nothing
+ }
+
+ @Override
+ public void onMessage(ByteBuffer bytes) {
+ super.onMessage(bytes);
+ long commitId = bytes.getLong();
+ Tablet tablet = Tablet.deserialize(bytes);
+ function.addTabletWrapper(new TabletWrapper(commitId, this, tablet));
+ }
+
+ @Override
+ public void onClose(int i, String s, boolean b) {
+ LOGGER.info("The connection to {}:{} has been closed.", uri.getHost(), uri.getPort());
+ }
+
+ @Override
+ public void onError(Exception e) {
+ String log =
+ String.format(
+ "An error occurred when connecting to %s:%s: %s.",
+ uri.getHost(), uri.getPort(), e.getMessage());
+ LOGGER.error(log);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
new file mode 100644
index 0000000000000000000000000000000000000000..0fe0196b00c81d85ca8f38336d2e4a8502b14e50
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class Options {
+ public static final ConfigOption NODE_URLS =
+ ConfigOptions.key("nodeUrls").stringType().defaultValue("127.0.0.1:6667");
+ public static final ConfigOption USER =
+ ConfigOptions.key("user").stringType().defaultValue("root");
+ public static final ConfigOption PASSWORD =
+ ConfigOptions.key("password").stringType().defaultValue("root");
+ public static final ConfigOption DEVICE =
+ ConfigOptions.key("device").stringType().noDefaultValue();
+ public static final ConfigOption ALIGNED =
+ ConfigOptions.key("aligned").booleanType().defaultValue(false);
+ public static final ConfigOption MODE =
+ ConfigOptions.key("mode").enumType(Mode.class).defaultValue(Mode.BOUNDED);
+ public static final ConfigOption CDC_PORT =
+ ConfigOptions.key("cdc.port").intType().defaultValue(8080);
+
+ public static final ConfigOption CDC_TASK_NAME =
+ ConfigOptions.key("cdc.task.name").stringType().noDefaultValue();
+ public static final ConfigOption LOOKUP_CACHE_MAX_ROWS =
+ ConfigOptions.key("lookup.cache.max-rows").intType().defaultValue(-1);
+ public static final ConfigOption LOOKUP_CACHE_TTL_SEC =
+ ConfigOptions.key("lookup.cache.ttl-sec").intType().defaultValue(-1);
+ public static final ConfigOption SCAN_BOUNDED_LOWER_BOUND =
+ ConfigOptions.key("scan.bounded.lower-bound").longType().defaultValue(-1L);
+ public static final ConfigOption SCAN_BOUNDED_UPPER_BOUND =
+ ConfigOptions.key("scan.bounded.upper-bound").longType().defaultValue(-1L);
+
+ public enum Mode {
+ CDC,
+ BOUNDED;
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java
new file mode 100644
index 0000000000000000000000000000000000000000..fb08e40c34521587778111640f768aeac47059a9
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.common;
+
+import org.apache.iotdb.flink.sql.exception.UnsupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.NullFieldException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.net.Socket;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class Utils {
+ private Utils() {}
+
+ public static Object getValue(Field value, String dataType) {
+ try {
+ if ("INT32".equals(dataType)) {
+ return value.getIntV();
+ } else if ("INT64".equals(dataType)) {
+ return value.getLongV();
+ } else if ("FLOAT".equals(dataType)) {
+ return value.getFloatV();
+ } else if ("DOUBLE".equals(dataType)) {
+ return value.getDoubleV();
+ } else if ("BOOLEAN".equals(dataType)) {
+ return value.getBoolV();
+ } else if ("TEXT".equals(dataType)) {
+ return StringData.fromString(value.getStringValue());
+ } else {
+ String exception = String.format("IoTDB doesn't support the data type: %s", dataType);
+ throw new UnsupportedDataTypeException(exception);
+ }
+ } catch (NullFieldException e) {
+ return null;
+ }
+ }
+
+ public static Object getValue(Field value, DataType dataType) {
+ if (dataType.equals(DataTypes.INT())) {
+ return value.getIntV();
+ } else if (dataType.equals(DataTypes.BIGINT())) {
+ return value.getLongV();
+ } else if (dataType.equals(DataTypes.FLOAT())) {
+ return value.getFloatV();
+ } else if (dataType.equals(DataTypes.DOUBLE())) {
+ return value.getDoubleV();
+ } else if (dataType.equals(DataTypes.BOOLEAN())) {
+ return value.getBoolV();
+ } else if (dataType.equals(DataTypes.STRING())) {
+ return StringData.fromString(value.getStringValue());
+ } else {
+ throw new UnsupportedDataTypeException("IoTDB doesn't support the data type: " + dataType);
+ }
+ }
+
+ public static Object getValue(RowData value, DataType dataType, int index) {
+ try {
+ if (dataType.equals(DataTypes.INT())) {
+ return value.getInt(index);
+ } else if (dataType.equals(DataTypes.BIGINT())) {
+ return value.getLong(index);
+ } else if (dataType.equals(DataTypes.FLOAT())) {
+ return value.getFloat(index);
+ } else if (dataType.equals(DataTypes.DOUBLE())) {
+ return value.getDouble(index);
+ } else if (dataType.equals(DataTypes.BOOLEAN())) {
+ return value.getBoolean(index);
+ } else if (dataType.equals(DataTypes.STRING())) {
+ return value.getString(index).toString();
+ } else {
+ throw new UnsupportedDataTypeException("IoTDB don't support the data type: " + dataType);
+ }
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ public static boolean isNumeric(String s) {
+ Pattern pattern = Pattern.compile("\\d*");
+ return pattern.matcher(s).matches();
+ }
+
+ public static RowData convert(RowRecord rowRecord, List columnTypes) {
+ ArrayList values = new ArrayList<>();
+ values.add(rowRecord.getTimestamp());
+ List fields = rowRecord.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ values.add(getValue(fields.get(i), columnTypes.get(i + 1)));
+ }
+ return GenericRowData.of(values.toArray());
+ }
+
+ public static List object2List(Object obj, TSDataType dataType) {
+ ArrayList objects = new ArrayList<>();
+ int length = Array.getLength(obj);
+ for (int i = 0; i < length; i++) {
+ if (dataType == TSDataType.TEXT) {
+ objects.add(StringData.fromString(((Binary) Array.get(obj, i)).getStringValue()));
+ } else {
+ objects.add(Array.get(obj, i));
+ }
+ }
+ return objects;
+ }
+
+ public static boolean isURIAvailable(URI uri) {
+ try {
+ new Socket(uri.getHost(), uri.getPort()).close();
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java
new file mode 100644
index 0000000000000000000000000000000000000000..d698a39c550b2c1dd5ef891351154bb6378668f7
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.exception;
+
+public class IllegalIoTDBPathException extends RuntimeException {
+ public IllegalIoTDBPathException(String s) {
+ super(s);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java
new file mode 100644
index 0000000000000000000000000000000000000000..e1e8808b263cfdca1895d71f59aca6e8ed1f554e
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.exception;
+
+public class IllegalOptionException extends RuntimeException {
+ public IllegalOptionException(String s) {
+ super(s);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java
new file mode 100644
index 0000000000000000000000000000000000000000..5f620b30170d207a823f69eac91a22bebb28b12e
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.exception;
+
+public class IllegalSchemaException extends RuntimeException {
+ public IllegalSchemaException(String s) {
+ super(s);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java
new file mode 100644
index 0000000000000000000000000000000000000000..7885ce5f0b51b1e9537ad8f77e38e53dc67a410a
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.exception;
+
+public class IllegalUrlPathException extends RuntimeException {
+ public IllegalUrlPathException(String s) {
+ super(s);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java
new file mode 100644
index 0000000000000000000000000000000000000000..9b7a6e3c517d3020c75d92f4cdf65bf4b608feeb
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.exception;
+
+public class UnsupportedDataTypeException extends RuntimeException {
+ public UnsupportedDataTypeException(String s) {
+ super(s);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..8c65d0377ca94e2aead4be93c68ef09fd9be141d
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.factory;
+
+import org.apache.iotdb.flink.sql.common.Options;
+import org.apache.iotdb.flink.sql.common.Utils;
+import org.apache.iotdb.flink.sql.exception.IllegalIoTDBPathException;
+import org.apache.iotdb.flink.sql.exception.IllegalOptionException;
+import org.apache.iotdb.flink.sql.exception.IllegalSchemaException;
+import org.apache.iotdb.flink.sql.exception.IllegalUrlPathException;
+import org.apache.iotdb.flink.sql.exception.UnsupportedDataTypeException;
+import org.apache.iotdb.flink.sql.provider.IoTDBDynamicTableSink;
+import org.apache.iotdb.flink.sql.provider.IoTDBDynamicTableSource;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class IoTDBDynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+ private static final HashSet supportedDataTypes = new HashSet<>();
+
+ static {
+ supportedDataTypes.add(DataTypes.INT());
+ supportedDataTypes.add(DataTypes.BIGINT());
+ supportedDataTypes.add(DataTypes.FLOAT());
+ supportedDataTypes.add(DataTypes.DOUBLE());
+ supportedDataTypes.add(DataTypes.BOOLEAN());
+ supportedDataTypes.add(DataTypes.STRING());
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ ReadableConfig options = helper.getOptions();
+ TableSchema schema = context.getCatalogTable().getSchema();
+
+ validate(options, schema);
+
+ return new IoTDBDynamicTableSource(options, schema);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "IoTDB";
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ HashSet> requiredOptions = new HashSet<>();
+ requiredOptions.add(Options.DEVICE);
+
+ return requiredOptions;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ HashSet> optionalOptions = new HashSet<>();
+ optionalOptions.add(Options.NODE_URLS);
+ optionalOptions.add(Options.USER);
+ optionalOptions.add(Options.PASSWORD);
+ optionalOptions.add(Options.LOOKUP_CACHE_MAX_ROWS);
+ optionalOptions.add(Options.LOOKUP_CACHE_TTL_SEC);
+ optionalOptions.add(Options.ALIGNED);
+ optionalOptions.add(Options.MODE);
+ optionalOptions.add(Options.CDC_TASK_NAME);
+ optionalOptions.add(Options.CDC_PORT);
+
+ return optionalOptions;
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ ReadableConfig options = helper.getOptions();
+ TableSchema schema = context.getCatalogTable().getSchema();
+
+ validate(options, schema);
+
+ return new IoTDBDynamicTableSink(options, schema);
+ }
+
+ protected void validate(ReadableConfig options, TableSchema schema) {
+ String[] fieldNames = schema.getFieldNames();
+ DataType[] fieldDataTypes = schema.getFieldDataTypes();
+
+ if (!"Time_".equals(fieldNames[0]) || !fieldDataTypes[0].equals(DataTypes.BIGINT())) {
+ throw new IllegalSchemaException(
+ "The first field's name must be `Time_`, and its data type must be BIGINT.");
+ }
+ for (String fieldName : fieldNames) {
+ if (fieldName.contains("\\.")) {
+ throw new IllegalIoTDBPathException(
+ String.format(
+ "The field name `%s` contains character `.`, it's not allowed in IoTDB.",
+ fieldName));
+ }
+ if (Utils.isNumeric(fieldName)) {
+ throw new IllegalIoTDBPathException(
+ String.format(
+ "The field name `%s` is a pure number, which is not allowed in IoTDB.", fieldName));
+ }
+ }
+
+ for (DataType fieldDataType : fieldDataTypes) {
+ if (!supportedDataTypes.contains(fieldDataType)) {
+ throw new UnsupportedDataTypeException(
+ "IoTDB doesn't support the data type: " + fieldDataType);
+ }
+ }
+
+ String device = options.get(Options.DEVICE);
+ if (!device.startsWith("root.")) {
+ throw new IllegalIoTDBPathException("The option `device` must starts with 'root.'.");
+ }
+ for (String s : device.split("\\.")) {
+ if (Utils.isNumeric(s)) {
+ throw new IllegalIoTDBPathException(
+ String.format(
+ "The option `device` contains a purely number path: `%s`, it's not allowed in IoTDB.",
+ s));
+ }
+ }
+
+ String[] nodeUrls = options.get(Options.NODE_URLS).split(",");
+ for (String nodeUrl : nodeUrls) {
+ String[] split = nodeUrl.split(":");
+ if (split.length != 2) {
+ throw new IllegalUrlPathException("Every node's URL must be in the format of `host:port`.");
+ }
+ if (!Utils.isNumeric(split[1])) {
+ throw new IllegalUrlPathException(
+ String.format("The port in url %s must be a number.", nodeUrl));
+ } else {
+ int port = Integer.parseInt(split[1]);
+ if (port > 65535) {
+ throw new IllegalUrlPathException(
+ String.format("The port in url %s must be smaller than 65536", nodeUrl));
+ } else if (port < 1) {
+ throw new IllegalUrlPathException(
+ String.format("The port in url %s must be greater than 0.", nodeUrl));
+ }
+ }
+ }
+
+ Long lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
+ Long upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
+ if (lowerBound > 0L && upperBound > 0L && upperBound < lowerBound) {
+ throw new IllegalOptionException(
+ "The value of option `scan.bounded.lower-bound` could not be greater than the value of option `scan.bounded.upper-bound`.");
+ }
+
+ if (options.get(Options.MODE) == Options.Mode.CDC
+ && options.get(Options.CDC_TASK_NAME) == null) {
+ throw new IllegalOptionException(
+ "The option `cdc.task.name` is required when option `mode` equals `CDC`");
+ }
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..e8c3f7cca21595e510fce15fd12998453d799d25
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.function;
+
+import org.apache.iotdb.flink.sql.common.Options;
+import org.apache.iotdb.flink.sql.common.Utils;
+import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class IoTDBBoundedScanFunction extends RichInputFormat {
+ private final ReadableConfig options;
+ private final String device;
+ private final long lowerBound;
+ private final long upperBound;
+ private final List measurements;
+ private Session session;
+ private SessionDataSet dataSet;
+ private List columnTypes;
+
+ public IoTDBBoundedScanFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
+ this.options = options;
+ List> tableSchema = schemaWrapper.getSchema();
+ device = options.get(Options.DEVICE);
+ lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
+ upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
+ measurements =
+ tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
+ }
+
+ @Override
+ public void configure(Configuration configuration) {
+ // fo nothing
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
+ return baseStatistics;
+ }
+
+ @Override
+ public InputSplit[] createInputSplits(int i) {
+ return new GenericInputSplit[] {new GenericInputSplit(1, 1)};
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void openInputFormat() {
+ session =
+ new Session.Builder()
+ .nodeUrls(Arrays.asList(options.get(Options.NODE_URLS).split(",")))
+ .username(options.get(Options.USER))
+ .password(options.get(Options.PASSWORD))
+ .build();
+
+ try {
+ session.open(false);
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void open(InputSplit inputSplit) {
+ String sql;
+ if (lowerBound < 0L && upperBound < 0L) {
+ sql = String.format("SELECT %s FROM %s", String.join(",", measurements), device);
+ } else if (lowerBound < 0L && upperBound > 0L) {
+ sql =
+ String.format(
+ "SELECT %s FROM %s WHERE TIME <= %d",
+ String.join(",", measurements), device, upperBound);
+ } else if (lowerBound > 0L && upperBound < 0L) {
+ sql =
+ String.format(
+ "SELECT %s FROM %s WHERE TIME >= %d",
+ String.join(",", measurements), device, lowerBound);
+ } else {
+ sql =
+ String.format(
+ "SELECT %s FROM %s WHERE TIME >= %d AND TIME <= %d",
+ String.join(",", measurements), device, lowerBound, upperBound);
+ }
+ try {
+ dataSet = session.executeQueryStatement(sql);
+ columnTypes = dataSet.getColumnTypes();
+ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ try {
+ return !dataSet.hasNext();
+ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public RowData nextRecord(RowData rowData) {
+ try {
+ RowRecord rowRecord = dataSet.next();
+ return Utils.convert(rowRecord, columnTypes);
+ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (dataSet != null) {
+ dataSet.close();
+ }
+ if (session != null) {
+ session.close();
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..a814f40b17d88952df3078f153ddba81599c0ea7
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.function;
+
+import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient;
+import org.apache.iotdb.flink.sql.common.Options;
+import org.apache.iotdb.flink.sql.common.Utils;
+import org.apache.iotdb.flink.sql.exception.IllegalOptionException;
+import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
+import org.apache.iotdb.flink.sql.wrapper.TabletWrapper;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.enums.ReadyState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+public class IoTDBCDCSourceFunction extends RichSourceFunction {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBCDCSourceFunction.class);
+ private final List socketClients = new ArrayList<>();
+ private final int cdcPort;
+ private final List nodeUrls;
+ private final String taskName;
+ private final String device;
+ private final String user;
+ private final String password;
+ private final List measurements;
+ private final BlockingQueue tabletWrappers;
+ private transient ExecutorService consumeExecutor;
+
+ public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
+ List> tableSchema = schemaWrapper.getSchema();
+ cdcPort = options.get(Options.CDC_PORT);
+ nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
+ taskName = options.get(Options.CDC_TASK_NAME);
+ device = options.get(Options.DEVICE);
+ user = options.get(Options.USER);
+ password = options.get(Options.PASSWORD);
+ measurements =
+ tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
+
+ tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ Session session =
+ new Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
+ session.open(false);
+ boolean hasCreatedPipeTask =
+ session.executeQueryStatement(String.format("show pipe flink_cdc_%s", taskName)).hasNext();
+ if (!hasCreatedPipeTask) {
+ for (String nodeUrl : nodeUrls) {
+ URI uri = new URI(String.format("ws://%s:%d", nodeUrl.split(":")[0], cdcPort));
+ if (Utils.isURIAvailable(uri)) {
+ throw new IllegalOptionException(
+ String.format(
+ "The port `%d` has been bound. Please use another one by option `cdc.port`.",
+ cdcPort));
+ }
+ }
+ String createPipeCommand =
+ String.format(
+ "CREATE PIPE flink_cdc_%s\n"
+ + "WITH EXTRACTOR (\n"
+ + "'extractor' = 'iotdb-extractor',\n"
+ + "'extractor.pattern' = '%s',\n"
+ + "'extractor.history.enable' = 'true',\n"
+ + "'extractor.realtime.enable' = 'true',\n"
+ + "'extractor.realtime.mode' = 'hybrid',\n"
+ + ") WITH CONNECTOR (\n"
+ + "'connector' = 'websocket-connector',\n"
+ + "'connector.websocket.port' = '%d'"
+ + ")",
+ taskName, device, cdcPort);
+ session.executeNonQueryStatement(createPipeCommand);
+ }
+
+ String status =
+ session
+ .executeQueryStatement(String.format("show pipe flink_cdc_%s", taskName))
+ .next()
+ .getFields()
+ .get(2)
+ .getStringValue();
+ if ("STOPPED".equals(status)) {
+ session.executeNonQueryStatement(String.format("start pipe flink_cdc_%s", taskName));
+ }
+
+ session.close();
+
+ consumeExecutor = Executors.newFixedThreadPool(1);
+ for (String nodeUrl : nodeUrls) {
+ URI uri = new URI(String.format("ws://%s:%s", nodeUrl.split(":")[0], cdcPort));
+ socketClients.add(initAndGet(uri));
+ }
+ }
+
+ @Override
+ public void run(SourceContext ctx) throws InterruptedException {
+ consumeExecutor.submit(new ConsumeRunnable(ctx));
+ consumeExecutor.shutdown();
+ while (true) {
+ for (IoTDBWebSocketClient socketClient : socketClients) {
+ if (socketClient.getReadyState().equals(ReadyState.CLOSED)) {
+ while (!Utils.isURIAvailable(socketClient.getURI())) {
+ String log =
+ String.format(
+ "The URI %s:%d is not available now, sleep 5 seconds.",
+ socketClient.getURI().getHost(), socketClient.getURI().getPort());
+ LOGGER.warn(log);
+ Thread.sleep(5000);
+ }
+ socketClient.reconnect();
+ while (!socketClient.getReadyState().equals(ReadyState.OPEN)) {
+ Thread.sleep(1000);
+ }
+ socketClient.send("START");
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ socketClients.forEach(WebSocketClient::close);
+ }
+
+ public void addTabletWrapper(TabletWrapper tabletWrapper) {
+ try {
+ this.tabletWrappers.put(tabletWrapper);
+ } catch (InterruptedException e) {
+ String host = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getHostName();
+ int port = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getPort();
+ String log =
+ String.format(
+ "The tablet from %s:%d can't be put into queue, because: %s",
+ host, port, e.getMessage());
+ LOGGER.warn(log);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private IoTDBWebSocketClient initAndGet(URI uri) throws InterruptedException {
+ while (!Utils.isURIAvailable(uri)) {
+ String log =
+ String.format(
+ "The URI %s:%d is not available now, sleep 5 seconds.", uri.getHost(), uri.getPort());
+ LOGGER.warn(log);
+ Thread.sleep(5000);
+ }
+ IoTDBWebSocketClient client = new IoTDBWebSocketClient(uri, this);
+ client.connect();
+ while (!client.getReadyState().equals(ReadyState.OPEN)) {
+ Thread.sleep(1000);
+ }
+ client.send("START");
+ return client;
+ }
+
+ public void collectTablet(Tablet tablet, SourceContext ctx) {
+ if (!device.equals(tablet.deviceId)) {
+ return;
+ }
+ List schemas = tablet.getSchemas();
+ int rowSize = tablet.rowSize;
+ HashMap>> values = new HashMap<>();
+ for (MeasurementSchema schema : schemas) {
+ String measurement = schema.getMeasurementId();
+ values.put(
+ measurement,
+ new Pair<>(
+ tablet.bitMaps[schemas.indexOf(schema)],
+ Utils.object2List(tablet.values[schemas.indexOf(schema)], schema.getType())));
+ }
+ for (int i = 0; i < rowSize; i++) {
+ ArrayList row = new ArrayList<>();
+ row.add(tablet.timestamps[i]);
+ for (String measurement : measurements) {
+ if (values.get(measurement).getLeft() == null
+ || !values.get(measurement).getLeft().isMarked(i)) {
+ row.add(values.get(measurement).getRight().get(i));
+ } else {
+ row.add(null);
+ }
+ }
+ RowData rowData = GenericRowData.of(row.toArray());
+ ctx.collect(rowData);
+ }
+ }
+
+ private class ConsumeRunnable implements Runnable {
+ SourceContext context;
+
+ public ConsumeRunnable(SourceContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ TabletWrapper tabletWrapper = tabletWrappers.take();
+ collectTablet(tabletWrapper.getTablet(), context);
+ tabletWrapper
+ .getWebSocketClient()
+ .send(String.format("ACK:%d", tabletWrapper.getCommitId()));
+ } catch (InterruptedException e) {
+ LOGGER.warn("The tablet can't be taken from queue!");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..9c26d2175b15880a1b3ec7fa8d6fc5a818368336
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.function;
+
+import org.apache.iotdb.flink.sql.common.Options;
+import org.apache.iotdb.flink.sql.common.Utils;
+import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.shaded.curator5.com.google.common.cache.Cache;
+import org.apache.flink.shaded.curator5.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class IoTDBLookupFunction extends TableFunction {
+ private final List> schema;
+ private final int cacheMaxRows;
+ private final int cacheTTLSec;
+ private final List nodeUrls;
+ private final String user;
+ private final String password;
+ private final String device;
+ private final List measurements;
+ private Session session;
+
+ private transient Cache cache;
+
+ public IoTDBLookupFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
+ this.schema = schemaWrapper.getSchema();
+
+ cacheMaxRows = options.get(Options.LOOKUP_CACHE_MAX_ROWS);
+
+ cacheTTLSec = options.get(Options.LOOKUP_CACHE_TTL_SEC);
+
+ nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
+
+ user = options.get(Options.USER);
+
+ password = options.get(Options.PASSWORD);
+
+ device = options.get(Options.DEVICE);
+
+ measurements =
+ schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build();
+ session.open(false);
+
+ if (cacheMaxRows > 0 && cacheTTLSec > 0) {
+ cache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheTTLSec, TimeUnit.SECONDS)
+ .maximumSize(cacheMaxRows)
+ .build();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (cache != null) {
+ cache.invalidateAll();
+ }
+ if (session != null) {
+ session.close();
+ }
+ super.close();
+ }
+
+ public void eval(Object obj) throws IoTDBConnectionException, StatementExecutionException {
+ RowData lookupKey = GenericRowData.of(obj);
+ if (cache != null) {
+ RowData cacheRow = cache.getIfPresent(lookupKey);
+ if (cacheRow != null) {
+ collect(cacheRow);
+ return;
+ }
+ }
+
+ long timestamp = lookupKey.getLong(0);
+
+ String sql =
+ String.format(
+ "SELECT %s FROM %s WHERE TIME=%d",
+ StringUtils.join(measurements, ','), device, timestamp);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ List columnNames = dataSet.getColumnNames();
+ columnNames.remove("Time");
+ RowRecord rowRecord = dataSet.next();
+ if (rowRecord == null) {
+ ArrayList values = new ArrayList<>();
+ values.add(timestamp);
+ for (int i = 0; i < schema.size(); i++) {
+ values.add(null);
+ }
+ GenericRowData rowData = GenericRowData.of(values.toArray());
+ collect(rowData);
+ return;
+ }
+ List fields = rowRecord.getFields();
+
+ ArrayList values = new ArrayList<>();
+ values.add(timestamp);
+ for (Tuple2 filed : schema) {
+ values.add(
+ Utils.getValue(fields.get(columnNames.indexOf(device + '.' + filed.f0)), filed.f1));
+ }
+
+ GenericRowData rowData = GenericRowData.of(values.toArray());
+ if (cache != null) {
+ cache.put(lookupKey, rowData);
+ }
+ collect(rowData);
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..4933e66b0f91e51bc1a1487a6d6b0e53d9c27f8c
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.function;
+
+import org.apache.iotdb.flink.sql.common.Options;
+import org.apache.iotdb.flink.sql.common.Utils;
+import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class IoTDBSinkFunction implements SinkFunction {
+ private final List> schema;
+ private final List nodeUrls;
+ private final String user;
+ private final String password;
+ private final String device;
+ private final boolean aligned;
+ private final List measurements;
+ private final List dataTypes;
+ private static final Map TYPE_MAP = new HashMap<>();
+
+ private static Session session;
+
+ static {
+ TYPE_MAP.put(DataTypes.INT(), TSDataType.INT32);
+ TYPE_MAP.put(DataTypes.BIGINT(), TSDataType.INT64);
+ TYPE_MAP.put(DataTypes.FLOAT(), TSDataType.FLOAT);
+ TYPE_MAP.put(DataTypes.DOUBLE(), TSDataType.DOUBLE);
+ TYPE_MAP.put(DataTypes.BOOLEAN(), TSDataType.BOOLEAN);
+ TYPE_MAP.put(DataTypes.STRING(), TSDataType.TEXT);
+ }
+
+ public IoTDBSinkFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
+ // Get schema
+ this.schema = schemaWrapper.getSchema();
+ // Get options
+ nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
+ user = options.get(Options.USER);
+ password = options.get(Options.PASSWORD);
+ device = options.get(Options.DEVICE);
+ aligned = options.get(Options.ALIGNED);
+ // Get measurements and data types from schema
+ measurements =
+ schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
+ dataTypes = schema.stream().map(field -> TYPE_MAP.get(field.f1)).collect(Collectors.toList());
+ }
+
+ @Override
+ public void invoke(RowData rowData, Context context) throws Exception {
+ // Open the session if the session has not been opened
+ if (session == null) {
+ session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build();
+ session.open(false);
+ }
+ // Load data from RowData
+ if (rowData.getRowKind().equals(RowKind.INSERT)
+ || rowData.getRowKind().equals(RowKind.UPDATE_AFTER)) {
+ long timestamp = rowData.getLong(0);
+ ArrayList measurementsOfRow = new ArrayList<>();
+ ArrayList dataTypesOfRow = new ArrayList<>();
+ ArrayList values = new ArrayList<>();
+ for (int i = 0; i < this.measurements.size(); i++) {
+ Object value = Utils.getValue(rowData, schema.get(i).f1, i + 1);
+ if (value == null) {
+ continue;
+ }
+ measurementsOfRow.add(this.measurements.get(i));
+ dataTypesOfRow.add(this.dataTypes.get(i));
+ values.add(value);
+ }
+ // insert data
+ if (aligned) {
+ session.insertAlignedRecord(device, timestamp, measurementsOfRow, dataTypesOfRow, values);
+ } else {
+ session.insertRecord(device, timestamp, measurementsOfRow, dataTypesOfRow, values);
+ }
+ } else if (rowData.getRowKind().equals(RowKind.DELETE)) {
+ ArrayList paths = new ArrayList<>();
+ for (String measurement : measurements) {
+ paths.add(String.format("%s.%s", device, measurement));
+ }
+ session.deleteData(paths, rowData.getLong(0));
+ } else if (rowData.getRowKind().equals(RowKind.UPDATE_BEFORE)) {
+ // do nothing
+ }
+ }
+
+ @Override
+ public void finish() throws Exception {
+ if (session != null) {
+ session.close();
+ }
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..1a0302980801c5e67e1f115c3e9decbbd868f092
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.provider;
+
+import org.apache.iotdb.flink.sql.function.IoTDBSinkFunction;
+import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.types.RowKind;
+
+public class IoTDBDynamicTableSink implements DynamicTableSink {
+ private final ReadableConfig options;
+ private final TableSchema schema;
+
+ public IoTDBDynamicTableSink(ReadableConfig options, TableSchema schema) {
+ this.options = options;
+ this.schema = schema;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ return SinkFunctionProvider.of(new IoTDBSinkFunction(options, new SchemaWrapper(schema)));
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new IoTDBDynamicTableSink(options, schema);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "IoTDB Dynamic Table Sink";
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..7feb4dcb2b13c13dca31262d802e0e84f7dfe3b4
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.provider;
+
+import org.apache.iotdb.flink.sql.common.Options;
+import org.apache.iotdb.flink.sql.function.IoTDBBoundedScanFunction;
+import org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction;
+import org.apache.iotdb.flink.sql.function.IoTDBLookupFunction;
+import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+
+public class IoTDBDynamicTableSource implements LookupTableSource, ScanTableSource {
+ private final ReadableConfig options;
+ private final TableSchema schema;
+
+ public IoTDBDynamicTableSource(ReadableConfig options, TableSchema schema) {
+ this.options = options;
+ this.schema = schema;
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
+ return TableFunctionProvider.of(new IoTDBLookupFunction(options, new SchemaWrapper(schema)));
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new IoTDBDynamicTableSource(options, schema);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "IoTDB Dynamic Table Source";
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ if (options.get(Options.MODE) == Options.Mode.CDC) {
+ return SourceFunctionProvider.of(
+ new IoTDBCDCSourceFunction(options, new SchemaWrapper(schema)), false);
+ } else {
+ return InputFormatProvider.of(
+ new IoTDBBoundedScanFunction(options, new SchemaWrapper(schema)));
+ }
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..6f8ec11fbf2ac82a9a53382d08f9d9951e7a0f96
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SchemaWrapper implements Serializable {
+ private final List> schema;
+
+ public SchemaWrapper(TableSchema schema) {
+ this.schema = new ArrayList<>();
+
+ for (String fieldName : schema.getFieldNames()) {
+ if ("Time_".equals(fieldName)) {
+ continue;
+ }
+ this.schema.add(new Tuple2<>(fieldName, schema.getFieldDataType(fieldName).get()));
+ }
+ }
+
+ public List> getSchema() {
+ return schema;
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..4ef08386695a20f10f230d87d126e2e85fc1ea91
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.flink.sql.wrapper;
+
+import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+public class TabletWrapper {
+ private final long commitId;
+ private final IoTDBWebSocketClient websocketClient;
+ private final Tablet tablet;
+
+ public TabletWrapper(long commitId, IoTDBWebSocketClient websocketClient, Tablet tablet) {
+ this.commitId = commitId;
+ this.websocketClient = websocketClient;
+ this.tablet = tablet;
+ }
+
+ public long getCommitId() {
+ return commitId;
+ }
+
+ public IoTDBWebSocketClient getWebSocketClient() {
+ return websocketClient;
+ }
+
+ public Tablet getTablet() {
+ return tablet;
+ }
+}
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000000000000000000000000000000..4eee20a99864784da329461a6636077e2967ac1c
--- /dev/null
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.flink.sql.factory.IoTDBDynamicTableFactory
\ No newline at end of file
diff --git a/iotdb-connector/pom.xml b/iotdb-connector/pom.xml
index 06e04d1e67429c7381948ec93dabe8a8edc39964..14321833059e97a9a95cac7cd8f4c34ab7d48526 100644
--- a/iotdb-connector/pom.xml
+++ b/iotdb-connector/pom.xml
@@ -31,6 +31,7 @@
IoTDB: Connector
flink-iotdb-connector
+ flink-sql-iotdb-connector
flink-tsfile-connector
grafana-connector
hadoop
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 3f04bf95a0e4a791e973e823cb3599b236f9dfa5..3fd0650d714ff16ac3db797045cea32551db290c 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -353,6 +353,11 @@
com.lmax
disruptor
+
+ org.java-websocket
+ Java-WebSocket
+ ${websocket.version}
+
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 2f65a39f86cd090f9bf01dbe690f60c8ad7a636c..3c870a9ba15d11ee664edcc002acdb4c16c4ce53 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -51,6 +51,9 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version";
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
+ public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port";
+ public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
new file mode 100644
index 0000000000000000000000000000000000000000..c7993045e368d62325b693f967c053342a6d12a3
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.pipe.connector.protocol.websocket;
+
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.java_websocket.WebSocket;
+import org.java_websocket.handshake.ClientHandshake;
+import org.java_websocket.server.WebSocketServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+public class WebSocketConnectorServer extends WebSocketServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class);
+ private final PriorityBlockingQueue> events =
+ new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
+ private final WebsocketConnector websocketConnector;
+
+ private final ConcurrentMap eventMap = new ConcurrentHashMap<>();
+
+ public WebSocketConnectorServer(
+ InetSocketAddress address, WebsocketConnector websocketConnector) {
+ super(address);
+ this.websocketConnector = websocketConnector;
+ }
+
+ @Override
+ public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
+ String log =
+ String.format(
+ "The connection from client %s:%d has been opened!",
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort());
+ LOGGER.info(log);
+ }
+
+ @Override
+ public void onClose(WebSocket webSocket, int i, String s, boolean b) {
+ String log =
+ String.format(
+ "The client from %s:%d has been closed!",
+ webSocket.getRemoteSocketAddress().getAddress(),
+ webSocket.getRemoteSocketAddress().getPort());
+ LOGGER.info(log);
+ }
+
+ @Override
+ public void onMessage(WebSocket webSocket, String s) {
+ String log =
+ String.format(
+ "Received a message `%s` from %s:%d",
+ s,
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort());
+ LOGGER.info(log);
+ if (s.startsWith("START")) {
+ handleStart(webSocket);
+ } else if (s.startsWith("ACK")) {
+ handleAck(webSocket, s);
+ } else if (s.startsWith("ERROR")) {
+ handleError(webSocket, s);
+ }
+ }
+
+ @Override
+ public void onError(WebSocket webSocket, Exception e) {
+ String log;
+ if (webSocket.getRemoteSocketAddress() != null) {
+ log =
+ String.format(
+ "Got an error `%s` from %s:%d",
+ e.getMessage(),
+ webSocket.getLocalSocketAddress().getHostName(),
+ webSocket.getLocalSocketAddress().getPort());
+ } else {
+ log = String.format("Got an error `%s` from client", e.getMessage());
+ }
+ LOGGER.error(log);
+ }
+
+ @Override
+ public void onStart() {
+ String log =
+ String.format(
+ "The webSocket server %s:%d has been started!",
+ this.getAddress().getHostName(), this.getPort());
+ LOGGER.error(log);
+ }
+
+ public void addEvent(Pair event) {
+ if (events.size() >= 50) {
+ synchronized (events) {
+ while (events.size() >= 50) {
+ try {
+ events.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ events.put(event);
+ }
+
+ private void handleStart(WebSocket webSocket) {
+ try {
+ Pair eventPair = events.take();
+ synchronized (events) {
+ events.notifyAll();
+ transfer(eventPair, webSocket);
+ }
+ } catch (InterruptedException e) {
+ String log = String.format("The event can't be taken, because: %s", e.getMessage());
+ LOGGER.warn(log);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void handleAck(WebSocket webSocket, String s) {
+ long commitId = Long.parseLong(s.replace("ACK:", ""));
+ Event event = eventMap.remove(commitId);
+ websocketConnector.commit(
+ commitId, event instanceof EnrichedEvent ? (EnrichedEvent) event : null);
+ handleStart(webSocket);
+ }
+
+ private void handleError(WebSocket webSocket, String s) {
+ long commitId = Long.parseLong(s.replace("ERROR:", ""));
+ String log =
+ String.format(
+ "The tablet of commitId: %d can't be parsed by client, it will be retried later.",
+ commitId);
+ LOGGER.warn(log);
+ events.put(new Pair<>(commitId, eventMap.remove(commitId)));
+ handleStart(webSocket);
+ }
+
+ private void transfer(Pair eventPair, WebSocket webSocket) {
+ Long commitId = eventPair.getLeft();
+ Event event = eventPair.getRight();
+ try {
+ ByteBuffer tabletBuffer = null;
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ tabletBuffer = ((PipeInsertNodeTabletInsertionEvent) event).convertToTablet().serialize();
+ } else if (event instanceof PipeRawTabletInsertionEvent) {
+ tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize();
+ } else if (event instanceof PipeTsFileInsertionEvent) {
+ PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) event;
+ tsFileInsertionEvent.waitForTsFileClose();
+ Iterable subEvents = tsFileInsertionEvent.toTabletInsertionEvents();
+ for (TabletInsertionEvent subEvent : subEvents) {
+ tabletBuffer = ((PipeRawTabletInsertionEvent) subEvent).convertToTablet().serialize();
+ }
+ } else {
+ throw new NotImplementedException(
+ "IoTDBCDCConnector only support "
+ + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
+ }
+ if (tabletBuffer == null) {
+ return;
+ }
+ ByteBuffer payload = ByteBuffer.allocate(Long.BYTES + tabletBuffer.limit());
+ payload.putLong(commitId);
+ payload.put(tabletBuffer);
+ payload.flip();
+ this.broadcast(payload, Collections.singletonList(webSocket));
+ eventMap.put(eventPair.getLeft(), eventPair.getRight());
+ String log =
+ String.format(
+ "Transferred a message to client %s:%d",
+ webSocket.getRemoteSocketAddress().getAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort());
+ LOGGER.info(log);
+ } catch (InterruptedException e) {
+ events.put(eventPair);
+ Thread.currentThread().interrupt();
+ throw new PipeException(e.getMessage());
+ } catch (Exception e) {
+ events.put(eventPair);
+ e.printStackTrace();
+ throw new PipeException(e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
new file mode 100644
index 0000000000000000000000000000000000000000..f9a01b3fc20e77f8aea9900b952e87cc6bca2954
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.pipe.connector.protocol.websocket;
+
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class WebsocketConnector implements PipeConnector {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketConnector.class);
+ private WebSocketConnectorServer server;
+ private int port;
+
+ public final AtomicLong commitIdGenerator = new AtomicLong(0);
+ private final AtomicLong lastCommitId = new AtomicLong(0);
+ private final PriorityQueue> commitQueue =
+ new PriorityQueue<>(Comparator.comparing(o -> o.left));
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {}
+
+ @Override
+ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+ throws Exception {
+ port =
+ parameters.getIntOrDefault(
+ PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+ PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
+ }
+
+ @Override
+ public void handshake() throws Exception {
+ if (server == null) {
+ server = new WebSocketConnectorServer(new InetSocketAddress(port), this);
+ server.start();
+ }
+ }
+
+ @Override
+ public void heartbeat() throws Exception {}
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+ if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+ LOGGER.warn(
+ "WebsocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ + "Current event: {}.",
+ tabletInsertionEvent);
+ return;
+ }
+ long commitId = commitIdGenerator.incrementAndGet();
+ ((EnrichedEvent) tabletInsertionEvent)
+ .increaseReferenceCount(WebsocketConnector.class.getName());
+ server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
+ }
+
+ @Override
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+ if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+ LOGGER.warn(
+ "WebsocketConnector only support PipeTsFileInsertionEvent. Current event: {}.",
+ tsFileInsertionEvent);
+ return;
+ }
+ long commitId = commitIdGenerator.incrementAndGet();
+ ((EnrichedEvent) tsFileInsertionEvent)
+ .increaseReferenceCount(WebsocketConnector.class.getName());
+ server.addEvent(new Pair<>(commitId, tsFileInsertionEvent));
+ }
+
+ @Override
+ public void transfer(Event event) throws Exception {}
+
+ @Override
+ public void close() throws Exception {
+ server.stop();
+ }
+
+ public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) {
+ commitQueue.offer(
+ new Pair<>(
+ requestCommitId,
+ () ->
+ Optional.ofNullable(enrichedEvent)
+ .ifPresent(
+ event ->
+ event.decreaseReferenceCount(WebsocketConnector.class.getName()))));
+
+ while (!commitQueue.isEmpty()) {
+ final Pair committer = commitQueue.peek();
+ if (lastCommitId.get() + 1 != committer.left) {
+ break;
+ }
+
+ committer.right.run();
+ lastCommitId.incrementAndGet();
+
+ commitQueue.poll();
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index fb06743874ffbc48a2a4f64b3fc757a38586d03e..bc956afe58ca78d97cf8eefd3a8c8255ffe97ece 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.websocket.WebsocketConnector;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -78,6 +79,8 @@ public class PipeConnectorSubtaskManager {
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
pipeConnector = new IoTDBAirGapConnector();
+ } else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) {
+ pipeConnector = new WebsocketConnector();
} else {
pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index ef973b1cc306dd9358acc813a5972460ea08c94e..f625e41cf72b128da91dad4f8998fdfcaad0bdf1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeCon
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
@@ -43,6 +44,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class),
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class),
+ WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
;
private final String pipePluginName;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
new file mode 100644
index 0000000000000000000000000000000000000000..a9d3a8ccf9ef796644e55df88f9126323fdaa232
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the IoTDB WebSocket
+ * connector. There is a real implementation in the server module but cannot be imported here. The
+ * pipe agent in the server module will replace this class with the real implementation when
+ * initializing the IoTDB Thrift connector.
+ */
+public class WebSocketConnector extends PlaceholderConnector {}
diff --git a/pom.xml b/pom.xml
index 3655d5cded53a28d2967f7a8fa6375e289f6578b..7479cc7ade4197cfc37c7e95c0280fbc9720f1e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,8 @@
chmod
+
+ 1.5.3