Operating Airflow with the KubernetesExecutor
Airflow on k8s
: ๋ณดํต ์ฟ ๋ฒ๋คํฐ์ค ์์ Airflow๋ฅผ ๊ตฌ๋์ํค๋ฉด Airflow ์ปดํฌ๋ํธ๋ค์ด ํ๋์ ํํ๋ก ๊ตฌ๋ํ๊ฒ ๋๋ค.
โข
๊ตฌ์ฑ์ด ๊ฐ๋จํจ
โข
์๋น์ค๋ค์ด ๋จ์ํ ํ๋๋ก ๋ณํ๋ ๊ฒ์ด๋ฏ๋ก ํ
ํ๋ฆฟํํ๊ธฐ ์ฌ์
BUT, ๊ฒฐ๊ตญ ๊ตฌ์ฑ์์๋ค์ด ํ๋๋ก ๋ณํ๋๊ธฐ ๋๋ฌธ์ ๊ณ์ํด์ ์์์ ์ ์ ํ๊ฒ ๋๋ค.
: ๋ค์๊ณผ ๊ฐ์ด worker ๋
ธ๋๊ฐ ๊ณ์ํด์ ์์์ ์ ์ ํ๊ณ ์๋ ๊ฒ์ ํ์ธํ ์ ์๋ค. (์ฌ์ง์ด DAG๊ฐ ํธ๋ฆฌ๊ฑฐ๊ฐ ๋์ง ์์๋๋ฐ๋!)
์ถ๊ฐ๋ก, ์ด๋ฏธ์ง๋ค๋ก ์ฟ ๋ฒ๋คํฐ์ค์์ airflow๋ฅผ ์กฐ์ฑํ๋ค๋ณด๋, ์์กด์ฑ ๊ด๋ฆฌ ๋ฐ ํ์ฅ์ฑ์ ๋งค์ฐ ํฐ ์ด๋ ค์์ด ์๊ธด๋ค.
KubernetesExecutor์ ๋ํด
: KubernetesExecutor์ ๊ณ์ํด์ ์์ปค๊ฐ ๋ฆฌ์์ค๋ฅผ ์ ์ ํ๋ ๊ฒ์ด ์๋, ์ค์ผ์ค๋ฌ์ ์ํด์ ๋์ ์ผ๋ก ์์ปค ๋
ธ๋๊ฐ ์๊ธด๋ค.
โข
Scheduler๊ฐ ํด๋น ์์ ์์ ์คํ๋์ด์ผ ํ ํ์คํฌ๋ฅผ ์ฐพ๋๋ค.
โข
Executor๋ ์ด ํ์คํฌ๋ฅผ ์คํํ๊ธฐ ์ํด ์์ปค ๋
ธ๋๋ฅผ Pod ํํ๋ก ์คํํ๋ค.
```python
def as_pod(self):
    # If the user supplied a pod template file, build the worker pod from it as-is.
    if self.kube_config.pod_template_file:
        return PodGenerator(pod_template_file=self.kube_config.pod_template_file).gen_pod()

    # Otherwise, generate the worker pod from the kube_config defaults.
    pod = PodGenerator(
        image=self.kube_config.kube_image,
        image_pull_policy=self.kube_config.kube_image_pull_policy or 'IfNotPresent',
        image_pull_secrets=self.kube_config.image_pull_secrets,
        volumes=self._get_volumes(),
        volume_mounts=self._get_volume_mounts(),
        init_containers=self._get_init_containers(),
        labels=self.kube_config.kube_labels,
        annotations=self.kube_config.kube_annotations,
        affinity=self.kube_config.kube_affinity,
        tolerations=self.kube_config.kube_tolerations,
        envs=self._get_environment(),
        node_selectors=self.kube_config.kube_node_selectors,
        service_account_name=self.kube_config.worker_service_account_name or 'default',
        restart_policy='Never'  # a worker pod runs a single task and must not restart
    ).gen_pod()

    # Layer env-from sources, the security context, and secrets onto the generated spec.
    pod.spec.containers[0].env_from = pod.spec.containers[0].env_from or []
    pod.spec.containers[0].env_from.extend(self._get_env_from())
    pod.spec.security_context = self._get_security_context()

    return append_to_pod(pod, self._get_secrets())
```
โข
๋ง์ผ ๊ฐ๋ณ ์ค์ ๋ pod ํ
ํ๋ฆฟ ํ์ผ์ด ์์ผ๋ฉด ์ด๋ฅผ ๋ฐ๋ฅด๊ณ , ์๋๋ผ๋ฉด ๊ธฐ๋ณธ kube_config์ ๋ฐ๋ผ ์์ปค๋
ธ๋๋ฅผ ํ๋ ํํ๋ก ์์ฑํ๋ค.
โข
๊ทธ๋ ๊ฒ ํ๋๊ฐ ์์ฑ๋๋ฉด ํ๋ ๋ฆฌ์คํธ์ ํด๋น ํ๋๊ฐ ์์ฑ๋๋ฉฐ, ์ด ํ๋๋ ์์ปค ๋
ธ๋์ ์ญํ ์ ์ํํ๊ฒ ๋๋ค.
๋์ ์ผ๋ก ์์ปค๋
ธ๋๋ฅผ ๊ด๋ฆฌํ๊ธฐ ๋๋ฌธ์ ์ด์ ๋ฐ๋ฅธ ์ฅ์ ์ด ๋ง๋ค.
โข
๊ฐ์ข
์์กด์ฑ ํจํค์ง๋ค์ ๊ตฌ์ฑํ ํ์ ์์ด K8sExecutor, k8sPodOperator๋ง ์ฌ์ฉํ๋ฉด ๋๋ค.
โข
์ปจํ
์ด๋ ์ด๋ฏธ์ง ๊ธฐ๋ฐ ์ด์์ผ๋ก ์ธํ ์์กด์ฑ ํ์ธ ์์
์ด ํ์ ์์ด์ง๋ฏ๋ก ์ ์ง ๋ณด์ ๋น์ฉ์ด ์ ๊ฐ๋๋ค.
โข
ํ์คํฌ๊ฐ ์คํ๋ ๋๋ง ๋์ ์ผ๋ก ์์ปค ๋
ธ๋๋ฅผ ์์ฑํ๋ฏ๋ก ํจ์จ์ ์ธ ์์ ๊ด๋ฆฌ๊ฐ ์ด๋ฃจ์ด์ง๋ค.
โข
K8sPodOperator์ ํตํด DAG์ฝ๋๋ฅผ ํ
ํ๋ฆฟํ ํ ์ ์๋ค.
โ ๊ทธ๋ฌ๋ ๊ต์ฅํ, ๊ต์ฅํโฆ๊ตฌ์ฑ์ด ๋ณต์กํ๋ค.
ํ์ง๋ง? ์ฐ๋ฆฐ ์ด๊ฑธ ์ค๋ ๊ตฌ์ฑํด๋ณผ๊ฒ์ด๋ค.
K8s ๋ด์์ airflow ๋์ฐ๊ธฐ
: k8s ๋ด์์ airflow๋ฅผ ์ค์นํ๋ ๋ฐฉ๋ฒ์ด๋ค. ์ฐ์ minikube๊ฐ ํ์ํ๋, minikube๋ถํฐ ์ค์น (์ค์น๋ฒ ์๋ต)
```bash
minikube start
😄  Darwin 13.4 (arm64) 의 minikube v1.30.1
🎉  minikube 1.31.2 이 사용가능합니다! 다음 경로에서 다운받으세요: https://github.com/kubernetes/minikube/releases/tag/v1.31.2
💡  해당 알림을 비활성화하려면 다음 명령어를 실행하세요. 'minikube config set WantUpdateNotification false'
✨  자동적으로 docker 드라이버가 선택되었습니다
📌  Using Docker Desktop driver with root privileges
👍  minikube 클러스터의 minikube 컨트롤 플레인 노드를 시작하는 중
🚜  베이스 이미지를 다운받는 중 ...
🔥  Creating docker container (CPUs=2, Memory=7903MB) ...
🐳  쿠버네티스 v1.26.3 을 Docker 23.0.2 런타임으로 설치하는 중
    ▪ 인증서 및 키를 생성하는 중 ...
    ▪ 컨트롤 플레인이 부팅 ...
    ▪ RBAC 규칙을 구성하는 중 ...
🔗  Configuring bridge CNI (Container Networking Interface) ...
    ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
🔎  Kubernetes 구성 요소를 확인...
🌟  애드온 활성화 : storage-provisioner, default-storageclass
🏄  끝났습니다! kubectl이 "minikube" 클러스터와 "default" 네임스페이스를 기본적으로 사용하도록 구성되었습니다.
```
๊ทธํ helm์ ์ค์นํด์ผ ํ๋ค. ์์ผ๋ฉด ์ด ๋ถ๋ถ์ ์๋ต
(root user)
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh
which helm
/usr/local/bin/helm
helm์ ์์น ์ฐพ๊ธฐ: ๋จผ์ , root ์ฌ์ฉ์๋ก ๋ก๊ทธ์ธํ์ฌ helm์ ์ ํํ ์์น๋ฅผ ์ฐพ์ต๋๋ค.
sudo su
which helm
์ด ๋ช
๋ น์ helm์ ๊ฒฝ๋ก๋ฅผ ์ถ๋ ฅํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ์ถ๋ ฅ ๊ฒฐ๊ณผ๊ฐ /usr/local/bin/helm์ด๋ผ๋ฉด, ์ด ๊ฒฝ๋ก๋ฅผ ๊ธฐ๋กํด ๋์ธ์.
PATH ํ๊ฒฝ ๋ณ์ ์์ : ์ด์ ์ฌ์ฉ์๋ก ๋์๊ฐ์ PATH ํ๊ฒฝ ๋ณ์์ helm์ ์์น๋ฅผ ์ถ๊ฐํฉ๋๋ค.
์ฌ์ฉ์์ ์
ธ ์ค์ ํ์ผ (์: .bashrc ๋๋ .zshrc)์ ํธ์งํด์ผ ํฉ๋๋ค.
exit # root ์ฌ์ฉ์์์ ๋์ค๊ธฐ
(user)
nano ~/.bashrc # ๋๋ ์ฌ์ฉํ๋ ์
ธ์ ๋ง๋ ์ค์ ํ์ผ์ ์ฝ๋๋ค
ํ์ผ์ ๋ง์ง๋ง์ ๋ค์ ๋ผ์ธ์ ์ถ๊ฐํฉ๋๋ค (์ฌ๊ธฐ์๋ helm์ด /usr/local/bin/helm์ ์์นํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค):]
export PATH=$PATH:/usr/local/bin
๋ณ๊ฒฝ์ฌํญ์ ์ ์ฅํ๊ณ ํธ์ง๊ธฐ๋ฅผ ์ข
๋ฃํฉ๋๋ค.
๋ณ๊ฒฝ์ฌํญ ์ ์ฉ: ๋ณ๊ฒฝ์ฌํญ์ ์ ์ฉํ๊ธฐ ์ํด ์
ธ ์ค์ ํ์ผ์ ๋ค์ ๋ก๋ํฉ๋๋ค.
source ~/.bashrc # ๋๋ ์ฌ์ฉํ๋ ์
ธ์ ๋ง๋ ์ค์ ํ์ผ
helm ์คํ ํ์ธ: ์ด์ helm ๋ช
๋ น์ด๋ฅผ ์คํํ์ฌ ๋ณ๊ฒฝ์ฌํญ์ด ์ฌ๋ฐ๋ฅด๊ฒ ์ ์ฉ๋์๋์ง ํ์ธํฉ๋๋ค.
helm version
helm ๋ ํฌ์งํ ๋ฆฌ๋ฅผ ์ถ๊ฐํ ๋ค์, airflow ์ค์น
helm repo add apache-airflow https://airflow.apache.org
SQL
๋ณต์ฌ
# values.yaml
airflowVersion: "2.6.2"
SQL
๋ณต์ฌ
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace -f my-values.yaml
Bash
๋ณต์ฌ
ํฌํธ ํฌ์๋ฉ:
airflow ๋ค์์คํ์ด์ค์ airflow-webserver ์๋น์ค์ 8080 ํฌํธ๋ฅผ ๋ก์ปฌ ๋จธ์ ์ 20000 ํฌํธ๋ก ํฌ์๋ฉํฉ๋๋ค.
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow
kubectl port-forward svc/airflow-webserver 20000:8080 -n airflow
Bash
๋ณต์ฌ
localhost:8080 or 20000์ผ๋ก ์ ์ํ๋ฉด ๋ค์๊ณผ ๊ฐ์ด airflow UI๊ฐ ๋ฌ๋ค!
ID: admin, PW: admin
k8s ์์ airflow๋ฅผ ๋์ฐ๋ฉด ๋ค์๊ณผ ๊ฐ์ ๊ตฌ์กฐ๋ฅผ ํ์ฑํ๋ค.
kubectl get pods -n airflow
NAME READY STATUS RESTARTS AGE
airflow-postgresql-0 1/1 Running 0 2m12s
airflow-redis-0 1/1 Running 0 2m12s
airflow-scheduler-68d6b6cdd4-z2ngc 2/2 Running 0 2m12s
airflow-statsd-77685bcd45-t6pfj 1/1 Running 0 2m12s
airflow-triggerer-0 2/2 Running 0 2m12s
airflow-webserver-bf8bbc9cb-qgd85 1/1 Running 0 2m12s
airflow-worker-0 2/2 Running 0 2m12s
Bash
๋ณต์ฌ
ํ์ง๋ง, airflow๋ฅผ container๋ก ๋์ด ๋งํผ, ์ธ์ ์ฌ์์ํด๋ ์ ํ ์ด์ํ์ง๊ฐ ์๋ค. ๋ฐ๋ผ์, git sync๋ฅผ ํตํด ๋ ํฌ์งํ ๋ฆฌ์์ DAG ์ฝ๋๋ฅผ ์ง์์ ์ผ๋ก ๊ฐ์ ธ์ฌ ์ ์๋ค.
โ ์ฌ๊ธฐ์ DAG Directory๊ฐ ์์ ์ Git Repository๊ฐ ๋๋ ๊ฒ์ด๋ค.
Git Sync
์๋ ๊ณผ์ ์ private repo๋ฅผ ๊ธฐ์ค์ผ๋ก ์งํ๋๋ค.
์ฐธ๊ณ ๋ก public repo์ ๊ฒฝ์ฐ์๋, ๊น ์ฃผ์๋ง ์
๋ ฅํ๋ฉด ๋๋ค. ๋ณ๋์ Deploy key๋ ํ์ํ์ง ์๋ค.
: DAG๋ฅผ ์ ์ฅํ Private Repository๋ฅผ ๋ง๋ ๋ค.
ssh-keygen -t rsa -b 4096 -C "your_email@example.com"
Bash
๋ณต์ฌ
.ssh ๋๋ ํ ๋ฆฌ ๋ด์ airflow_ssh_key ๋ผ๋ ์ด๋ฆ์ผ๋ก ssh key๋ฅผ ์์ฑํ๋ค.
์ด ๋, passphrase๋ฅผ ์
๋ ฅํ์ง ์๊ณ ๋น์นธ์ผ๋ก ๋๋ค.
pbcopy < airflow_ssh_key.pub
Bash
๋ณต์ฌ
: ๊ทธํ ํด๋ฆฝ๋ณด๋์ ssh_key ํผ๋ธ๋ฆญํค๋ฅผ ๋ณต์ฌํ๋ค.
:๋ณต์ฌํ ํผ๋ธ๋ฆญํค๋ฅผ ๋ ํฌ์งํ ๋ฆฌ์ ํค์ ๋ณต๋ถํ ํ, ์ถ๊ฐํ๋ค.
```bash
kubectl create secret generic airflow-git-ssh-secret \
  --from-file=gitSshKey=/Users/xxxx/.ssh/airflow_ssh_key \
  --from-file=id_ed25519.pub=/Users/xxxx/.ssh/airflow_ssh_key.pub \
  -n airflow
```

