如何使用 Server-Sent Events (SSE):完整指南 (2026)
从服务器到客户端的实时流传输详解
开始使用 Hypereal 构建
通过单个 API 访问 Kling、Flux、Sora、Veo 等。免费积分开始,扩展到数百万。
无需信用卡 • 10万+ 开发者 • 企业级服务
如何使用服务器发送事件 (SSE):完整指南 (2026)
服务器发送事件 (Server-Sent Events, SSE) 为服务器提供了一种简单且标准化的方式,可通过单个 HTTP 连接向 Web 客户端推送实时更新。与 WebSockets 不同,SSE 是单向的(仅限服务器到客户端),采用纯 HTTP 协议,无需特殊配置即可穿透代理和防火墙,并能自动处理断线重连。这使得 SSE 成为实时馈送、AI Token 流式传输、通知和仪表盘的理想选择。
SSE vs WebSockets vs 轮询 (Long Polling)
在深入实现之前,了解 SSE 适用场景会有所帮助:
| 特性 | SSE | WebSocket | 长轮询 (Long Polling) |
|---|---|---|---|
| 方向 | 服务器到客户端 (单向) | 双向 | 服务器到客户端 |
| 协议 | HTTP | WS/WSS | HTTP |
| 自动重连 | 内置支持 | 需手动实现 | 需手动实现 |
| 二进制数据 | 否(仅限文本) | 是 | 是 |
| 浏览器支持 | 所有现代浏览器 | 所有现代浏览器 | 所有浏览器 |
| 代理友好 | 是 | 有时存在兼容问题 | 是 |
| 连接开销 | 低 (单个 HTTP) | 低 (单个 TCP) | 高 (重复 HTTP 请求) |
| 最佳场景 | 动态、通知、AI 流式传输 | 聊天、游戏、协作工具 | 遗留系统兼容 |
当你只需要服务器向客户端发送更新、希望获得自动重连功能、且相比双向通信更追求简洁性时,请使用 SSE。
SSE 工作原理
SSE 协议非常简单:
- 客户端发起一个标准的 HTTP GET 请求,请求头包含
Accept: text/event-stream。 - 服务器响应
Content-Type: text/event-stream并保持连接开启。 - 服务器以纯文本形式发送事件,每个事件由两个换行符分隔。
- 如果连接断开,浏览器会自动尝试重新连接。
SSE 消息格式
每条 SSE 消息由一个或多个字段组成,每个字段占据一行:
event: message_type
id: unique_id_123
retry: 5000
data: {"text": "Hello, world"}
各字段说明:
data:—— 消息负载(必需)。多个data:行会被合并并以换行符分隔。event:—— 命名的事件类型(可选)。默认为"message"。id:—— 唯一的事件 ID(可选)。用于重连。retry:—— 重连间隔,单位为毫秒(可选)。
空行(双换行符 \n\n)表示一条事件的结束。
服务端实现
Node.js (Express)
const express = require("express");
const app = express();
app.get("/events", (req, res) => {
// 设置 SSE 响应头
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("Access-Control-Allow-Origin", "*");
// 立即刷新响应头
res.flushHeaders();
// 每 2 秒发送一次事件
let counter = 0;
const interval = setInterval(() => {
counter++;
const data = JSON.stringify({
time: new Date().toISOString(),
count: counter
});
res.write(`id: ${counter}\n`);
res.write(`event: tick\n`);
res.write(`data: ${data}\n\n`);
}, 2000);
// 客户端断开连接时清理资源
req.on("close", () => {
clearInterval(interval);
res.end();
console.log("Client disconnected");
});
});
app.listen(3000, () => {
console.log("SSE server running on http://localhost:3000");
});
Python (FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
from datetime import datetime
app = FastAPI()
async def event_generator():
counter = 0
while True:
counter += 1
data = json.dumps({
"time": datetime.now().isoformat(),
"count": counter
})
yield f"id: {counter}\nevent: tick\ndata: {data}\n\n"
await asyncio.sleep(2)
@app.get("/events")
async def stream_events():
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
Python (Flask)
from flask import Flask, Response
import json
import time
from datetime import datetime
app = Flask(__name__)
def generate_events():
counter = 0
while True:
counter += 1
data = json.dumps({
"time": datetime.now().isoformat(),
"count": counter
})
yield f"id: {counter}\nevent: tick\ndata: {data}\n\n"
time.sleep(2)
@app.route("/events")
def stream():
return Response(
generate_events(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no" # 禁用 nginx 缓冲
}
)
客户端实现
浏览器 (EventSource API)
浏览器提供了内置的 EventSource API,用于处理连接管理、自动重连和事件解析:
// 连接到 SSE 接口
const eventSource = new EventSource("http://localhost:3000/events");
// 监听默认的 "message" 事件
eventSource.onmessage = (event) => {
console.log("Message:", event.data);
};
// 监听命名事件
eventSource.addEventListener("tick", (event) => {
const data = JSON.parse(event.data);
console.log(`Tick #${data.count} at ${data.time}`);
});
// 处理连接建立
eventSource.onopen = () => {
console.log("Connection established");
};
// 处理错误和重连
eventSource.onerror = (error) => {
console.error("SSE error:", error);
if (eventSource.readyState === EventSource.CLOSED) {
console.log("Connection was closed");
} else {
console.log("Reconnecting...");
}
};
// 完成后关闭连接
// eventSource.close();
SSE 专用的 React Hook
import { useEffect, useState, useCallback } from "react";
interface SSEOptions {
url: string;
eventName?: string;
onError?: (error: Event) => void;
}
function useSSE<T>(options: SSEOptions) {
const [data, setData] = useState<T | null>(null);
const [isConnected, setIsConnected] = useState(false);
useEffect(() => {
const eventSource = new EventSource(options.url);
eventSource.onopen = () => setIsConnected(true);
eventSource.onerror = (error) => {
setIsConnected(false);
options.onError?.(error);
};
const handler = (event: MessageEvent) => {
try {
const parsed = JSON.parse(event.data) as T;
setData(parsed);
} catch {
setData(event.data as unknown as T);
}
};
if (options.eventName) {
eventSource.addEventListener(options.eventName, handler);
} else {
eventSource.onmessage = handler;
}
return () => {
eventSource.close();
};
}, [options.url, options.eventName]);
return { data, isConnected };
}
// 在组件中使用
function LiveDashboard() {
const { data, isConnected } = useSSE<{ time: string; count: number }>({
url: "/events",
eventName: "tick",
});
return (
<div>
<p>状态: {isConnected ? "已连接" : "重连中..."}</p>
{data && (
<p>计数: {data.count} | 时间: {data.time}</p>
)}
</div>
);
}
实际应用案例:AI Token 流式传输
2026 年 SSE 最常见的用途之一是流式接收大型语言模型 (LLM) API 生成的 Token。以下是其实现方式:
服务端 (AI API 代理)
app.post("/api/chat", async (req, res) => {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.flushHeaders();
try {
const aiResponse = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
headers: {
"Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
"Content-Type": "application/json"
},
body: JSON.stringify({
model: "gpt-4o",
messages: req.body.messages,
stream: true
})
});
const reader = aiResponse.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// 直接转发 SSE 数据
res.write(chunk);
}
res.write("data: [DONE]\n\n");
res.end();
} catch (error) {
res.write(`event: error\ndata: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
});
客户端 (使用 fetch 进行流式传输)
对于 POST 请求(EventSource 并不支持),请使用 Fetch API 配合可读流 (readable stream):
async function streamChat(messages) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") return;
try {
const parsed = JSON.parse(data);
const token = parsed.choices?.[0]?.delta?.content;
if (token) {
document.getElementById("output").textContent += token;
}
} catch {
// 跳过非 JSON 行
}
}
}
}
}
常见问题排查
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 事件成批达到 | 代理缓存(如 nginx, CloudFlare) | 添加 X-Accel-Buffering: no 响应头;禁用代理缓冲 |
| 连接在 60 秒后中断 | 服务端或代理超时 | 每 30 秒发送一行注释(如 :keepalive\n\n) |
| 无法自动重连 | 使用 fetch 代替了 EventSource |
GET 请求请使用 EventSource;POST 请求需手动实现重试 |
| CORS 错误 | 缺少响应头 | 在服务端添加 Access-Control-Allow-Origin |
| 重连时出现重复事件 | 未使用事件 ID | 包含 id: 字段;服务端跟踪 Last-Event-ID 请求头 |
心跳 (Keep-Alive) 模式
为了防止代理超时,建议发送周期性的注释行:
// 服务端心跳
const keepAlive = setInterval(() => {
res.write(": keepalive\n\n");
}, 30000);
req.on("close", () => {
clearInterval(keepAlive);
});
最佳实践
- 始终包含事件 ID,以便客户端在重连后可以从中断处恢复。
- 通过
retry:字段设置合理的重试间隔(默认通常为 3 秒)。 - 每 15-30 秒发送心跳注释,以防止代理超时。
- 使用命名事件,以便在同一流中区分不同类型的消息。
- 处理背压 (Backpressure),在写入前检查客户端是否仍处于连接状态。
- 限制每个用户的并发连接数,以避免资源耗尽。
结论
服务器发送事件 (SSE) 为实现跨标准的实时服务端到客户端通信提供了一种简洁的方法。在单向流式传输方面,它们比 WebSockets 更简单,且浏览器的 EventSource API 能够自动处理重连。随着 AI 流式传输 API 的兴起,SSE 已成为每位开发者工具箱中必不可少的工具。
如果您正在构建流式传输 AI 生成内容的应用程序——例如实时视频生成进度更新、渐进式图像渲染或实时转录反馈——Hypereal AI 提供了兼容流式传输的 API,支持视频生成、数字人播报和图像合成,并内置了用于跟踪生成进度的 SSE 支持。
