FastAPI에서 Scheduler 사용하기
목차
APScheduler
백엔드 개발하다보면 Scheduler를 개발해야할 때가 종종 있습니다. FastAPI를 사용할 때 복잡하지 않은 Scheduler라면 apscheduler
라는 라이브러리를 사용하면 됩니다. APScheduler는 다양한 스케줄러를 제공하고 있습니다.
포스팅에서는 FastAPI scheduler 사용에 초점을 맞춰 간단하게 알아보도록 하겠습니다.
- 설치
pip install apscheduler
BackgroundScheduler
scheduler를 실행하면서 서버가 blocking되면 안되기 때문에 BackgroundScheduler
를 사용하면됩니다. scheduler.py
를 하나 생성해줍니다. task를 분리해줄 수 도 있지만 예시에서는 분리하지 않겠습니다.
# scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
def function_1(a,b,c):
print(a,b,c)
# 함수에 파라미터가 없으면 args를 안넣으면 됩니다.
# kargs={"a": 1,"b": 2,"c": 3} 도 가능합니다.
scheduler.add_job(function_1, "interval", seconds=5, args=(1,2,3))
# scheduler.add_job(function_1, "interval", seconds=5, kwargs={"a": 1, "b": 2, "c": 3})
lifespan에서 사용하기
uvicorn
서버 단독으로 실행할 때 lifespan
에서 사용해도 괜찮습니다.
from contextlib import asynccontextmanager
from app.scheduler import scheduler
@asynccontextmanager
async def lifespan(app):
# scheduler 실행
scheduler.start()
yield
print("lifespan finished")
# main.py
from fastapi import FastAPI
from app.core.lifespan import lifespan
app = FastAPI(lifespan=lifespan)
- 결과
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [54162] using StatReload
INFO: Started server process [54164]
INFO: Waiting for application startup.
INFO: Application startup complete.
1 2 3
^CINFO: Shutting down
INFO: Waiting for application shutdown.
lifespan finished
INFO: Application shutdown complete.
INFO: Finished server process [54164]
INFO: Stopping reloader process [54162]
gunicorn에서 사용하기
gunicorn
에서 사용할 경우는 lifespan
에서 사용하는게 좋은 선택이 아닐 수 있습니다. 배포할 때는 gunicorn
으로 worker들은 독립적인 프로세스 이기때문에, 만약 worker가 4개라면 lifespan
도 4번 실행되겠고, 그것은 scheduler도 4번 실행된다는 뜻 입니다.
gunicorn hooks를 사용해서 마스터 프로세스에서 1번만 실행되도록 합니다.
# gunicorn.config.py
import multiprocessing
from app.scheduler import scheduler
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
wsgi_app = "app.main:app"
loglevel = "info"
bind = "0.0.0.0:8000"
max_requests = 1000
max_requests_jitter = 100
def on_starting(server):
print("Gunicorn starting")
scheduler.start()
- 실행
gunicorn -c app/gunicorn.config.py
- 결과
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [7] [INFO] Starting gunicorn 22.0.0
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [7] [INFO] Listening at: http://0.0.0.0:8000 (7)
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [7] [INFO] Using worker: uvicorn.workers.UvicornWorker
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [9] [INFO] Booting worker with pid: 9
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [10] [INFO] Booting worker with pid: 10
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [11] [INFO] Booting worker with pid: 11
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [12] [INFO] Booting worker with pid: 12
proj_1-backend-1 | [2024-05-28 23:06:44 +0900] [13] [INFO] Booting worker with pid: 13
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [10] [INFO] Started server process [10]
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [10] [INFO] Waiting for application startup.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [10] [INFO] Application startup complete.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [13] [INFO] Started server process [13]
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [13] [INFO] Waiting for application startup.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [13] [INFO] Application startup complete.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [11] [INFO] Started server process [11]
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [11] [INFO] Waiting for application startup.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [11] [INFO] Application startup complete.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [12] [INFO] Started server process [12]
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [12] [INFO] Waiting for application startup.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [12] [INFO] Application startup complete.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [9] [INFO] Started server process [9]
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [9] [INFO] Waiting for application startup.
proj_1-backend-1 | [2024-05-28 23:06:47 +0900] [9] [INFO] Application startup complete.
proj_1-backend-1 | 1 2 3
proj_1-backend-1 | 1 2 3
마무리
apscheduler
를 이용해서 FastAPI에서 scheduler를 사용해보았습니다. 간단하게 구현도 해보았으나 apscheduler
를 이용하는게 훨씬 간단하니 그냥 사용하는게 좋을 것 같습니다.
# 직접 구현
import asyncio
from typing import Callable, Coroutine, Any
from starlette.concurrency import run_in_threadpool
class Scheduler:
def __init__(self):
self.tasks = []
def add_task(self,
func: Callable[[], None] | Coroutine[Any, Any, None],
seconds: int = None,
iterations: int| None = None,
*args,
**kwargs
) -> None:
if seconds is None and iterations is None:
raise ValueError('Either seconds or iterations must be provided')
task = {
"task": func,
"args": args,
"kwargs": kwargs,
"seconds": seconds,
"iterations": iterations
}
self.tasks.append(task)
def run(self) -> None:
asyncio.ensure_future(self._run_tasks())
async def _run_tasks(self) -> None:
tasks = [asyncio.create_task(self._run(task)) for task in self.tasks]
await asyncio.gather(*tasks)
async def _run(self, task: dict) -> None:
iterations = task.get('iterations')
if iterations:
for _ in range(iterations):
await self._execute_task(task)
else:
while True:
await self._execute_task(task)
async def _execute_task(self, task: dict) -> None:
func, args, kwargs, seconds, iterations = task.values()
if asyncio.iscoroutinefunction(func):
await func(*args, **kwargs)
else:
await run_in_threadpool(func, *args, **kwargs)
await asyncio.sleep(seconds)
Loading...