简介
Apache的Kafka™是一个分布式流平台(a distributed streaming platform)
有以下几个特性:(来自百度百科)
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
- 支持通过Kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
本文记录的是如何将kafka和SpringBoot整合使用,调用生产者Producer和消费者Consumer的API进行消息发送和消费,更加深入的点并未详细了解。
环境准备
代码整合之后,需要安装zookeeper作为注册中心,还有kafka服务。
我选择使用的是Centos 7服务器上进行环境配置的。
Zookeeper启动
首先去官网下载tar压缩包,选择合适的版本,然后进行解压
1 | [root@VM_234_23_centos app]# wget |
复制一份配置文件,文件名修改成zoo.cfg,由于是单机模式,所以使用默认的配置即可,不需要进行修改1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34[root@VM_234_23_centos zookeeper-3.4.10]# cp conf/zoo_sample.cfg conf/zoo.cfg
[root@VM_234_23_centos zookeeper-3.4.10]# cat conf/zoo.cfg
# The number of milliseconds of each tick
# 服务器与客户端之间交互的基本事件单元(ms)
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# 初始化可以连接的客户端数量
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# 客户端与服务器之前请求和应答的时间间隔
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
# 客户端进行连接的端口号
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
启动zkServer服务1
2
3
4
5
6
7
8
9
10
11
12
13[root@VM_234_23_centos zookeeper-3.4.10]# bin/zk
zkCleanup.sh zkCli.cmd zkCli.sh zkEnv.cmd zkEnv.sh zkServer.cmd zkServer.sh
[root@VM_234_23_centos zookeeper-3.4.10]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/app/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@VM_234_23_centos zookeeper-3.4.10]# jps
1122 QuorumPeerMain ---- zk启动好的标志
1139 Jps
[root@VM_234_23_centos zookeeper-3.4.10]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/app/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: standalone --- 单机模式
Kafka安装
同样需要先去官网进行下载
1 | [root@VM_234_23_centos app]# wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz |
接着解压和修改配置文件,同样也是按照默认即可(有个spring进行连接的坑,最后讲)
1 | [root@VM_234_23_centos app]# tar -zxvf kafka_2.12-1.1.0.tgz |
启动kafka1
2
3
4
5
6
7
8
9
10[root@VM_234_23_centos kafka]# bin/kafka-server-start.sh config/server.properties
[2018-07-11 17:10:24,315] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-07-11 17:10:25,645] INFO starting (kafka.server.KafkaServer)
[2018-07-11 17:10:25,647] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2018-07-11 17:10:25,693] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
···
[root@VM_234_23_centos ~]# jps
10002 Jps
1122 QuorumPeerMain
9673 Kafka --- 成功启动的标志
还有一些比较常用的客户端命令,例如查询topic列表,更多可以参考这里
代码进行整合
首先加入依赖,maven与gradle都可以1
2// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.5.RELEASE'
在application.properties进行配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=localhost:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
发送消息&处理消息
发送消息需要使用到kafkaTemplate,还要指定发送的topic名称
1 | public class TestController { |
监听消息,进行处理,使用到@KafkaListener注解
1 |
|
发送消息测试
正常启动之后,发送消息后,能够看到消息处理的结果
举个🌰,示例代码的执行结果如下所示:
Springboot配置简易化了许多,简单调用API就能进行消息的发送和消息处理,当然Kafka还有更强的功能和作用,可以深入去了解一下
遇到的坑
个人环境
Mac
IDEA 2017.03
SpringBoot 2.0.3.RELEASE
JDK 1.8
Centos 7 安装了 zookeeper-3.4.10 kafka_2.12
启动提示java.lang.NoSuchMethodError: org.springframework.util.Assert.state
kafka版本过高,springboot版本低不支持
原本打算引用最新的kafka依赖2.1.7.RELEASE,后来启动时报错,然后发现2.1.7版本的kafka需要依赖高版本的spring(5.0.6版本)
Group / Artifact | Version | Updates |
---|---|---|
org.springframework » spring-messaging | 5.0.6.RELEASE | 5.0.7.RELEASE |
org.springframework » spring-context | 5.0.6.RELEASE | 5.0.7.RELEASE |
org.springframework » spring-tx | 5.0.6.RELEASE | 5.0.7.RELEASE |
········ | ······· | ······· |
但是SpringBoot-2.0.3版本的spring使用的是spring-context是4.3.14,导致无法启动,所以需要调整依赖的版本号(我选择了kafka降级)
1 | // https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka |
启动提示Connection to node 0 could not be established. Broker may not be available.
我是用Centos服务器搭建的kafka服务,然后使用本地环境进行连接kafka,消息订阅和发布失败,看到两篇博文,发现是解析服务器的hostname失败所导致的。
解决方法:修改kafka的server.properties配置文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21[root@VM_234_23_centos kafka]# vim config/server.properties
····
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# 找到该行,进行修改
advertised.listeners=PLAINTEXT://个人服务器的ip:9092
然后重新启动解决问题1
[root@VM_234_23_centos kafka]# bin/kafka-server-start.sh config/server.properties