没有合适的资源?快使用搜索试试~ 我知道了~
首页RocketMQ技术讲解V2.0
RocketMQ源码分析,分为存储篇、NameServer篇、Broker篇、Producer篇、Consumer篇五大部分进行源码级的讲解。大致如下: 1、讲解commitlog、consumequeue、index、transaction文件等数据结构、数据读写、HA高可用等功能; 2、讲解NameServer的启动、注册Broker、客户端查询Topic的路由信息等功能; 3、讲解Broker的启动、注册、处理Producer发送消息、处理Consumer拉取消息、事务消息的处理等功能; 4、讲解Producer端的启动、发送普通消息、定时消息、顺序消息、事务消息等功能; 5、讲解Consumer端的启动、PUSH模式的消息消费、PULL模式的消息消费、顺序消费/并发消费等功能;
资源详情
资源评论
资源推荐
RocketMQ 技术讲解
目录
ROCKETMQ 技术讲解 ......................................................................................................................... 1
1 存储篇.............................................................................................................................................. 6
1.1 整体结构 ................................................. 6
1.2 大文件的磁盘操作——MapedFile ............................ 6
1.2.1 向文件顺序写操作(appendMessage) ................. 7
1.2.2 消息刷盘操作(commit) ............................ 7
1.2.3 随机读操作(selectMapedBuffer) ................... 7
1.2.4 清理内存操作 ...................................... 8
1.2.5 判断文件是否写满 .................................. 8
1.3 MapedFileQueue ........................................... 8
1.3.1 获取在某时间点之后更新的文件(getMapedFileByTime) 8
1.3.2 清理指定偏移量所在文件之后的文件(truncateDirtyFiles)
9
1.3.3 获取或创建最后一个文件(getLastMapedFile) ........ 9
1.3.4 获取列表中的最后一个文件(getLastMapedFile2) ..... 9
1.3.5 统计内存的数据还有多少未持久化(howMuchFallBehind)
9
1.3.6 获取 MapedFile 队列中最小 Offset 值(getMinOffset) 10
1.3.7 获取 MapedFile 队列中最大 Offset 值(getMaxOffset) 10
1.3.8 删除某类文件中的最后一个文件(deleteLastMaped) .. 10
1.3.9 根据指定的 offset 找到所在文件(findMapedFileByOffset)
10
1.3.10 MapedFile 队列中的消息刷盘(commit).............. 10
1.4 Commitlog ............................................... 10
1.4.1 文件的消息单元存储结构 ........................... 11
1.4.2 CommitLog 类结构 ..................................... 12
1.4.3 获取最小 Offset(getMinOffset)................... 12
1.4.4 获取最大物理偏移量(getMaxOffset) ............... 13
1.4.5 读取指定起始位置 offset 所在文件的全部剩余消息
(getData)................................................ 13
1.4.6 正常恢复 CommitLog 内存数据(recoverNormally) .... 13
1.4.7 异常恢复 CommitLog 内存数据(recoverAbnormally) .. 13
1.4.8 写入消息(putMessage) ........................... 15
1.4.9 读取消息(getMessage) ........................... 16
1.4.10 指定位置开始写入二进制消息(appendData) ......... 17
1.4.11 获取指定位置所在文件的下一个文件的起始偏移量
(rollNextFile)........................................... 17
1.4.12 DefaultAppendMessageCallback 类的实现............. 17
1.5 Consumequeue ............................................ 18
1.5.1 ConsumeQueue 类结构 .................................. 19
1.5.2 删除指定偏移量之后的逻辑文件
(truncateDirtyLogicFiles)................................ 19
1.5.3 恢复 ConsumeQueue 内存数据(recover) ............. 20
1.5.4 查找消息发送时间最接近 timestamp 逻辑队列的 offset
(getOffsetInQueueByTime)................................. 20
1.5.5 获取最后一条消息对应物理队列的下一个偏移量
(getLastOffset).......................................... 21
1.5.6 消息刷盘(commit) ............................... 21
1.5.7 将 commitlog 物理偏移量/消息大小等信息直接写入
consumequeue 中(putMessagePostionInfoWrapper) ............ 21
1.5.8 根据消息序号索引获取 consumequeue 数据(getIndexBuffer)
21
1.5.9 根据物理队列最小 offset 计算修正逻辑队列最小 offset
(correctMinOffset)....................................... 21
1.5.10 获取指定位置所在文件的下一个文件的起始偏移量
(rollNextFile)........................................... 22
1.6 IndexFile ............................................... 22
1.6.1 Index 文件的数据结构 ................................. 23
1.6.2 向 index 文件中写入索引消息(putKey) ............. 24
1.6.3 以 topic-key 值从 Index 中获取在一个时间区间内的物理偏
移量列表(selectPhyOffset)................................ 24
1.7 IndexService ............................................ 25
1.7.1 创建消息的索引(buildIndex) ..................... 25
1.7.2 查找 topic 和 key 的物理偏移量 offset(queryOffest) 25
1.8 Config .................................................. 26
1.8.1 ScheduleMessageService 执行延迟消息 .................. 26
1.8.2 收到发送消息时创建 topic 的配置信息
(createTopicInSendMessageMethod)......................... 28
1.8.3 根据 GroupName 查找订阅组信息
(findSubscriptionGroupConfig)............................ 28
1.8.4 收到消费失败时的回传消息时创建 topic 的配置信息
(createTopicInSendMessageBackMethod)..................... 28
1.9 DefaultMessageStore—所有文件的访问入口 ................. 29
1.9.1 根据 topic 和 queueId 查找 ConsumeQueue
(findConsumeQueue)....................................... 29
1.9.2 根据物理偏移量和数据大小获取消息内容
(lookMessageByOffset).................................... 29
1.9.3 将消息写入 commitlog 中(putMessage) ............. 29
1.9.4 读取 commitlog 消息(getMessage) ................. 30
1.9.5 获取最大物理偏移量(getMaxPhyOffset) ............ 32
1.9.6 获取指定偏移量之后的所有 commitlog 数据
(getCommitLogData)....................................... 32
1.9.7 从指定位置开始追加 commitlog 数据(appendToCommitLog)
32
1.9.8 DefaultMessageStore.ReputMessageService 服务线程 ..... 32
1.9.9 DefaultMessageStore.DispatchMessageService 服务线程 .. 33
1.9.10 加载 ConsumeQueue 队列数据(loadConsumeQueue) .... 34
1.9.11 获取指定队列的最大逻辑 Offset(getMaxOffsetInQuque)
34
1.9.12 获取指定队列的最小逻辑 Offset(getMinOffsetInQuque)
34
1.10 Abort ................................................... 35
1.11 Checkpoint .............................................. 35
1.12 HA 高可用................................................ 35
1.12.1 Topic 配置同步.................................... 36
1.12.2 消费进度信息同步 ................................. 36
1.12.3 延迟消费进度信息同步 ............................. 36
1.12.4 订阅关系同步 ..................................... 37
1.12.5 消息数据同步 ..................................... 37
1.13 事务消息相关的文件 ...................................... 41
1.13.1 事务消息状态文件 ................................. 42
1.13.2 事务消息 REDO 日志文件 ............................ 42
1.13.3 定期向 Producer 回查事务消息的最新状态 ............ 42
2 NAME SERVER 篇 ..................................................................................................................... 43
2.1 NameServer 的功能........................................ 43
2.2 NameServer 的初始化及启动过程............................ 43
2.3 处理 Broker 注册请求 ..................................... 45
2.4 根据 Topic 获取 Broker 信息和 topic 配置信息
(getRouteInfoByTopic)........................................ 46
3 BROKER 篇 ................................................................................................................................. 47
3.1 Broker 的初始化过程...................................... 47
3.2 Broker 的启动过程........................................ 50
3.3 向 NameServer 注册 Broker................................. 52
3.4 清理未使用的 topic 数据 .................................. 52
3.5 根据 topic 和 group 查找 Consumer 订阅信息(findSubscriptionData)
53
3.6 处理 Producer 发来的消息 ................................. 53
3.7 处理与客户端的心跳消息 .................................. 54
3.8 注册 Consumer 信息 ....................................... 55
3.9 注册 Producer 信息 ....................................... 56
3.10 查询消费进度(QUERY_CONSUMER_OFFSET) ................... 56
3.11 更新消费进度(UPDATE_CONSUMER_OFFSET) .................. 57
3.12 处理 Consumer 拉取消息 ................................... 57
3.13 对未拉取到的消息进行重试(PullRequestHoldService) ...... 60
3.14 客户端顺序消费时锁住 MessageQueue 队列的请求(LOCK_BATCH_MQ)
61
3.15 客户端顺序消费时解锁 MessageQueue 队列的请求(UNLOCK_BATCH_MQ)
62
3.16 获取 consumerGroup 名下的所有 Consumer 的 ClientId
(GET_CONSUMER_LIST_BY_GROUP)................................. 62
3.17 将 Consumer 消费失败的消息写入延迟消息队列中
(CONSUMER_SEND_MSG_BACK)..................................... 62
3.18 处理结束事务消息的请求(END_TRANSACTION) ............... 64
3.19 客户端发起更新或创建 Topic(UPDATE_AND_CREATE_TOPIC).... 64
4 PRODUCER 篇 ............................................................................................................................ 65
4.1 启动 Producer............................................ 65
4.2 启动 MQClientInstance 类(MQClientInstance.start) ....... 66
4.3 从 NameServer 更新 Topic 的本地路由信息 ................... 68
4.4 向 Broker 发送心跳消息 ................................... 71
4.5 发送普通消息 ............................................ 71
4.6 发送定时消息 ............................................ 74
4.7 发送顺序消息 ............................................ 74
4.8 发送事务消息 ............................................ 75
4.9 处理 Broker 检查事务状态的消息(CHECK_TRANSACTION_STATE) 77
4.10 创建 Topic............................................... 78
5 CONSUMER 篇 .............................................................................................................................. 78
5.1 启动 Consumer............................................ 78
5.2 向 Broker 同步消费进度的定时任务 ......................... 83
5.3 负载均衡服务线程(RebalanceService) .................... 84
5.3.1 为 topic 下的所有 MessageQueue 创建拉取消息请求(广播
模式下) 85
5.3.2 为 Topic 所分配每个 MessageQueue 创建拉取消息请求(集
群模式下)................................................. 85
5.3.3 为每个消息队列维护一个消息处理队列
(updateProcessQueueTableInRebalance)..................... 87
5.3.4 删除未使用的消息队列的消费进度
(removeUnnecessaryMessageQueue).......................... 88
5.3.5 获取 MessageQueue 队列的下一个消费偏移量
(computePullFromWhere)................................... 89
5.4 拉取消息服务线程(PullMessageService) .................. 91
5.4.1 拉取消息的处理逻辑 ............................... 91
5.4.2 延迟拉取消息 ..................................... 91
5.4.3 立即拉取消息 ..................................... 92
5.5 PUSH 模式下的消息消费(DefaultMQPushConsumer)........... 92
5.5.1 拉取消息(pullMessage) .......................... 92
5.5.2 消费消息的回调类(PullCallback) ................. 93
5.6 PULL 模式下的消息消费(DefaultMQPullConsumer)........... 97
5.6.1 应用层的使用方式 ................................. 97
5.6.2 获取队列的消费进度(fetchConsumeOffset) ......... 98
5.7 顺序消费(ConsumeMessageOrderlyService) ................ 99
5.7.1 回调业务层定义的消费方法 ......................... 99
5.7.2 根据消费结果进行相应处理 ........................ 101
5.7.3 重新获取分布式锁后再消费(tryLockLaterAndReconsume)
103
5.8 并发消费(ConsumeMessageConcurrentlyService) .......... 104
5.8.1 回调业务层定义的消费方法 ........................ 104
5.8.2 对消费失败的信息发送重试消息给 Broker(用于消息重试)
106
5.9 拉取消息的底层 API 接口(PullAPIWrapper.pullKernelImpl) 107
5.10 发送远程请求拉取消息的逻辑(PULL_MESSAGE) ............. 108
5.10.1 同步方式拉取消息 ................................ 108
5.10.2 异步方式拉取消息 ................................ 110
5.10.3 处理 Broker 返回的响应消息 ....................... 111
5.11 PULL 消费模式下的调度消费服务(MQPullConsumerScheduleService)
112
5.11.1 应用层使用方式 .................................. 112
5.11.2 触发拉取消息 .................................... 112
5.11.3 拉取消息的线程(PullTaskImpl) .................. 113
5.12 四种 MessageQueue 队列的分配策略 ........................ 113
6 技术解决方案 .............................................................................................................................. 113
6.1 获取系统时间的性能优化 ................................. 113
剩余113页未读,继续阅读
meilong_whpu
- 粉丝: 94
- 资源: 8
上传资源 快速赚钱
- 我的内容管理 收起
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
会员权益专享
最新资源
- RTL8188FU-Linux-v5.7.4.2-36687.20200602.tar(20765).gz
- c++校园超市商品信息管理系统课程设计说明书(含源代码) (2).pdf
- 建筑供配电系统相关课件.pptx
- 企业管理规章制度及管理模式.doc
- vb打开摄像头.doc
- 云计算-可信计算中认证协议改进方案.pdf
- [详细完整版]单片机编程4.ppt
- c语言常用算法.pdf
- c++经典程序代码大全.pdf
- 单片机数字时钟资料.doc
- 11项目管理前沿1.0.pptx
- 基于ssm的“魅力”繁峙宣传网站的设计与实现论文.doc
- 智慧交通综合解决方案.pptx
- 建筑防潮设计-PowerPointPresentati.pptx
- SPC统计过程控制程序.pptx
- SPC统计方法基础知识.pptx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0