
이번 글에서는 Apache Airflow
에서 DAG
를 작성하고 실행하는 방법에 대해 설명하려고 한다.
앞선 글에서 설명했듯이, Airflow
는 DAG(Directed Acyclic Graph)
를 이용해 작업의 실행 순서를 정의한다.
간단하게 이러한 DAG
를 작성해보고 직접 실행해 볼 예정이다.
실습 환경
OS | Ubuntu 20.04 |
Docker | 28.0.4 |
Apache Airflow | airflow:2.10.5 |
실습은 Ubuntu 20.04 환경에서 진행했으며, Docker Compose를 활용하여 Airflow를 구성하였다.
DAG 작성

DAG는 기본적으로 Python으로 작성되며 dags/
디렉토리에 작성하면 된다.
Airflow는 해당 디렉토리 내의 Python 파일을 자동으로 인식하여 DAG를 등록한다.
dags/
디렉토리에 example_dag.py라는 파일을 만들어 다음과 같이 작성해주었다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_hello():
print("Hello, Airflow!")
def print_goodbye():
print("Goodbye, Airflow!")
with DAG(
dag_id='example_dag',
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False
) as dag:
# 첫 번째 Task: "Hello, Airflow!" 출력
task1 = PythonOperator(
task_id='print_hello_task',
python_callable=print_hello
)
# 두 번째 Task: "Goodbye, Airflow!" 출력
task2 = PythonOperator(
task_id='print_goodbye_task',
python_callable=print_goodbye
)
# 작업 연결
task1 >> task2
이는 PythonOperator를 이용하여 작성된 DAG이다.
이제 코드에 대해 설명해 보겠다.
with DAG()
with DAG()
블록 내부에서는 실제 Task들을 정의하고 실행 순서를 지정한다.
위 코드에서는 task1
과 task2
를 정의하고 task1 >> task2
를 통해 실행 순서를 지정했다.
with DAG(
dag_id='example_dag',
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False
) as dag:
DAG의 파라미터의 대해 설명하면 다음과 같다.
dag_id
: DAG의 고유 식별자로, Airflow UI에서 이를 통해 DAG를 식별할 수 있다.schedule_interval
: DAG가 실행되는 주기를 나타낸다.@daily
: 하루에 한 번 실행됨 (매일 자정)@once
: 한 번만 실행됨@hourly
: 매 시간 실행됨@weekly
: 매주 일요일 자정에 실행됨@monthly
: 매월 1일 자정에 실행됨@yearly
: 매년 1월 1일 자정에 실행됨timedelta
: timedelta(days = 3), timedelta(weeks =3), timedelta(hours=3), timedelta(minutes=35) 등과 같이 원하는 시간대 마다도 설정할 수 있다.
start_date
: DAG가 언제부터 실행될지를 나타낸다.datetime(2025, 1, 1)
: 2025년 1월 1일부터 DAG가 실행됨- 과거 날짜를 지정하면 Airflow가 자동으로 해당 날짜부터 실행해야 할 DAG 인스턴스를 생성할 수 있음
catchup
: 이전 날짜의 DAG 실행을 보출할지 여부를 설정한다.- False로 설정할 경우 이전 날짜의 실행을 수행하지 않음
PythonOperator
DAG 내에서 Python 함수를 실행하기 위해 PythonOperator
를 사용했다.
from airflow.operators.python import PythonOperator
def print_hello():
print("Hello, Airflow!")
def print_goodbye():
print("Goodbye, Airflow!")
각각 함수는 "Hello, Airflow!"와 "Goodbye, Airflow!"를 출력하는 함수이다.
아래와 같이 PythonOperator의 python_callable 매개변수에 실행할 함수를 지정하여 작업을 생성한다.
task1 = PythonOperator(
task_id='print_hello_task',
python_callable=print_hello
)
task2 = PythonOperator(
task_id='print_goodbye_task',
python_callable=print_goodbye
)
task_id
는 각 작업을 식별하는 고유한 ID이며, python_callable
은 실행할 Python 함수를 지정할 수 있다.
DAG 실행 흐름
task1 >> task2
이는 task1
이 먼저 실행된 후 task2
가 실행되도록 DAG의 실행 순서를 정의한다.
>>
연산자는 task1
이 성공적으로 완료된 후 task2
가 실행됨을 의미한다.
DAG 실행

dags/
디렉토리에 파일을 만들고 작성을 완료했으면 자동으로 감지된다.
웹 UI에 접속해서 확인해보자 (바로 감지되지는 않고 어느정도 지나면 UI에서 확인이 가능하다.)

example_dag가 잘 생성된 것을 확인할 수 있다.

DAG의 그래프를 확인해보면 작성했던 2개의 task가 연결되어 있는 것을 확인할 수 있다.
이제 실행을 해보자


다음과 같이 DAG를 켜주면 앞서 코드에 작성했던 schedule_interval
마다 자동으로 이 DAG가 실행되게 된다.
즉, 이 example_dag는 매일 자정마다 실행되게 된다.


위와 같이 DAG의 상태 또한 확인할 수 있다.


결과를 보면 두 task모두 정상적으로 Hello, Ariflow!와 Goodbye, Airflow!가 출력된 것을 확인할 수 있다.
웹 UI 뿐만 아니라 CLI에서도 가능하다.
다음과 같은 명령어를 통해 DAG list를 확인할 수 있다.
(sudo) docker compose run airflow-cli dags list

다음과 같은 명령어로 DAG를 실행할 수도 있다.
(sudo) docker compose run airflow-cli dags trigger <dag_name>

마무리
이번글에서는 간단히 PythonOperator를 이용해 DAG를 작성하고 실행하는 방법에 대해 정리하였다.
DAG
는 Python 코드로 작성되고,dags/
디렉토리에 배치하면 자동으로 감지된다.- Python 함수를 실행하는 Task를 정의하려면
PythonOperator
를 활용할 수 있다. - DAG 실행 흐름을
>>
연산자를 사용하여 설정할 수 있다. - Web UI 또는 CLI를 활용하여 DAG를 실행할 수 있다.
다음 글에서는 ETL 작업을 DAG로 작성해보고 DB에 저장해보는 실습내용을 포스팅할 예정이다.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Airflow 설치 - Python 가상 환경, Docker (0) | 2025.02.19 |
---|---|
[Airflow] Apache Airflow란? (0) | 2025.02.17 |