FastAPI에서 Scheduler 사용하기

프로필 사진mingke

FastAPI APScheduler

목차

APScheduler

백엔드 개발하다보면 Scheduler를 개발해야할 때가 종종 있습니다. FastAPI를 사용할 때 복잡하지 않은 Scheduler라면 apscheduler 라는 라이브러리를 사용하면 됩니다. APScheduler는 다양한 스케줄러를 제공하고 있습니다.

포스팅에서는 FastAPI scheduler 사용에 초점을 맞춰 간단하게 알아보도록 하겠습니다.

  • 설치
pip install apscheduler

BackgroundScheduler

scheduler를 실행하면서 서버가 blocking되면 안되기 때문에 BackgroundScheduler를 사용하면됩니다. scheduler.py 를 하나 생성해줍니다. task를 분리해줄 수 도 있지만 예시에서는 분리하지 않겠습니다.

# scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
 
scheduler = BackgroundScheduler()
 
def function_1(a,b,c):
    print(a,b,c)
 
# 함수에 파라미터가 없으면 args를 안넣으면 됩니다.
# kargs={"a": 1,"b": 2,"c": 3} 도 가능합니다.
scheduler.add_job(function_1, "interval", seconds=5, args=(1,2,3))
 
# scheduler.add_job(function_1, "interval", seconds=5, kwargs={"a": 1, "b": 2, "c": 3})
 

lifespan에서 사용하기

uvicorn 서버 단독으로 실행할 때 lifespan 에서 사용해도 괜찮습니다.

from contextlib import asynccontextmanager
from app.scheduler import scheduler
 
@asynccontextmanager
async def lifespan(app):
    # scheduler 실행
    scheduler.start()
    yield
    print("lifespan finished")
 
# main.py
from fastapi import FastAPI
from app.core.lifespan import lifespan
 
app = FastAPI(lifespan=lifespan)
 
  • 결과
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [54162] using StatReload
INFO:     Started server process [54164]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
1 2 3
^CINFO:     Shutting down
INFO:     Waiting for application shutdown.
lifespan finished
INFO:     Application shutdown complete.
INFO:     Finished server process [54164]
INFO:     Stopping reloader process [54162]

gunicorn에서 사용하기

gunicorn에서 사용할 경우는 lifespan에서 사용하는게 좋은 선택이 아닐 수 있습니다. 배포할 때는 gunicorn 으로 worker들은 독립적인 프로세스 이기때문에, 만약 worker가 4개라면 lifespan도 4번 실행되겠고, 그것은 scheduler도 4번 실행된다는 뜻 입니다.

gunicorn hooks를 사용해서 마스터 프로세스에서 1번만 실행되도록 합니다.

# gunicorn.config.py
import multiprocessing
from app.scheduler import scheduler
 
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
wsgi_app = "app.main:app"
loglevel = "info"
bind = "0.0.0.0:8000"
max_requests = 1000
max_requests_jitter = 100
 
def on_starting(server):
    print("Gunicorn starting")
    scheduler.start()
 
  • 실행
gunicorn -c app/gunicorn.config.py
  • 결과
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [7] [INFO] Starting gunicorn 22.0.0
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [7] [INFO] Listening at: http://0.0.0.0:8000 (7)
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [7] [INFO] Using worker: uvicorn.workers.UvicornWorker
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [9] [INFO] Booting worker with pid: 9
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [10] [INFO] Booting worker with pid: 10
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [11] [INFO] Booting worker with pid: 11
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [12] [INFO] Booting worker with pid: 12
proj_1-backend-1  | [2024-05-28 23:06:44 +0900] [13] [INFO] Booting worker with pid: 13
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [10] [INFO] Started server process [10]
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [10] [INFO] Waiting for application startup.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [10] [INFO] Application startup complete.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [13] [INFO] Started server process [13]
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [13] [INFO] Waiting for application startup.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [13] [INFO] Application startup complete.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [11] [INFO] Started server process [11]
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [11] [INFO] Waiting for application startup.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [11] [INFO] Application startup complete.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [12] [INFO] Started server process [12]
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [12] [INFO] Waiting for application startup.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [12] [INFO] Application startup complete.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [9] [INFO] Started server process [9]
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [9] [INFO] Waiting for application startup.
proj_1-backend-1  | [2024-05-28 23:06:47 +0900] [9] [INFO] Application startup complete.
proj_1-backend-1  | 1 2 3
proj_1-backend-1  | 1 2 3
 

마무리

apscheduler 를 이용해서 FastAPI에서 scheduler를 사용해보았습니다. 간단하게 구현도 해보았으나 apscheduler 를 이용하는게 훨씬 간단하니 그냥 사용하는게 좋을 것 같습니다.

# 직접 구현
import asyncio
from typing import Callable, Coroutine, Any
from starlette.concurrency import run_in_threadpool
 
class Scheduler:
    def __init__(self):
        self.tasks = []
 
    def add_task(self,
        func: Callable[[], None] | Coroutine[Any, Any, None],
        seconds: int = None,
        iterations: int| None = None,
        *args,
        **kwargs
        ) -> None:
        if seconds is None and iterations is None:
            raise ValueError('Either seconds or iterations must be provided')
 
        task = {
            "task": func,
            "args": args,
            "kwargs": kwargs,
            "seconds": seconds,
            "iterations": iterations
        }
        self.tasks.append(task)
 
    def run(self) -> None:
        asyncio.ensure_future(self._run_tasks())
 
    async def _run_tasks(self) -> None:
        tasks = [asyncio.create_task(self._run(task)) for task in self.tasks]
        await asyncio.gather(*tasks)
 
    async def _run(self, task: dict) -> None:
        iterations = task.get('iterations')
        if iterations:
            for _ in range(iterations):
                await self._execute_task(task)
        else:
            while True:
                await self._execute_task(task)
 
    async def _execute_task(self, task: dict) -> None:
        func, args, kwargs, seconds, iterations = task.values()
        if asyncio.iscoroutinefunction(func):
            await func(*args, **kwargs)
        else:
            await run_in_threadpool(func, *args, **kwargs)
        await asyncio.sleep(seconds)
 
Loading...