Kafka模拟电影的信息,用scala语言
时间: 2023-08-23 22:16:14 浏览: 130
kafka kafka与sparkStreaming kafka与Scala
好的,下面是使用 Scala 语言编写一个简单的 Kafka 模拟电影信息的示例程序:
```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object MovieKafkaProducer {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val topic = "movies"
val movies = List(
Movie("The Shawshank Redemption", "Frank Darabont", List("Tim Robbins", "Morgan Freeman"), List("Drama", "Crime"), 9.3),
Movie("The Godfather", "Francis Ford Coppola", List("Marlon Brando", "Al Pacino"), List("Crime", "Drama"), 9.2),
Movie("The Dark Knight", "Christopher Nolan", List("Christian Bale", "Heath Ledger"), List("Action", "Crime", "Drama"), 9.0)
)
movies.foreach(movie => {
val record = new ProducerRecord[String, String](topic, movie.name, movie.toJson())
producer.send(record)
println(s"Sent movie: $movie")
})
producer.close()
}
case class Movie(name: String, director: String, actors: List[String], genres: List[String], rating: Double) {
def toJson(): String = {
s"""{"name": "$name", "director": "$director", "actors": "${actors.mkString(",")}", "genres": "${genres.mkString(",")}", "rating": $rating}"""
}
}
}
```
这个程序使用 KafkaProducer 发送电影信息到 Kafka 的 `movies` 主题中。在发送电影信息时,将电影信息转换为 JSON 格式,并使用电影名称作为 Kafka 记录的 key。同时,也输出了发送的电影信息。
需要注意的是,Scala 提供了很多 JSON 序列化和反序列化的库,例如 spray-json、Argonaut、circe 等,可以使用这些库来简化 JSON 格式的转换。
阅读全文