如何在 Airflow 中设置环境变量?
时间: 2024-10-17 10:12:39 浏览: 66
在Apache Airflow中,你可以通过多种方式设置环境变量:
1. **配置文件**: 在`airflow.cfg`文件中,可以在`[core]`部分添加环境变量。例如:
```
core__env_vars={'MY_VAR': 'my_value'}
```
然后运行Airflow时,这些变量会被自动加载。
2. **Docker**: 如果你使用的是Docker容器,可以将环境变量写入Dockerfile或在docker-compose.yml中定义。
3. **Kubernetes**: 在Kubernetes部署中,可以在`dagrun`资源的`env`字段中设置环境变量,或者通过Pod的环境变量设置。
4. **Jinja模板**: 在编写DAG任务时,可以直接使用`{{ var.value.MY_VAR }}`访问环境变量,这是在运行时动态获取值的一种常见做法。
5. **Operator的环境注入**: 对于某些特定的operators,如`CustomOperator`,也可以在operator的初始化函数中指定环境变量。
记得保存配置并重启Airflow服务才能让新的环境变量生效。
相关问题
airflow BranchPythonOperator使用
Airflow 的 `BranchPythonOperator` 是 Apache Airflow 工作流管理平台中的一种任务操作符,它允许你在 Python 脚本中根据条件动态地分支流程。这个操作符会在执行时根据提供的 Python 代码块(通常是一个返回布尔值的函数表达式)来决定后续的任务应该执行哪一条路径。
使用 `BranchPythonOperator` 的步骤大致如下:
1. **创建任务**:首先在 DAG(Directed Acyclic Graph,有向无环图,表示工作流)中添加一个新的 `BranchPythonOperator` 实例,并指定一个唯一的 task_id 和一个 Python 函数作为 `python_callable` 参数。这个函数会接收上下文(context)作为输入,可以根据需要访问任务元数据和配置信息。
```python
from airflow.operators.python_operator import BranchPythonOperator
def decide_flow(execution_date, context):
# 根据某些条件(如数据库查询结果、环境变量等)编写判断逻辑
if condition:
return 'task_id_1'
else:
return 'task_id_2'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=decide_flow,
)
```
2. **连接任务**:在 `python_callable` 返回的结果(字符串)对应的任务 ID 上,通过 `follows` 或 `set_upstream` 属性将两个任务关联起来,形成分支结构。
```python
# 如果条件满足,任务_id_1 将被执行
task_id_1 = SimpleTask(task_id='task_id_1')
branch_task >> task_id_1
# 否则,任务_id_2 将被执行
task_id_2 = SimpleTask(task_id='task_id_2')
branch_task >> [task_id_1, task_id_2]
```
airflow读取配置
Apache Airflow是一款流行的开源平台,用于自动化数据处理工作流程。它使用Python编写,其配置主要包括以下几个方面:
1. **`airflow.cfg`**:这是Airflow的核心配置文件,通常位于项目根目录下。这个文件包含了基础设置如数据库连接、API设置、日志路径等关键信息。
2. **`dags`** 文件夹:每个任务都保存在一个`.py`文件中,这些文件放在`dags`文件夹下。每个DAG(Directed Acyclic Graph,有向无环图)都有其独立的配置,可以在文件顶部通过`default_args`来设置全局默认参数,如调度时间、运行者等。
3. **环境变量**:Airflow也支持从环境变量读取配置,比如AIRFLOW_HOME、SQL_ALCHEMY_CONN等,这使得你可以轻松地在不同的环境中切换配置。
4. **`.env` 文件或外部环境管理工具**:对于敏感信息如密码,推荐使用环境变量管理工具(如`.env`文件)来存储,然后在Airflow启动时加载它们。
为了读取配置,Airflow提供了`BASIC_CONFIG`和`load_config`函数,可以在初始化时自动读取`airflow.cfg`。例如,在创建` DAG`对象时,可以使用`default_args`来引用配置中的值:
```python
from datetime import timedelta
from airflow import DAG
from airflow.config_templates.airflow_local_settings import load_config
# 加载配置
config = load_config()
dag = DAG(
'example_dag',
default_args={
'owner': config['core']['owner'],
'email_on_failure': config.getboolean('smtp', 'email_on_failure'),
'start_date': datetime(2022, 1, 1),
},
schedule_interval=timedelta(hours=1),
)
```
阅读全文