Search

Airflow on Kubernetes

Operating Airflow in k8s executor

Airflow on k8s

: ๋ณดํ†ต ์ฟ ๋ฒ„๋„คํ‹ฐ์Šค ์œ„์— Airflow๋ฅผ ๊ตฌ๋™์‹œํ‚ค๋ฉด Airflow ์ปดํฌ๋„ŒํŠธ๋“ค์ด ํŒŒ๋“œ์˜ ํ˜•ํƒœ๋กœ ๊ตฌ๋™ํ•˜๊ฒŒ ๋œ๋‹ค.
โ€ข
๊ตฌ์„ฑ์ด ๊ฐ„๋‹จํ•จ
โ€ข
์„œ๋น„์Šค๋“ค์ด ๋‹จ์ˆœํžˆ ํŒŒ๋“œ๋กœ ๋ณ€ํ™˜๋œ ๊ฒƒ์ด๋ฏ€๋กœ ํ…œํ”Œ๋ฆฟํ™”ํ•˜๊ธฐ ์‰ฌ์›€
BUT, ๊ฒฐ๊ตญ ๊ตฌ์„ฑ์š”์†Œ๋“ค์ด ํŒŒ๋“œ๋กœ ๋ณ€ํ™˜๋˜๊ธฐ ๋•Œ๋ฌธ์— ๊ณ„์†ํ•ด์„œ ์ž์›์„ ์ ์œ ํ•˜๊ฒŒ ๋œ๋‹ค.
: ๋‹ค์Œ๊ณผ ๊ฐ™์ด worker ๋…ธ๋“œ๊ฐ€ ๊ณ„์†ํ•ด์„œ ์ž์›์„ ์ ์œ ํ•˜๊ณ  ์žˆ๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค. (์‹ฌ์ง€์–ด DAG๊ฐ€ ํŠธ๋ฆฌ๊ฑฐ๊ฐ€ ๋˜์ง€ ์•Š์•˜๋Š”๋ฐ๋„!)
์ถ”๊ฐ€๋กœ, ์ด๋ฏธ์ง€๋“ค๋กœ ์ฟ ๋ฒ„๋„คํ‹ฐ์Šค์—์„œ airflow๋ฅผ ์กฐ์„ฑํ•˜๋‹ค๋ณด๋‹ˆ, ์˜์กด์„ฑ ๊ด€๋ฆฌ ๋ฐ ํ™•์žฅ์„ฑ์— ๋งค์šฐ ํฐ ์–ด๋ ค์›€์ด ์ƒ๊ธด๋‹ค.

KubernetesExecutor์— ๋Œ€ํ•ด

: KubernetesExecutor์€ ๊ณ„์†ํ•ด์„œ ์›Œ์ปค๊ฐ€ ๋ฆฌ์†Œ์Šค๋ฅผ ์ ์œ ํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹Œ, ์Šค์ผ€์ค„๋Ÿฌ์— ์˜ํ•ด์„œ ๋™์ ์œผ๋กœ ์›Œ์ปค ๋…ธ๋“œ๊ฐ€ ์ƒ๊ธด๋‹ค.
โ€ข
Scheduler๊ฐ€ ํ•ด๋‹น ์‹œ์ ์—์„œ ์‹คํ–‰๋˜์–ด์•ผ ํ•  ํƒœ์Šคํฌ๋ฅผ ์ฐพ๋Š”๋‹ค.
โ€ข
Executor๋Š” ์ด ํƒœ์Šคํฌ๋ฅผ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•ด ์›Œ์ปค ๋…ธ๋“œ๋ฅผ Pod ํ˜•ํƒœ๋กœ ์‹คํ–‰ํ•œ๋‹ค.
def as_pod(self): if self.kube_config.pod_template_file: return PodGenerator(pod_template_file=self.kube_config.pod_template_file).gen_pod() pod = PodGenerator( image=self.kube_config.kube_image, image_pull_policy=self.kube_config.kube_image_pull_policy or 'IfNotPresent', image_pull_secrets=self.kube_config.image_pull_secrets, volumes=self._get_volumes(), volume_mounts=self._get_volume_mounts(), init_containers=self._get_init_containers(), labels=self.kube_config.kube_labels, annotations=self.kube_config.kube_annotations, affinity=self.kube_config.kube_affinity, tolerations=self.kube_config.kube_tolerations, envs=self._get_environment(), node_selectors=self.kube_config.kube_node_selectors, service_account_name=self.kube_config.worker_service_account_name or 'default', restart_policy='Never' ).gen_pod() pod.spec.containers[0].env_from = pod.spec.containers[0].env_from or [] pod.spec.containers[0].env_from.extend(self._get_env_from()) pod.spec.security_context = self._get_security_context() return append_to_pod(pod, self._get_secrets())
Python
๋ณต์‚ฌ
โ€ข
๋งŒ์ผ ๊ฐœ๋ณ„ ์„ค์ •๋œ pod ํ…œํ”Œ๋ฆฟ ํŒŒ์ผ์ด ์žˆ์œผ๋ฉด ์ด๋ฅผ ๋”ฐ๋ฅด๊ณ , ์•„๋‹ˆ๋ผ๋ฉด ๊ธฐ๋ณธ kube_config์— ๋”ฐ๋ผ ์›Œ์ปค๋…ธ๋“œ๋ฅผ ํŒŒ๋“œ ํ˜•ํƒœ๋กœ ์ƒ์„ฑํ•œ๋‹ค.
โ€ข
๊ทธ๋ ‡๊ฒŒ ํŒŒ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜๋ฉด ํŒŒ๋“œ ๋ฆฌ์ŠคํŠธ์— ํ•ด๋‹น ํŒŒ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜๋ฉฐ, ์ด ํŒŒ๋“œ๋Š” ์›Œ์ปค ๋…ธ๋“œ์˜ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•˜๊ฒŒ ๋œ๋‹ค.
๋™์ ์œผ๋กœ ์›Œ์ปค๋…ธ๋“œ๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ด์— ๋”ฐ๋ฅธ ์žฅ์ ์ด ๋งŽ๋‹ค.
โ€ข
๊ฐ์ข… ์˜์กด์„ฑ ํŒจํ‚ค์ง€๋“ค์„ ๊ตฌ์„ฑํ•  ํ•„์š” ์—†์ด K8sExecutor, k8sPodOperator๋งŒ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.
โ€ข
์ปจํ…Œ์ด๋„ˆ ์ด๋ฏธ์ง€ ๊ธฐ๋ฐ˜ ์šด์˜์œผ๋กœ ์ธํ•œ ์˜์กด์„ฑ ํ™•์ธ ์ž‘์—…์ด ํ•„์š” ์—†์–ด์ง€๋ฏ€๋กœ ์œ ์ง€ ๋ณด์ˆ˜ ๋น„์šฉ์ด ์ ˆ๊ฐ๋œ๋‹ค.
โ€ข
ํƒœ์Šคํฌ๊ฐ€ ์‹คํ–‰๋  ๋•Œ๋งŒ ๋™์ ์œผ๋กœ ์›Œ์ปค ๋…ธ๋“œ๋ฅผ ์ƒ์„ฑํ•˜๋ฏ€๋กœ ํšจ์œจ์ ์ธ ์ž์› ๊ด€๋ฆฌ๊ฐ€ ์ด๋ฃจ์–ด์ง„๋‹ค.
โ€ข
K8sPodOperator์„ ํ†ตํ•ด DAG์ฝ”๋“œ๋ฅผ ํ…œํ”Œ๋ฆฟํ™” ํ•  ์ˆ˜ ์žˆ๋‹ค.
โ‡’ ๊ทธ๋Ÿฌ๋‚˜ ๊ต‰์žฅํžˆ, ๊ต‰์žฅํžˆโ€ฆ๊ตฌ์„ฑ์ด ๋ณต์žกํ•˜๋‹ค.
ํ•˜์ง€๋งŒ? ์šฐ๋ฆฐ ์ด๊ฑธ ์˜ค๋Š˜ ๊ตฌ์„ฑํ•ด๋ณผ๊ฒƒ์ด๋‹ค.

