Scala Code Examples
Scala (short for "scalable language") is a multi-paradigm programming language designed from the outset to combine object-oriented and functional programming. Below is a simple Scala example that defines a class, creates an instance of it, and calls one of its methods.
```scala
// Define a simple Person class
class Person(val name: String, val age: Int) {
  // Instance method
  def greet(): Unit = {
    println(s"Hello, my name is $name and I am $age years old.")
  }
}

// Instantiate the class (script/REPL style; in a compiled program,
// these statements would live inside an object with a main method)
val person = new Person("Alice", 30)

// Call the method on the instance
person.greet()
```
This code defines a class named `Person` with two fields, `name` and `age`, and a `greet` method that prints a greeting. It then creates a `Person` instance named `person` and calls its `greet` method.
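To illustrate the functional side mentioned above, here is a minimal companion sketch (the `Person` case class and sample values are illustrative) that uses an immutable list and higher-order functions instead of explicit loops:
```scala
// A case class gives immutable fields, structural equality, and pattern matching for free
case class Person(name: String, age: Int)

val people = List(Person("Alice", 30), Person("Bob", 17))

// Higher-order functions transform the collection without mutation
val greetings = people
  .filter(_.age >= 18)            // keep adults only
  .map(p => s"Hello, ${p.name}!") // build one greeting per person

greetings.foreach(println) // prints: Hello, Alice!
```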
Related questions
Scala code: outputting the results of a Bayes model
Here is a simple Scala example showing how to train a naive Bayes classifier with Spark's MLlib and output its results:
```scala
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession

// Create the SparkSession
val spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate()

// Load the dataset (libsvm files already provide a "label" column and a vector "features" column)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)

// Train a multinomial naive Bayes classifier
val nb = new NaiveBayes().setModelType("multinomial")
val model = nb.fit(trainingData)

// Make predictions on the test set
val predictions = model.transform(testData)

// Evaluate the model
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")

// Show the prediction results
predictions.show()
```
Replace the dataset path in the code with your own data, then run it to obtain the trained model and its predictions.
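If you also want to keep the trained model for later use, Spark ML models can be persisted with their built-in writers; a minimal sketch, where the output directory `models/naive-bayes` is a placeholder path:
```scala
import org.apache.spark.ml.classification.NaiveBayesModel

// Save the fitted model to a directory (placeholder path)
model.write.overwrite().save("models/naive-bayes")

// Reload it later and reuse it for scoring
val reloaded = NaiveBayesModel.load("models/naive-bayes")
reloaded.transform(testData).show(5)
```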
Flink MySQL CDC in Scala
The following Scala example does not read the MySQL binlog itself; it consumes change records that a CDC tool (for example Debezium or Canal) has already published to a Kafka topic, aggregates them in event-time windows, and writes the results back to Kafka:
```scala
import java.util.Properties

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

object MySQLCDC {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka connection settings; the "mysql-cdc" topic is assumed to hold
    // CSV-encoded change records of the form "eventTime,key,value"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-group")
    val consumer = new FlinkKafkaConsumer011[String]("mysql-cdc", new SimpleStringSchema(), properties)

    // Positional keyBy below needs RowTypeInfo rather than a generic type for Row
    implicit val rowTypeInfo: TypeInformation[Row] =
      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING)

    // Parse each CSV record into a Row of (eventTime, key, value)
    val stream = env.addSource(consumer).map(new MapFunction[String, Row]() {
      override def map(value: String): Row = {
        val fields = value.split(",")
        Row.of(fields(0).toInt.asInstanceOf[Object], fields(1).asInstanceOf[Object], fields(2).asInstanceOf[Object])
      }
    }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Row]() {
      // The first field is assumed to carry the event timestamp
      override def extractTimestamp(row: Row, previousTimestamp: Long): Long =
        row.getField(0).asInstanceOf[Int].toLong

      override def checkAndGetNextWatermark(row: Row, extractedTimestamp: Long): Watermark =
        new Watermark(extractedTimestamp)
    })

    // Key by the second field and collapse each 10-second window to
    // (key, first value, last value), i.e. the value before and after the window
    val windowedStream = stream.keyBy(1).timeWindow(Time.seconds(10))
      .apply(new WindowFunction[Row, Row, Tuple, TimeWindow]() {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
          val sortedInput = input.toList.sortBy(_.getField(0).asInstanceOf[Int])
          val firstRow = sortedInput.head
          val lastRow = sortedInput.last
          out.collect(Row.of(firstRow.getField(1), firstRow.getField(2), lastRow.getField(2)))
        }
      })

    // Write the window results back to Kafka with exactly-once semantics
    val producer = new FlinkKafkaProducer011[String](
      "mysql-cdc-output",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    windowedStream.map(new MapFunction[Row, String]() {
      override def map(row: Row): String =
        s"${row.getField(0)},${row.getField(1)},${row.getField(2)}"
    }).addSink(producer)

    env.execute("MySQL CDC")
  }
}
```
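Strictly speaking, the example above relies on an external tool having already pushed the change records into Kafka. To read the MySQL binlog directly from Flink, the flink-connector-mysql-cdc connector can be used instead; below is a minimal sketch, assuming a newer Flink (1.12+) with the Ververica CDC dependency on the classpath, and treating the hostname, credentials, database, and table names as placeholders:
```scala
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._

object MySQLCDCDirect {
  def main(args: Array[String]): Unit = {
    // Read the MySQL binlog directly; all connection values are placeholders
    val source = MySqlSource.builder[String]()
      .hostname("localhost")
      .port(3306)
      .databaseList("mydb")      // databases to capture
      .tableList("mydb.orders")  // tables to capture, qualified by database
      .username("flink")
      .password("secret")
      .deserializer(new JsonDebeziumDeserializationSchema()) // emit changes as JSON strings
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "MySQL CDC Source")
      .print()

    env.execute("MySQL CDC Direct")
  }
}
```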