Create a database named school in MySQL, and create a table named student in that database. The SQL statements are as follows:

create database school;
use school;
create table student( id int not null, name varchar(40), age int, grade int, primary key(id) );

Write a configuration file that outputs the contents of the student table to the console. Start Flume, insert data into the student table with the statements below, and check the corresponding data in a Kafka consumer.

insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
Posted: 2023-08-12 22:03:18
The configuration file is as follows (assuming Flume and MySQL run on the same machine):
```
# Name the components on this agent
flume1.sources = mysql-source
flume1.sinks = kafka-sink
flume1.channels = memory-channel
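# Describe/configure the source
# Note: Apache Flume ships no built-in JDBC source. This assumes that the
# third-party flume-ng-sql-source plugin (org.keedio.flume.source.SQLSource)
# and the MySQL JDBC driver JAR are on Flume's classpath. Property names
# follow that plugin and may differ between plugin versions.
flume1.sources.mysql-source.type = org.keedio.flume.source.SQLSource
# Connector/J 5.x driver class; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
flume1.sources.mysql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver
flume1.sources.mysql-source.hibernate.connection.url = jdbc:mysql://localhost:3306/school
flume1.sources.mysql-source.hibernate.connection.user = root
flume1.sources.mysql-source.hibernate.connection.password = password
flume1.sources.mysql-source.hibernate.connection.autocommit = true
flume1.sources.mysql-source.table = student
flume1.sources.mysql-source.columns.to.select = id, name, age, grade
flume1.sources.mysql-source.incremental.column.name = id
# Poll the table every 10 seconds (value in milliseconds)
flume1.sources.mysql-source.run.query.delay = 10000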
# Describe the sink
# (Flume 1.7+ property names; Flume 1.6 used brokerList and topic instead)
flume1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
flume1.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
flume1.sinks.kafka-sink.kafka.topic = student
# Use a channel which buffers events in memory
flume1.channels.memory-channel.type = memory
flume1.channels.memory-channel.capacity = 1000
flume1.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
flume1.sources.mysql-source.channels = memory-channel
flume1.sinks.kafka-sink.channel = memory-channel
```
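The answer does not show how the agent is started, so here is a minimal sketch of one way to do it. The install location `/opt/flume` and the file name `mysql-kafka.conf` are assumptions made for illustration; substitute your own paths. The value passed to `--name` has to match the `flume1` prefix used in the configuration, otherwise Flume starts without loading any of these components.

```shell
# Assumed locations (hypothetical): adjust to your installation.
FLUME_HOME="${FLUME_HOME:-/opt/flume}"
CONF_FILE="${CONF_FILE:-$FLUME_HOME/conf/mysql-kafka.conf}"
# Must match the agent prefix used in the configuration file.
AGENT_NAME="flume1"

start_agent() {
  # --conf: directory holding flume-env.sh and the log4j settings
  # --conf-file: the agent configuration shown above
  # -Dflume.root.logger: also print Flume's own log to the console
  "$FLUME_HOME/bin/flume-ng" agent \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$CONF_FILE" \
    --name "$AGENT_NAME" \
    -Dflume.root.logger=INFO,console
}

if [ -x "$FLUME_HOME/bin/flume-ng" ]; then
  start_agent
else
  echo "flume-ng not found under $FLUME_HOME; set FLUME_HOME and retry" >&2
fi
```

Logging to the console makes it easier to spot a missing source class or JDBC driver at startup, before looking for data on the Kafka side.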
After starting Flume, start a Kafka consumer with the following command:
```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic student --from-beginning
```
The rows read from the MySQL student table then appear in the consumer's console.