What are Scala's advanced data types? How do they differ (Array, Tuple, List, Set, Map)?
Scala provides a number of advanced data types, including:
- Array: a fixed-length array; all elements must have the same type, and individual elements can be updated in place.
- Tuple: a fixed-length collection whose elements may have different types; elements are accessed by position (`_1`, `_2`, ...).
- List: an immutable singly linked list; once created, its length and elements cannot change, and prepending with `::` produces a new list.
- Set: a collection of distinct elements (duplicates are removed); the default Set is immutable and has no guaranteed iteration order (use a sorted Set for ordered traversal), and a mutable variant also exists.
- Map: a collection of key-value pairs; keys and values may have different types, and the default Map is immutable, with a mutable variant available.
These are Scala's commonly used advanced data types; understanding their characteristics and usage helps you choose the right one in program development, as the short sketch below illustrates.
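A minimal sketch of the differences (the values and names below are illustrative only, not from any particular codebase):
```scala
object CollectionsDemo extends App {
  // Array: fixed length, single element type, elements mutable in place
  val arr = Array(1, 2, 3)
  arr(0) = 10

  // Tuple: fixed length, mixed element types, accessed by position
  val person: (String, Int) = ("Alice", 30)
  println(person._1) // "Alice"

  // List: immutable linked list; "adding" returns a new list
  val xs = List(1, 2, 3)
  val ys = 0 :: xs   // List(0, 1, 2, 3); xs is unchanged

  // Set: distinct elements, duplicates dropped, no ordering guarantee by default
  val s = Set(1, 2, 2, 3) // Set(1, 2, 3)

  // Map: key-value pairs; keys and values may have different types
  val m = Map("a" -> 1, "b" -> 2)
  println(m("a")) // 1
}
```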
Related questions
Scala data types
Data types in Scala include:
1. Basic types: Boolean, Byte, Char, Short, Int, Long, Float, Double
2. Collection types: Array, List, Set, Map, Tuple
3. Parameterized types: Option, Either, Function
4. Type abstractions: class, object, trait
5. Implicits: implicit classes, implicit parameters
6. Mutable collection types: ArrayBuffer, ListBuffer, StringBuilder
Scala also offers type inference, which lets you omit explicit type annotations on many variables and functions, and it supports defining type aliases with the `type` keyword; a short sketch follows below.
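For instance, a small sketch of type inference, a type alias, and Option (the identifiers are made up for the example):
```scala
object TypeFeaturesDemo extends App {
  // Type inference: the compiler infers Int and Map[String, Int]
  val count = 42
  val scores = Map("math" -> 90, "english" -> 85)

  // Type alias declared with the `type` keyword
  type UserId = Long

  // Option makes the "missing value" case explicit for callers
  def findUser(id: UserId): Option[String] =
    if (id == 1L) Some("Alice") else None

  println(findUser(1L).getOrElse("unknown")) // Alice
  println(findUser(2L).getOrElse("unknown")) // unknown
}
```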
Flink MySQL CDC in Scala
Below is a Scala example of implementing MySQL CDC with Flink. Note that the job does not read the MySQL binlog itself: it consumes change records that a CDC tool (for example, Debezium or Canal) has already published to a Kafka topic, aggregates them in event-time windows, and writes the results back to Kafka:
```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

import java.util.Properties

object MySQLCDC {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka settings shared by the CDC source topic and the output topic
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-group")
    // Keep the producer transaction timeout within the broker's limit; needed for EXACTLY_ONCE
    properties.setProperty("transaction.timeout.ms", "900000")

    // Consume change records (assumed "id,key,value" CSV lines) from the "mysql-cdc" topic
    val consumer = new FlinkKafkaConsumer011[String]("mysql-cdc", new SimpleStringSchema(), properties)

    val stream = env.addSource(consumer)
      // Parse each CSV line into a Row of (id, key, value)
      .map(new MapFunction[String, Row]() {
        override def map(value: String): Row = {
          val fields = value.split(",")
          Row.of(fields(0).toInt.asInstanceOf[Object], fields(1).asInstanceOf[Object], fields(2).asInstanceOf[Object])
        }
      })
      // Use the numeric id as the event timestamp and emit a watermark per record
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Row]() {
        override def extractTimestamp(row: Row, previousTimestamp: Long): Long =
          row.getField(0).asInstanceOf[Int].toLong

        override def checkAndGetNextWatermark(row: Row, extractedTimestamp: Long): Watermark =
          new Watermark(extractedTimestamp)
      })

    // Group by the key field and, for each 10-second event-time window,
    // emit (key, oldest value, newest value)
    val windowedStream = stream
      .keyBy((row: Row) => row.getField(1).asInstanceOf[String])
      .timeWindow(Time.seconds(10))
      .apply(new WindowFunction[Row, Row, String, TimeWindow]() {
        override def apply(key: String, window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
          val sortedInput = input.toList.sortBy(_.getField(0).asInstanceOf[Int])
          val firstRow = sortedInput.head
          val lastRow = sortedInput.last
          out.collect(Row.of(firstRow.getField(1), firstRow.getField(2), lastRow.getField(2)))
        }
      })

    // Write the aggregated rows back to Kafka with exactly-once semantics
    val producer = new FlinkKafkaProducer011[String](
      "mysql-cdc-output",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    windowedStream.map(new MapFunction[Row, String]() {
      override def map(row: Row): String =
        s"${row.getField(0)},${row.getField(1)},${row.getField(2)}"
    }).addSink(producer)

    env.execute("MySQL CDC")
  }
}
```
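This sketch assumes each Kafka record is a comma-separated `id,key,value` string and that the monotonically increasing `id` can serve as the event timestamp; a real pipeline would parse the CDC tool's actual message format (e.g., Debezium JSON) and use a proper timestamp field. On newer Flink versions, the flink-connector-mysql-cdc source can also read the MySQL binlog directly, without an intermediate Kafka topic.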