FastAPI + Async SQLALCHEMY 사용하기

프로필 사진mingke

FastAPI Logo

목차

소개

최신 애플리케이션 개발에서 효율성과 확장성은 가장 중요한 요소입니다. 비동기를 지원하는 ORM은 이러한 목표를 달성하는 데 중요한 역할을 할 수 있습니다. 비동기 API를 지원하는 Python Framework (요즘 제가 제일 좋아하는) FastAPI를 사용할 때 적용한다면 더 큰 효과를 볼 수 있습니다.

클라이언트에서 데이터베이스로의 모든 요청과 응답 과정이 비동기 방식으로 처리되며, 이는 전체 시스템의 성능을 크게 향상시킵니다.

SQLAlchemy를 사용하여 비동기 ORM을 구현하는 이유와 방법을 간략하게 설명합니다.

비동기 프로그래밍

비동기 프로그래밍을 사용하면 I/O 바인딩 작업과 같은 non-blocking 작업이 완료될 때까지 기다리는 동안 작업을 이벤트 루프에 제어권을 넘길 수 있습니다. 이것을 통해 시스템이 다른 작업을 동시에 처리할 수 있으므로 전반적인 시스템 성능이 향상시킬 수 있습니다.

비동기 ORM의 장점

확장성:

  • 비동기 ORM은 다수의 연결을 동시에 처리할 수 있어, 시스템의 확장성을 증대시킵니다.

응답성

  • 시간이 오래 걸리는 데이터베이스 쿼리를 처리할 때에도 애플리케이션의 응답성이 유지됩니다.

리소스 최적화

  • 시스템 리소스를 보다 효율적으로 사용할 수 있습니다.

Asynchronous SQLAlchemy

  1. Async Session

    비동기로 데이터베이스와 연결하기 때문에 AsyncSession을 사용해야합니다. 예시에서는 mysql을 사용해보겠습니다. 동기에서 pymysql이 필요한 것처럼 비동기에서는 aiomysql을 설치해야합니다. create_async_engine, async_sessionmaker을 사용해서 연결 할 수 있습니다.

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
 
meta = MetaData()
engine = create_async_engine("mysql+aiomysql://admin:root@localhost:3306/admin")
async_session = async_sessionmaker(engine, autoflush=False, autocommit=False)
 
async def get_db() -> AsyncSession:
		# 비동기에서 달라지는 부분
    async with engine.begin() as conn:
        await conn.run_sync(meta.create_all)
 
    db = async_session()
    try:
        yield db
    finally:
        await db.close()
  1. ORM 작성

    async를 이용해서 비동기 함수로 만들어야하고 쿼리 실행에 await를 사용해야합니다.

    • Create
      1. Model객체를 먼저 생성하고 Model객체를 커밋하는 방식입니다.
      2. insert 함수를 사용하여 쿼리를 먼저 만들고 실행하는 방식입니다. 이 방법은 data에 list[dict]를 넣는다면 bulk insert도 구현할 수 있습니다.
    • Read
      • Read에서는 select 함수를 사용하여 쿼리를 먼저 만들고 실행하는 방식입니다. 저는 직관적으로 느껴져서 동기 ORM에서 사용하던 session.query() 보다 이 방법이 더 좋은것 같습니다.
    • Update
      1. Model객체를 불러온 뒤 attribute를 변경하고 업데이트 하는 방식입니다.
      2. update 함수를 사용하여 조건과 값을 입력하여 업데이트 하는 방식입니다.
    • Delete 1,2 모두 update와 동일한 로직 입니다.
from sqlalchemy import select, insert, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
 
# Create
async def create(self, db: AsyncSession, **kwargs):
    db_obj = self.model(**kwargs)
    db.add(db_obj)
    await db.commit()
 
async def create2(self, db: AsyncSession, data: dict | list[dict]):
    query = insert(self.model).values(data)
    await db.execute(query)
    await db.commit()
 
# Read
# query(statement)를 먼저 작성 -> 실행
async def get(db: AsyncSession, id: int):
    query = select(Model).filter(Model.id == id)
    result = await db.execute(query)
    return result.scalar()
 
async def get_all(db: AsyncSession, page=None, page_size=None):
    query = select(Model)
    if page and page_size:
        query = query.offset((page - 1) * page_size).limit(page_size)
    result = await db.execute(query)
    return result.scalars().all()
 
# Update
async def update(db: AsyncSession, db_obj, **kwargs):
    for key, value in kwargs.items():
        setattr(db_obj, key, value)
    await db.commit()
    await db.refresh(db_obj)
    return db_obj
 
async def update2(self, db: AsyncSession, columns: dict, filters: dict):
    filters = [(getattr(self.model, k) == v) for k, v in filters.items()]
 
    query = update(self.model).where(*filters).values(columns)
    await db.execute(query)
    await db.commit()
 
# Delete
async def delete(db: AsyncSession, db_obj):
    await db.delete(db_obj)
    await db.commit()
    return db_obj
 
async def delete2(self, db: AsyncSession, columns: dict, filters: dict):
    filters = [(getattr(self.model, k) == v) for k, v in filters.items()]
 
    query = update(self.model).where(*filters).values(columns)
    await db.execute(query)
    await db.commit()
 

마무리

간단하게 비동기 ORM에 대하여 알아봤습니다. AsyncSessionORM 작성에서는 async/await를 사용해야 하며, query(statement)를 먼저 작성한 후 실행해야 합니다. 이를 통해 비동기 프로그래밍을 더욱 효율적으로 수행할 수 있습니다. 빠른 FastAPI를 더 빠르게 해주는 것 같아 아주 매력적인 것 같습니다.