airflow读取mysql
时间: 2024-10-15 08:06:15 浏览: 17
airflow-on-kubernetes:在Kubernetes上运行Apache Airflow的指南的源代码-apache source code
Apache Airflow是一个强大的开源平台,用于创建、调度和监控复杂的任务自动化工作流。它支持多种数据源,包括MySQL数据库。在Airflow中读取MySQL数据通常通过使用内置的Jinja模板语法配合SQL查询来实现。
以下是基本步骤:
1. **配置连接**:首先,在Airflow的`airflow.cfg`文件中或在每个DAG(dag.py文件中)里设置一个`Connection`,指定到MySQL服务器的连接信息,如主机名、端口、用户名、密码和数据库名。
```ini
[connections]
your_mysql_connection = conn_id=your_mysql_conn, host={{ mysql_host }}, port={{ mysql_port }}, login={{ mysql_user }}, password={{ mysql_password }}, schema={{ mysql_database }}
```
其中`conn_id`是你自定义的一个连接ID,可以根据需要在`Connections`表中创建。
2. **编写Operator**:然后,在DAG中添加一个`MySqlToGoogleCloudStorageOperator`或者自定义的PythonOperator,比如`MySqlOperator`,并传递之前配置好的连接。
```python
from airflow.providers.mysql.operators.mysql import MySqlOperator
t1 = MySqlOperator(
task_id="execute_mysql_query",
sql="{{ params.sql_query }}", # 这里填写SQL查询
mysql_conn_id='your_mysql_connection',
parameters={},
)
```
3. **参数化查询**:你可以将SQL查询作为变量传入,并在运行任务时提供具体的值,使用Jinja表达式语言(如`{{ var.value.some_value }}`)。
4. **调度执行**:最后,将这个任务添加到DAG的工作流程中,设定好定时器或者其他触发条件,Airflow就会定期或根据设置去执行SQL查询操作。
阅读全文