Airflow

프로그래밍 언어(파이썬, Python)으로 작업 흐름(Workflows)을 관리, 모니터하는 플랫폼으로 더 자세한 내용은 링크를 참고하세요.

설치

공식 가이드를 따라 설치를 진행합니다.

설치는 Ubuntu 20.04에서 진행하였습니다.

Dependencies 설치

sudo apt-get update -y
sudo apt-get install -y --no-install-recommends \
        freetds-bin \
        krb5-user \
        ldap-utils \
        libsasl2-2 \
        libsasl2-modules \
        libssl1.1 \
        locales  \
        lsb-release \
        sasl2-bin \
        sqlite3 \
        unixodbc \
        postgresql \
        python3-pip \
        python3-testresources

환경 변수 설정

export AIRFLOW_HOME=~/airflow
export AIRFLOW_VERSION="2.1.2"
export PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
export PATH=$PATH:/home/ubuntu/.local/bin

Airflow 설치

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

DB 초기화

사용자 생성

# initialize the database
airflow db init

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

사용자 생성 시 비밀번호를 입력할 수 있습니다.

설정 파일 수정

sed -i "103s/True/False/" ~/airflow/airflow.cfg

샘플을 보여주지 않겠다는 뜻이에요.

Web 서버 시작

스케쥴러 시작

# start the web server, default port is 8080
airflow webserver --port 8080

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler

# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page

Webserver 실행 시 -D 옵션을 추가하여, Daemon으로 실행할 수 있습니다.

DAG

Airflow는 작업의 관계를 DAG(방향성 비순환 그래프, Directed Acyclic Graphs)로 표현합니다.

튜토리얼

파이프라인 작성

링크에서 제공하는 예제 파이프라인을 만들어봅니다. 자세한 설명도 있으니 참고하세요.

파일 생성

# Create a file
vi ~/airflow/dags/tutorial.py

내용 입력

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beggining of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

내용을 복사할 수 있습니다.

파이프라인 테스트

앞에서 생성한 파일을 테스트하여 문제가 없는지 확인합니다.

python3 ~/airflow/dags/tutorial.py
파이프라인 분석

Airflow 파이프라인은 파이썬 스크립트로 작성하며, Airflow DAG 객체를 정의합니다.

스크립트를 작성할 때 가장 먼저 하는 일은 필요한 라이브러리 불러오기(Importing) 입니다.

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

코드를 작성하면서 계속 바뀌는 부분 중 한 곳 입니다. 상황에 따라 필요한 라이브러리가 바뀔 수도 있겠죠?

작업(Task)를 생성할 때 사용할 기본 매개변수를 Dictionary 타입으로 정의합니다.

모든 작업 생성자(Constructor)에 Arguments를 명시적으로 전달할 수 있습니다.

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

더 많은 정보는 airflow.models.BaseOperator에서 확인하세요.

지금부터는 작업을 연결하는 DAG 객체가 필요합니다.

객체에 수 많은 DAG를 구별할 수 있는 고유한 식별자 dag_id를 문자열로 정의합니다.

앞서 정의한 Argument Dictionary 추가합니다. 그리고 Schedule_interval도 정의하구요.

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:

튜토리얼에서 생성한 DAG의 고유한 식별자는 tutorial이군요.

클래스로 생성한 객체를 인스턴스라고 하며, 작업(Tasks)은 Operator의 인스턴스입니다.

작업의 고유한 식별자는 task_id이며, 작업의 첫 번째 Argument로 추가합니다.

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)

작업은 반드시 task_idowner Arguemnt를 포함하거나 상속받아야합니다. 그렇지 않으면 문제가 발생(Raise an exception)합니다.

Airflow는 Jinja Templating, 사전 정의된 매개변수(Parameters)매크로(Macro) 등을 지원합니다.

Jinja Templating을 사용하면 Template이 Rendering될 때 변수 및 표현들을 실제 값으로 변환합니다.

Template engine은 괄호 사이에 있는 변수나 표현들을 결과로 반환합니다.

