FastAPI Streaming 하는 법 StreamingResponse
목차
Streaming
오늘은 Streaming이라는 주제로 이야기해 보려고 합니다. Streaming이란 말을 들으면 아마도 실시간으로 영상을 보거나 음악을 듣는것이 떠오르는 분들이 많을 것 같습니다. 하지만 저는 이젠 ChatGPT가 먼저 떠오르네요.
Streaming이란 크게 데이터를 조각조각 나눠서 전송하는 방식을 말합니다. 이 방법은 대용량 데이터를 다룰 때 매우 유용합니다. 예를 들면, 아주 큰 파일을 다운로드하거나 대규모 데이터를 처리할 때, 파일이나 데이터를 한 번에 모두 메모리에 올리면 시스템에 부담이 될 수 있죠. 그런데 Streaming을 사용하면, 전체 데이터를 작은 '블록' 또는 '조각'으로 나누어 순차적으로 처리하게 됩니다. 이렇게 하면 메모리 사용을 최적화하고, 사용자에게 더 빠른 응답을 제공할 수 있습니다..
데이터 전송 시 전체 데이터를 한 번에 처리하지 않고, 필요한 부분만 조금씩 전송하기 때문에 네트워크 트래픽과 메모리 사용을 줄일 수 있습니다. 또한 사용자는 데이터의 일부분만 받았어도 그 내용을 볼 수 있습니다. ChatGPT에게 질문하면 질문에 대한 답변이 한 번에 짠 하고 나오는 것이 아니고 서서히 채워지는게 Streaming 입니다.
작년에 ChatGPT API를 이용해서 인삿말을 생성해주는 MessageBot을 만들었는데 openai python sdk와 FastAPI를 이용해서 만들었습니다. (LangChain을 이용해 볼까도 했지만 너무 간단한 싱글턴 대화여서 사용하지 않았음)
그때 사용된 FastAPI에서 StreamingResponse을 이용해서 Streaming 방식으로 클라이언트에 데이터를 전달하는 방법을 알아보겠습니다.
StreamingResponse
엄밀히 따지면 FastAPI가 Wrapping 하고 있는 starlette에 있는 클래스입니다. Iterable한 ContentStream을 입력으로 전달해주어야 합니다. 코드를 살펴보겠습니다.
다음은 OpenAI 라이브러리를 사용해서 Streaming하는 코드입니다.
openai 0.27 버젼으로 작성되었으며 최신 버젼은 코드가 다릅니다.
async def get_streaming_message_from_openai(data: MessageRequest) -> str:
try:
response = await openai.ChatCompletion.acreate(
model=settings.MODEL,
messages=MessageBotPrompt.make_prompt(data),
temperature=1,
max_tokens=1024,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
stream=True,
)
async for chunk in response:
try:
yield chunk["choices"][0]["delta"].get("content", "")
except KeyError:
pass
except openai.InvalidRequestError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
...
@router.post(
"/streaming/",
dependencies=[Depends(check_secret_header)],
response_model=str,
status_code=status.HTTP_200_OK,
summary="인사말 생성하기 (스트리밍)",
)
async def get_generated_messages_streaming(
request: Request,
data: MessageRequest,
):
"""
## OPEN AI에 프롬프트 전달하기
- name: str (Optional)
- relation: str
- reason: str
- manner: str
- max_length: str
"""
update_count(request)
return StreamingResponse(
get_streaming_message_from_openai(data), media_type="text/event-stream"
)
- opeanai에서 async streaming으로 ChatGPT의 답변을 받아옵니다.
- response는 async iterable이기 때문에 async for 구문을 사용해줍니다.
- chunk에서 content만 꺼내서 yield해줍니다.
- endpoint에서 get_streaming_message_from_openai를 StreamingResponse의 입력으로 넣어주고 media_type을 text/event-stream로 해주면 끝입니다.
이런 방법으로 쉽게 만들 수 있습니다. 그런데 지금은 next.js api로 chat 기능을 이전했습니다. Vercel로 운영중인데 따로 chat용 백엔드를 굳이 비용들여가며 운영할 필요가 없기 때문입니다. 하지만 openai 관련 부분은 곧 1버젼으로 업데이트하려고 합니다.
마무리
위와 같은 방식으로 chatgpt api를 활용한 서비스를 만들어봤습니다. 사실 사용하는 유저는 아내에게 메세지 보낼때 가끔씩 쓰는 저밖에 없습니다. 혹시 이 글을 보신다면 아래 링크를 걸어둘테니 메시지 한 번 만들어봐 주신다면 감사하겠습니다.