채용 공고 트렌드 분석을 위한 데이터 파이프라인 구축 프로젝트
프로젝트 목적
프로젝트 목적은 아래와 같습니다.
•
빠르게 변해가는 개발 직군 채용 트렌드를 직무별로 분석해보는 취지에서 시작하였습니다.
•
온라인 채용 공고에서 명시된 업무 내용, 자격 요건, 우대 사항, 기술 스택, 주소 등의 데이터를 활용하여 키워드 추출을 수행하였습니다.
•
이를 통해 현재 개발 분야의 채용 트렌드를 분석 및 시각화하였습니다.
프로젝트 아키텍처
전체적인 데이터 인프라 아키텍처
웹 스크래핑 (ETL)
데이터 소스
•
채용 플랫폼인 원티드, 랠릿, 점핏, 잡플래닛 에서 데이터를 수집했습니다.
•
개발자 도구의 네트워크 탭 분석을 통해 내부에서 사용하는 api 서버 주소를 확인한 뒤 GET 요청을 보내서 공고를 수집했습니다.
데이터 수집 및 분류
•
4가지 채용 플랫폼에서 공통으로 가져올 수 있는 정보를 선정한 뒤 json 형식으로 정보를 저장했습니다.
•
수집한 정보 : 공고 id, 회사명, 주소, 자격 요건, 우대 사항, 주요 업무, 복지, 기술 스택 …
•
플랫폼마다 직군을 분류하는 카테고리가 다르기 때문에 대분류, 중분류, 소분류로 세분화 해서 공고를 분류했습니다.
아래는 분류 사항의 일부입니다.
스크래퍼 서버
•
스크래퍼는 airflow 서버와 별개의 ec2 인스턴스에서 FastAPI 서버를 통해 동작합니다.
이는 스크래핑에 사용되는 리소스를 분리함으로써, airflow 서버는 오직 DAG를 실행시키는 것에만 집중하게 하기 위해서 입니다.
각 스크래퍼 별 url에 airflow 서버가 httpOperator를 통해 GET 요청을 날려서 트리거하는 방식입니다.
비동기 작업과 FastAPI
•
플랫폼의 api 서버를 호출해 몇천~몇만건의 정보를 가져오는 작업은 네트워크 연결에 대부분의 시간이 소요되는 I/O intensive 한 작업입니다.
이 작업을 동기적으로 처리했을 때는 많은 시간이 걸렸습니다.
더 빠르게 작업을 수행하기 위해 스크래핑에 async/await 키워드를 사용해 비동기적으로 정보를 가져오도록 코드를 수정했습니다.
•
아래는 스크래퍼 코드의 일부입니다.
async def scrape_category(self) -> List[Dict]:
"""
카테고리별 공고를 크롤링
"""
async with aiohttp.ClientSession() as session:
position_ids = await self.get_position_ids(session)
tasks = []
for i, position_id in enumerate(position_ids):
task = asyncio.create_task(
self.get_position_detail(position_id, session)
)
tasks.append(task)
await asyncio.gather(*tasks)
return self.jobs
---------------------------------------------------------------------------
@router.get("/scrape-jumpit")
async def jumpit_scrape_jobs() -> Dict[str, str]:
...
tasks = []
for category_id, category_name in JumpitScraper.job_category_dict.items():
scraper = JumpitScraper(category_id, category_name)
task = scraper.scrape_category()
tasks.append(task)
data_list = await asyncio.gather(*tasks)
...
Python
복사
async/await 키워드를 사용해 비동기적으로 처리하게 한 결과, 스크래핑 속도는 몇 시간 단위에서 몇 분 단위로 급격하게 줄어드는 것을 확인할 수 있었습니다.
•
이러한 코드를 실행시키기 위한 웹 프레임워크로 FastAPI를 선택했습니다.
FastAPI는 flask와 비슷한 웹 프레임워크입니다. 하지만 빠른 속도와 비동기 처리 등의 modern python을 더 잘 지원한다는 장점이 있어서 FastAPI를 선택했습니다.
secret manager 사용한 보안 관리
def get_secret():
"""
AWS Secrets Manager를 이용해 환경변수를 불러옵니다.
"""
...
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=REGION_NAME
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
raise e
...
Python
복사
•
기존에는 환경변수나 시크릿 키 등을 담는 .env 파일을 각자 만들어서 관리하는 방식이었습니다.
이러한 방식은 보안상 취약하며 관리하기도 번거로운 단점이 있습니다.
•
프로젝트 대부분이 aws 인프라 위에서 동작하므로 aws secret manager 서비스를 사용해 시크릿을 관리하기로 결정했습니다.
glue crawler 호환성을 고려한 s3 file path 설정
@staticmethod
def upload_to_s3(file_path: str, bucket_name: str, access_key: str, secret_key: str, region_name: str) -> None:
'''Uploads the specified file to an AWS S3 bucket.'''
today = date.today()
year = str(today.year)
month = str(today.month).zfill(2)
day = str(today.day).zfill(2)
file_name = f"jumpit/year={year}/month={month}/day={day}/jumpit.json"
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name)
s3.upload_file(file_path, bucket_name, file_name)
path_name = os.path.join(bucket_name, file_name)
print(f"End Upload to s3://{path_name}")
Python
복사
•
수집한 데이터를 S3에 …/year=/month=/day=/~.json 형식으로 저장합니다.
이렇게 하는 이유는 glue crawler를 사용할 때 partition을 잡아주기 위함입니다.
•
S3에 날짜 포맷을 맞춰서 저장하는 것 만으로도 자동으로 파티션이 생성되므로, 이후 glue job, athena 등의 서비스를 통해 손쉽게 데이터를 분석할 수 있었습니다.
CI/CD
•
CI/CD는 github actions와 aws code deploy를 같이 사용했습니다.
•
배포 방식은 다음과 같습니다.
1.
main 브랜치에 pr이 merge 되면 action을 트리거합니다.
2.
action을 사용해 전체 레포를 tar.gz 파일로 압축한 뒤 s3에 전송합니다.
3.
s3에 전송된 압축 파일을 code deploy가 타겟 그룹에 전송합니다.
4.
서버에 ssh action을 사용해 접속합니다.
5.
.env 파일을 github secret을 참조해 생성해 환경변수를 세팅합니다.
6.
서버에서 동작하고 있는 docker container들을 모두 내린 뒤, docker compose build & up 합니다.
Airflow
Custom Operator
저희는 크게 Http, Athena, Redshift 관련해서 Airflow 커스텀 오퍼레이터를 만들었습니다.
HTTP Custom Operator 개발
HTTP 커스텀 오퍼레이터를 개발한 이유는 다음과 같습니다.
1.
Scraper 서버 API 요청은 response 타임이 짧을 때는 몇 분이 걸리지만, 길 때는 2시간까지 걸릴 수 있습니다.
2.
기존의 SimpleHttpOperator는 timeout이 존재합니다. Timeout으로 인해 response를 받아오지 못하면 DAG가 Fail 처리 됩니다.
만약 HTTP 요청의 응답이 올 때까지 대기하게 하려면 HttpHook의 run 메서드에서 요청 시 timeout을 무한대로 설정했습니다.
이를 위해 2가지를 수정해야 했습니다. HttpHook과 SimpleHttpOperator 입니다.
1.
이를 위해 먼저, HttpHook의 run 메서드에서 사용되는 session.request() 함수의 timeout 매개변수를 설정하지 않거나 큰 값을 주면 timeout을 무한대로 설정할 수 있습니다.
a.
HttpHook의 run 메서드를 찾아서 timeout을 무한대로 설정합니다.
b.
아래 hook의 run 메서드를 간소화시키는 방향으로 처리합니다.
2.
SimpleHttpOperator의 execute 메서드에서 예외 처리를 추가하여 response_check가 False를 반환하더라도 예외가 발생하지 않도록 합니다.
아래는 해당 변경 사항을 반영한 코드 일부입니다
•
httphook + simplehttpoperator을 같은 모듈 파일에 넣어 플러그인으로 생성하였습니다.
...
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
...
import requests
## HttpHook을 오버라이딩하여 Timeout을 None으로 처리
class InfiniteTimeoutHttpHook(HttpHook):
def run(self, endpoint, data=None, headers=None, extra_options=None):
with self.get_conn() as session:
extra_options = extra_options or {}
extra_options.setdefault('timeout', None) # Timout 수정
url = self.base_url + endpoint
self.log.info("Sending %s to %s with timeout %s", self.method, url, extra_options.get("timeout", "not set"))
response = session.request(self.method, url, **extra_options)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as err:
raise AirflowException(
f"HTTP error: {err.response.reason}, error code: {err.response.status_code}"
)
return response
class CustomSimpleHttpOperator(SimpleHttpOperator):
def execute(self, context):
http = InfiniteTimeoutHttpHook(self.method, http_conn_id=self.http_conn_id)
self.log.info("Calling HTTP method")
response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
if self.response_check:
check_result = self.response_check(response)
self.log.info(f"Response check result: {check_result}")
if self.xcom_push_flag:
return response.text
if self.log_response:
self.log.info(response.text)
Python
복사
◦
향후, 실제 개발에서는 timeout을 적절하게 설정하고, 재시도 로직을 추가하는 것이 바람직하기 때문에 개선이 필요합니다.
AWS Athena, Redshift Custom operator
Athena | Redshift | |
__init__ 메서드 | O | O |
execute 메서드 | O | O |
on_kill 메서드 | O | X |
Athena, Redshift의 custom operator를 만든 이유는 아래와 같습니다
•
aws provider가 airflow에서 공식으로 지원하는 라이브러리가 아닙니다.
•
operator 동작 원리 이해하고, 프로젝트에 필요한 최소한의 기능을 갖춘 관리 가능한 operator를 만들기 위함입니다. 이를 BaseOperator를 사용하여 구현하였습니다.
•
또한 아래 2개의 DB 커넥션을 excute 메서드 안에서 구현하였습니다. 생성자 내에서 DAG 파싱 시 호출로 인해, 빈번한 연결을 지양하게 하기 위해서입니다.
•
저희 프로젝트에서는 AWS 서비스에 접근하기 위해 boto3를 이용하였습니다.
◦
boto3를 사용하여 AWS의 클라이언트에 접근하여 Athena, Redshift에 쿼리를 날리고 모니터링하는 작업을 구현하는 것에 초점을 맞췄습니다.
•
코드가 길기 때문에 아래에서는 각 커스텀 오퍼레이터 내부 주요 메서드에 대한 설명을 하도록 하겠습니다
•
AthenaCustomOperator
◦
해당 오퍼레이터를 이용하여 Athena를 이용한 데이터 마트 생성에 필요한 베이스 테이블들을 생성합니다. 테이블 생성에 필요한 쿼리를 날리는 목적으로 오퍼레이터가 만들어졌습니다.
◦
__init__ 메서드 : 이 생성자 함수에서는 인스턴스가 초기화될 때 필요한 여러 매개변수들을 설정하였습니다
◦
execute 메서드 : AWS Athena에서의 주요 동작을 정의합니다.
▪
Airflow에서 제공되는 AwsHook() 메소드를 통해 생성한 연결 정보를 통해 boto3를 이용하여 start_query_execution 메소드를 통해 query를 요청하는 방식입니다.
▪
이를 쿼리 요청부터, 상태 모니터링, 상태 반환까지 진행합니다.
▪
기존의 코드와 달리 특정 시간 이상 query 실행이 지속될 경우 동작을 종료하는 구문을 추가했습니다.
◦
on_kill 메서드 : Airflow에서 DAG 혹은 task 작업이 종료될 시, kill 시그널을 보낼 수 있도록 설정했습니다.
•
RedshiftCustomOperator
◦
기존의 provider 내부 RedshiftDataOperator와 유사하게 작성하되, 프로젝트에서 필요한 기능인 데이터 마트 테이블 생성에 초점을 두었습니다. 즉, redshift와 연결하여 쿼리를 실행하는 프로세스가 메인입니다.
◦
__init__ 메서드
▪
기본 속성을 선언해주는 것 외에 아래의 변수들을 추가했고, 이 변수들은 아래에서 나올 execute()와 hook()에서 활용될 db 연결 및 redshift query 요청 시 필요한 변수들로 확인됩니다.
◦
execute 메서드
▪
AwsHook 클래스를 통해 aws 연결을 시도하고 있으며, boto3를 이용하여 'redshift-data' 클라이언트에 접근하여 execute_statement 메소드를 통해 query를 요청하는 방식이며, Redshift Serverless를 사용하기 때문에 생성자에서 선언한 workgroup_name 등이 이때 활용됩니다.
▪
기존의 코드와 달리 특정 시간 이상 query 실행이 지속될 경우 동작을 종료하는 구문을 추가했습니다.
Airflow의 병렬성 옵션 사용
•
저희 프로젝트에서 진행되는 태스크들은 동시에 병렬적으로 진행되는 DAG들이 꽤나 존재합니다. 하지만 저희는 EC2 내부에 Airflow를 배포했기 때문에 한정된 자원을 이용해야합니다.
•
t3.xlarge 인스턴스는 일반적으로 4 vCPU와 16 GiB의 메모리를 가집니다
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__PARALLELISM: 3
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 4
Python
복사
◦
한정된 자원 속에서 task의 병렬성을 유지하기 위해 위와 같이 airflow의 병렬 옵션을 수정하였습니다.
•
또한 celery 큐를 사용하기 때문에 워커의 스케일 아웃을 하여 워커의 idle time 최소화를 이끌어냈습니다. 이를 위해 워커를 최대 2개까지 사용하도록 설정했습니다.
•
flower를 통해 워커 간 처리가 어떻게 진행되는지 모니터링을 했습니다.
DAG List
아래 DAG는 번호 순서대로 수행이 됩니다.
크게 데이터 스크래핑, Glue 관련 ETL 잡, Redshift 관련 ELT 프로세스로 나뉩니다.
1.
{platform_name}_job_api_scraper_dag
•
예시로 jobplanet_job_api_scraper_dag 그래프를 가져왔습니다. jobplanet 자리에 각 플랫폼명이 들어갑니다. 각 플랫폼의 API Scraper를 trigger 합니다. HTTP sensor를 통해 API health check를 진행한 이후, HTTP custom operator를 작동시킵니다. 이를 통해 응답까지 오래 걸리는 Scraper API 호출을 Airflow에서 처리할 수 있습니다.
•
자정에 플랫폼 별로 수행되도록 스케쥴링을 했습니다.
2.
glue_crawler_dag
: Glue (Crawler, ETL job : pandas, spark) 관련된 내용을 트리거링하는 DAG입니다. 성립 조건이 어려운 Sensor보다는 DagTrigger를 선택하였습니다.
a.
빨강색 (1)번 : 각 플랫폼별 스크래핑 데이터를 AWS Glue crawler를 trigger 합니다.
b.
glue_etl_job_dag : 1차 전처리 Glue job을 trigger 합니다.
c.
빨강색 (2)번 : 1차 전처리 결과에 대해 Glue crawler를 trigger 합니다.
d.
glue_nlp_job_dag : 2차 전처리 Glue job을 trigger 합니다.
e.
glue_crawler_dag (3) : 2차 전처리 결과에 대해 Glue crawler를 trigger 합니다.
3.
athena_query_dag
•
Amazon Athena 쿼리들을 trigger 합니다. 여기서 Array의 경우 unnesting을 진행합니다. 이를 통해 베이스 스키마 테이블을 만듭니다. (Star Schema)
•
Drop ~ Create 의 Full refresh 형태
•
여기서 만들어진 테이블들을 이용하여 8번 마트 DAG를 수행합니다.
4.
redshift_elt_query_dag
•
마지막으로 Amazon Redshift Spectrum ELT 쿼리를 trigger합니다. 데이터 마트 테이블을 생성합니다. 이 때, 스키마가 생긴 후, 근간이 되는 베이스 테이블을 먼저 만들고, 해당 테이블을 조인해서 생기는 마트 테이블들을 생성하게끔 DAG를 작성했습니다.
데이터 전처리 (ETL, 정규화 테이블)
Glue 사용을 위하여 Custom Glue 정책을 생성합니다.
1.
Custom Glue 정책
2.
Glue Role 생성
우선 위와 같이 IAM role을 설정해주었습니다.
Glue Crawler
•
S3 bucket에 있는 데이터를 Glue Data Catalog tables로 옮기는 역할을 합니다.
•
Partition(날짜)를 기반으로 하여 테이블 스키마를 자동으로 생성하도록 했습니다.
◦
크롤러는 크게 3가지로 나뉘는데, 1) 플랫폼별 크롤러, 2) 1차 전처리 크롤러, 3) 2차 전처리 크롤러입니다.
◦
Classifiers는 크롤러 내부 세부 조건을 지정할 수 있습니다.
json(de-1-1-json-classifier), parquet(DE1_1_classifier) 2가지로 나뉩니다.
◦
플랫폼별 크롤러의 경우, $.results[*] 를 json path로 설정하였습니다.
◦
1차 및 2차 전처리 크롤러의 경우, part-%{INT:part_number}-%{UUID:uuid}-c%{INT:partition}.snappy.parquet 를 Grok patterns로 설정하였습니다.
•
크롤러를 통해 생기는 데이터 카탈로그 (year/month/day partition key 칼럼)
scraped data
1st preprocessed
2nd preprocessed
Glue ETL jobs
Glue ETL을 사용한 이유
•
AWS Glue는 서버리스 서비스이며, Spark를 지원합니다. Amazon EMR과 다르게 프로비저닝 등을 고려하지 않습니다.
•
또한 Glue Crawler를 통해 Partition key 기반으로 생성된 Catalog를 읽어와 작업이 가능합니다.
Glue ETL jobs에서 수행한 작업
•
Python Shell script editor를 사용하여 1차 전처리와 2차 전처리 작업 코드를 작성하였습니다. 각 처리 과정의 개요는 아래 표와 같습니다.
1차 전처리 (glue spark) | 2차 전처리 (glue pandas) |
- 일부 플랫폼은 coordinate가 null → 카카오맵 API를 통해 채워넣기
- 다른 포맷의 채용 정보들을 통합할 수 있는 기준을 마련하기 위해, 대/중/소분류를 연결 | 텍스트 덩어리에서 명사만을 추출해서 의미있는 데이터를 뽑아내기 위해, 외부 라이브러리인 kiwipiepy (한국어 형태소 분석기)를 활용 |
worker : 10 (DPU) | pandas보다 spark에서 시간이 더 오래걸림 (DPU 5)
→ Glue 비용 축소를 위해 DPU 조정
→ Glue를 읽어올 때와, parquet 저장할 때 외에는 pandas dataframe을 사용
→ 즉, master node에서 작업이 수행됨 |
JSON이었던 파일을 위와 같은 전처리 수행 후 s3 버킷에 parquet로 저장 | 1차 전처리 이후 데이터를 위와 같은 방법으로 NLP 전처리하여 최종적으로 s3 버킷에 parquet로 한 번 더 저장 |
s3에 저장 시, 파티션을 하나로 축소한 후, year, month, day를 파티션 키로 잡아 버킷 내부 파티셔닝 수행 | s3에 저장 시, 파티션을 하나로 축소한 후, year, month, day를 파티션 키로 잡아 버킷 내부 파티셔닝 수행 |
위 파티셔닝을 통해 Incremental update가 되도록 수행 |
1차 전처리 상세 설명
•
coordinate(위도, 경도) 컬럼 값이 null인 플랫폼(잡플래닛, 점핏)을 대상으로 spark udf와 카카오맵 API를 통해 값을 채워넣습니다.
# ...
@udf(returnType=ArrayType(FloatType()))
def get_coordinate_from_location(location, KAKAO_API_TOKEN=KAKAO_API_TOKEN):
headers = {'Authorization': 'KakaoAK ' + KAKAO_API_TOKEN}
"""
카카오 API를 통해 location으로부터 coordinate(lat, lon) 리스트를 반환합니다.
"""
if location is None:
return None
url = f'https://dapi.kakao.com/v2/local/search/address.json?query={location}'
try:
response = requests.get(url, headers=headers, timeout=5)
result = json.loads(response.text)
match_first = result['documents'][0]['address']
return [float(match_first['y']), float(match_first['x'])]
except (requests.exceptions.RequestException, TypeError, ValueError, KeyError, IndexError) as e:
print(f'Error occurred: {e} while fetching address: {location}')
# ...
Python
복사
•
플랫폼별로 모두 사전에 수작업으로 정의한 대분류와 중분류를 소분류에 연결합니다.
# ...
categories = [
("WEB", "서버/백엔드 개발자", ["서버 개발자", "자바 개발자", "Node.js 개발자", "PHP 개발자", "웹 개발자", "루비온레일즈 개발자", ".NET 개발자", "백엔드 개발", "웹개발", "BACKEND_DEVELOPER", "서버/백엔드 개발자", "웹 풀스택 개발자"]),
("WEB", "프론트엔드 개발자", ["프론트엔드 개발자","프론트엔드 개발","FRONTEND_DEVELOPER"]),
("WEB", "웹 퍼블리셔", ["웹 퍼블리셔","웹퍼블리셔"]),
("GAME", "게임 개발자", ["게임개발", "게임 클라이언트 개발자", "게임 서버 개발자"]),
("GAME", "VR/AR/3D", ["VR 엔지니어", "그래픽스 엔지니어", "VR/AR/3D,게임 클라이언트 개발자"]),
("DATA", "데이터 사이언티스트", ["데이터 사이언티스트", "DATA_SCIENTIST"]),
("DATA", "데이터 엔지니어", ["데이터 엔지니어", "빅데이터 엔지니어", "DATA_ENGINEER"]),
("DATA", "데이터 분석가", ["BI 엔지니어", "데이터 분석가", "DATA_ANALYST"]),
("DATA", "AI 엔지니어", ["머신러닝 엔지니어", "영상,음성 엔지니어", "MACHINE_LEARNING", "인공지능/머신러닝"]),
("DATA", "DBA", ["DBA", "빅데이터 엔지니어,DBA"]),
("MOBILE", "안드로이드 개발자", ["안드로이드 개발자", "안드로이드 개발", "ANDROID_DEVELOPER"]),
("MOBILE", "iOS 개발자", ["iOS 개발자", "iOS", "IOS_DEVELOPER", "IOS 개발자"]),
("MOBILE", "크로스 플랫폼 모바일 개발자", ["크로스플랫폼 앱 개발자", "크로스플랫폼 앱개발자", "CROSS_PLATFORM_DEVELOPER"]),
("SUPPORT", "PM", ["개발 매니저", "프로덕트 매니저", "AGILE_SCRUM_MASTER", "인공지능/머신러닝,개발 PM"]),
("SUPPORT", "QA 엔지니어", ["QA,테스트 엔지니어", "QA", "QA_ENGINEER", "QA 엔지니어"]),
("SUPPORT", "기술지원", ["기술지원", "SUPPORT_ENGINEER"]),
("DEVSECOPS", "데브옵스/인프라 엔지니어", ["DevOps / 시스템 관리자", "시스템,네트워크 관리자", "네트워크/보안/운영", "클라우드 개발", "DEV_OPS", "INFRA_ENGINEER", "devops/시스템 엔지니어"]),
("DEVSECOPS", "정보보안 담당자", ["보안 엔지니어", "CIO,Chief Information Officer", "SECURITY_ENGINEER", "정보보안 담당자"]),
("SW/HW/IOT", "HW/임베디드 개발자", ["임베디드 개발자", "하드웨어 엔지니어", "하드웨어 개발", "HARDWARE_EMBEDDED_ENGINEER", "HW/임베디드"]),
("SW/HW/IOT", "소프트웨어 개발자", ["소프트웨어 엔지니어", "파이썬 개발자", "C,C++ 개발자", "소프트웨어 개발", "소프트웨어아키텍트", "SOFTWARE_ENGINEER", "SW/솔루션"]),
("ETC", "블록체인 엔지니어", ["블록체인 플랫폼 엔지니어", "BLOCKCHAIN_ENGINEER", "프론트엔드 개발자,블록체인"]),
("ETC", "기타", ["ERP전문가", "CTO,Chief Technology Officer", "CTO", "ERP", "etc"])
]
data_list = []
for major_category, middle_category, job_list in categories:
for sub_category in job_list:
data_list.append((major_category, middle_category, sub_category))
schema = StructType([
StructField("major_category", StringType(), True),
StructField("middle_category", StringType(), True),
StructField("sub_category", StringType(), True)
])
mapping_df = spark.createDataFrame(data_list, schema=schema)
df_with_mapped_categories = df_final.join(mapping_df, df_final.category == mapping_df.sub_category, "left")
# ...
Python
복사
2차 전처리 상세 설명
•
외부 라이브러리인 kiwipiepy 한국어 형태소 분석기를 활용하여, 문장에서 한글 명사 및 영어 명사를 분리하는 작업을 아래 코드와 같이 수행합니다.
◦
kiwipiepy 라이브러리를 사용한 이유
▪
비슷한 라이브러리로는 Konlpy 등이 있는데, 이들은 의존성 문제가 복잡하므로 상대적으로 이러한 단점이 덜한 kiwipiepy를 사용했습니다.
# ...
from kiwipiepy import Kiwi
import pandas as pd
# ...
def extract_korean_noun(kiwi, text):
if text is None or text.strip() == "":
return []
result = kiwi.tokenize(text)
return [token.form for token in result if token.tag in {'NNG', 'NNP'}]
def extract_english_noun(kiwi, text):
if text is None or text.strip() == "":
return []
result = kiwi.tokenize(text)
return [token.form for token in result if token.tag == 'SL']
# ...
kiwi = Kiwi()
result_df['preferred_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['preferred']), axis=1)
result_df['required_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['required']), axis=1)
result_df['primary_responsibility_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['primary_responsibility']), axis=1)
result_df['welfare_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['welfare']), axis=1)
result_df['preferred_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['preferred']), axis=1)
result_df['required_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['required']), axis=1)
result_df['primary_responsibility_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['primary_responsibility']), axis=1)
result_df['welfare_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['welfare']), axis=1)
# ...
Python
복사
•
2차 전처리에서 Spark를 사용하지 않고 Pandas로 처리한 이유
◦
Spark UDF(User Defined Function) 사용 시, 속도가 굉장히 느려지는 현상을 발견했습니다.
◦
Spark의 모든 워커 노드에서 kiwi 객체를 생성하고 처리하는 과정에서 시간이 오래 걸립니다.
▪
kiwipiepy 라이브러리의 Kiwi 객체는 직렬화할 수 없습니다.
▪
Spark의 장점인 병렬처리를 활용하기 어렵습니다.
→ 위와 같은 이유로, pandas를 사용해 하나의 노드에서만 처리하도록 수정하였습니다. 사용하는 워커 노드의 수가 줄어들어 금전적 비용도 줄어들고, 소요 시간도 줄어드는 효과를 얻었습니다.
정규화 테이블 (Amazon Athena with Star Schema)
•
Athena Full Access 정책을 사용합니다.
•
◦
해당 프로젝트에서는 parquet 내부에 토큰화된 단어들의 집합을 Array 형태로 칼럼 내부에 관리했습니다.
◦
기존에 Array 타입이었던 column에 대해 Unnesting 작업을 아테나로 수행합니다. 이후 nested 칼럼은 drop합니다.
◦
daily_jd_table 을 기준으로 하여, 각 테이블은 job_id 와 platform 으로 연결되어있습니다. company_deatail 테이블의 경우 company 로 연결되어있습니다. 이 모든 테이블들은 베이스 스키마로써 활용됩니다.
•
가끔 Athena 쿼리 대시보드가 작동 안할 때가 존재하기 때문에 아래 토글 정책을 추가하셔도 됩니다.
DE-1-1-SQL-Workbench
데이터 마트와 대시보드
Redshift를 사용할 때, 사용되는 Role은 아래와 같고, DE-1-1 사용자에 sts:AssumeRole 을 부여했습니다.
Redshift와 Redshift Spectrum을 활용한 데이터 마트 구축
Redshift Spectrum을 활용하여 데이터 카탈로그의 Athena 테이블을 외부 테이블 구축하고, 해당 테이블 간의 비정규화 과정을 거쳐 Redshift에 데이터 마트를 구축했습니다.
Redshift Spectrum을 사용한 이유
외부 테이블 활용의 편의성
•
glue와 Athena로 구축한 데이터 카탈로그와 S3 데이터를 직접 로드하지 않고, 쿼리를 요청할 수 있습니다.
•
Redshift 외부테이블에서 Athena에서 정규화 과정을 통해 구축한 테이블의 변화를 자동으로 감지하여, 별도의 조작없이 지속적으로 업데이트 된 데이터에 접근할 수 있습니다.
확장성
•
Athena 테이블을 데이터 마트로 재구성하여 대시보드와 연동할 수 있지만, 차후 데이터 증가에 따른 스케일 아웃 등의 확장성을 고려하여 Redshift로 구축했습니다.
데이터 마트 구축 과정
외부 스키마 생성
Redshift 내의 데이터 마트 테이블을 위한 analytics 스키마를 생성합니다. 아래 쿼리문을 통해 데이터 카탈로그에 있는 Athena 테이블을 Redshift Spectrum이 바라볼 수 있도록 설정했습니다.
CREATE EXTERNAL SCHEMA IF NOT EXISTS raw_data_external
FROM data catalog
DATABASE 'de1_1_database'
CREATE EXTERNAL DATABASE IF NOT EXISTS;
SQL
복사
데이터에서 도출 가능한 인사이트
좀더 정교한 데이터 마트 테이블 스키마를 설계하기 위해서 수집한 데이터에서 추출 가능한 인사이트는 어떤 것이 있을지 논의했습니다. 프로젝트를 진행한 목적에 맞는 인사이트를 추출한 결과는 다음과 같습니다.
•
전체 채용공고 중 직무별로 차지하는 비중은 어떻게 될 것인가?
•
희망하는 직무의 회사는 주로 어느 지역에 위치하는가?
•
지금 채용 중인 공고는 어떤게 있을까?
•
직무별로 많이 사용하는 기술은 어떤게 있을까?
•
채용공고의 자격 요건, 우대 사항, 기술 스택 등에서 자주 등장하는 키워드는 무엇인가?
데이터 마트 테이블 구축
추출한 인사이트를 바탕으로 데이터 마트 테이블 스키마를 설계했으며, 데이터 마트 테이블은 Redshift Spectrum가 참조하고 있는 외부 테이블의 비정규화 과정을 거친 후 구축했습니다. 비정규화 과정에서는 테이블 간의 join을 통해 중복된 값을 데이터 마트에 허용하고, 연산을 수행한 값을 적재함으로써 쿼리 성능을 향상시켰습니다. 또한, 데이터 마트 테이블 생성은 데이터의 볼륨 크기를 고려하고, 업데이트 로직의 간소화를 위해 Full-refresh 방식으로 진행했습니다.
데이터 마트의 스키마는 다음과 같습니다.
ELT 파이프라인
앞서 설명한 외부 테이블 생성과 데이터 마트 테이블 구축 과정은 Airflow의 DAG(redshift_elt_query_dag)를 통해 ELT 파이프라인으로 구축했습니다. Full-refresh을 위한 DROP, CTS 쿼리 구문을 sql 파일로 작성했으며, 제작한 Redshift Operator를 통해 해당 쿼리를 실행하는 방식으로 테이블들을 삭제, 생성합니다.
일부 테이블에는 테이블 간 종속성 문제가 존재하여, Airflow DAG를 통해 task 순서를 명시적으로 지정하여 이를 해결했습니다. 아래는 활용한 CTS 쿼리 중 일부이며, 이외 다른 쿼리는 깃허브에서 자세히 확인할 수 있습니다.
CREATE TABLE "analytics"."unique_jds" AS (
SELECT
DISTINCT platform
, job_id
, title
, company
, major_category
, middle_category
, sub_category
FROM "raw_data_external"."daily_jd_table"
);
CREATE TABLE "analytics"."unique_jd_skills" AS (
SELECT
u.major_category,
u.middle_category,
u.platform,
u.job_id,
js.unnested_skill as skill
FROM "analytics"."unique_jds" AS u
JOIN "raw_data_external"."jd_skills" AS js
ON u.platform = js.platform
AND u.job_id = js.job_id
);
Python
복사
데이터 대시보드 구축
대시보드 설계 과정
본격적인 대시보드 제작에 앞서, 논의한 인사이트와 구축한 데이터 마트 테이블을 바탕으로 대시보드 설계 테이블을 작성했습니다. 대시보드 설계 테이블은 시각화 주제와 차트 그리고 시각화에 필요한 테이블과 컬럼 등 대시보드 구성 내용을 담고 있습니다.
주제 | 활용 차트 | 테이블명 | 컬럼명 |
직무 카테고리별
공고수
(대분류) | Pie Chart | unique_category_count_major | major_category
count |
직무 카테고리별
공고수
(대분류/중분류/소분류) | Tree Map | unique_category_count_major_middle | major_category
middle_category
sub_category
count |
일자별 수집한 공고 수 | Trendline | unique_daily_job_posting_count | day
count |
기술별로 사용되는 직무 | Bar Chart | unique_jd_skills | major_category
middle_category
skill |
전체 회사 위치도 | Map | unique_company_coordinates | lat
lon |
직무 카테고리별
회사 위치 | Map | unique_company_coordinates | lat
lon
major_category
middle_category
sub_category |
직무 카테고리별
마감 임박 공고리스트 | table | unique_upcoming_deadline_jobs_7days | company
title
url
end_at
primary_responsibility |
직무 카테고리별
채용공고 사용기술에
포함된 기술스택
키워드 언급량 | Bar Chart | unique_preferred_english | preferred |
직무 카테고리별
채용공고 자격요건에
포함된 기술스택
키워드 언급량 | Bar Chart | unique_required_english | required |
직무 카테고리별
채용공고 주요업무에
포함된 기술스택
키워드 언급량 | Bar Chart | unique_primary_responsibility_english | primary_responsibility |
대시보드 플랫폼 선정
대시보드 플랫폼은 데이터 마트 플랫폼인 Redshift 연동과 대시보드 설계 테이블의 차트 종류, 계산식 지원 등의 기준을 두고 검토했으며, 비교적 러닝커브가 적고 무료 프로젝트 기능이 지원되는 슈퍼셋으로 선정했습니다.
대시보드 구축 결과
프로젝트 대시보드는 크게 모든 직무의 통합적인 정보를 제공하는 파트와 직무 카테고리별로 필터를 적용할 수 있는 상세 파트로 구성되어 있습니다.
직무 통합 대시보드
직무 통합 대시보드는 수집한 다양한 채용 플랫폼의 데이터를 종합하여 직무 별로 채용 중인 공고 수, 회사 위치, 기술스택 등을 종합적으로 확인할 수 있습니다.
간략히 분석 내용을 정리하자면 직무 대분류 기준으로 WEB 채용공고가 48%로 가장 많았으며, SW/HW/IOT 채용공고가 약 21%로 두번째 많았으며 나머지 직무에서는 DATA > DEVSECOPS > MOBILE > SUPPORT > ETC > GAME 순으로 확인되었습니다. 또한, 채용 회사의 97% 이상이 서울, 경기 등 수도권 지역에 위치했습니다. 채용공고에서 언급되는 기술스택 키워드로 Javascript가 1위였으며, 그 다음으로 AWS > JAVA > REACT > PYTHON 등이 자주 언급되었습니다.
직무 카테고리별 상세 정보 대시보드
직무 카테고리별 상세 정보 대시보드에서는 대분류, 중분류, 소분류에 따라 공고 수, 회사 위치, 마감이 임박한 채용공고를 확인할 수 있으며, 채용공고의 자격요건, 주요업무, 사용기술에 포함되어 있는 기술 스택을 언급량 순으로 보여주고 있습니다. 보다 자세한 구성과 내용은 <대시보드 사용 영상> 링크를 통해 확인할 수 있습니다.
이벤트 모니터링
AWS 인프라에서 발생하는 이벤트중 원하는 이벤트만 필터링해, 작업이 잘 진행되고 있는지 확인할 수 있도록 슬랙에 알림을 보내주도록 구성했습니다.
추적하고자 하는 이벤트
프로젝트에서 이벤트를 추적해 확인하고자 하는 사항은 아래와 같습니다.
1.
웹 스크래핑의 결과인 json 파일이 s3에 저장되었는지
2.
s3에 저장된 데이터가 갑자기 삭제되지는 않았는지
3.
실행시킨 glue job이 어떤 상태로 종료되었는지 (SUCCEEDED, FAILED, TIMEOUT …)
사전 설정
aws의 거의 모든 이벤트는 계정에 하나씩 할당되어있는 default 이벤트 버스로 전송됩니다.
하지만 s3와 lambda 등의 특정 서비스의 이벤트는 데이터 이벤트에 속하는데, 데이터 이벤트는 자동으로 전송되지 않아 별개의 설정을 해줘야 합니다.
S3의 버킷의 속성 탭에 들어가서 EventBridge로 알림 전송을 활성화 해주면, 해당 버킷에서 발생하는 이벤트들이 default 이벤트 버스로 전송됩니다.
Event 처리 방식
default 이벤트 버스로 전송된 이벤트들은 이벤트 규칙에 필터링된 후에 이벤트 타겟으로 전송됩니다.
프로젝트에서 설정한 이벤트 타겟은 람다 함수입니다.
람다는 이벤트를 받아서 이벤트 메타 데이터를 가져와 메시지를 생성한 뒤 슬랙 채널로 전송합니다.
Event Rules
아래는 Glue Job이 생성하는 샘플 이벤트입니다.
{
"version": "0",
"id": "abcdef00-1234-5678-9abc-def012345678",
"detail-type": "Glue Job State Change",
"source": "aws.glue",
"account": "123456789012",
"time": "2017-09-07T18:57:21Z",
"region": "us-east-1",
"resources": [],
"detail": {
"jobName": "MyJob",
"severity": "INFO",
"state": "SUCCEEDED",
"jobRunId": "jr_abcdef0123456789abcdef0123456789",
"message": "Job run succeeded"
}
}
JSON
복사
Event Rule에 해당하는 패턴을 가진 이벤트들만 타겟으로 전달됩니다.
{
"source": ["aws.glue"],
"detail-type": ["Glue Job State Change"],
"detail": {
"state": ["SUCCEEDED", "FAILED", "TIMEOUT", "STOPPED"],
"jobName": ["de1_1_1st_preprocessing_script", "de1_1_2nd_preprocessing_script"]
}
}
JSON
복사
detail.jobName이 "de1_1_1st_preprocessing_script", "de1_1_2nd_preprocessing_script" 둘 중 하나인 이벤트만 위의 이벤트 룰을 통과할 수 있습니다.
이 이벤트들은 람다 함수로 전달됩니다.
Lambda
람다 함수는 EventBridge에 의해 트리거됩니다.
실행되는 코드는 아래와 같습니다.
from datetime import datetime
import json
import os
import requests
import boto3
def lambda_handler(event, context):
webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
if not webhook_url:
print('Slack Webhook URL not found')
return {
'statusCode': 400,
'body': json.dumps('Slack Webhook URL not found')
}
glue_job_name = event['detail']['jobName']
glue_job_run_id = event['detail']['jobRunId']
glue_job_state = event['detail']['state']
slack_message = {}
message_text = ''
attachments = ''
if glue_job_state == 'SUCCEEDED':
# message_text = f'Glue Job {glue_job_name} has succeeded!'
glue = boto3.client('glue')
response = glue.get_job_run(JobName=glue_job_name, RunId=glue_job_run_id)
execution_time = response['JobRun']['ExecutionTime']
hours, remainder = divmod(execution_time, 3600)
minutes, seconds = divmod(remainder, 60)
message_text = f"Glue Job {glue_job_name} has succeeded!"
attachments = f"Execution Time: {hours}h {minutes}m {seconds}s."
elif glue_job_state == 'FAILED':
message_text = f'Glue Job {glue_job_name} has failed.'
elif glue_job_state == 'TIMEOUT':
message_text = f'Glue Job {glue_job_name} has timed out.'
elif glue_job_state == 'STOPPED':
message_text = f'Glue Job {glue_job_name} has been stopped.'
slack_message['text'] = message_text
if attachments:
slack_message["attachments"] = [
{
"text": attachments
}
]
response = requests.post(webhook_url, json=slack_message)
if response.status_code != 200:
print(f'Failed to send Slack message. Status Code: {response.status_code}, Reason: {response.text}')
return {
'statusCode': 200,
'body': json.dumps('Slack message sent')
}
JSON
복사
코드중 boto3에서 glue 클라이언트를 받아와 glue job 메타데이터인 execution_time을 받아오는 부분이 있는데, 이는 람다로 넘어오는 이벤트에는 execution_time 값이 없기 때문입니다.
이를 위해서 람다 role에 계정의 모든 glue job에 대한 접근을 허용하는 인라인 정책을 추가해줬습니다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"glue:GetJobRun"
],
"Resource": "arn:aws:glue:ap-northeast-2:862327261051:job/*"
}
]
}
JSON
복사
모니터링 결과
•
파일이 s3에 저장될 때
•
파일이 삭제될 때
•
glue job이 성공했을 때
•
glue job이 실패했을 때