未验证 提交 3073fe63 编写于 作者: X Xuan Ronaldo 提交者: GitHub

[IOTDB-6029] flink-sql-iotdb-connector (#10958)

上级 1ae952ce
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-parent</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>flink-sql-iotdb-connector</artifactId>
<name>IoTDB: Connector: Apache Flink SQL Connector</name>
<version>1.3.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<!-- required by implement of flink sql connector -->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>${websocket.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.client;
import org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction;
import org.apache.iotdb.flink.sql.wrapper.TabletWrapper;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.nio.ByteBuffer;
public class IoTDBWebSocketClient extends WebSocketClient {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBWebSocketClient.class);
private final IoTDBCDCSourceFunction function;
public IoTDBWebSocketClient(URI uri, IoTDBCDCSourceFunction function) {
super(uri);
this.function = function;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
String log =
String.format("The connection with %s:%d has been created.", uri.getHost(), uri.getPort());
LOGGER.info(log);
}
@Override
public void onMessage(String s) {
// Do nothing
}
@Override
public void onMessage(ByteBuffer bytes) {
super.onMessage(bytes);
long commitId = bytes.getLong();
Tablet tablet = Tablet.deserialize(bytes);
function.addTabletWrapper(new TabletWrapper(commitId, this, tablet));
}
@Override
public void onClose(int i, String s, boolean b) {
LOGGER.info("The connection to {}:{} has been closed.", uri.getHost(), uri.getPort());
}
@Override
public void onError(Exception e) {
String log =
String.format(
"An error occurred when connecting to %s:%s: %s.",
uri.getHost(), uri.getPort(), e.getMessage());
LOGGER.error(log);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.common;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
public class Options {
public static final ConfigOption<String> NODE_URLS =
ConfigOptions.key("nodeUrls").stringType().defaultValue("127.0.0.1:6667");
public static final ConfigOption<String> USER =
ConfigOptions.key("user").stringType().defaultValue("root");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().defaultValue("root");
public static final ConfigOption<String> DEVICE =
ConfigOptions.key("device").stringType().noDefaultValue();
public static final ConfigOption<Boolean> ALIGNED =
ConfigOptions.key("aligned").booleanType().defaultValue(false);
public static final ConfigOption<Mode> MODE =
ConfigOptions.key("mode").enumType(Mode.class).defaultValue(Mode.BOUNDED);
public static final ConfigOption<Integer> CDC_PORT =
ConfigOptions.key("cdc.port").intType().defaultValue(8080);
public static final ConfigOption<String> CDC_TASK_NAME =
ConfigOptions.key("cdc.task.name").stringType().noDefaultValue();
public static final ConfigOption<Integer> LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows").intType().defaultValue(-1);
public static final ConfigOption<Integer> LOOKUP_CACHE_TTL_SEC =
ConfigOptions.key("lookup.cache.ttl-sec").intType().defaultValue(-1);
public static final ConfigOption<Long> SCAN_BOUNDED_LOWER_BOUND =
ConfigOptions.key("scan.bounded.lower-bound").longType().defaultValue(-1L);
public static final ConfigOption<Long> SCAN_BOUNDED_UPPER_BOUND =
ConfigOptions.key("scan.bounded.upper-bound").longType().defaultValue(-1L);
public enum Mode {
CDC,
BOUNDED;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.common;
import org.apache.iotdb.flink.sql.exception.UnsupportedDataTypeException;
import org.apache.iotdb.tsfile.exception.NullFieldException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
public class Utils {
private Utils() {}
public static Object getValue(Field value, String dataType) {
try {
if ("INT32".equals(dataType)) {
return value.getIntV();
} else if ("INT64".equals(dataType)) {
return value.getLongV();
} else if ("FLOAT".equals(dataType)) {
return value.getFloatV();
} else if ("DOUBLE".equals(dataType)) {
return value.getDoubleV();
} else if ("BOOLEAN".equals(dataType)) {
return value.getBoolV();
} else if ("TEXT".equals(dataType)) {
return StringData.fromString(value.getStringValue());
} else {
String exception = String.format("IoTDB doesn't support the data type: %s", dataType);
throw new UnsupportedDataTypeException(exception);
}
} catch (NullFieldException e) {
return null;
}
}
public static Object getValue(Field value, DataType dataType) {
if (dataType.equals(DataTypes.INT())) {
return value.getIntV();
} else if (dataType.equals(DataTypes.BIGINT())) {
return value.getLongV();
} else if (dataType.equals(DataTypes.FLOAT())) {
return value.getFloatV();
} else if (dataType.equals(DataTypes.DOUBLE())) {
return value.getDoubleV();
} else if (dataType.equals(DataTypes.BOOLEAN())) {
return value.getBoolV();
} else if (dataType.equals(DataTypes.STRING())) {
return StringData.fromString(value.getStringValue());
} else {
throw new UnsupportedDataTypeException("IoTDB doesn't support the data type: " + dataType);
}
}
public static Object getValue(RowData value, DataType dataType, int index) {
try {
if (dataType.equals(DataTypes.INT())) {
return value.getInt(index);
} else if (dataType.equals(DataTypes.BIGINT())) {
return value.getLong(index);
} else if (dataType.equals(DataTypes.FLOAT())) {
return value.getFloat(index);
} else if (dataType.equals(DataTypes.DOUBLE())) {
return value.getDouble(index);
} else if (dataType.equals(DataTypes.BOOLEAN())) {
return value.getBoolean(index);
} else if (dataType.equals(DataTypes.STRING())) {
return value.getString(index).toString();
} else {
throw new UnsupportedDataTypeException("IoTDB don't support the data type: " + dataType);
}
} catch (NullPointerException e) {
return null;
}
}
public static boolean isNumeric(String s) {
Pattern pattern = Pattern.compile("\\d*");
return pattern.matcher(s).matches();
}
public static RowData convert(RowRecord rowRecord, List<String> columnTypes) {
ArrayList<Object> values = new ArrayList<>();
values.add(rowRecord.getTimestamp());
List<Field> fields = rowRecord.getFields();
for (int i = 0; i < fields.size(); i++) {
values.add(getValue(fields.get(i), columnTypes.get(i + 1)));
}
return GenericRowData.of(values.toArray());
}
public static List<Object> object2List(Object obj, TSDataType dataType) {
ArrayList<Object> objects = new ArrayList<>();
int length = Array.getLength(obj);
for (int i = 0; i < length; i++) {
if (dataType == TSDataType.TEXT) {
objects.add(StringData.fromString(((Binary) Array.get(obj, i)).getStringValue()));
} else {
objects.add(Array.get(obj, i));
}
}
return objects;
}
public static boolean isURIAvailable(URI uri) {
try {
new Socket(uri.getHost(), uri.getPort()).close();
return true;
} catch (IOException e) {
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.exception;
public class IllegalIoTDBPathException extends RuntimeException {
public IllegalIoTDBPathException(String s) {
super(s);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.exception;
public class IllegalOptionException extends RuntimeException {
public IllegalOptionException(String s) {
super(s);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.exception;
public class IllegalSchemaException extends RuntimeException {
public IllegalSchemaException(String s) {
super(s);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.exception;
public class IllegalUrlPathException extends RuntimeException {
public IllegalUrlPathException(String s) {
super(s);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.exception;
public class UnsupportedDataTypeException extends RuntimeException {
public UnsupportedDataTypeException(String s) {
super(s);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.factory;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.exception.IllegalIoTDBPathException;
import org.apache.iotdb.flink.sql.exception.IllegalOptionException;
import org.apache.iotdb.flink.sql.exception.IllegalSchemaException;
import org.apache.iotdb.flink.sql.exception.IllegalUrlPathException;
import org.apache.iotdb.flink.sql.exception.UnsupportedDataTypeException;
import org.apache.iotdb.flink.sql.provider.IoTDBDynamicTableSink;
import org.apache.iotdb.flink.sql.provider.IoTDBDynamicTableSource;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import java.util.HashSet;
import java.util.Set;
public class IoTDBDynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
private static final HashSet<DataType> supportedDataTypes = new HashSet<>();
static {
supportedDataTypes.add(DataTypes.INT());
supportedDataTypes.add(DataTypes.BIGINT());
supportedDataTypes.add(DataTypes.FLOAT());
supportedDataTypes.add(DataTypes.DOUBLE());
supportedDataTypes.add(DataTypes.BOOLEAN());
supportedDataTypes.add(DataTypes.STRING());
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
TableSchema schema = context.getCatalogTable().getSchema();
validate(options, schema);
return new IoTDBDynamicTableSource(options, schema);
}
@Override
public String factoryIdentifier() {
return "IoTDB";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
HashSet<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(Options.DEVICE);
return requiredOptions;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
HashSet<ConfigOption<?>> optionalOptions = new HashSet<>();
optionalOptions.add(Options.NODE_URLS);
optionalOptions.add(Options.USER);
optionalOptions.add(Options.PASSWORD);
optionalOptions.add(Options.LOOKUP_CACHE_MAX_ROWS);
optionalOptions.add(Options.LOOKUP_CACHE_TTL_SEC);
optionalOptions.add(Options.ALIGNED);
optionalOptions.add(Options.MODE);
optionalOptions.add(Options.CDC_TASK_NAME);
optionalOptions.add(Options.CDC_PORT);
return optionalOptions;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
TableSchema schema = context.getCatalogTable().getSchema();
validate(options, schema);
return new IoTDBDynamicTableSink(options, schema);
}
protected void validate(ReadableConfig options, TableSchema schema) {
String[] fieldNames = schema.getFieldNames();
DataType[] fieldDataTypes = schema.getFieldDataTypes();
if (!"Time_".equals(fieldNames[0]) || !fieldDataTypes[0].equals(DataTypes.BIGINT())) {
throw new IllegalSchemaException(
"The first field's name must be `Time_`, and its data type must be BIGINT.");
}
for (String fieldName : fieldNames) {
if (fieldName.contains("\\.")) {
throw new IllegalIoTDBPathException(
String.format(
"The field name `%s` contains character `.`, it's not allowed in IoTDB.",
fieldName));
}
if (Utils.isNumeric(fieldName)) {
throw new IllegalIoTDBPathException(
String.format(
"The field name `%s` is a pure number, which is not allowed in IoTDB.", fieldName));
}
}
for (DataType fieldDataType : fieldDataTypes) {
if (!supportedDataTypes.contains(fieldDataType)) {
throw new UnsupportedDataTypeException(
"IoTDB doesn't support the data type: " + fieldDataType);
}
}
String device = options.get(Options.DEVICE);
if (!device.startsWith("root.")) {
throw new IllegalIoTDBPathException("The option `device` must starts with 'root.'.");
}
for (String s : device.split("\\.")) {
if (Utils.isNumeric(s)) {
throw new IllegalIoTDBPathException(
String.format(
"The option `device` contains a purely number path: `%s`, it's not allowed in IoTDB.",
s));
}
}
String[] nodeUrls = options.get(Options.NODE_URLS).split(",");
for (String nodeUrl : nodeUrls) {
String[] split = nodeUrl.split(":");
if (split.length != 2) {
throw new IllegalUrlPathException("Every node's URL must be in the format of `host:port`.");
}
if (!Utils.isNumeric(split[1])) {
throw new IllegalUrlPathException(
String.format("The port in url %s must be a number.", nodeUrl));
} else {
int port = Integer.parseInt(split[1]);
if (port > 65535) {
throw new IllegalUrlPathException(
String.format("The port in url %s must be smaller than 65536", nodeUrl));
} else if (port < 1) {
throw new IllegalUrlPathException(
String.format("The port in url %s must be greater than 0.", nodeUrl));
}
}
}
Long lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
Long upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
if (lowerBound > 0L && upperBound > 0L && upperBound < lowerBound) {
throw new IllegalOptionException(
"The value of option `scan.bounded.lower-bound` could not be greater than the value of option `scan.bounded.upper-bound`.");
}
if (options.get(Options.MODE) == Options.Mode.CDC
&& options.get(Options.CDC_TASK_NAME) == null) {
throw new IllegalOptionException(
"The option `cdc.task.name` is required when option `mode` equals `CDC`");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.function;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class IoTDBBoundedScanFunction extends RichInputFormat<RowData, InputSplit> {
private final ReadableConfig options;
private final String device;
private final long lowerBound;
private final long upperBound;
private final List<String> measurements;
private Session session;
private SessionDataSet dataSet;
private List<String> columnTypes;
public IoTDBBoundedScanFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
this.options = options;
List<Tuple2<String, DataType>> tableSchema = schemaWrapper.getSchema();
device = options.get(Options.DEVICE);
lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
measurements =
tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
}
@Override
public void configure(Configuration configuration) {
// fo nothing
}
@Override
public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
return baseStatistics;
}
@Override
public InputSplit[] createInputSplits(int i) {
return new GenericInputSplit[] {new GenericInputSplit(1, 1)};
}
@Override
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}
@Override
public void openInputFormat() {
session =
new Session.Builder()
.nodeUrls(Arrays.asList(options.get(Options.NODE_URLS).split(",")))
.username(options.get(Options.USER))
.password(options.get(Options.PASSWORD))
.build();
try {
session.open(false);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
@Override
public void open(InputSplit inputSplit) {
String sql;
if (lowerBound < 0L && upperBound < 0L) {
sql = String.format("SELECT %s FROM %s", String.join(",", measurements), device);
} else if (lowerBound < 0L && upperBound > 0L) {
sql =
String.format(
"SELECT %s FROM %s WHERE TIME <= %d",
String.join(",", measurements), device, upperBound);
} else if (lowerBound > 0L && upperBound < 0L) {
sql =
String.format(
"SELECT %s FROM %s WHERE TIME >= %d",
String.join(",", measurements), device, lowerBound);
} else {
sql =
String.format(
"SELECT %s FROM %s WHERE TIME >= %d AND TIME <= %d",
String.join(",", measurements), device, lowerBound, upperBound);
}
try {
dataSet = session.executeQueryStatement(sql);
columnTypes = dataSet.getColumnTypes();
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean reachedEnd() {
try {
return !dataSet.hasNext();
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
@Override
public RowData nextRecord(RowData rowData) {
try {
RowRecord rowRecord = dataSet.next();
return Utils.convert(rowRecord, columnTypes);
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
try {
if (dataSet != null) {
dataSet.close();
}
if (session != null) {
session.close();
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.function;
import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.exception.IllegalOptionException;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.flink.sql.wrapper.TabletWrapper;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class IoTDBCDCSourceFunction extends RichSourceFunction<RowData> {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBCDCSourceFunction.class);
private final List<IoTDBWebSocketClient> socketClients = new ArrayList<>();
private final int cdcPort;
private final List<String> nodeUrls;
private final String taskName;
private final String device;
private final String user;
private final String password;
private final List<String> measurements;
private final BlockingQueue<TabletWrapper> tabletWrappers;
private transient ExecutorService consumeExecutor;
public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
List<Tuple2<String, DataType>> tableSchema = schemaWrapper.getSchema();
cdcPort = options.get(Options.CDC_PORT);
nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
taskName = options.get(Options.CDC_TASK_NAME);
device = options.get(Options.DEVICE);
user = options.get(Options.USER);
password = options.get(Options.PASSWORD);
measurements =
tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size());
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Session session =
new Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
session.open(false);
boolean hasCreatedPipeTask =
session.executeQueryStatement(String.format("show pipe flink_cdc_%s", taskName)).hasNext();
if (!hasCreatedPipeTask) {
for (String nodeUrl : nodeUrls) {
URI uri = new URI(String.format("ws://%s:%d", nodeUrl.split(":")[0], cdcPort));
if (Utils.isURIAvailable(uri)) {
throw new IllegalOptionException(
String.format(
"The port `%d` has been bound. Please use another one by option `cdc.port`.",
cdcPort));
}
}
String createPipeCommand =
String.format(
"CREATE PIPE flink_cdc_%s\n"
+ "WITH EXTRACTOR (\n"
+ "'extractor' = 'iotdb-extractor',\n"
+ "'extractor.pattern' = '%s',\n"
+ "'extractor.history.enable' = 'true',\n"
+ "'extractor.realtime.enable' = 'true',\n"
+ "'extractor.realtime.mode' = 'hybrid',\n"
+ ") WITH CONNECTOR (\n"
+ "'connector' = 'websocket-connector',\n"
+ "'connector.websocket.port' = '%d'"
+ ")",
taskName, device, cdcPort);
session.executeNonQueryStatement(createPipeCommand);
}
String status =
session
.executeQueryStatement(String.format("show pipe flink_cdc_%s", taskName))
.next()
.getFields()
.get(2)
.getStringValue();
if ("STOPPED".equals(status)) {
session.executeNonQueryStatement(String.format("start pipe flink_cdc_%s", taskName));
}
session.close();
consumeExecutor = Executors.newFixedThreadPool(1);
for (String nodeUrl : nodeUrls) {
URI uri = new URI(String.format("ws://%s:%s", nodeUrl.split(":")[0], cdcPort));
socketClients.add(initAndGet(uri));
}
}
@Override
public void run(SourceContext<RowData> ctx) throws InterruptedException {
consumeExecutor.submit(new ConsumeRunnable(ctx));
consumeExecutor.shutdown();
while (true) {
for (IoTDBWebSocketClient socketClient : socketClients) {
if (socketClient.getReadyState().equals(ReadyState.CLOSED)) {
while (!Utils.isURIAvailable(socketClient.getURI())) {
String log =
String.format(
"The URI %s:%d is not available now, sleep 5 seconds.",
socketClient.getURI().getHost(), socketClient.getURI().getPort());
LOGGER.warn(log);
Thread.sleep(5000);
}
socketClient.reconnect();
while (!socketClient.getReadyState().equals(ReadyState.OPEN)) {
Thread.sleep(1000);
}
socketClient.send("START");
} else {
Thread.sleep(1000);
}
}
}
}
@Override
public void cancel() {
socketClients.forEach(WebSocketClient::close);
}
public void addTabletWrapper(TabletWrapper tabletWrapper) {
try {
this.tabletWrappers.put(tabletWrapper);
} catch (InterruptedException e) {
String host = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getHostName();
int port = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getPort();
String log =
String.format(
"The tablet from %s:%d can't be put into queue, because: %s",
host, port, e.getMessage());
LOGGER.warn(log);
Thread.currentThread().interrupt();
}
}
private IoTDBWebSocketClient initAndGet(URI uri) throws InterruptedException {
while (!Utils.isURIAvailable(uri)) {
String log =
String.format(
"The URI %s:%d is not available now, sleep 5 seconds.", uri.getHost(), uri.getPort());
LOGGER.warn(log);
Thread.sleep(5000);
}
IoTDBWebSocketClient client = new IoTDBWebSocketClient(uri, this);
client.connect();
while (!client.getReadyState().equals(ReadyState.OPEN)) {
Thread.sleep(1000);
}
client.send("START");
return client;
}
public void collectTablet(Tablet tablet, SourceContext<RowData> ctx) {
if (!device.equals(tablet.deviceId)) {
return;
}
List<MeasurementSchema> schemas = tablet.getSchemas();
int rowSize = tablet.rowSize;
HashMap<String, Pair<BitMap, List<Object>>> values = new HashMap<>();
for (MeasurementSchema schema : schemas) {
String measurement = schema.getMeasurementId();
values.put(
measurement,
new Pair<>(
tablet.bitMaps[schemas.indexOf(schema)],
Utils.object2List(tablet.values[schemas.indexOf(schema)], schema.getType())));
}
for (int i = 0; i < rowSize; i++) {
ArrayList<Object> row = new ArrayList<>();
row.add(tablet.timestamps[i]);
for (String measurement : measurements) {
if (values.get(measurement).getLeft() == null
|| !values.get(measurement).getLeft().isMarked(i)) {
row.add(values.get(measurement).getRight().get(i));
} else {
row.add(null);
}
}
RowData rowData = GenericRowData.of(row.toArray());
ctx.collect(rowData);
}
}
private class ConsumeRunnable implements Runnable {
SourceContext<RowData> context;
public ConsumeRunnable(SourceContext<RowData> context) {
this.context = context;
}
@Override
public void run() {
while (true) {
try {
TabletWrapper tabletWrapper = tabletWrappers.take();
collectTablet(tabletWrapper.getTablet(), context);
tabletWrapper
.getWebSocketClient()
.send(String.format("ACK:%d", tabletWrapper.getCommitId()));
} catch (InterruptedException e) {
LOGGER.warn("The tablet can't be taken from queue!");
Thread.currentThread().interrupt();
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.function;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.curator5.com.google.common.cache.Cache;
import org.apache.flink.shaded.curator5.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class IoTDBLookupFunction extends TableFunction<RowData> {
private final List<Tuple2<String, DataType>> schema;
private final int cacheMaxRows;
private final int cacheTTLSec;
private final List<String> nodeUrls;
private final String user;
private final String password;
private final String device;
private final List<String> measurements;
private Session session;
private transient Cache<RowData, RowData> cache;
public IoTDBLookupFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
this.schema = schemaWrapper.getSchema();
cacheMaxRows = options.get(Options.LOOKUP_CACHE_MAX_ROWS);
cacheTTLSec = options.get(Options.LOOKUP_CACHE_TTL_SEC);
nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
user = options.get(Options.USER);
password = options.get(Options.PASSWORD);
device = options.get(Options.DEVICE);
measurements =
schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
}
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build();
session.open(false);
if (cacheMaxRows > 0 && cacheTTLSec > 0) {
cache =
CacheBuilder.newBuilder()
.expireAfterAccess(cacheTTLSec, TimeUnit.SECONDS)
.maximumSize(cacheMaxRows)
.build();
}
}
@Override
public void close() throws Exception {
if (cache != null) {
cache.invalidateAll();
}
if (session != null) {
session.close();
}
super.close();
}
public void eval(Object obj) throws IoTDBConnectionException, StatementExecutionException {
RowData lookupKey = GenericRowData.of(obj);
if (cache != null) {
RowData cacheRow = cache.getIfPresent(lookupKey);
if (cacheRow != null) {
collect(cacheRow);
return;
}
}
long timestamp = lookupKey.getLong(0);
String sql =
String.format(
"SELECT %s FROM %s WHERE TIME=%d",
StringUtils.join(measurements, ','), device, timestamp);
SessionDataSet dataSet = session.executeQueryStatement(sql);
List<String> columnNames = dataSet.getColumnNames();
columnNames.remove("Time");
RowRecord rowRecord = dataSet.next();
if (rowRecord == null) {
ArrayList<Object> values = new ArrayList<>();
values.add(timestamp);
for (int i = 0; i < schema.size(); i++) {
values.add(null);
}
GenericRowData rowData = GenericRowData.of(values.toArray());
collect(rowData);
return;
}
List<Field> fields = rowRecord.getFields();
ArrayList<Object> values = new ArrayList<>();
values.add(timestamp);
for (Tuple2<String, DataType> filed : schema) {
values.add(
Utils.getValue(fields.get(columnNames.indexOf(device + '.' + filed.f0)), filed.f1));
}
GenericRowData rowData = GenericRowData.of(values.toArray());
if (cache != null) {
cache.put(lookupKey, rowData);
}
collect(rowData);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.function;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class IoTDBSinkFunction implements SinkFunction<RowData> {
private final List<Tuple2<String, DataType>> schema;
private final List<String> nodeUrls;
private final String user;
private final String password;
private final String device;
private final boolean aligned;
private final List<String> measurements;
private final List<TSDataType> dataTypes;
private static final Map<DataType, TSDataType> TYPE_MAP = new HashMap<>();
private static Session session;
static {
TYPE_MAP.put(DataTypes.INT(), TSDataType.INT32);
TYPE_MAP.put(DataTypes.BIGINT(), TSDataType.INT64);
TYPE_MAP.put(DataTypes.FLOAT(), TSDataType.FLOAT);
TYPE_MAP.put(DataTypes.DOUBLE(), TSDataType.DOUBLE);
TYPE_MAP.put(DataTypes.BOOLEAN(), TSDataType.BOOLEAN);
TYPE_MAP.put(DataTypes.STRING(), TSDataType.TEXT);
}
public IoTDBSinkFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
// Get schema
this.schema = schemaWrapper.getSchema();
// Get options
nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
user = options.get(Options.USER);
password = options.get(Options.PASSWORD);
device = options.get(Options.DEVICE);
aligned = options.get(Options.ALIGNED);
// Get measurements and data types from schema
measurements =
schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
dataTypes = schema.stream().map(field -> TYPE_MAP.get(field.f1)).collect(Collectors.toList());
}
@Override
public void invoke(RowData rowData, Context context) throws Exception {
// Open the session if the session has not been opened
if (session == null) {
session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build();
session.open(false);
}
// Load data from RowData
if (rowData.getRowKind().equals(RowKind.INSERT)
|| rowData.getRowKind().equals(RowKind.UPDATE_AFTER)) {
long timestamp = rowData.getLong(0);
ArrayList<String> measurementsOfRow = new ArrayList<>();
ArrayList<TSDataType> dataTypesOfRow = new ArrayList<>();
ArrayList<Object> values = new ArrayList<>();
for (int i = 0; i < this.measurements.size(); i++) {
Object value = Utils.getValue(rowData, schema.get(i).f1, i + 1);
if (value == null) {
continue;
}
measurementsOfRow.add(this.measurements.get(i));
dataTypesOfRow.add(this.dataTypes.get(i));
values.add(value);
}
// insert data
if (aligned) {
session.insertAlignedRecord(device, timestamp, measurementsOfRow, dataTypesOfRow, values);
} else {
session.insertRecord(device, timestamp, measurementsOfRow, dataTypesOfRow, values);
}
} else if (rowData.getRowKind().equals(RowKind.DELETE)) {
ArrayList<String> paths = new ArrayList<>();
for (String measurement : measurements) {
paths.add(String.format("%s.%s", device, measurement));
}
session.deleteData(paths, rowData.getLong(0));
} else if (rowData.getRowKind().equals(RowKind.UPDATE_BEFORE)) {
// do nothing
}
}
@Override
public void finish() throws Exception {
if (session != null) {
session.close();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.provider;
import org.apache.iotdb.flink.sql.function.IoTDBSinkFunction;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
public class IoTDBDynamicTableSink implements DynamicTableSink {
private final ReadableConfig options;
private final TableSchema schema;
public IoTDBDynamicTableSink(ReadableConfig options, TableSchema schema) {
this.options = options;
this.schema = schema;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return SinkFunctionProvider.of(new IoTDBSinkFunction(options, new SchemaWrapper(schema)));
}
@Override
public DynamicTableSink copy() {
return new IoTDBDynamicTableSink(options, schema);
}
@Override
public String asSummaryString() {
return "IoTDB Dynamic Table Sink";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.provider;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.function.IoTDBBoundedScanFunction;
import org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction;
import org.apache.iotdb.flink.sql.function.IoTDBLookupFunction;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
public class IoTDBDynamicTableSource implements LookupTableSource, ScanTableSource {
private final ReadableConfig options;
private final TableSchema schema;
public IoTDBDynamicTableSource(ReadableConfig options, TableSchema schema) {
this.options = options;
this.schema = schema;
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
return TableFunctionProvider.of(new IoTDBLookupFunction(options, new SchemaWrapper(schema)));
}
@Override
public DynamicTableSource copy() {
return new IoTDBDynamicTableSource(options, schema);
}
@Override
public String asSummaryString() {
return "IoTDB Dynamic Table Source";
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
if (options.get(Options.MODE) == Options.Mode.CDC) {
return SourceFunctionProvider.of(
new IoTDBCDCSourceFunction(options, new SchemaWrapper(schema)), false);
} else {
return InputFormatProvider.of(
new IoTDBBoundedScanFunction(options, new SchemaWrapper(schema)));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.wrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SchemaWrapper implements Serializable {
private final List<Tuple2<String, DataType>> schema;
public SchemaWrapper(TableSchema schema) {
this.schema = new ArrayList<>();
for (String fieldName : schema.getFieldNames()) {
if ("Time_".equals(fieldName)) {
continue;
}
this.schema.add(new Tuple2<>(fieldName, schema.getFieldDataType(fieldName).get()));
}
}
public List<Tuple2<String, DataType>> getSchema() {
return schema;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.wrapper;
import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient;
import org.apache.iotdb.tsfile.write.record.Tablet;
public class TabletWrapper {
private final long commitId;
private final IoTDBWebSocketClient websocketClient;
private final Tablet tablet;
public TabletWrapper(long commitId, IoTDBWebSocketClient websocketClient, Tablet tablet) {
this.commitId = commitId;
this.websocketClient = websocketClient;
this.tablet = tablet;
}
public long getCommitId() {
return commitId;
}
public IoTDBWebSocketClient getWebSocketClient() {
return websocketClient;
}
public Tablet getTablet() {
return tablet;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.apache.iotdb.flink.sql.factory.IoTDBDynamicTableFactory
\ No newline at end of file
......@@ -31,6 +31,7 @@
<name>IoTDB: Connector</name>
<modules>
<module>flink-iotdb-connector</module>
<module>flink-sql-iotdb-connector</module>
<module>flink-tsfile-connector</module>
<module>grafana-connector</module>
<module>hadoop</module>
......
......@@ -353,6 +353,11 @@
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>${websocket.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
......
......@@ -51,6 +51,9 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version";
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port";
public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.connector.protocol.websocket;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
public class WebSocketConnectorServer extends WebSocketServer {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class);
private final PriorityBlockingQueue<Pair<Long, Event>> events =
new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
private final WebsocketConnector websocketConnector;
private final ConcurrentMap<Long, Event> eventMap = new ConcurrentHashMap<>();
public WebSocketConnectorServer(
InetSocketAddress address, WebsocketConnector websocketConnector) {
super(address);
this.websocketConnector = websocketConnector;
}
@Override
public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
String log =
String.format(
"The connection from client %s:%d has been opened!",
webSocket.getRemoteSocketAddress().getHostName(),
webSocket.getRemoteSocketAddress().getPort());
LOGGER.info(log);
}
@Override
public void onClose(WebSocket webSocket, int i, String s, boolean b) {
String log =
String.format(
"The client from %s:%d has been closed!",
webSocket.getRemoteSocketAddress().getAddress(),
webSocket.getRemoteSocketAddress().getPort());
LOGGER.info(log);
}
@Override
public void onMessage(WebSocket webSocket, String s) {
String log =
String.format(
"Received a message `%s` from %s:%d",
s,
webSocket.getRemoteSocketAddress().getHostName(),
webSocket.getRemoteSocketAddress().getPort());
LOGGER.info(log);
if (s.startsWith("START")) {
handleStart(webSocket);
} else if (s.startsWith("ACK")) {
handleAck(webSocket, s);
} else if (s.startsWith("ERROR")) {
handleError(webSocket, s);
}
}
@Override
public void onError(WebSocket webSocket, Exception e) {
String log;
if (webSocket.getRemoteSocketAddress() != null) {
log =
String.format(
"Got an error `%s` from %s:%d",
e.getMessage(),
webSocket.getLocalSocketAddress().getHostName(),
webSocket.getLocalSocketAddress().getPort());
} else {
log = String.format("Got an error `%s` from client", e.getMessage());
}
LOGGER.error(log);
}
@Override
public void onStart() {
String log =
String.format(
"The webSocket server %s:%d has been started!",
this.getAddress().getHostName(), this.getPort());
LOGGER.error(log);
}
public void addEvent(Pair<Long, Event> event) {
if (events.size() >= 50) {
synchronized (events) {
while (events.size() >= 50) {
try {
events.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
events.put(event);
}
private void handleStart(WebSocket webSocket) {
try {
Pair<Long, Event> eventPair = events.take();
synchronized (events) {
events.notifyAll();
transfer(eventPair, webSocket);
}
} catch (InterruptedException e) {
String log = String.format("The event can't be taken, because: %s", e.getMessage());
LOGGER.warn(log);
Thread.currentThread().interrupt();
}
}
private void handleAck(WebSocket webSocket, String s) {
long commitId = Long.parseLong(s.replace("ACK:", ""));
Event event = eventMap.remove(commitId);
websocketConnector.commit(
commitId, event instanceof EnrichedEvent ? (EnrichedEvent) event : null);
handleStart(webSocket);
}
private void handleError(WebSocket webSocket, String s) {
long commitId = Long.parseLong(s.replace("ERROR:", ""));
String log =
String.format(
"The tablet of commitId: %d can't be parsed by client, it will be retried later.",
commitId);
LOGGER.warn(log);
events.put(new Pair<>(commitId, eventMap.remove(commitId)));
handleStart(webSocket);
}
private void transfer(Pair<Long, Event> eventPair, WebSocket webSocket) {
Long commitId = eventPair.getLeft();
Event event = eventPair.getRight();
try {
ByteBuffer tabletBuffer = null;
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
tabletBuffer = ((PipeInsertNodeTabletInsertionEvent) event).convertToTablet().serialize();
} else if (event instanceof PipeRawTabletInsertionEvent) {
tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize();
} else if (event instanceof PipeTsFileInsertionEvent) {
PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) event;
tsFileInsertionEvent.waitForTsFileClose();
Iterable<TabletInsertionEvent> subEvents = tsFileInsertionEvent.toTabletInsertionEvents();
for (TabletInsertionEvent subEvent : subEvents) {
tabletBuffer = ((PipeRawTabletInsertionEvent) subEvent).convertToTablet().serialize();
}
} else {
throw new NotImplementedException(
"IoTDBCDCConnector only support "
+ "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
}
if (tabletBuffer == null) {
return;
}
ByteBuffer payload = ByteBuffer.allocate(Long.BYTES + tabletBuffer.limit());
payload.putLong(commitId);
payload.put(tabletBuffer);
payload.flip();
this.broadcast(payload, Collections.singletonList(webSocket));
eventMap.put(eventPair.getLeft(), eventPair.getRight());
String log =
String.format(
"Transferred a message to client %s:%d",
webSocket.getRemoteSocketAddress().getAddress().getHostName(),
webSocket.getRemoteSocketAddress().getPort());
LOGGER.info(log);
} catch (InterruptedException e) {
events.put(eventPair);
Thread.currentThread().interrupt();
throw new PipeException(e.getMessage());
} catch (Exception e) {
events.put(eventPair);
e.printStackTrace();
throw new PipeException(e.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.connector.protocol.websocket;
import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
public class WebsocketConnector implements PipeConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketConnector.class);
private WebSocketConnectorServer server;
private int port;
public final AtomicLong commitIdGenerator = new AtomicLong(0);
private final AtomicLong lastCommitId = new AtomicLong(0);
private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
new PriorityQueue<>(Comparator.comparing(o -> o.left));
@Override
public void validate(PipeParameterValidator validator) throws Exception {}
@Override
public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
throws Exception {
port =
parameters.getIntOrDefault(
PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
}
@Override
public void handshake() throws Exception {
if (server == null) {
server = new WebSocketConnectorServer(new InetSocketAddress(port), this);
server.start();
}
}
@Override
public void heartbeat() throws Exception {}
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) {
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
LOGGER.warn(
"WebsocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ "Current event: {}.",
tabletInsertionEvent);
return;
}
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) tabletInsertionEvent)
.increaseReferenceCount(WebsocketConnector.class.getName());
server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
}
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
LOGGER.warn(
"WebsocketConnector only support PipeTsFileInsertionEvent. Current event: {}.",
tsFileInsertionEvent);
return;
}
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) tsFileInsertionEvent)
.increaseReferenceCount(WebsocketConnector.class.getName());
server.addEvent(new Pair<>(commitId, tsFileInsertionEvent));
}
@Override
public void transfer(Event event) throws Exception {}
@Override
public void close() throws Exception {
server.stop();
}
public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) {
commitQueue.offer(
new Pair<>(
requestCommitId,
() ->
Optional.ofNullable(enrichedEvent)
.ifPresent(
event ->
event.decreaseReferenceCount(WebsocketConnector.class.getName()))));
while (!commitQueue.isEmpty()) {
final Pair<Long, Runnable> committer = commitQueue.peek();
if (lastCommitId.get() + 1 != committer.left) {
break;
}
committer.right.run();
lastCommitId.incrementAndGet();
commitQueue.poll();
}
}
}
......@@ -28,6 +28,7 @@ import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.websocket.WebsocketConnector;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
......@@ -78,6 +79,8 @@ public class PipeConnectorSubtaskManager {
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
pipeConnector = new IoTDBAirGapConnector();
} else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) {
pipeConnector = new WebsocketConnector();
} else {
pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
}
......
......@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeCon
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
......@@ -43,6 +44,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class),
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class),
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
;
private final String pipePluginName;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
/**
* This class is a placeholder and should not be initialized. It represents the IoTDB WebSocket
* connector. There is a real implementation in the server module but cannot be imported here. The
* pipe agent in the server module will replace this class with the real implementation when
* initializing the IoTDB Thrift connector.
*/
public class WebSocketConnector extends PlaceholderConnector {}
......@@ -209,6 +209,8 @@
<thrift.exec.absolute.path/>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
<boost.include.dir/>
<!-- websocket -->
<websocket.version>1.5.3</websocket.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册