Search

백준 데이터를 이용한 데이터 파이프라인 구축

백준 데이터들을 이용한 엔드 투 엔드 데이터 파이프라인 구축

[프로젝트 내용]
백준 코딩테스트를 준비하는 사람들을 위해 백준 문제 추천 봇을 디스코드로 제공합니다.
AI 모델을 통해 유저들에게 매일 정해진 시간에 유저 수준별 문제 추천 제공합니다
[담당 업무]
백준 문제 추천 봇의 서비스 데이터와 학습용 데이터를 제공하기 위한 데이터 파이프라인 운용
데이터 파이프라인을 운용하기 위한 데이터 인프라 구축 및 데이터 파이프라인 오케스트레이션 수행
깃허브 링크 : 프로젝트 레포

Crawling Pipeline (전체적인 아키텍처)

크롤링 파이프라인의 핵심은 Scrapy, BeautifulSoup, 그리고 Celery로 구성되어 있습니다.
동일 사이트에 대한 빈번한 접근으로 인한 IP 차단 문제를 해결하기 위해, Nginx 기반의 포워드 프록시를 구축하여 크롤링 성공률을 크게 개선했습니다.
크롤링 작업을 트리거하는 API 서버는 FastAPI로 구현했으며, 수집된 데이터는 PostgreSQL에 저장됩니다.
이 전체 크롤링 파이프라인은 Kubernetes 클러스터에 배포된 Apache Airflow를 통해 효율적으로 스케줄링 및 관리됩니다.

Airflow 구성 개요

Rocky Linux 기반 홈서버를 이용한 데이터 인프라 구축
Kubeadm을 이용한 standalone k8s 환경 구축
Airflow의 Http Operator와 Dynamic DAG을 이용한 크롤러 API 스케줄링
Helm을 통해 k8s executor Airflow 구축 및 동적 워커 구성
Git Sync를 통해 Dag 동기화

Raw Data 구성 (데이터 모델링)

유저 백준 문제 Raw Data 총 8개의 테이블로 구성
주요 정보는 아래와 같다
유저 메타 데이터 – Upsert
유저 문제 시퀀스
문제 풀이 시도
유저 현황
문제 메타 데이터 – Full Refresh
문제 정보
문제 정답률
크롤링할 데이터 셋의 갱신률을 기준으로 Upsert, Full refresh를 구분하여 데이터를 가져와 DB에 적재합니다
이 때, 유저와 같은 경우는, 유저의 문제 풀이 현황과 같은 데이터로 구성되어 데이터 갱신률이 높고 데이터를 가져오는데 시간이 오래 소요되기 때문에 유저 id 기반의 Upsert 방식으로 데이터를 적재합니다.
반대로 문제와 같은 경우는 문제 데이터와 같은 경우, 데이터의 갱신률이 높지 않고 한번 가져오는데 시간이 오래 걸리지 않기 때문에, 전체 데이터를 Full refresh 형태로 가져옵니다.

URL 기반의 분산 크롤링

유저의 메타 데이터는 단일 데이터당 크롤링 비용이 높습니다.
여기서 말하는 비용
단일 유저당 크롤링 시간
테이블 하나당 크롤링에 소요되는 시간
이러한 이유로 병렬처리를 고려하여 크롤링을 수행해야겠다는 생각을 했고, 여러 가지 방식을 고민하던 중, 메세지 큐 기반의 분산 처리를 도입하였습니다.
병렬 처리를 통해 효율성을 개선
⇒ 이 때, 병렬 처리 시, 중복 유저를 가져오지 않기 위해 메세지 큐 기반 분산 처리를 사용
1.
이미 DB에 적재되어 있는 유저들의 ID를 기반으로 크롤링할 URL을 만들어내어, 메세지 큐에 푸시합니다.
2.
Celery 내부에서 돌아가는 Scrapy 프로세스들이 Redis로부터 주소를 가져와 크롤링을 수행하게 됩니다
3.
이를 통해 기존의 순차적 방식 대비 크롤링 시간이 개선되었습니다 (50% 감소)
위와 같은 방식을 수행하게 되면, 병렬 처리 시, 같은 유저를 크롤링하는 일이 없어, 효율적으로 크롤링을 할 수 있게 됩니다. (멱등성 보장)

Airflow로부터 연산 분리 (Celery Crawler)

“스케쥴러는 스케쥴러 답게 사용”
Apache Airflow는 연산 작업을 수행하지 않도록 설계.
이 때, 에어플로우 내부에서 연산 작업을 하지 않고 외부 서버에서 크롤링과 같은 데이터 파이프라인 작업을 수행하게 합니다.
스케쥴러 서버의 부하 최소화
아파치 에어플로우가 본연의 스케쥴러로써의 기능만을 수행할 수 있도록 해줍니다.
아파치 에어플로우가 크롤링 파이프라인을 호출할 때, HTTP 오퍼레이터를 사용하여 외부 크롤링 서버를 호출하도록 했습니다.
HTTP Operator를 통해 크롤링 API를 호출만 한 후, 응답을 통해 반환된 task id를 이용하여 Task의 실행 여부를 Polling
“Batch 처리의 경중 구분”
크롤링 서버 내의 배치 태스크 간 경중을 구분했습니다.
여기서의 태스크의 경중은 API 처리에 수행되는 시간으로 구분을 했습니다.
API 처리가 5초 이상 넘어갈 경우에는, Celery가 비동기 처리할 수 있도록 구성했습니다. 이를 통해, 필요에 의해 단일 유저를 크롤링할 API가 호출되더라도, 대기 현상 없이 크롤링할 수 있었습니다.
FastAPI는 크롤러를 호출하는 API 용도
실질적인 크롤러는 Celery 내부에서 사용
무거운 Batch 처리는 Celery가 담당
Celery : Crawler Factory
ex) 전체 유저 시퀀스 크롤링
FastAPI는 응답이 필요하고, 시간이 짧은 크롤링만 수행
ex) 단일 유저 크롤링 (단일 유저가 푼 문제 시퀀스)