Python 消费 Kafka 数据教程
1. 安装 Python 模块
pip install --user kafka-python==1.4.3
如果报压缩（snappy / lz4）相关的错误，尝试安装下面的依赖：
yum install snappy-devel
yum install lz4-devel
pip install python-snappy
pip install lz4
2. 生产者
#!/usr/bin/env python
# coding: utf-8
"""Minimal Kafka producer example: publish one JSON-encoded message."""
import json

from kafka import KafkaProducer


def kafkaProducer(bootstrap_servers='ip:9092', topic='world', value=None,
                  timeout=10.0):
    """Send a single JSON-serialised message and wait for the broker to ack it.

    Args:
        bootstrap_servers: Broker address(es) handed to KafkaProducer.
            'ip:9092' is a placeholder -- replace 'ip' with a real broker host.
        topic: Destination topic name.
        value: JSON-serialisable payload; None means the demo payload
            {'key1': 'value1'}.
        timeout: Seconds to wait for the send to be acknowledged.

    Returns:
        The record metadata resolved from the send future.

    Raises:
        Whatever kafka-python raises from the send future when delivery fails
        or times out (previously such failures were silently dropped).
    """
    if value is None:
        value = {'key1': 'value1'}
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        # send() only enqueues the record and returns a future; block on it so
        # delivery errors surface here instead of vanishing at script exit.
        return producer.send(topic, value).get(timeout=timeout)
    finally:
        # close() flushes anything still buffered and releases connections.
        producer.close()


if __name__ == '__main__':
    kafkaProducer()
2. 消费者
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
import click
import ConfigParser
import json
import threading
import datetime
import sched
# Settings shared by the commands below, loaded once at import time from
# amon.ini (a relative path, so it is resolved against the current working
# directory). ConfigParser.read() silently skips files it cannot open, so a
# missing amon.ini only shows up later as a missing-section error on lookup.
config = ConfigParser.ConfigParser()
config.read(["amon.ini"])
@click.group()
def cli():
    # Root command group: it performs no work itself and only dispatches to
    # the subcommands registered on it via @cli.command().
    return None
@cli.command()
@click.option('--topic', type=str, required=True,
              help='Topic to consume from.')
@click.option('--offset',
              type=click.Choice(['smallest', 'earliest', 'largest', 'latest']),
              default='latest', show_default=True,
              help='Where to start when there is no committed offset '
                   '(smallest/largest are legacy aliases of earliest/latest).')
@click.option("--group", type=str,
              help='Consumer group id (optional).')
@click.option('--verbose', is_flag=True, default=False,
              help='Also print partition, offset and key of each message.')
def client(topic, offset, group, verbose=False):
    """Consume TOPIC from the brokers configured in amon.ini and echo each message."""
    click.echo(topic)
    raw_servers = config.get("KAFKA", "Broker_Servers")
    # Tolerate spaces around entries and a trailing comma in the broker list.
    servers = [s.strip() for s in raw_servers.split(",") if s.strip()]
    if not servers:
        raise click.ClickException(
            "no brokers configured in [KAFKA] Broker_Servers of amon.ini")
    # Without an explicit reset policy kafka-python would fall back to "no
    # reset" and fail whenever no committed offset exists, hence the default.
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=servers,
                             group_id=group,
                             auto_offset_reset=offset)
    try:
        # Iterating a KafkaConsumer blocks and yields records as they arrive.
        for message in consumer:
            if verbose:
                click.echo("%d:%d: key=%s value=%s" % (message.partition,
                                                       message.offset,
                                                       message.key,
                                                       message.value))
            else:
                click.echo(message.value)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this endless loop; exit quietly.
        pass
    finally:
        # Always release the broker connections (and group membership, if any).
        consumer.close()
if __name__ == "__main__":
    # Script entry point: let the click group parse argv and run a subcommand.
    cli()
3. 多线程消费