Stream Response

即服务器不一次性把数据发给用户,而是分块、分段发给用户,用户一段一段接受,而不是等全部下载完再处理(即流式响应其实说的是服务器的数据流出模式,分段输出)。

流式响应应用场景:

大模型逐字输出,大文件下载,视频播放,实时日志推送。

异步 aiohttp 流式响应:

import asyncio
import aiohttp

async def stream_response():
    # 创建异步HTTP客户端会话(相当于打开一个浏览器窗口)
    async with aiohttp.ClientSession() as session:
        # 发请求,访问流式分块测试接口(末尾"/5"逻辑上5次发包,但TCP层会自动合并小包(TCP粘包),比如这里打印出来只有3块)
        # 建立连接,发送请求,服务器返回响应头(header),真正的内容还在传输中,一段一段发过来
        async with session.get("https://httpbin.org/stream/5") as resp:
            async for chunk in resp.content.iter_chunked(1024): # 真正拿内容
                # resp.content:数据流;   iter_chunked(1024):分段读取工具
                #async for: 异步循环读流
                # chunk: 每次读到的一小块数据
                # 把服务器发来的 二进制数据(bytes) 转成 字符串(str)
                text = chunk.decode("utf-8")
                # 为了看到一段一段输出的停顿效果,真正流式响应不需要这行
                await asyncio.sleep(2)
                print("收到一块数据: ",text)

asyncio.run(stream_response())

image.png

按行读取(可用来解决粘包)+用FastAPI写一个异步生产数据的接口(用yield),然后创建一个异步客户端访问这个接口模拟流式响应输出:

# data_generate.py (后端接口+异步生产数据)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse # 流式专用返回对象,不一次性返回全部数据,来一点发一点
import asyncio

app = FastAPI()

# 异步生成器(数据流生产源头)
async def stream_generator():
    for i in range(5):
        data = f'{{"id":{i}}}\n'    # f-string引号里的{里不是要写表达式时,需要写两个{
        yield data.encode("utf-8")  # 字符串->字节(bytes),网络传输只能发字节流
        # yield(关键):每次产出一段数据就暂停函数,把数据丢给前端,下次再需要数据就从这里往下跑,不会一次性生成全部5条存内存
        await asyncio.sleep(1) # 获取一条数据花费的

@app.get("/stream/5")
async def stream():
    # 流式返回,media_type普通文本
    return StreamingResponse(stream_generator(),media_type="text/plain")
    # 启动生成器+流式发送数据
    # 把上面的stream_generator交给流式返回类,生成器 yield 一次,浏览器 / 客户端立马收到一段数据
    # StreamingResponse(...) → 驱动生成器运行
    # return → 把数据流交给 FastAPI 转发给客户端

启动服务器:

uvicorn data_generate:app --port 5000 
#Stream_Response.py
# 异步客户端读取接口生成的数据
import asyncio
import aiohttp

async def stream_response():
    async with aiohttp.ClientSession() as session:
        url = "http://127.0.0.1:5000/stream/5"
        async with session.get(url) as resp:
            async for line in resp.content: # 一行一块
                text = line.decode("utf-8")
                clean = text.strip()
                if(clean):
                    print("一行数据: ",clean)
            # 如果后端接口没有 yield → 客户端这里就不会 “慢慢打印”,而是一次性全部输出
            # 没有 yield,后端就不会等待、不会分段、不会流式
            # 后端如果不用yield,即使有async for 和 await sleep模拟耗时,也是假流式
            # yield才是实现流式响应的灵魂
asyncio.run(stream_response())

流式返回JSON:

# data_generate.py (后端接口+异步生产数据)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse # 流式专用返回对象,不一次性返回全部数据,来一点发一点
import asyncio
import json

app = FastAPI()

# 异步生成器(数据流生产源头)
async def stream_generator():
    for i in range(5):
        await asyncio.sleep(1)
        data = {
            "id": i,
            "content": f"这是第 {i} 条流式数据",
            "status": "success"
        }  # 不要手写json,很容易出错,用字典+json.dumps
        # 转成 JSON 字符串 + 换行
        yield json.dumps(data,ensure_ascii=False)+"\n"
        #ensure_ascii=False 让中文正常显示,不乱码

@app.get("/stream/5")
async def stream():
    return StreamingResponse(stream_generator(),media_type="application/json")
# Stream_Response.py
import asyncio
import aiohttp
import json

async def stream_response():
    async with aiohttp.ClientSession() as session:
        url = "http://127.0.0.1:5000/stream/5"
        async with session.get(url) as resp:
            async for line in resp.content: # 一行一块
                text = line.decode("utf-8")
                clean = text.strip()
                if not clean:
                    continue  # 流式读取空行只能跳过,不能停止
                # 把流式 JSON转成对象
                data = json.loads(clean)
                print("收到数据",data)
                print("id=",data["id"])
                print("内容=",data["content"])
                print("-"*50)
asyncio.run(stream_response())

测试之前需要记得启动服务器:

uvicorn data_generate:app --reload --port 5000 