<!DOCTYPE html>
<html lang="en">
<head>
    <title>My Webpage</title>
</head>
<body>
    <ul id="navigation">
    {% for item in navigation %}
        <li><a href="{{ item.href }}">{{ item.caption }}</a></li>
    {% endfor %}
    </ul>

    <h1>My Webpage</h1>
    {{ a_variable }}

    {# a comment #}
</body>
</html>

Jinja Template 소개

실제 Airflow 튜토리얼에 적용한 코드를 살펴보면, templated_command에 {% %} 블록, 매개변수, 함수 등 다양한 문법이 적용되어 있습니다.

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
)

그리고 하나의 작업을 더 추가했습니다.

지금까지 총 세 개의 작업 T1, T2, T3을 생성했습니다. 작업들 사이의 의존 관계는 존재하지 않습니다.

만약 우리가 작업 사이의 의존 관계를 정의를 한다면, 작업의 메소드 set_downstream & set_upstream를 활용하거나 연산자 >> & << 등으로 나타냅니다.

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
파이프라인 그리기

명령어로 간단히 DAG의 의존성 정보를 나타냅니다.

의존성 정보 확인

airflow tasks list tutorial --tree

# Expected Output
<Task(BashOperator): print_date>
    <Task(BashOperator): sleep>
    <Task(BashOperator): templated>
테스트

임의의 날짜를 대입하여 실제 작업 인스턴스를 동작해봅니다.

첫번째 작업 실행

airflow tasks test tutorial print_date 2015-06-01

# Expected Output
[2021-08-10 07:55:07,803] {subprocess.py:74} INFO - Output:
[2021-08-10 07:55:07,805] {subprocess.py:78} INFO - Tue Aug 10 07:55:07 UTC 2021
[2021-08-10 07:55:07,805] {subprocess.py:82} INFO - Command exited with return c                                                                  ode 0
[2021-08-10 07:55:07,818] {taskinstance.py:1204} INFO - Marking task as SUCCESS.                                                                   dag_id=tutorial, task_id=print_date, execution_date=20150601T000000, start_date                                                                  =20210810T075507, end_date=20210810T075507

두번째 작업 실행

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

# Expected output
[2021-08-10 08:01:27,404] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'sleep 5']
[2021-08-10 08:01:27,408] {subprocess.py:74} INFO - Output:
[2021-08-10 08:01:32,412] {subprocess.py:82} INFO - Command exited with return code 0
[2021-08-10 08:01:32,428] {taskinstance.py:1204} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20150601T000000, start_date=20210810T080127, end_date=20210810T080132

생성한 작업이 정상적으로 동작하네요.

airflow tasks test 명령어는 작업 인스턴스를 로컬에서 실행하며, 의존 관계와 무관하게 동작합니다. 또한 데이터베이스에 상태를 전달하지 않습니다.

이와 유사하게 airflow dags test [dag_id] [execution_date] 명령어를 통해 DAG를 테스트해볼 수 있습니다. 이 때는 의존 관계에 따라 작업을 진행하지만, 데이터베이스에 상태를 전달하지는 않습니다.

Backfill

테스트까지 정상적으로 진행됐으면, 드디어 Backfill을 진행할 때 입니다.

의존 관계에 따라 작업을 진행하고, 파일로 로그를 남기며 데이터베이스에 상태 정보를 저장합니다.

strat_dateend_date를 명시하여 작업 인스턴스가 스케쥴에 맞춰 수행할 수 있도록 합니다.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

# Expected output
[2021-08-10 08:11:15,786] {backfill_job.py:377} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-08-10 08:11:15,792] {backfill_job.py:831} INFO - Backfill done. Exiting.

Backfill을 성공적으로 수행하고, 종료하였습니다.

여기까지 Airflow 튜토리얼이었습니다.

마지막으로,

끝까지 읽어주신 모든 분들께 감사드립니다.

다음 글 보기
이전 글 보기
[Airflow] 설치 및 튜토리얼