airflow dag 网络接口请求,并且把请求结果写入redis。
时间: 2023-03-03 16:22:05 浏览: 143
Airflow DAG 可以通过使用 PythonOperator 和 requests 库来进行网络接口请求,并通过使用 redis 模块将请求结果写入 redis。
下面是一个简单的示例:
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests
import redis
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 15),
}
dag = DAG('example_dag', default_args=default_args, schedule_interval='@once')
def make_api_request():
response = requests.get('https://example.com/api')
return response.text
def write_to_redis():
r = redis.Redis(host='localhost', port=6379)
result = make_api_request()
r.set('example_key', result)
with dag:
task1 = PythonOperator(
task_id='make_api_request',
python_callable=make_api_request,
)
task2 = PythonOperator(
task_id='write_to_redis',
python_callable=write_to_redis,
)
task1 >> task2
```
在这个示例中,我们定义了两个 PythonOperator 任务,分别是 make_api_request 和 write_to_redis。make_api_request 任务使用 requests 库从 https://example.com/api 获取数据,并将结果返回。write_to_redis 任务连接到本地运行的 Redis 服务,并使用 redis 模块将 make_api_request 任务的结果写入 Redis 中。
在 DAG 中,我们将这两个任务通过任务依赖关系连接在一起,确保 make_api_request 在 write_to_redis 之前运行。这样,在 write_to_redis 任务运行时,我们可以确保 make_api_request 任务已经完成,将数据写入 Redis 中。
阅读全文