FastAPI + Async SQLALCHEMY 사용하기
목차
소개
최신 애플리케이션 개발에서 효율성과 확장성은 가장 중요한 요소입니다. 비동기를 지원하는 ORM은 이러한 목표를 달성하는 데 중요한 역할을 할 수 있습니다. 비동기 API를 지원하는 Python Framework (요즘 제가 제일 좋아하는) FastAPI를 사용할 때 적용한다면 더 큰 효과를 볼 수 있습니다.
클라이언트에서 데이터베이스로의 모든 요청과 응답 과정이 비동기 방식으로 처리되며, 이는 전체 시스템의 성능을 크게 향상시킵니다.
SQLAlchemy를 사용하여 비동기 ORM을 구현하는 이유와 방법을 간략하게 설명합니다.
비동기 프로그래밍
비동기 프로그래밍을 사용하면 I/O 바인딩 작업과 같은 non-blocking 작업이 완료될 때까지 기다리는 동안 작업을 이벤트 루프에 제어권을 넘길 수 있습니다. 이것을 통해 시스템이 다른 작업을 동시에 처리할 수 있으므로 전반적인 시스템 성능이 향상시킬 수 있습니다.
비동기 ORM의 장점
확장성:
- 비동기 ORM은 다수의 연결을 동시에 처리할 수 있어, 시스템의 확장성을 증대시킵니다.
응답성
- 시간이 오래 걸리는 데이터베이스 쿼리를 처리할 때에도 애플리케이션의 응답성이 유지됩니다.
리소스 최적화
- 시스템 리소스를 보다 효율적으로 사용할 수 있습니다.
Asynchronous SQLAlchemy
-
Async Session
비동기로 데이터베이스와 연결하기 때문에 AsyncSession을 사용해야합니다. 예시에서는 mysql을 사용해보겠습니다. 동기에서 pymysql이 필요한 것처럼 비동기에서는 aiomysql을 설치해야합니다. create_async_engine, async_sessionmaker을 사용해서 연결 할 수 있습니다.
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
meta = MetaData()
engine = create_async_engine("mysql+aiomysql://admin:root@localhost:3306/admin")
async_session = async_sessionmaker(engine, autoflush=False, autocommit=False)
async def get_db() -> AsyncSession:
# 비동기에서 달라지는 부분
async with engine.begin() as conn:
await conn.run_sync(meta.create_all)
db = async_session()
try:
yield db
finally:
await db.close()
-
ORM 작성
async를 이용해서 비동기 함수로 만들어야하고 쿼리 실행에 await를 사용해야합니다.
- Create
- Model객체를 먼저 생성하고 Model객체를 커밋하는 방식입니다.
- insert 함수를 사용하여 쿼리를 먼저 만들고 실행하는 방식입니다. 이 방법은 data에 list[dict]를 넣는다면 bulk insert도 구현할 수 있습니다.
- Read
- Read에서는 select 함수를 사용하여 쿼리를 먼저 만들고 실행하는 방식입니다. 저는 직관적으로 느껴져서 동기 ORM에서 사용하던 session.query() 보다 이 방법이 더 좋은것 같습니다.
- Update
- Model객체를 불러온 뒤 attribute를 변경하고 업데이트 하는 방식입니다.
- update 함수를 사용하여 조건과 값을 입력하여 업데이트 하는 방식입니다.
- Delete 1,2 모두 update와 동일한 로직 입니다.
- Create
from sqlalchemy import select, insert, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
# Create
async def create(self, db: AsyncSession, **kwargs):
db_obj = self.model(**kwargs)
db.add(db_obj)
await db.commit()
async def create2(self, db: AsyncSession, data: dict | list[dict]):
query = insert(self.model).values(data)
await db.execute(query)
await db.commit()
# Read
# query(statement)를 먼저 작성 -> 실행
async def get(db: AsyncSession, id: int):
query = select(Model).filter(Model.id == id)
result = await db.execute(query)
return result.scalar()
async def get_all(db: AsyncSession, page=None, page_size=None):
query = select(Model)
if page and page_size:
query = query.offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return result.scalars().all()
# Update
async def update(db: AsyncSession, db_obj, **kwargs):
for key, value in kwargs.items():
setattr(db_obj, key, value)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def update2(self, db: AsyncSession, columns: dict, filters: dict):
filters = [(getattr(self.model, k) == v) for k, v in filters.items()]
query = update(self.model).where(*filters).values(columns)
await db.execute(query)
await db.commit()
# Delete
async def delete(db: AsyncSession, db_obj):
await db.delete(db_obj)
await db.commit()
return db_obj
async def delete2(self, db: AsyncSession, columns: dict, filters: dict):
filters = [(getattr(self.model, k) == v) for k, v in filters.items()]
query = update(self.model).where(*filters).values(columns)
await db.execute(query)
await db.commit()
마무리
간단하게 비동기 ORM에 대하여 알아봤습니다. AsyncSession
과 ORM 작성
에서는 async/await를 사용해야 하며, query(statement)
를 먼저 작성한 후 실행해야 합니다. 이를 통해 비동기 프로그래밍을 더욱 효율적으로 수행할 수 있습니다. 빠른 FastAPI를 더 빠르게 해주는 것 같아 아주 매력적인 것 같습니다.