注意:为什么浏览器会一次性显示所有数据,而异步客户端是一条一条打印?

前端类型是谁特点
aiohttp 客户端写的 Python 代码不会缓存,来一条打印一条,最适合测试
浏览器Chrome/Edge会缓存普通流,只会在最后一次性显示;SSE 会实时显示

注意:为什么要写异步客户端?

  • 浏览器会缓存数据,看不到真实的逐条输出

  • aiohttp 客户端最纯粹,收到数据立刻打印

  • 能真实验证后端是不是真流式

  • 相当于用代码模拟一个 “前端” 来调用写的接口

SSE(Server-Sent Events):

给浏览器看的流式响应格式(在浏览器会看到流式输出),底层还是StreamingResponse。

# SSE.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse # 流式专用返回对象,不一次性返回全部数据,来一点发一点
import asyncio
import json

app = FastAPI()

# sse 流式生成器
async def sse_generator():
    for i in range(5):
        await asyncio.sleep(1)
        data = {
            "id": i,
            "msg": f"实时信息 {i}",
            "status": "ok"
        }
        # SSE 固定格式,必须这么写
        # 1. 必须以 data: 开头
        # 2. 必须以 \n\n 结尾
        # 3. 内容可以是字符串 / JSON
        # 且必须用 f-string 包裹,把 data: 和 \n\n 写进字符串里
        # 如 yield f"data: {i}\n\n"
        # 如 yield f"data: 'hi'\n\n"
        yield f"data: {json.dumps(data,ensure_ascii=False)}\n\n"

@app.get("/stream/sse")
async def stream():
    return StreamingResponse(
        sse_generator(),    # 生成器
        media_type="text/event-stream"  # SSE 必须写这个
    )

响应中:

image.png

测试之前需要记得启动服务器

uvicorn SSE:app --port 5001 --reload 

StreamResponse与SSE应用场景:

StreamResponse(普通流)-> 给异步客户端/后端服务 用的流式格式。

SSE->给浏览器/前端界面 用的流式格式。

在浏览器显示上:

纯 StreamingResponse(非 SSE):浏览器缓存,结束一次性出全部内容。

StreamingResponse+SSE 协议格式 + text/event-stream:浏览器不缓存,实时逐条刷新。

流式长连接客户端断开异常处理:

SSE、StreamingResponse 都是长连接:客户端关闭浏览器、终止 aiohttp 客户端、断网,后端默认还在循环 sleep、不停生成数据,占用协程与内存。用request库里的is_disconnected协助异常处理,解决关掉浏览器后,生成器还会继续 sleep + 循环跑完剩余次数,白白占用协程这个问题。

# SSE.py
from asyncio import CancelledError

from fastapi import FastAPI
from fastapi.responses import StreamingResponse # 流式专用返回对象,不一次性返回全部数据,来一点发一点
import asyncio
import json
from fastapi import Request
# Request 在SSE 场景:只用到它的 is_disconnected 查断线

app = FastAPI()

# sse 流式生成器
async def sse_generator(request: Request):
    try:
        for i in range(5):
            # 实时判断客户端是否关闭页面/断网
            if await request.is_disconnected():
                print("SSE客户端主动断开连接,终止数据流")
                return
            await asyncio.sleep(1)
            data = {
                "id": i,
                "msg": f"实时信息 {i}",
                "status": "ok"
            }
            yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
    except asyncio.CancelledError:  # 踩坑点:CancelledError 不是 Exception 的子类
        # 客户端断开连接(关闭标签页)
        # 下一次 yield 尝试写入数据时,底层 TCP 连接会抛出 ConnectionError(或类似异常)
        # StreamingResponse 内部捕获到这个写入异常后,主动取消生成器协程
        # 取消时,协程内部会抛出 CancelledError

        print("生成器被取消(客户端断开连接)")

    except Exception as e:  # 这里捕获不到 CancelledError
        print("连接已断开(客户端关闭/断网): ",e)

@app.get("/stream/sse")
async def stream(request: Request):
    return StreamingResponse(
        sse_generator(request),    # 生成器
        media_type="text/event-stream"  # SSE 必须写这个
    )

测试:

启动服务器:

uvicorn SSE:app --port 5006

访问 127.0.0.1:5006/stream/sse 并在数据显示到一半时关闭标签页,查看终端打印信息:

image.png

异步生成器里写真实业务:

在真实项目里,不会只发数字,而会做查数据库,调用AI大模型,读取大文件,循环处理数据并逐条推送等操作。

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()


