def getJSONSchemaMysqlSource(optionArg: String, serverIdArg: String, args: Array[String]): MySqlSource[String] = { //ParameterTool是Flink提供的读取程序启动参数、配置文件、环境变量以及Flink自身配置参数等配置的的一个工具类 val parameterTool: ParameterTool = ParameterTool.fromArgs(args) val mysqlHostname: String = parameterTool.get("mysql-hostname", "hadoop-1") val mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306")) val mysqlUsername = parameterTool.get("mysql-username", "root") val mysqlPasswd = parameterTool.get("mysql-passwd", "root") val serverId = parameterTool.get("server-id", serverIdArg) val option = parameterTool.get("start-up-options", optionArg) // 创建配置信息 Map 集合,将 Decimal 数据类型的解析格式配置 k-v 置于其中 val config = new util.HashMap[String, Object]() config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()) // 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema,该 Schema 将用于 MysqlSource 的初始化 val jsonDebeziumDeserializationSchema = new JsonDebeziumDeserializationSchema(false, config) // 创建 MysqlSourceBuilder 对象 val builder: MySqlSourceBuilder[String] = MySqlSource.builder() .hostname(mysqlHostname) .port(mysqlPort) .username(mysqlUsername) .password(mysqlPasswd) .deserializer(jsonDebeziumDeserializationSchema) // 根据方法的 option 参数做不同的初始化操作,返回不同的 MysqlSource 对象 option match { case "dim" => builder .databaseList("tms") .tableList("tms.user_info", "tms.user_address") .startupOptions(StartupOptions.initial()) .serverId(serverId) .build() case "dwd" => builder .databaseList("tms") .tableList("tms.order_info", "tms.order_cargo", "tms.transport_task", "tms.order_org_bound") .startupOptions(StartupOptions.initial()) .serverId(serverId) .build(); } logger.error("不支持的操作类型") null }实现了什么功能
时间: 2024-04-18 15:34:05 浏览: 184
这段代码实现了根据传入的参数创建一个用于连接 MySQL 数据库的数据源对象。具体功能如下:
1. 通过 `ParameterTool` 类读取程序启动参数,获取 MySQL 数据库的连接信息。
2. 创建一个配置信息的 `Map` 集合,其中包含了将 Decimal 数据类型解析为字符串的配置。
3. 创建一个 `JsonDebeziumDeserializationSchema` 对象,用于解析从 MySQL 数据库读取的 JSON 数据。
4. 创建一个 `MySqlSourceBuilder` 对象。
5. 根据传入的 `option` 参数,初始化不同的数据源对象:
- 如果 `option` 参数为 "dim",则设置数据库名为 "tms",表名为 "tms.user_info" 和 "tms.user_address",并使用初始启动选项和给定的 `serverId` 构建数据源对象。
- 如果 `option` 参数为 "dwd",则设置数据库名为 "tms",表名为 "tms.order_info"、"tms.order_cargo"、"tms.transport_task" 和 "tms.order_org_bound",并使用初始启动选项和给定的 `serverId` 构建数据源对象。
6. 如果 `option` 参数不匹配任何已知选项,则记录错误日志并返回 `null`。
总体而言,这段代码实现了根据不同的选项参数创建不同配置的 MySQL 数据源对象。
相关问题
def main(args: Array[String]): Unit = { // case class sex(userid:String,movieid:String,schema:String,sex:String,zhiye:String,pingfen:String) //1.创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("boke2") .master("local[2]") .getOrCreate()什么意思
这段代码是Scala语言中的一段Spark代码,它的功能是创建一个SparkSession对象,用于连接Spark集群并进行数据处理。具体解释如下:
1. `def main(args: Array[String]): Unit = {`:定义了一个`main`函数,入参为一个长度为`n`的字符串数组`args`,返回值类型为`Unit`,即不返回任何值。
2. `val spark: SparkSession = SparkSession.builder()`:创建一个名为`spark`的`SparkSession`对象,并调用`builder()`方法进行配置。
3. `.appName("boke2")`:设置应用程序名称为`boke2`。
4. `.master("local[2]")`:设置应用程序的部署模式为本地模式,使用2个线程。
5. `.getOrCreate()`:获取或创建一个`SparkSession`对象,如果已经存在,则获取该对象;如果不存在,则创建一个新的对象。
总之,这段代码是创建一个SparkSession对象,用于连接本地Spark集群并进行数据处理,其中`SparkSession`是Spark框架中的一个核心类,用于提供Spark应用程序的所有功能,包括读取数据、执行转换和操作、写入数据等。
//输入数据样例类 case class ApacheLogEvent(ip:String,userId:String,evetTime:Long,method:String,url:String) //窗口聚合结果样例类 case class UrlViewCount(url:String,windowEnd:Long,count:Long) object abc { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.readTextFile("D:\\idea\\ideal\\flink-tutorial\\src\\main\\resources\\apache.log") .map(data=>{ val dataArray = data.split(" ") //定义事件转换 20/05/2015:17:05:11 val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime ApacheLogEvent(dataArray(0).trim,dataArray(1).trim,timestamp,dataArray(5).trim,dataArray(6).trim) }) //*1000看是秒还是毫秒 秒*1000 毫秒不乘 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) { override def extractTimestamp(t: ApacheLogEvent): Long = t.evetTime }) .keyBy(_.url) .timeWindow(Time.minutes(10),Time.seconds(5)) //允许60秒的延迟数据去更新 .allowedLateness(Time.seconds(60)) .aggregate(new CountAgg(),new WindowResult()) .keyBy(_.windowEnd) .process(new TopNHotUrls(5)) dataStream.print() env.execute("network flow job") } }
这段代码是一个Scala语言的Flink应用程序的主函数。具体来说,它定义了一个输入数据样例类ApacheLogEvent,它包括了IP地址、用户ID、事件时间、请求方法和URL地址等信息。同时,它还定义了一个窗口聚合结果样例类UrlViewCount,它包括了URL地址、窗口结束时间和计数器等信息。
在主函数中,它首先获取Flink的流执行环境,并设置并行度和时间特征。接下来,它从文件中读取Apache日志数据,并对数据进行处理,将时间戳转换为事件时间,并使用BoundedOutOfOrdernessTimestampExtractor对数据进行时间戳提取和乱序限制。然后,它按照URL地址进行分组,并使用时间窗口对数据进行聚合。该应用程序还允许60秒的延迟数据去更新,并使用CountAgg和WindowResult对窗口中的数据进行聚合和处理。最后,它使用KeyedProcessFunction对数据流进行键控处理,并使用TopNHotUrls对URL地址进行热门度排名。最后,它打印结果并执行该应用程序。
阅读全文