基础
什么是消息队列
消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列。
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。
一般包含以下三个角色:
Producer:消息生产者,负责产生和发送消息到 Broker;
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
为什么要用MQ?
从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。从以前的单体架构到现在的微服务架构,成百上千的服务,之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥 10 亿日活的微信我们需要有一个「东西」来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。消息队列就应运而生了。
它常用来实现:异步处理、服务解耦、流量控制
消息队列的典型应用场景:
订单系统:在电商系统中,订单的创建、支付、发货等步骤可以通过消息队列进行异步处理和解耦
日志处理:使用消息队列将日志从应用系统传输到日志处理系统,实现实时分析和监控。
任务调度:在批量任务处理、任务调度系统中,通过消息队列将任务分发给多个工作节点,进行并行处理
数据同步:在数据同步系统中,消息队列可以用于将变更的数据异步同步到不同的存储系统或服务。
消息队列有什么优点和缺点?
核心优点
解耦:生产者和消费者无需同时在线,生产者可以发送消息后立即返回,而消费者在合适的时机处理消息.
削峰填谷:在高并发场景下,消息队列可以暂存大量请求,平滑高峰流量,避免系统过载。
异步处理:可以将不需要立即处理的任务放入消息队列中异步执行,减少用户请求的响应时间。
缺点
系统可用性降低:系统引入的外部依赖越多,越容易挂掉。
系统复杂度提高了
一致性问题:消息传递给多个系统,部分执行成功,部分执行失败,容易导致数据不一致
说一下消息队列的模型有哪些?
常见的消息队列模型主要有以下两种
队列模型(也称点对点模型)
在队列模型中,消息从生产者发送到队列,并且每条消息只能被一个消费者消费一次。消费之后,消息在队列中被删除。
适用于任务处理类场景,如一个任务只需要一个处理者执行。
发布/订阅模型(Publish/Subscribe)
在发布/订阅模型中,生产者将消息发布到某个主题(Topic),所有订阅了该主题的消费者都会接收到该消息。
每个订阅者都会接收到相同的消息,适用于广播通知、实时推送等场景。
消息队列如何保证消息不丢失?
这需要生产消息、存储消息和消费消息三个阶段共同努力才能保证消息不丢失。
生产者的消息确认:生产者在发送消息时,需要通过消息确认机制来确保消息成功到达。
存储消息:broker 收到消息后,需要将消息持久化到磁盘上,避免消息因内存丢失。即使消息队列服务器重启或宕机,也可以从磁盘中恢复消息。
消费者的消息确认:消费者在处理完消息后,再向消息队列发送确认(ACK),如果消费者未发送确认,消息队列需要重新投递该消息。
除此之外,如果消费者持续消费失败,消息队列可以自动进行重试或将消息发送到死信队列(DLQ)或通过日志等其他手段记录异常的消息,避免因一时的异常导致消息丢失。
consumer消费到重复消息怎么办?
消息消费模式
消息消费一般存在三种模式:最多一次,最少一次和有且仅有一次。
最多一次:这种可靠性最低,不管消费是否成功,投递一次就算完了。这种类型一般用在可靠性不高的场景中,比如我们一个对日志分析展示的场景,如果这种日志分析出现一定的缺失对业务也影响不大,那我们可以使用这种方式,这种方式性能最高(QMQ的非可靠消息)。
最少一次:基本上所有追求可靠性的消息队列都会采用这种模式。因为网络是不可靠的,要在不可靠的网络基础上构建可靠的业务,就必须使用重试来实现,那么重试就有可能引入重复的消息。
有且仅有一次:这是人们最期望的方式。也就是我如果真正的处理失败了(业务失败)你才给我重发,如果仅仅是因为网络等原因导致的超时不能给我重发消息。但是这种仅仅靠消息队列自身是很难保证的。不过借助一些其他手段,是能达到有且仅有一次的『效果』。
通过上面的描述,我们知道有且仅有一次的消息投递模式是很难达到的,那如果我们需要消息的可靠性,就必须接受重复消息这个事实。那么对于重复消息到底该怎么办呢?下面会列出一些场景和解决方案:
处理方式
不处理:不是说所有的重复消息都可以不处理的,但是是有场景是可以的。比如我们有一个缓存,数据库更新之后发送一条消息去将缓存删掉,等下一次数据访问的时候缓存没有命中,从数据库重新加载新数据并更新缓存。这里的消息就是删除缓存的消息,那么删除缓存这个消息就是可以接受重复消息的,这种重复消息对业务几乎没有影响(影响也是有,可能会稍微降低缓存命中率)。我们权衡一下处理重复消息的成本和这个对业务的影响,那么不处理就是个最佳方案了。可能有同学说,降低缓存命中率也不行啊,还是得解决。那么我们看这个重复消息会降低多少命中率呢?那就得看重复消息多不多呢?重复消息一般是网络不稳定导致的,这在内网里这种情况其实并不常见,所以我觉得是可以接受的。
业务处理:有很多业务逻辑本来就是幂等的。
即使我们不使用消息,也要尽量将我们的接口设计为幂等的。比如我们有一个创建新订单的消息,接到消息后会向数据库保存新订单。那么如果我们接到了重复订单(订单号相同),这样的订单肯定是不能保存的,但是这里切记一点,虽然最终我们不会保存两个一样的订单,但是收到重复订单的时候你就回复成功就可以了,不要抛出异常,因为抛出异常一般会认为是消息消费失败,又会重发。这在早期我们很多同学犯这个错误,就直接将DuplicateKeyException异常抛出了(其实对于接口幂等设计时也是一样,第二次重复调用的时候你返回成功的响应就行了,如果要告诉人家是重复的也在另外的字段告诉,而不是标识成功或失败响应的地方标识,这样会让请求方的处理代码更舒服些)。
去重表:如果不能接受重复消息,但是业务逻辑自身又没办法处理重复消息该怎么办呢?那就得借助额外的手段了。也就是引入一个去重表,我们在从消息里提取一个或多个字段作为我们去重表的唯一索引,在消息处理成功的时候我们在去重表记录,如果又接到重复的消息先查去重表,如果已经成功消费过这个消息则直接返回成功就行了。而且我们还可以根据我们对去重这个事情要求的可靠等级选择将去重表建在不同的位置:数据库或redis等。放在redis里那么去重的可靠性就是redis的可靠性,一般达到99.9%应该是没有问题的,而且放在redis里我们可以设置一个过期时间,因为重复消息这个东西一般会在一个短期时间区间内发生,比如很少几个小时后甚至是几分钟后还出现重复消息。那么如果我们对可靠性要求更高则可以将去重表放在数据库里,但使用数据库成本也更高,而且数据库一般没有自动过期机制,所以可能还需要一个自动的『垃圾回收』处理机制,将多久之前的去重表里的数据删除掉。
具体实现有两种方案:
基于唯一标识去重:给每条消息分配全局唯一ID(如UUID、雪花 ID),消费者接收消息时,先检查该ID是否已处理(可存在数据库或 Redis中)。若已处理则直接返回,未处理则执行逻辑并记录ID。
利用业务天然幂等性:依赖业务操作本身的特性,比如“转账时用订单号作为唯一索引”,重复执行会触发数据库唯一约束而失败,不影响最终结果,或者为业务数据设计严格的状态流转(订单状态:待支付→已支付→已发货),每次处理消息前检查当前状态是否符合操作条件
消息队列如何保证消息的有序性(顺序性)?
消息投递的顺序是消费者关心的第二个问题。很遗憾,实现顺序消费的成本也是非常高的,所以大多数消息队列没有提供顺序消费模式。Kafka因为它独特的存储模型,所以提供了顺序消费这种方式,但是也是有其他限制的。那么如果我要求顺序消费该怎么办呢?
保证消息有序性的常见方法如下
单一生产者和单一消费者
使用单个生产者发送消息到单个队列,并由单个消费者处理消息。这样可以确保消息按照生产者的发送顺序消费。
这种方法简单但容易成为性能瓶颈,无法充分利用并发的优势。
分区与顺序键(Partition Key)
在支持分区(partition)的消息队列中(如Kafka、RocketMQ),可以通过partifion Key将消息发送到特定的分区,每个分区内部是有序的,这样可以保证相同 PartionKey的消息按顺序消费。
例如,在订单处理系统中,可以使用订单号作为 Partition Key,将同一个订单的所有消息路由到同一个分区,确保该订单的消息顺序。
顺序队列(Ordered Queue)
一些消息队列系统(如 RabbitMQ)支持顺序队列,消息在队列中的存储顺序与投递顺序一致。如果使用单个顺序队列,消息将按顺序被消费。
可以使用多个顺序队列来提高并发处理能力,并使用特定规则将消息分配到不同的顺序队列中。
处理方式
不处理:这个场景可以直接借用上面重复消息里的场景,删除缓存的消息先发的后到是没有多大关系的。
业务处理:和上面一样,绝大多数业务逻辑是本身就是能处理顺序的。比如我们的交易系统里有很多很多状态机,状态机有严格的状态扭转流程。比如我们的支付状态机,我们从待支付->支付完成->退款中->退款完成。那假设现在我们是待支付状态,然后用户支付后又立即申请退款,那么有可能退款中的消息比支付完成的消息先到(这种几率也是非常非常低的),这是不可以的,不满足状态机扭转条件,所以我们可以抛出异常告诉消息队列消费失败即可,等到后面支付完成的消息到达后将状态扭转为支付完成,然后等到退款中的消息到之后才将状态扭转为退款中。
额外字段:如果系统没有状态机这种东西,靠业务逻辑不好处理顺序该怎么办呢?那我们可以借助额外的字段来处理了。
不要求严格有序:一般在数据库设计中,我们都建议每个数据库表都有这样两个字段:created_time和updated_time,也就是这条数据的创建时间戳和更新时间戳。那么如果我们不要求消息严格有序的时候就可以借助updated_time字段来控制顺序了。比如我们接到一条消息,然后需要更新数据库,然后我们发现消息中携带的时间戳比我们数据库中记录的时间戳还小呢,那这条消息我不用消费了,直接返回成功就行了。使用这种方式有两个限制:1. 不要求严格有序,只要有序就可以了,中间可能少几条的更新对业务没有影响。 2. 服务器之间的时间不能出现较大的偏差(这个通过时间同步服务一般都能保障)。
严格有序:严格有序就是中间不能出现缺口。那么这种就不能依靠时间戳了,可以添加一个整型的version字段,消息里也携带了一个version字段,每次更新数据的时候我们采用这种更新方式(乐观锁):
update tbl set ……, version=version+1 where @message.version=@message.version。如果这条语句没有更新成功,则返回行数就不为1,这个时候我们抛出异常让消息队列重试,等到合适的消息到来的时候就会更新成功了。使用version字段这种方式可能就要producer和consumer进行协调了(其实就是有些耦合了),因为消息里也要携带version字段。但是设计这个version字段的时候也要一些考虑,更建议的更新方式是update tbl set ……, version=@message.newversion where @message.oldversion=@message.oldversion。这样如果发送消息的version因为某些原因(比如有些更新并不发送消息)没有严格递增两边也可以兼容。还有一点是,要控制consumer端数据的写入点。比如我们的处理消息消费的地方更新数据外还有另外一个地方更新数据,这个地方将version给更新了,那么就会导致producer和consumer版本不一致,最后导致怎么也同步不起来了。而producer方也要控制,比如如果我们人肉直接去表里修了数据可能就没有消息发出了。
并发更新
既然说到version,这里还提一下用version来处理并发更新吧。很多场景里会将数据库当做key/value存储使用,比如一个订单,我们的表设计可能是订单号,订单内容(一个结构化json),created_time, updated_time, version。我们不再使用列的方式来存储订单,至于这种方式的优点和缺点就不在本文讨论之列了。那么如果来了一条订单更新消息,我们需要对这个结构化json进行更新该怎么处理呢?会先读取这个json,然后将消息里的内容与json进行merge操作,然后将merge结果写回数据库。那如果我们在merge的过程中订单已经被更新了怎么办,这就涉及并发更新控制的问题,如果不加以控制则可能导致更新被覆盖。那我们一般采取的方式是:
这种方式就可以避免并发更新冲突覆盖的问题了,但是冲突之后怎么办呢?一般会采取重试的办法。我们会发现这种并发冲突的概率并不会很大,而且出现之后只需要重试一下基本上就可以处理了:
我们发现这种处理方式在我们的场景中一遍又一遍的出现,那我们也将这种方式直接内置到组件之中了(NeedRetryException):
异步处理
有的时候我们接到消息后并不是在接收消息的线程里处理消息,比如我们接到消息后会发起一个异步的服务调用,那么我们并不能立即知道这个异步服务调用的返回结果。它的返回结果是在另外的线程里,但是消息队列需要知道消息的消费结果。
一般消息队列的consumer client会自动的返回消费结果,但是因为异步的方式切换了线程,所以需要应用显式的告诉结果,这就需要使用显式ack的机制了:
但是我在与一些开发沟通过程中也发现了显式ack误用的情况:使用显式ack一定一定一定要调用ack。很多时候我们程序因为各种原因,比如中途抛出异常,队列满了消息其实根本没有处理等等导致ack没有被调用。
消息队列如何处理消息堆积
消息堆积是指在消息队列中,消息的生产速度远大于消费速度,导致大量消息积压在队列中。 我们需要先定位消费慢的原因,如果是 bug 则处理 bug,同时可以临时扩容增加消费速率,减少线上的资损。
如果是因为本身消费能力较弱,则可以优化下消费逻辑
常见有以下几种方式提升消费者的消费能力:
增加消费者线程数量:提高并发消费能力。
增加消费实例:在分布式系统中,可以水平扩展多个消费实例,从而提高消费速率
优化消费者逻辑:检查消费者的代码,减少单个消息的处理时间。例如,减少 IO 操作、使用批量处理等。
注意上述的第二点:增加消费实例,一定要注意注意Topic 对应的分区/队列数需要大于等于消费实例数,不然新增加的消费者是没东西消费的。因为一个Topic中,一个分区/队列只会分配给一个消费实例
除此之外还可以进行限流和降级处理
对消息生产端进行限流,降低生产速率,避免消息积压进一步恶化。
对非关键消息进行丢弃或延迟处理,只保留高优先级的消息,提高系统的响应速度,
常见的消息队列实现
RabbitMQ:
RabbitMQ 是基于 AMQP 协议的消息队列,适合复杂路由和多种消息模式的场景。
具有强大的消息路由能力,包括Direct、Fanout、Topic 等多种交换机类型。
支持消息的持久化、确认机制和死信队列,确保消息的可靠传输。
RocketMQ:
RocketMQ 是阿里巴巴开源的一款消息队列,具有高吞吐、低延迟的特点,适用于企业级应用场景
支持事务消息、延时消息和顺序消息,适用于金融支付、交易系统等对数据一致性要求较高的场景。
Kafka :
Kafka 是一种高吞吐量、分布式的消息队列,适用于实时数据流和大数据分析的场景。
Kafka 的数据是以分区(Partition)为单位存储,支持水平扩展。
Kafka 的消费者可以以消费组的方式消费消息,实现消息的多次消费和负载均衡。
对比:
各种对比之后,有如下建议:
ActiveMQ,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐;
RabbitMQ,虽然erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是毕竟是开源的,比较稳定的支持,活跃度也高,推荐中小型公司使用;推荐
RocketMQ,阿里出品,Java语言编写,经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景;强烈推荐
Kafka,如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
RabbitMQ
说一下 AMQP 协议?
AMQP(Advanced Message Queuing Protocol)是一种应用层协议,用于在消息队列系统中定义消息的格式、传输方式和处理机制。它是一个面向消息的、异步传输的协议,具有高可靠性、可拓展性、跨平台的特性,适合在分布式系统中传输重要数据,被广泛用于构建可靠的消息队列和消息传输系统,是 RabbitMq、ActiveMQ 等消息中间件的核心协议
五大组成部分:
连接(Connection):客户端与消息代理(如RabbitMQ)建立的网络连接
信道(Channel):在连接内创建的逻辑通信链路,支持多信道复用一个连接
交换机(Exchange):负责将消息路由到绑定的队列,支持多种路由策略。
队列(Queue):存储实际传输的消息,消费者从队列拉取消息进行处理。
绑定(Binding):定义交换机与队列之间的关系,通过路由键实现消息路由。
RabbitMQ 和 AMQP 是什么关系?
RabbitMQ 和 AMQP 有着非常密切的关系,但是他们是属于完全不同的两个概念。
AMQP: AMQP 不是一个具体的消息中间件产品,而是一个协议规范。他是一个开放的消息产地协议,是一种应用层的标准协议,为面向消息的中间件设计。AMQP 提供了一种统一的消息服务,使得不同程序之间可以通过消息队列进行通信。 SpringBoot 框架默认就提供了对 AMQP 协议的支持。
RabbitMQ:RabbitMQ则是一个开源的消息中间件,是一个具体的软件产品。RabbitMQ 使用 AMQP 协议来实现消息传递的标准,但其实他也支持其他消息传递协议,如 STOMP 和 MQTT。RabbitMQ 基于 AMQP 协议定义的消息格式和交互流程,实现了消息在生产者、交换机、队列之间的传递和处理。
总之,AMQP 本质上是一个开放的标准,他不光可以被 RabbitMQ 实现,也可以被其他产品实现。
通过这种标准的协议,实际上是可以在不同的消息中间件系统之间进行灵活的消息传递。只不过,目前具体实现这种标准的产品目前并不多,RabbitMQ 则是最有影响力的一个产品。因此,RabbitMQ 成了 AMQP 协议事实上的代表。SpringBoot 框架默认提供的 AMQP 协议支持底层也是基于 RabbitMQ 产品实现的。
RabbitMQ 的核心组件有哪些?
RabbitMQ的核心组件包括以下几部分,他们共同构成了 RabbitMQ 的基本架构:

