 Apache RocketMQ 从入门到实战0,但消息积压数却在降低是个什么“鬼” 二、问题分析 1. rocketmq-console 数据获获取逻辑探讨 要解开消费 TPS 显示为0的问题,我们首先要来看一下 rocketmq-console 这个 页面的展示逻辑,即通过阅读 rocketmq-console 的源码来解开其采集逻辑。 得知,【消费者】界面查询各个消费组的基本信息的接口为 /consumer/groupList. query,那 获取该消费组针对该队列的消费 TPS。  代码@10:累积消费 TPS,并最终作为该消费组的总 TPS。 上面这个方法非常关键,是返回给前段页面核心的数据组装逻辑,以队列、消费组为纬 度给出 brokerOffset、consumeOffset、lastTimestamp。然后将数据返回给前段页面 进行展示。 接下将聚焦到消费组消费 TPS 的统计处理,其入口为 tpsGroupGetNums 。 2. rocketmq rocketmq 监控指标的存储数据结构,如下图 所示: 正如上图所示:RocketMQ 使用 HashMap Apache RocketMQ 从入门到实战0,但消息积压数却在降低是个什么“鬼” 二、问题分析 1. rocketmq-console 数据获获取逻辑探讨 要解开消费 TPS 显示为0的问题,我们首先要来看一下 rocketmq-console 这个 页面的展示逻辑,即通过阅读 rocketmq-console 的源码来解开其采集逻辑。 得知,【消费者】界面查询各个消费组的基本信息的接口为 /consumer/groupList. query,那 获取该消费组针对该队列的消费 TPS。  代码@10:累积消费 TPS,并最终作为该消费组的总 TPS。 上面这个方法非常关键,是返回给前段页面核心的数据组装逻辑,以队列、消费组为纬 度给出 brokerOffset、consumeOffset、lastTimestamp。然后将数据返回给前段页面 进行展示。 接下将聚焦到消费组消费 TPS 的统计处理,其入口为 tpsGroupGetNums 。 2. rocketmq rocketmq 监控指标的存储数据结构,如下图 所示: 正如上图所示:RocketMQ 使用 HashMap- 来存储监 控收集的数据,其中 Key 为监控指标的类型,例如 topic 发送消息数量、topic 发送消息 大小、消费组获取消息个数等信息,每一项使用 StatsItemSet 存储,该存储结构内部又 维护一个 HashMap:ConcurrentMap,key 0 码力 | 165 页 | 12.53 MB | 1 年前3
 消息中间件RocketMQ原理解析 - 斩秋随机选择一台 producer 查询消息,根据 commitLogOffset 和 msgSize 到 commitlog 查找消息 向 Producder 发起请求,请求 code 类型为 CHECK_TRANSACTION_STATE,producer 的 DefaultMQProducerImpl. checkTransactionState()方法来处理 broker 定时回调的请求, Set 消息中间件RocketMQ原理解析 - 斩秋随机选择一台 producer 查询消息,根据 commitLogOffset 和 msgSize 到 commitlog 查找消息 向 Producder 发起请求,请求 code 类型为 CHECK_TRANSACTION_STATE,producer 的 DefaultMQProducerImpl. checkTransactionState()方法来处理 broker 定时回调的请求, Set- mqSet 1) 平均分配算法,其实是类似于分页的算法 将所有 queue 排好序类似于记录 将所有消费端 consumer 排好序,相当于页数 然后获取当前 consumer 所在页面应该分配到的 queue 2) 按照配置来分配队列, 也就是说在 consumer 启动的时候指定了 queue 3) 按照机房来配置队列 Consumer 启动的时候会指定在哪些机房的消息 30W 条 = 300000 * CQStoreUnitSize(每条大小) filename: filename 文件名称但不仅仅是名称还表示文件记录的初始偏移量, 文件名其 实是个 long 类型的值 4) MapedFileQueue 存储队列,数据定时删除,无限增长。 队列有多个文件(MapedFile)组成,由集合对象 List 表示升序排列,前面讲到文件名即 是消息在此文 0 码力 | 57 页 | 2.39 MB | 1 年前3
 快速部署高可用的Apache RocketMQ 集群 - Amazon S3CentOS AMI。 13 Bastion Instance Type BastionInstanceType t2.micro 堡垒机实例的 EC2 实 例类型。 14 Number of Bastion Hosts NumBastionHosts 1 堡垒机数量。Auto Scaling 确 保您始终保持此数量的运⾏堡垒 机数量。 Apache RocketMQ Broker 节点的数量。 Page 11 of 21 17 IOPS Iops 100 如果您选择的是 io1 卷类型,此 设置为 EBS 卷的 IOPS,否则此 选项将被忽略。 18 RocketMQ Version RocketMQVersion 4.7.1 选择部署的 Apache NameServerInstanceTy pe m5.large Nameserver 节点的 EC2 实例 类型 20 Broker Node Instance Type BrokerNodeInstanceTy pe m5.xlarge Broker 节点 EC2 实例类型 21 Apache RocketMQ flush Disk Type FlushDiskType0 码力 | 21 页 | 2.57 MB | 1 年前3 快速部署高可用的Apache RocketMQ 集群 - Amazon S3CentOS AMI。 13 Bastion Instance Type BastionInstanceType t2.micro 堡垒机实例的 EC2 实 例类型。 14 Number of Bastion Hosts NumBastionHosts 1 堡垒机数量。Auto Scaling 确 保您始终保持此数量的运⾏堡垒 机数量。 Apache RocketMQ Broker 节点的数量。 Page 11 of 21 17 IOPS Iops 100 如果您选择的是 io1 卷类型,此 设置为 EBS 卷的 IOPS,否则此 选项将被忽略。 18 RocketMQ Version RocketMQVersion 4.7.1 选择部署的 Apache NameServerInstanceTy pe m5.large Nameserver 节点的 EC2 实例 类型 20 Broker Node Instance Type BrokerNodeInstanceTy pe m5.xlarge Broker 节点 EC2 实例类型 21 Apache RocketMQ flush Disk Type FlushDiskType0 码力 | 21 页 | 2.57 MB | 1 年前3
 Apache RocketMQ on Amazon Web Services中的 CentOS AMI。 13 Bastion Instance Type BastionInstanceType t2.micro 堡垒机实例的 EC2 实 例类型。 14 Number of Bastion Hosts NumBastionHosts 1 堡垒机数量。Auto Scaling 确保 您始终保持此数量的运⾏堡垒机 数量。 BrokerClusterCount 3 选择部署 Apache RocketMQ Broker 节点的数量。 17 IOPS Iops 100 如果您选择的是 io1 卷类型,此 设置为 EBS 卷的 IOPS,否则此选 项将被忽略。 18 RocketMQ Version RocketMQVersion 4.7.1 选择部署的 Apache 节点的 EC2 实例类 型 20 Broker Node Instance Type BrokerNodeInstanceType m5.xlarge Broker 节点 EC2 实例类型 21 Apache RocketMQ flush Disk Type FlushDiskType ASYNC_FLUSH Apache RocketMQ Flush Disk0 码力 | 18 页 | 1.55 MB | 1 年前3 Apache RocketMQ on Amazon Web Services中的 CentOS AMI。 13 Bastion Instance Type BastionInstanceType t2.micro 堡垒机实例的 EC2 实 例类型。 14 Number of Bastion Hosts NumBastionHosts 1 堡垒机数量。Auto Scaling 确保 您始终保持此数量的运⾏堡垒机 数量。 BrokerClusterCount 3 选择部署 Apache RocketMQ Broker 节点的数量。 17 IOPS Iops 100 如果您选择的是 io1 卷类型,此 设置为 EBS 卷的 IOPS,否则此选 项将被忽略。 18 RocketMQ Version RocketMQVersion 4.7.1 选择部署的 Apache 节点的 EC2 实例类 型 20 Broker Node Instance Type BrokerNodeInstanceType m5.xlarge Broker 节点 EC2 实例类型 21 Apache RocketMQ flush Disk Type FlushDiskType ASYNC_FLUSH Apache RocketMQ Flush Disk0 码力 | 18 页 | 1.55 MB | 1 年前3
 RocketMQ v3.2.4 开发指南RocketMQ 中,所有消息队列都是持丽化,长度无限的数据结构,所谓长度无限是挃队列中的每个存储 单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内丌会溢出,所以讣为是长度无限,另外队列中只保存最近几天的数据,乀前的数据会挄照过期时间来 删除。 也可以讣为 Message Queue 是一个长度无限的数组,offset Consumer 的要求做过滤,优点是减少了对亍 Consumer 无用消息的网络传输。 缺点是增加了 Broker 的负担,实现相对复杂。 (1). 淘宝 Notify 支持多种过滤方式,包含直接挄照消息类型过滤,灵活的诧法表达式过滤,几乎可以满足 最苛刻的过滤需求。 (2). 淘宝 RocketMQ 支持挄照简单的 Message Tag 过滤,也支持挄照 Message Header、body "extFields": { "count": "0", "messageTitle": "HelloMessageTitle" } } Header 字段名 类型 Request Response code 整数 请求操作代码,请求接收方 根据丌同的代码做丌同的操 作 应答结果代码,0 表示成 功,非 0 表示各种错诨 代码 language0 码力 | 52 页 | 1.61 MB | 1 年前3 RocketMQ v3.2.4 开发指南RocketMQ 中,所有消息队列都是持丽化,长度无限的数据结构,所谓长度无限是挃队列中的每个存储 单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内丌会溢出,所以讣为是长度无限,另外队列中只保存最近几天的数据,乀前的数据会挄照过期时间来 删除。 也可以讣为 Message Queue 是一个长度无限的数组,offset Consumer 的要求做过滤,优点是减少了对亍 Consumer 无用消息的网络传输。 缺点是增加了 Broker 的负担,实现相对复杂。 (1). 淘宝 Notify 支持多种过滤方式,包含直接挄照消息类型过滤,灵活的诧法表达式过滤,几乎可以满足 最苛刻的过滤需求。 (2). 淘宝 RocketMQ 支持挄照简单的 Message Tag 过滤,也支持挄照 Message Header、body "extFields": { "count": "0", "messageTitle": "HelloMessageTitle" } } Header 字段名 类型 Request Response code 整数 请求操作代码,请求接收方 根据丌同的代码做丌同的操 作 应答结果代码,0 表示成 功,非 0 表示各种错诨 代码 language0 码力 | 52 页 | 1.61 MB | 1 年前3
 基于Apache APISIX 与RocketMQ 构建云原生一体化架构ACK 机制 普通消息 顺序消息 延迟消息 事务消息 重试消息 死信消息 设计思想: 1.消息不丢、高可靠是架构的基础 2.时延优先,兼顾吞吐 3.收敛业务共性问题,提供丰富的业务消息类型 4.注重可运维性、弹性扩缩、流量调拨能力建设 RocketMQ 简介 2022 2007 2012 2016 2017 2018 N otify Born from multicolored 分区迁移时,会导致网络风暴,耗时极长 问题重点 • 在主节点宕机时,备节点要有自动切换为主的能力 • 容量调整时,不能产生数据迁移,且要在秒级完成 固定分区使用场景 • 任务计算过程中,会将同一个业务类型的数据发到同一个队列 • Binlog 等数据同步过程中,需要保证严格顺序 RAFT 存储支持:自动主从切换,强一致性保证 逻辑队列:秒级无损弹性扩缩,无数据复制,流量精准调度 消息与流融合索引支持0 码力 | 22 页 | 2.26 MB | 1 年前3 基于Apache APISIX 与RocketMQ 构建云原生一体化架构ACK 机制 普通消息 顺序消息 延迟消息 事务消息 重试消息 死信消息 设计思想: 1.消息不丢、高可靠是架构的基础 2.时延优先,兼顾吞吐 3.收敛业务共性问题,提供丰富的业务消息类型 4.注重可运维性、弹性扩缩、流量调拨能力建设 RocketMQ 简介 2022 2007 2012 2016 2017 2018 N otify Born from multicolored 分区迁移时,会导致网络风暴,耗时极长 问题重点 • 在主节点宕机时,备节点要有自动切换为主的能力 • 容量调整时,不能产生数据迁移,且要在秒级完成 固定分区使用场景 • 任务计算过程中,会将同一个业务类型的数据发到同一个队列 • Binlog 等数据同步过程中,需要保证严格顺序 RAFT 存储支持:自动主从切换,强一致性保证 逻辑队列:秒级无损弹性扩缩,无数据复制,流量精准调度 消息与流融合索引支持0 码力 | 22 页 | 2.26 MB | 1 年前3
 Apache RocketMQ 介绍Apache RocketMQ 介绍 概要 Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性,万亿级容 和灵活的可伸缩性。它的一个重要特性是支持非日志类型的可靠消息传送,非常适合运用在金融和电 商务领域。目前他是Apache社区的顶级项目,在全球有超过100家公司在其业务中使用RocketMQ 开源版本。 诞生 RocketMQ起源于阿里巴巴。阿0 码力 | 5 页 | 375.48 KB | 1 年前3 Apache RocketMQ 介绍Apache RocketMQ 介绍 概要 Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性,万亿级容 和灵活的可伸缩性。它的一个重要特性是支持非日志类型的可靠消息传送,非常适合运用在金融和电 商务领域。目前他是Apache社区的顶级项目,在全球有超过100家公司在其业务中使用RocketMQ 开源版本。 诞生 RocketMQ起源于阿里巴巴。阿0 码力 | 5 页 | 375.48 KB | 1 年前3
共 7 条
- 1













