Loading... # 消息中间件之Apache Kafka(四) ## 数据同步机制 Kafka的Topic被分为多个分区,分区是按照Segments存储的文件块。分区日志是存储在磁盘上的日志序列,Kafka可以保证分区里的事件是有序的。其中Leader负责对应分区的读写,Follower负责同步分区的数据,0.11版本之前,Kafka使用HighWatermarker机制保证数据的同步,但是基于HighWatermarker的同步数据,可能会导致数据不一致或者乱序。Kafka数据同步有以下概念: - LEO: Log End Offset,标识的是每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO。 - HW: High Watermarker,高水位线,所有HW之前的数据(被水位线覆盖的)都可以理解为是已经备份的,当所有节点都备份成功,Leader会更新水位线。 - ISR: in-sync-replicas,Kafka的Leader会维护一份处于同步的副本集合,如果在`replica.lag.time.max.ms`时间内,系统没有发送fatch请求,或者发送了请求,但是在该限定时间内,没有赶上Leader的数据(无法追上Leader的水位线),就会被剔除出ISR列表。在Kafka-0.9.0版本剔除`replica.lag.max.messages`消息个数限定,因为这个会导致其它的Broker节点频繁的加入和退出ISR。 ### 高水位数据丢失 ![高水位 数据丢失.jpg][1] 只有被水位线覆盖的信息是有效的,如果Follower去Leader中同步数据,数据已经同步到本地,但是还没等到Leader的确认(更新水位线),就宕机了。服务重新启动之后,会根据自己本地记录的水位线,将水位线之外的数据截断丢弃,然后再次尝试去Leader中同步数据,这时,如果Leader宕机了,该Follower变成了Leader,没同步的数据就丢失了。 ### 高水位数据不一致 ![高水位 数据不一致.jpg][2] 在Follower同步之前,Follower和Leader同时宕机。Follower比Leader先处理好故障启动了,该Follower就会晋升成Leader,这时,如果它收到一个写入请求,就会将数据保存在本地日志,并更新水位线,然后原来的Leader启动了,它降级成了Follower,需要去Leader中同步数据,但是它发现Leader中保存的水位线与自己的相同,就不同步了,但是数据是不一致的。 ### Leader Epoch 可以看出0.11版本之前Kafka的副本备份机制的设计存在问题。依赖HW的概念实现数据同步,存在数据不一致和丢失数据的问题,因此Kafka-0.11版本引入了Leader Epoch的概念,任意一个Leader持有一个LeaderEpoch。该LeaderEpoch是一个由Controller管理的32位数,存储在Zookeeper的分区状态信息中,并作为LeaderAndISRRequest的一部分传递给每一个新的Leader。Leader接受Producer请求数据时,使用LeaderEpoch标记每个Message。然后,该LeaderEpoch编号将通过复制协议传播,并用于替换HW标记,作为截断的参考点。 ![leaderepoch.png][3] 改进消息格式,以便每个消息集都带有一个4字节的Leader Epoch号。在每个日志记录中,会创建一个新的Leader Epoch Sequence文件,在其中存储Leader Epoch的序列和在该Epoch中生成的消息的Start Offset。它也缓存在每个副本中,也缓存在内存中。 #### Follower变成Leader 当Follower成为Leader时,它首先将新的Leader Epoch和副本的LEO添加到Leader Epoch Sequence序列文件的末尾,并刷新数据。该Leader产生的每个新消息集都带有新的“Leader Epoch”标记。 #### Leader变成Follower 如果需要从本地的Leader Epoch Sequence加载数据,将数据加载到内存,给相应的分区的Leader发送epoch请求,该请求包含最新的EpochID,Start Offset信息,Leader接收到信息以后,返回该EpochID所对应的Last Offset信息。该信息可能是最新EpochID的Start Offset或者是当前EpochID的Log End Offset信息。 情形1: Follower的Offset比Leader小 ![01.png][4] 如图,左图的Follower的Offset小于Leader的,同时Leader Epoch序列号也小于Leader,当Followr上线时,加载自己本地的Leader Epoch Sequence信息,发现是[0,0],它就会给Leader发送[0,0],Leader发现自己的Leader Epoch序列号已经到2了,但是它会先返回比Follower序列号大1的,所以它返回[1,100],Follower收到请求后,就会先同步0-99的数据。 右图Follower虽然Offset小于Leader,但是它们的Leader Epoch序列号相同,Follower请求[0,0],Leader看到自己也是0,就会返回到LEO的数据,(Leader Epoch存的是Start Offset,也就是起始位置),就是当前消息的位置+1,比如当前LEO是100,它就会返回[0,100],Follower收到之后就会同步0-99的数据。 情形2: Follower的Leader Epoch的信息Start Offset比Leader返回的LastOffset要大,Follower会去重置自己的Leader Epoch文件,将Offset修改为Leader的LastOffset信息,并且截断自己的日志信息. ![02.png][5] 如图,左图,Follower发送[1,100],Leader会返回比它Leader Epoch序列号大1的版本,也就是[2,90],Follower发现自己的本地的Start Offset比Leader返回的LastOffset还要大,所以,它会截断自己的日志,改为[1,90]。 右图,Follower发送[1,100],由于Leader的Leader Epoch序列号也是1,所以会返回到LEO的数据[1,90].Follower发现自己本地的Offset大于Leader的LastOffset,截断本地日志,改为[1,90] ![高水位 数据丢失.jpg][1] 再看该图,如果用Leader Epoch,最开始BrokerA和BrokerB都是[0,0],BorkerA宕机之后加载本地Leader Epoch依然是[0,0],在它去BokerB同步数据之前,BrokerB宕机了,BrokerA成为Leader,BrokerB去请求同步数据,发送[0,0],BrokerA会返回[0,2],BrokerB此时的LEO就是2而且Leader Epoch的版本没有变化,不需要同步,并不会根据HW截断,数据不会丢失。 ![高水位 数据不一致.jpg][2] 该图中,起始都是[0,0],同时宕机,BrokerA先启动,晋升成Leader,收到一条新数据,添加[1,1],BrokerB启动,给BorkerA发送[0,0],BrokerA返回[1,1],BrokerB发现自己的LEO是2,但是Leader Epoch的版本已经变了,新版本的Start Offset是1,于是将自己在1的位置截断,从BrokerA将1的数据同步过来,数据此时就一致了。 ## Kafka-Flume集成 Flume 是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也提供数据写到各种数据接受方(可定制)的能力,用于转发数据。Flume的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具。 官网: > http://flume.apache.org/ 下载Flume并解压,在conf文件夹中,新建kafka.properties文件,写入: ``` a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro # 流类型 a1.sources.r1.bind = kafka01 # 主机名 a1.sources.r1.port = 4141 # 端口 a1.channels.c1.type = memory # 内存通道 a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = topic01 # 写入的Topic a1.sinks.k1.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092 # Kafka集群地址 a1.sinks.k1.kafka.flumeBatchSize = 20 # 写入缓冲区大小 a1.sinks.k1.kafka.producer.acks = -1 # 应答级别 a1.sinks.k1.kafka.producer.linger.ms = 1 # 缓冲区等待时间 a1.sinks.k1.kafka.producer.compression.type = snappy # 压缩格式 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ``` 进入bin目录,执行 ``` ./flume-ng agent -c ../conf/ -n a1 -f ../conf/kafka.properties -Dflume.root.logger=INFO,console ``` 注意a1为kafka.properties中配置得到名称。 发送日志文件 ``` ./flume-ng avro-client --host Kafka01 --port 4141 --filename ~/install.log ``` 将家目录中的install.log文件发送到Kafka的topic01中,在消费者中就可以拿到数据了。 ## Kafka-Eagle安装 这是一个监视系统,监视Kafka集群以及提供可视化界面。 ### Github > https://github.com/smartloli/kafka-eagle ### 下载地址 > http://download.kafka-eagle.org/ ### 安装 #### 解压 ``` tar -zxf kafka-eagle-bin-2.0.0.tar.gz cd kafka-eagle-bin-2.0.0 tar -zxf kafka-eagle-web-2.0.0-bin.tar.gz ``` #### 配置环境变量 vi /etc/profile ``` export KE_HOME=/usr/local/kafka-eagle export PATH=$PATH:$KE_HOME/bin ``` #### 修改配置 vi conf/system-config.properties ``` kafka.eagle.zk.cluster.alias=cluster1 # 集群名,多个用,分隔 cluster1.zk.list=kafka01:2181,kafka02:2181,kafka03:2181 # zookeeper集群地址 kafka.eagle.webui.port=8048 # webUI的端口 cluster1.kafka.eagle.offset.storage=kafka # cluster2.kafka.eagle.offset.storage=zk kafka.eagle.metrics.charts=true # 是否开启JMX 需要配置JMX端口 kafka.eagle.topic.token=keadmin # 删除Topic时需要输入 #### 切换db为mysql Kafka-eagle会自动创建数据库和表 # kafka.eagle.driver=org.sqlite.JDBC # kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db # kafka.eagle.username=root # kafka.eagle.password=www.kafka-eagle.org kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=123456 ``` ### 启动 ``` ke.sh start ``` 如果开启了JMX需要修改Kafka启动文件 vi kafka-server-start.sh ``` if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" export JMX_PORT="7788" fi ``` 重启Kafka [1]: https://www.princelei.club/usr/uploads/2020/07/4138498744.jpg [2]: https://www.princelei.club/usr/uploads/2020/07/2727696609.jpg [3]: https://www.princelei.club/usr/uploads/2020/07/3417630766.png [4]: https://www.princelei.club/usr/uploads/2020/07/1306666048.png [5]: https://www.princelei.club/usr/uploads/2020/07/3634865093.png Last modification:July 15th, 2020 at 07:48 pm © 允许规范转载
哈哈哈,写的太好了https://www.lawjida.com/