Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
caopu16
whatsmars
提交
a8dd07e4
W
whatsmars
项目概览
caopu16
/
whatsmars
与 Fork 源项目一致
Fork自
武汉红喜 / whatsmars
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
W
whatsmars
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a8dd07e4
编写于
11月 28, 2019
作者:
武汉红喜
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
whatsmars-mq-kafka-spring
上级
8b30d5be
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
124 addition
and
2 deletion
+124
-2
whatsmars-mq/pom.xml
whatsmars-mq/pom.xml
+1
-0
whatsmars-mq/whatsmars-mq-kafka-server/src/main/resources/server.properties
...mars-mq-kafka-server/src/main/resources/server.properties
+2
-2
whatsmars-mq/whatsmars-mq-kafka-spring/pom.xml
whatsmars-mq/whatsmars-mq-kafka-spring/pom.xml
+22
-0
whatsmars-mq/whatsmars-mq-kafka-spring/src/main/java/org/hongxi/whatsmars/kafka/spring/Main.java
...src/main/java/org/hongxi/whatsmars/kafka/spring/Main.java
+19
-0
whatsmars-mq/whatsmars-mq-kafka-spring/src/main/java/org/hongxi/whatsmars/kafka/spring/config/KafkaConfiguration.java
...gxi/whatsmars/kafka/spring/config/KafkaConfiguration.java
+67
-0
whatsmars-mq/whatsmars-mq-kafka-spring/src/main/java/org/hongxi/whatsmars/kafka/spring/consumer/Consumer.java
.../org/hongxi/whatsmars/kafka/spring/consumer/Consumer.java
+13
-0
未找到文件。
whatsmars-mq/pom.xml
浏览文件 @
a8dd07e4
...
...
@@ -27,6 +27,7 @@
<module>
whatsmars-mq-rocketmq-spring
</module>
<module>
whatsmars-otter-mq
</module>
<module>
whatsmars-mq-kafka-server
</module>
<module>
whatsmars-mq-kafka-spring
</module>
</modules>
<dependencyManagement>
...
...
whatsmars-mq/whatsmars-mq-kafka-server/src/main/resources/server.properties
浏览文件 @
a8dd07e4
...
...
@@ -28,12 +28,12 @@ broker.id=0
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://
:9092
listeners
=
PLAINTEXT://127.0.0.1
:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name
:9092
advertised.listeners
=
PLAINTEXT://127.0.0.1
:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
...
...
whatsmars-mq/whatsmars-mq-kafka-spring/pom.xml
0 → 100644
浏览文件 @
a8dd07e4
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
whatsmars-mq
</artifactId>
<groupId>
org.hongxi
</groupId>
<version>
Rocket.S7
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
whatsmars-mq-kafka-spring
</artifactId>
<dependencies>
<dependency>
<groupId>
org.springframework.kafka
</groupId>
<artifactId>
spring-kafka
</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
whatsmars-mq/whatsmars-mq-kafka-spring/src/main/java/org/hongxi/whatsmars/kafka/spring/Main.java
0 → 100644
浏览文件 @
a8dd07e4
package
org.hongxi.whatsmars.kafka.spring
;
import
org.hongxi.whatsmars.kafka.spring.config.KafkaConfiguration
;
import
org.springframework.context.annotation.AnnotationConfigApplicationContext
;
import
org.springframework.kafka.core.KafkaTemplate
;
public
class
Main
{
public
static
void
main
(
String
[]
args
)
{
AnnotationConfigApplicationContext
context
=
new
AnnotationConfigApplicationContext
();
context
.
register
(
KafkaConfiguration
.
class
);
context
.
scan
(
"org.hongxi.whatsmars.kafka.spring.consumer"
);
context
.
refresh
();
KafkaTemplate
<
String
,
String
>
kafkaTemplate
=
context
.
getBean
(
KafkaTemplate
.
class
);
kafkaTemplate
.
send
(
"kafkaTest"
,
"hello"
);
System
.
out
.
println
(
"send message: hello"
);
}
}
whatsmars-mq/whatsmars-mq-kafka-spring/src/main/java/org/hongxi/whatsmars/kafka/spring/config/KafkaConfiguration.java
0 → 100644
浏览文件 @
a8dd07e4
package
org.hongxi.whatsmars.kafka.spring.config
;
import
org.apache.kafka.clients.CommonClientConfigs
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.annotation.EnableKafka
;
import
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
;
import
org.springframework.kafka.config.KafkaListenerContainerFactory
;
import
org.springframework.kafka.core.*
;
import
org.springframework.kafka.listener.ConcurrentMessageListenerContainer
;
import
java.util.HashMap
;
import
java.util.Map
;
@Configuration
@EnableKafka
public
class
KafkaConfiguration
{
@Bean
public
KafkaTemplate
<?,
?>
kafkaTemplate
(
ProducerFactory
<?,
?>
kafkaProducerFactory
)
{
KafkaTemplate
<?,
?>
kafkaTemplate
=
new
KafkaTemplate
<>(
kafkaProducerFactory
);
return
kafkaTemplate
;
}
@Bean
public
ProducerFactory
<?,
?>
kafkaProducerFactory
()
{
Map
<
String
,
Object
>
props
=
new
HashMap
<>();
props
.
put
(
CommonClientConfigs
.
BOOTSTRAP_SERVERS_CONFIG
,
"127.0.0.1:9092"
);
props
.
put
(
ProducerConfig
.
CLIENT_ID_CONFIG
,
"DemoProducer"
);
props
.
put
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
props
.
put
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
DefaultKafkaProducerFactory
<?,
?>
factory
=
new
DefaultKafkaProducerFactory
<>(
props
);
return
factory
;
}
@Bean
public
KafkaListenerContainerFactory
<
ConcurrentMessageListenerContainer
<
Integer
,
String
>>
kafkaListenerContainerFactory
()
{
ConcurrentKafkaListenerContainerFactory
<
Integer
,
String
>
factory
=
new
ConcurrentKafkaListenerContainerFactory
<>();
factory
.
setConsumerFactory
(
consumerFactory
());
factory
.
setConcurrency
(
3
);
factory
.
getContainerProperties
().
setPollTimeout
(
3000
);
return
factory
;
}
@Bean
public
ConsumerFactory
<
Integer
,
String
>
consumerFactory
()
{
return
new
DefaultKafkaConsumerFactory
<>(
consumerConfigs
());
}
@Bean
public
Map
<
String
,
Object
>
consumerConfigs
()
{
Map
<
String
,
Object
>
props
=
new
HashMap
<>();
props
.
put
(
CommonClientConfigs
.
BOOTSTRAP_SERVERS_CONFIG
,
"127.0.0.1:9092"
);
props
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
"whatsmars-mq-kafka-spring"
);
props
.
put
(
ConsumerConfig
.
ENABLE_AUTO_COMMIT_CONFIG
,
"true"
);
props
.
put
(
ConsumerConfig
.
AUTO_COMMIT_INTERVAL_MS_CONFIG
,
"5000"
);
props
.
put
(
ConsumerConfig
.
FETCH_MIN_BYTES_CONFIG
,
512
*
1024
);
props
.
put
(
ConsumerConfig
.
AUTO_OFFSET_RESET_CONFIG
,
"latest"
);
props
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
.
getName
());
props
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
.
getName
());
return
props
;
}
}
whatsmars-mq/whatsmars-mq-kafka-spring/src/main/java/org/hongxi/whatsmars/kafka/spring/consumer/Consumer.java
0 → 100644
浏览文件 @
a8dd07e4
package
org.hongxi.whatsmars.kafka.spring.consumer
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.stereotype.Component
;
@Component
public
class
Consumer
{
@KafkaListener
(
topics
=
"kafkaTest"
)
public
void
onMessage
(
String
message
)
{
System
.
out
.
println
(
"receive message: "
+
message
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录