spark-submit运行Python代码的结构如下,出现了数据倾斜问题,即每次访问接口,得到的json数据量不同,该如何修改代码结构:a = [] b = [] for url in url_list: response = requests.get(url, params=params) html = response.text data = json.loads(html) # 对Data做相关数据处理 ...... a.append(data['data']['key1']) b.append(data['data']['key2']) answer_pd= pd.DataFrame() answer_pd['a'] = a answer_pd['b'] = b conf = SparkConf().setAppName("APP").setMaster("yarn").setSparkHome("/usr/xxx/xx/xxx") sc = SparkContext(conf=conf) hc = HiveContext(sc) dt = 'database.table' # 数据库和表名 hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict") hc.createDataFrame(answer_pd).write.mode("append").insertInto(dt)
时间: 2024-04-14 20:26:13 浏览: 109
为了解决数据倾斜问题,可以考虑对代码进行如下修改:
1. 使用Spark的并行处理能力替代循环遍历:将循环遍历替换为Spark的并行处理能力,可以提高数据处理的效率和并行度。可以使用`SparkSession`来代替`SparkContext`和`HiveContext`。
2. 使用RDD的flatMap来处理数据:在获取数据后,可以使用RDD的flatMap函数将数据扁平化,以便更好地进行并行处理。
3. 使用RDD的reduceByKey来聚合数据:在数据处理过程中,可以使用reduceByKey函数对数据进行聚合操作,避免频繁的shuffle操作。
修改后的代码如下:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("APP") \
.master("yarn") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.instances", "8") \
.config("spark.executor.cores", "4") \
.config("spark.dynamicAllocation.enabled", "false") \
.getOrCreate()
# 并行处理获取数据
rdd = spark.sparkContext.parallelize(url_list)
result_rdd = rdd.flatMap(lambda url: requests.get(url, params=params).json()['data'])
# 对数据进行相关处理
processed_rdd = result_rdd.map(lambda data: (data['key1'], data['key2']))
# 将处理后的数据转为DataFrame
answer_pd = processed_rdd.toDF(['a', 'b'])
# 写入Hive表
dt = 'database.table'
answer_pd.write.mode("append").insertInto(dt)
# 关闭SparkSession
spark.stop()
```
在修改后的代码中,使用SparkSession来创建Spark应用程序,并设置相关配置,如executor的内存、实例数和核心数等。通过对url_list进行并行处理,使用flatMap函数将获取到的数据扁平化,然后进行相关处理并转为DataFrame。最后,将处理后的数据写入Hive表。注意,这里并未涉及到数据倾斜的处理,如果数据倾斜依然存在,可能需要考虑其他解决方案,如分桶、采样等。
阅读全文