Airflow跳过后续任务(ShortCircuitOperator)及BashOperator模版参数传入

再次使用airflow对其也有了更深刻的理解,发现之前使用到的内容真的比较少,基本上就当一个可以管理任务依赖的crontab用了。 之前写dag的时候是当一个完整的项目写,基础类比如数据库连接都是自己封装,各种配置也自己用环境变量或者配置文件来配置。 随着理解的深入,发现其实人家做这个项目就是给大家提供一个开箱即用的任务管理平台,而不是让你还费半天劲建立项目。 基础连接类可以使用airflow自带的各种Hook,如果不符合你的要求,可以在plugins中添加自己的Hook,各种变量配置可以直接使用Variable来配置和获取。 基本上能想到的airflow都有提供,这里我记录一下最近踩坑的的几点特性

Dag回填之前的任务

我们在设置了start_date,dag开启的时候airflow会将start_date到现在的任务都跑一遍,有时候我们是不需要回填的,因为有的任务不依赖execution_date, 也可能我们就是不需要回填遗落的任务。我们可以在airflow.cfg中设置catchup_by_defaultFalse,默认为True

这样的话就所有dag默认都不回填了。也可以在单个dag中配置参数catchup=False来禁止该dag回填

ShortCircuitOperator

我们可以用ShortCircuitOperator来进行逻辑判断,return True会继续后面的任务,return False则会中断本次Dag,后续的任务会被标记为skipped

BashOperator模版参数传入

BashOperator中bash命令是可以使用jinja2模版语言的,如官方文档中Tutorial的例子

...
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
...

传入Variable参数

bash_command可以传入Variables的变量

传入字典(通过var.json

# test为json格式字符串
{{ var.json.test.value }}

传入字符串(通过var.value

# test为普通字符串
{{ var.value.test }}

传入params参数

通过params来取值

BashOperator(
    task_id='templated',
    bash_command="echo '{{params.test}}'",
    params={'test': 'ttt'},
    dag=dag)

python代码中可以使用Variable.getVariable.set来操作Variables中的变量,详细用法可以查看airflow源码或官方文档

DummyOperator

DummyOperator不会执行任何任务,但可以用它来组织整个Dag的结构。

例子

该例子展示了以上所有特性的用法

from airflow.models import DAG
from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(dag_id='test2', default_args=args, catchup=False)


def test1(*args, **kwargs):
    # 判断逻辑
    return False


def test2(*args, **kwargs):
    print(2)


cond = ShortCircuitOperator(
    task_id='condition',
    python_callable=test1,
    dag=dag,
)
task2 = BashOperator(
    task_id='test2',
    bash_command="echo '{{params.name}}'",
    params={'name': 'haha'},
    dag=dag,
)
task3 = BashOperator(
    task_id='test3',
    bash_command="echo '{{var.json.test.value}}'",
    params={},
    dag=dag,
)
dummy = DummyOperator(task_id='dummy', dag=dag)

cond >> dummy
dummy >> task2
dummy >> task3