使用pyspark 进行 API接口的 案例
时间: 2024-10-17 19:15:46 浏览: 35
使用 PySpark 进行 API 接口操作通常是将 Apache Spark 与 Python 的网络请求库如 `requests` 结合使用,以便在大数据处理流程中集成外部数据源。以下是一个简单的案例:
假设你需要从一个天气API获取全球城市的实时温度,并将其整合到Spark DataFrame中进行分析。首先,你需要安装必要的库:
```bash
pip install pyspark requests pandas
```
然后,在PySpark环境中,你可以创建一个函数来发起API请求并处理响应:
```python
from pyspark.sql import SparkSession
import requests
from pandas import json_normalize
def fetch_weather_data(city):
url = "https://api.weatherapi.com/v1/current.json?key=YOUR_API_KEY&q=" + city
response = requests.get(url)
data = json_normalize(response.json()['current'])
return data[['location', 'temp_c']]
# 创建SparkSession
spark = SparkSession.builder.appName('WeatherApiExample').getOrCreate()
# 定义需要查询的城市列表
cities = ['New York', 'London', 'Tokyo']
# 发起API请求并收集结果
weather_data_rdd = spark.sparkContext.parallelize(cities).map(fetch_weather_data)
# 转换RDD为DataFrame
weather_df = weather_data_rdd.flatMap(lambda x: x).toDF(['city', 'temperature'])
# 打印前几行查看数据
weather_df.show()
```
在这个例子中,我们利用 `map()` 函数将每个城市映射到API请求,然后用 `flatMap()` 整合所有返回的数据。注意替换 `'YOUR_API_KEY'` 为实际的 Weather API 密钥。
阅读全文