Kafka中的消息过期与清理策略
发布时间: 2024-05-03 06:34:06 阅读量: 352 订阅数: 101 ![](https://csdnimg.cn/release/wenkucmsfe/public/img/col_vip.0fdee7e1.png)
![](https://csdnimg.cn/release/wenkucmsfe/public/img/col_vip.0fdee7e1.png)
![ZIP](https://csdnimg.cn/release/download/static_files/pc/images/minetype/ZIP.png)
KAFKA分布式消息系统(window)
![Kafka中的消息过期与清理策略](https://img-blog.csdnimg.cn/20200123094122342.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xJTkJFX2JsYXplcnM=,size_16,color_FFFFFF,t_70)
# 1. Kafka消息过期概述
Kafka消息过期机制是一种管理和删除不再需要的消息的方法。它允许用户指定消息的保留时间,超出该时间后,消息将自动从Kafka集群中删除。消息过期对于管理Kafka集群的存储空间和性能至关重要,因为它可以防止过时和不必要的消息累积。
# 2. 消息过期策略
消息过期策略决定了消息在 Kafka 中保留的时间。过期策略主要分为两类:基于时间的过期策略和基于日志大小的过期策略。
### 2.1 基于时间的过期策略
基于时间的过期策略根据消息的创建或日志时间来确定消息是否过期。
#### 2.1.1 消息创建时间的过期
`log.message.timestamp.type` 参数控制消息的时间戳类型,可以是 `CreateTime`(创建时)或 `LogAppendTime`(日志追加时)。默认情况下,消息的时间戳为创建时。
```
# 设置消息时间戳类型为创建时
log.message.timestamp.type=CreateTime
```
#### 2.1.2 消息日志时间的过期
`log.retention.ms` 参数控制消息保留的时间,单位为毫秒。消息的日志时间为消息追加到日志中的时间。
```
# 设置消息保留时间为 24 小时
log.retention.ms=86400000
```
### 2.2 基于日志大小的过期策略
基于日志大小的过期策略根据日志段或分区的大小来确定消息是否过期。
#### 2.2.1 日志段大小的过期
`log.segment.bytes` 参数控制每个日志段的最大大小,单位为字节。当一个日志段达到最大大小时,它将被删除。
```
# 设置每个日志段的最大大小为 100MB
log.segment.bytes=1073741824
```
#### 2.2.2 分区大小的过期
`log.retention.bytes` 参数控制每个分区保留的最大数据量,单位为字节。当一个分区达到最大数据量时,它将被删除。
```
# 设置每个分区保留的最大数据量为 1GB
log.retention.bytes=1073741824
```
**表格:消息过期策略参数**
| 参数 | 描述 | 默认值 |
|---|---|---|
| `log.message.timestamp.type` | 消息时间戳类型 | `CreateTime` |
| `log.retention.ms` | 基于时间的过期时间 | 无 |
| `log.segment.bytes` | 日志段最大大小 | 1GB |
| `log.retention.bytes` | 分区最大数据量 | 无 |
**Mermaid 流程图:消息过期策略**
```mermaid
graph LR
subgraph 基于时间的过期策略
log.message.timestamp.type --> 消息时间戳类型
log.retention.ms --> 基于时间的过期时间
end
subgraph 基于日志大小的过期策略
log.segment.bytes --> 日志段最大大小
log.retention.bytes --> 分区最大数据量
end
```
# 3. 消息清理策略
### 3.1 手动清理策略
手动清理策略需要由消费者或管理员主动触发,才能对过期的消息进行清理。
#### 3.1.1 消费者手动清理
消费者可以通过调用 `commitSync()` 方法来手动提交消费进度,从而触发消息的清理。当消费者提交消费进度时,Kafka 会将消费者已消费的消息标记为已提交,并删除这些消息。
```java
consumer.commitSync();
```
#### 3.1.2 管理员手动清理
管理员可以通过使用 `kafka-delete-records` 工具手动清理过期的消息。该工具可以根据时间或日志大小等
0
0
相关推荐
![zip](https://img-home.csdnimg.cn/images/20241231045053.png)
![tgz](https://img-home.csdnimg.cn/images/20250102104920.png)
![-](https://img-home.csdnimg.cn/images/20241226111658.png)
![-](https://img-home.csdnimg.cn/images/20241226111658.png)
![zip](https://img-home.csdnimg.cn/images/20241231045053.png)
![-](https://img-home.csdnimg.cn/images/20241226111658.png)
![-](https://img-home.csdnimg.cn/images/20241226111658.png)
![-](https://img-home.csdnimg.cn/images/20241226111658.png)