df = self.spark.createDataFrame([], self.table_schema) try: df = self.spark.read.format("csv").option("header", self.config.header) \ .option("delimiter", self.config.delimiter) \ .option("multiLine", self.config.multiLine) \ .option('infer_schema', self.config.infer_schema) \ .option('escape', '"') \ .load(self.config.source_file_path) except AnalysisException as e: if e.desc.startswith('Path does not exist'): self.logger.warning(f'source file not found. will run with empty dataframe.') else: self.logger.exception(e) self.source_df_count = df.count() self.logger.info(f'success extract {self.source_df_count} records') return df
时间: 2024-02-14 19:35:21 浏览: 337
这段代码是一个ETL作业中的数据提取过程,它做了以下几个步骤:
1. 创建一个空的DataFrame对象,使用SparkSession的createDataFrame方法实现,这个空的DataFrame对象将会在出现异常的情况下返回。
2. 使用Spark的read方法读取源文件,这里的源文件是一个CSV文件,read方法可以读取多种不同格式的数据源。
3. 在读取CSV文件时,使用了多个选项来指定文件的特性,例如文件的分隔符、是否包含标题行、是否需要推断架构等等。
4. 如果读取过程中发生了分析异常(AnalysisException),会根据异常的描述来判断是否是文件不存在的问题,如果是文件不存在,则会记录一个警告信息,否则会将异常记录下来。
5. 统计提取出来的DataFrame中的记录总数,并记录日志。
6. 最后返回提取出来的DataFrame对象。
总体而言,这段代码是一个通用的数据提取过程,可以用于读取各种格式的数据源,并且可以根据需要进行自定义和修改。
相关问题
def __init__(self, glueContext: GlueContext, config: argparse.Namespace): """ init function. :param glueContext: the glueContext. the spark session can get from glueContext. :param config: Obtained by parsing from the Glue Job Input parameter list. """ self.config = config self.logger = logging.getLogger(self.config.table_full_name) self.logger.info(f'job init with params: {vars(self.config)}') self.glueContext = glueContext self.spark = glueContext.spark_session self.table_schema = Custom_Schema(self.spark).get_schema(self.config.source_file_schema) self.source_df_count = 0 self.destination_df_count = 0 self.load_date = datetime.now() self.logger.info(f'job load date: {self.load_date}') self.database_name, self.table_name = self.config.table_full_name.split('.') self.set_spark_configs()
这是一个Python类的初始化函数,它接受两个参数:glueContext和config。其中,glueContext是Glue的上下文,可以从中获取Spark会话,而config则是从Glue作业的输入参数列表中解析得到的命名空间。在初始化函数中,将这两个参数分别赋值给实例变量self.glueContext和self.config,并且设置了一些其他的实例变量,比如self.logger、self.spark、self.table_schema等。最后,调用了self.set_spark_configs()方法。可以看到,这个类是用来处理一些数据加载和转换的任务的,具体实现需要看其他方法的实现。
请将以下代码修改成可以多次输入SQL获得多段数据,并不会报错Promise already completed:def spark_sql_test(m): conf = SparkConf().setAppName("test_spark_sql").setMaster("yarn").setSparkHome("/usr/local/share/spark") sc = SparkContext(conf=conf) hc = HiveContext(sc) result = hc.sql(m) spark = SparkSession.builder.appName("test_spark_sql").getOrCreate() df = spark.createDataFrame(result.rdd, result.schema) df_pd = df.toPandas() spark.stop() return df_pd
可以将函数修改为以下形式:
```
from pyspark.sql import SparkSession
def spark_sql_test(m):
spark = SparkSession.builder.appName("test_spark_sql").getOrCreate()
result = spark.sql(m)
df = spark.createDataFrame(result.rdd, result.schema)
df_pd = df.toPandas()
spark.stop()
return df_pd
```
这样修改后,可以多次调用 `spark_sql_test` 函数,每次传入不同的 SQL 语句进行查询,返回多段数据。同时,将 SparkConf 和 HiveContext 替换为 SparkSession 的方式,可以更加简洁地创建 SparkSession。
阅读全文