MLOPS

Airflow - gitSync

개발허재 2024. 8. 24. 18:51

Kubernetes version: v1.30.0

Airflow version: 2.9.3

 

Installing Airflow with Helm

helm repo add apache-airflow https://airflow.apache.org/
helm repo update
helm pull apache-airflow/airflow --untar
helm install airflow apache-airflow/airflow --version 1.15.0 -n airflow -f values.yaml
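The -n airflow flag assumes the airflow namespace already exists. A minimal sketch for creating it beforehand (alternatively, pass --create-namespace to helm install) and for checking that the release's pods come up afterwards:

# Create the target namespace before installing the chart
kubectl create namespace airflow

# After the install finishes, the scheduler, webserver, statsd, etc. should reach Running
kubectl get pods -n airflow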

 

Before running helm install, change the executor field in values.yaml to KubernetesExecutor.

...

# Airflow executor
# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
executor: "KubernetesExecutor"

...

 

Under the dags field, enable gitSync and set repo to the Git repository that contains your DAG files.

...

# Git sync
dags:
  # Where dags volume will be mounted. Works for both persistence and gitSync.
  # If not specified, dags mount path will be set to $AIRFLOW_HOME/dags
  mountPath: ~
  persistence:
    # Annotations for dags PVC
    annotations: {}
    # Enable persistent volume for storing dags
    enabled: true
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim: dag-pvc
    ## optional subpath for dag volume mount
    subPath: ~
  gitSync:
    enabled: true

    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    repo: https://github.com/jaeuHeo/airflow.git
    branch: main
    rev: HEAD
    # The git revision (branch, tag, or hash) to check out, v4 only
    ref: main
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "dags"
    
...
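Note that persistence is also enabled above with existingClaim: dag-pvc, which means a PVC with that name must already exist in the airflow namespace before installing the chart. A minimal sketch of such a claim, assuming the cluster's default StorageClass (set storageClassName explicitly if your environment needs it):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: dag-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteOnce   # matches dags.persistence.accessMode in the values above
  resources:
    requests:
      storage: 1Gi    # matches dags.persistence.size in the values above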

 

In addition, it is a good idea to use a PVC for logs as well, so that logs are kept persistently!

...

logs:
  # Configuration for empty dir volume (if logs.persistence.enabled == false)
  # emptyDirConfig:
  #   sizeLimit: 1Gi
  #   medium: Memory

  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 10Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName:
    ## the name of an existing PVC to use
    existingClaim: log-pvc
    
...
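As with the DAG volume, existingClaim: log-pvc refers to a PVC you create yourself. A minimal sketch, assuming a StorageClass that supports ReadWriteMany, since several Airflow pods read and write the log volume at the same time:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: log-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany   # scheduler, workers, and webserver all mount the log volume
  resources:
    requests:
      storage: 10Gi   # matches logs.persistence.size in the values above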

 

 

Writing a test DAG

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG

from airflow.operators.bash import BashOperator
# airflow.operators.python_operator is deprecated; use airflow.operators.python in Airflow 2.x
from airflow.operators.python import PythonOperator

def test():
    print(datetime.now())

    
with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule='1 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = PythonOperator(
        task_id='print_date',
        python_callable=test,
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
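gitSync only picks the DAG up once it is pushed under the subPath configured earlier (dags in this example). A minimal sketch, assuming the repo from dags.gitSync.repo is cloned locally and the DAG above is saved as tutorial.py (a hypothetical filename):

cd airflow                      # local clone of https://github.com/jaeuHeo/airflow.git
mkdir -p dags
cp tutorial.py dags/            # the test DAG above
git add dags/tutorial.py
git commit -m "Add tutorial DAG"
git push origin main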

 

 

After pushing the test DAG above to GitHub and waiting a little while, the DAG shows up in the Airflow webserver as shown below.

 

 

When the DAG runs, a separate pod is created for each task, as shown below, and together they execute a single DAG run. With the KubernetesExecutor, the scheduler requests these worker pods through the Kubernetes API, and each pod is created from the configured pod template. If you need tasks to run independently on different images, use the KubernetesPodOperator instead.
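For reference, a minimal sketch of such a task using the cncf-kubernetes provider's KubernetesPodOperator (the task_id and image here are hypothetical, and the import path may differ slightly depending on the provider version):

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Place this inside the `with DAG(...)` block of the test DAG above.
# The task runs in its own pod, using an image that is independent of the Airflow worker image.
t4 = KubernetesPodOperator(
    task_id='run_in_custom_image',
    name='run-in-custom-image',
    namespace='airflow',
    image='python:3.11-slim',              # hypothetical image; replace with your own
    cmds=['python', '-c'],
    arguments=['print("hello from a dedicated pod")'],
    get_logs=True,
)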