Unverified · Commit 3186a0c3 authored by HZYWPT, committed by GitHub

[IOTDB-2184] Update the code of Kafka example to use Session API and update kafka version from 0.8.2.0 to 2.8.0 (#4849)

* [IOTDB-2184] Update the code of Kafka example to use Session API and update kafka version from 0.8.2.0 to 2.8.0

* [IOTDB-2184] Replace Session with SessionPool, and insertRecord with insertRecords

Co-authored-by: 19858181030@163.com <hzy19980801>
Parent d8bb6d5b
pom.xml:
@@ -43,7 +43,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.13</artifactId>
+            <version>2.8.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
README.md:
@@ -26,21 +26,21 @@ The example is to show how to send data from localhost to IoTDB through Kafka.
 ## Usage
 ### Version usage
 IoTDB: 0.13.0-SNAPSHOT
-Kafka: 0.8.2.0
+Kafka: 2.8.0
 ### Dependencies with Maven
 ```
 <dependencies>
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka_2.10</artifactId>
-        <version>0.8.2.0</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-jdbc</artifactId>
-        <version>0.13.0-SNAPSHOT</version>
-    </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_2.13</artifactId>
+        <version>2.8.0</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-session</artifactId>
+        <version>0.13.0-SNAPSHOT</version>
+    </dependency>
 </dependencies>
 ```
@@ -51,7 +51,7 @@ Kafka: 0.8.2.0
  For details, please refer to http://kafka.apache.org/081/documentation.html#quickstart
 ```
-### Run KafkaProducer.java
+### Run Producer.java
 ```
 The class is to send data from localhost to Kafka clusters.
@@ -62,13 +62,13 @@ Kafka: 0.8.2.0
  Finally, run KafkaProducer.java
 ```
-### Run KafkaConsumer.java
+### Run Consumer.java
 ```
 The class is to show how to consume data from kafka through multi-threads.
 The data is sent by class KafkaProducer.
- You can set CONSUMER_THREAD_NUM in Constant.java to choose the number of consumer threads (for example: "5"):
-> private final static int CONSUMER_THREAD_NUM = 5;
+ You can set CONSUMER_THREAD_NUM in Constant.java to choose the number of consumer threads (for example: "3"):
+> private final static int CONSUMER_THREAD_NUM = 3;
 ```
#### Notice
......
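A note on sizing that the README leaves implicit: a Kafka consumer group assigns at most one consumer to each partition, so extra CONSUMER_THREAD_NUM threads stay idle unless the Kafka-Test topic has at least that many partitions. Below is a minimal stand-alone sketch that pre-creates the topic with the Kafka 2.8 AdminClient; the class name CreateTopic, the partition count of 5, and the replication factor of 1 are illustrative assumptions, not part of the example project.

```
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // 5 partitions let up to 5 consumer threads receive data in parallel;
      // replication factor 1 is enough for a single local broker.
      admin
          .createTopics(Collections.singleton(new NewTopic("Kafka-Test", 5, (short) 1)))
          .all()
          .get(); // block until the broker acknowledges
    }
  }
}
```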
Constant.java:
@@ -18,46 +18,50 @@
  */
 package org.apache.iotdb.kafka;
-import org.apache.iotdb.jdbc.Config;
 public class Constant {
   private Constant() {}
   public static final String TOPIC = "Kafka-Test";
   public static final String KAFKA_SERVICE_URL = "127.0.0.1:9092";
   public static final int CONSUMER_THREAD_NUM = 5;
-  public static final String IOTDB_CONNECTION_URL = Config.IOTDB_URL_PREFIX + "localhost:6667/";
+  public static final int SESSION_SIZE = 3;
+  public static final String IOTDB_CONNECTION_HOST = "localhost";
+  public static final int IOTDB_CONNECTION_PORT = 6667;
   public static final String IOTDB_CONNECTION_USER = "root";
   public static final String IOTDB_CONNECTION_PASSWORD = "root";
-  public static final String STORAGE_GROUP = "root.vehicle";
-  /** If you write level3 as device, timeseries will not be created because device is the keyword */
-  public static final String[] ALL_TIMESERIES = {
-    "root.vehicle.deviceid.sensor1",
-    "root.vehicle.deviceid.sensor2",
-    "root.vehicle.deviceid.sensor3",
-    "root.vehicle.deviceid.sensor4"
-  };
+  public static final String[] STORAGE_GROUP = {"root.vehicle", "root.test"};
+  public static final String INT32 = "INT32";
+  public static final String PLAIN = "PLAIN";
+  public static final String SNAPPY = "SNAPPY";
+  public static final String[][] CREATE_TIMESERIES = {
+    {"root.vehicle.d0.s0", INT32, PLAIN, SNAPPY},
+    {"root.vehicle.d0.s1", "TEXT", PLAIN, SNAPPY},
+    {"root.vehicle.d1.s2", "FLOAT", PLAIN, SNAPPY},
+    {"root.vehicle.d1.s3", "BOOLEAN", PLAIN, SNAPPY},
+    {"root.test.d0.s0", INT32, PLAIN, SNAPPY},
+    {"root.test.d0.s1", "TEXT", PLAIN, SNAPPY},
+    {"root.test.d1.s0", INT32, PLAIN, SNAPPY},
+  };
   public static final String[] ALL_DATA = {
-    "sensor1,2017/10/24 19:30:00,606162908",
-    "sensor2,2017/10/24 19:30:00,160161162",
-    "sensor3,2017/10/24 19:30:00,260261262",
-    "sensor4,2017/10/24 19:30:00,360361362",
-    "sensor1,2017/10/24 19:31:00,818182346",
-    "sensor2,2017/10/24 19:31:00,180181182",
-    "sensor3,2017/10/24 19:31:00,280281282",
-    "sensor4,2017/10/24 19:31:00,380381382",
-    "sensor1,2017/10/24 19:32:00,505152421",
-    "sensor2,2017/10/24 19:32:00,150151152",
-    "sensor3,2017/10/24 19:32:00,250251252",
-    "sensor4,2017/10/24 19:32:00,350351352",
-    "sensor1,2017/10/24 19:33:00,404142234",
-    "sensor2,2017/10/24 19:33:00,140141142",
-    "sensor3,2017/10/24 19:33:00,240241242",
-    "sensor4,2017/10/24 19:33:00,340341342",
-    "sensor1,2017/10/24 19:34:00,101112567",
-    "sensor2,2017/10/24 19:34:00,110111112",
-    "sensor3,2017/10/24 19:34:00,210211212",
-    "sensor4,2017/10/24 19:34:00,310311312",
+    "root.vehicle.d0,10,s0,INT32,100",
+    "root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'",
+    "root.vehicle.d0,19,s1,TEXT,'employeeId103'",
+    "root.vehicle.d1,11,s2,FLOAT,104.0",
+    "root.vehicle.d1,15,s2:s3,FLOAT:BOOLEAN,105.0:true",
+    "root.vehicle.d1,17,s3,BOOLEAN,false",
+    "root.vehicle.d0,20,s0,INT32,1000",
+    "root.vehicle.d0,22,s0:s1,INT32:TEXT,1001:'employeeId1002'",
+    "root.vehicle.d0,29,s1,TEXT,'employeeId1003'",
+    "root.vehicle.d1,21,s2,FLOAT,1004.0",
+    "root.vehicle.d1,25,s2:s3,FLOAT:BOOLEAN,1005.0:true",
+    "root.vehicle.d1,27,s3,BOOLEAN,true",
+    "root.test.d0,10,s0,INT32,106",
+    "root.test.d0,14,s0:s1,INT32:TEXT,107:'employeeId108'",
+    "root.test.d0,16,s1,TEXT,'employeeId109'",
+    "root.test.d1,1,s0,INT32,110",
+    "root.test.d0,30,s0,INT32,1006",
+    "root.test.d0,34,s0:s1,INT32:TEXT,1007:'employeeId1008'",
+    "root.test.d0,36,s1,TEXT,'employeeId1090'",
+    "root.test.d1,10,s0,INT32,1100",
   };
 }
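For readers skimming the diff: each ALL_DATA entry now encodes one insertion as device,timestamp,measurements,types,values, with ':' separating the members of a multi-measurement field. A small stand-alone sketch of how one record decomposes follows; the class name RecordFormatDemo is illustrative only, and the real parsing lives in ConsumerThread further below.

```
import java.util.Arrays;
import java.util.List;

public class RecordFormatDemo {
  public static void main(String[] args) {
    String record = "root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'";
    String[] fields = record.split(",");
    String device = fields[0]; // root.vehicle.d0
    long time = Long.parseLong(fields[1]); // 12
    List<String> measurements = Arrays.asList(fields[2].split(":")); // [s0, s1]
    List<String> types = Arrays.asList(fields[3].split(":")); // [INT32, TEXT]
    List<String> values = Arrays.asList(fields[4].split(":")); // [101, 'employeeId102']
    System.out.printf("%s @%d %s %s %s%n", device, time, measurements, types, values);
  }
}
```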
Consumer.java (new file):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.kafka;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * The class is to show how to get data from kafka through multi-threads. The data is sent by class
 * Producer.
 */
public class Consumer {
private List<KafkaConsumer<String, String>> consumerList;
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private static SessionPool pool;
private Consumer(List<KafkaConsumer<String, String>> consumerList) {
this.consumerList = consumerList;
pool =
new SessionPool.Builder()
.host(Constant.IOTDB_CONNECTION_HOST)
.port(Constant.IOTDB_CONNECTION_PORT)
.user(Constant.IOTDB_CONNECTION_USER)
.password(Constant.IOTDB_CONNECTION_PASSWORD)
.maxSize(Constant.SESSION_SIZE)
.build();
}
public static void main(String[] args) {
List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
for (int i = 0; i < Constant.CONSUMER_THREAD_NUM; i++) {
/** Consumer configuration */
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_SERVICE_URL);
/** deserializer classes */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
/**
 * What to do when there is no initial offset in Kafka or if an offset is out of range.
 * earliest: automatically reset the offset to the earliest offset; latest: automatically reset
 * the offset to the latest offset.
 */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/** All consumers share the same group id, so the topic's partitions are balanced across the threads */
props.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.TOPIC);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumerList.add(consumer);
consumer.subscribe(Collections.singleton(Constant.TOPIC));
}
Consumer consumer = new Consumer(consumerList);
initIoTDB();
consumer.consumeInParallel();
}
@SuppressWarnings("squid:S2068")
private static void initIoTDB() {
try {
for (String storageGroup : Constant.STORAGE_GROUP) {
addStorageGroup(storageGroup);
}
for (String[] sql : Constant.CREATE_TIMESERIES) {
createTimeseries(sql);
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error(e.getMessage());
}
}
private static void addStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
pool.setStorageGroup(storageGroup);
}
private static void createTimeseries(String[] sql)
throws StatementExecutionException, IoTDBConnectionException {
String timeseries = sql[0];
TSDataType dataType = TSDataType.valueOf(sql[1]);
TSEncoding encoding = TSEncoding.valueOf(sql[2]);
CompressionType compressionType = CompressionType.valueOf(sql[3]);
pool.createTimeseries(timeseries, dataType, encoding, compressionType);
}
private void consumeInParallel() {
/** Specify the number of consumer threads */
ExecutorService executor = Executors.newFixedThreadPool(Constant.CONSUMER_THREAD_NUM);
for (int i = 0; i < consumerList.size(); i++) {
ConsumerThread consumerThread = new ConsumerThread(consumerList.get(i), pool);
executor.submit(consumerThread);
}
}
}
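One thing the new Consumer never does is release its resources: the executor threads, the pooled IoTDB sessions, and the KafkaConsumer instances live until the JVM dies. A hedged sketch of a shutdown hook that could be appended to main(); it assumes the pool and consumerList shown above, and uses KafkaConsumer.wakeup(), the thread-safe way to break a blocking poll().

```
// Sketch only: release Kafka and IoTDB resources when the JVM shuts down.
Runtime.getRuntime()
    .addShutdownHook(
        new Thread(
            () -> {
              // wakeup() may be called from another thread; it makes the
              // blocked poll() in each worker throw WakeupException.
              for (KafkaConsumer<String, String> c : consumerList) {
                c.wakeup();
              }
              // Then return the pooled sessions to IoTDB.
              pool.close();
            }));
```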
ConsumerThread.java (new file):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.kafka;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/** The worker thread of Consumer: polls records from Kafka and writes them to IoTDB. */
public class ConsumerThread implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);
private KafkaConsumer<String, String> consumer;
private SessionPool pool;
public ConsumerThread(KafkaConsumer<String, String> consumer, SessionPool pool) {
this.consumer = consumer;
this.pool = pool;
}
/** insert data to IoTDB */
private void insert(String data) throws IoTDBConnectionException, StatementExecutionException {
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
List<String> measurements = Arrays.asList(dataArray[2].split(":"));
List<TSDataType> types = new ArrayList<>();
for (String type : dataArray[3].split(":")) {
types.add(TSDataType.valueOf(type));
}
List<Object> values = new ArrayList<>();
String[] valuesStr = dataArray[4].split(":");
for (int i = 0; i < valuesStr.length; i++) {
switch (types.get(i)) {
case INT64:
values.add(Long.parseLong(valuesStr[i]));
break;
case DOUBLE:
values.add(Double.parseDouble(valuesStr[i]));
break;
case INT32:
values.add(Integer.parseInt(valuesStr[i]));
break;
case TEXT:
values.add(valuesStr[i]);
break;
case FLOAT:
values.add(Float.parseFloat(valuesStr[i]));
break;
case BOOLEAN:
values.add(Boolean.parseBoolean(valuesStr[i]));
break;
}
}
pool.insertRecord(device, time, measurements, types, values);
}
/** insert data to IoTDB */
private void insertDatas(List<String> datas)
throws IoTDBConnectionException, StatementExecutionException {
int size = datas.size();
List<String> deviceIds = new ArrayList<>(size);
List<Long> times = new ArrayList<>(size);
List<List<String>> measurementsList = new ArrayList<>(size);
List<List<TSDataType>> typesList = new ArrayList<>(size);
List<List<Object>> valuesList = new ArrayList<>(size);
for (String data : datas) {
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
List<String> measurements = Arrays.asList(dataArray[2].split(":"));
List<TSDataType> types = new ArrayList<>();
for (String type : dataArray[3].split(":")) {
types.add(TSDataType.valueOf(type));
}
List<Object> values = new ArrayList<>();
String[] valuesStr = dataArray[4].split(":");
for (int i = 0; i < valuesStr.length; i++) {
switch (types.get(i)) {
case INT64:
values.add(Long.parseLong(valuesStr[i]));
break;
case DOUBLE:
values.add(Double.parseDouble(valuesStr[i]));
break;
case INT32:
values.add(Integer.parseInt(valuesStr[i]));
break;
case TEXT:
values.add(valuesStr[i]);
break;
case FLOAT:
values.add(Float.parseFloat(valuesStr[i]));
break;
case BOOLEAN:
values.add(Boolean.parseBoolean(valuesStr[i]));
break;
}
}
deviceIds.add(device);
times.add(time);
measurementsList.add(measurements);
typesList.add(types);
valuesList.add(values);
}
pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
}
@Override
public void run() {
try {
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
List<String> datas = new ArrayList<>(records.count());
for (ConsumerRecord<String, String> record : records) {
datas.add(record.value());
}
insertDatas(datas);
} while (true);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
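Delivery-semantics note: with the Kafka default enable.auto.commit=true, offsets are committed on a timer whether or not insertRecords() succeeded, so a crash between poll() and the IoTDB write can silently drop a batch. Below is a hedged at-least-once variant of the loop body in run(), assuming auto-commit is disabled in the consumer Properties; records may then be re-delivered after a restart, which IoTDB tolerates because rewriting the same device/timestamp overwrites the same point.

```
// Sketch only: assumes props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
// was added to the consumer configuration in Consumer.java.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
List<String> datas = new ArrayList<>(records.count());
for (ConsumerRecord<String, String> record : records) {
  datas.add(record.value());
}
insertDatas(datas); // write the whole batch to IoTDB first...
consumer.commitSync(); // ...then mark it consumed: at-least-once delivery
```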
KafkaConsumer.java (deleted file):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.kafka;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* The class is to show how to get data from kafka through multi-threads. The data is sent by class
* KafkaProducer.
*/
public class KafkaConsumer {
private ConsumerConnector consumer;
private KafkaConsumer() {
/** Consumer configuration */
Properties props = new Properties();
/** Zookeeper configuration */
props.put("zookeeper.connect", "127.0.0.1:2181");
props.put("group.id", "consumeGroup");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("rebalance.max.retries", "5");
props.put("rebalance.backoff.ms", "1200");
props.put("auto.commit.interval.ms", "1000");
/**
* What to do when there is no initial offset in ZooKeeper or if an offset is out of range
* smallest : automatically reset the offset to the smallest offset
*/
props.put("auto.offset.reset", "smallest");
/** serializer class */
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}
public static void main(String[] args) {
new KafkaConsumer().consume();
}
private void consume() {
/** Specify the number of consumer thread */
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(Constant.TOPIC, Constant.CONSUMER_THREAD_NUM);
/** Specify data decoder */
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
List<KafkaStream<String, String>> streams = consumerMap.get(Constant.TOPIC);
ExecutorService executor = Executors.newFixedThreadPool(Constant.CONSUMER_THREAD_NUM);
for (final KafkaStream<String, String> stream : streams) {
executor.submit(new KafkaConsumerThread(stream));
}
}
}
KafkaConsumerThread.java (deleted file):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.kafka;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class KafkaConsumerThread implements Runnable {
private Connection connection = null;
private Statement statement = null;
private KafkaStream<String, String> stream;
private static boolean setStorageGroup = true;
private static boolean createTimeSeries = true;
private String createStorageGroupSqlTemplate = "SET STORAGE GROUP TO %s";
private String createTimeseriesSqlTemplate =
"CREATE TIMESERIES %s WITH DATATYPE=TEXT, ENCODING=PLAIN";
private String insertDataSqlTemplate =
"INSERT INTO root.vehicle.deviceid(timestamp,%s) VALUES (%s,'%s')";
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerThread.class);
public KafkaConsumerThread(KafkaStream<String, String> stream) {
this.stream = stream;
/** Establish JDBC connection of IoTDB */
initIoTDB();
}
private void initIoTDB() {
try {
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
connection =
DriverManager.getConnection(
Constant.IOTDB_CONNECTION_URL,
Constant.IOTDB_CONNECTION_USER,
Constant.IOTDB_CONNECTION_PASSWORD);
statement = connection.createStatement();
if (setStorageGroup) {
try {
statement.execute(String.format(createStorageGroupSqlTemplate, Constant.STORAGE_GROUP));
} catch (SQLException e) {
}
setStorageGroup = false;
}
if (createTimeSeries) {
for (String timeseries : Constant.ALL_TIMESERIES) {
statement.addBatch(String.format(createTimeseriesSqlTemplate, timeseries));
}
statement.executeBatch();
statement.clearBatch();
createTimeSeries = false;
}
} catch (ClassNotFoundException | SQLException e) {
logger.error(e.getMessage());
}
}
/** Write data to IoTDB */
private void writeData(String message) {
String[] items = message.split(",");
try {
String sql = String.format(insertDataSqlTemplate, items[0], items[1], items[2]);
statement.execute(sql);
} catch (SQLException e) {
logger.error(e.getMessage());
}
}
@Override
public void run() {
for (MessageAndMetadata<String, String> consumerIterator : stream) {
String uploadMessage = consumerIterator.message();
logger.info(
String.format(
"%s from partiton[%d]: %s",
Thread.currentThread().getName(), consumerIterator.partition(), uploadMessage));
writeData(uploadMessage);
}
}
}
KafkaProducer.java → Producer.java:
@@ -18,42 +18,40 @@
  */
 package org.apache.iotdb.kafka;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Properties;
 /** The class is to show how to send data to kafka through multi-threads. */
-public class KafkaProducer {
+public class Producer {
-  private final Producer<String, String> producer;
-  private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
+  private final KafkaProducer<String, String> producer;
+  private static final Logger logger = LoggerFactory.getLogger(Producer.class);
-  public KafkaProducer() {
+  public Producer() {
     Properties props = new Properties();
-    props.put("metadata.broker.list", "127.0.0.1:9092");
-    props.put("zk.connect", "127.0.0.1:2181");
-    props.put("serializer.class", "kafka.serializer.StringEncoder");
-    props.put("key.serializer.class", "kafka.serializer.StringEncoder");
-    props.put("request.required.acks", "-1");
-    producer = new Producer<>(new ProducerConfig(props));
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_SERVICE_URL);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producer = new KafkaProducer<>(props);
   }
   public static void main(String[] args) {
-    KafkaProducer kafkaProducer = new KafkaProducer();
-    kafkaProducer.produce();
-    kafkaProducer.close();
+    Producer producer = new Producer();
+    producer.produce();
+    producer.close();
   }
   private void produce() {
     for (int i = 0; i < Constant.ALL_DATA.length; i++) {
       String key = Integer.toString(i);
-      producer.send(new KeyedMessage<>(Constant.TOPIC, key, Constant.ALL_DATA[i]));
+      producer.send(new ProducerRecord<>(Constant.TOPIC, key, Constant.ALL_DATA[i]));
       logger.info(Constant.ALL_DATA[i]);
     }
   }
......
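Finally, note that KafkaProducer.send() is asynchronous: it only enqueues the record, and failures surface through the returned Future or a callback. A hedged variant of the send call in produce() that logs broker-side failures; the two-argument send(record, callback) overload shown here exists in the 2.8 client.

```
producer.send(
    new ProducerRecord<>(Constant.TOPIC, key, Constant.ALL_DATA[i]),
    (metadata, exception) -> {
      // Runs on the producer's I/O thread once the broker responds.
      if (exception != null) {
        logger.error("failed to send record {}", key, exception);
      }
    });
```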