K8s ๋‚ด์—์„œ airflow ๋„์šฐ๊ธฐ

: k8s ๋‚ด์—์„œ airflow๋ฅผ ์„ค์น˜ํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ์šฐ์„  minikube๊ฐ€ ํ•„์š”ํ•˜๋‹ˆ, minikube๋ถ€ํ„ฐ ์„ค์น˜ (์„ค์น˜๋ฒ• ์ƒ๋žต)
minikube start ๐Ÿ˜„ Darwin 13.4 (arm64) ์˜ minikube v1.30.1 ๐ŸŽ‰ minikube 1.31.2 ์ด ์‚ฌ์šฉ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค! ๋‹ค์Œ ๊ฒฝ๋กœ์—์„œ ๋‹ค์šด๋ฐ›์œผ์„ธ์š”: https://github.com/kubernetes/minikube/releases/tag/v1.31.2 ๐Ÿ’ก ํ•ด๋‹น ์•Œ๋ฆผ์„ ๋น„ํ™œ์„ฑํ™”ํ•˜๋ ค๋ฉด ๋‹ค์Œ ๋ช…๋ น์–ด๋ฅผ ์‹คํ–‰ํ•˜์„ธ์š”. 'minikube config set WantUpdateNotification false' โœจ ์ž๋™์ ์œผ๋กœ docker ๋“œ๋ผ์ด๋ฒ„๊ฐ€ ์„ ํƒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค ๐Ÿ“Œ Using Docker Desktop driver with root privileges ๐Ÿ‘ minikube ํด๋Ÿฌ์Šคํ„ฐ์˜ minikube ์ปจํŠธ๋กค ํ”Œ๋ ˆ์ธ ๋…ธ๋“œ๋ฅผ ์‹œ์ž‘ํ•˜๋Š” ์ค‘ ๐Ÿšœ ๋ฒ ์ด์Šค ์ด๋ฏธ์ง€๋ฅผ ๋‹ค์šด๋ฐ›๋Š” ์ค‘ ... ๐Ÿ”ฅ Creating docker container (CPUs=2, Memory=7903MB) ... ๐Ÿณ ์ฟ ๋ฒ„๋„คํ‹ฐ์Šค v1.26.3 ์„ Docker 23.0.2 ๋Ÿฐํƒ€์ž„์œผ๋กœ ์„ค์น˜ํ•˜๋Š” ์ค‘ โ–ช ์ธ์ฆ์„œ ๋ฐ ํ‚ค๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์ค‘ ... โ–ช ์ปจํŠธ๋กค ํ”Œ๋ ˆ์ธ์ด ๋ถ€ํŒ…... โ–ช RBAC ๊ทœ์น™์„ ๊ตฌ์„ฑํ•˜๋Š” ์ค‘ ... ๐Ÿ”— Configuring bridge CNI (Container Networking Interface) ... โ–ช Using image gcr.io/k8s-minikube/storage-provisioner:v5 ๐Ÿ”Ž Kubernetes ๊ตฌ์„ฑ ์š”์†Œ๋ฅผ ํ™•์ธ... ๐ŸŒŸ ์• ๋“œ์˜จ ํ™œ์„ฑํ™” : storage-provisioner, default-storageclass ๐Ÿ„ ๋๋‚ฌ์Šต๋‹ˆ๋‹ค! kubectl์ด "minikube" ํด๋Ÿฌ์Šคํ„ฐ์™€ "default" ๋„ค์ž„์ŠคํŽ˜์ด์Šค๋ฅผ ๊ธฐ๋ณธ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋„๋ก ๊ตฌ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
Bash
๋ณต์‚ฌ
๊ทธํ›„ helm์„ ์„ค์น˜ํ•ด์•ผ ํ•œ๋‹ค. ์žˆ์œผ๋ฉด ์ด ๋ถ€๋ถ„์€ ์ƒ๋žต
(root user) curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 chmod 700 get_helm.sh ./get_helm.sh which helm /usr/local/bin/helm helm์˜ ์œ„์น˜ ์ฐพ๊ธฐ: ๋จผ์ €, root ์‚ฌ์šฉ์ž๋กœ ๋กœ๊ทธ์ธํ•˜์—ฌ helm์˜ ์ •ํ™•ํ•œ ์œ„์น˜๋ฅผ ์ฐพ์Šต๋‹ˆ๋‹ค. sudo su which helm ์ด ๋ช…๋ น์€ helm์˜ ๊ฒฝ๋กœ๋ฅผ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ์ถœ๋ ฅ ๊ฒฐ๊ณผ๊ฐ€ /usr/local/bin/helm์ด๋ผ๋ฉด, ์ด ๊ฒฝ๋กœ๋ฅผ ๊ธฐ๋กํ•ด ๋‘์„ธ์š”. PATH ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์ˆ˜์ •: ์ด์ œ ์‚ฌ์šฉ์ž๋กœ ๋Œ์•„๊ฐ€์„œ PATH ํ™˜๊ฒฝ ๋ณ€์ˆ˜์— helm์˜ ์œ„์น˜๋ฅผ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค. ์‚ฌ์šฉ์ž์˜ ์…ธ ์„ค์ • ํŒŒ์ผ (์˜ˆ: .bashrc ๋˜๋Š” .zshrc)์„ ํŽธ์ง‘ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. exit # root ์‚ฌ์šฉ์ž์—์„œ ๋‚˜์˜ค๊ธฐ (user) nano ~/.bashrc # ๋˜๋Š” ์‚ฌ์šฉํ•˜๋Š” ์…ธ์— ๋งž๋Š” ์„ค์ • ํŒŒ์ผ์„ ์—ฝ๋‹ˆ๋‹ค ํŒŒ์ผ์˜ ๋งˆ์ง€๋ง‰์— ๋‹ค์Œ ๋ผ์ธ์„ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค (์—ฌ๊ธฐ์„œ๋Š” helm์ด /usr/local/bin/helm์— ์œ„์น˜ํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค):] export PATH=$PATH:/usr/local/bin ๋ณ€๊ฒฝ์‚ฌํ•ญ์„ ์ €์žฅํ•˜๊ณ  ํŽธ์ง‘๊ธฐ๋ฅผ ์ข…๋ฃŒํ•ฉ๋‹ˆ๋‹ค. ๋ณ€๊ฒฝ์‚ฌํ•ญ ์ ์šฉ: ๋ณ€๊ฒฝ์‚ฌํ•ญ์„ ์ ์šฉํ•˜๊ธฐ ์œ„ํ•ด ์…ธ ์„ค์ • ํŒŒ์ผ์„ ๋‹ค์‹œ ๋กœ๋“œํ•ฉ๋‹ˆ๋‹ค. source ~/.bashrc # ๋˜๋Š” ์‚ฌ์šฉํ•˜๋Š” ์…ธ์— ๋งž๋Š” ์„ค์ • ํŒŒ์ผ helm ์‹คํ–‰ ํ™•์ธ: ์ด์ œ helm ๋ช…๋ น์–ด๋ฅผ ์‹คํ–‰ํ•˜์—ฌ ๋ณ€๊ฒฝ์‚ฌํ•ญ์ด ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ์ ์šฉ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค. helm version
Bash
๋ณต์‚ฌ
helm ๋ ˆํฌ์ง€ํ† ๋ฆฌ๋ฅผ ์ถ”๊ฐ€ํ•œ ๋‹ค์Œ, airflow ์„ค์น˜
helm repo add apache-airflow https://airflow.apache.org
SQL
๋ณต์‚ฌ
# values.yaml airflowVersion: "2.6.2"
SQL
๋ณต์‚ฌ
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace -f my-values.yaml
Bash
๋ณต์‚ฌ
ํฌํŠธ ํฌ์›Œ๋”ฉ:
airflow ๋„ค์ž„์ŠคํŽ˜์ด์Šค์˜ airflow-webserver ์„œ๋น„์Šค์˜ 8080 ํฌํŠธ๋ฅผ ๋กœ์ปฌ ๋จธ์‹ ์˜ 20000 ํฌํŠธ๋กœ ํฌ์›Œ๋”ฉํ•ฉ๋‹ˆ๋‹ค.
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow kubectl port-forward svc/airflow-webserver 20000:8080 -n airflow
Bash
๋ณต์‚ฌ
localhost:8080 or 20000์œผ๋กœ ์ ‘์†ํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์ด airflow UI๊ฐ€ ๋œฌ๋‹ค!
ID: admin, PW: admin
k8s ์œ„์— airflow๋ฅผ ๋„์šฐ๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ตฌ์กฐ๋ฅผ ํ˜•์„ฑํ•œ๋‹ค.
kubectl get pods -n airflow NAME READY STATUS RESTARTS AGE airflow-postgresql-0 1/1 Running 0 2m12s airflow-redis-0 1/1 Running 0 2m12s airflow-scheduler-68d6b6cdd4-z2ngc 2/2 Running 0 2m12s airflow-statsd-77685bcd45-t6pfj 1/1 Running 0 2m12s airflow-triggerer-0 2/2 Running 0 2m12s airflow-webserver-bf8bbc9cb-qgd85 1/1 Running 0 2m12s airflow-worker-0 2/2 Running 0 2m12s
Bash
๋ณต์‚ฌ
ํ•˜์ง€๋งŒ, airflow๋ฅผ container๋กœ ๋„์šด ๋งŒํผ, ์–ธ์ œ ์žฌ์‹œ์ž‘ํ•ด๋„ ์ „ํ˜€ ์ด์ƒํ•˜์ง€๊ฐ€ ์•Š๋‹ค. ๋”ฐ๋ผ์„œ, git sync๋ฅผ ํ†ตํ•ด ๋ ˆํฌ์ง€ํ† ๋ฆฌ์—์„œ DAG ์ฝ”๋“œ๋ฅผ ์ง€์†์ ์œผ๋กœ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.
โ‡’ ์—ฌ๊ธฐ์„œ DAG Directory๊ฐ€ ์ž์‹ ์˜ Git Repository๊ฐ€ ๋˜๋Š” ๊ฒƒ์ด๋‹ค.

