ECS Task๋ฅผ ๋น์ฉ ํจ์จ์ ์ผ๋ก Airflow์์ ์คํํด๋ณด์
Airflow์์ ์คํํ ECS ์๋น์ค๋ ๋ฐ๋ก Task๋ค.
๊ทธ๋ฆฌ๊ณ ๊ทธ ํํ๋ฅผ Fargate๋ผ๋ ์๋ฒ๋ฆฌ์ค ํํ๋ก ๋์ธ ์์ ์ด๋ค.
์ฑ์ฐํผํฐ์ ์ค๋ช
1.
์ปจํ
์ด๋ ์ ์: ECS Task๋ ํ๋ ์ด์์ ์ปจํ
์ด๋๋ก ๊ตฌ์ฑ๋๋ฉฐ, ๊ฐ ์ปจํ
์ด๋๋ Docker ์ด๋ฏธ์ง, ์คํ ํ๊ฒฝ ๋ณ์, ๋ก๊ทธ ๊ตฌ์ฑ ๋ฑ์ ํฌํจํ๋ '์ปจํ
์ด๋ ์ ์'๋ฅผ ์ฌ์ฉํ์ฌ ๊ตฌ์ฑ๋ฉ๋๋ค.
2.
Task ์ ์: Task๋ฅผ ์์ฑํ๊ธฐ ์ ์ 'Task ์ ์(Task Definition)'๋ฅผ ์์ฑํด์ผ ํฉ๋๋ค. ์ด ์ ์์๋ ์คํํ ์ปจํ
์ด๋, ์ฌ์ฉํ ๋ฆฌ์์ค(์: CPU ๋ฐ ๋ฉ๋ชจ๋ฆฌ ํ ๋น), ๋คํธ์ํฌ ์ค์ , ๋ณผ๋ฅจ ๋ง์ดํธ ๋ฑ์ด ํฌํจ๋ฉ๋๋ค.
3.
์คํ ํ๊ฒฝ: ECS Task๋ AWS์ ๋ ๊ฐ์ง ์ฃผ์ ์ปจํ
์ด๋ ๊ด๋ฆฌ ์ต์
์ค ํ๋์์ ์คํ๋ ์ ์์ต๋๋ค: 'Fargate'๋ ์๋ฒ๋ฆฌ์ค ์ปจํ
์ด๋ ์คํ ํ๊ฒฝ์ ์ ๊ณตํ๋ฉฐ, 'EC2'๋ ์ฌ์ฉ์๊ฐ ๊ด๋ฆฌํ๋ EC2 ์ธ์คํด์ค์์ ์ปจํ
์ด๋๋ฅผ ์คํํฉ๋๋ค.
4.
ํ์ฅ์ฑ ๋ฐ ๊ด๋ฆฌ: ECS๋ Task์ ๋ฐฐํฌ ๋ฐ ํ์ฅ์ ์๋ํํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ์๋น์ค ์ ์๋ฅผ ์ฌ์ฉํ์ฌ ํน์ ์์ Task ๋ณต์ ๋ณธ์ด ์ง์์ ์ผ๋ก ์คํ๋๋๋ก ํ ์ ์์ผ๋ฉฐ, ๋ก๋ ๋ฐธ๋ฐ์๋ฅผ ์ฌ์ฉํ์ฌ ํธ๋ํฝ์ ๋ถ์ฐํ ์๋ ์์ต๋๋ค.
5.
๋ณด์ ๋ฐ ๊ฒฉ๋ฆฌ: ๊ฐ Task๋ ๊ฒฉ๋ฆฌ๋ ํ๊ฒฝ์์ ์คํ๋๋ฉฐ, AWS์ ๋ณด์ ๊ทธ๋ฃน ๋ฐ ๋คํธ์ํฌ ACL์ ์ฌ์ฉํ์ฌ ๋คํธ์ํฌ ์ก์ธ์ค๋ฅผ ์ ์ดํ ์ ์์ต๋๋ค.
Task์ ๋ํ ์ค๋ช
์ ์๋ ์์์ ์ฐธ๊ณ ํ์
์์ ๊ฐ์ด ๋น์ฉ ์ต์ ํ๋ฅผ ์ํด Fargate๊ฐ ์๋ Fargate_spot์ ์ ํํ๋ค (๋๋ต 70% ๊ฐ๋์ด๋ผ๊ณ ํ๋ ํฐ ์ฐจ์ด์ธ ๊ฒ ๊ฐ๋ค) ์ฃผ๋ก ๋ฐฐ์น ์์
์ ๋๋ฆด ๋, ์์ ๊ฐ์ ์ฉ๋ ๊ณต๊ธ์๋ฅผ ์ ํํ๋ ๊ฒ ๊ฐ๋ค.
์ ECS Fargate๋ฅผ ์คํํ๊ธฐ ์ํด์๋ ๋จผ์ ECR์ ์ปจํ
์ด๋ ์ด๋ฏธ์ง๊ฐ ์ฌ๋ผ๊ฐ ์์ด์ผ ํ๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก ECS์ EcsRunTaskOperator ๋ ์๋์ ๊ฐ์ ๊ตฌ์ฑ์ ๊ฐ์ง๋ค
ย ์ฐ๋ฆฌ๋ ์ด๋ฏธ ๋ง๋ค์ด์ง ํ์คํฌ ์ ์๋ฅผ ๋ฐฐํฌํ๋ ์คํผ๋ ์ดํฐ๋ฅผ ์ฌ์ฉํ ๊ฒ์ด๋ค.
run_task = EcsRunTaskOperator(
task_id="run_task",
cluster=existing_cluster_name,
task_definition=register_task.output,
overrides={
"containerOverrides": [
{
"name": container_name,
"command": ["echo hello world"],
},
],
},
network_configuration={"awsvpcConfiguration": {"subnets": existing_cluster_subnets}},
awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",
)
Shell
๋ณต์ฌ
์์ ๊ฐ์ด ECS Task๋ฅผ ๋ฐฐํฌํ ๋, ํ์ํ ์ค์ ๋ค์ด ๋ค์ด๊ฐ๋ค
๊ธฐ๋ณธ์ ์ผ๋ก launch type์ ec2์ด๋ค
hello_world = EcsRunTaskOperator(
task_id="hello_world",
cluster=cluster_name,
task_definition=task_definition_name,
launch_type="FARGATE", # Fargate๋ก ๋ณ๊ฒฝ
overrides={
"containerOverrides": [
{
"name": container_name,
"command": ["echo", "hello", "world"],
},
],
},
network_configuration={
"awsvpcConfiguration": {
"subnets": test_context[SUBNETS_KEY],
"securityGroups": test_context[SECURITY_GROUPS_KEY],
"assignPublicIp": "ENABLED",
},
},
)
Shell
๋ณต์ฌ
์์ ๊ฐ์ด launch_type, ์ค๋ฒ๋ผ์ด๋ฉํ ๋์ปค ์ปค๋งจ๋ ๋ํ ์ง์ ์ด ๊ฐ๋ฅํ
๊ทธ๋ ๋ค๋ฉด Fargate spot์ ์ด๋ป๊ฒ ๋์ธ๊น?
airflow ecs operator์ ์ฝ๋ ๊ตฌํ์ ๋ณด๋ฉด ์ ์ ์๋ค.
def __init__(
self,
*,
task_definition: str,
cluster: str,
overrides: dict,
launch_type: str = "EC2",
capacity_provider_strategy: list | None = None, # ์ฌ๊ธฐ
group: str | None = None,
placement_constraints: list | None = None,
placement_strategy: list | None = None,
platform_version: str | None = None,
network_configuration: dict | None = None,
tags: dict | None = None,
awslogs_group: str | None = None,
awslogs_region: str | None = None,
awslogs_stream_prefix: str | None = None,
awslogs_fetch_interval: timedelta = timedelta(seconds=30),
propagate_tags: str | None = None,
quota_retry: dict | None = None,
reattach: bool = False,
number_logs_exception: int = 10,
wait_for_completion: bool = True,
waiter_delay: int = 6,
waiter_max_attempts: int = 1000000,
# Set the default waiter duration to 70 days (attempts*delay)
# Airflow execution_timeout handles task timeout
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
Shell
๋ณต์ฌ
capacity_provider_strategy๋ฅผ ๋ฐ๋ ๊ฒ์ ๋ณผ ์ ์๋ค
def _start_task(self):
...
run_opts["capacityProviderStrategy"] = self.capacity_provider_strategy
Shell
๋ณต์ฌ
๋ค์ ๊ทธ๋ฆผ์ ๋ณด๋ฉด, ์์ ๊ฐ์ด ์ค์ ์ด ๋์ด ์๋ ๊ฒ์ ๋ณผ ์ ์๋ค
๊ทธ๋ผ ๊ฐ์ ์ด๋ป๊ฒ ํํ๋ก ๋ง๋ค์ด์ ๋ฃ์ด์ฃผ์ง? ํ๋ ์๊ฐ์ด ๋ค ์ ์๋ค (๋ด๊ฐ ๊ทธ๋ฌ๋คโฆ)
๊ธฐ์ตํ๋ฉด ์ข์ ์ฌ์ค์ด ์๋๋ฐ, Airflow AWS ๊ด๋ จ provider๋ boto3๋ก ๊ตฌํ์ด ๋์ด ์๋ค
๊ทธ๋ ๋ค๋ฉด boto3์์ ์ด๋ป๊ฒ ํ๋์ง ์ฐพ์๋ณด์
request syntax
์์ ์ฝ๋๋ฅผ ๋ณด๋ฉด ๋ญ๊ฐ Airflow์ ecs ์คํผ๋ ์ดํฐ์ ์๋นํ ์ ์ฌํ๋ ์ฌ์ค์ ๊นจ๋ฌ์ ์ ์๋ค..!
capacityProviderStrategy=[
{
'capacityProvider': 'string',
'weight': 123,
'base': 123
},
],
Shell
๋ณต์ฌ
์ฐ๋ฆฌ๊ฐ ๋ฃ์ ๊ฐ์ boto3์์ ์์ ๊ฐ์ด ๋ฃ๊ฒ ๋๋ค
์ ์ด์ ์์ฑํด๋ณด์
prs_etl_pipeline_task = EcsRunTaskOperator(
task_id='etl_pipeline_task',
dag=etl_dag,
on_failure_callback=task_fail_alert,
aws_conn_id=aws_conn_id,
cluster='etl-pipeline',
task_definition='etl_pipeline',
launch_type='FARGATE',
overrides={},
wait_for_completion=True,
capacity_provider_strategy=[{
'capacityProvider': 'FARGATE_SPOT',
'weight': 1,
'base': 1
}],
# datapipeline vpc
network_configuration={
"awsvpcConfiguration": {
"subnets": ["subnet-xxxcaxxxxxxxxx"],
"securityGroups": ["sg-093xxxxxxx"],
"assignPublicIp": "ENABLED"
}
},
awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=awslogs_stream_prefix,
)
Shell
๋ณต์ฌ
์ ์์ ๊ฐ์ด ์ค์ ์ ํ๊ฒ ๋๋ฉด ๋๋ค
์์ ๊ฐ์ด ๋ฐฐํฌ๊ฐ ๋ ๊ฒ์ ํ์ธํ๋ค.
๋ฐฐํฌ (๋ฒ์ธ)
ํ์๋ ํ ๋, ECS๋ก Airflow๋ฅผ ๋ฐฐํฌํด๋ณด๊ณ ์ถ์๋๋ฐ, ๊ด๋ฆฌ์ ์ด ๋๋ฌด ์ด๋ ต๊ณ ๋ง๋ค๋ ๋ถ๋ถ์ ๋ค์ด์ ํฌ๊ธฐํ์๋ค.
์๋ celery๋ฅผ ์ด์ฉํด์ fargate ํํ๋ก ๋ฐฐํฌํ ๊ธ๋ ์๋์ ์๋ค.
๋ฒ์ ผ์ด 1.10 ์ธ ๊ฒ์ด ํ ์ด๊ธด ํ์ง๋ง, ์ฐธ๊ณ ํ ๋งํ๋ค.