
Airflow with ECS task operator (Fargate Spot)

Running ECS Tasks cost-effectively from Airflow

What we will run on ECS from Airflow is a Task. We will launch it on Fargate, AWS's serverless compute engine for containers.
Here is ChatGPT's explanation of an ECS Task:
1. Container definitions: an ECS Task consists of one or more containers. Each container is configured through a "container definition" that includes its Docker image, runtime environment variables, logging configuration, and so on.
2. Task definition: before creating a Task, you must write a "Task Definition". It specifies the containers to run, the resources to use (e.g. CPU and memory allocation), network settings, volume mounts, and so on.
3. Execution environment: an ECS Task can run on either of AWS's two main container compute options. "Fargate" provides a serverless environment for running containers, while "EC2" runs containers on EC2 instances that you manage.
4. Scaling and management: ECS can automate the deployment and scaling of Tasks. For example, a service definition can keep a specified number of Task replicas running at all times, and a load balancer can distribute traffic across them.
5. Security and isolation: each Task runs in an isolated environment, and network access can be controlled with AWS security groups and network ACLs.
For more on Tasks, see the video below.
As shown above, we choose FARGATE_SPOT instead of FARGATE as the capacity provider to cut costs. AWS advertises a discount of up to roughly 70%, which is a big difference. Spot capacity can be reclaimed at any time, so it seems to be the usual choice for batch jobs that can tolerate an occasional interruption.
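Because Spot capacity can be reclaimed, a batch job on FARGATE_SPOT should expect to be stopped. When AWS reclaims the capacity, it sends a two-minute interruption warning, and the task's containers receive SIGTERM. Below is a minimal sketch of a batch loop that notices SIGTERM and stops between items instead of mid-item. It assumes the job processes independent items and that any unfinished items are picked up by an Airflow retry or the next run. `SpotInterruptFlag` and `run_batch` are hypothetical names.

```python
import signal
import threading


class SpotInterruptFlag:
    """Records that SIGTERM arrived so the batch loop can stop cleanly."""

    def __init__(self) -> None:
        self._event = threading.Event()
        # Must be created in the main thread; Python only allows signal handlers there.
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame) -> None:
        self._event.set()

    @property
    def requested(self) -> bool:
        return self._event.is_set()


def run_batch(items, process, flag):
    """Process items in order, stopping before the next item once SIGTERM is seen."""
    completed = []
    for item in items:
        if flag.requested:
            break  # remaining items are left for a retry or the next run
        process(item)
        completed.append(item)
    return completed
```

The handler only flips a flag. The actual stop happens at a safe point in the loop, so no item is left half-written.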
Before the task can run on Fargate, its container image has to be pushed to a registry the task can pull from. In this setup, that registry is ECR.
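The container definition's `image` field then points at the pushed image. Private ECR image URIs have the form `<account-id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`. Here is a small sketch that builds and parses that form (tag-based references only, not `@sha256:` digests). The helper names, account ID, and repository are placeholders.

```python
import re

# <account-id>.dkr.ecr.<region>.amazonaws.com/<repository>[:<tag>]
_ECR_URI = re.compile(
    r"^(?P<account>\d{12})\.dkr\.ecr\.(?P<region>[a-z0-9-]+)\.amazonaws\.com/"
    r"(?P<repository>[^:@]+)(?::(?P<tag>[^@]+))?$"
)


def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build the value for a container definition's `image` field."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("AWS account IDs are 12 digits")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


def parse_ecr_image_uri(uri: str) -> dict:
    """Split a private ECR image URI into its parts; a missing tag means 'latest'."""
    match = _ECR_URI.match(uri)
    if match is None:
        raise ValueError(f"not a private ECR image URI: {uri}")
    parts = match.groupdict()
    parts["tag"] = parts["tag"] or "latest"
    return parts
```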
๊ธฐ๋ณธ์ ์œผ๋กœ ECS์˜ EcsRunTaskOperator ๋Š” ์•„๋ž˜์™€ ๊ฐ™์€ ๊ตฌ์„ฑ์„ ๊ฐ€์ง„๋‹ค
ย ์šฐ๋ฆฌ๋Š” ์ด๋ฏธ ๋งŒ๋“ค์–ด์ง„ ํƒœ์Šคํฌ ์ •์˜๋ฅผ ๋ฐฐํฌํ•˜๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•  ๊ฒƒ์ด๋‹ค.
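```python
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

# Names other than the operator come from the surrounding example DAG.
run_task = EcsRunTaskOperator(
    task_id="run_task",
    cluster=existing_cluster_name,
    task_definition=register_task.output,
    overrides={
        "containerOverrides": [
            {
                "name": container_name,
                # one string: relies on a shell entryPoint such as ["sh", "-c"]
                "command": ["echo hello world"],
            },
        ],
    },
    network_configuration={"awsvpcConfiguration": {"subnets": existing_cluster_subnets}},
    awslogs_group=log_group_name,
    awslogs_region=aws_region,
    awslogs_stream_prefix=f"ecs/{container_name}",
)
```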
These are the settings needed to run an ECS Task.
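The `awslogs_stream_prefix=f"ecs/{container_name}"` value makes sense once you know how the awslogs log driver names streams: `<awslogs-stream-prefix>/<container-name>/<ecs-task-id>`. As far as I can tell from the provider source, the operator appends `/<task-id>` to `awslogs_stream_prefix` when fetching logs. So the value you pass should be the task definition's prefix followed by the container name. This sketch assumes the task definition sets `awslogs-stream-prefix` to `ecs`. `ecs_task_id` and `awslogs_stream_name` are hypothetical helpers.

```python
def ecs_task_id(task_arn: str) -> str:
    """The task ID is the last path segment of the task ARN.

    Handles both arn:...:task/<cluster>/<id> and the older arn:...:task/<id>.
    """
    return task_arn.rsplit("/", 1)[-1]


def awslogs_stream_name(stream_prefix: str, container_name: str, task_arn: str) -> str:
    """CloudWatch stream name produced by the awslogs driver for one container."""
    return f"{stream_prefix}/{container_name}/{ecs_task_id(task_arn)}"
```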
๊ธฐ๋ณธ์ ์œผ๋กœ launch type์€ ec2์ด๋‹ค
```python
# cluster_name, task_definition_name, container_name and test_context come from the surrounding test DAG.
hello_world = EcsRunTaskOperator(
    task_id="hello_world",
    cluster=cluster_name,
    task_definition=task_definition_name,
    launch_type="FARGATE",  # changed to Fargate
    overrides={
        "containerOverrides": [
            {
                "name": container_name,
                "command": ["echo", "hello", "world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "subnets": test_context[SUBNETS_KEY],
            "securityGroups": test_context[SECURITY_GROUPS_KEY],
            "assignPublicIp": "ENABLED",
        },
    },
)
```
As shown, you can also set `launch_type` and the Docker command to override.
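The two examples pass the command differently. A command override replaces the image's CMD, not its ENTRYPOINT. `["echo hello world"]` as a single element therefore only works when the entrypoint is a shell. Without a shell entrypoint, the arguments must be separate list items, as in `["echo", "hello", "world"]`. Below is a minimal sketch of a helper that builds the `overrides` dict and splits a shell-style string with `shlex`. The helper name and the `RUN_DATE` variable are hypothetical; `containerOverrides`, `command`, and `environment` are the RunTask field names.

```python
import shlex


def container_overrides(container_name, command=None, environment=None):
    """Build the `overrides` dict for one container.

    `command` may be a list (used as-is) or a shell-style string, which is split
    with shlex so "echo hello world" becomes ["echo", "hello", "world"].
    """
    override = {"name": container_name}
    if command is not None:
        override["command"] = shlex.split(command) if isinstance(command, str) else list(command)
    if environment:
        # RunTask expects environment variables as a list of name/value pairs.
        override["environment"] = [{"name": k, "value": str(v)} for k, v in environment.items()]
    return {"containerOverrides": [override]}
```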
So how do we launch on Fargate Spot?
The answer is in the Airflow ECS operator's source code.
```python
def __init__(
    self,
    *,
    task_definition: str,
    cluster: str,
    overrides: dict,
    launch_type: str = "EC2",
    capacity_provider_strategy: list | None = None,  # here
    group: str | None = None,
    placement_constraints: list | None = None,
    placement_strategy: list | None = None,
    platform_version: str | None = None,
    network_configuration: dict | None = None,
    tags: dict | None = None,
    awslogs_group: str | None = None,
    awslogs_region: str | None = None,
    awslogs_stream_prefix: str | None = None,
    awslogs_fetch_interval: timedelta = timedelta(seconds=30),
    propagate_tags: str | None = None,
    quota_retry: dict | None = None,
    reattach: bool = False,
    number_logs_exception: int = 10,
    wait_for_completion: bool = True,
    waiter_delay: int = 6,
    waiter_max_attempts: int = 1000000,
    # Set the default waiter duration to 70 days (attempts*delay)
    # Airflow execution_timeout handles task timeout
    deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
    **kwargs,
):
    super().__init__(**kwargs)
```
The operator accepts a `capacity_provider_strategy` argument, and `_start_task` copies it into the RunTask request options:
```python
def _start_task(self):
    ...
    run_opts["capacityProviderStrategy"] = self.capacity_provider_strategy
```
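The excerpt elides what surrounds that assignment. The ECS RunTask API requires `launchType` to be omitted whenever `capacityProviderStrategy` is given, so a request carries one or the other. As far as I can tell, the provider guards the assignment the same way, letting the strategy win over `launch_type`. Here is a stdlib sketch of that rule. `build_run_opts` is a hypothetical helper, not the provider's code.

```python
def build_run_opts(cluster, task_definition, overrides,
                   launch_type="EC2", capacity_provider_strategy=None,
                   network_configuration=None, platform_version=None):
    """Assemble RunTask kwargs, sending either capacityProviderStrategy or launchType."""
    run_opts = {
        "cluster": cluster,
        "taskDefinition": task_definition,
        "overrides": overrides,
    }
    if capacity_provider_strategy:
        # A strategy takes precedence; RunTask wants launchType omitted in this case.
        run_opts["capacityProviderStrategy"] = capacity_provider_strategy
    elif launch_type:
        run_opts["launchType"] = launch_type
    if platform_version is not None:
        run_opts["platformVersion"] = platform_version
    if network_configuration is not None:
        run_opts["networkConfiguration"] = network_configuration
    return run_opts
```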
Looking back at the console screenshot, this matches the capacity provider setting shown there.
The next question is what shape that value should take (I certainly wondered…).
One fact worth remembering: Airflow's AWS provider is built on boto3.
So let's see how boto3 does it.
Request syntax (boto3 `ECS.Client.run_task`)
์œ„์˜ ์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด ๋ญ”๊ฐ€ Airflow์˜ ecs ์˜คํผ๋ ˆ์ดํ„ฐ์™€ ์ƒ๋‹นํžˆ ์œ ์‚ฌํ•˜๋Š” ์‚ฌ์‹ค์„ ๊นจ๋‹ฌ์„ ์ˆ˜ ์žˆ๋‹ค..!
```python
capacityProviderStrategy=[
    {
        'capacityProvider': 'string',
        'weight': 123,
        'base': 123
    },
],
```
The value we pass to the operator ends up in boto3 in exactly this form.
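Because the operator forwards the list unchanged, a mistake in its shape only surfaces when ECS rejects the call. The sketch below checks the list against the documented limits: weight 0 to 1000, base 0 to 100000, and at most one provider with a base. It then calls `run_task` directly with boto3, which is roughly what the operator does for us. `validate_capacity_provider_strategy` and `run_on_fargate_spot` are hypothetical names. Treating an omitted weight or base as 0 is an assumption. The cluster, subnet, security group, and region values are yours to supply.

```python
def validate_capacity_provider_strategy(strategy):
    """Check a capacityProviderStrategy list against the documented RunTask limits."""
    if not isinstance(strategy, list) or not strategy:
        raise ValueError("capacityProviderStrategy must be a non-empty list")
    providers_with_base = 0
    for item in strategy:
        if not isinstance(item.get("capacityProvider"), str):
            raise ValueError(f"capacityProvider missing in {item!r}")
        weight = item.get("weight", 0)  # assumption: omitted weight counts as 0
        base = item.get("base", 0)      # assumption: omitted base counts as 0
        if not 0 <= weight <= 1000:
            raise ValueError(f"weight must be 0-1000 in {item!r}")
        if not 0 <= base <= 100000:
            raise ValueError(f"base must be 0-100000 in {item!r}")
        if base > 0:
            providers_with_base += 1
    if providers_with_base > 1:
        raise ValueError("only one capacity provider may define a base")
    if len(strategy) > 1 and all(item.get("weight", 0) == 0 for item in strategy):
        raise ValueError("with several providers, at least one needs a non-zero weight")
    return strategy


def run_on_fargate_spot(cluster, task_definition, subnets, security_groups, region_name):
    """Start one task on FARGATE_SPOT with plain boto3 and return the task ARNs."""
    import boto3  # imported lazily so the validator works without boto3 installed

    strategy = validate_capacity_provider_strategy(
        [{"capacityProvider": "FARGATE_SPOT", "weight": 1, "base": 1}]
    )
    ecs = boto3.client("ecs", region_name=region_name)
    response = ecs.run_task(
        cluster=cluster,
        taskDefinition=task_definition,
        capacityProviderStrategy=strategy,  # no launchType alongside a strategy
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": subnets,
                "securityGroups": security_groups,
                "assignPublicIp": "ENABLED",
            }
        },
    )
    if response.get("failures"):
        raise RuntimeError(response["failures"])
    return [task["taskArn"] for task in response["tasks"]]
```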
์ž ์ด์ œ ์™„์„ฑํ•ด๋ณด์ž
```python
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

# etl_dag, task_fail_alert, aws_conn_id, log_group_name, aws_region and
# awslogs_stream_prefix are defined elsewhere in the DAG file.
prs_etl_pipeline_task = EcsRunTaskOperator(
    task_id='etl_pipeline_task',
    dag=etl_dag,
    on_failure_callback=task_fail_alert,
    aws_conn_id=aws_conn_id,
    cluster='etl-pipeline',
    task_definition='etl_pipeline',
    launch_type='FARGATE',  # not sent when capacity_provider_strategy is set
    overrides={},
    wait_for_completion=True,
    capacity_provider_strategy=[{
        'capacityProvider': 'FARGATE_SPOT',  # must be associated with the cluster
        'weight': 1,
        'base': 1
    }],
    # datapipeline VPC
    network_configuration={
        "awsvpcConfiguration": {
            "subnets": ["subnet-xxxcaxxxxxxxxx"],
            "securityGroups": ["sg-093xxxxxxx"],
            "assignPublicIp": "ENABLED"
        }
    },
    awslogs_group=log_group_name,
    awslogs_region=aws_region,
    awslogs_stream_prefix=awslogs_stream_prefix,
)
```
์ž ์œ„์™€ ๊ฐ™์ด ์„ค์ •์„ ํ•˜๊ฒŒ ๋˜๋ฉด ๋œ๋‹ค
I confirmed that the task was launched as shown above.
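Besides checking the console, you can confirm the placement programmatically. Each task in a DescribeTasks response carries a `capacityProviderName` field. Here is a minimal sketch that flags tasks that did not land on FARGATE_SPOT. `tasks_not_on` and `check_spot` are hypothetical helpers, and the cluster, ARNs, and region are placeholders.

```python
def tasks_not_on(describe_tasks_response, provider="FARGATE_SPOT"):
    """Return ARNs of tasks whose capacityProviderName differs from `provider`."""
    return [
        task["taskArn"]
        for task in describe_tasks_response.get("tasks", [])
        if task.get("capacityProviderName") != provider
    ]


def check_spot(cluster, task_arns, region_name):
    """Describe the given tasks and return the ones not running on FARGATE_SPOT."""
    import boto3  # lazy import: tasks_not_on() also works on a saved response

    ecs = boto3.client("ecs", region_name=region_name)
    response = ecs.describe_tasks(cluster=cluster, tasks=task_arns)
    return tasks_not_on(response)
```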

๋ฐฐํฌ (๋ฒˆ์™ธ)

ํ•„์ž๋Š” ํ•œ ๋•Œ, ECS๋กœ Airflow๋ฅผ ๋ฐฐํฌํ•ด๋ณด๊ณ  ์‹ถ์—ˆ๋Š”๋ฐ, ๊ด€๋ฆฌ์ ์ด ๋„ˆ๋ฌด ์–ด๋ ต๊ณ  ๋งŽ๋‹ค๋Š” ๋ถ€๋ถ„์„ ๋“ค์–ด์„œ ํฌ๊ธฐํ–ˆ์—ˆ๋‹ค.
์•„๋ž˜ celery๋ฅผ ์ด์šฉํ•ด์„œ fargate ํ˜•ํƒœ๋กœ ๋ฐฐํฌํ•œ ๊ธ€๋„ ์•„๋ž˜์— ์žˆ๋‹ค.
๋ฒ„์ ผ์ด 1.10 ์ธ ๊ฒƒ์ด ํ ์ด๊ธด ํ•˜์ง€๋งŒ, ์ฐธ๊ณ ํ•  ๋งŒํ•˜๋‹ค.