Redis scan 톺아보기
문제 상황
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import JSONResponse
from dependency_injector.wiring import inject, Provide
from tasks.blog_article_task import bulk_blog_article_task
from services.blog_article_sync_service import BlogArticleSyncService
from config import Config
from containers import IoCContainer
blog_topic_router = APIRouter()
@blog_topic_router.post(
"/bulk/blog-content-data",
response_model=dict,
status_code=status.HTTP_200_OK
)
@inject
async def bulk_index_blog_article(
blog_article_sync_service: BlogArticleSyncService = Depends(
Provide[IoCContainer.service.blog_article_sync_service]
),
app_config: Config = Depends(
Provide[IoCContainer.config.app_config]
)
):
index_name = app_config.blog_article_opensearch_index_name
try:
# Check if a bulk task is already running
if await blog_article_sync_service.is_bulk_task_running():
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={
"message": "Bulk indexing task is already running.",
"index": index_name
}
)
# Trigger the bulk indexing task
task_result = bulk_blog_article_task.delay(
queue=app_config.blog_sync_queue_name
)
task_id = task_result.id
# Set the bulk task ID
response = await blog_article_sync_service.set_bulk_task_id(task_id)
if not response:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"result": "fail",
"message": "Failed to set bulk task ID.",
"index_name": index_name
}
)
# Return success response
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"result": "success",
"message": "Bulk data request successfully sent.",
"task_id": task_id,
"index_name": index_name
}
)
except Exception as e:
# Log the exception
# log.error(f"Failed to process bulk indexing: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
Python
복사
위와 같이 celery task를 delay를 통해 호출하는 API를 만들었는데, 어느 날 확인해 보니 delay는 잘 작동하는데 API 응답은 오지 않고 'upstream request timeout'과 같은 에러를 반환하게 되었습니다.
또한 celery task 내에서 celery task가 끝나기 전에 celery에서 생성한 비동기 루프가 먼저 종료되는 문제를 발견하였습니다.
원인을 조사해보니, celery task와 API 컨트롤러 내의 blog_article_sync_service.is_bulk_task_running() 메서드 내에서 redis의 scan 문제로 인해 응답이 지연된 것으로 밝혀졌습니다. (총 redis 키는 약 50만개 이상)
'KEYS' 명령어의 위험성
레디스의 'KEYS' 명령어는 모든 키 값을 가져오지만, 이 명령을 사용하는 동안 다른 명령을 수행할 수 없습니다. 이로 인해 성능 문제가 발생할 수 있습니다. 이런 이유로 레디스는 'SCAN' 또는 'HSCAN'을 추천합니다. 'KEYS' 명령어를 실행하는 동안 다른 동작이 중지되는 이유는 레디스가 한 번에 하나의 동작만 수행할 수 있기 때문입니다.
KEYS 명령어는 glob pattern을 사용해 데이터베이스의 모든 키를 간단히 조회할 수 있습니다. 시간 복잡도는 O(N)이지만, 공식 문서에 따르면 저사양 랩탑에서도 40ms 내에 100만 개의 키가 존재하는 데이터베이스를 스캔할 수 있습니다.
그러나 이 명령어는 실행 중에 다른 모든 명령의 실행이 블로킹되는 치명적인 문제가 있습니다.
이는 Redis가 싱글 스레드 아키텍처이기 때문입니다. 데이터베이스의 크기가 클수록 블로킹의 영향으로 성능이 저하되며, 장애가 발생할 가능성이 커집니다. 따라서, 일반적으로 프로덕션 환경에서는 절대 사용하지 않아야 합니다.
Redis의 'SCAN' 명령어를 사용하는 이유
SCAN 명령어는 모든 키를 한 번에 불러오지 않고, 'count' 값에 따라 여러 번에 걸쳐 키를 불러옵니다. (기본 'count' 값은 10입니다)
이 문제에서는 SCAN의 기본 'count' 값인 10개를 사용했습니다. 그 결과, 5~8초가 소요되어 API 응답을 제공하지 못했습니다.
예를 들어, Redis에서 불러와야 하는 키 값이 총 10,000개고 'count'가 10이라면, 총 1,000번에 걸쳐 키를 불러오게 됩니다.
'count' 값을 낮게 설정하면, 한 번에 불러오는 키의 개수는 적지만, 모든 데이터를 불러오는 데 시간이 더 걸립니다. 그러나 그 사이에 다른 요청을 처리할 수 있습니다. 반면, 'count' 값을 높게 설정하면, 한 번에 불러오는 키의 개수가 많아져 데이터를 빠르게 불러올 수 있지만, 다른 요청을 받는 횟수가 줄어들어 병목 현상이 발생할 수 있습니다.
아래 블로그를 참고했을 때, redis scan count의 sweet spot은 대략 100개 ~ 1000개 사이인 것으로 확인됩니다.
그 너머부터는 그렇게 유의미한 효과를 나타내지 않는다고 합니다.
그렇다면 병렬로 redis의 scan을 실행해볼 수 있지 않을까요?
import asyncio
from typing import List
class RedisClient:
def __init__(self, client):
self._client = client
async def _check_connection(self):
# Connection check logic here
pass
async def keys(self, pattern: str = "*", count: int = 1000) -> List[str]:
"""
Retrieve all keys matching a pattern.
Args:
pattern (str, optional): Pattern to match keys. Defaults to '*'.
count (int, optional): Number of keys to retrieve per scan. Defaults to 1000.
Returns:
List[str]: A list of matching keys.
"""
await self._check_connection()
keys = []
try:
cursor = b'0'
while cursor:
cursor, found_keys = await self._client.scan(
cursor=cursor,
match=pattern,
count=count
)
keys.extend(found_keys)
logger.info(f"Retrieved {len(keys)} keys matching pattern '{pattern}'.")
return keys
except Exception as e:
logger.error(f"Error retrieving keys with pattern '{pattern}': {e}")
raise
async def parallel_scan(redis_client: RedisClient, pattern: str = "*", count: int = 1000, num_scanners: int = 10) -> List[str]:
async def scan_task(scanner_id: int):
return await redis_client.keys(pattern, count)
tasks = [scan_task(i) for i in range(num_scanners)]
results = await asyncio.gather(*tasks)
# Flatten the list of lists
all_keys = [key for result in results for key in result]
return all_keys
Python
복사
위 코드에서 parallel_scan 함수는 병렬 스캔으로 성능 향상을 얻는 방법을 보여줍니다.
동시 스캔: 여러 사용자가 동시에 SCAN 명령어를 실행하여 빠르게 키를 찾을 수 있습니다.
배치 크기 변경: SCAN 명령어의 COUNT 파라미터를 바꿔 한 번에 더 많은 키를 가져올 수 있습니다. 기본은 10이지만, 더 큰 수로 바꾸면 한 번에 더 많은 키를 볼 수 있습니다. 예를 들어, COUNT 값을 1000으로 해봅시다.
적절한 패턴 사용: 더 정확한 패턴을 사용하면 찾는 범위가 줄어들어 더 빨리 찾을 수 있습니다.
백그라운드 작업: 키 찾기를 백그라운드에서 실행하면, 찾는 동안 다른 일에 영향을 주지 않습니다.
클러스터링 고려: Redis 클러스터를 사용하면 데이터를 여러 곳에 나눠 놓고, 각 곳에서 동시에 SCAN 명령어를 실행할 수 있습니다.
num_scanners 파라미터로 병렬 스캔 작업의 개수를 바꿀 수 있고, Redis 클라이언트의 구현에 따라 scan_task를 더 잘게 쪼개거나 병렬 처리 방식을 바꿀 수도 있으니 참고하면 좋을 것 같습니다.
이어서 작성할 글
redis hgetall vs redis hscan
hscan이 유리한 이유를 톺아보고, hscan을 제대로 사용하는 방법을 논의해볼 수 있도록 하겠습니다.
ex. python hscan_iter 이 hscan을 어떻게 사용하는지를 중점으로 확인
참고한 글
redis scan number(count)에 대한 sweet spot 정리