Kafka
什么是kafka
Kafka是由Apache软件基金会开发的 一个开源流处理平台,由Scala语言编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,2011年开源
kafka集群结构
broker:节点,集群有多大,broker就有多少,要是整数型且唯一
topic:主题,kafka面向主题存储,分门别类的存储数据
partition:分区,可以应对更多的数据,数据量允许超过单节点kafka的大小
leader:负责日常的读写操作
follower:负责备份leader的数据,如果leader,会从follower中选出新的leader,继续工作
消费者组:可以提高消费者性能,只要消费的组id是同一个,那么就是一个组的,一个组的消费者实例不允许多余分区的个数
kafka的特征和优势
Kafka作为消息队列,他和其他同类产品相比,突出的特点就是性能强大
kafka将消息保存在硬盘中
kafka对硬盘的读取规则进行优化后,效率能够接近内存
硬盘的优化规则主要依靠:顺序读写、零拷贝、日志压缩等技术
kafka的性能是恒定的,和数据的大小无关
kafka处理队列中的数据默认设置:
1 2
| kafka默认队列中的信息保存7天,可以配置这个时间 kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
|
kafka的安装和配置
必须将我们的kafka软件的解压位置设置在一个根目录,文件夹名称尽量短,然后路径不要有中文和空格(新建文件夹kafka , 在kafka 中存放解压后的kafka , 在D:\kafka 文件夹中创建 data)
还需创建一个data文件夹:用于保存kafka启动后,在运行过程中产生的数据
打开config目录下的server.properties文件,修改
打开config目录下的zookeeper.properties文件,修改
启动kafka
首先进入D:\kafka\kafka_2.13-2.4.1\bin\windows目录
打开dos窗口
启动zookeeper
1
| zookeeper-server-start.bat ..\..\config\zookeeper.properties
|
启动kafka
1
| kafka-server-start.bat ..\..\config\server.properties
|
演示kafka
启动的kafka和zookeeper的窗口不要关
我们 csmall-cart-webapi中测试
添加依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency>
|
配置
1 2 3 4 5 6
| spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: csmall
|
在SpringBoot启动类中添加启动Kafka的注解
1 2 3 4 5 6 7 8 9 10 11 12 13
| @EnableKafka
@EnableScheduling @SpringBootApplication @EnableDubbo public class CsmallCartWebapiApplication {
public static void main(String[] args) { SpringApplication.run(CsmallCartWebapiApplication.class, args); }
}
|
下面我们就可以实现周期性的向kafka发送消息并接收的操作了
编写消息的发送
cart-webapi包下创建kafka包
包中创建Producer类来发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.tedu.csmall.cart.webapi.kafka;
import cn.tedu.csmall.commons.pojo.cart.model.CartTbl; import com.google.gson.Gson; import org.apache.commons.lang.math.RandomUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
@Component public class Producer { @Autowired private KafkaTemplate<String,String> kafkaTemplate;
int i=1; @Scheduled(fixedRate = 10000) public void sendMessage(){ CartTbl cart = new CartTbl(); cart.setId(i++); cart.setCommodityCode("PC100"); cart.setPrice(RandomUtils.nextInt(300)+50); cart.setCount(RandomUtils.nextInt(10)+1); cart.setUserId("UU100"); Gson gson = new Gson(); String json = gson.toJson(cart); System.out.println("本次发送的消息为:"+json); kafkaTemplate.send("myCart",json); } }
|
创建一个叫Consumer的类来接收消息创建一个叫Consumer的类来接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component public class Consumer { @KafkaListener(topics = "myCart") public void received(ConsumerRecord<String,String> record){ String json = record.value(); Gson gson = new Gson(); CartTbl cart = gson.fromJson(json, CartTbl.class); System.out.println("接受到消息:"+cart);
} }
|