airflow dag 网络接口请求demo
时间: 2023-08-03 09:03:04 浏览: 172
网络通信demo
以下是一个使用Airflow DAG进行网络接口请求的示例代码:
```python
import requests
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 7, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'network_api_request',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
def get_api_data():
url = 'https://jsonplaceholder.typicode.com/todos'
response = requests.get(url)
data = response.json()
return data
def save_api_data(**context):
data = context['task_instance'].xcom_pull(task_ids='get_api_data')
with open('/path/to/save/data.json', 'w') as f:
f.write(data)
get_api_data_task = PythonOperator(
task_id='get_api_data',
python_callable=get_api_data,
dag=dag
)
save_api_data_task = PythonOperator(
task_id='save_api_data',
python_callable=save_api_data,
provide_context=True,
dag=dag
)
get_api_data_task >> save_api_data_task
```
在这个例子中,我们使用Python的requests库向一个API发送请求,并将其返回的数据保存到本地文件中。我们使用两个PythonOperator来执行两个任务:get_api_data和save_api_data。第一个任务使用get_api_data函数获取API数据,并将数据存储在XCom中。第二个任务使用save_api_data函数从XCom中获取数据,并将其写入本地文件中。
这个DAG每隔一天执行一次,并在执行过程中处理任何错误。你可以根据自己的需要修改该DAG,例如更改请求的API地址或更改数据的保存位置等。
阅读全文