Skip to content

  • 体验新版
    • 正在加载...
  • 登录
  • KnowledgePlanet
  • docdoc
  • Issue
  • #34

doc
doc
  • 项目概览

KnowledgePlanet / doc

通知 1303
Star 822
Fork 117
  • 代码
    • 文件
    • 提交
    • 分支
    • Tags
    • 贡献者
    • 分支图
    • Diff
  • Issue 42
    • 列表
    • 看板
    • 标记
    • 里程碑
  • 合并请求 0
  • DevOps
    • 流水线
    • 流水线任务
    • 计划
  • Wiki 2
    • Wiki
  • 分析
    • 仓库
    • DevOps
  • 项目成员
  • Pages
doc
doc
  • 项目概览
    • 项目概览
    • 详情
    • 发布
  • 仓库
    • 仓库
    • 文件
    • 提交
    • 分支
    • 标签
    • 贡献者
    • 分支图
    • 比较
  • Issue 42
    • Issue 42
    • 列表
    • 看板
    • 标记
    • 里程碑
  • 合并请求 0
    • 合并请求 0
  • Pages
  • DevOps
    • DevOps
    • 流水线
    • 流水线任务
    • 计划
  • 分析
    • 分析
    • 仓库分析
    • DevOps
  • Wiki 2
    • Wiki
  • 成员
    • 成员
  • 收起侧边栏
  • 动态
  • 分支图
  • 创建新Issue
  • 流水线任务
  • 提交
  • Issue看板
已关闭
开放中
Opened 2月 29, 2024 by 小傅哥@Yao__Shun__Yu⛹Owner

kafka生产消费回调案例

Apache Kafka是一个分布式流处理平台,它允许你发布和订阅数据流。以下是使用Java编写的Kafka生产者和消费者的简单示例,包括生产者的回调。

首先,确保你已经添加了Kafka客户端依赖到你的项目中。如果你使用的是Maven,可以在pom.xml文件中添加以下依赖:

<dependencies>
    <!-- Kafka client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.1.0</version> <!-- 请使用最新的稳定版本 -->
    </dependency>
</dependencies>

以下是生产者的代码示例:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 设置Kafka生产者的属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                }
            }
        });

        // 关闭生产者
        producer.close();
    }
}

以下是消费者的代码示例:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 设置Kafka消费者的属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        // 创建Kafka消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 轮询消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

请注意,你需要根据你的Kafka集群配置来修改bootstrap.servers、group.id和主题名称(在这个例子中是test-topic)。此外,确保你的Kafka集群正在运行,并且你有权限访问它。

在实际部署中,你可能需要处理更多的异常情况,并且可能需要根据你的业务逻辑来调整生产者和消费者的配置。这些代码仅作为一个简单的入门示例。

指派人
分配到
无
里程碑
无
分配里程碑
工时统计
无
截止日期
无
标识: KnowledgePlanet/doc#34
渝ICP备2023009037号

京公网安备11010502055752号

网络110报警服务 Powered by GitLab CE v13.7
开源知识
Git 入门 Pro Git 电子书 在线学 Git
Markdown 基础入门 IT 技术知识开源图谱
帮助
使用手册 反馈建议 博客
《GitCode 隐私声明》 《GitCode 服务条款》 关于GitCode
Powered by GitLab CE v13.7