消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

简介

Kafka 是一个分布式的流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用

主要功能体现于三点:

  • 存储系统:kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效的降低了消息丢失的风险。这得益于其消息持久化和多副本机制。也可以将 kafka 作为长期的存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题日志压缩功能。
  • 消息系统:kafka 与传统的消息中间件都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯性消费的功能。
  • 流式处理平台:kafka 为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架,比如窗口、连接、变换和聚合等各类操作。

    优势

  • 支持多语言,Java优先
  • 单机吞吐量:十万级
  • 消息延迟:毫秒级
  • 可用性非常高(分布式)
  • 消息理论上不会丢失
  • 消息理论上不会丢失
  • 支持事务

    概念

  • Producer:生产者,负责将消息发送到 Broker
  • Consumer:消费者,从 Broker 接收消息
  • Consumer Group:消费者组,由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:可以看做一个独立的 Kafka 服务节点或 Kafka 服务实例。如果一台服务器上只部署了一个 Kafka 实例,那么我们也可以将 Broker 看做一台 Kafka 服务器。
  • Topic:一个逻辑上的概念,包含很多 Partition,同一个 Topic 下的 Partiton 的消息内容是不相同的。
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
  • Replica:副本,同一分区的不同副本保存的是相同的消息,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • Leader:每个分区的多个副本中的”主副本”,生产者以及消费者只与 Leader 交互。
  • Follower:每个分区的多个副本中的”从副本”,负责实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,从 Follower 副本中重新选举新的 Leader 副本对外提供服务。

    原理

    体系构架

    一个典型的 kafka 体系架构包括若干 Producer、若干 Consumer、以及一个 Zookeeper 集群(在 2.8.0 版本中移除了 Zookeeper,通过 KRaft 进行自己的集群管理)
    Producer 将消息发送到 Broker,Broker 负责将受到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

    负载均衡

    Kafka 的负责均衡主要是通过分区来实现的
  • broker 端分配不均

当创建 topic 的时候可能会出现某些 broker 分配到的分区数多,而有些 broker 分配的分区少,这就导致了 leader 多副本不均

  • 生产者写入消息不均

生产者可能只对某些 broker 中的 leader 副本进行大量的写入操作,而对其他的 leader 副本不闻不问。

  • 消费者消费不均

消费者可能只对某些 broker 中的 leader 副本进行大量的拉取操作,而对其他的 leader 副本不闻不问

  • leader 副本切换不均

当主从副本切换或者分区副本进行了重分配后,可能会导致各个 broker 中的 leader 副本分配不均匀

消费方式

  • 点对点
  • 发布订阅
  • 顺序读写
  • Page Cache
  • 零拷贝
  • 分区分段 & 索引
  • 批量读写
  • 批量压缩

    部署

    安装

    需要先安装好 JDK,并且安装启动好 [[Zookeeper 入门]](不要用自带版本),点击下载 Kafka

    1
    2
    3
    4
    5
    cd /app/software
    #解压并编译安装
    tar -zxvf kafka_2.12-2.4.0.tgz -C /app/serivce
    mv /app/serivce/kafka_2.12-2.4.0 /app/serivce/kafka
    mkdir -pv /app/data/kafka

    配置

    1
    2
    3
    4
    5
    6
    7
    vi /app/serivce/kafka/config/server.properties

    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    log.dirs=/app/data/kafka
    num.partitions=1
    zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181

    集群

    通过 scp 命令复制 /app/serivce/kafka 到其他需要部署 ZooKeeper 的服务器上

    1
    scp -r /app/serivce/kafka root@xxx.xxx.xxx.xxx:/app/midware

    然后修改每个节点配置文件 server.properties 中的 broker.idlisteners 参数

    1
    2
    3
    4
    5
    6
    7
    #全局唯一标识,每个节点上需要设置不同的值
    broker.id=x
    #设置为当前节点的 IP 地址
    listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=1

    然后启动全部节点

    命令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    #启动 Kafka
    cd /app/serivce/kafka && bin/kafka-server-start.sh -daemon config/server.properties &
    #关闭 Kafka
    cd /app/serivce/kafka && bin/kafka-server-stop.sh
    #创建 topic
    ./bin/kafka-topics.sh --create --topic Demo --zookeeper xxx.xxx.xxx.xxx:2181 --replication-factor 1 --partitions 1
    #查看所有 topic 的分区
    ./bin/kafka-topics.sh --bootstrap-server xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092 --list
    #查看所有 topic 的分区情况
    ./bin/kafka-topics.sh --bootstrap-server xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092 --describe
    #查看 Demo 的分区情况
    ./bin/kafka-topics.sh --bootstrap-server xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092 --topic Demo --describe
    #查看 topic
    ./bin/kafka-topics.sh --list --zookeeper xxx.xxx.xxx.xxx:2181
    #生产者发送消息
    ./bin/kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxx:9092 --topic Demo
    #消费者接受消息
    ./bin/kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic Demo --from-beginning
    #删除 topic(delete.topic.enable=true)
    ./bin/kafka-topics.sh --delete --zookeeper xxx.xxx.xxx.xxx:2181 --topic Demo

    报错

    Invalid receive

    Invalid receive (size = -720899)
    Invalid receive (size = 369296128 larger than 104857600)
    socket.request.max.bytes = 419430400

  • https://www.cnblogs.com/Andrew-Zhou/p/15366574.html

  • https://www.kafkamagic.com/download/
  • https://akhq.io/
  • https://kouncil.io/
  • https://www.kafkatool.com/
  • http://www.redisant.cn/ka/