Then register the generated key pair as a secret inside the k8s cluster.

```bash
cd ~/.ssh
vi values.yaml
```

Now edit values.yaml so the secret is registered in the Airflow chart settings.
Search for the Git sync block (type /# Git sync in vi), then edit:

```yaml
airflowVersion: "2.6.2"
extraEnv: |
  - name: "AIRFLOW__CORE__PLUGINS_FOLDER"
    value: "/opt/airflow/dags/repo/plugins"

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: true  # change 1
    # Volume size for dags
    size: 10Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    enabled: true  # change 2
    repo: git@github.com:xxx/xxxx.git  # change 3
    branch: main  # change 4
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: ""
    credentialsSecret: git-credentials
    sshKeySecret: airflow-git-ssh-secret  # change 5
    wait: 10
    containerName: git-sync
    uid: 65533
    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0
    extraVolumeMounts: []
    env: []
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
```
: ์์์ #๋ณ๊ฒฝ์ ํด๋นํ๋ ๋ถ๋ถ์ ๋ค์๊ณผ ๊ฐ์ด ์์ ํ ํ, ์ ์ฅํ๋ค.
helm upgrade --install airflow apache-airflow/airflow -n airflow -f my-values.yaml
Bash
๋ณต์ฌ
: value.yaml์ ์์ ํ์ฌ helm upgrade๋ฅผ ์๋ํ๋ฉดโฆ?
Error: UPGRADE FAILED: cannot patch "airflow-scheduler" with kind Deployment: Deployment.apps "airflow-scheduler" is invalid: [spec.template.spec.containers[1].volumeMounts[1].name: Not found: "git-sync-ssh-key", spec.template.spec.initContainers[1].volumeMounts[1].name: Not found: "git-sync-ssh-key"]
Bash
๋ณต์ฌ
์คํจ๊ฐ ๋ ๋ฒ๋ฆฐ๋ค. ์ค๋ฅ๋ฅผ ์์ ํด๋ณด์.
```bash
vi values.yaml
/# Airflow scheduler settings
```

Go back into values.yaml.

```yaml
scheduler:
  extraVolumes:
    - name: git-sync-ssh-key
      secret:
        secretName: airflow-git-ssh-secret
```

Type /# Airflow scheduler settings to jump to that keyword, then add the scheduler block above.

```bash
helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml
```

Run the upgrade again, and this time it succeeds.
```python
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is deprecated


def test():
    print(datetime.now())


with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval='1 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = PythonOperator(
        task_id='print_date',
        python_callable=test,
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
        """\
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7) }}"
        {% endfor %}
        """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
```
: git-sync๊ฐ ์ ๋๋ก ๋์๋์ง ํ์ธํ๊ธฐ ์ํด dagํ์ผ์ ๋ง๋ค์ด git repository์ ํธ์ฌํ ํ, ํ์ธํด๋ณด์.
์ง์. ๋ค์๊ณผ ๊ฐ์ด DAG ๋ฆฌ์คํธ๊ฐ ์
๋ฐ์ดํธ ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
BUTโฆ๋ฐ์ชฝ์ง๋ฆฌ k8s airflowโฆ?
์๋ kubernetes๋ฅผ ํตํด airflow๋ฅผ ๊ตฌ๋ํ๋ฉด, DAG๊ฐ ํธ๋ฆฌ๊ฑฐ ๋ ๊ฒฝ์ฐ์๋ง ์์ปค ๋
ธ๋๊ฐ ์์ฑ๋์ด
๋ฆฌ์์ค๋ฅผ ํ ๋น๋ฐ๊ณ , ํ์คํฌ๊ฐ ์๋ฃ๋๋ฉด ๋ฆฌ์์ค๋ฅผ ๋ฐํํด์ผ ๋๋๋ฐ, ์์ปค๊ฐ ๊ณ์ ๋์๊ฐ๊ณ ์๋ค. KubernetesExecutor๋ก ์ค์ ๋์ด์์ง ์์์ ๊ทธ๋ ๋ค!
KubernetesExecutor๋ก ์ค์
vi values.yaml
Bash
๋ณต์ฌ
: values.yaml์ ๋ค์ ๋ค์ด๊ฐ์ executor๋ฅผ ๋ณ๊ฒฝํ๋ค.
์ด๋ ๊ฒ ๋๋ฉด ํ์์์๋ ์์ปค๋
ธ๋๊ฐ ์์ฑ๋์ด์์ง ์๋ค.
๋ค์๊ณผ ๊ฐ์ด DAG๋ฅผ ํธ๋ฆฌ๊ฑฐ ํ๋ฉด, DAG ์์ด๋์ ํด๋นํ๋ ์์ปค ๋
ธ๋๊ฐ ์์ฑ๋์ด ํ์คํฌ๋ฅผ ์ํํ๋ค.
๊ทธ ํ, ํ์คํฌ๊ฐ ์๋ฃ๋๋ฉด ํ๋๋ ์ข
๋ฃ๋๊ณ ๋ฆฌ์์ค๋ฅผ ๋ฐํํ๋ค.
์ต์ข
์ ์ธ overriding values.yml์ ์๋์ ๊ฐ๋ค
private repo ๊ธฐ์ค
```yaml
airflowVersion: "2.6.2"
extraEnv: |
  - name: "AIRFLOW__CORE__PLUGINS_FOLDER"
    value: "/opt/airflow/dags/repo/airflow/plugins"

executor: "KubernetesExecutor"

dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: true  # change 1
    # Volume size for dags
    size: 10Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    enabled: true
    repo: git@github.com:xxx/xxxx.git
    branch: main
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "airflow/dags"
    credentialsSecret: git-credentials
    sshKeySecret: airflow-git-ssh-secret  # change 5
    wait: 10
    containerName: git-sync
    uid: 65533
    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0
    extraVolumeMounts: []
    env: []
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

scheduler:
  extraVolumes:
    - name: git-sync-ssh-key
      secret:
        secretName: airflow-git-ssh-secret
```
Assigning a static webserver key instead of a dynamic one
```bash
$ python3 -c 'import secrets; print(secrets.token_hex(16))'
f83bab1deefae2199d27d2621762c30b
```
Airflow์์๋ webserver๋ก flask๋ฅผ ์ฌ์ฉํ๋ค
flask๋ session id๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํด ํ์ํ๋ฐ, ๋์๋ฅผ ๋ง๋ค์ด์ k8s์ secret object๋ก ์์ฑ์ ํด์ผํ๋ค
```bash
$ kubectl create secret generic \
    airflow-webserver-secret \
    --from-literal="webserver-secret-key=f83bab1deefae2199d27d2621762c30b" \
    -n airflow
secret/airflow-webserver-secret created
```
Then reference the secret in values.yaml:

```yaml
airflowVersion: "2.6.2"
extraEnv: |
  - name: "AIRFLOW__CORE__PLUGINS_FOLDER"
    value: "/opt/airflow/dags/repo/airflow/plugins"

# added
webserverSecretKey: webserver-secret-key
webserverSecretKeySecretName: airflow-webserver-secret

executor: "KubernetesExecutor"

dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: true  # change 1
    # Volume size for dags
    size: 10Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    enabled: true
    repo: https://github.com/xxx/xxx.git
    branch: main
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "airflow/dags"
    # credentialsSecret: git-credentials
    # sshKeySecret: airflow-git-ssh-secret  # change 5
    wait: 10
    containerName: git-sync
    uid: 65533
    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0
    extraVolumeMounts: []
    env: []
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

# added
webserver:
  service:
    type: NodePort
    ## service annotations
    ports:
      - name: airflow-ui
        port: "{{ .Values.ports.airflowUI }}"
        targetPort: "{{ .Values.ports.airflowUI }}"
        nodePort: 31151
```
```bash
helm upgrade --install airflow apache-airflow/airflow -n airflow -f my-values.yaml
```

Run the deployment again.
```bash
$ minikube ip
192.168.49.2

$ kubectl get svc -n airflow
NAME                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
airflow-postgresql      ClusterIP   10.100.22.149   <none>        5432/TCP            30m
airflow-postgresql-hl   ClusterIP   None            <none>        5432/TCP            30m
airflow-statsd          ClusterIP   10.107.91.48    <none>        9125/UDP,9102/TCP   30m
airflow-triggerer       ClusterIP   None            <none>        8794/TCP            30m
airflow-webserver       NodePort    10.100.214.92   <none>        8080:31151/TCP      30m

# access via the address below
http://192.168.49.2:31151
```
log ์ด์
dag๊ฐ ์ ์คํ์ ๋๋๋ฐ ์๋์ ๊ฐ์ ์ด์๊ฐ ๋ฐ๊ฒฌ๋์๋ค.
Could not read served logs: [Errno -2] Name or service not known
์น์๋ฒ ํ๋์ ๋ก๊ทธ๋ฅผ ํ๋ฒ ์ดํด๋ณด์.
```bash
$ kubectl logs airflow-webserver-78866bff9b-cbrx4 -n airflow
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_task_handler.py", line 505, in _read_from_logs_server
    response = _fetch_logs_from_service(url, rel_path)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_task_handler.py", line 93, in _fetch_logs_from_service
    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_api.py", line 201, in get
    trust_env=trust_env,
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_api.py", line 110, in request
    follow_redirects=follow_redirects,
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 821, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 912, in send
    history=[],
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 939, in _send_handling_auth
    history=history,
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 973, in _send_handling_redirects
    response = self._send_single_request(request)
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 1009, in _send_single_request
    response = transport.handle_request(request)
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_transports/default.py", line 218, in handle_request
    resp = self._pool.handle_request(req)
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectError: [Errno -2] Name or service not known
```
์ด์ ๋ Airflow์ ๋ก๊ทธ ์ง์์ฑ๊ณผ ์ฐ๊ด ์๋ค.
kubernetes executor๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ฉด ๋์ ์ผ๋ก worker๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋๋ฐ,
์ด ๋ ๋ก๊น
์ ์์ปค์ ํ๊ฒ ๋๊ณ , ์์ปค๊ฐ ์ฌ๋ผ์ง๊ฒ ๋๋ฉด ํด๋น service ์ ๋ํด์ ๋ก๊ทธ๋ฅผ ๊ฐ์ ธ์ค์ง ๋ชปํ๊ฒ ๋๋ ๋ฌธ์ ๊ฐ ์๊ธด๋ค
์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด์๋ ๋ก๊น
์คํ ๋ฆฌ์ง๋ฅผ s3์ ๊ฐ์ด ๋ถ๋ฆฌํ๊ฑฐ๋, ์ถ๊ฐ์ ์ธ ์ธ๋ถ ๋ณผ๋ฅจ(PVC)์ ์ก์์ผํ๋ค
Airflow์์ ๋ก๊ทธ ์ง์์ฑ(persistence)์ ํ์ฑํํ๋ฉด, Airflow ์ปดํฌ๋ํธ๋ค์ด ๋ก๊ทธ๋ฅผ ๋์ผํ ๋ณผ๋ฅจ์ ๊ธฐ๋กํ๊ฒ ๋๋ค. ์ด๋ฅผ ์ํด ReadWriteMany ์ ๊ทผ ๋ชจ๋๋ฅผ ๊ฐ์ง PersistentVolumeClaim์ด ํ๋ก๋น์ ๋๋๋ค.
์๋์ ์ถ๊ฐ์ ์ธ ํด๊ฒฐ ๋ฐฉ๋ฒ์ ๋ช
์ํด๋ณด๋๋ก ํ๊ฒ ๋ค.
values.yaml ์๋ ๋ช
์๋ฅผ ํ์.
```yaml
logs:
  persistence:
    enabled: true
```
์ฆ, ๊ธฐ๋ณธ ๋ก๊ทธ๊ฐ false์ธ๋ฐ, true ๋ฐ๊ฟ์ค์ผ Persistence volume์ Pod log๋ฅผ ๋จ๊ธด๋ค.
์์ ๊ฐ์ด ์ค์ ํ helm์ ์ฌ์ค์นํ์.
์์ ๊ฐ์ด ์ ์์ ์ผ๋ก ๋ก๊ทธ๋ฅผ ๋ณผ ์ ์๋ค.
Installing requirements.txt
์ฐธ๊ณ ์๋ฃ
24.03.20
Airflow stuck in an endless pending state
Found the same error as mine.
Rancher storage class
• A storageClass creates PersistentVolumes dynamically.
• The user creates a PVC, and that PVC references a particular StorageClass.
• Refer to the storage class above.
• A PVC defines the storage resources the user is requesting (see the sketch below).