MLOPS
Airflow - gitSync
개발허재
2024. 8. 24. 18:51
Kubernetes version: v1.30.0
Airflow version: 2.9.3
Installing Airflow with Helm
helm repo add apache-airflow https://airflow.apache.org/
helm repo update
helm pull apache-airflow/airflow --untar
helm install airflow apache-airflow/airflow --version 1.15.0 -n airflow -f values.yaml
Before running helm install, change the executor field in values.yaml to KubernetesExecutor.
...
# Airflow executor
# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
executor: "KubernetesExecutor"
...
Enable gitSync under the dags field, then point repo at the Git repository that contains your DAG files.
...
# Git sync
dags:
  # Where dags volume will be mounted. Works for both persistence and gitSync.
  # If not specified, dags mount path will be set to $AIRFLOW_HOME/dags
  mountPath: ~
  persistence:
    # Annotations for dags PVC
    annotations: {}
    # Enable persistent volume for storing dags
    enabled: true
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim: dag-pvc
    ## optional subpath for dag volume mount
    subPath: ~
  gitSync:
    enabled: true
    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    repo: https://github.com/jaeuHeo/airflow.git
    branch: main
    rev: HEAD
    # The git revision (branch, tag, or hash) to check out, v4 only
    ref: main
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "dags"
...
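Because existingClaim is set to dag-pvc, that PVC has to exist in the airflow namespace before the chart can mount it. Below is a minimal sketch of such a claim, assuming the 1Gi size and ReadWriteOnce access mode from the values above and the cluster's default StorageClass.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: dag-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
  # storageClassName: <name>   # set this only if you are not using the default StorageClass
Apply it with kubectl apply -f dag-pvc.yaml before running helm install.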
In addition, it is a good idea to back logs with a PVC as well so they are kept persistently!
...
logs:
  # Configuration for empty dir volume (if logs.persistence.enabled == false)
  # emptyDirConfig:
  #   sizeLimit: 1Gi
  #   medium: Memory
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 10Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName:
    ## the name of an existing PVC to use
    existingClaim: log-pvc
...
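The same applies to log-pvc: it must exist before the release starts. A sketch assuming the 10Gi size above; the access mode here is an assumption, since scheduler, webserver, and worker pods may land on different nodes and all write logs.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: log-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany   # assumption: ReadWriteOnce can suffice if all pods share a node or your storage class has no RWX support
  resources:
    requests:
      storage: 10Gi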
Writing a test DAG
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def test():
    # Simple callable for the PythonOperator task below
    print(datetime.now())


with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval='1 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = PythonOperator(
        task_id='print_date',
        python_callable=test,
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
After pushing the test DAG to GitHub and waiting a short while, the DAG appears in the Airflow webserver UI.
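If it does not show up after a few minutes, you can check whether git-sync has pulled it by listing DAGs inside the scheduler pod. A sketch assuming the release name airflow, the airflow namespace, and the chart's default scheduler container name:
kubectl exec -n airflow deploy/airflow-scheduler -c scheduler -- airflow dags list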
When you run the DAG, a separate pod is created for each task in the run. Because the KubernetesExecutor is used, these pods are requested through the Kubernetes API and built from the configured pod template; if you want tasks to run independently on different images, use the KubernetesPodOperator instead (a sketch follows below).
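For reference, here is a minimal KubernetesPodOperator sketch (not part of the test DAG above). The image, namespace, and command are placeholder assumptions, and the import path assumes a recent apache-airflow-providers-cncf-kubernetes release; older releases expose the operator under airflow.providers.cncf.kubernetes.operators.kubernetes_pod.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

t4 = KubernetesPodOperator(
    task_id='run_in_custom_image',
    name='run-in-custom-image',
    namespace='airflow',       # assumption: same namespace as the Airflow release
    image='python:3.11-slim',  # any image, independent of the Airflow worker image
    cmds=['python', '-c'],
    arguments=['print("hello from a separate image")'],
    get_logs=True,             # stream the pod's stdout into the task log
)
Each such task runs in its own pod with whatever image you specify, so it is not tied to the pod template the KubernetesExecutor uses for regular tasks.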