You need to sign in or sign up before continuing.
提交 a936b901 编写于 作者: 小傅哥's avatar 小傅哥

feat:kafka java简明教程系列

上级
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:
zookeeper:
image: zookeeper:3.9.0
container_name: zookeeper
restart: always
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
ZOOKEEPER_CLIENT_PORT: 2181
ALLOW_ANONYMOUS_LOGIN: yes
TZ: Asia/Shanghai
networks:
- my-network
kafka:
image: bitnami/kafka:3.7.0
container_name: kafka
volumes:
- /etc/localtime:/etc/localtime
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_MESSAGE_MAX_BYTES: "2000000"
KAFKA_ENABLE_KRAFT: no
JMX_PORT: 9999
TZ: Asia/Shanghai
depends_on:
- zookeeper
networks:
- my-network
kafka-eagle:
image: echo21bash/kafka-eagle:3.0.2
container_name: kafka-eagle
environment:
KAFKA_EAGLE_ZK_LIST: zookeeper:2181
volumes:
- ./kafka-eagle/system-config.properties:/opt/kafka-eagle/conf/system-config.properties
ports:
- "8048:8048"
depends_on:
- kafka
networks:
- my-network
networks:
my-network:
driver: bridge
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=zookeeper:2181
# cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
# cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.efak.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16
######################################
# EFAK webui port
######################################
efak.webui.port=8048
######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk
######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=
######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:13306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=123456
######################################
# kafka mysql jdbc driver address
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/opt/kafka-eagle/db/ke.db
efak.username=root
efak.password=root
\ No newline at end of file
<mxfile host="Electron" modified="2024-03-17T03:49:27.991Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/20.2.8 Chrome/102.0.5005.167 Electron/19.0.15 Safari/537.36" etag="6r6gsrXdD-ScC5Yy290K" version="20.2.8" type="device"><diagram id="CsHzwRoAxna5DpsTygfh" name="第 1 页">7T1bc6s4mr+Gqu6tSkoCAeIREnt2q+Zsn+re3el5JDZxPMcxGdvJSfrXr64YJIGxDYLE9unqYIFk4Ltf5Xh3z+9/26QvT9/yebZyXDB/d7x7x3Vh4GHyh4588JEwdPnAYrOc8yGwH/hj+VcmZsrR1+U824oxPrTL89Vu+VIdnOXrdTbbVcbSzSb/Wb3sMV/NKwMv6SKr3AYd+GOWrjLtsn8s57snPhqB0tX/mS0XT/KXA3nmOZUXi4HtUzrPf5aGvInj3W3yfMePnt/vshV9edX3Mq05W9zYJlvv2kxY3P25yfJ/bN/+O1z/EU7+fffy+283HhI3t/uQT5zNyQsQX9f5mvxJNvnrep7RdQD59q/X5xflgnyze8oX+Tpd/T3PX8g4pNdlu92HgGj6usvJ0NPueSXOZu/L3Z9iRXr8T3p8G/ri6/176dz9h/yy3m0+/pQr0C/lafT7fh77Jic+5uuduBXoiu93+SrfsGf2APuwWfOYYg0Znq3S7XY544PT5UreOH9h9C3VAkIMbfPXzSxrevuhwOh0s8h2DRciVCAMobQsf87Iw5GJm2yV7pZv1TtJBcoviuv2WEEOBGIcgyQ+X/gtXb2Kn4oXi02W7jJn4jkYOBhO1rvl7qP4+n+/GfHq7+kD4Q8VPEhXy8Wavm7y9jICjuQt2+yWhABjceJ5OZ+vGBZm2+Vf6QNbj8LqJV+ud+xZ/cTx74+Asngc8kPZu2PgIuJHykxhD9wGOtIBJJYHtxB6gvEJRujxb60hKNb+Th96v3BYWfPGq87PHx+3BK9U+Bf3dwZKhBpKfNdBvucbFNI/n5a77I+XlJHETyIuqoiworiRpLMfCzZNwk1wmO1uk/8oWDA8DGz+oeOEdEvjjz79V6xYOhOwTxN6aDReC3CMvQpkokAs8XMvRKDErqeSAAlAX0SMh+b08pizbLclpy++lKfZ4fTzdPtUYG+HbB+hlmyfU9lgbD8y0fhk6iR3Dr5zJshJJk5070ywk/hOFDgTnzL/yHUmoZPcO/GEHkSxg5EzCegFSUSvodOnbBaRFB67BjmxGT2/rsDAjfzjBtyCyEXnyYj+xQByG5nKnllM9qP9apN7DfKfpVOHeEyJrRTTWvIY0I7HiEfbMxjwWXkKm0oeK/0oXSBIq1ZPcSGsyEMIEK5aK8qECHqNE8gBv4du8dnTWN5b7s4JtrnTVf78kP8Ymks1oVv3XIrTd4NaCwBSVFC3E732xryqBY6GNAxwJpETuw5mAi6a0v/IQTxl4oxIMejE5PmCFXmRycOGHC12DBh8hAKrgjHBv19zeeJmy8AakwsgeHnn08R5udAvcqXtS7o2rjTjuEBX2SwefiHAJc8O5J9fjasWBpxYnLwsvn717jv9zSZj0XgXv5bG+XuUd3c1NOQEt8pYhzc0fN00HBheNXCpqPf1QKoDdwfAc6WzUAAP+a4GPOzrsPPC3oCHOwZe4fUEB2EFVJikm5k4h8BJ9NUj6JCin4S+p9OdH90aoAcDXw53Dz/dZtM1lr1WOvn9r2yT/0/+LV1/VOFWVdLbG9WFRo1wVaW+BWF4QK1m375nmyV5F1RnEoNljXlL9NudvPucPKUc61Or1pGgBGTfwFvl2JlKEAyrWhCEvkL3/AHEtAZtmqx0SzSM4iOxQi6MXaLJwaj4KOjJjQrtZ7rSuaQoOsOK7Mwt1eyVaksHFTQtBFUVTe2GGqCUIwcNxGBQp5NUXxoZ2Gq1fNlmh4VPun3hMcHH5TtFleOVuwKiivDZQ1rKtz28ztPngqhCmkSI6HIlMAmVvgDS7OIx+o2v1HiQGttG/sIaC9wSNer+kIumRiQl8mDU2ByvH5GorAvgdBW/GR9RB21pmiPVYDTtayh0RZBxIUgIBkWQYLQS/4oz9ThTI8ks4Yzu9HvmuYCWnEdttQUB1A50Aw+60o0jdXWgawfY4B6QmUDdQ+GEHI+rrn6IAqO2FFgT0rdEgbrnLy1iPF+WCBHyxkaE0jdfAkTG410XBQYfDQwG3ZFEDvOHf10UGDyghwPtgsHVwLDJXvLtcpdvvjJF6NoBkhx6MFB4VrWDvUZwVCZWJWq01+S/ml4f4pZaBR7UFgxPqA+54szgODOoLRjqDqax+yiVcqKv605oj0Lnxh1OShMNQqDqkaJQpi6yrc/wYKigcA+JomGzj+xr4XjUP4qXs8DTzWwgpPfPteBPQvoQ46NQXr0ee4EFhDdl9Ymc131+p0+LPiJW94EntL5jgh0ijKKYpc4Ch9w5rQS5c/A9qxaJS6mz5LYeahM87WZZNyF8rW1wcpI15yUNSdYhBKgCcfGL5+ZYB1XGeaOs0F+SdWjIMtQxZUDj8JF9FOOQjKcuAK4553B6CD/a246+LgUNTi3PEHfuz3aMRmsHfF5lTFr//StjZ8Ee6w7NEqdHtNQv0l0LI6BW+JDCzDVRKwDBJJ72Rq0G36dVasWGJLorxPY2gwtHBzHdTSrKaZO7MQIKAJwCYAYUIYjeBKHJn20XUKaMuOMqvaKaQq89wGvLyw7VQZFXvetPQ6byeitwRQO9kKgGb/rZRdZHBEBk+rBEF1fXm5ChJMPtK18PG8oJO0QXhaNf8eZUvPEDVSKEerRGCgk7mKM33ekOc3Sr/Io5nWGOp4sou5gTGDDHgvFm2RArxX8gcJ2Sc5M2zzjUwsFYa+Z0aNu1zdvDg2YN4Xq33vlcJqBdXQgroVxm4uDkymVOziaQzKLgMbq9YpfHmOqEOSznyzcj2lAY3giwUbxZZY8c1qBtJ4SDTQkeiiLxm1J/guV6uVuyStH6Jc/u0gBvydlWKhlvoBRxakBOHIoDKo4DJ/KZ2zygnZGICkBNgoC2V6ILYkZDRf+lNr/FZ02qK/PWTORrzGYFdDXML47kj96Jhk7qgmQdQtNT+AZLzZ18OithrZzI/5PyTXJvf8QOyNlA/ArvoYGndH06kjixD9/WsiWEa/5pTC8rremzzlFT+ZigYkGRWRF7cDJCX7JcuY7lNPbBIMMMr6ujl4nqrkT1OmNVYHhMG6fQhmAxOzV1cGi6WIdUsXJA24tRiFOUEw1YCOeJkwruqSIFJB/sBjnAY1NUSm/oQkYiuoCISmH2mxPa4Ywf0DvlCybVFmhYxrLIc4TtcFPBxH3PGI+/2Rba+f4Vt+ECos9KI8zKxHzPGrTxX79jVJ2wU/wgrPAg8jqTPV9od/OdkqId51jkaD6wjH2c2qYVoaZL0B4UgRd58460Ai3H0DN0oUDA4B+TfT66Vwz0QNG1UshKNCmS7bEPWRy8u8NQFkfU3OzhWl02TpzxBsUZU1OII7UZ94CmVcpiMbV7I5KrIrAeDGKpuxZlR3ZNalkmn3kzbA7aJCDyu7JVoackqyBDCYIpZBP1hT123F+XVRUn48eHOceg/q3IEK7jyjrmZgVkqrnmqbJJvTXqo98RPbpKmpeJHmV2nx16tFtTcCH02La9U1STbWiJHg1RLWoXImbUXyQ9Yt1kC009H3sjR0O4iHpKYuatC6TroPAhcCBJ50cSV7wBxN6nilPoxAHzA7IR6nEpeUoKr0KdB4dfHN+PBupq58noMSw1MOqheFA23apHEWiXZYejZdkXaLS1LWeIBu0zFJmSvE/lK0lxTREW4XylYCeFm5Uzj5h5KyFjQhfPTvxqwYhJA7TMTwxtarkDm0oPFiyIInmA5cFd6aAkT2jsbFqVQlOGAKXYE8E6GgnQkKSd8U/jY77R+KdS7E6EtoXfW0dE3d5gjnQee6P6DaoEzMgd0wdD0sMf0GBhgoSPnc5ClDzwRKzDT9GoBSq51keK648z8jG5ILp0jPtKGqAJ4YsdBq0gPJR5qyOUoJ9XEsLWVo+ooBpKFtIEJd2DqeLDBTXSDEN8Gw7c2RYCuw0xTqRJC9W3XZKkLFVsQZKDeiKKGy2R5DbbvC1nX7lflh4yDkNgokSrnWkgaG4ZMZLyws9GiUFrShzUR1/cqOKkT0KW++NTTdeiRmufKJHvFiQ4Kqq06/W5EKps672BYFD3TXGjFaosHPMsNY/Y4hdFlRhGI6BKk+8kpP6tiHneiU0UN7X4/vxwgRpcZOPbwYACDTXymm+pqwTc/3B4lqian1qKmok9dosYC/ebgf1mu5OIEnDE88Ax/Y8m64bM84Rp9AbDikuNzI1l/i11zbGcbeFcvac4x11Z3LUrfFpFynpxGzzTmyWKx6D6ExHNeE/Kfi/9DgOqE9AsW5ZDntSXj7ToKDP2kpKTetAcQUVe1UUWSdooF1ZDU6ks6o2GTF0LTOnEwr1beFxDSgAiw5rjYtkXzEdKWdgNmc9qmEFP9C5oAzP91JjeXHUF85vnrY54QnzCQt1082wo/Nc8uZo+YFApgVCpjr+EiXBt87x1nYzJLyaNzmFOtDGQRSBIZNjz4grhK+c3di9EPXarZKw8V32EhhCdtKpBKaO/+bHv2JvVOkUVkRx98/AKqBKRB09Wo6eKEFNA352wKaaMDxWnuEs9kYwt+ASe9MdH8ob79qRrHRiwYYNRNyx2NKwkmfemm8L6svpSbEbHBA0BrpLiIAqotYe+oQWHsfbQ6w36ptJ41VD94ptcmo3mwk42Gs0dmrF1u8jb2Rgz8qrRBMIPlB0r226MSVaqboxZXXfYfTEhPL/LqN5YszMPTTOyndqeuUMkhbB1psy5DZPZ1GMbf6poLPZBbdjEteHyftp+QmhoEMBTIxKueUOhVfMCCSpzJ1THoxqazzwSRbJCfVmFVN6+//aNaYlFKWNRsGnKCaq9B2b+Rr5IvKDivqGiAzgh3bLe3MwU0apQeheIqYRcyyVECb7/xm40ohkZCTdTmG6+V14b7s9oVbgsA5ZnswTCHOZlqIqmO5J2qbWaxcl9UAW706lQ6U96qvDov69pwXAMemltJfcZhUr1ONZcqFQtq7Vo3yg2TIDCAJvLZCHyiFJZ34+uA8UWR1XFdvBCJejaDYNeSJKQ2zoO6g4bcXHbNG76wklCkeKT9PzC+TBYjpDbHAMdUaP/Tkmm7Z53sM4QtEUyTW3CP0EVwkBAGzbfw9UjmC/ZZrvc7jJhSFuLXCo+1XQepsbs5AcU+AB1xOZkCwTJ5pBrYHN245fe+Xv2dOppGIrxeW3b9kE0rK7g6a6hkYBjDKywPRj9QbdXK260xArnaW6XB1rN3oBFmHdM3E83218uCwg4Gh4IzVuMDCyC3H522eqS6aHPIrtQcwuoK6S7g/TA4k161MYCaZZUfWKOsWWvVXsYh+dS80nBKwjDqgyhDaRKCKNP8JCaSHDkhAA3bHRnmA6gcbq18C1qri+zh/0dbXPapim7kaWG9gy41mTjgk5ivjrakX8VtEO4XW7C0QSokgeKmveN1Ca4whHTbwQZGXK1aGrfdN/ZNy4CVvteuCx/8U42tL13It3TZ3dTSEYCtcr36VFPVNNaUawPboGLqhh1I76eu/+jMqPHuCj6xO4li9xzIEZ4bh3uieyrhk/Wsy9lgisaO/bMvkzFiSxBJZlqSRll9sVa6dPkjpAlW0++Kvuq6Sa8Z18IV/WwT5DHgXS33CZ7ybfLXS7MopHtxzfHAISeY4hcxETNRh2lAmt+IyTbOgzmNUL1GTelRJn6LS6uCeCHoU5QS+0zHyKDv9C4s15vWeBIjyLGdklziOAhMS+qWaBm/7nVPTGhr1c/2ssrGxEojF50y6AwFNG1qYzR90iR2xQ1pBKKStZZ8X73OYvebMY4kHmXU6WAT9+cqNwcVKkTM26+0rre7otsONY3y4eyzLOh6AfKvT7s8HtZdzbC5MhP0oNUN6GdDg3OoHXjJ3RuzpjZQESyM0Kd461vf6/s2quaikpBBNuEKXLHYhhG/diFQTOHuQG3AQbdGoZyabO3zILZGBhcncem/8OG7TTV6mWxCdif7++T9W65o6/3v8iX34WpSgeGzfk36WY+/WfSzQL20dkg/zhKdXQHUg75igfKYM3CyDOVNvenwAVtiluvcs8k9+b5K+FTySqf/bAr+0K3regLzm1DamZ5XqS29WsbdTLEdSPV2rcvSJtTga51MKc1y4VtsZTj82BZI6GpmZKKD1+4DkZtlht4ensru1UwYbPtNer4XX0aT5O86JLw2osHPGyXanmjJcLL3qzXZIwp4qH3yw08oBGj3ZBHaNjTzBTySALWL4u1q6I783L7AQunGIbX2EcL+Lu+Cv9Ad/HabX8TXvdP64NJt95KIBy25DFsYyB+Ye0oUvYXDUKD1W5XOxpZm55utCMrRGcvyflMEOsJIQNoRlUKewCZlwUmDQhkGLDegF2Qm9xCtSA3Q5cMu/pPy5SPq/7TgSkKQDVnOAT6XoSW1R8972NCSfE7MeaW2ycCEhdUtw9gx/ttvIazZfov61NFoyFq6xuitv0Rq6GB6heWjaWmd261M6T8YqE6DNsLxJ6JG6bNuS81XoprfK4N8VLx+s4tWDGtaSFYik8IlopMIxoo3SwefiEvl/buk39+ZZABx0RUVR1BRFRr46aH854Ej69t33Z2IPh/t9nmW7bdpotswlXQxns9lOl0Df3WefWsh36xwbIfbit0AwDnfobnNHtysUnny6wCqiic8+bHGnCx++Ax4HYAN1fJ8XHlHkUluAWeVaC1ado2zo5hPXnrxBW9eQ1kImCLcEpNbcq5EhNXXcKa1Gwba9ejC9pSfYfasam/r5YePKTQeHx0zTsGz4OHwA/M3OjxcR71xXNMaUKWeY7JA3JZIYHemYzXmsn0k82qMpmTE3p0JmM9n0dmnqhMprrjymUzGaWyzqSQ2mUyorr2ymT6jH+Atlymmx0KDnKZAClRstO5jLZU71xGb7cy8r0wDnkF2+9IYzeRsHWovCesdZGGbNi1jGytdqBqtXlDUWnX2dZl+qaKxq3U4v1WcpVT5eK7alHgsJ7XRhqolcGnu2Sj5s4sN+AWYKUEtJsKlqLM3oIbNjI7nRScGFQ3m81dbNTNwuiBwd6om6V+V0FQVTer2Zzbsnp2pN/pqp4dL+jkdmAtBJ0dT1OIgwLzztfQTKv1Ljft7ik/Aiy1opG13ejMBcM2BI3aeK6+dPJiVZQUSwyWvBiNuu3ycRRrl+oELVnpNXkWjIsbLVHdLJ09ZcOqdTb2L1b32nChnr5mNXvRlTkcJVj8ns2XW7uwGLbd/AigcHHKshWG2DZo4oJz9eUzwW8qobpkNUQmYw2lhriguajpqoYcoiUrnV7PhLHuaprl68fl4gL1EFkzNJwENPW11XrW8pZqMfMEJlMHN4VYvp6aMjyQetgeakX99Uk6+7Fg05T3ONJeiyj0FOeVL7Nyyg1IXR040t3fA3AMvozDTesHhZUVXue7eHyw0ouGWnTovgBYybjpWCDlGeTSw+uCGDazH7czmjnvIF7eBWgglEdKeUNnQnWxXt41ygK+TiCH1LQuKb7KSrxnqs07XosnXzc5LUMozv2NEMDTt3xOY7+T/wc=</diagram></mxfile>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>xfg-dev-tech-app</module>
<module>xfg-dev-tech-trigger</module>
<module>xfg-dev-tech-domain</module>
<module>xfg-dev-tech-infrastructure</module>
<module>xfg-dev-tech-types</module>
</modules>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.28</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>xfg-dev-tech</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.5</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<profileActive>dev</profileActive>
</properties>
</profile>
<profile>
<id>test</id>
<properties>
<profileActive>test</profileActive>
</properties>
</profile>
<profile>
<id>prod</id>
<properties>
<profileActive>prod</profileActive>
</properties>
</profile>
</profiles>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>xfg-dev-tech-app</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- -->
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-trigger</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-domain</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-infrastructure</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<finalName>xfg-dev-tech-app</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/**</include>
</includes>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/**</include>
</includes>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.6</version>
<configuration>
<skipTests>true</skipTests>
<testFailureIgnore>false</testFailureIgnore>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>cn.bugstack.xfg.dev.tech.Application</mainClass>
<layout>JAR</layout>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package cn.bugstack.xfg.dev.tech;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@Slf4j
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
package cn.bugstack.xfg.dev.tech.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
//@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory(Map<String, Object> producerConfigs) {
return new DefaultKafkaProducerFactory<>(producerConfigs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
server:
port: 8091
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
# 配置主题
kafka:
topic:
group: xfg-group
user: xfg-topic
logging:
level:
root: info
config: classpath:logback-spring.xml
\ No newline at end of file
server:
port: 8091
tomcat:
max-connections: 20
threads:
max: 20
min-spare: 10
accept-count: 10
logging:
level:
root: info
config: classpath:logback-spring.xml
\ No newline at end of file
server:
port: 8091
tomcat:
max-connections: 20
threads:
max: 20
min-spare: 10
accept-count: 10
logging:
level:
root: info
config: classpath:logback-spring.xml
\ No newline at end of file
spring:
config:
name: xfg-dev-tech
profiles:
active: dev
<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<configuration scan="true" scanPeriod="10 seconds">
<contextName>logback</contextName>
<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
<springProperty scope="context" name="log.path" source="logging.path"/>
<!-- 日志格式 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- 输出到控制台 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!-- 此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c{0}%X{ServiceId} -%X{trace-id} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--输出到文件-->
<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>./data/log/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c{0}%X{ServiceId} -%X{trace-id} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>./data/log/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
</appender>
<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>./data/log/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c{0}%X{ServiceId} -%X{trace-id} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>./data/log/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- 日志文件保留天数【根据服务器预留,可自行调整】 -->
<maxHistory>7</maxHistory>
<totalSizeCap>5GB</totalSizeCap>
</rollingPolicy>
<!-- WARN 级别及以上 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>
<!-- 异步输出 -->
<appender name="ASYNC_FILE_INFO" class="ch.qos.logback.classic.AsyncAppender">
<!-- 队列剩余容量小于discardingThreshold,则会丢弃TRACT、DEBUG、INFO级别的日志;默认值-1,为queueSize的20%;0不丢失日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>8192</queueSize>
<!-- neverBlock:true 会丢失日志,但业务性能不受影响 -->
<neverBlock>true</neverBlock>
<!--是否提取调用者数据-->
<includeCallerData>false</includeCallerData>
<appender-ref ref="INFO_FILE"/>
</appender>
<appender name="ASYNC_FILE_ERROR" class="ch.qos.logback.classic.AsyncAppender">
<!-- 队列剩余容量小于discardingThreshold,则会丢弃TRACT、DEBUG、INFO级别的日志;默认值-1,为queueSize的20%;0不丢失日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>1024</queueSize>
<!-- neverBlock:true 会丢失日志,但业务性能不受影响 -->
<neverBlock>true</neverBlock>
<!--是否提取调用者数据-->
<includeCallerData>false</includeCallerData>
<appender-ref ref="ERROR_FILE"/>
</appender>
<!-- 开发环境:控制台打印 -->
<springProfile name="dev">
<logger name="com.nmys.view" level="debug"/>
</springProfile>
<root level="info">
<appender-ref ref="CONSOLE"/>
<!-- 异步日志-INFO -->
<appender-ref ref="ASYNC_FILE_INFO"/>
<!-- 异步日志-ERROR -->
<appender-ref ref="ASYNC_FILE_ERROR"/>
</root>
</configuration>
\ No newline at end of file
package cn.bugstack.xfg.dev.tech.test;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiTest {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
private String topicName = "test-001";
@Test
public void test_send() throws InterruptedException {
for (int i = 0; i < 100; i++) {
kafkaTemplate.send(topicName, "hello world");
Thread.sleep(1500);
}
new CountDownLatch(1).await();
}
}
package cn.bugstack.xfg.dev.tech.test.domain;
import cn.bugstack.xfg.dev.tech.domain.model.entity.UserEntity;
import cn.bugstack.xfg.dev.tech.domain.model.valobj.UserTypeVO;
import cn.bugstack.xfg.dev.tech.domain.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {
@Resource
private IUserService userService;
@Test
public void test_register() throws InterruptedException {
while (true) {
UserEntity userEntity = new UserEntity();
userEntity.setUserId("10001");
userEntity.setUserName("小傅哥");
userEntity.setUserTypeVO(UserTypeVO.T8);
userService.register(userEntity);
Thread.sleep(1500);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>xfg-dev-tech-domain</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-types</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<finalName>xfg-dev-tech-domain</finalName>
</build>
</project>
\ No newline at end of file
package cn.bugstack.xfg.dev.tech.domain.event;
import cn.bugstack.xfg.dev.tech.types.BaseEvent;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Value;
import java.util.Date;
/**
* 用户推送实践消息的聚合
*/
public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {
@Value("${kafka.topic.user}")
private String topic;
@Override
public EventMessage<UserMessage> buildEventMessage(UserMessage data) {
return EventMessage.<UserMessage>builder()
.id(RandomStringUtils.randomNumeric(11))
.timestamp(new Date())
.data(data)
.build();
}
@Override
public String topic() {
return topic;
}
/**
* 要推送的事件消息,聚合到当前类下。
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class UserMessage {
private String userId;
private String userName;
private String userType;
}
}
package cn.bugstack.xfg.dev.tech.domain.model.entity;
import cn.bugstack.xfg.dev.tech.domain.model.valobj.UserTypeVO;
import lombok.Data;
@Data
public class UserEntity {
private String userId;
private String userName;
private UserTypeVO userTypeVO;
}
package cn.bugstack.xfg.dev.tech.domain.model.valobj;
public enum UserTypeVO {
T1("T-1", "初级工程师"),
T2("T-2", "初级工程师"),
T3("T-3", "中级工程师"),
T4("T-4", "中级工程师"),
T5("T-5", "高级工程师"),
T6("T-6", "高级工程师"),
T7("T-7", "架构师"),
T8("T-8", "架构师");
private final String code;
private final String desc;
UserTypeVO(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String getCode() {
return code;
}
public String getDesc() {
return desc;
}
}
package cn.bugstack.xfg.dev.tech.domain.repository;
import cn.bugstack.xfg.dev.tech.domain.model.entity.UserEntity;
public interface IUserRepository {
void doSaveUser(UserEntity userEntity);
}
package cn.bugstack.xfg.dev.tech.domain.service;
import cn.bugstack.xfg.dev.tech.domain.model.entity.UserEntity;
public interface IUserService {
void register(UserEntity userEntity);
}
package cn.bugstack.xfg.dev.tech.domain.service;
import cn.bugstack.xfg.dev.tech.domain.event.UserMessageEvent;
import cn.bugstack.xfg.dev.tech.domain.model.entity.UserEntity;
import cn.bugstack.xfg.dev.tech.domain.repository.IUserRepository;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class UserService implements IUserService {
@Resource
private IUserRepository userRepository;
@Override
public void register(UserEntity userEntity) {
// 省略业务逻辑
userRepository.doSaveUser(userEntity);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>xfg-dev-tech-infrastructure</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-domain</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-types</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<finalName>xfg-dev-tech-infrastructure</finalName>
</build>
</project>
\ No newline at end of file
package cn.bugstack.xfg.dev.tech.infrastructure.event;
import cn.bugstack.xfg.dev.tech.types.BaseEvent;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class EventPublisher {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
try {
String messageJson = JSON.toJSONString(eventMessage);
kafkaTemplate.send(topic, messageJson);
log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
throw e;
}
}
}
package cn.bugstack.xfg.dev.tech.infrastructure.repository;
import cn.bugstack.xfg.dev.tech.domain.event.UserMessageEvent;
import cn.bugstack.xfg.dev.tech.domain.model.entity.UserEntity;
import cn.bugstack.xfg.dev.tech.domain.repository.IUserRepository;
import cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {
@Resource
private EventPublisher publisher;
@Override
public void doSaveUser(UserEntity userEntity) {
// 推送消息
publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder()
.userId(userEntity.getUserId())
.userName(userEntity.getUserName())
.userType(userEntity.getUserTypeVO().getDesc())
.build()));
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>xfg-dev-tech-trigger</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<finalName>xfg-dev-tech-trigger</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<compilerVersion>${java.version}</compilerVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package cn.bugstack.xfg.dev.tech.trigger.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Slf4j
@Component
public class KafkaMessageListener {
@KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
try {
// 逻辑处理
// 确认消息消费完成,如果抛异常消息会进入重试
ack.acknowledge();
log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);
} catch (Exception e) {
e.printStackTrace();
log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>xfg-dev-tech-types</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
</dependencies>
<build>
<finalName>xfg-dev-tech-types</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package cn.bugstack.xfg.dev.tech.types;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Data
public abstract class BaseEvent<T> {
public abstract EventMessage<T> buildEventMessage(T data);
public abstract String topic();
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class EventMessage<T> {
private String id;
private Date timestamp;
private T data;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册