Apache Airflow의 DAG 내에 task들의 dependency를 설정함으로써 task 실행 순서와 병렬 실행 task들 등을 정의할 수 있는데,
Airflow를 조금이라도 사용해 봤다면 이것은 당연히 알 것이다.
그리고 Airflow에서는 2.1 버전부터 DAG 내 task들 뿐만 아니라 DAG 간의 dependency를 설정할 수 있는 기능도 제공한다.
설정할 필요성을 생각해보면,
A라는 DAG이 B라는 DAG 로직에서 생성한 데이터가 있어야만 정상적으로 수행 가능하다고 하자.
그러면 B DAG의 수행 시간(schedule_interval)을 A DAG이 일반적으로 종료되는 시간 이후로 설정하는 방법이 있는데
충분히 여유를 두고 설정하더라도 A DAG 수행이 모종의 이유로 특별히 오래 걸렸다면 B DAG 수행 시 오류가 발생하게 된다.
그러면 B DAG 오류를 막기 위해서는 schedule_interval에 얼마나 여유를 두어야하나?
오류를 막는다고 너무 긴 텀을 두면 적절한 시점에 B DAG을 수행할 수 없는 문제가 있을 수 있다.
이런 경우 의존성을 설정함으로써 A DAG 수행 후 B DAG이 수행되게 한다면 schedule_interval에 대한 고민없이 문제를 해결할 수 있다.
# 참고 자료
이어지는 내용들은 다음 자료들을 참고해서 정리한 자료이다.
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#dag-dependencies
- https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator
- https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/external_task_sensor.html
- https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142
- https://tommybebe.github.io/2020/11/30/airflow-external-task-sensor/https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskMarker
# 설정 방법
설정하는 방법은 3가지가 존재하며, 설정된 내용은 Airflow 웹 Menu -> Browse -> DAG Dependencies 메뉴를 통해 시각화된 결과를 확인할 수 있다.
TriggerDagRunOperator 사용
- 상위 DAG에서 TriggerDagRunOperator를 task로 설정해서 하위 DAG을 실행시키는 방법이다.
- 상위 DAG에서 하위 DAG을 실행시켜주기 때문에 하위 DAG에 schedule_interval 설정이 없어도 실행이 된다.
- 하위 DAG에 execution_date와 conf 전달도 가능하다.
- 상위 DAG만 실행하고 연결된 하위 DAG의 실행을 skip하고 싶은 경우도 있을 수 있는데, 이런 옵션은 없는 것으로 보인다.
ExternalTaskSensor 사용
- 하위 DAG에서 ExternalTaskSensor를 task로 설정해서 상위 DAG의 실행 여부를 감지하는 방법이다.
- 하위 DAG을 상위 DAG에서 실행시켜주는 것은 아니기 때문에 별도로 schedule_interval 설정이 필요하다.
- Sensor task에서 설정한 dag_id, task_id, execution_date가 매칭되는 상위 DAG task의 실행 여부를 감지하여 실행되었다면 다음 task가 진행된다.
- task 지정은 생략하고 DAG만 지정도 가능하다.
- 상위 DAG과 하위 DAG의 schedule_interval이 똑같지 않더라도 이를 보정할 수 있는 방법을 제공한다.
- execution_delta, execution_date_fn 옵션을 통해 가능하다.
- 하나의 하위 DAG이 여러 개의 상위 DAG과 연결되어 있는 경우에도 사용할 수 있다.
- TriggerDagRunOperator처럼 상위 DAG 수행 후에 하위 DAG 연달아 수행되지는 않는다.
Airflow API
- Airflow에서 제공하는 Rest API로 다른 DAG 호출하는 방법
- 하위 DAG이 다른 Airflow 환경에 있는 경우 유용하다.
이 중에서 마지막 API를 활용하는 방법은 특수한 경우가 아니면 사실 사용하는 경우는 많지 않을 것 같고,
대부분 TriggerDagRunOperator나 ExternalTaskSensor를 사용하게 될 것 같다.
병렬 연결
하위 DAG 여러 개를 병렬로 연결하는 것도 가능하고 병렬 연결 후 단일 하위 DAG으로 연결도 가능하다.
하지만 주의할 점은 병렬 연결에서 단일 연결로 넘어갈 때 상위 DAG 여러 개에 TriggerDagRunOperator로 설정하게 되면
하위 DAG이 설정한 횟수만큼 실행되는 문제가 발생한다.
따라서 위에 ExternalTaskSensor 부분에서 언급한 것 처럼, 상위 DAG이 여러 개인 하위 DAG은 ExternalTaskSensor을 여러개 설정하는 방식으로 설정해야 한다.
# backfill 또는 rerun
DAG을 재수행해야하는 경우도 있는데, 이렇게 dependency가 설정되어 있는 경우 설정된 모든 DAG을 재수행하고 싶을 수 있다.
수행되었던 dag run이 clear되어야 재수행이 가능한데, 사용자가 직접 재수행시키는 상위 DAG은 직접 clear한다고 해도 하위 DAG이 여러개라면 일일이 clear 작업을 한다는 것은 상당히 번거로운 작업이다.
이를 위한 다음과 같은 방법이 존재한다.
TriggerDagRunOperator.reset_dag_run 옵션
True로 설정함으로써 하위 DAG의 dag run이 존재하는 경우 clear 후 실행시킬 수 있다.
ExternalTaskMarker 사용
상위 DAG에 ExternalTaskMarker task를 설정하는 방법이다.
task가 Recursive가 선택되어 clear되면 지정된 external_dag_id, external_task_id, execution_date에 해당되는 task instance도 같이 clear한다.
Downstream task들도 clear하기 때문에 최상위 task를 지정한다면 전체 DAG이 clear되는 것과 유사한 효과가 있을 것 같다.
'개발 > Data Engineering' 카테고리의 다른 글
HDFS file count 조회 (0) | 2022.05.09 |
---|---|
Apache Airflow 재수행 방법 정리 (0) | 2022.04.21 |
Apache Hudi 소개 - HDFS upsert/delete (0) | 2021.07.11 |
Avro와 Parquet (0) | 2021.06.24 |
태블로 데이터 원본 구성 방법 - 관계와 조인 (0) | 2021.04.16 |