Can you give an example of `S3ToMySqlOperator` in Airflow?
In Apache Airflow, `S3ToMySqlOperator` loads a file from Amazon S3 into a MySQL table. It is commonly used in data pipelines that read source files from S3 and import them into MySQL. Here is a simple example.
First, make sure the `apache-airflow-providers-amazon` and `apache-airflow-providers-mysql` provider packages are installed (the operator lives in the Amazon provider and uses the MySQL provider's hook):
```bash
pip install apache-airflow-providers-amazon apache-airflow-providers-mysql
```
Next, use `S3ToMySqlOperator` in a DAG:
```python
from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_mysql import S3ToMySqlOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(
    dag_id='example_s3_to_mysql',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),  # run once per hour
    description='A sample DAG that loads data from S3 to MySQL',
    catchup=False,  # do not backfill missed schedules
)

# The operator downloads the S3 object to a temporary local file,
# then loads it into MySQL with a LOAD DATA statement via MySqlHook.
s3_to_mysql_task = S3ToMySqlOperator(
    task_id='s3_to_mysql',
    s3_source_key='s3://your_bucket_name/your_s3_key',  # full S3 URI of the source file
    mysql_table='your_table_name',                      # target MySQL table
    mysql_duplicate_key_handling='IGNORE',              # or 'REPLACE' on duplicate keys
    mysql_extra_options="""
        FIELDS TERMINATED BY ','
        LINES TERMINATED BY '\n'
        IGNORE 1 LINES
    """,                                    # adjust to your file format
    aws_conn_id='your_s3_connection',       # replace with your AWS connection ID
    mysql_conn_id='your_mysql_connection',  # replace with your MySQL connection ID
    dag=dag,
)
```
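For reference, the operator works by downloading the S3 object to a temporary file and handing it to `MySqlHook.bulk_load_custom`, which issues a `LOAD DATA LOCAL INFILE` statement. The sketch below only illustrates the shape of that statement; the helper function and its exact formatting are illustrative, not the provider's actual source:

```python
def build_load_data_sql(table: str, tmp_file: str,
                        duplicate_key_handling: str = "IGNORE",
                        extra_options: str = "") -> str:
    """Rough shape of the SQL that MySqlHook.bulk_load_custom issues."""
    return (
        f"LOAD DATA LOCAL INFILE '{tmp_file}' "
        f"{duplicate_key_handling} INTO TABLE {table} "
        f"{extra_options}"
    ).strip()

# What the example task above would roughly execute:
sql = build_load_data_sql(
    "your_table_name",
    "/tmp/data.csv",  # hypothetical temp file downloaded from S3
    extra_options="FIELDS TERMINATED BY ','",
)
print(sql)
```

This is why `mysql_extra_options` is raw SQL text: it is appended verbatim to the `LOAD DATA` statement, so you can pass any clause MySQL accepts there (field/line terminators, `IGNORE n LINES`, character set, and so on).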
In this example, replace `your_s3_connection`, `your_mysql_connection`, `s3://your_bucket_name/your_s3_key`, and `your_table_name` with your actual AWS connection ID, MySQL connection ID, S3 object URI, and target table. Because the load uses `LOAD DATA LOCAL INFILE`, the MySQL server must allow `local_infile`, and the Airflow MySQL connection typically needs `{"local_infile": true}` in its extras. Once deployed, each scheduled run of this DAG loads the specified S3 file into the MySQL table.
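The two connection IDs can be created in the Airflow UI or, as sketched here, through `AIRFLOW_CONN_*` environment variables. All credential values below are placeholders; the `local_infile=true` query parameter ends up in the connection's extras, which is where `MySqlHook` looks before enabling `LOAD DATA LOCAL INFILE`:

```shell
# Placeholder credentials -- substitute real values.
# AWS connection used by aws_conn_id='your_s3_connection'
export AIRFLOW_CONN_YOUR_S3_CONNECTION='aws://AKIAEXAMPLE:secretkey@'

# MySQL connection used by mysql_conn_id='your_mysql_connection';
# query-string parameters become the connection's "extra" fields.
export AIRFLOW_CONN_YOUR_MYSQL_CONNECTION='mysql://user:password@mysql-host:3306/your_db?local_infile=true'
```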