# ====================
# 真实业务:异步生成器
# ====================
async def real_business_generator(request: Request):
    try:
        # 模拟业务:循环 10 条消息
        for i in range(10):
            # 1. 检测客户端是否断开(必须写!防止服务器空跑)
            if await request.is_disconnected():
                print("客户端关闭,停止业务")
                return

            # ====================
            # 这里写你的真实业务
            # ====================
            # 1. 模拟查数据库
            # await database.query("select * from table")

            # 2. 模拟调用 AI 模型
            # await model.generate()

            # 3. 模拟读取文件
            # await file.read_chunk()

            # 模拟业务耗时
            await asyncio.sleep(0.5)

            # 构造业务 JSON
            data = {
                "code": 200,
                "id": i,
                "data": f"业务数据 {i}",
                "message": "流式推送成功"
            }

            # SSE 标准格式推送
            yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

    # 客户端强制断开
    except asyncio.CancelledError:
        print("客户端断开连接")

    # 其他异常(不会崩服务)
    except Exception as e:
        print("服务异常:", e)
        yield f"data: {json.dumps({'code': 500, 'msg': '服务器错误'})}\n\n"


@app.get("/api/stream")
async def api_stream(request: Request):
    return StreamingResponse(
        real_business_generator(request),
        media_type="text/event-stream"
    )

SSE身份认证(鉴权,谁能连SSE)、定向推送(用户A发消息->只能发给用户B,单点推送):

SSE身份认证

SSE 只能 GET,不能在请求体带 token,靠 URL 参数 / Header 带用户 ID、Token 校验,非法连接直接拦截、不让建立长连接。

定向推送

用全局字典存在线用户:online = {用户ID: 当前生成器队列},用户连上 SSE 就把连接存字典,关闭就剔除;

新增一个发消息接口,传入target_uid和消息内容;

代码从字典找到目标用户的连接,单独yield推送,其他在线用户收不到。

SSE身份认证+定向推送 整体业务场景:

用户登录带参数连上 SSE → 服务认证通过、登记在线列表 → 别的用户调用发消息接口填目标 ID → 服务精准推送。

一对一聊天/定向推送(谁连上SSE,就把谁的用户ID和消息通道存起来,有人发消息,就找到对应的通道,把消息塞进去,对方立刻收到)。

示例代码

# SSE长连接接口:带身份user_id鉴权
@app.get("/stream/sse")
#   用户 101 打开浏览器 SSE 页面 → 创建专属消息队列 q,存入全局字典:online_user[101] = q
#   进到gen()死循环while True:
async def sse_conn(user_id: int,request: Request):
    # 简易鉴权:不存在用户直接拒绝
    # 用户 ID 不合法,不让连 SSE
    if(user_id <= 0):
        # FastAPI 写法,返回空流+403禁止访问(告诉浏览器,无权连接SSE)
        return StreamingResponse(iter([]),status_code = 403)

    #TODO   两个接口不靠函数调用通信,靠同一个全局队列对象 asyncio.Queue() 中转
    q = asyncio.Queue() # 初始化一个空的生成器队列(可以理解为一个专属消息信箱)
    # 来一条消息->放进信箱,用户SSE连接->一直盯着信箱,信箱有消息->立刻推送给浏览器
    online_user[user_id] = q
    print(f"用户{user_id}上线")

    async def gen():     #仅创建生成器,gen内部代码暂停,不执行
        try:
            while True: #之前写的代码没有while True是因为之前是服务器主动发,发5次就结束,不需要等别人发消息
                # 现在需要长期挂着长连接,等待外部接口随时发消息
                # 断线检测
                if await request.is_disconnected():
                    print(f"用户{user_id}下线")
                    # 用户断开连接 → 从在线列表删除
                    # None = 如果 key 不存在,不报错
                    online_user.pop(user_id,None)
                    return  # 这个 return 只是结束生成器函数 gen,断开 SSE 连接,不会返回任何数据到浏览器
                # 阻塞等待消息 = 等待信箱来消息(从 /send_msg 接口来)
                # 一个放,一个等
                # 队列空,代码停在这里不动; 有消息来,立刻拿到消息并接续。
                # 阻塞挂起,原地等待队列里进数据,没数据就卡在这一行不动,不占用 CPU
                msg = await q.get()
                # 把队列里的消息 → 包装成 SSE 格式 → 推送给浏览器
                yield f"data: {json.dumps(msg,ensure_ascii=False)}\n\n"
        except asyncio.CancelledError:  # 浏览器直接关掉标签页、断网,FastAPI 会抛出CancelledError,进入异常
            online_user.pop(user_id, None)
            print("客户端关闭连接")

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"
        }
    )  #  # FastAPI开始遍历g,进入gen函数运行

# 2.推送接口:定向发给指定用户B(A调用这个接口发消息)
@app.post("/send_msg")
async def send_msg(tar_id: int,content: str):
    if tar_id not in online_user:
        return {"code": 400, "msg": "用户不在线"}
    # 丢进目标用户队列,实现单发
    await online_user[tar_id].put({"msg": content,"from_sys": True})
    return {"code": 200, "msg": "推送成功"}

#TODO 数据流:tar_id=101 → 拿到同一个队列:online_user[101] 就是上面 SSE 创建的那个q
# .put(数据):往队列里塞入消息

测试:

先启动服务器:

uvicorn sse_token:app --port 5007 

image.png

先访问127.0.0.1:5007/stream/sse?user_id=101等待数据传过来……

image.png

访问http://127.0.0.1:5007/docs,通过/send_msg接口上传数据:

image.png

image.png