提交 f15e8e7b 编写于 作者: 武汉红喜's avatar 武汉红喜

spring-kafka

上级 2732a8ff
[**Kafka**](https://github.com/apache/kafka):<br>
# [Kafka](https://github.com/apache/kafka):
linux:<br>
启动zk ./zookeeper-server-start.sh ../config/zookeeper.properties<br>
启动server ./kafka-server-start.sh ../config/server.properties<br>
win:<br>
修改bin/windows/kafka-server-start.bat set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M<br>
server.properties port=9092<br>
......
......@@ -11,40 +11,27 @@
<artifactId>whatsmars-mq-kafka</artifactId>
<properties>
<kafka.version>0.8.0</kafka.version>
<scala.version>2.8.2</scala.version>
<metrics.version>2.2.0</metrics.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Test -->
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.hongxi.whatsmars.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* Created by shenhongxi on 2018/12/12.
*/
@Slf4j
@Component
public class Consumer {
@KafkaListener(topics = "kafkaTest")
public void onMessage(String message) {
log.info("receive message:{}", message);
}
}
package org.hongxi.whatsmars.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
/**
* Created by shenhongxi on 2018/12/12.
*/
@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {
@Autowired
private KafkaTemplate<Object, String> kafkaTemplate;
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
kafkaTemplate.send("kafkaTest", "hello");
}
}
package org.hongxi.whatsmars.kafka;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:2181");
//group 代表一个消费组
props.put("group.id", "jd-group");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("TestTopic", 1);
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
// Map<String, List<KafkaStream<Object, Object>>> consumerMap =
// consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
// KafkaStream<Object, Object> stream = consumerMap.get("TestTopic").get(0);
// ConsumerIterator<Object, Object> it = stream.iterator();
// while (it.hasNext()) {
// System.out.println(it.next().message());
// }
System.out.println("finished");
}
}
\ No newline at end of file
package org.hongxi.whatsmars.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
public class KafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("metadata.broker.list", "127.0.0.1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks","-1");
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
int messageNo = 100;
final int COUNT = 1000;
while (messageNo < COUNT) {
String key = String.valueOf(messageNo);
String data = "hello kafka message " + key;
producer.send(new KeyedMessage<String, String>("TestTopic", key ,data));
System.out.println(data);
messageNo ++;
}
}
}
\ No newline at end of file
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: testGroup
auto-offset-reset: earliest
\ No newline at end of file
package org.hongxi.whatsmars.kafka;
/**
* Created by shenhongxi on 2018/12/12.
*/
public class KafkaTests {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册