Broker:RabbitMQ服务器,负责接收和分发消息的应用。
Virtual Host:虚拟主机,是RabbitMQ中的逻辑容器,用于隔离不同环境或不同应用程序的信息流。每个虚拟主机都有自己的队列、交换机等设置,可以理解为一个独立的RabbitMQ服务。
Connection 连接:管理和维护与RabbitMQ服务器的TCP连接,生产者、消费者通过这个连接和 Broker 建立物理网络连接。
Channel通道:是在Connection 内创建的轻量级通信通道,用于进行消息的传输和交互。应用程序通过Channel进行消息的发送和接收。通常一个 Connection 可以建立多个 Channel。
Exchange交换机:交换机是消息的中转站,负责接收来自生产者的消息,并将其路由到一个或多个队列中。RabbitMQ 提供了多种不同类型的交换机,每种类型的交换机都有不同的消息路由规则。
直连交换机(Direct Exchange): 将消息路由到与消息中的路由键(Routing Key)完全匹配的队列。
主题交换机(Topic Exchange): 根据通配符匹配路由键,将消息路由到一个或多个队列。
扇出交换机(Fanout Exchange): 将消息广播到所有与交换机绑定的队列,忽略路由键。
头部交换机(Headers Exchange): 根据消息头中的属性进行匹配,将消息路由到与消息头匹配的队列。
Queue队列:队列是消息的存储位置。每个队列都有一个唯一的名称。消息从交换机路由到队列,然后等待消费者来获取和处理。
Binding绑定关系: Binding 是 Exchange 和 Queue 之间的关联规则,定义了消息如何从交换机路由到特定的队列。
此外,生产者和消费者也是RabbitMQ的核心组件,生产者负责发送消息到Exchange或者 Queue,消费者负责从Queue中订阅和处理消息。
这些核心组件共同构建了 RabbitMQ 的消息传递系统,他们协同工作才能实现消息的可靠传递、路由和业务处理等功能。
什么是 RabbitMQ 中的虚拟主机(vhost)?有什么作用?
RabbitMQ 中的虚拟主机(vhost)是一个逻辑上的隔离概念,用于隔离不同的应用或租户。每个虚拟主机可以拥有自己独立的队列、交换器、绑定、权限等资源。这样,多个相互独立的应用可以共存在一台 RabbitMQ 服务器上而不会互相影响。你可以将虚拟主机看作是RabbitMQ内部的“命名空间”。
资源隔离:不同的 vhost 可以拥有自己的交换器(exchange)、队列(queue)和绑定(binding),这些资源在不同的 vhost 中互不干扰。
安全控制:通过对 vhost 的不同用户角色进行权限管理,细化了对资源访问的控制。
管理便捷:使多租户应用管理更加便捷,可以在同一个 RabbitMQ 实例上运行多个独立的应用。
RabbitMQ 中有哪几种交换机类型?如何实现消息的路由
详情可以看这篇文章:RabbitMQ入门
RabbitMQ 支持多种交换机(Exchange)类型,每种类型都用于不同的消息路由和分发策略:
Direct Exchange: 这种交换机根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。只有当消息的路由键与队列绑定时指定的路由键完全相同时,消息才会被路由到队列。这是一种简单的路由策略,适用于点对点通信。
Topic Exchange: 这种交换机根据消息的路由键与队列绑定时指定的路由键模式(通配符)匹配程度,将消息路由到一个或多个队列。路由键可以使用通配符符号 *(匹配一个单词)和 #(匹配零个或多个单词),允许更灵活的消息路由。用于发布/订阅模式和复杂的消息路由需求。
Headers Exchange: 这种交换机根据消息的标头信息(Headers)来决定消息的路由,而不是使用路由键。队列和交换机之间的绑定规则是根据标头键值对来定义的,只有当消息的标头与绑定规则完全匹配时,消息才会被路由到队列。适用于需要复杂消息匹配的场景。
Fanout Exchange: 这种交换机将消息广播到与之绑定的所有队列,无论消息的路由键是什么。用于发布/订阅模式,其中一个消息被广播给所有订阅者。
Default Exchange: 这是 RabbitMQ 默认实现的一种交换机,它不需要手动创建。当消息发布到默认交换机时,路由键会被解释为队列的名称,消息会被路由到与路由键名称相同的队列。默认交换机通常用于点对点通信,但不支持复杂的路由策略。
如何在 RabbitMQ 中声明一个队列?有哪些必要参数?
在 RabbitMQ 中声明一个队列通常使用 queueDeclare 方法,它有几个必要参数
队列名称:可以指定名称,也可以留空,RabbitMQ将生成一个唯一的队列名称。
是否持久化(durable):设置为 true 表示在服务器重启后队列依然存在。
是否排他(exclusive):设置为 true 表示该队列只能被声明它的连接使用,并在连接断开时自动删除
是否自动删除(autoDelete):设置为 true 表示当最后一个消费者断开连接后,队列自动删除。
额外参数(arguments):用于扩展其它高级功能(比如TTL、DLX)
RabbitMQ 的 routing key 和 binding key 的最大长度是多少字节?
Routing key(路由键)和 Binding Key(绑定键)的最大长度限制为255字节,这个限制是 RabbitMQ 系统内置的,对所有路由键和绑定键均适用,如果超过该长度,RabbitMQ 将会抛出异常,拒绝处理该消息或绑定。
Routing Key:在消息发送到交换机时,由生产者指定,决定消息将被路由到哪些队列。
Binding Key:在交换机与队列建立绑定关系时指定,用于匹配 Routing Key,实现消息的选择性路由。
使用注意
Routing Key 和 Binding Key 的长度包括所有字符,使用前需确保不超过 255 字节。
如果使用了带有占位符的 Topic 交换机时,需要注意路由键和绑定键的匹配规则,确保长度在限制范围内。
RabbitMQ 中无法路由的消息会去到哪里?
在 RabbitMQ 中,无法被路由的消息通常有以下几种处理方式
丢弃消息:默认情况下,若消息无法找到符合条件的队列(即没有匹配的绑定关系),RabbitMQ 会直接丢弃消息,不会进行特殊处理
备份交换机(Altemnate Exchiange):可以为交换机配置一个备份交换机,无法被路由的消息将被发送到备份交换机,再由备份交换机根据其绑定关系决定如问处理消息。例如,可以将这些消息发送到指定队列进行保存或处理。
消息回退(Return listener):在使用mandatory参数的情况下,如果消息无法路由,则会触发返回机制,将消息退回到生产者,这样生产者可以自行处理未路由的消息.。
RabbitMQ 支持哪些消息模式?
RabbitMQ 支持多种消息传递模式,这些模式允许应用程序在不同的场景下进行灵活的消息交流。以下是几种最常见的消息分发机制:
SimpleQueue简单模式:生产者向一个队列发送消息,单个消费者从该队列中消费消息,是最基本的点对点模式。
workQueue 工作序列机制: Producer 将消息发送到 queue,多个 Consumer 同时消费Queue 上的消息。消息会均匀的分配给多个 Consumer 处理。
Publish/Subscribe 订阅发布机制: Producer 只负责将消息发送到exchange交换机上。Exchange 将消息转发到所有订阅的 Queue,并由对应的 Consumer 去进行消费
Routing 基于内容路由机制:在订阅发布机制的基础上,增加一个routingKey,并根据routingKey判断 Exchange 将消息转发到哪些 Queue 上。
Topic 基于话题路由机制:在基于内容路由的基础上,对routingKey增加了模糊匹配的功能。
另外,RabbitMQ 还支持双向同步的 RPC 机制,不过一般用得比较少。这些消息模式允许开发者根据应用程序的需求选择合适的消息通信方式,来满足不同的业务场景和可靠性要求。不同的模式可以用于构建各种类型的分布式系统和应用程序。
RabbitMQ 如何实现消息的持久化?
RabbitMQ 允许消息的持久化,以确保即使在 RabbitMQ 服务器重新启动后,消息也不会丢失。RabbitMQ 可以通过以下方式实现消息的持久化:
消息持久化:在 RabbitMQ 中,只需要在发送消息时,将delivery_mode属性设置为 2,就可以将消息标记为持久化。
队列持久化:在 RabbitMQ 中声明队列时,也可以将队列声明为持久化。RabbitMQ 中的队列分为三种不同类型经典队列,仲裁队列和流式队列。其中,经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。
交换机持久化:与经典队列类似,RabbitMQ 也可以在声明交换机时,将交换机的 durable 属性设置为true,这样就可以将交换机标记为持久化。
要注意,队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前 queue 里面如果还有未发出去的消息的话,重启之后,消息是否还存在队列里面就要取决于在发送消息时对消息的设置。 对于消息的可靠性来说,只需要设置队列的持久化和消息的持久化即可。exchange 的持久化并没有什么影响,但是,如果 exchange 不设置持久化的话,当 broker 服务重启之后,exchange 将不复存在,这样会导致消息发送者 producer 无法正常发送消息。
RabbitMQ 的持久化机制会对其性能产生影响。因此,需要根据具体的业务场景和需求来权衡是否需要持久化以及需要哪种类型的持久化。
RabbitMQ 中的持久化队列与非持久化队列有什么区别?
RabbitMQ 提供了两种队列类型:持久化队列和非持久化队列,它们的主要区别在于消息在 RabbitMQ 中的存储方式以及遇到服务器重启或者崩溃时的行为。
持久化队列:消息会被循序到磁盘上,即使 RabbitMQ 服务器重启或崩遗,消息也能被保留下来、所以,通过持久化以列,我们可以确保消息不会丢失、但需要注意的是,持久化以列也会略微影响性能、因为将消息写入磁盘比仅在内存中操作要慢。
非持久化队列:消息只存储在内存中,当 RabbitMQ 服务器重启或崩溃时,这些消息会丢失。这种以列的优势在于其高性能,适用于不特别关心教据持久性的应用场景
RabbitMQ 中消息什么时候会进入死信交换机?
消息会在以下几种情况进入死信交换机(Dead-LetterExchange,DLX)
消息被拒绝(Rejection):当消费者使用
basic.reject或basic.nck明确拒绝消息,并且不要求重新投递(requeue 设置为false)时,消息会被直接投递到死信交换机。消息过期(TTL Expiration):RabbitMQ 支持为消息或队列设置 TTL(Time-To-ive),即生存时间,当消息超过指定的存活时间后还未被消费,它会自动变为死信并被发送到死信交换机。
队列达到最大长度(或总大小):如果队列设置了最大长度(x-nax-length或x-max-1ength-bytes ),当消息数量或总大小超出限制时,最早进入队列的消息会被移入死信交换机。
这些条件下进入死信交换机的消息,可以再通过死信队列进行日志记录、重新处理或监控
RabbitMQ 是如何实现死信队列的?
死信队列是 RabbitMQ 提供的一种特殊序列,处理那些无法被正常消费的消息。有三种情况会产生死信:
消息被消费者明确拒绝。
消息达到预设的过期时间仍没有消费者消费。
消息由于队列已经达到最大长度限制而被丢弃。
在 RabbitMQ 中,实现死信队列只需要给正常队列增加三个核心参数即可:
dead-letter-exchange:指定当前队列对应的死信队列
dead-letter-routing-key:指定消息转入死信队列时的路由键
message-ttl:消息在队列中的过期时间。
接下来,就可以往正常队列中发送消息。如果消息满足了某些条件,就会成为死信,并被重新发送到对应的死信队列中。而此时,RabbitMQ 会在消息的头部添加一些与死信相关的补充信息,例如时间、成为死信的原因、原队列等。应用程序可以按需处理这些补充的信息。
最后,死信队列中的消息都是正常业务处理失败的消息,应用程序需要创建一个消费者来专门处理这些被遗漏的消息。例如记录日志、发送警报等。这样才能保证业务数据的完整性。
如何在 RabbitMQ 中配置消息的 TTL(过期时间)?
要在 RabbitMQ 中配置消息的 TTL(过期时间),需要通过设置队列或消息的 TTL,表示消息在队列中存活的时间。 配置 TTL 有两种方式,一种是队列级别的 TTL,另一种是消息级别的 TTL
队列级别的TTL:可以在声明队列时通过设置 x-message-ttl 参数来指定队列中所有消息的TTL。
消息级别的TTL:可以在发送消息时通过
AMQP.BasicProperties属性指定单个消息的 TTL。
注意:
消息 TTL >队列 TTL →取较小值
消息TTL=0→立即过期(需谨慎使用)
RabbitMQ 怎么实现延迟消息?
RabbitMQ 本身不支持延迟消息,但是可以通过它提供的两个特性 TTL(Time-To-Live and Exoiration,消息存活时间),DLX(Dead Letter Exchanges,死信交换器)来实现。还可以利用 RabbitMQ 插件来实现
- 使用TTL+ 死信队列:
在 RabbitMQ 中,通过设置消息的 TTL 和死信交换器可以实现延迟队列
不给原队列(正常队列)设置消费者,当消息在原队列中达到TL后,由于还未被消费,则会被转发到绑定的死信交换器,消费者从死信队列中消费消息,从而实现消息的延迟处理
- 使用 RabbitMQ插件:延迟消息插件(rabbitmg-delayed-message-exchange)
通过安装 RabbitMQ 的延迟消息插件,可以直接创建延迟交换器(Delayed Exchange)。在发送消息时,指定消息的延迟时间,RabbitMQ 会在消息达到延迟时间后将其转发到对应的队列进行消费
RabbitMQ 中如何进行事务处理?
RabbitMQ 提供了事务处理机制,允许生产者在发送消息时将操作包装在一个事务中,以确保消息的可靠性传递。在 RabbitMQ 中,事务是通过通道(Channel)来实现的。可以通过以下步骤进行事务处理:
开启事务:在生产者端,可以通过调用 Channel 的 tx_select 方法来开启一个事务。这将启动一个新的事务,并将所有后续的消息发布操作放在该事务内。
发送消息:接下来在事务中,可以正常发送消息。如果消息发送失败,事务会自动回滚。
提交事务:如果事务中所有消息发送成功后,需要提交事务。可以通过调用 Channel 的tx_commit方法提交事务。
处理异常:如果在事务过程中发生异常,可以使用 try/catch 快来捕获异常。然后在异常处理过程中,调用 Channel 的 tx_rollback 方法来回滚 RabbitMQ 相关的事务操作。
需要注意的是,RabbitMQ 的事务处理是基于存储过程的,它可以保证在事务中的操作要么全部成功,要么全部失败。但是,由于 RabbitMQ 是一个异步的消息队列系统,事务处理可能会对其性能产生影响。因此,需要根据具体的应用场景和需求来权衡是否需要使用事务以及如何使用事务。
RocketMQ 的事务消息有什么缺点?你还了解过别的事务消息实现吗?
从事务消息的改造成本来看,RocketMQ 的事务消息改造成本不小,需要改造原始逻辑实现特定的接口,且在应用层处理复杂的回查逻辑,确保回查不会重复或丢失。
从事务消息功能性来看,RocketMQ 仅支持单事务消息。
从可用性角度来看,如果MQ 集群挂了,事务就无法继续进行了,等于整个应用无法正常执行了 因为 RocketMQ 的事务消息的实现是先发半消息,如果 MQ 集群挂了,则半消息无法发送成功,后续的逻辑就无法执行。
RabbitMQ 如何构建集群?
RabbitMQ 支持两种主要类型的集群:普通集群(Classic Cluster)和镜像集群(Mirrored Cluster)。他们之间有一些重要的区别:
普通集群: 这种模式使用Erlang语言天生具备的集群方式搭建。这种集群模式下,集群的各个节点之间只会有相同的元数据,即队列结构,而消息不会进行冗余,只存在一个节点中。消费时,如果消费的不是存有数据的节点, RabbitMQ会临时在节点之间进行数据传输,将消息从存有数据的节点传输到消费的节点。很显然,这种集群模式的消息可靠性不是很高。因为如果其中有个节点服务宕机了,那这个节点上的数据就无法消费了,需要等到这个节点服务恢复后才能消费,而这时,消费者端已经消费过的消息就有可能给不了服务端正确应答,服务起来后,就会再次消费这些消息,造成这部分消息重复消费。 另外,如果消息没有做持久化,重启就消息就会丢失。并且,这种集群模式也不支持高可用,即当某一个节点服务挂了后,需要手动重启服务,才能保证这一部分消息能正常消费。所以这种集群模式只适合一些对消息安全性不是很高的场景。而在使用这种模式时,消费者应该尽量的连接上每一个节点,减少消息在集群中的传输。
镜像集群:这种模式是在普通集群模式基础上的一种增强方案,这也就是RabbitMQ的官方HA高可用方案。需要在搭建了普通集群之后再补充搭建。其本质区别在于,这种模式会在镜像节点中间主动进行消息同步,而不是在客户端拉取消息时临时同步。并且在集群内部有一个算法会选举产生master和slave,当一个master挂了后,也会自动选出一个来。从而给整个集群提供高可用能力。这种模式的消息可靠性更高,因为每个节点上都存着全量的消息。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯大量的消耗,进而降低整个集群的性能。这种模式下,队列数量最好不要过多
总的来说,普通集群适用于对性能要求高,但可以接受数据丢失的情况。而镜像集群则适用于对数据持久性和可用性有更高要求,并愿意付出一些性能代价的场景。
在 RabbitMQ 集群中,节点间如何同步数据?
在 RabbitMQ 集群中,节点间的数据同步主要是通过镜像以列实现的,镜像以列 (Mirrored Queue,也称为高可用以列)是 RabbitMQ 提供的一种机制,它可以将队列数据同步到集联中的多个节点上,这样一来、即使某个节点发生故障,其他节点也会有这部分数据,保证数据的高可用性。
具体来说,一个镜像队列包括一个主节点和若干个从节点。主节点负责处理生产者和消费者的所有请求,而从节点则被动地复制主节点的数据,当主节点发生变化时,例如发布消息或消息被消费者确认,从节点会立即同步这些变化。
配置镜像队列策略:通过策略配置,可以指定哪些队列需要镜像以及镜像到多少个节点。策略基于正则表达式匹配队列名称。
同步方式:主节点的更新会以消息的形式传播到所有的从节点。这些更新消息包括新消息的发布、消息的确认、队列元数据的变化等
故障转移:当镜像队列的主节点故障时,从节点会自动选举出新的主节点,继续提供服务,而这对于生产者和消费者是透明的。
什么是 RabbitMQ 中的分区问题?如何解决?
RabbitMQ 中的分区问题通常是指网络分区(或称为网络分隔),即在一个 RabbitMQ 集群中,某些节点由于网络故障而相互无法通信。这可能会导致消息丢失、重复消费、节点间状态不一致等问题。解决网络分区问题的方法包括配置集群告警、调整分布策略、采用镜像队列等。
网络分区的影响:网络分区会号致 RabbitMQ 集群中的各个节点无法互相同步消息和元数据。这种状态可能导致以下问题
消息丢失:由于网络分区,发送到某一节点的消息无法被其他节点消费,消息可能会丢失,
重复消费:在分区恢复后,两个节点都认为某些消息尚未被消费,可能导致重复消费。
状态不一致:因为节点无法同步,导致系统状态不一致,影响业务的可靠性。
解决方法
配置集群告警:确保集群监控可以及时发现网络分区问题。RabbitMQ 提供了各类告警和插件,可以实时监控网络和节点状态。
调整分布策略:适当规划队列和消息分布策略,例如,尽量保证队列和消费者的分布均衡,减少跨节点通信的需求,从而降低分区影响。
使用镜像队列(Mirrored Queue):配置镜像队列,可以将消息复制到多个节点上,在网络分区恢复后可以确保消息的完整性,需要注意的是,镜像队列可能带来额外的性能开销
启用 Quorum Queues:这是 RabbitMQ3.8之后的一个特性,使用 Raft 协议来保证消息的高可用性和一致性,相比传统镜像队列更稳定和高效。
实际开发中的注意点
性能优化:在使用镜像队列或 Quorum Queues 时,注意观察性能变化,可能需要进行相应优化.
配置管理:保持节点的配置和版本一致,减少因配置不一致导致的分区和故障,
RabbitMQ 的镜像队列和 Quorum Queue 有什么区别?
RabbitMQ提供了两种高级队列类型来满足容错性的需求:镜像队列(Mirored Queue)和 Quorum Queue(法定队列)
实现方式
镜像队列:通过同步所有消息和元数据到多个节点来实现高可用性,每一个消息都会被复制到其他的节点上,所有这些节点共同组成一个队列。
Quorum Queue:采用 Raft 一致性算法来管理队列中的消息。队列中的消息是被多数派(即半数以上的节点)确认后才被认为是持久的。
性能和开销
镜像队列:由于需要同步消息到多个节点,磁盘 IO 和网络带宽占用较高,同时写入延迟较大。
Quorum Queue:由于基于 Raft 算法,写操作和一些签立操作需要与多数节点进行同步,延迟较高,但读取操作更快。
自动故障转移
镜像队列:当主节点发生故障时,剩下的从节点会自动提升一个节点为新的主节点,其他业务逻辑会无缝切换到新的主节点。
Quorum Queue:由于其基于 Raft 算法,任何一个节点的故障不会影响队列的可用性,只要有多数派节点存活,队列就是可用的。
适用场景
镜像队列:适用于需要高可用性和一致性的应用,但由于其同步机制,性能开销比较大,适用在消息量不巨大但需要高持久性的场景
Quorum Queue:更适用于需要高持久性,同时能容忍少量写操作高延迟的场景,特别是需要线性一致性的场合。
RabbitMQ如何保证消息不丢失
丢失原因分析
从上述流程我们可以得知:消息从生产者到达消费者,经过两次网络传输,并且在 RabbitMQ 服务器中进行路由。
因此我们能知道整个流程中可能会出现三种消息丢失场景:
生产者发送消息到 RabbitMQ 服务器的过程中出现消息丢失。 可能是网络波动未收到消息,又或者是服务器宕机。
RabbitMQ 服务器消息持久化出现消息丢失。 消息发送到 RabbitMQ 之后,未能及时存储完成持久化,RabbitMQ 服务器出现宕机重启,消息出现丢失。
消费者拉取消息过程以及拿到消息后出现消息丢失。 消费者从 RabbitMQ 服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。
针对上述三种消息丢失场景,RabbitMQ 提供了相应的解决方案,confirm 消息确认机制(生产者),消息持久化机制(RabbitMQ 服务),ACK 事务机制(消费者)
接下来看解决方案
confirm 消息确认机制(生产者)
Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时,它会等待 RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。
消息正确投递到 queue 时,会返回 ack。
消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失。
使用方法:
生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式。
发送消息后,通过添加 add_confirm_listener 方法,监听消息的确认状态。
消息持久化机制(RabbitMQ 服务)
持久化机制是指将消息存储到磁盘,以保证在 RabbitMQ 服务器宕机或重启时,消息不会丢失。
使用方法:
生产者通过将消息的 delivery_mode 属性设置为 2,将消息标记为持久化。
队列也需要进行持久化设置,确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。
注意事项: 持久化机制会影响性能,因此在需要确保消息不丢失的场景下使用。
ACK 事务机制
用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认(ACK)给 RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭进行手工发送 ACK。
使用方法:
在 RabbitMQ 中,ACK 机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常。
可以手动开启 ACK 机制,通过将 auto_ack 参数设置为 False,手动控制消息的 ACK。
注意事项: ACK 机制可以确保消息不会被重复处理,但如果消费者发生异常或者未发送 ACK,消息可能会被重复投递。
RabbitMQ中如何解决消息堆积问题
消息堆积原因
接下来看解决方案
消费者处理消息的速度太慢
增加消费者数量:通过水平扩展,增加消费者的数量来提高处理能力。
优化消费者性能:提高消费者处理消息的效率,例如优化代码、增加资源。
消息预取限制(prefetch count):调整消费者的预取数量以避免一次处理过多消息而导致处理缓慢。 如果你将 prefetch 参数设置为1,那么消费者在不确认消息之前,只会接收一个消息。这有助于确保每个消息被处理成功后,才接收下一个消息,避免消息积压和资源耗尽。
队列的容量太小
增加队列的容量:调整队列设置以允许更多消息存储。
网络故障
监控和告警:通过监控网络状况并设置告警,确保在网络故障时快速发现并解决问题。
持久化和高可用性:确保消息和队列的持久化以避免消息丢失,并使用镜像队列提高可用性。
消费者故障
使用死信队列:将无法处理的消息转移到死信队列,防止堵塞主队列。
容错机制:实现消费者的自动重启和错误处理逻辑。
队列配置不当
优化队列配置:检查并优化消息确认模式、队列长度限制和其他相关配置。
消息大小
消息分片:将大型消息分割成小的消息片段,加快处理速度。
业务逻辑复杂或耗时
优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间。
消息产生速度快于消费速度
使用消息限流:控制消息的生产速度,确保它不会超过消费者的处理能力。
负载均衡:确保消息在消费者之间公平分配,避免个别消费者过载。
其他配置优化
消息优先级:使用消息优先级确保高优先级消息优先处理。
调整RabbitMQ配置:优化RabbitMQ服务的配置,如文件描述符限制、内存使用限制等。
RabbitMQ 中如何处理未被消费者确认的消息?
在 RabbitMQ 中,当消费者接收到一条消息后,如果出于某种原因没有确认(ACK)这条消息,这条消息会被重新入队并传递给其他消费者(或者相同消费者再次接收)。详细实现方式如下
在消费者代码中需要启用消息确认机制(manual acknowledgment),即通过
channel.basic_ack手动确认消息处理完成。如果消费者没有发送
basic_ack(如消费者宕机或消息处理异常),消息会被再次发送给下一个可用的消费者。这时 RabbitMQ 会把消息的 delvery tag 返还给queue,以保证消息被再次处理
RabbitMQ 的流控机制(Flow Control)是什么?为什么需要它?
RabbitMQ 的流控机制(fowControl)是一种用来防止消息队列系统过载的方法,在RabbitMQ中,流控机制会通过限制某些传输速率或暂停消息的传输来确保系统的稳定性和可靠性,这种机制的触发通常与内存或磁盘使用量有关。
为什么需要它?RabbitMQ 是一种高效的消息队列中间件,但它依然有资源限制,比如内存和硬盘,如果不加以控制,当消息量较多时,可能会导致资源耗尽,从而影响系统的整体性能甚至引发崩溃,流控机制通过提前预警和采取措施,能够防止这种情况发生。
RabbitMQ中如何保证消息不被重复消费
什么情况会导致消息被重复消费呢
生产者:生产者可能会重复推送一条数据到 MQ 中,比如 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;
MQ:在消费者消费完准备响应 ack 消息消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。
消费者:消费者已经消费完消息,正准备但是还未响应给ack消息到时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。
接下来看解决方案
使用数据库唯一键约束
缺点:局限性很大,仅仅只能用在我们数据新增场景,并且性能也比较低
使用乐观锁
假设是更新订单状态,在发送的消息的时候带上修改字段的版本号
缺点:如果说更新字段比较多,并且更新场景比较多,可能会导致数据库字段增加并且还有可能出现多条消息同时在队列中此时他们修改字段版本号一致,排在后续的消息无法被消费
简单的消息去重,插入消费记录,增加数据库判断
优点:很多场景下的确能起到不错的效果
缺点:
这个消费者的代码执行需要1秒,重复消息在执行期间(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),增加校验的地方是不是还是没数据(因为上一条消息还没消费完,没有记录)
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题
并发消息去重基于消息幂等表
缺点:如果说第一次消息投递异常没有消费成功,并且没有将消息状态给置为成功或者没有删除消息表记录,此时延时消费每次执行下列都是一直处于消费中,最后消费就会被视为消费失败而被投递到死信Topic中
方案:插入的消息表必须要带一个最长消费过期时间,例如10分钟
上述方案只需要一个存储的中心媒介,那我们可以选择更灵活的存储中心媒介,比如Redis。使用Redis有两个好处:
性能上损耗更低
上面我们讲到的超时时间可以直接利用Redis本身的ttl实现
总结
利用数据库唯一键约束
可以利用乐观锁
插入消费记录
不丢和不重是矛盾的(在分布式场景下),总的来说,开发者根据业务的实际需求来选择相应的方式即可。
RabbitMQ 中的消息如何确保顺序性?
在 RabbitMQ 中,要确保消息的顺序性,主要办法是将消息发送到同一个队列,并且消费者单线程地处理此队列中的消息。原因在于:RabbitMQ 保证了一个队列中的消息是有序的,先进先出的(FIFO),只要消费者保持顺序处理,消息的顺序性就能得到保证。
队列的顺序性:RabbitMQ 内部设计保证了队列的顺序性,也就是说消息进入队列的顺序即为出队的顺序,确保发送到的队列是同一个队列,消息就能按顺序处理
单一消费者处理:确保只有一个消费者从该队列中消费消息。这样可以避免多消费者并发处理时,处理顺序的混乱。单线程的消费者读出消息并按接收到的顺序处理。
使用消费确认机制:RabbitMQ 想供了消息确认机制,这允许消费者确以处理完成某条消息后RabbitMQ 才会出除这条消息,若未确认,消息会被重新投递,但要注意,这可能导致消息外理顺序发生变化。
顺序保护的高级设计:消息分区(partioning):在复杂系统中,可以对消息进行分区处理,例如,通过某种键(可以是消息属性中的某个字段,例如用户ID、订单ID等)将消息路由到不同的队列,每个队列由一个独立的消费者处理,这样既可以保证特定分组内消息的顺序性,又能并行处理不同分组的消息以提高系统吞吐量。
事务(Transactions)和流水线(Pipeine::在一些应用中,为保证顺序性,可能还会设计事务机制或流水线处理策略,将顺序性需求落入业务逻辑中加以控制。
RocketMQ
为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?而选择自己实现 NameServer?
RocketMQ 在开发的时候充分吸取了前人的教训,特意轻量化注册中心的实现。
RocketMQ 的设计特点体现在以下几个方面:
设计简洁且专用:RocketMQ的 NameServer 设计相对简单,专门用于 RocketMQ的需求,相比于通用的Zookeeper,NameServer 更加轻量级且易于部署和维护。
高可用性:RocketMQ的NameServer是无状态的,多个NameServer实例之间是对等的,可以通过 DNS或者 VP进行负载均衡,天然是备高可用性,而NameServer是一个强一致性系统,对节点之间的同步有严格要求,在其些极端情况下(如网络分区),Zookeeper 的可用性可能受到影响。
性能优化:NameServer 只处理简单的配置和路由信息,不涉及复杂的状态同步和一致性协议,相比 Zookeeper 的 ZAB 协议,性能更高。因为Zookeper的写是不可扩张的,整个Zokeeper 集联的写入只能在Leader 节点,因此对于频繁写入操作,压力都打在Leader 上,不好扩展,对于 RocketMQ 的大规模分布式消息系统,性能优化是非常重要的。
降低依赖性:使用自研的 NameServer 可以降低对外部系统的依赖,简化系统架构,减少维护复杂度。同时,RocketMQ 可以完全掌控注册中心的实现和优化.
定制化需求:RocketMQ需要实现一些特定的功能和优化,例如针对消息队列的动态路由和管理,使用自研的 Nameserver可以更好地满足这些定制化需求,而不必受限于 Zookeeper的实现和接口。
RocketMQ 的开发参考了Kafka,那两者在架构和功能上有什么区别?
一句话总结就是 RocketMQ 相较于 Kafka 在架构上做了减法,在功能上做了加法。
架构
简化协调节点:
RocketMQ 使用更加轻量的 NameServer 替换了 Kafka 中 Zookeeper 来实现多个 broker(主机)间的协调
后来 Kafka 也通过 Raft 算法摆脱了 Zookeeper
简化分区:
Kafka 为了提升并发量将单个 topic 拆分成多个 partition 放在 broker 中,存储全部信息。
RocketMQ 将单个 topic 拆分成多个queue,只存储部分信息(如ofset),完整信息会存储在 Commitlog 文件中,通过 ofset 来定位 Commitog 中的信息。
消费对比
Kafka 消费者获取信息只需从 broker 中的 partition 中读取返回即可。
RocketMQ 需要先从 queue 中获取偏移量,再从 CommitLog 中读取信息,比 Kafka 多读了一次。这样看似乎性能比 Kafka 差,而这就涉及到了两者的底层存储。
底层存储
在底层存储中,Kafka的 parion下有很多段 (egmen),为了更好的性能,存体数据时都是顺字存储,但随着topitc越多,parion和其下的segment 也越来越多,同时向多个topic 中写入数据时,虽然每个segment 是顺序存储,但多个文件放在磁盘不同地方,这样就使顺序存储劣化为了随机存储,写性能就会降低。
而 RocketMQ 干脆将单个 broker 下的所有 topic 数据全部写入一个 CommitLog 中,消除随机写的问题,大大增加了在多 topic 下的写性能.
简化备份模型
Kafka会将一个topic 下的 partition分散到多个broker中,并为其配置副本(partion(主,Leader)与其副本(从,Folower)不在同一个broker中),主从partition之间会建立数据同步。如果 RocketMQ 也给每个分区(queue)单独建立同步通信,就要将 CommitLog 拆开,这样就退化为了随机读写。所
RocketMQ 直接同步 CommitLog 文件,以 broker 为单位区分主从。
功能
消息过滤
Kafka 支持通过 topic 给消息分类。
RocketMQ在此基础上还可以给同一个topic 中的信息打上不同标签(tag),进一步区分。
事务
Kafka支持事务(一次发送多条消息,多条消息同时发送成功或失败),但这满足不了我们开发的需求。
RocketMQ 的事务支持自定义逻辑和发送消息同时成功或失败。
延时队列:RocketMQ 有延时消息队列,Kafka 没有。
死信队列:RocketMQ 在多次消息消费重试失败后会将消息放入死信队列,Kafka 原生不支持
消息回溯:Kafka支持通过调整 offset 确定从某个地方开始消费,RocketMQ 除此之外还支持调整时间
RocketMQ 的 NameServer 的作用是什么?它如何进行服务发现?
RodketMQ的 Nameserver是一种轻量级的服务注册和发现的组件,它的主要作用是提供路由信息,它保存了所有 Broker 的地址和路出信息,供客户端(Producer和Consumer) 在通信时查询
具体来说,它执行以下功能:
维护 Broker 集群的元数据:包括 Broker 的地址、状态等信息。
提供服务注册和发现:Broker 启动时会注册自己的信息到指定的 NameSenver,Producer 和 Consumer,通过NameServer 获取到 broker的路由信息,从而进行消息的发布和消费
服务发现流程大致如下:
Broker在启动时会不断将自己的信息注册到每一个 NameServer.
Producer和 Consumer 在初始化时,会从 NameServer 集群中获取最新的路由信息,并定期刷新。
如果某个 Broker 发生变化(如上线/下线),会及时通知到所有 NameServer,NameServer 更新其内部的路由信息。
RocketMQ消费消息是推模式还是拉模式
RocketMQ实际上同时支持推模式和拉模式来消费消息,但这里有一个有趣的细节:虽然RocketMQ提供了所谓的"推模式"消费者,但在底层实现上,它仍然是基于拉模式的。让我们详细探讨这两种模式,并通过代码示例来解释它们的工作原理。
- 推模式(Push Mode)
虽然称为"推模式",但RocketMQ的DefaultMQPushConsumer实际上是在内部通过长轮询(Long Polling)来实现的,这本质上还是一种拉模式。不过,对于开发者来说,使用起来就像是推模式,因为消息会自动被"推送"到消费者的监听器中。
- 拉模式(Pull Mode)
拉模式允许消费者主动从Broker拉取消息。这种模式给了开发者更多的控制权,可以根据自己的节奏来消费消息。
比较和选择
推模式(Push):
优点:使用简单,自动管理消费进度,适合大多数场景。
缺点:灵活性较低,无法完全控制消费节奏。
拉模式(Pull):
优点:更灵活,可以精确控制消费速度和批次大小。
缺点:需要自己管理偏移量,实现相对复杂。
虽然RocketMQ提供了所谓的"推模式",但实际上它是基于拉模式实现的。对于大多数应用场景,使用DefaultMQPushConsumer就足够了,因为它提供了一个类似推模式的简单接口。只有在需要精细控制消息消费过程时,才需要考虑使用拉模式。
RocketMQ 的广播消息和集群消息有什么区别?
广播消息和集群消息是 RocketMQ 的两种不同的消息消费模式。其中
广播模式意味着一条消息会被发送到所有订阅了这个主题 Topic 的消费者,而所有消费者都会收到相同的消息副本。
集群模式意味着一条消息只会分发给订阅了这个主题 Topic 的同一个消费者组中的一个消费者处理。每个消费者组只会处理一次消息。
他们的区别主要提现在实现方式以及适用场景上。
集群模式在 Broker 端统一管理每个消费者组的消费进度,对消费进度的管理是严格的。这样,每次消费者服务启动后,都可以从上一次消费的进度开始开始进行消费。 而广播模式是交由每个消费者自行管理消费进度,消费进度的管理是不严格的,容易产生丢失。当消费者服务启动后,如果本地的消费进度丢失了,就只能消费到启动之后的消息,而无法从上一次消费的进度开始消费。因此,广播模式对于消息的连续性保证是不强的。
集群模式适用于大多数常规对消息安全敏感的业务场景,例如订单处理、库存管理等。多个消费者协同工作可以提高消息的处理能力并实现消息的负载均衡。而广播模式适用于一些对消息安全不太敏感的特殊业务场景。例如日记记录、时间通知等。这些场景下所有的消费者都需要处理相同的消息。
RocketMQ 中的 Topic 和 Tag 有什么区别?
RocketMQ 作为一个高性能、分布式的消息队列系统,它有两个基本的概念:Topic和Tag。
Topic 是用于对消息进行逻辑分区的一种方式,而 Tag 是用于对消息进行进一步细分的标签,具体来说:
Toptc:主要用于区分不同业务的消息。例如,一个订单系统可以有一个名为"OrderTopic" 的Topic 来存储所有与订单相关的消息。不同的消费者可以订阅不同的 Topic,从而只处理与自己业务相关的消息。
Tag:主要用于对同一个Topic 中的消息进行更加细粒度的分类。Tag允许将同一个下Topic 中的消息继续细分,这样消费者在订阅Topic 的时候,可以通过指定 Tag 来过滤出感兴趣的消息,例如,在"OrderTopic’中,可以根据订单类型使用不同的 Tag,如"CreateOrder"、 “UpdateOrder” 等。
RocketMQ 中的消息存储机制是如何设计的?
RocketMQ 中的消息存储机制主要依赖于文件系统,将消息数据落盘,并且以高效的方式进行读取和管理、核心组成部分包括 CommitLog、ConsumeQueue 和 Indexfile,整体思路是通过顺序写入和分段管理来优化性能和存储,同时利用内存映射文件机制(MappedFile)来加速读取和写入。
CommitLog:这是消息存储的核心,所有消息都会首先写入这个大日志文件。Commitog 文件是顺序写入的,这样可以极大地提高写入速度。
ConsumeQueue:这是消息消费的索引文件。由于消费组需要高效地定位消息,ConsumeQueue 提供了对应Commitlog 的索引,允许消费者能够高效地查找到需要的消息。
IndexFile:这是消息的索引文件,用于通过 Key 快速查找消息。Key 一般可以是消息的主题(topic)或者是用户自定义的标签。
RocketMQ 的日志存储结构是什么?如何优化日志的写入性能?
RocketMQ 的日志存储结构主要包括CommitLog、ConsumeQueue和 IndexFile三个部分,CommitLog 是消息存储的核心文件,所有的消息内容都会首先写入到 CommitLog中,然后画被复制到 ConsumeQueue 和IndexFile 中,以便快速消费和检索。
为了优化日志的写入性能,可以考虑以下几种方法
使用 PageCache:RocketMQ 依赖操作系统的 PageCache 来加速数据的读写,减少磁盘 I/O 操作
批量写入:将多条消息打包成一个批次进行写入,减少频繁的写操作。
异步刷盘:使用异步刷盘提高写入性能,同时利用 HA (High Availability) 模式来保证数据的安全性。
预分配文件:通过预分配 CommitLog 文件,减少文件分配带来的性能开销。
调优磁盘子系统:选择高速磁盘(如 SSDs)和适当的文件系统(如 EXT4、XFS),并进行适当的参数调优。
RocketMQ 的批量消息是如何实现的?如何优化批量消息的性能?
RocketMQ 中的批量消息功能主要通过将多个消息(Message)打包为一个MessageBatch对象进行发送来实现。这样可以减少单次消息发送带来的网络和系统开销,提高整体吞吐量,批量消息有几个重要的实现细节和注意点,比如每批消息大小上限限制(默认为4MB),以及批量消息的分片和排序。
要优化批量消息的性能,可以从以下几个方面入手:
消息大小控制:确保每批次消息的总大小不超过系统限制,以避免因为消息过大而导致发送失败或需要拆分操作。
分批策略:根据业务场景选择合适的批量大小,既要考虑吞吐量,也要控制时。
网络优化:通过使用更高效的网络协议和网络硬件,提高消息发送和接收过程的速率
异步发送:可以使用异步发送方法,提高发送吞吐量和系统的并发处理能力。
RocketMQ如何实现消息的持久化
消息存储的主要组件包括 CommitLog 文件、消费队列文件(ConsumeQueue)、以及索引文件(IndexFile)。
CommitLog文件 :CommitLog是RocketMQ的核心存储文件,负责保存消息的完整内容。
顺序写入:所有的消息都顺序写入CommitLog文件,这种方式减少了磁盘寻道时间,提高了写入性能。
文件滚动:CommitLog按照固定大小(比如1GB)进行分片。当一个文件写满后,会创建一个新的文件。
存储所有数据:包括消息体、主题、队列ID等。
ConsumeQueue文件 :ConsumeQueue是针对消息的逻辑视图,旨在加快消费者对消息的访问速度。
条目固定:每个ConsumeQueue条目固定为20字节,包含消息在CommitLog中的偏移量、消息大小、Tag哈希值。
独立文件:每个主题的每个队列都有独立的ConsumeQueue文件,文件路径为store/consumequeue/{topic}/{queueId}。
快速定位:通过ConsumeQueue,消费者无需扫描整个CommitLog即可快速找到消息的位置。
IndexFile文件 :IndexFile用于支持消息的快速检索。
哈希索引:为消息的key建立哈希索引,支持通过key快速检索消息偏移。
增强查询:IndexFile是可选的,用于需要基于消息属性进行快速查找的场景。
消息存储流程
接收消息:Broker接收到消息后,将其放入内存缓冲区(待写入CommitLog)。
写入CommitLog: 每条消息追加到当前活跃的CommitLog文件中。使用顺序写入提升写入效率和磁盘利用率。
同步到ConsumeQueue:
异步转发服务(ReputMessageService)从CommitLog读取新写入的消息。
将消息的偏移量和其他元数据(如大小和Tag哈希值)存储到相应的ConsumeQueue文件中。
更新IndexFile(可选): 若消息带有key(如业务ID),则将其哈希和偏移量存入IndexFile。 这样,可以通过该key快速查找消息。
RocketMQ的消息存储如何进行清理和归档
RocketMQ 提供了消息存储清理和归档的机制,以便管理消息存储空间,删除过期消息,并将历史消息归档到其他存储介质中。这些功能有助于维护消息队列的性能和可用性。以下是关于 RocketMQ 消息存储清理和归档的主要方面:
消息文件删除策略: RocketMQ 支持多种消息文件删除策略,可以在配置文件中进行设置。以下是一些常见的策略:
定时删除策略: 您可以配置 RocketMQ 定期删除过期的消息文件和索引文件。这样,一旦消息文件中的消息过期,RocketMQ 将自动删除它们。
空间满策略: 如果存储磁盘空间达到一定限制,RocketMQ 可以自动删除最早的消息文件,以释放磁盘空间。这个策略确保了存储空间不会无限制地增长。
指定时间段删除策略: 您可以配置 RocketMQ 只删除特定时间段内的消息文件,以保留历史消息。
消息归档: RocketMQ 允许您将历史消息归档到其他存储介质中,以减小消息服务器的存储负担。归档通常涉及将消息转移到长期存储(如云存储或本地归档系统)中。归档可以手动触发,也可以自动触发,具体取决于您的需求。
历史消息访问: 尽管消息被归档,RocketMQ 仍然提供了访问历史消息的机制。通过合适的归档系统或者存储介质,您可以检索和访问历史消息,以满足合规性要求或其他业务需求。
需要注意的是,清理和归档消息不是 RocketMQ 的核心功能,而是辅助功能。需要根据自己的需求和业务场景来确保配置合理的清理策略以防止存储空间耗尽,并根据业务需求进行消息的归档操作,以保留历史消息数据。同时,归档后的消息可以根据需要进行合适的检索和恢复,以满足特定的数据需求。
RocketMQ延迟消息是如何实现的
RocketMQ通过特定的延迟级别设计实现延迟消息功能。在RocketMQ中,延迟消息是通过设置消息的延迟级别(Delay Level)来实现的。每个延迟级别对应一个特定的时间段,这样可以让消息在指定的时间之后才被消费。
实现原理
延迟级别:RocketMQ不支持任意时间的延迟,而是提供了18个固定的延迟级别,从1s,5s,10s,30s,1m,2m,3m到2h不等。
特殊主题:所有延迟消息都会先发送到一个特殊的内部主题 SCHEDULE_TOPIC_XXXX。
定时任务:Broker会启动一个定时任务,按照延迟时间的先后顺序依次扫描每个延迟级别队列。
消息转移:当扫描到期的消息时,会将消息从延迟队列转移到目标主题。
消费者消费:消息被转移到目标主题后,消费者就可以正常消费这条消息了。
RocketMQ 中的死信消息是什么?
死信消息 (Dead Leter Message,DLM)是指在消息队列系统中,由于各种原因无法正常被消费或处理的消息。在 RocketMQ 中,死信消息是指那些多次重试消费们旧失败的消息,以避免影的系统的稳定性和性能,这些消息会被移到一个特殊的队列,以便后续人工或者特定的程序来处理。
死信消息的触发条件:
消息消费失败并目超过了最大重试次数,比如,默认情况下,RocketMQ 会对每条失败的消息重试16次(这个次数可以根据实际需求进行配置)。如果第16次仍然失败,这条消息就会被投递到死信消息队列
消息超时没被消费,这种情况通常是由于消息在队列中停留的时间超过了预定的超时时间。
处理死信消息的方式
人工处理:在消息移到死信队列后,可以由运维人员或开发人员手动检查和处理这些消息,找到失败原因并采取相应措施
自动化处理:许多系统会设计自动化的重试机制,或者是将这些消息重新投递回原始队列再次消费。例如,利用定时任务定期检查死信队列,将符合特定条件的消息再次发送到原始队列
避免死信消息
优化消费逻辑:确保消费者逻辑健壮、可靠,减少因为代码错误导致消费失败的情况。
异常处理:在消费逻辑中做好异常处理,比如,对于偶尔的网络抖动、服务不稳定等异常情况进行适当的重试机制。
超时设置:根据业务需求合理设置消息超时时间,避免因为配置不当导致消息成为死信消息。
RocketMQ提供了哪些消息过滤机制
RocketMQ提供了多种消息过滤机制,以便消费者能够根据业务需求进行精确的消息消费。主要的过滤机制包括:
Tag过滤:最常用的过滤方式,消费者可以基于消息的Tag进行过滤。
SQL92语法过滤:基于Message属性,以SQL92标准语法进行复杂条件过滤,该功能需要Broker支持。
RocketMQ 中的消费负载均衡是如何工作的?
RocketMQ 的消费负载均街主要是通过消费者组、消费者实例与消息队列来实现的。具体来说,每个消费者组由多个消费者实例组成,每个消费者实例会消费一部分消息队列中的消息。为了实现负载均衡,RocketMQ 会自动将消息队列分配给消费者组中的各个消费者实例,当消费者实例增加或减少时,系统会动态地进行重新分配,确保消息队列的负载均衡。
消费负载均衡在消息队列系统中是一个非常关键的机制,RocketMQ 通过以下方式实现
消费者组:每个消费者组可以包含多个消费者实例,消费者组之间互不影响。同一个消费者组中的消费者实例会共同消费这个组的消息.
消息队列:RocketMQ的每个主题(Topic)下都有多个消息队列(MessageQueue)。这些队列分布在不同的 Broker 节点上,以实现高可用和扩展性。
消息分配策略:RocketMQ提供了多种分配策略,可以通过修改策略来实现不同的业务需求,常见的分配策略有轮询法(RoundRobin),平均分配(Average Allocation)等。
Rebalance触发机制:当消费者实例的数量发生变化(增加或减少消费者实例),RocketMQ会自动能发 Rebalance 机制,重新分配消息队列,确保负载均街,例如,有新消费者实例接入时,RocketMQ 会将部分消息B列分配给新的实例;当某个消费者实例宕机时,它所负责的消息队列会被重新分配给剩余的消费者实例。
消费进度:为了保证消息不会重复消费或丢失,每个消费者实例会在本地或远程保存消费进度(offset),一旦某个消息被消费后,会更新进度信息.
定时任务:RocketMQ 内部有定时任务,每 20 秒触发一次,这个任务会根据当前消费者组的变化重新平衡消息队列的分配。
RocketMQ 的消费负载均衡机制确保了系统的可扩展性和高可用性,避免了某个消费者实例过载或空闲的情况,提升了系统的整体性能
RocketMQ事务消息是如何实现的
RocketMQ的事务消息对分布式系统中的事务一致性问题提供了有效的解决方案。比如当用户下单时,需要同时处理订单生成、库存扣减、支付处理等操作。事务消息确保所有这些操作要么全部成功,要么全部回滚。
RocketMQ的事务消息实现是一个复杂而精巧的过程,主要通过两阶段提交(2PC)和补偿机制来保证分布式事务的一致性。
下面我将结合下单减库存场景详细解释,这里订单和库存是两个微服务,所以这两步操作存在分布式事务问题,我们可以在下单操作完成发送减库存事务消息异步扣减库存,使用 RocketMQ 的事务消息,可以确保订单创建和库存扣减的事务一致性。以下是实现该场景的详细步骤:
假设我们有两个服务:
订单服务(Order Service):负责处理订单创建。
库存服务(Inventory Service):负责管理库存更新。
使用事务消息实现步骤
发送半消息: 在订单服务中,首先发送一个库存扣减的半消息到 RocketMQ。这条消息在事务未完成前,对消费者(库存服务)是不可见的。
执行本地事务(创建订单): 在订单服务中执行本地事务,即在数据库中创建订单记录。
根据本地事务结果提交或回滚事务消息:
如果订单创建成功,提交事务消息,使库存扣减消息对库存服务可见。
如果订单创建失败,回滚事务消息,库存不变。
处理MQ事务消息状态回查: 如果RocketMQ未收到事务提交或回滚的结果,会定期询问订单服务,以确认事务的最终状态。
原子性:半事务消息对消费者不可见,确保了事务的原子性。
持久性:半事务消息会被持久化,即使Broker宕机也能恢复。
隔离性:通过特殊的队列存储半事务消息,实现了隔离。
一致性:通过二阶段提交和回查机制保证了最终一致性。
通过这套实现机制,RocketMQ能够在分布式系统中实现可靠的事务消息,确保消息发送与本地事务的一致性,即使在系统崩溃或网络故障的情况下也能保证数据的一致性。
RocketMQ的集群架构是怎样的
RocketMQ的集群架构设计旨在提高系统的可用性、可靠性和可扩展性。它通过多种组件协同工作,实现消息的生产、存储、分发和消费。以下是关于RocketMQ集群架构及其使用场景的详细说明。
RocketMQ集群架构
NameServer:
它是一个几乎无状态的节点,可以集群部署用于服务发现。
NameServer为Producer和Consumer提供路由信息。
Broker:
Broker负责接收、存储和转发消息。
它可以分为Master和Slave,支持多对Master-Slave配置以实现高可用。
Master和Slave之间通过同步/异步机制进行数据复制。
Producer:
消息生产者负责产生消息,并发送到Broker。
支持同步和异步发送消息。
Consumer:
消息消费者负责从Broker接收消息。
支持Push和Pull两种消费方式,可以是集群消费或广播消费。
使用场景
高可用性:通过Master-Slave配置和Broker高可用机制,保证系统在部分节点失效的情况下仍能正常运行。
消息持久化:控制消息的存储策略(同步/异步),从而达成更高的可靠性需求。
负载均衡:使用多NameServer和Broker集群,支持负载均衡,从而支持大规模消息流量。
弹性扩展:可以按需添加NameServer和Broker节点,进行水平扩展。
RocketMQ的Broker有哪几种集群模式
RocketMQ的Broker有三种集群模式:
单Master模式:只有一个Master节点,其他都是Slave节点。Master节点负责响应客户端的请求并存储消息,Slave节点只同步Master节点的消息,也会响应部分客户端的读请求。这种模式的优点是简单易部署,但是存在单点故障的问题,如果Master节点宕机,会导致整个服务不可用。
Master-Slave模式(经典双集群部署):一个Master节点对应多个Slave节点,Master和Slave都是独立的NameServer。Master节点负责响应客户端请求并存储消息,Slave节点只同步Master节点的消息,也会响应部分客户端的读请求。这种模式的优点是高可用性,即使Master节点宕机,Slave节点可以自动升级为Master节点,继续提供服务。但是,如果只有一个Master节点,存在单点故障的问题。
Dledger模式(高可用集群部署):在Master-Slave模式的基础上增加了Raft协议,实现了自动脑裂后的数据高可靠性。即使某个节点从网络上掉下来或者宕机后,仍然能够保证所有的消息不会丢失。这种模式的优点是高可用性和高可靠性,即使某个节点出现故障,也能保证服务的可用性。
多Master多Slave模式(异步复制):每个Master节点配置一个Slave节点,有多对Master-Slave,HA采用异步复制方式。这种模式下,即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响。Master宕机后,消费者仍然可以从Slave消费,过程对应用透明,不需要人工干预,性能与多Master模式几乎一样。缺点是Master宕机或磁盘损坏情况下可能会丢失少量消息
多Master多Slave模式(同步双写):每个Master节点配置一个Slave节点,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功。这种模式的优点是数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。缺点是性能比异步复制模式略低,发送单个消息的响应时间会略高
总的来说,单Master模式适合测试和开发环境,Master-Slave模式适合生产环境,而Dledger模式适合需要高可靠性的生产环境。
RocketMQ 的主从架构是如何实现的?
RocketMQ 的主从架构是通过主(Master)从(Slave)节点来实现的,主要目的是提高系统的高可用性和数据可靠性。主节点负责处理所有的写请求,以及读清求,而从节点负责备份主节点的数据,并在主节点失效时接管其工作。
主节点(Master):负责处理生产者(Producer)发送的消息和消费者(Consumer)的消费清求。在消息写入时,主节点会将消息持久化到本地存储中
从节点(Slave):从节点通过复制主节点的存储文件来保持数据一致性。具体来说,从节点会定期向主节点拉取数据,并将其存储到本地,实现数据备份。
同步机制:RocketMQ 提供多种同步机制,如异步复制和同步双写,异步复制是指主节点写入消息后立即友回给客户端,而从节点在后台进行数据复制;同步双写是指主节点和从节点都写入数据后才返回给客户端。
故障切换:如果主节点发生故障,从节点会被提升为新的主节点,继续提供服务,以保证系统的高可用性。
RocketMQ 的消费位点(Offset)是如何管理的?
在 RocketMQ 中,消费位点(offset )是用于记录消费者消费到的位置,确保消息不被重复消费或漏消费。RocketMQ 主要通过消费者组同步、Broker 端存储、客户端定期更新和持久化等机制来管理和维护消费位点。实际过程中,RocketMQ 提供了消费位点的自动管理和手动管理两种方式。
如何在 RocketMQ 中实现消息的高可用?
在 RocketMQ 中实现消息的高可用性,主要有以下几个方面:
Broker 主从架构:RocketMQ 使用主从架构,通过主从复制(Replication)来实现数据备份,从而确保消息的高可用性。
消息存储机制:RocketMQ 的消息存储系统采用了多副本存储机制。即同一条消息会被多个 Broker 节点同步备份,以防单点故障。
消息消费重试:消息消费失败后,RocketMQ 具有重试机制。消费失败的消息会被重新放回队列,直到成功消费.
消费者负载均衡:RocketMQ 支持消费者负载均衡机制,可以保证当某个消费者不可用时,其他消费者能继续处理其任务。
定时消息与延时消息:RocketMQ 支持定时消息与延时消息,有助于消息的顺序消费和故障恢复。
事务消息:RocketMQ 支持事务消息,能够保证消息的消费与业务逻辑的执行具有一致性。
什么是 RocketMQ 的消息重试机制?如何配置?
RocketMQ 的消息重试机制是指在消费者消费失败后,RocketMQ 会定期重新投递该消息给消费者,直至消费或功或达到最大重过次数、这个机制能提高消息处理的可靠性和容错性,是消息队列非常关键的特件之一
配置消息重试机制可以通过以下主要参数:
retryTimesWhenSendFailed:这是针对生产者设置的,指定消息发送失败后重试的次数。
maxReconsumeTimes:这是针对消费者设置的,指定消费失败后最多重试的次数。
messageDelayLevel:通常用于配置消费重试的延迟时间,可以设置不同的延迟级别。延迟级别是一个整数值,表示消息的延迟时间。例如,延迟级别1代表1秒,延迟级别2代表5秒,依此类推,直到延迟级别18代表2小时。
RocketMQ如何保证消息顺序
RocketMQ提供了两种级别的顺序消息:全局顺序消息和分区顺序消息。
全局顺序消息:全局顺序消息确保一个主题内的所有消息都按照发送顺序被消费。这通常通过将所有消息路由到同一个队列来实现。
分区顺序消息:分区顺序消息保证具有相同分区键的消息按顺序被消费。这允许更高的并行度,因为不同分区键的消息可以并行处理。
注意事项
全局顺序消息可能会限制系统的吞吐量,因为所有消息都经过单一队列。
分区顺序消息在保证局部顺序的同时提供了更好的并行性。
选择合适的分区键对于分区顺序消息至关重要,以确保相关消息进入同一队列。
消费者端需要正确处理并发和重试逻辑,以维护消息顺序。
RocketMQ 中消息顺序性的保证主要通过“消息的队列分区(Message Queue)“和“顺序消费” 两个核心机制来实现的。其思路是确保同一分区内的消息按顺序发送和消费,而不同分区之间则没有顺序保证, 具体来说
消息的顺序发送:消息在生产时,通过指定的分区规则(例如自定义的 key或 hash值)将消息映射到固定的消息队列(Message Queue),同一个Key的消息一定会发送到同一个队列,这样可以保证该队列内的消息顺序
消息的顺序消费:在消费端,消费者从指定的队列中顺序拉取消息,这样可以确保消息按发送的顺序进行消费。
如何在 RocketMQ 中处理消息的乱序问题?
在 RocketMQ 中处理消息的乱序问题,最常用的方法是利用消息的顺序消费机制。具体来说,我们可以通过精细地设置消息的 Key和选择合适的 MesageQueue,来确保某一组有序的消息,总是发送到同一个队列,并由同一个消费者线程顺序处理。以下是步骤:
为消息设置key:在生产者发送消息时,基于某些业务逻辑、定义一个唯一标识符(如订单ID、用户ID),作为消息的 key。
选择特定队列:利用消息的 key,通过某种哈希算法或者业务逻辑,将消息发送到特定的MessageQueue。
顺序消费消息:在消费侧,确保一个消费者线程读取一个 MessageQueue,从而保证消息的顺序消费。
RocketMQ如何保证消息不丢失
RocketMQ通过多层面的机制来确保消息的可靠性,包括生产者端、broker端和消费者端。
生产者端保证
同步发送 :同步发送是最可靠的发送方式,它会等待broker的确认响应。
异步发送 + 重试机制 :异步发送通过回调来处理发送结果,并可以设置重试次数。
Broker端保证
同步刷盘 :通过配置broker.conf文件,可以启用同步刷盘:flushDiskType = SYNC_FLUSH
主从复制 :配置主从架构,并设置同步复制:brokerRole = SYNC_MASTER
消费者端保证
手动提交消费位移 :使用手动提交可以确保消息被正确处理后再提交位移。
幂等性消费 :在消费端实现幂等性处理,确保重复消费不会导致业务问题。
在 RocketMQ 中,如何确保消息不会重复消费?
在 RocketMQ 中,通过以下几种方法可以确保消息不会重复消费
幂等性:确保消费者的处理逻辑是幂等的,即对于相同的消息重复处理多次,结果应保持一致。
消息状态记录:在消费前记录消息状态,如果记录存在则不重复处理。常见的实现方式是使用数据库记录消费的消息ID.
消息重试机制:配置合理的消息重试机制,限制重试次数,避免消息反复被消费。
RocketMQ 的幂等性如何实现?
我们需要明白一个事情:消息队列的消息可能会被重复投递,这种重复投递是由于网络抖动,消息重传策略等原因造或的,为了保证消费者在处理相同消息时不会产生不同的结果,我们要确保消息处理逻辑的幂等性,实现幂等性的核心就是让相同的消息无论处理多少次,产生的结果都相同。
对于 RocketMQ 幂等性的实现,最主要的方法就是使用唯一的消息标识(例如消息ID或业务唯一标识)来防止消息重复处理。通常做法是:
给每条消息分配一个唯一的ID,比如订单号、事务ID等
在处理每条消息之前,检查这个ID是否已经处理过。
如果没有处理过,就处理这条消息,并记录这个ID。
如果已经处理过,就跳过这条消息,以防止重复处理
RocketMQ如何解决消息积压问题
消息积压是消息中间件中常见的问题,主要由消费速度跟不上生产速度导致。以下是几种解决方案:
增加消费者线程数量:这是最直接的方法,通过增加消费者线程数来提高消费能力。
消息业务异步处理
调整消费者的消费模式:将顺序消费改为并行消费,提高消费效率。
使用消息过滤 :通过消息过滤,只消费重要的消息,降低消费压力。
调整生产者发送策略:如果可能,可以调整生产者的发送策略,如降低发送频率或者实现背压机制。
这些方法可以单独使用,也可以组合使用,具体取决于具体的业务场景和系统架构。在实施这些解决方案时,请注意监控系统性能,确保不会因为过度优化而导致其他问题。
说一下 RocketMQ 中关于事务消息的实现?
RocketMQ 中的事务消息通过两阶段提交的方式来确保消息与本地事务的一致性
第一阶段(消息发送):
生产者先将消息发送到 RocketMQ 的 Topic,此时消息的状态为半消息(Half Message),消费者不可见。
然后,生产者执行本地事务逻辑,并根据本地事务的执行结果来决定下一步的操作。
第二阶段(提交或回查):
如果本地事务成功,生产者会向 RocketMQ 提交 commit 操作,将半消息变为正式消息,消费者可见。
如果本地事务失败,生产者会向 RocketMQ 提交 Rollback操作,RocketMQ 会丢弃该半消息
如果生产者没有及时提交 commit 或 Rollback 操作,RocketMQ 会定时回查生产者本地事务状态,决定是否提交或回滚消息。
Kafka
Kafka 是什么?它的主要应用场景有哪些?
Kafka是一种分布式流事件处理平台,它的核心功能主要包括消息队列、流处理和数据集成。Kafka以高吞吐量、低延迟、可扩展和高容错性著称。
Kafka的主要应用场景有
消息队列:用作高吞吐量的消息系统,将消息从一个系统传递到另一个系统。
日志收集:集中收集日志数据,然后通过Kafka传递到实时监控系统或存储系统。
流计算:处理实时数据流,将数据传递给实时计算系统,如Apache Storm或Apache Flink。
事件溯源:记录事件发生的历史,以便稍后进行数据回溯或重新处理。
Metrics收集和监控:收集来自不同服务的监控指标,统一存储和处理
Kafka的设计理念与传统消息队列(RabbitMq)有所不同,Kafka更注重于处理大规模数据流,支持高吞吐量和持久化存储,而传统消息队列更多用于短生命周期的消息传进和任务调度,所以 Kafka 通常用于处理日志、监控数据等大规模数据流,而传统消息队列用于任务队列、队列服务等场景。
Kafka的优势和特点
优势:
高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能。
高性能:单节点支持上千个客户端,并保证零停机和零数据丢失,异步化处理机制
持久化:将消息持久化到磁盘。通过将数据持久化到硬盘以及replica(follower节点)防止数据丢失。
零拷贝:减少了很多的拷贝技术,以及可以总体减少阻塞事件,提高吞吐量。
可靠性 :Kafka是分布式,分区,复制和容错的。
Kafka的特点 :
顺序读,顺序写
利用Linux的页缓存
分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都会有多个,均为分布式的。无需停机即可扩展机器。多个Producer、Consumer可能是不同的应用。
客户端状态维护:消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡。
支持online(在线)和offline(离线)的场景。
支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。
Kafka与RabbitMQ相比有什么优势?
Kafka 和 RabbitMQ 都是流行的消息中间件系统,他们各自都有一些优势和适用场景。以下是 Kafka 相对于 RabbitMQ 的一些比较明显的优势:
分布式架构: Kafka 是为大规模分布式流处理而设计的,具有高度可伸缩性。RabbitMQ 虽然也支持分布式架构,但相对而言,Kafka 的集群设计更完善,更适合处理大规模的消息流。
吞吐量: Kafka每秒可处理十几万消息,而 RabbitMQ 每秒可处理几万条消息。
消息复制和可用性:Kafka 允许配置多个消息副本,确保数据的冗余存储,提高可用性和容错性。RabbitMQ 也支持镜像队列以实现冗余,但是不如 Kafka 的多副本复制灵活。
时间溯源:Kafka 在事件溯源和事件驱动架构中非常强大。他允许事件在 Topic 中保留一段时间,以便后续的分析和回溯查询。RabbitMQ 通常用于实时消息传递,对于事件溯源不够灵活。
批处理和流处理: Kafka 提供了流处理 API,可用于实时数据流处理等场景。而 RabbitMQ 倾向于更专注的处理实时消息传递。
社区和生态系统:Kafka 有一个庞大的社区和丰富的生态系统,提供了许多与大数据和流处理相关的工具和库。RabbitMQ 也有一个活跃的社区,但是相对而言社区规模以及社区活跃性就要小很多。
如果需要处理大规模的实时数据流或事件驱动架构,Kafka 可能更适合;如果更关注传统的消息传递和队列处理,RabbitMQ 的高级功能更丰富,可能更合适。因此,选择哪种消息中间件还是要取决于具体的应用场景。
Kafka有哪些组件
kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。
kafka有以下一些基本概念:
Producer - 消息生产者,就是向kafka broker发消息的客户端。
Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。
kafka的消费模式是什么?
消费模式有两种:
poll(拉):消费者主动向服务端拉取消息。
push(推):服务端主动推送消息给消费者。
由于推模式很难考虑到每个客户端不同的消费速率,导致消费者无法消费消息而宕机,因此kafka采用的是poll的模式,但该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。
为了避免过多不必要的空轮询:kafka做了改进,如果没消息,服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。
Kafka 的 Producer 是如何发送消息的?如何通过批量发送提高吞吐量?
Kafka 的 Producer 发送消息的过程可以分为以下几个步骤
序列化消息:首先,Producer 会将消息对象(比如说一个Java 对象)序列化成字节数组。Kafka 的序列化机制是可插拔的,可以使用默认的,也可以自定义
分区选择:Producer 根据配置的分区策略(默认是轮询,当然也可以自定义,比如按消息的 key 进行散列)选择要将消息发往的分区。
发送到缓冲区:Producer 将消息存入一个缓冲区(RecordAccumulator)。这个缓冲中区是一个庞大的阻塞队列,它会把消息批量存储起来。
批量发送:当缓冲中区内的消息达到一定的大小,或者等待时间超过设定的阈值时,Producer 会将消息批量发送到 Kafka Brokers。
通过批量发送机制,Kafka Producer 可以显著提高消息的吞吐量,因为
减少网络调用次数:批量发送意味着合并多条消息进行一次网络传输,减少网络调用的次数和开销。
有效利用带宽:批量发送可以更高效地利用网络带宽,减少网络空闲时间,增加数据传输效率
减轻 Broker 压力:可以减轻 Broker 的负载,因为 Broker 处理批量消息时较处理单个消息的负担更轻。
Kafka 中的 Consumer Group 是什么?它在消息消费中起到什么作用?
在Kafka中,Consumer Group是一组消费者(Consumer),它们共同协作来消费一个或多个主题(Topic)中的消息。每个Consumer Group 都有一个唯一的标识符,所有属于同一组的消费者会协同工作,以保证一个组内的每条消息仅会被消费一次。
具体来说
每个Consumer Group内的每个消费者独立消费不同的分区(Partition)中的数据,一个分区只能被一个Consumer消费。
即使有多个消费者在同一个组内消费同一个Topic,Kafka也会确保每条消息只会被组内的其中一个消费者处理。这样极大地提高了消费的并发能力和处理速度,保证了消息的高效处理。
Consumer Group还可以实现负载均衡。当有新的消费者加入或离开组时,Kafka 会自动均衡分区的消费,将需要消费的分区重新分配给现存的消费者。
Kafka 中的 Consumer 是如何订阅 Topic 的?
Kafka 中的 Consumer 订阅 Topic 分为两种方式:自动订阅(Auto Subscription)和手动订阅(Manual Subscription)。
自动订阅:消费者使用 subscribe 方法,传入一个Topic 列表。如果 Topic 列表发生变化,消费者会自动调整.。
手动订阅:消费者使用 assign方法,传入一个Topic 和分区的列表。消费者只接收这些分区的数据,不会自动感知Topic 列表的变化。
Kafka 中的 Consumer Group 是如何进行负载均衡的?它如何保证高效消费?
在Kafka 中,Consumer的负载均衡主要通过分配分区(partion)给不同的消费者(consumer)来实现,每一个消费者实例(consumer instance)会处理一个或多个分区的数据,从而实现了并行的高效消费
具体的负载均衡过程如下:
每个 Kafka topic 由多个分区组成,消费者组中的每一个消费者实例会被分配一个或多个分区。
Kafka 集群中的一个消费者协调者(group coordinator)负责管理消费者组的成员关系,并分配分区。
当新的消费者加入或离开消费者组时,或者当 topic 的分区数发生变化时,会触发重新平衡(rebalance)操作,重新分配分区。
分配算法有多种实现方式,比如 Range、RoundRobin 等,这些算法依据不同策略将分区分配给消费者。
通过这种机制,Kafka 保证了消费者组内部的负载均衡和高效消费,从而最大化了系统的吞吐量和性能。
Kafka 中的 Controller 是什么角色?它在集群中的作用是什么?
Kafka 中的 Controller 是整个集群协调者,它是专门负责监控和管理 Kafka 集群中分区(partition)和副本(replica)状态的节点。在整个 Kafka 集群中,Controller 的角色是至关重要的,它帮助集群维持稳定,确保分区和副本的可用性和一致性。
Controller 在集群中的主要作用包括
Leader 选举:确定哪个副本成为分区的Leader 来处理读写请求。
副本管理:监控和管理副本的状态,确保同步副本集(ISR)的健康状态。
分区迁移:如果某个 broker 出现故障,Controller 负责重新分配其上的分区到其他可用 Broker 上。
Topic 创建和删除:管理 Topic 的创建和删除操作,并广播这些信息到集群中的所有 Broker。
Kafka 的 Controller Failover 是如何设计的?在 Controller 宕机时如何进行故障恢复?
Kafka的Controler是集群中负责管理各种元教据(如主题创建、分区分配、副本分配等)以及协同领导者选举的关键组件,Controller Failover是Kafka 保证高可用性的重要机制,具体来讲,当 Controller 宕机时,Kafka会通过 Zookeeper 选举出一个新的 Controller,以确保集群可以继续正常运行。
以下是 Kafka Controller Failover 的主要设计和流程
Zookeeper 作为协调者:每个Kafka Broker 启动时都会尝试在 Zookeeper 中创建一个待殊的节点 (
/controller)。因为这个节点使用的是Ephemeral(临时)节点类型,当创建该节点的 Broker 宕机时,这个节点会自动删除。竞争成为 Controller:一旦当前的 Controller 宕机,所有活着的 Broker 都会尝试在Zookeeper 中创建
/contoller节点,第一个成功创建这个节点的 Broker 会成为新的Controller,剩下的则会收到失败通知通知机制:新的 Controller 会在 Zookeeper中写入它的选举结果,并通过监听机制通知所有 Broker,这些 Broker 会更新它们本地的 Controller 缓存,从而指向新的 Controller
恢复任务:新当选的Controller需要快速完成集群状态的接管,包括重新分配分区副本、添加主题、调整副本同步等等,这些操作通过监听都会在Zookeeper节点和操作 Kafka 内部Topic(如
_consumer offsets) 完或
Kafka中 Zookeeper 的作用?
在 Kafka 中,Zookeeper 扮演了集群协调和管理的核心角色。它的主要作用是管理和协调Kafka 集群中的元数据,帮助 Kafka 实现高可用性、负载均衡和容错性
以下是 Kafka 中 Zookeeper 的几个关键作用
管理 Broker 元数据:Zookeper 负责管理 Kafka 集群中 Broker的注册、状态监控。当有新的 Broker 加入或离开集群时,Zookeper 能够及时更新集群状态.。
协调分区副本 Leader 选举:当某个分区的 Leader 副本故障时,Zookeeper 协调副本的选举过程,为该分区选出新的Leader,确保分区高可用。
管理消费者的 offset:在早期版本的 Kafka 中,消费者的 offset信息存储在Zookeeper 中,以便消费者在重启后可以从上次消费的位置继续消费,最新的 Kafka 版本将 offset 存储移至 Kafka 自身的内部主题
_consumer_offsets,减少了对 Zookeeper 的依赖。动态配置和负载均衡:Zookeeper 保存着Kafka 配置和拓扑信息,当集群发生变化时(如增加或减少分区、调整副本因子),Zookeper 协助完成负载均衡,.
Kafka为什么要抛弃 Zookeeper?
主要是为了简化架构、提升可扩展性和降低运维复杂性。Kafka引入了 KRaft(Kafka Raft)模式,即使用 Kafka 自身实现的 Raft 共识算法替代Zookeeper
以下是 Kafka 抛弃 Zookeeper 的几个主要原因
简化架构:Zookeeper 是一个独立的分布式协调服务,Kafka需要依赖它来管理元数据,Zookeeper 增加了系统复杂度和运维成本,尤其在处理集群的动态扩屏和管理时,Kafka和Zookeeper 之间的协调带来了额外的开销。通过 KRaft,Kafka 可以直接管理元数据,消除了对外部协调服务的依赖。
提升可扩展性:Zookeeper 的写入性有限,随着 Kafka 集群的规模增大,元数据的读写操作可能会对 Zookeeper 造成压力,进而成为 Kafka 扩展性的瓶颈,KRaft 模式通过将元数据存储在Kafka 自身中,并使用 Raft 协议来确保一致性,使 Kafka 的扩展能力得到提升。
降低运维成本:在生产环境中,Kafka 和Zookeeper之间需要进行一致性管理和维护,运维人员需要掌掘两套系统的部署、监控和故障排查,去掉 Zookeeper 后,Kafka只需维护自身的节点和协议,简化了运维流程。
Kafka工作流程

producer先从zookeeper的 “/brokers/…/state"节点找到该partition的leader
producer将消息发送给该leader
leader将消息写入本地log
followers从leader pull消息
写入本地log后向leader发送ACK
leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
Kafka 的 Topic 是什么?它的作用是什么?
Kafka 的 Topic是 Kafka消息系统中的一个逻辑规念,简单说来,它是用来区分和隔离不同类型消息的单位。每一个Topic都有一个名称,生产者将消息发送到个特定的 Topic上,而消费者从某个特定的 Topic 接收消息
其作用主要包括以下几点
消息分类:Kafka 通过 Topic 来对消息进行分类管理,生产者和消费者通过 Topic 来组织和订阅消息
隔离数据:不同业务或模块的数据可以通过不同的 Topic 隔离开,保证数据之间的独立性和安全性。
分区并行:每个 Topic可以有多个分区,消息会被分布到不同分区上,实现并行处理,提升系统的吞吐量和伸缩性。
在 Kafka 中,Partition 是什么?Partition 的划分对性能有什么影响?
在Kafka中,partition是指一个主题(Topic)中的一个分区,Kafka主题可以划分为多个分区,每个分区是一个有序的、不可变的消息序列。不同分区中的消息是并行地存体和处理的,这使得 Kafka 能够实现高吞吐量
Partition 的划分对性能有直接的影响
并行处理:更多的分区可以让多个消费者实例并行处理消息,从而提升系统的吞吐量。
负载均衡:通过增加分区数量,可以更好地分配负载,避免某个节点成为瓶颈。
数据局部性:分区可以分布在不同的代理节点上,提高数据的可用性和可靠性
Kafka中的Topic和Partition有什么关系?
在Kafka中,Topic和Partition是两个密切相关的概念。
Topic是Kafka中消息的逻辑分类,可以看作是一个消息的存储类别。它是按照不同的主题对消息进行分类,并且可以用于区分和筛选数据。每个Topic可以有多个Partition,每个Partition都是Topic的一个子集,包含了一部分特定的消息。
Partition则是Kafka 中实际保存数据的单位。每个Topic可以被划分为多个Partition,而这些 Partition 会尽量平均的分配到各个 Broker 上。当一条消息发送到Kafka时,它会被分配到一个特定的Partition中,并最终写入 Partition 对应的日志文件里。这个分配过程是根据Partition的规则来完成的,比如可以按照消息的某个属性进行哈希或者按照时间戳进行排序等。
因此,Topic和Partition的关系是,Topic是消息的逻辑分类,用于区分和筛选数据,而Partition则是Topic的物理划分,用于将消息分配到不同的部分中以便于处理和存储。Topic 和 Partition 的设计对于高吞吐量和横向扩展非常有用。因为生产者和消费者只需要根据 Topic 进行具体的业务实现,而不用关心消息在集群内的分布情况。而在集群内部,这些 Partition 会尽量平均的分布在不同的 Broker节点上,从而提高了系统 整体的性能和可伸缩性。
Kafka 中的分区分配策略有哪些?如何选择合适的策略?
Kafka 中有三种主要的分区分配策略:Range(范围),RoundRobin(轮询),Sticky(粘性)。具体如何选择合适的策略取决于实际使用场景和需求。
Range(范围):按照范围将分区分配给消费者,这种策略比较简单,适合分区数和消费者数大致相同的情况。
RoundRobin (轮询):按照均匀分配的方式将分区分配给消费者,适合分区数和消费者数都很大的情况,能够实现负载均衡。
Sticky(粘性):在确保分区均匀分配的同时,尽量保持上一次分配的结果,减少分区重新平衡导致的延迟和性能损失。
Kafka 的内部状态是如何管理的?如何通过状态管理优化性能?
Kafka 的内部状态管理主要依赖于 Zookeeper 和 Kafka 内部的元数据存储机制。来协调和管理集群的各个部分,Kafka 使用 Zookeeper包括管理 brokers、topics、partitions、以及 consumer offset 信息。
另外,Kafka 在内部会使用内存存储和磁盘存储来确保消息的可靠性和高效读取。
为了优化 Kafka 的性能,可以从以下几个方面入手
精心设计 topics 和 partitions 数量,可以提高并发处理能力。
优化 producer 和 consumer 的配置,使得消息的发送和接收更加高效。
合理配置 Kafka brokers,优化内存和磁盘使用,提升数据存取速度。
利用 Kafka Streams 模块进行数据流处理,提供状态存储与管理功能来优化计算性能。
Kafka的消费消息是如何传递的?
在Kafka中,消息的传递主要涉及三个环节:生产者生产消息、broker保存消息和消费者消费消息。
生产者生产消息:生产者负责将消息发布到Kafka broker。在发布消息时,生产者需要指定目标主题。消息被写入后,将被存储在指定分区的当前副本中。当发送消息失败时,生产者还会提供确认以及重试机制,以保证消息能够正确的发送到 Broker 上。
broker保存消息:Kafka broker接收到生产者发送的消息后,会将其存储在内部的缓冲区中,等待消费者拉取。当消费者向broker发送拉取请求时,broker会从缓冲区中获取消息并返回给消费者。Kafka broker能够保证消息的可靠性和顺序性,即使在异常情况下(如服务器崩溃),也能够保证消息不会丢失。
消费者消费消息:消费者从Kafka broker中订阅指定的主题,并拉取消息进行消费。消费者可以以同步或异步的方式拉取消息,并对拉取到的消息进行处理。当消费者处理完消息后,会向Kafka broker发送确认消息,表示消息已经被成功处理。这样可以保证消息被正确处理且不会重复消费。 总体来说,Kafka通过生产者、Kafka broker和消费者的协同工作,实现了高吞吐量、高可靠性和高可扩展性的消息传递。
Kafka中的消息如何分配给不同的消费者?
Kafka中的消息是通过分区(Partition)分配给不同的消费者的。Kafka将每个Topic划分为多个Partition,每个Partition存储一部分消息。消费者通过订阅Topic来消费消息,而Kafka将Partition中的消息按照一定的分配策略分配给消费者组中的不同消费者。
Kafka提供了多种分区分配策略,用于确定如何将分区分配给消费者。例如:
RoundRobin 轮询策略:Kafka将Partition按照轮询的方式分配给消费者组中的不同消费者,每个消费者依次获得一个Partition,直到所有Partition被分配完毕。当消费者数量发生变化时,Kafka会重新分配Partition。
Range 范围策略:Kafka将Partition按照Range的方式分配给消费者组中的不同消费者,每个消费者负责处理指定范围内的Partition。这种分配方式适用于Topic的Partition数量较少,而消费者数量较多的情况。
Sticky 粘性策略: 尽量保持每个消费者在一段时间内消费相同的分区,以减少分区重新分配的频率
当消费者处理完一个Partition中的所有消息后,它会向Kafka发送心跳请求,Kafka会将该Partition分配给其他消费者进行处理。这种机制确保了消息在不同的消费者之间负载均衡,并提高了容错性。如果一个消费者出现故障,其他消费者可以继续处理Partition中的消息,而不会导致消息丢失或重复处理。
说一下 Kafka 中关于事务消息的实现?
Kafka 的事务消息不同于我们理解的分布式事务消息,它的事务消息是实现了消息的Exactly Once 语义,即保证消息在生产、传输和消费过程中的“仅一次”
传递Kafka 的事务消息主要通过以下几个核心组件来实现
事务协调器(TransactionCoordinator):负责事务的启动、提交和中止管理,并将事务状态记录到transaction_state 内部主题.
幂等生产者 (lidempotent Producer):Kafka Producer 通过 producer ID(PID)识别每个事务的唯一性,确保同一事务的每条消息只写入一次。
事务性消费:在消费过程中,消者可以选择隔离未完成事务的数据(通过 readcomited 设置),只消费已提交的事务消息,确保数据的最终一致性。
Kafka 事务消息流程
启动事务:事务性生产者向 Transaction coordinator 请求启动事务。
生产消息:生产者开始向 Kafka写入事务消息,每条消息都带有唯一的Producer ID和 sequence Number,以保证幂等性。
提交事务:在所有消息写入完成后,生产者向事务协调器发送commit或 abort 请求,提交或中止事务。
事务性消费:消费者可以通过设置 readcommitted 隔离级别,仅消费已提交的消息,实现最终数据一致性。
Kafka 是如何保证 Exactly Once 语义的?它的实现原理是什么?
Kafka 为了实现 Exacty Once 语义,采用了事务机制和幂等性生产者的功能。具体来说,它通过以下几方面来保证 Exactly Once 语义
幂等性生产者:Kafka 引入了幂等性生产者,通过给每一条消息分配一个唯一的Producer ID和 Sequence Number,确保生产者在多次发送同一消息时,Broker 只会接受一次,从而避免了重复数据的产生
事务:Kafka 事务允许一组生产者写入在一个原子操作内完成,这意味着要么所有的写入都成功,要么都失败。事务保证将一系列的消息提交到多个Topic 和分区上时,保证其一致性和隔离性。
Consumner偏移量管理:Kafka 在 Consumer端采用了称为 Consumer Group的方法来跟踪偏移量,这能保证每个消息只会被一个Consumerr Group外理一次,再结合事务特性,确保消息不会被重复消费
Kafka 的事务机制与幂等性机制如何协同工作?它们在保证消息一致性上有什么作用?
Kafka 的事务机制与幂等性机制主要用于保证消息的一致性与可靠性,特别是在处理分布式数据流和确保一次及仅一次语义时。
事务机制:Kafka 的事务机制允许消费者组和生产者协调一致地提交或撤销一组消息,确保整个事务中的消息要么全部被处理,要么全部不处理,达到“原子性”和“一致性”的效果。
幂等性机制:Kafka的幂等性机制主要用于确保生产者发送的消息即使重复发送,也只会被消费者处理一次,即所谓的“Exactly Once”语义。这是在存在可能网络失败或重试情况下避免消息重复消费的关键
两者协同工作时,幂等性机制确保每条消息在Kafa 中只会被处理一次,而事务机制进一步确保消息的原子性操作,让消息处理具备更高的一致性,防止部分消息成功而其他部分失败的情况。
Kafka 是如何保证消息顺序性的?在什么场景下顺序性是必须的?
Kafka 通过分区(Partition)机制和消息键(Message Key)来保证消息的顺序性,在Kafka中,每个opic可以分为多个分区,每个分区内的消息都是有序的,因此,Kafka 提供了有限度的顺序性保证,具体来说
在同一个分区内,消息是有序的。
靠消息键将相关消息分配到同一分区,可以保证这些消息在同一分区内依然有序。
在某些场景下,消息的顺序性是十分关键的
金融交易系统:交易指令必须按正确的顺序执行,例如银行的转账操作。
日志聚合:日志事件需要按发生的时间顺序进行处理,以便准确地重现事件顺序。
库存管理系统:商品的出入库操作必须按照操作顺序执行,否则会造成库存记录的混乱。
流媒体服务:视频或者音频流的帧数据需要按照播放顺序发送,否则会影响用户的观看体验。
Kafka 如何保证消息的严格顺序性?在高并发场景下如何优化顺消费?
Kafka 如何保证消息的严格顺序性:
分区(Partition)层面:确保生产者将同一类型的消息发送到特定分区。Kafka 保证一个分区内的消息是按顺序存储和消费的。
消息键(Key):使用消息键(Kev)来控制消息的分区。相同的 Key 总是被路由到同一个分区,从而保证了具有相同 Key 的消息顺序。
单生产者线程(Single Producer Thread):确保生产者是单线程的或使用有序的发送机制,这样就不会因多线程的并发发送而打乱顺序.。
生产者中的分区器(Partitioner):Kafka 的自定义分区器可以确保相同 Key 的消息始终发送到同一个分区。
高并发场景下如何优化顺序消费:
并行处理逻辑:在消费端,可以通过拆分步骤来并行处理部分无顺序依赖的逻辑,从而提高整体吞吐量
异步处理:利用异步处理机制处理消息,但需要确保消息的核心逻辑是顺序执行的,从而保证顺序。
多线程消费:在不同消费组中根据分区并行消费,但仍需每个分区内的消费线程按照顺序处理消息。
Kafka如何保证消息可靠?
为了保证消息在传递过程当中,消息不会丢失或者被重复传递,Kafka 设计了非常多的重要机制来保证消息的可靠性。例如
数据冗余:Kafka通过将消息副本(replica)的方式来实现数据冗余,每个topic都可以配置副本数量,副本数量越多,数据可靠性越高,但会占用更多的存储空间和网络带宽。在 Kafka 中,针对每个 Partition,会选举产生一个 Leader 节点,负责响应客户端的请求,并优先保存消息。而其他节点则作为 Follower 节点,负责备份 Master 节点上的消息。
消息发送确认机制:Kafka支持对生产者发送过来的数据进行校验,以检查数据的完整性。可以通过设置生产者端的参数(例如:acks)来配置校验方式。配置为 0,则不校验生产者发送的消息是否写入 Broker。配置为 1,则只要消息在 Leader 节点上写入成功后就向生产者返回确认信息。配置为-1 或 all,则会等所有 Broker 节点上写入完成后才向生产者返回确认信息。
ISR机制:针对每个 Partition,Kafka 会维护一个 ISR 列表,里面记录当前处于同步状态的所有Partition。并通过 ISR 机制确保消息不会在Master 故障时丢失。
消息持久化:Kafka将消息写入到磁盘上,而不是仅在内存中缓存。这样可以保证即使在系统崩溃的情况下,消息也不会丢失。并且使用零拷贝技术提高消息持久化的性能。
消费者确认机制:Kafka消费者在处理完消息后会向Kafka broker发送确认消息,表示消息已经被成功处理。如果消费者未发送确认消息,则Kafka broker会保留消息并等待消费者再次拉取。这样可以保证消息被正确处理且不会重复消费。
这些机制的组合确保了 Kafka 中消息的高可靠性和持久性,使得 Kafka 成为可靠的消息传递系统,适用于各种实时数据处理和日志聚合需求。
在 Kafka 中,如何处理消息丢失问题?有哪些常见的应对策略?
在 Kafka 中,处理消息丢失问题的主要方法包括
使用适当的确认机制(Acknowledgments)
配置多个副本(Replication)和耐久性(Durability)
配置合理的消费偏移(Consumer Offsets)
启用幂等生产者(ldempotent Producer)和事务(Transactions)
在 Kafka 中,如何处理消息重复消费的问题?有哪些解决方案?
在 Kafka 中,消息重复消费是一个常见的问题,主要因为 Kafka 提供了至少一次的交付语义,让消费者可能会因为重新平衡或者崩溃恢复等原因而重新消费之前已经处理过的消息。在处理消息重复消费的问题时,可以采取以下几种解决方案
消费者端的幂等性处理。
使用 Kafka 幂等性特性和事务支持(ldempotent Producer 和 Transactions)。
在应用层实现去重逻辑。
Kafka 的幂等性是如何保证的?它在消息处理中的作用是什么?
Kafka的幂等性主要是通过Producer 的幂等性(Idempotent produce) 来保证的,它依靠一个叫做 prodncer ID (PID)和 sequence number的机制来实现。每个Kafka Producer都会被分配一个唯一的PID,当 poducer 发送消息到某个分区时,它会用一个说递增的 sequence number 来标识这条消息。如果同一条消息由于各种原因被多次发送到 Kafka,Kafka Broker会检查 sequence number,并丢弃重复的消息,从而确保消息在同一分区内被准确地存储一次。
在消息处理中的作用主要有两个方面
防止重复消息:避免了网络重传和系统故障导致的消息重复处理,从而确保消息的唯一性
提升数据一致性:通过幂等性保证,消费者读取数据时可以确信数据的精确性和一致性。
Kafka中的消息是如何存储的?
Kafka 中的消息是以文件的方式持久化到磁盘中进行存储的,这是 Kafka 的一个关键特性,确保消息的可靠性和可用性。Kafka中的消息是通过以下方式进行存储的:
Partition 分区:Partition是Kafka中消息存储的基本单位,每个Topic下的消息都会被划分成多个Partition进行管理。每个Partition都是一个有序的、不变的消息队列,消息按照追加的顺序被添加到队列尾部。
Segment 分块:Partition会被进一步划分成多个Segment,Segment是逻辑上的文件组,方便进行数据的管理和查找。每个Segment里都包含多个文件,这些文件名相同且被集合在一起。
文件索引:Segment中的每个文件都有自己的索引文件和数据文件,索引文件存储了当前数据文件的索引信息,而数据文件则存储了当前索引文件名对应的数据信息。
消息偏移:Kafka中的每个消息都会被分配到一个特定的Partition中,然后根据Partition内的Segment划分,被存储到对应的数据文件中。消息的偏移量信息则会被记录在索引文件中。
持久化:Kafka中的每个消息都包含一个64位的偏移量,该偏移量表示消息在Partition中的位置。当消费者读取消息时,可以通过偏移量信息来确定需要从哪个位置开始读取。
Kafka 的消息存储是基于日志文件和分区的,确保了消息的可靠性、持久性和高吞吐量。消息被追加到日志文件中,每个消息都有唯一的偏移量,分区和副本机制保证了数据的冗余存储和可用性。这种设计使 Kafka 成为一个可信赖的消息传递系统,适用于各种实时数据处理、日志聚合和事件驱动应用程序。
Kafka 的消息是如何持久化的?它默认的存储机制是什么?
Kafka 的消息持久化主要依赖于它的一个核心组件:日志文件(log files)。Kafka 会将消息分成若干个段(segment),存储数据,并将这些段保存在磁盘上。每条消息会被追加到当前的日志文件的末尾,Kafka 默认通过顺序写入的方式来,这样的方式使得磁盘 I/O 效率非常高。另外,Kafka 使用了零拷贝 (zero-copy) 技术来进一步提升效率。
Kafka 如何保证消息的持久性和高可用性?
Kafka 是一个分布式流处理平台,其设计保证了消息的持久性和高可用性。它通过以下方式实现这一目标
消息持久性:Kafka 使用磁盘进行消息存储,确保即使在系统故障的情况下,消息也不会丢失。具体措施包括
分区:Kafka 将每个主题分成多个分区,每个分区是有序且持久的日志。分区方便了数据的存储和读取。
日志分段和索引:每个分区被分段为多个日志段,分段之后的日志文件会以可配置的方式进行轮转。Kafka 还会为每个消息生成索引,以快速定位消息。
文件系统的强制刷新:Kafka 使用页缓存来提高磁盘 I/O 性能,并定期调用 fsync 系统调用,将数据从页缓存刷新到磁盘,确保数据持久化。
高可用性:Kafka 通过复制机制和分布式架构来实现高可用性,具体包括
副本(Replica):每个分区有一个主副本(leader)和若干个从副本(Follower)。主副本处理读写请求并将数据同步到从副本,从副本在主副本失败时能顶上处理。
ISR(ln-Sync Replica):Kafka 维护一个同步副本集合,只有在 ISR 中的副本才被认为是健康的,从而保证了高可用性。
ACK机制:在生产者发送消息时,可以配置不同的确认级别(acks),例如 acks=all 则需要等待所有 ISR 中的副本确认收到消息,进一步提高可靠性。
Kafka 的高可用性是如何实现的?当 Broker 宕机时,如何保证服务不受影响?
Kafka 的高可用性主要通过以下几个关键机制来实现
多副本机制(Replication):Kafka 中的每个分区都有多个副本(Repicas),这些副本分布在不同的 Broker 上,当一个 Broker 宕机时,其他特有该分区副本的 Broker能够接管工作。
leader-Folow模式:每个分区有一个Leader 副本和若于Follower副本、生产者和消费者只与Leader副本交互,而 Follower 副本则被用来备份数据,当Leader副本所在的 Boker宕机时,一个新的 Leader 会被进举出来。
ZooKeeper 协调:Kafka使用 ZooKeeper 进行分布式协调和元数据管理,当 Broker宕机时,ZooKeeper负责通知集群其他部分,并触发Leader 选举过程。
当某个 Broker 宕机时,Kafka 保证服务不受影响的方式主要体现在以下几个方面
自动选举新 Leader:ZooKeeper 会检测到 Broker 宕机,然后触发新 Leader 的选举过程。新的 Leader 选举出来后,继续对外提供服务。
数据冗余:由于存在多个副本,即使一个 Broker 宕机,其他副本仍然可以保证数据的完整性和高可用性。
分区再均衡(Rebalance):Kafka 会将宕机 Broker 上的分区自动重新分配到其他可用的 Broker 上,确保整个集群负载均衡,
在 Kafka 中,如何通过 Acks 配置提高数据可靠性?Acks 的值如何影响性能?
在Kafka中,可以通过配置 acks 参数来提高数据的可靠性。acks参数有以下几个配置选项,每个选项都会对性能和数据可靠性产生不同的影响
acks=0:生产者不会等待任何服务器的确认。消息可能会丢失,但性能最高。
acks=1:生产者会在领导者副本(leader)成功接收到数据后收到确认。数据可靠性得到了基本保障,但如果领导者副本崩溃,仍有可能丢失消息.
acks=all (或 -1):生产者会等待所有同步副本(ISR)接收到数据后收到确认。数据可靠性最高,但性能会有所下降,因为需要等多个副本都确认接收。
Kafka 的副本机制是如何实现的?它对数据可靠性有何保障?
Katka的副本机制主要通过分区副本(replica)和领导者副本(leader)实现,每个主题(topic)中的分区(partition)会有一个领导者副本和多个跟随副本(follower),领导者副本负责外理所有的读写请求,而跟随副本则定期从领导者副本中拉取数据,保持数据的一致性。当领导者副本宕机时,会在跟随副本中选出一个新的领导者,确保数据的连续性和可用性。通过这种机制,Kaka 确保了数据的可靠性和一致性。
领导者副本:每个分区都有一个领导者副本,负责处理所有的读写请求。
跟随副本:其他副本作为跟随副本,它们从领导者副本中拉取数据,保持数据的一致性。
ISR(In-Sync Replicas):处于同步状态的副本集合,仅包括那些跟上领导者副本进度的副本。
Kafka 中的 ISR(In-Sync Replica)是什么?它如何保证消息的可靠性?
在Kafka 中,ISR (In-Sync Replica)是一组与 Leader 副本保持同步的所有副本。具体来说,ISR 包含那些能够及时复制 Leader 副本中最新消息的副本。
ISR中的副本保证了它们的数据与Leader的数据一致或者仅仅落后很少量的数据,这些副本在副本集合中被认为是“同步”的。
Kafka 使用 ISR 来保证消息的可靠性。具体机制如下
当 Producer 发送消息到 Kafka 时,消息首先会被写入到 Leader 副本。
随后,Leader 副本会将这条消息复制到 ISR 集合中的所有副本。
一旦所有 ISR 副本成功复制了这条消息,Leader 副本会发送确认给 Producer,表示消息已经被可靠存储
在 Kafka 中,什么是 Leader 和 Follower?它们在副本机制中如何协同工作?
在 Kafka 中,leader和 Follower 是副本机制中两个关键的角色,每个分区都由一个Leader 和若干个Folowers 组成,leader 负责处理所有的读写请求,而 Follower 则单纯从 Leader 那里同步教据,这种结构确保了数据的高可用性和容错性。
Leader:每个Kafka 分区都有一个Leader,Leader 负责处理所有的读和写请求,并且是唯一的业务逻辑的来源。
Follower:Follower 是其他的副本,它们不断从 Leader 那里拉取数据以保持同步。当 Leader出现故障时,一个合适的 Follower 会被选举成为新的 Leader
Kafka 的 Offset 是什么?如何追踪消息的消费进度?
Kafka 的 offset(偏移量)是指在 Kafka分区(Partition)中,每条消息对应的唯一标识。offset从0开始递增,是判断消息在分区中的位置的重要依据。
追踪消息的消费进度,核心就是追踪 offset 的进度,Kafka 通过 Consumer Group(消费者组)管理消费进度,每个消费者都维护一个 offset 状态,这个状态会记录每个分区中各自的消费偏移量,具体方式如下
自动提交 Offset:通过配置
enable.auto.commit=true参数,消费者会定期自动提交其 Offset。手动提交 Offset:如果程序中需要更精确地控制 Offset 提交,可以通过 commitSync() 或commitAsync() 方法手动提交 Offset。
Kafka 的反压机制是如何实现的?如何避免生产者压垮消费者?
Kafka的反压机制主要通过调节发送速率和分区的流量控制来实现。具体来说,它提供了多个控制点,如批量发送、消息积压检测、消费者消费速率调节等。为了避免生产者压垮消费者,Kafka可以针对不同的情况采取如下儿几种措施
配置适当的 linger.ms 和 batch.size 参数,控制消息发送的频率和每次发送的消息大小,这样可以减缓生产者的压力。
通过设置 acks 参数确保消息在被写入多个副本之前,生产者会等待响应。
使用流量控制和限流机制,保证生产者不会发送超出消费者处理能力的消息量。
调优消费者的处理能力,提高消费者在高峰时刻的处理速度,包括采用多线程或分布式的消费模式
Kafka 的流量控制是如何实现的?如何通过流量控制避免系统过载?
Kafka的流量控制主要通过两种方式实现:限速(Rate Limiting)和背压(Backpressure)
限速 (Rate limiting):通过配置限速参数来控制生产者和消费者的流量速率。例如,Kafka 生产者可以通过参数
max.in.flight.request.per.connection和linger.ms来配置消息的发送速率,对于消费者,可以通过参数fetch.min.bytes和fetch.max.wait.ms来控制拉取消息的速率。背压(Bacdkpresure):通过阻塞和调节机制来防止系统过载。Kafka 的消费者可以通过手动提交偏移量的方式控制消息的处理进度,从而避免消耗过快导致消费端过载。另外,Kafka 内部实现的一些缓冲区和队列机制也有助于调节数据流量。
通过这些流量控制手段,可以有效避免系统过载,确保Kafka的运行稳定和高效
Kafka 中的批量消费是如何工作的?如何通过批量消费提高处理效率?
Kafka 中的批量消费是通过一次性拉取多个消息(称为批次或批量)来工作,而不是每次一条消息。这种方式不仅减少了网络开销,还能够更好地利用 CPU和 IO 资源,从而提高处理效率,具体来说,批量消费通过如下机制来实现
消费者从 Kafka broker 中拉取消息时,可以指定每次拉取的最大消息数( max.poll.records )
消费者会等待直到足够多的消息到达,或者达到指定的超时时间(fetch.max.wait.ms),然后一起处理这些消息。
一旦消费者拉取到一批消息,应用程序就可以对这批消息进行处理,通过批量处理,可以减少每次处理的开销(例如数据库插入、文件写入等操作的开销),从而提高整体的处理效率
此外,通过调整这两个配置参数,消费者可以根据具体需求灵活地控制批量消费的行为。
在 Kafka 中,如何实现消息的过滤?常见的消息过滤策略有哪些?
在 Kafka 中,消息过滤通常通过以下几种策略实现
生产者端过滤:在发送消息之前,生产者根据预定义的条件过滤消息。
消费者端过滤:消费者在消费消息时,基于某种逻辑判断是否处理这条消息。
Kafka Streams 和 KSQL:利用 Kafka 提供的流处理框架 Kafka Streams 或 KSQL,实现在数据流转时对消息进行过滤
Kafka 的日志压缩功能是如何实现的?它在什么场景下使用?
Kahka的日志压缩功能是通过保留每个唯一键的最新消息来实现的。在启用了日志压缩的主题中,Kafka不会出除所有旧的数据,而是保留每个键的最后一条消息,这样确保每个键在日志中总有唯一的一条最新消息。这主要依赖于 Kafka 的 Cleaner 线程,它会周期性地扫描日志并出除重复的消息,只保留每个键的最新版本。日志压缩通常在需要保留每个键的最新状态的场景下使用,例如数据库变更日志或持久化的消息状态。
Kafka 的日志分段机制是如何工作的?如何通过分段优化存储?
Kafka 的日志分段机制主要是通过将日志文件分段存储来实现的。日志被分为多个较小的段(Segment),每个段由多个消息(Message)组成,段文件有助于管理和维护日志存储。通过这种方式,可以优化存储并提高读写效率。
日志分段:Kafka 会将每个Topic分为多个分区(Partition),每个分区对应一个日志文件。为了便于管理,日志文件被进一步分割为多个较小的段。每个段文件有其固定的大小或者时间间隔,超过该大小或时间后会创建新的段。
索引文件和数据文件:每个段文件包含两部分,即数据文件和索引文件。数据文件存储实际的消息,索引文件存储偏移量和对应的物理地址,通过索引文件,可以快速定位到具体的数据文件中的消息。
老段清理机制:Kafka 通过配置策略(如日志保留时间或日志保留大小)定期地清理旧的消息段,减少存储占用。这是关键的存储优化措施之一。
Kafka 是如何实现横向扩展的?它如何处理大规模集群中的负载均衡?
Kafka 通过分区的设计实现了横向扩展。每个Kafka 主题(Topic)都会被划分为若干个分区(Partition),这些分区可以分布在不同的 Broker(代理)上。这样,当有新的 Broker 加入集群时,Kafka 可以通过重新分配分区的方式来将数据和负载均衡地分布在各个 Broker 上。
此外,Kafka 使用分区的方式实现了负载均衡,生产者(Poducer)可以按照一定的策略(例如轮询、按键哈希等)将消息发送到不同的分区,而消费者(Consumer) 则可以从各自的分区中并行消费消息,从而实现负载的均匀分散。
Kafka 的集群如何进行扩展?扩展过程中需要注意哪些问题?
Kafka 集群扩展主要是通过增加新的 Broker 节点到现有集群中来完成的。具体步骤如下
新增 Broker 节点:配置并启动新的 Kafka Broker,确保新节点能访问到现有 ZooKeeper 集群。
修改配置文件:为新节点配置
server.properties文件,设置必要的参数比如broker.id、log.dirs、zookeeper.connect重新分配分区和副本:使用 Kafka 提供的工具重新分配分区和副本,这样可以均衡各个 Broker 的数据和负载。具体命令有:
kafka-reassign-partitions.sh脚本生成分区计划修改分区计划 JSON 文件
执行分区计划
- 监控和验证:监控新节点的状态和集群的整体性能,确保新节点正常工作
Kafka 如何保证在集群扩展或缩容时数据的安全性和一致性?
Kafka 在集群扩展或缩容时主要通过以下方式来保证数据的安全性和一致性
分区副本机制:Kafka 为每个分区创建多个副本(通常至少三个),这些副本分布在不同的Broker上,以避免单点故境,不是在扩容还是缩容时,Kafka 都会保证至少有一个副本处于ISR (In-Syn Replica)集中,该集中包含了所有与Leader同步的副本。
Leader 和 Folower 模型:Kafka 在集群中为每个分区选举一个Leader,其他的副本则成为Follower,扩容或缩容时,会重新分布分区,确保每个Leader 都能处理写请求,并同步到Follower上
Contoller和 Rebalance 机制:Kafka 集群中有一个Contoller节点,负责分区的Learder的选举、分区的重新分配以及管理集群内的元数据售息、在扩容或缩容时,Contoller 会协调重新分配分区至不同的Broker上,同时确保数据安全和高可用。
Reassignment Tool:Kafka 提供了一些工具和API,实现平滑的分区重新分配。这些工具会将数据从旧的Broker同步到新的Broker,并在后台进行数据迁移,不会影响现有的生产和消费操作。
Kafka的索引设计有什么亮点?
Kafka 索引设计的亮点在于稀疏索引、段文件机制、顺序写入以及基于消息位移(Offset)的查找方法。
稀疏索引:Kafka 使用稀疏索引(Sparse Index)进行快速查找。稀疏索引只存储每隔一定间隔的消息位置,而不是对每条消息都建立索引,从而大幅减少内存占用。
段文件机制:Kafka 将日志文件分成多个段文件(segment file)存储,每个段文件包含一个日志文件和对应的索引文件。当文件达到增定大小或时间限制时,Kafka 会包建新的段文件,旧段文件则根据策略自动删除或压缩。
顺序写入:Kafka 通过顺序写入文件,降低了磁盘的随机写入成本,极大提升了写入性能。
基于 Offset 的顺序查找:Kafka 通过消息位移(offset)定位消息,结合稀疏索引,避免逐个扫描,快速定位到近似位置后再进行顺序查找,保证高效读取。
Kafka 索引主要用于快速定位消息的位置,Kafka中的消息通过 offset进行编号,并使用稀疏索引记录消息在日志文件中的偏移量和物理位置,用户查询的时候,可以通过索引快速查找对应的消息.
什么是ISR 机制?
Kafka根据副本同步的情况,分成了3个集合:
AR(Assigned Replicas):包括ISR和OSR
ISR(In-sync Replicas):和leader副本保持同步的副本集合,可以被认为是可靠的数据
OSR(Out-Sync Replicas):和Leader副本同步失效的副本集合
当 kafka 副本同步机制是所有follower都同步成功才返回 ack 给生产者时,如果有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set (ISR-同步副本列表),意为和leader保持同步的follower集合。根据follower发来的FETCH请求中的fetch offset判断ISR中的follower完成数据同步是否成功。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
ISR(In-Sync Replicas ):与leader保持同步的follower集合
AR(Assigned Replicas):分区的所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
AR=ISR+OSR。
如何确保Kafka集群的高可用?
Kafka设计了多种机制,共同保证集群的高可用性:
分布式架构:Kafka集群通常由多个Broker组成,每个Broker存储部分数据副本。这样,即使某个Broker出现故障,其他Broker也可以继续处理和存储消息,从而保证整体的高可用性。
数据冗余:Kafka通过数据冗余来保证高可用性。每个Topic的数据会被分成多个Partition,并在多个Broker上进行复制。即使某个Broker出现故障,数据仍然可以从其他Broker中获取。
副本机制:副本是Kafka实现高可用性的重要手段。Kafka中的每个Partition都有多个副本,这些副本分布在不同的Broker上,从而在部分Broker故障时,仍然有足够的副本可用以保证高可用性。
分区领导者选举:在Kafka中,每个Partition都有一个领导者(Leader)和零个或多个追随者(Follower)。当领导者不可用时,追随者会进行领导者选举,以保证系统的可用性。
消费者组实现负载均衡:Kafka的消费者可以组成消费者组,通过消费者组,可以将负载均匀地分配到多个消费者上,从而避免单个消费者的性能瓶颈,提高整个Kafka集群的可用性。
故障检测和恢复: Kafka 会使用 Zookeeper 等组件协助监控和管理集群的状态。当检测到故障节点时,就会自动将不可用的节点从集群中排除。而等到故障节点恢复后,也会重新将节点加入到集群当中。 (Kafka 2.8 后面可以脱离了 zookeeper启动,用 kraft 协议了,但后面的版本仍然支持 zk,但是不推荐使用了。)
集群高可用性是 Kafka非常关键的设计之一。通过多项机制组合,使得 Kafka 可以成为处理关键业务数据的可信平台。
Kafka中的消费者偏移量是如何管理的?
在Kafka中,消费者偏移量是指消费者在处理消息过程中所处的位置。Kafka中的消费者偏移量由两部分组成:Topic和Partition。对于每个消费者组,Kafka都会为其维护在每个 Partition 上的偏移量,以便在处理消息时可以准确地跟踪进度。
消费者偏移量的管理可以通过以下方式进行:
手动提交偏移量:消费者可以通过调用commitSync或commitAsync方法手动提交偏移量到Kafka。手动提交偏移量的方式需要开发者在适当的时机调用提交方法,确保消费者处理完消息后再提交偏移量。这种方式对于灵活性和精确控制偏移量非常有用,但需要开发者自行考虑提交的时机和异常处理。
自动提交偏移量:消费者可以配置为在后台自动提交偏移量。这意味着消费者会定期自动将已经处理的消息的偏移量提交给Kafka,而不需要开发者手动处理。通过配置参数enable.auto.commit为true,以及设置auto.commit.interval.ms参数来控制自动提交的频率。自动提交偏移量简化了管理,但可能会导致消息的重复处理或丢失,因此需要根据具体业务场景谨慎配置。
总之,Kafka 消费者的偏移量管理是确保消息传递的可靠性和一致性的重要部分。它允许消费者灵活地管理消息的消费进度,以满足不同的应用需求。无论您选择自动还是手动管理偏移量,都需要确保偏移量的正确提交,以避免消息的重复消费。
Kafka 如何处理数据倾斜问题?有哪些优化手段可以均负载?
Kafka 处理数据倾斜问题主要是从均衡数据分区和优化生产者、消费者策略来进行的。有几种主要的优化手段
合理设计分区键
增加分区数量
调整分区副本因子
使用自定义分区器
动态调整策略
使用流控和限流机制
说一下 Kafka 为什么性能高?
Kafka 性能高的原因有很多,这里列举几个比较常见的点
顺序写:磁盘顺序写的性能是远远大于随机写的性能的。Kafka 将消息按顺序(不断追加写文件的方式)写入磁盘,从而提高写磁盘的性能。
页缓存:这个主要使用到了 Linux 系统底层的一个机制,即 page Cache,又称页缓存,当消息写入到page Cache之后立刻返回,然后等到系统的刷盘线程进行刷盘操作之后,页缓存将对应的内容一次性写入磁盘。之所以快是因为我们写入内存的读度是远远大于写入磁盘的速度的,然后我们写入内存之后,再将内存中的东西批量一次性写入磁盘,这个讨程就是聚集写的讨程,从一定程度上减少了 IO 的次数。两种机制结合起来,从一定程度上提高了 kafka 的刷盘性能。
Kaka支持批量接收和发送消息,并且支持消息在压缩之后进行接收和发送,这样从一定程度上就减少了网络传输的次数和负担,提高了Kafka 消息读写以及消息进行网络传输的效率。
零拷贝:这个应该是面试中问的最多的一个点,也是Kakfa 和 RocketMQ 高效的原因。Kafka的 Borker传递数据给消费者的过程使用了零拷贝技术,其底层主要就是使用了 sendfile 系统调用,减少了系统用户态与内核态的上下文切换以及数据拷贝,从而提高Kafka 数据拷贝的性
Kafka 采用分段与索引的策略来提高性能,即利用偏移量和时间索引文件实现快速消息查找,从而提高 Kafka 消息查找的效率,
在 Kafka 中,如何优化磁盘 I/O 性能?有哪些策略可以减少 I/O开销?
在 Kafka 中,优化磁盘 I/O 性能的策略主要包括
增加分区数和副本数:增大分区数能让写操作分散到多个磁盘上,从而减少单个磁盘的 I/O 压力,而增加副本数可以提供更多的读取通道。
使用高性能磁盘:选择高性能的 SSD 替换传统 HDD 磁盘,以提高 I/O 性能。
合理配置操作系统和 Kafka 参数:例如增加 linux的文件系统缓存、调节 Kafka的
num.io.threads和log.flush.interval.messages参数。使用磁盘的 RAID 配置:通过 RAID 0或 RAID 10 配置磁盘来提高读写速度。
优化 Kafka 的批量操作:调整
batch.size和linger.ms配置,让 Kafka 可以批量处理消息,从而提高磁盘 I/O 性能.启用 Kafka Tiered Storage:将较老的数据迁移到对象存储或其它较慢的存储介质上,保留热数据在高速磁盘上。
什么是“零拷贝”?有什么作用?
零拷贝是操作系统提供的一种优化 IO 操作的重要机制。通过零拷贝技术,操作系统可以极大的减少在一次 IO 操作中,数据从一个内存区域复制到另一个内存区域的次数,以及在此过程中对 CPU 的性能消耗。零拷贝技术可以极大的提高数据传输的效率,避免不必要的数据拷贝,从而降低系统负载。
零拷贝有两种实现方式,mmap文件映射和sendfile文件复制。
mmap机制主要依赖于内存区域映射技术,可以减少一次 IO 操作中,内核态与用户态之间的数据传输,从而减少因为上下文切换而带来的 CPU 性能开销。mmap机制通常适合于对大量小文件的 IO 操作,Kafka 大量的运用 mmap 机制加速 Partition 日志文件的读写过程。
sendfile主要依赖于 DMA 数据传输技术,采用一组单独的指令集来进行负责数据在内存不同区域之间的拷贝过程。这样就不再需要 CPU 来进行复制,从而减少 CPU 性能消耗,让 CPU 可以用于更重要的计算任务。sendfile通常适合于大文件的拷贝传输操作,Kafka 大量的运用 sendfile 机制,加速消息从 Partition 文件到网卡的传输过程。
总之,零拷贝是由操作系统提供的一种高效的文件读写技术,而 Kafka 则大量的运用了零拷贝技术,从而极大的提升了 Kafka 整体的工作性能。
Kafka 在高吞吐量场景下如何保持低延迟?有哪些性能调优的策略?
Kafka 在高吞吐量场景下保持低延迟的关键在于其高效的设计架构,再加上一系列的性能调优策略。以下是一些核心的策略和方法
优化分区数量和副本数:通过合理增加分区数和副本数,Kafka 可以更好地平衡负载,但是也要避免分区过多导致的管理开销。
配置生产者和消费者的参数:通过设置合理的参数,例如 acks=1、适当提高 batch.size和减少 linger.ms等,将大幅度提升性能
硬件资源的优化:配置高性能的磁盘(如 SSD)、增加内存、提高网络速度等,都会直接提升 Kafka 的性能。
调整Kafka服务的配置:如增大log.retention,增加
socket.send.buffer.bytes和socket.receive.buffer.bytes等使用合适的压缩方式:选择合适的压缩方式(如lz4)能够有效提升吞吐量和节省带宽。
Kafka 与 Flink 的集成是如何实现的?如何优化 Flink 与 Kafka 之间的数据流动?
Kafka 与 Flink 的集成主要通过 Kafka Connectors 来实现。Flink 提供了出色的 Kafka Source 和 Kafka sink 用于从 Kafka 集群中读取数据,或向其写入数据,
要实现 Kafka 与 Flink 的集成,可以按照以下步骤进行
引入 Kafka 依赖:在 Flink 项目中添加 Kafka 依赖库。
配置 Kafka Source:创建 Kafka Source 以便从 Kafka 主题(topic)中读取数据。
配置 Kafka Sink:创建 Kafka Sink 以便将处理后的数据写入 Kafka 主题。
Flink Jobs 设计:根据业务需求设计 Flink 的数据处理作业,通常会包括数据清洗、过滤、变换等操作。
优化 Flink 与 Kafka 之间的数据流动,可以从以下几方面入手
参数调优:调整 Kafka 生产者与消费者的参数,如批量大小、缓冲区大小、并行度等
资源调度:合理分配 Flink 与 Kafka 各自的资源,例如 CPU、内存、网络等资源配置。
容错机制:利用 Flink 的 Checkpointing 与Kafka 的幂等性特性保证数据处理的可靠性和一致性。
数据压缩:使用 Kafka 的消息压缩策略以减少网络带宽消耗。
Kafka 的多租户支持是如何实现的?如何通过配额控制各租户的资源使用?
Kafka 实现多租户支持主要是通过“主题”(Topic)的隔离以及ACL(访问控制列表)来区分不同租户的数据和权限。同时,Kafa 通过配置配额来控制不同租户的资源使用。这些配额主要包括消息的生产速率和消费速率、磁盘占用等。
多租户支持:
Kafka 使用不同的主题(Topic)来隔离不同租户的数据。每个租户可以有一个或多个独立的主题。
使用 ACL(访问控制列表)配置不同的租户对各自主题的读写权限。
配额控制
配额配置主要包括生产速率配额和消费速率配额,分别控制每个租户每秒可生产和消费的消息数量
配额还可以限制租户使用的磁盘空间,防止单个租户占用过多存储资源。
Kafka 提供了动态配置功能,允许管理员在运行时为特定的用户或客户端组设置和调整配额。
Kafka 的 Stream 和 Table 是如何相互转换的?它们在 Kafka Streams 中的应用场景是什么?
在Kafka Streams 中,Stream 和 Table 是两种核心的抽象。Stream是一个无界、连续的记录流,每条记录通常包会一个键值对,并且是按时间顺序组织的。而Table是一个有状态的记录集合,它表示某一个时间点上的数据视图。
Stream 和 Table 可以通过特定的操作相互转换
Stream 转换为 Table:通过操作 groupByKey 和 aggregate 或 reduce 。这些操作会对记录按键进行分组,并随着时间的推移不断更新对应的键的值
Table 转换为 Stream通过 toStream 方法,可以把 Table 视为一个更新日志,将每次对 Table 的变更转化为一个 Stream。
它们的应用场景各自为:
Stream:适用于实时分析、监控、事件检测等场景。例如,实时处理和分析网站点击流、交易记录等
Table:适用于需要保持某种状态的场景,例如,用户的最新配置、商品的库存数量等。