未验证 提交 b9e1f151 编写于 作者: X Xieqijun 提交者: GitHub

[IOTDB-2373][Trigger] MQTTSink: there should be a timeout rule (#4953)

上级 ae8e9dad
......@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import java.net.InetAddress;
public class MQTTHandler implements Handler<MQTTConfiguration, MQTTEvent> {
private BlockingConnection connection;
......@@ -35,6 +37,7 @@ public class MQTTHandler implements Handler<MQTTConfiguration, MQTTEvent> {
@Override
public void open(MQTTConfiguration configuration) throws Exception {
MQTT mqtt = new MQTT();
ping(configuration.getHost());
mqtt.setHost(configuration.getHost(), configuration.getPort());
mqtt.setUserName(configuration.getUsername());
mqtt.setPassword(configuration.getPassword());
......@@ -42,10 +45,16 @@ public class MQTTHandler implements Handler<MQTTConfiguration, MQTTEvent> {
mqtt.setReconnectDelay(configuration.getReconnectDelay());
connection = mqtt.blockingConnection();
connection.connect();
payloadFormatter = generatePayloadFormatter(configuration);
}
private void ping(String host) throws Exception {
InetAddress inet = InetAddress.getByName(host);
if (!inet.isReachable(1000)) {
throw new Exception("Connection refused");
}
}
private static String generatePayloadFormatter(MQTTConfiguration configuration)
throws SinkException {
return String.format(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册