Git Sync

์•„๋ž˜ ๊ณผ์ •์€ private repo๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ง„ํ–‰๋œ๋‹ค.
์ฐธ๊ณ ๋กœ public repo์˜ ๊ฒฝ์šฐ์—๋Š”, ๊นƒ ์ฃผ์†Œ๋งŒ ์ž…๋ ฅํ•˜๋ฉด ๋œ๋‹ค. ๋ณ„๋„์˜ Deploy key๋Š” ํ•„์š”ํ•˜์ง€ ์•Š๋‹ค.
11708
issues
: DAG๋ฅผ ์ €์žฅํ•  Private Repository๋ฅผ ๋งŒ๋“ ๋‹ค.
ssh-keygen -t rsa -b 4096 -C "your_email@example.com"
Bash
๋ณต์‚ฌ
.ssh ๋””๋ ‰ํ† ๋ฆฌ ๋‚ด์— airflow_ssh_key ๋ผ๋Š” ์ด๋ฆ„์œผ๋กœ ssh key๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
์ด ๋•Œ, passphrase๋ฅผ ์ž…๋ ฅํ•˜์ง€ ์•Š๊ณ  ๋นˆ์นธ์œผ๋กœ ๋‘”๋‹ค.
pbcopy < airflow_ssh_key.pub
Bash
๋ณต์‚ฌ
: ๊ทธํ›„ ํด๋ฆฝ๋ณด๋“œ์— ssh_key ํผ๋ธ”๋ฆญํ‚ค๋ฅผ ๋ณต์‚ฌํ•œ๋‹ค.
:๋ณต์‚ฌํ•œ ํผ๋ธ”๋ฆญํ‚ค๋ฅผ ๋ ˆํฌ์ง€ํ† ๋ฆฌ์˜ ํ‚ค์— ๋ณต๋ถ™ํ•œ ํ›„, ์ถ”๊ฐ€ํ•œ๋‹ค.
kubectl create secret generic airflow-git-ssh-secret \ --from-file=gitSshKey=/Users/xxxx/.ssh/airflow_ssh_key \ --from-file=id_ed25519.pub=/Users/xxxx/.ssh/airflow_ssh_key.pub \ -n airflow
Bash
๋ณต์‚ฌ
๊ทธ ํ›„, ๋งŒ๋“  ํ‚ค์™€ ํผ๋ธ”๋ฆญํ‚ค๋ฅผ k8s ํด๋Ÿฌ์Šคํ„ฐ ์•ˆ์— secret์œผ๋กœ ์ถ”๊ฐ€ํ•œ๋‹ค.
cd ~/.ssh vi values.yaml
Bash
๋ณต์‚ฌ
์ด์ œ, airflow ์„ค์ • yamlํŒŒ์ผ์— secret์„ ๋“ฑ๋กํ•˜๊ธฐ ์œ„ํ•ด values.yamlํŒŒ์ผ์„ ์ˆ˜์ •ํ•œ๋‹ค.
/# Git sync airflowVersion: "2.6.2" extraEnv: | - name: "AIRFLOW__CORE__PLUGINS_FOLDER" value: "/opt/airflow/dags/repo/plugins" # Git sync dags: persistence: # Enable persistent volume for storing dags enabled: true #๋ณ€๊ฒฝ 1 # Volume size for dags size: 10Gi # If using a custom storageClass, pass name here storageClassName: # access mode of the persistent volume accessMode: ReadWriteOnce ## the name of an existing PVC to use existingClaim: gitSync: enabled: true #๋ณ€๊ฒฝ 2 repo: git@github.com:xxx/xxxx.git #๋ณ€๊ฒฝ 3 branch: main #๋ณ€๊ฒฝ 4 rev: HEAD depth: 1 # the number of consecutive failures allowed before aborting maxFailures: 0 # subpath within the repo where dags are located # should be "" if dags are at repo root subPath: "" credentialsSecret: git-credentials sshKeySecret: airflow-git-ssh-secret #๋ณ€๊ฒฝ 5 wait: 10 containerName: git-sync uid: 65533 # When not set, the values defined in the global securityContext will be used securityContext: {} # runAsUser: 65533 # runAsGroup: 0 extraVolumeMounts: [] env: [] resources: {} # limits: # cpu: 100m # memory: 128Mi # requests: # cpu: 100m # memory: 128Mi
YAML
๋ณต์‚ฌ
: ์œ„์—์„œ #๋ณ€๊ฒฝ์— ํ•ด๋‹นํ•˜๋Š” ๋ถ€๋ถ„์„ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ˆ˜์ •ํ•œ ํ›„, ์ €์žฅํ•œ๋‹ค.
helm upgrade --install airflow apache-airflow/airflow -n airflow -f my-values.yaml
Bash
๋ณต์‚ฌ
: value.yaml์„ ์ˆ˜์ •ํ•˜์—ฌ helm upgrade๋ฅผ ์‹œ๋„ํ•˜๋ฉดโ€ฆ?
Error: UPGRADE FAILED: cannot patch "airflow-scheduler" with kind Deployment: Deployment.apps "airflow-scheduler" is invalid: [spec.template.spec.containers[1].volumeMounts[1].name: Not found: "git-sync-ssh-key", spec.template.spec.initContainers[1].volumeMounts[1].name: Not found: "git-sync-ssh-key"]
Bash
๋ณต์‚ฌ
์‹คํŒจ๊ฐ€ ๋– ๋ฒ„๋ฆฐ๋‹ค. ์˜ค๋ฅ˜๋ฅผ ์ˆ˜์ •ํ•ด๋ณด์ž.
vi values.yaml /# Airflow scheduler settings
Bash
๋ณต์‚ฌ
๋‹ค์‹œ values.yaml๋กœ ๋“ค์–ด๊ฐ„๋‹ค.
scheduler: extraVolumes: - name: git-sync-ssh-key secret: secretName: airflow-git-ssh-secret
YAML
๋ณต์‚ฌ
/# Airflow scheduler settings ๋ฅผ ์ž…๋ ฅํ•ด ํ‚ค์›Œ๋“œ๋ฅผ ์ฐพ์€ ํ›„, ์œ„์˜ scheduler ์ฝ”๋“œ๋ฅผ ์ถ”๊ฐ€ํ•œ๋‹ค.
helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml
Bash
๋ณต์‚ฌ
๋‹ค์‹œ upgrade๋ฅผ ํ•˜๋ฉด, ์„ฑ๊ณต์ ์œผ๋กœ ๋  ๊ฒƒ์ด๋‹ค.
from datetime import datetime, timedelta from textwrap import dedent from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python_operator import PythonOperator from datetime import datetime def test(): print(datetime.now()) with DAG( 'tutorial', # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={ 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), }, description='A simple tutorial DAG', schedule_interval='1 * * * *', start_date=datetime(2021, 1, 1), catchup=False, tags=['example'], ) as dag: # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = PythonOperator( task_id='print_date', python_callable=test, ) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, ) t1.doc_md = dedent( """\ #### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Instance Details page. ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) """ ) dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG dag.doc_md = """ This is a documentation placed anywhere """ # otherwise, type it like this templated_command = dedent( """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" {% endfor %} """ ) t3 = BashOperator( task_id='templated', depends_on_past=False, bash_command=templated_command, ) t1 >> [t2, t3]
Python
๋ณต์‚ฌ
: git-sync๊ฐ€ ์ œ๋Œ€๋กœ ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๊ธฐ ์œ„ํ•ด dagํŒŒ์ผ์„ ๋งŒ๋“ค์–ด git repository์— ํ‘ธ์‰ฌํ•œ ํ›„, ํ™•์ธํ•ด๋ณด์ž.
์งœ์ž”. ๋‹ค์Œ๊ณผ ๊ฐ™์ด DAG ๋ฆฌ์ŠคํŠธ๊ฐ€ ์—…๋ฐ์ดํŠธ ๋œ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.
BUTโ€ฆ๋ฐ˜์ชฝ์งœ๋ฆฌ k8s airflowโ€ฆ?
์›๋ž˜ kubernetes๋ฅผ ํ†ตํ•ด airflow๋ฅผ ๊ตฌ๋™ํ•˜๋ฉด, DAG๊ฐ€ ํŠธ๋ฆฌ๊ฑฐ ๋  ๊ฒฝ์šฐ์—๋งŒ ์›Œ์ปค ๋…ธ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜์–ด ๋ฆฌ์†Œ์Šค๋ฅผ ํ• ๋‹น๋ฐ›๊ณ , ํƒœ์Šคํฌ๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ๋ฆฌ์†Œ์Šค๋ฅผ ๋ฐ˜ํ™˜ํ•ด์•ผ ๋˜๋Š”๋ฐ, ์›Œ์ปค๊ฐ€ ๊ณ„์† ๋Œ์•„๊ฐ€๊ณ  ์žˆ๋‹ค. KubernetesExecutor๋กœ ์„ค์ •๋˜์–ด์žˆ์ง€ ์•Š์•„์„œ ๊ทธ๋ ‡๋‹ค!

KubernetesExecutor๋กœ ์„ค์ •

vi values.yaml
Bash
๋ณต์‚ฌ
: values.yaml์— ๋‹ค์‹œ ๋“ค์–ด๊ฐ€์„œ executor๋ฅผ ๋ณ€๊ฒฝํ•œ๋‹ค.
์ด๋ ‡๊ฒŒ ๋˜๋ฉด ํ‰์ƒ์‹œ์—๋Š” ์›Œ์ปค๋…ธ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜์–ด์žˆ์ง€ ์•Š๋‹ค.
๋‹ค์Œ๊ณผ ๊ฐ™์ด DAG๋ฅผ ํŠธ๋ฆฌ๊ฑฐ ํ•˜๋ฉด, DAG ์•„์ด๋””์— ํ•ด๋‹นํ•˜๋Š” ์›Œ์ปค ๋…ธ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜์–ด ํƒœ์Šคํฌ๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค.
๊ทธ ํ›„, ํƒœ์Šคํฌ๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ํŒŒ๋“œ๋Š” ์ข…๋ฃŒ๋˜๊ณ  ๋ฆฌ์†Œ์Šค๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
์ตœ์ข…์ ์ธ overriding values.yml์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค
private repo ๊ธฐ์ค€
airflowVersion: "2.6.2" extraEnv: | - name: "AIRFLOW__CORE__PLUGINS_FOLDER" value: "/opt/airflow/dags/repo/airflow/plugins" executor: "KubernetesExecutor" dags: persistence: # Enable persistent volume for storing dags enabled: true #๋ณ€๊ฒฝ 1 # Volume size for dags size: 10Gi # If using a custom storageClass, pass name here storageClassName: # access mode of the persistent volume accessMode: ReadWriteOnce ## the name of an existing PVC to use existingClaim: gitSync: enabled: true repo: git@github.com:xxx/xxxx.git branch: main rev: HEAD depth: 1 # the number of consecutive failures allowed before aborting maxFailures: 0 # subpath within the repo where dags are located # should be "" if dags are at repo root subPath: "airflow/dags" credentialsSecret: git-credentials sshKeySecret: airflow-git-ssh-secret #๋ณ€๊ฒฝ 5 wait: 10 containerName: git-sync uid: 65533 # When not set, the values defined in the global securityContext will be used securityContext: {} # runAsUser: 65533 # runAsGroup: 0 extraVolumeMounts: [] env: [] resources: {} # limits: # cpu: 100m # memory: 128Mi # requests: # cpu: 100m # memory: 128Mi scheduler: extraVolumes: - name: git-sync-ssh-key secret: secretName: airflow-git-ssh-secret
SQL
๋ณต์‚ฌ

Dynamic webserver ํ‚ค๊ฐ€ ์•„๋‹Œ static key ๋ถ€์—ฌ

Kubernetesํ™˜๊ฒฝ์—์„œ Airflow๋ฅผ?!?!
Kubernetesํ™˜๊ฒฝ์—์„œ Airflow๋ฅผ?!?! ์š”์ฆ˜ ๊ฐ€์žฅ ํ•ซํ•œ Workflow Tool์€ Airflow์ž…๋‹ˆ๋‹ค. python์œผ๋กœ ๊ฐœ๋ฐœ๋˜์–ด์„œ ์„ค์น˜๋„ PyPl๋กœ ๊ฐ„ํŽธํ•˜๊ฒŒ ์„ค์น˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ๋””ํ…Œ์ผํ•˜๊ฒŒ ์‚ฌ์šฉํ•˜๋ ค๋ฉด Metastore, Celery Worker ๋“ฑ ์„ค์ •ํ• ๊ฒŒ ๋งŽ์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌ๋˜ ์ค‘ Kubernetes์— Airflow๋ฅผ ์„ค์น˜ํ•ด์•ผ ๋  ์—…๋ฌด๋ฅผ ๋งก๊ฒŒ ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” Airflow๋ฅผ Kubernetes ํ™˜๊ฒฝ์— ์„ค์น˜ํ•˜๋Š” ๊ฒƒ์„ ์ •๋ฆฌํ•ด ๋ณด์•˜์Šต๋‹ˆ๋‹ค. Helm์œผ๋กœ Airflow ์„ค์น˜ Airflow์—์„œ ๊ณต์‹ Helm์„ ์ œ๊ณตํ•ด ์ฃผ๊ธฐ ๋•Œ๋ฌธ์— ํ•ด๋‹น Helm์œผ๋กœ ์„ค์น˜๋ฅผ ์ง„ํ–‰ํ•˜์˜€์Šต๋‹ˆ๋‹ค. ๋‹ค๋ฅธ ๋ฒ„์ „์ธ User-community Helm๋„ ์กด์žฌํ•˜๋‹ˆ ์ฐธ๊ณ ํ•ด ์ฃผ์‹œ๊ธธ ๋ฐ”๋ž๋‹ˆ๋‹ค. Airflow Helm Chart๋ฅผ..
python3 -c 'import secrets; print(secrets.token_hex(16))' f83bab1deefae2199d27d2621762c30b
SQL
๋ณต์‚ฌ
Airflow์—์„œ๋Š” webserver๋กœ flask๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค
flask๋Š” session id๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ํ•„์š”ํ•œ๋ฐ, ๋‚œ์ˆ˜๋ฅผ ๋งŒ๋“ค์–ด์„œ k8s์˜ secret object๋กœ ์ƒ์„ฑ์„ ํ•ด์•ผํ•œ๋‹ค
kubectl create secret generic \ airflow-webserver-secret \ --from-literal="webserver-secret-key=f83bab1deefae2199d27d2621762c30b" \ -n airflow secret/airflow-webserver-secret created
SQL
๋ณต์‚ฌ
airflowVersion: "2.6.2" extraEnv: | - name: "AIRFLOW__CORE__PLUGINS_FOLDER" value: "/opt/airflow/dags/repo/airflow/plugins" # ์ถ”๊ฐ€ webserverSecretKey: webserver-secret-key webserverSecretKeySecretName: airflow-webserver-secret executor: "KubernetesExecutor" dags: persistence: # Enable persistent volume for storing dags enabled: true #๋ณ€๊ฒฝ 1 # Volume size for dags size: 10Gi # If using a custom storageClass, pass name here storageClassName: # access mode of the persistent volume accessMode: ReadWriteOnce ## the name of an existing PVC to use existingClaim: gitSync: enabled: true repo: https://github.com/xxx/xxx.git branch: main rev: HEAD depth: 1 # the number of consecutive failures allowed before aborting maxFailures: 0 # subpath within the repo where dags are located # should be "" if dags are at repo root subPath: "airflow/dags" # credentialsSecret: git-credentials # sshKeySecret: airflow-git-ssh-secret #๋ณ€๊ฒฝ 5 wait: 10 containerName: git-sync uid: 65533 # When not set, the values defined in the global securityContext will be used securityContext: {} # runAsUser: 65533 # runAsGroup: 0 extraVolumeMounts: [] env: [] resources: {} # limits: # cpu: 100m # memory: 128Mi # requests: # cpu: 100m # memory: 128Mi # ์ถ”๊ฐ€ webserver: service: type: NodePort ## service annotations ports: - name: airflow-ui port: "{{ .Values.ports.airflowUI }}" targetPort: "{{ .Values.ports.airflowUI }}" nodePort: 31151
SQL
๋ณต์‚ฌ
helm upgrade --install airflow apache-airflow/airflow -n airflow -f my-values.yaml
SQL
๋ณต์‚ฌ
๋‹ค์‹œ ๋ฐฐํฌ๋ฅผ ์ง„ํ–‰ํ•ด๋ณธ๋‹ค.
$ minikube ip 192.168.49.2 $ kubectl get svc -n airflow NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE airflow-postgresql ClusterIP 10.100.22.149 <none> 5432/TCP 30m airflow-postgresql-hl ClusterIP None <none> 5432/TCP 30m airflow-statsd ClusterIP 10.107.91.48 <none> 9125/UDP,9102/TCP 30m airflow-triggerer ClusterIP None <none> 8794/TCP 30m airflow-webserver NodePort 10.100.214.92 <none> 8080:31151/TCP 30m # ์•„๋ž˜์˜ ์ฃผ์†Œ๋กœ ์ ‘๊ทผ http://192.168.49.2:31151
SQL
๋ณต์‚ฌ

log ์ด์Šˆ

dag๊ฐ€ ์ž˜ ์‹คํ–‰์€ ๋˜๋Š”๋ฐ ์•„๋ž˜์™€ ๊ฐ™์€ ์ด์Šˆ๊ฐ€ ๋ฐœ๊ฒฌ๋˜์—ˆ๋‹ค.
Could not read served logs: [Errno -2] Name or service not known
์›น์„œ๋ฒ„ ํŒŒ๋“œ์˜ ๋กœ๊ทธ๋ฅผ ํ•œ๋ฒˆ ์‚ดํŽด๋ณด์ž.
$ kubectl logs airflow-webserver-78866bff9b-cbrx4 -n airflow Traceback (most recent call last): File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_task_handler.py", line 505, in _read_from_logs_server response = _fetch_logs_from_service(url, rel_path) File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_task_handler.py", line 93, in _fetch_logs_from_service headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})}, File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_api.py", line 201, in get trust_env=trust_env, File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_api.py", line 110, in request follow_redirects=follow_redirects, File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 821, in request return self.send(request, auth=auth, follow_redirects=follow_redirects) File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 912, in send history=[], File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 939, in _send_handling_auth history=history, File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 973, in _send_handling_redirects response = self._send_single_request(request) File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_client.py", line 1009, in _send_single_request response = transport.handle_request(request) File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_transports/default.py", line 218, in handle_request resp = self._pool.handle_request(req) File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__ self.gen.throw(type, value, traceback) File "/home/airflow/.local/lib/python3.7/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions raise mapped_exc(message) from exc httpx.ConnectError: [Errno -2] Name or service not known
SQL
๋ณต์‚ฌ
์ด์œ ๋Š” Airflow์˜ ๋กœ๊ทธ ์ง€์†์„ฑ๊ณผ ์—ฐ๊ด€ ์žˆ๋‹ค.
kubernetes executor๋ฅผ ์‚ฌ์šฉํ•˜๊ฒŒ ๋˜๋ฉด ๋™์ ์œผ๋กœ worker๋ฅผ ์‚ฌ์šฉํ•˜๊ฒŒ ๋˜๋Š”๋ฐ,
์ด ๋•Œ ๋กœ๊น…์„ ์›Œ์ปค์— ํ•˜๊ฒŒ ๋˜๊ณ , ์›Œ์ปค๊ฐ€ ์‚ฌ๋ผ์ง€๊ฒŒ ๋˜๋ฉด ํ•ด๋‹น service ์— ๋Œ€ํ•ด์„œ ๋กœ๊ทธ๋ฅผ ๊ฐ€์ ธ์˜ค์ง€ ๋ชปํ•˜๊ฒŒ ๋˜๋Š” ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธด๋‹ค
์ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ๋กœ๊น… ์Šคํ† ๋ฆฌ์ง€๋ฅผ s3์™€ ๊ฐ™์ด ๋ถ„๋ฆฌํ•˜๊ฑฐ๋‚˜, ์ถ”๊ฐ€์ ์ธ ์™ธ๋ถ€ ๋ณผ๋ฅจ(PVC)์„ ์žก์•„์•ผํ•œ๋‹ค
Airflow์—์„œ ๋กœ๊ทธ ์ง€์†์„ฑ(persistence)์„ ํ™œ์„ฑํ™”ํ•˜๋ฉด, Airflow ์ปดํฌ๋„ŒํŠธ๋“ค์ด ๋กœ๊ทธ๋ฅผ ๋™์ผํ•œ ๋ณผ๋ฅจ์— ๊ธฐ๋กํ•˜๊ฒŒ ๋œ๋‹ค. ์ด๋ฅผ ์œ„ํ•ด ReadWriteMany ์ ‘๊ทผ ๋ชจ๋“œ๋ฅผ ๊ฐ€์ง„ PersistentVolumeClaim์ด ํ”„๋กœ๋น„์ €๋‹๋œ๋‹ค.
์•„๋ž˜์— ์ถ”๊ฐ€์ ์ธ ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•์„ ๋ช…์‹œํ•ด๋ณด๋„๋ก ํ•˜๊ฒ ๋‹ค.
values.yaml ์•„๋ž˜ ๋ช…์‹œ๋ฅผ ํ•˜์ž.
logs: persistence: enabled: true
SQL
๋ณต์‚ฌ
์ฆ‰, ๊ธฐ๋ณธ ๋กœ๊ทธ๊ฐ€ false์ธ๋ฐ, true ๋ฐ”๊ฟ”์ค˜์•ผ Persistence volume์— Pod log๋ฅผ ๋‚จ๊ธด๋‹ค.
์œ„์™€ ๊ฐ™์ด ์„ค์ • ํ›„ helm์„ ์žฌ์„ค์น˜ํ•˜์ž.
์œ„์™€ ๊ฐ™์ด ์ •์ƒ์ ์œผ๋กœ ๋กœ๊ทธ๋ฅผ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

