目录
- 什么是 Apache Airflow?
- 核心概念与架构
- DAG
- Operators
- Tasks
- Executors
- Airflow 的安装与配置
- 环境要求
- 安装步骤
- Airflow 示例项目
- 简单任务调度
- 使用 PythonOperator 实现数据处理任务
- 集成外部工具:MySQL 和 S3
- Airflow 的高级功能
- 自定义 Operators
- 使用 Sensors 实现动态依赖
- 分布式调度
- Airflow 的优缺点
- 总结
什么是 Apache Airflow?
Apache Airflow 是一个强大的开源平台,用于 编排和监控复杂的工作流。通过使用 Python 脚本,开发者可以定义工作流的依赖关系、调度规则以及任务执行逻辑。Airflow 提供了灵活的任务调度与管理能力,适合处理数据工程、ETL 流程以及各种自动化任务。
主要特点:
- Python 编程:工作流以 Python 代码定义,清晰且易于维护。
- 动态性:允许根据运行时动态生成任务。
- 扩展性:支持插件系统和自定义操作。
- 分布式调度:通过 CeleryExecutor 等方式实现分布式执行。
核心概念与架构
Airflow 的核心概念主要围绕 DAG 和 Tasks,这些定义了工作流的结构和执行逻辑。
1. DAG(有向无环图)
DAG 是工作流的核心,表示任务之间的依赖关系。它由多个 Task 组成,保证任务按顺序依赖执行,且不会形成循环。
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
# 定义一个简单的 DAG
with DAG('example_dag',
start_date=datetime(2024, 11, 1),
schedule_interval='@daily') as dag:
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task1 >> task2 # 定义依赖:task1 -> task2
2. Operators
Operators 是 Airflow 中任务的具体实现。Airflow 提供了丰富的内置 Operator,比如:
- PythonOperator:运行自定义 Python 函数。
- BashOperator:执行 Bash 命令。
- MySqlOperator:运行 MySQL 查询。
- HttpOperator:发送 HTTP 请求。
3. Tasks
Tasks 是工作流中的基本执行单元,每个 Task 都是 Operator 的实例。
4. Executors
Executor 决定了任务的执行方式:
- SequentialExecutor:单任务执行(开发测试用)。
- LocalExecutor:并行执行多个任务(单机)。
- CeleryExecutor:分布式调度。
Airflow 的安装与配置
1. 环境要求
- Python 版本:>= 3.7
- 数据库:支持 SQLite、MySQL、PostgreSQL 等。
- 操作系统:支持 Linux、macOS 和 Windows。
2. 安装步骤
安装 Airflow
# 设置环境变量
export AIRFLOW_HOME=~/airflow
# 安装 Airflow
pip install apache-airflow
初始化数据库
airflow db init
创建管理员账户
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
启动 Web 界面
airflow webserver -p 8080
启动调度器
airflow scheduler
访问 http://localhost:8080
,可以看到 Airflow 的 Web 界面。
Airflow 示例项目
以下示例展示如何使用 Airflow 定义和调度实际工作流。
示例 1:简单任务调度
PythonOperator 实现简单任务
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_hello():
print("Hello, Airflow!")
# 定义 DAG
with DAG('simple_dag',
start_date=datetime(2024, 11, 1),
schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='hello_task',
python_callable=print_hello
)
运行结果:
通过 Web 界面可以查看 hello_task
的执行日志,输出 Hello, Airflow!
。
示例 2:集成 MySQL 和 S3
数据库查询任务
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime
# 定义 DAG
with DAG('mysql_example',
start_date=datetime(2024, 11, 1),
schedule_interval='@daily') as dag:
extract_task = MySqlOperator(
task_id='extract_data',
mysql_conn_id='my_mysql',
sql="SELECT * FROM your_table;"
)
上传数据到 S3
使用 S3Hook
提交查询结果到 S3 存储。
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def upload_to_s3():
hook = S3Hook(aws_conn_id='my_aws')
hook.load_string("Your data here", key="example_key", bucket_name="example_bucket")
upload_task = PythonOperator(
task_id='upload_to_s3',
python_callable=upload_to_s3
)
将 extract_task >> upload_task
连接起来,定义依赖。
Airflow 的高级功能
1. 自定义 Operator
你可以根据需求定义自定义的 Operator。例如,一个计算任务的 Operator:
from airflow.models import BaseOperator
class MultiplyOperator(BaseOperator):
def __init__(self, a, b, *args, **kwargs):
super().__init__(*args, **kwargs)
self.a = a
self.b = b
def execute(self, context):
return self.a * self.b
2. Sensors 实现动态依赖
Sensors 用于等待某些条件满足,例如文件存在、任务完成等。
from airflow.sensors.filesystem import FileSensor
file_sensor = FileSensor(
task_id='check_file',
filepath='/path/to/file.txt',
poke_interval=10,
timeout=300
)
Airflow 的优缺点
优点
- 灵活性强:支持动态生成任务。
- 插件化:丰富的内置 Operator 和扩展功能。
- 可视化界面:便于监控和管理工作流。
- 分布式执行:轻松支持高并发任务。
缺点
- 安装复杂性:分布式部署依赖额外配置。
- 实时性不足:主要设计用于批量任务,实时任务支持较弱。
- 学习曲线:需要一定 Python 和系统运维知识。
总结
Apache Airflow 是一个强大的工作流调度和管理平台,适用于数据工程、ETL、任务自动化等场景。通过灵活的 DAG 定义、丰富的 Operator 支持以及分布式调度能力,Airflow 可以满足从单机到大规模分布式任务的需求。然而,在复杂部署环境下需要额外的运维投入。