如何在 Airflow 中设置环境变量?
时间: 2024-10-17 07:12:39 浏览: 28
在Apache Airflow中,你可以通过多种方式设置环境变量:
1. **配置文件**: 在`airflow.cfg`文件中,可以在`[core]`部分添加环境变量。例如:
```
core__env_vars={'MY_VAR': 'my_value'}
```
然后运行Airflow时,这些变量会被自动加载。
2. **Docker**: 如果你使用的是Docker容器,可以将环境变量写入Dockerfile或在docker-compose.yml中定义。
3. **Kubernetes**: 在Kubernetes部署中,可以在`dagrun`资源的`env`字段中设置环境变量,或者通过Pod的环境变量设置。
4. **Jinja模板**: 在编写DAG任务时,可以直接使用`{{ var.value.MY_VAR }}`访问环境变量,这是在运行时动态获取值的一种常见做法。
5. **Operator的环境注入**: 对于某些特定的operators,如`CustomOperator`,也可以在operator的初始化函数中指定环境变量。
记得保存配置并重启Airflow服务才能让新的环境变量生效。
相关问题
mac 安装 airflow
Mac上安装Apache Airflow,首先需要确保你的系统已经安装了Python、pip和其他必要的工具。以下是安装步骤:
1. **更新Python和pip**:
```sh
pip install --upgrade pip
```
2. **创建一个新的Python虚拟环境** (可选,推荐使用以避免全局包冲突):
```sh
python3 -m venv airflow_venv
source airflow_venv/bin/activate
```
3. **安装Airflow**: 使用pip安装最新版本的Apache Airflow:
```sh
pip install apache-airflow[all]
```
如果你想安装特定版本,可以指定版本号,例如 `pip install apache-airflow==2.4.0`。
4. **配置Airflow**: 创建一个Airflow的初始化文件,通常是在`~/airflow`目录下:
```sh
airflow initdb
```
5. **启动Web服务器**:
```
airflow webserver --port 8080
```
6. **验证安装**:
打开浏览器访问`http://localhost:8080`, 如果能看到Airflow的欢迎界面,说明安装成功。
7. **设置环境变量** (如果在非标准位置安装):
将AIRFLOW_HOME路径添加到`~/.bash_profile`或`~/.zshrc`中,并运行`source ~/.bash_profile`(或其他相应命令取决于你的shell)以使更改生效。
airflow BranchPythonOperator使用
Airflow 的 `BranchPythonOperator` 是 Apache Airflow 工作流管理平台中的一种任务操作符,它允许你在 Python 脚本中根据条件动态地分支流程。这个操作符会在执行时根据提供的 Python 代码块(通常是一个返回布尔值的函数表达式)来决定后续的任务应该执行哪一条路径。
使用 `BranchPythonOperator` 的步骤大致如下:
1. **创建任务**:首先在 DAG(Directed Acyclic Graph,有向无环图,表示工作流)中添加一个新的 `BranchPythonOperator` 实例,并指定一个唯一的 task_id 和一个 Python 函数作为 `python_callable` 参数。这个函数会接收上下文(context)作为输入,可以根据需要访问任务元数据和配置信息。
```python
from airflow.operators.python_operator import BranchPythonOperator
def decide_flow(execution_date, context):
# 根据某些条件(如数据库查询结果、环境变量等)编写判断逻辑
if condition:
return 'task_id_1'
else:
return 'task_id_2'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=decide_flow,
)
```
2. **连接任务**:在 `python_callable` 返回的结果(字符串)对应的任务 ID 上,通过 `follows` 或 `set_upstream` 属性将两个任务关联起来,形成分支结构。
```python
# 如果条件满足,任务_id_1 将被执行
task_id_1 = SimpleTask(task_id='task_id_1')
branch_task >> task_id_1
# 否则,任务_id_2 将被执行
task_id_2 = SimpleTask(task_id='task_id_2')
branch_task >> [task_id_1, task_id_2]
```
阅读全文