requirements.txt ์„ค์น˜

์ฐธ๊ณ  ์ž๋ฃŒ

Kubernetesํ™˜๊ฒฝ์—์„œ Airflow๋ฅผ?!?!
Kubernetesํ™˜๊ฒฝ์—์„œ Airflow๋ฅผ?!?! ์š”์ฆ˜ ๊ฐ€์žฅ ํ•ซํ•œ Workflow Tool์€ Airflow์ž…๋‹ˆ๋‹ค. python์œผ๋กœ ๊ฐœ๋ฐœ๋˜์–ด์„œ ์„ค์น˜๋„ PyPl๋กœ ๊ฐ„ํŽธํ•˜๊ฒŒ ์„ค์น˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ๋””ํ…Œ์ผํ•˜๊ฒŒ ์‚ฌ์šฉํ•˜๋ ค๋ฉด Metastore, Celery Worker ๋“ฑ ์„ค์ •ํ• ๊ฒŒ ๋งŽ์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌ๋˜ ์ค‘ Kubernetes์— Airflow๋ฅผ ์„ค์น˜ํ•ด์•ผ ๋  ์—…๋ฌด๋ฅผ ๋งก๊ฒŒ ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” Airflow๋ฅผ Kubernetes ํ™˜๊ฒฝ์— ์„ค์น˜ํ•˜๋Š” ๊ฒƒ์„ ์ •๋ฆฌํ•ด ๋ณด์•˜์Šต๋‹ˆ๋‹ค. Helm์œผ๋กœ Airflow ์„ค์น˜ Airflow์—์„œ ๊ณต์‹ Helm์„ ์ œ๊ณตํ•ด ์ฃผ๊ธฐ ๋•Œ๋ฌธ์— ํ•ด๋‹น Helm์œผ๋กœ ์„ค์น˜๋ฅผ ์ง„ํ–‰ํ•˜์˜€์Šต๋‹ˆ๋‹ค. ๋‹ค๋ฅธ ๋ฒ„์ „์ธ User-community Helm๋„ ์กด์žฌํ•˜๋‹ˆ ์ฐธ๊ณ ํ•ด ์ฃผ์‹œ๊ธธ ๋ฐ”๋ž๋‹ˆ๋‹ค. Airflow Helm Chart๋ฅผ..
2. Airflow on Kubernetes - git sync
Prerequisite 1. Airflow on Kubernetes Apache Airflow๋Š” ์ฟ ๋ฒ„๋„คํ‹ฐ์Šค์— ์นœ์ˆ™ํ•œ ํ”„๋กœ์ ํŠธ๋ฅผ ๋ชฉํ‘œ๋กœ ํ•˜๊ณ  ์žˆ์œผ๋ฉฐ, ๊ณต์‹ helm ์ฐจํŠธ๋ฅผ ๊ด€๋ฆฌํ•˜๊ณ  ์žˆ๋‹ค. ํ•ด๋‹น ์ฐจํŠธ๋ฅผ ์ด์šฉํ•ด์„œ ์ฟ ๋ฒ„๋„คํ‹ฐ์Šค ํด๋Ÿฌ์Šคํ„ฐ์— ๋น„๊ต์  ์‰ฝ๊ฒŒ ์—์–ดํ”Œ๋กœ์šฐ๋ฅผ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋‹ค. G jinyes-tistory.tistory.com Native Airflow๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด local directory์— dag๋ฅผ ์ €์žฅํ•˜๊ณ  ์‚ฌ์šฉํ•˜๋ฉด ๋˜์ง€๋งŒ, kubernetes๋Š” ์ปจํ…Œ์ด๋„ˆ ํ˜•ํƒœ(์ƒํƒœ)๋กœ ๋ฆฌ์†Œ์Šค๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์—, ์–ธ์ œ๋“  Airflow๊ฐ€ ์ข…๋ฃŒ๋˜์—ˆ๋‹ค๊ฐ€ ์žฌ์‹œ์ž‘์ด ๋  ์ˆ˜ ์žˆ๋‹ค. Airflow๋Š” remote directory ๊ธฐ๋Šฅ์ธ git sync๋ฅผ ์ œ๊ณตํ•˜๋Š”๋ฐ, ํ•ด๋‹น ๊ธฐ๋Šฅ์„ ์ด์šฉํ•ด์„œ DAG ํŒŒ์ผ ์ €์žฅ์†Œ๋กœ github reposito..
EKS ์œ„์— Airflow ๊ตฌ์„ฑ
EKS ์ƒ์„ฑ์€ ๋ชจ๋‘ ๋๋‚ฌ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜๊ณ  ์ง„ํ–‰ํ•ฉ๋‹ˆ๋‹ค. ์ด ๊ธ€์€ Kubernetes๋ฅผ ์„ค๋ช…ํ•˜๋Š” ๊ธ€์ด ์•„๋‹ˆ๋ฏ€๋กœ namespace, pod, kubectl, helm ๋“ฑ K8S์™€ ํ•จ๊ป˜ ๋“ฑ์žฅํ•˜๋Š” ์šฉ์–ด์— ๋Œ€ํ•œ ๊นŠ์ด ์žˆ๋Š” ์„ค๋ช…์€ ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋กœ์ปฌ PC์—์„œ aws ๋ช…๋ น์–ด๋ฅผ ํ†ตํ•ด EKS ์ปจํ…์ŠคํŠธ๋ฅผ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค. ๋ฌด์Šจ ์˜๋ฏธ์ธ์ง€ ํ•ด์„ํ•˜์‹ค ํ•„์š” ์—†์Šต๋‹ˆ๋‹ค. ์‰ฝ๊ฒŒ ์ƒ๊ฐํ•ด์„œ ๋กœ์ปฌ์—์„œ kubectl ๋ช…๋ น์„ ์‚ฌ์šฉํ•  ๋•Œ ๋ฐ”๋ผ๋ณด๊ฒŒ ๋˜๋Š” ๊ณณ์ด AWS EKS๋ผ๊ณ  ์ƒ๊ฐํ•˜์‹œ๋ฉด ๋ฉ๋‹ˆ๋‹ค[1]. --name ๋’ค์— transformer-dev๋Š” ๊ฐ์ž์˜ ์„ค์ •์— ๋งž๊ฒŒ ๋ณ€๊ฒฝํ•ด์ฃผ์‹œ๊ธฐ ๋ฐ”๋ž๋‹ˆ๋‹ค. transformer-dev๋Š” ์ด ๊ธ€์ด ์ž‘์„ฑ๋  ๋•Œ ๊ธฐ์ค€์œผ๋กœ ํ…Œ์ŠคํŠธํ•œ EKS ํด๋Ÿฌ์Šคํ„ฐ ์ด๋ฆ„์ž…๋‹ˆ๋‹ค. $ aws eks --region ap-northeast-2 updat..

24.03.20

airflow ๋ฌดํ•œ pending ์ƒํƒœ
๋‚˜์™€ ๋™์ผํ•œ ์—๋Ÿฌ๋ฅผ ๋ฐœ๊ฒฌ
rancher storage class
โ€ข
storageClass๋Š” ๋™์ ์œผ๋กœ PersistentVolume์„ ์ƒ์„ฑ
โ€ข
์‚ฌ์šฉ์ž๊ฐ€ PVC๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ด PVC๊ฐ€ ํŠน์ • StorageClass๋ฅผ ์ฐธ์กฐ
โ€ข
์•„๋ž˜ storage class๋ฅผ ์ฐธ๊ณ 
โ€ข
PVC๋Š” ์‚ฌ์šฉ์ž๊ฐ€ ์š”์ฒญํ•˜๋Š” ์Šคํ† ๋ฆฌ์ง€ ๋ฆฌ์†Œ์Šค๋ฅผ ์ •์˜