FastAPI - Websockets 사용해보기 (with Redis)
목차
FastAPI Websocket
예전에 했던 사이드 프로젝트 중에서 WebSocket을 사용했던 적이 있습니다. FastAPI를 이용해서 websocket을 구현했었는데요. 아주 쉽게 구현이 가능해서 금새 만들었습니다. 그 방법을 이번 포스팅에서 공유해보려고 합니다.
웹소켓(WebSocket)은 서버에 소켓을 연결하여 실시간, 양방향 통신을 가능하게 해주는 기술입니다. 한 번 연결되면 연결이 끊어지기 전까지 계속 데이터를 주고 받을 수 있습니다. HTTP 프로토콜을 이용하여 HandShake를 시작하고 서버에서 수락하면 websocket 연결이 유지됩니다.
보통 예제로 실시간 채팅같은 걸 많이 구현합니다.
FastAPI Websocket 사용
- FastAPI는
websockets
라이브러리에 의존성이 있습니다. 설치해주어야 합니다.
pip install websockets
- FastAPI
Router
에서websocket
을 제공합니다. 다음과 같이 사용할 수 있습니다.
from fastapi import APIRouter, WebSocket
router = APIRouter()
@router.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
...
- http 요청이 아니기 때문에 클라이언트에서
ws://
로 요청해야 합니다.
ws://127.0.0.1:8000/
웹소켓(Websocket) 프로젝트 공유
친구와 점심 메뉴를 함께 고르는 서비스를 사이드 프로젝트로 했었습니다. 하나의 링크를 타고 들어가 친구들이 모여 점심 메뉴를 고르는 간단한 사이트입니다. 링크에 접속한 모든 친구들이 똑같은 화면을 실시간으로 봐야하기 때문에 웹소켓을 사용하였고 공유할 세션을 만들기 위해 Redis
를 사용했습니다.
- 방 만듬(Redis session 생성) → 친구에게 링크 전달 → 친구들 접속(웹소켓 연결) → 각자 메뉴 작성(브로드캐스팅) 순서로 진행됩니다.
방 만들기 (session 생성)
redis도 설치가 필요합니다.
pip install redis
session을 생성하는 것은 HTTP 요청으로 생성합니다.
- session_id를 uuid로 생성하여 redis에 저장
- 유지시간 10분
from redis import asyncio as aioredis
...
async def get_redis_pool():
return await aioredis.from_url("redis://redis:6379/0")
@router.post("/session/")
async def create_session(redis: Annotated[aioredis.Redis, Depends(get_redis_pool)]):
"""
Session 생성
- 방장이 방을 생성한다.
"""
session_id = str(uuid.uuid4())
session_data = {
"participants": [],
"menu": [],
"expires_at": (datetime.now() + timedelta(minutes=10)).isoformat()
}
await redis.set(session_id, json.dumps(session_data), ex=600)
return {"url": f"/ws/{session_id}/"}
친구들 접속(웹 소켓 연결) 및 브로드캐스팅
session_id
로 같은 곳 접속한 친구들 구분- 10명 제한
ws_manager.add_client(session_id, websocket)
친구 연결received_data = await websocket.receive_text()
메뉴 입력await client.send_json(updated_menu)
for문 순회하며 연결된 친구들에게 브로드캐스팅
router = APIRouter()
@router.websocket("/{session_id}/")
async def menu_websocket_endpoint(
websocket: WebSocket,
session_id: str,
redis: Annotated[aioredis.Redis, Depends(get_redis_pool)]
):
try:
session_data = await redis.get(session_id)
if not session_data:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="세션 없음")
session_data = json.loads(session_data)
if len(session_data["participants"]) >= 10:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="최대 참가자 수 초과")
pubsub = redis.pubsub()
await pubsub.subscribe(session_id)
await websocket.accept()
try:
ws_manager.add_client(session_id, websocket)
except MaximumSessionReachException:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="최대 세션 수 초과")
except MaximumConnectionPerSessionReachException:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="최대 연결 수 초과")
await add_participant_to_session(session_data, websocket.client)
await update_session_expiration(session_data)
await redis.set(session_id, json.dumps(session_data), ex=(datetime.fromisoformat(session_data["expires_at"]) - datetime.now()).seconds)
try:
ws_manager.add_client(session_id, websocket)
while True:
received_data = await websocket.receive_text()
command, menu_item = received_data.split(":", 1)
if command.lower() == "add":
session_data["menu"].append(menu_item)
elif command.lower() == "remove":
session_data["menu"].remove(menu_item)
expire_seconds = (datetime.fromisoformat(session_data["expires_at"]) - datetime.now()).total_seconds()
await redis.set(session_id, json.dumps(session_data), ex=int(expire_seconds))
await redis.publish(session_id, json.dumps(session_data["menu"]))
await asyncio.sleep(0.1)
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=5)
if message:
updated_menu = json.loads(message['data'])
clients = ws_manager.get_clients(session_id)
for client in clients:
await client.send_json(updated_menu)
else:
if message is None:
try:
session_data = await redis.get(session_id)
session_data = json.loads(session_data)
clients = ws_manager.get_clients(session_id)
for client in clients:
await client.send_json(session_data["menu"])
except Exception:
pass
break
await asyncio.sleep(0.1)
except WebSocketDisconnect:
ws_manager.remove_client(session_id, websocket)
await redis.delete(session_id)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
Websocket 테스트 websocat
웹소켓을 만들어도 FastAPI에서 제공해주는 Swagger에서 테스트해볼 수 없습니다. 하지만 websocat
이라는 툴을 이용해서 아주 쉽고 빠르게 테스트해 볼 수 있습니다.
- session을 생성한 뒤 테스트
websocat ws://127.0.0.1:8000/ws/e0432a93-e791-437b-a8f8-3b27ea76c3e5/
마무리
웹소켓을 간단하게 구현해서 공유한 것입니다. 예제코드가 개선의 여지는 매우 많습니다. 실무에서는 websocket을 이용한 프로덕트를 만들어본 경험이 없는데 이런 경험도 해보면 참 재밌겠다 생각이 드네요. FastAPI를 이용하면 정말 쉽고 빠르게 구현이 되는 것 같습니다. 전체 코드는 github에 있습니다.