diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index 114026eca0ac69587fa9ad4b8bf9bb914e67e128..db49e5f3952761c78f0796e706cb4a9b567467f5 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -1256,6 +1256,7 @@ The source code of the sample application is under `TDengine/examples/JDBC`: - connectionPools: using taos-jdbcdriver in connection pools such as HikariCP, Druid, dbcp, c3p0, etc. - SpringJdbcTemplate: using taos-jdbcdriver in Spring JdbcTemplate. - mybatisplus-demo: using taos-jdbcdriver in Springboot + Mybatis. +- consumer-demo: consumer TDengine data example, the consumption rate can be controlled by parameters. [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC) diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index e4cf4a83e7508cf0e8a978e3de5d602b1f658bc0..46800226d77cc880359f73b71c9e0a396954399a 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -1258,6 +1258,7 @@ public static void main(String[] args) throws Exception { - connectionPools:HikariCP, Druid, dbcp, c3p0 等连接池中使用 taos-jdbcdriver。 - SpringJdbcTemplate:Spring JdbcTemplate 中使用 taos-jdbcdriver。 - mybatisplus-demo:Springboot + Mybatis 中使用 taos-jdbcdriver。 +- consumer-demo:Consumer 消费 TDengine 数据示例,可通过参数控制消费速度。 请参考:[JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC) diff --git a/examples/JDBC/consumer-demo/pom.xml b/examples/JDBC/consumer-demo/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..aa3cb154e5df957c3ca8b7447c2a5a02168b6be7 --- /dev/null +++ b/examples/JDBC/consumer-demo/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + com.taosdata + consumer + 1.0-SNAPSHOT + + + 8 + 8 + + + + + com.taosdata.jdbc + taos-jdbcdriver + 3.2.1 + + + com.google.guava + guava + 30.1.1-jre + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.3.0 + + + ConsumerDemo + + ConsumerDemo + + + com.taosdata.ConsumerDemo + + + + jar-with-dependencies + + + package + + single + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + UTF-8 + + + + + + + \ No newline at end of file diff --git a/examples/JDBC/consumer-demo/readme.md b/examples/JDBC/consumer-demo/readme.md new file mode 100644 index 0000000000000000000000000000000000000000..c211b017a74cd2a84bbab6953bb31500d73b3ba2 --- /dev/null +++ b/examples/JDBC/consumer-demo/readme.md @@ -0,0 +1,52 @@ +# How to Run the Consumer Demo Code On Linux OS +TDengine's Consumer demo project is organized in a Maven way so that users can easily compile, package and run the project. If you don't have Maven on your server, you may install it using +``` +sudo apt-get install maven +``` + +## Install TDengine Client and TaosAdapter +Make sure you have already installed a tdengine client on your current develop environment. +Download the tdengine package on our website: ``https://www.taosdata.com/cn/all-downloads/`` and install the client. + +## Run Consumer Demo using mvn plugin +run command: +``` +mvn clean compile exec:java -Dexec.mainClass="com.taosdata.ConsumerDemo" +``` + +## Custom configuration +```shell +# the host of TDengine server +export TAOS_HOST="127.0.0.1" + +# the port of TDengine server +export TAOS_PORT="6041" + +# the consumer type, can be "ws" or "jni" +export TAOS_TYPE="ws" + +# the number of consumers +export TAOS_JDBC_CONSUMER_NUM="1" + +# the number of processors to consume +export TAOS_JDBC_PROCESSOR_NUM="2" + +# the number of records to be consumed per processor per second +export TAOS_JDBC_RATE_PER_PROCESSOR="1000" + +# poll wait time in ms +export TAOS_JDBC_POLL_SLEEP="100" +``` + +## Run Consumer Demo using jar + +To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/consumer-demo`` and execute +``` +mvn clean package assembly:single +``` + +To run ConsumerDemo.jar, go to ``TDengine/tests/examples/JDBC/consumer-demo`` and execute +``` +java -jar target/ConsumerDemo-jar-with-dependencies.jar +``` + diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java new file mode 100644 index 0000000000000000000000000000000000000000..2f2467b3713b2d6bcedd708f3ea4b20d3db53c23 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java @@ -0,0 +1,43 @@ +package com.taosdata; + +import java.sql.Timestamp; + +public class Bean { + private Timestamp ts; + private Integer c1; + private String c2; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public Integer getC1() { + return c1; + } + + public void setC1(Integer c1) { + this.c1 = c1; + } + + public String getC2() { + return c2; + } + + public void setC2(String c2) { + this.c2 = c2; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Bean {"); + sb.append("ts=").append(ts); + sb.append(", c1=").append(c1); + sb.append(", c2='").append(c2).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..478af9e70da51657f9d2f7a49e7b919bf5399d55 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java @@ -0,0 +1,6 @@ +package com.taosdata; + +import com.taosdata.jdbc.tmq.ReferenceDeserializer; + +public class BeanDeserializer extends ReferenceDeserializer { +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java new file mode 100644 index 0000000000000000000000000000000000000000..08579926e3cbc1dcaa4c8c01027340d8c2635cb2 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java @@ -0,0 +1,78 @@ +package com.taosdata; + +public class Config { + public static final String TOPIC = "test_consumer"; + public static final String TAOS_HOST = "127.0.0.1"; + public static final String TAOS_PORT = "6041"; + public static final String TAOS_TYPE = "ws"; + public static final int TAOS_JDBC_CONSUMER_NUM = 1; + public static final int TAOS_JDBC_PROCESSOR_NUM = 2; + public static final int TAOS_JDBC_RATE_PER_PROCESSOR = 1000; + public static final int TAOS_JDBC_POLL_SLEEP = 100; + + private final int consumerNum; + private final int processCapacity; + private final int rate; + private final int pollSleep; + private final String type; + private final String host; + private final String port; + + public Config(String type, String host, String port, int consumerNum, int processCapacity, int rate, int pollSleep) { + this.type = type; + this.consumerNum = consumerNum; + this.processCapacity = processCapacity; + this.rate = rate; + this.pollSleep = pollSleep; + this.host = host; + this.port = port; + } + + public int getConsumerNum() { + return consumerNum; + } + + public int getProcessCapacity() { + return processCapacity; + } + + public int getRate() { + return rate; + } + + public int getPollSleep() { + return pollSleep; + } + + public String getHost() { + return host; + } + + public String getPort() { + return port; + } + + public String getType() { + return type; + } + + public static Config getFromENV() { + String host = System.getenv("TAOS_HOST") != null ? System.getenv("TAOS_HOST") : TAOS_HOST; + String port = System.getenv("TAOS_PORT") != null ? System.getenv("TAOS_PORT") : TAOS_PORT; + String type = System.getenv("TAOS_TYPE") != null ? System.getenv("TAOS_TYPE") : TAOS_TYPE; + + String c = System.getenv("TAOS_JDBC_CONSUMER_NUM"); + int num = c != null ? Integer.parseInt(c) : TAOS_JDBC_CONSUMER_NUM; + + String p = System.getenv("TAOS_JDBC_PROCESSOR_NUM"); + int capacity = p != null ? Integer.parseInt(p) : TAOS_JDBC_PROCESSOR_NUM; + + String r = System.getenv("TAOS_JDBC_RATE_PER_PROCESSOR"); + int rate = r != null ? Integer.parseInt(r) : TAOS_JDBC_RATE_PER_PROCESSOR; + + String s = System.getenv("TAOS_JDBC_POLL_SLEEP"); + int sleep = s != null ? Integer.parseInt(s) : TAOS_JDBC_POLL_SLEEP; + + return new Config(type, host, port, num, capacity, rate, sleep); + } +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java new file mode 100644 index 0000000000000000000000000000000000000000..7c7719c63986cc309363c13fecb5fafe0243cdba --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java @@ -0,0 +1,65 @@ +package com.taosdata; + +import com.taosdata.jdbc.tmq.TMQConstants; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.taosdata.Config.*; + +public class ConsumerDemo { + public static void main(String[] args) throws SQLException { + // Config + Config config = Config.getFromENV(); + // Generated data + mockData(); + + Properties prop = new Properties(); + prop.setProperty(TMQConstants.CONNECT_TYPE, config.getType()); + prop.setProperty(TMQConstants.BOOTSTRAP_SERVERS, config.getHost() + ":" + config.getPort()); + prop.setProperty(TMQConstants.CONNECT_USER, "root"); + prop.setProperty(TMQConstants.CONNECT_PASS, "taosdata"); + prop.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true"); + prop.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true"); + prop.setProperty(TMQConstants.GROUP_ID, "gId"); + prop.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.BeanDeserializer"); + for (int i = 0; i < config.getConsumerNum() - 1; i++) { + new Thread(new Worker(prop, config)).start(); + } + new Worker(prop, config).run(); + } + + public static void mockData() throws SQLException { + String dbName = "test_consumer"; + String tableName = "st"; + String url = "jdbc:TAOS-RS://" + TAOS_HOST + ":" + TAOS_PORT + "/?user=root&password=taosdata&batchfetch=true"; + Connection connection = DriverManager.getConnection(url); + Statement statement = connection.createStatement(); + statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650"); + statement.executeUpdate("use " + dbName); + statement.executeUpdate("create table if not exists " + tableName + " (ts timestamp, c1 int, c2 nchar(100)) "); + statement.executeUpdate("create topic if not exists " + TOPIC + " as select ts, c1, c2 from " + tableName); + + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r); + t.setName("mock-data-thread-" + t.getId()); + return t; + }); + AtomicInteger atomic = new AtomicInteger(); + scheduledExecutorService.scheduleWithFixedDelay(() -> { + int i = atomic.getAndIncrement(); + try { + statement.executeUpdate("insert into " + tableName + " values(now, " + i + ",'" + i + "')"); + } catch (SQLException e) { + // ignore + } + }, 0, 10, TimeUnit.MILLISECONDS); + } +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java new file mode 100644 index 0000000000000000000000000000000000000000..f6e21cd7294333aeb96a6e99f0cafe326073f5fa --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java @@ -0,0 +1,60 @@ +package com.taosdata; + +import com.google.common.util.concurrent.RateLimiter; +import com.taosdata.jdbc.tmq.ConsumerRecord; +import com.taosdata.jdbc.tmq.ConsumerRecords; +import com.taosdata.jdbc.tmq.TaosConsumer; + +import java.sql.SQLException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Semaphore; + +public class Worker implements Runnable { + + int sleepTime; + int rate; + + ForkJoinPool pool = new ForkJoinPool(); + Semaphore semaphore; + + TaosConsumer consumer; + + public Worker(Properties prop, Config config) throws SQLException { + consumer = new TaosConsumer<>(prop); + consumer.subscribe(Collections.singletonList(Config.TOPIC)); + semaphore = new Semaphore(config.getProcessCapacity()); + sleepTime = config.getPollSleep(); + rate = config.getRate(); + } + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + // 控制请求频率 + if (semaphore.tryAcquire()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(sleepTime)); + pool.submit(() -> { + RateLimiter limiter = RateLimiter.create(rate); + try { + for (ConsumerRecord record : records) { + // 流量控制 + limiter.acquire(); + // 业务处理数据 + System.out.println("[" + LocalDateTime.now() + "] Thread id:" + Thread.currentThread().getId() + " -> " + record.value()); + } + } finally { + semaphore.release(); + } + }); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + } +}