Airflow KubernetesPodOperator Dag Builder를 위한 Pod-watcher RBAC
Airflow version: 2.2.5
helm repo: apache-airflow
chart version: 1.3.0
저는 쿠버네티스 환경에서 Kubernetes native하게 동작하는 Airflow 를 사용하고 있습니다.
일반적인 Airflow on Kubernetes는 사용자에게 독립적으로 Aiflow를 각각 띄워주는 방식인데, 이렇게 서비스하게 되면 컴퓨팅 자원이 기하급수적으로 늘어날것이라 예상했으며 한정적인 자원에서 Airflow를 서비스하기 위해 KubernetesExecutor 리소스를 활용하여 Kubernetes 자원을 효율적으로 사용하였습니다.
KubernetesExecutor를 사용하면 다음과 같은 장점이 있습니다.
(출처: https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-2)
- 가볍다 - Airflow는 라이브러리 의존성이 없는 GitHub 혹은 Docker Hub에서 받은 기본 이미지를 사용해도 무방합니다. 기존 방식대로 Hadoop 기반의 Airflow 장비 혹은 컨테이너를 운영한다고 하면 Airflow 내에 Hadoop 클라이언트와 Spark 클라이언트, Hive 클라이언트, Sqoop 클라이언트, Kerberos 설정 등이 모두 구성되어 있어야 합니다. 하지만 Kubernetes Executor와 KubernetesPodOperator를 사용하면 앞서 언급한 사항이 필요하지 않습니다.
- 유지 보수 비용 절감 - 컨테이너 이미지 기반으로 운영하기 때문에 태스크 간 독립성이 보장되어 라이브러리 간 의존성 확인과 같은 불필요한 작업을 수행하지 않아도 됩니다. 또한 다양한 데이터 플랫폼 환경에 자유롭게 접근할 수 있기 때문에 여러 환경에 맞춰 각각 구성했던 Airflow를 하나로 통합하는 것도 가능합니다.
- 효율적인 자원 관리 - 기존 Celery Executor를 Kubernetes에서 사용할 경우 마스터와 워커, 브로커가 지속적으로 자원을 점유하고 상주해 있지만 Kubernetes Executor의 경우 태스크가 실행될 경우에만 워커를 생성하고 태스크가 완료되면 자원을 반납하기 때문에 Kubernetes의 자원을 효율적으로 사용할 수 있습니다.
- 개발 효율성 - 대부분의 DAG를 KubernetesPodOperator만을 사용해 운영하면 Workflow DAG 코드를 템플릿화할 수 있습니다. 이를 통해 DAG를 개발하는 비 개발자(분석가, 데이터 사이언티스트)가 DAG를 작성하는데 많은 리서치와 시간을 투자하지 않고 자신의 코어 업무에만 집중할 수 있습니다.
워커 POD는 휘발성이기 때문에 POD가 종료되면 로그가 유실됩니다. 그렇기 때문에 외부 저장소인 NAS 또는 Amazon S3 등으로 로그를 저장하도록 구현하였습니다.
또한, KubernetesPodOperator를 사용하여 도커 이미지, 환경변수 등등 Pod과 동일한 방법으로 Dag를 실행시킬 수 있도록 개발하였습니다.
KubernetesPodOperator Dag 작성을 자동화하기 위해 FastAPI 기반 Dag builder를 개발하였습니다.
Kubernetes에 배포된 Dag builder에서 Kubernetes python client api로 여러 네임스페이스에 띄워져있는 Jupyter notebook Pod에 접근하여 Pod 정보를 가져오기 위해 Pod-watcher RBAC를 생성하였습니다.
Dag builder가 KubernetesPodOperator Dag 작성하는 프로세스
- 실행중인 Jupyter notebook Pod의 Docker container를 commit & push
- Kubernetes python client api로 Jupyter notebook Pod Manifest를 불러옴
- 불러온 Pod Manifest 기반 KubernetesPodOperator Dag 작성 후 Airflow에 배포
Pod-watcher RBAC
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: pod-watcher
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: pod-watcher-binding
subjects:
- kind: ServiceAccount
name: pod-watcher-sa # 대상 serviceaccount 이름
namespace: dag-builder # 대상 네임스페이스
roleRef:
kind: ClusterRole
name: pod-watcher # 위에서 생성한 ClusterRole 이름
apiGroup: rbac.authorization.k8s.io
Kubernetes python client api를 위한 ConfigMap
apiVersion: v1
data:
kubeconfig: |
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: LS0tLS...
server: https://{maser-node-ip}:6443
name: kubernetes
contexts:
- context:
cluster: kubernetes
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: kubernetes-admin@kubernetes
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
# pod-watcher rbac token 값
token: eyJhbGc...
kind: ConfigMap
metadata:
name: config-configmap
namespace: dag-builder
위 ConfigMap을 Dag builder 애플리케이션 Deployment 띄울때 Volume mount 해주면 Kubernetes Python Client API Initialize에 적용할 수 있다.