SSE接入的具体落地的几种方案
Published on May 31, 2025, with 26 view(s) and 0 comment(s)
Ai 摘要:本文介绍了三种实现SSE(Server-Sent Events)接入的方案:EventSource、XMLHttpRequest和Fetch API。EventSource是HTML5专为SSE设计的接口,提供自动重连和事件监听功能,适合简单场景;XMLHttpRequest作为传统方案需要手动解析事件流;Fetch API结合流处理更灵活但需自行实现重连逻辑。文章详细对比了三者的特性差异,并提供了客户端/服务端代码示例,特别包含微信小程序通过分块传输实现SSE的方案。推荐根据项目需求选择:优先EventSource,复杂需求考虑Fetch,兼容性要求高时用XMLHttpRequest。

EventSource

EventSource 是 HTML5 提供的用于接收服务器推送事件(Server-Sent Events)的接口,它允许服务器通过 HTTP 连接向客户端推送数据,实现服务器到客户端的单向实时通信。用于接收来自服务器的事件流。与 WebSocket 不同,EventSource 是单向通信(服务器到客户端),基于普通的 HTTP 协议。

 

主要特点

  • 简单易用:基于 HTTP 协议,不需要特殊协议
  • 自动重连:内置连接断开后的重连机制
  • 事件驱动:支持不同类型的事件监听
  • 文本传输:适合传输文本数据(如 JSON)

基本用法

1. 创建 EventSource 连接

const eventSource = new EventSource('事件流URL');

2. 监听消息事件

eventSource.onmessage = function(event) {
  console.log('新消息:', event.data);
};

3. 监听自定义事件

eventSource.addEventListener('customEvent',
    function (event) {
        console.log('自定义事件:', event.data);
    }
);

4. 错误处理

eventSource.onerror = function() {
   console.error('连接出错');
};

事件流格式

服务器返回的事件流应该是 text/event-stream 格式,每个消息由一对换行符分隔。

示例格式:

event: notification 
data: 这是一条通知消息 
data: 这是一条普通消息 
data: 这是一条注释

API 详解

构造函数

new EventSource(url, configuration);

  • url:事件源URL
  • configuration(可选):配置对象,目前只有一个属性:
    • withCredentials:布尔值,表示是否应发送凭据(如cookie)

属性

  • readyState:只读,连接状态
    • 0 (CONNECTING):连接尚未建立
    • 1 (OPEN):连接已建立
    • 2 (CLOSED):连接已关闭
  • url:只读,事件源的URL
  • withCredentials:是否使用凭证
  • lastEventId:只读,最后接收到的事件的ID

方法

  • close():关闭连接

事件

  • onopen:连接打开时触发
  • onmessage:接收到消息时触发
  • onerror:发生错误时触发

服务器实现要求

服务器响应需要设置以下头部:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

浏览器兼容性

EventSource 在所有现代浏览器中都得到支持,包括:

  • Chrome 6+
  • Firefox 6+
  • Safari 5+
  • Edge 79+
  • Opera 11+

注意:Internet Explorer 不支持 EventSource。

示例代码

客户端代码示例

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width,  initial-scale=1.0" />
    <title>Document</title>
</head>

<body>
    <script>
        const eventSource = new EventSource("http://localhost:3000/events");
        eventSource.onmessage = function (e) {
            const data = JSON.parse(e.data);
            console.log("收到消息:", data);
        };
        eventSource.addEventListener("statusUpdate", function (e) {
            console.log("状态更新:", e.data);
        });
        eventSource.onerror = function () {
            console.log("连接错误,尝试重连...");
        };
    </script>
</body>

</html>

服务器端示例(Node.js)

const http = require('http');

http.createServer((req, res) => {
    if (req.url === '/events') {
        res.writeHead(200, {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Credentials': 'true'
        });
        
        // 每秒发送一次消息
        setInterval(() => {
            res.write(`data: ${JSON.stringify({time: new Date()})}\n\n`);
        }, 1000);
    }
}).listen(3000);

console.log('Server running on http://localhost:3000');

XMLHttpRequest

XMLHttpRequest (XHR) 是一个 JavaScript API,用于在浏览器和服务器之间传输数据。它允许网页在不刷新整个页面的情况下从服务器获取数据,这是 AJAX (Asynchronous JavaScript and XML) 编程的核心技术。

基本特性

  • 异步通信:可以在后台发送和接收数据,不阻塞用户界面
  • 支持多种协议:HTTP、HTTPS、file、ftp等
  • 支持多种数据格式:XML、JSON、HTML、纯文本等
  • 跨域请求:通过 CORS (Cross-Origin Resource Sharing) 支持跨域请求

基本用法

const xhr = new XMLHttpRequest();
xhr.open('GET', 'https://api.example.com/data', true); // 异步请求

xhr.onload = function() {
    if (xhr.status === 200) {
        console.log(xhr.responseText);
    } else {
        console.error('请求失败:', xhr.status);
    }
};

xhr.onerror = function() {
    console.error('请求出错');
};

xhr.send();

接入SSE示例

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
</head>
<body>
    <script>
        function setupSSEWithXHR(url, callback) {
            const xhr = new XMLHttpRequest();
            xhr.open("GET", url, true);
            xhr.setRequestHeader("Accept", "text/event-stream");
            
            let buffer = "";
            
            xhr.onprogress = function () {
                const newData = xhr.responseText.substring(buffer.length);
                buffer = xhr.responseText;
                
                const lines = newData.split("\n");
                for (const line of lines) {
                    if (line.startsWith("data:")) {
                        const data = line.substring(5).trim();
                        callback(data);
                    }
                }
            };
            
            xhr.onerror = function () {
                console.error("SSE连接出错");
            };
            
            xhr.send();
            
            return {
                close: function () {
                    xhr.abort();
                },
            };
        }
        
        const sseConnection = setupSSEWithXHR(
            "http://localhost:3000/events",
            function (data) {
                console.log("收到数据:", data);
            }
        );
        
        // 注意:这里立即关闭了连接,实际使用时可能需要延迟或条件关闭
        sseConnection.close();
    </script>
</body>
</html>

Fetch

Fetch API 是现代浏览器提供的用于替代传统 XMLHttpRequest 的网络请求接口,它基于 Promise 设计,提供了更强大、更灵活的功能集。

基本语法结构

fetch('https://api.example.com/data')
    .then(response => response.json())
    .then(data => console.log(data))
    .catch(error => console.error('Error:', error));

接入SSE示例

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Enhanced SSE Client</title>
</head>
<body>
    <div id="status">连接状态: 未连接</div>
    <div id="messages"></div>
    <button id="connect">连接</button>
    <button id="disconnect">断开</button>
    
    <script>
        class SSEClient {
            constructor(url, options = {}) {
                this.url = url;
                this.options = {
                    reconnectInterval: 3000,
                    maxReconnectAttempts: 5,
                    ...options
                };
                this.reconnectAttempts = 0;
                this.isConnected = false;
                this.abortController = null;
            }
            
            async connect(callback) {
                if (this.isConnected) {
                    console.log('Already connected');
                    return;
                }
                
                this.abortController = new AbortController();
                
                try {
                    this.updateStatus('连接中...');
                    
                    const response = await fetch(this.url, {
                        headers: {
                            Accept: "text/event-stream",
                            "Cache-Control": "no-cache"
                        },
                        signal: this.abortController.signal
                    });
                    
                    if (!response.ok) {
                        throw new Error(`HTTP error! status: ${response.status}`);
                    }
                    
                    if (!response.body) {
                        throw new Error("ReadableStream not supported");
                    }
                    
                    this.isConnected = true;
                    this.reconnectAttempts = 0;
                    this.updateStatus('已连接');
                    
                    const reader = response.body.getReader();
                    const decoder = new TextDecoder();
                    let buffer = "";
                    
                    while (this.isConnected) {
                        const { done, value } = await reader.read();
                        
                        if (done) {
                            console.log("Stream completed");
                            break;
                        }
                        
                        buffer += decoder.decode(value, { stream: true });
                        
                        const lines = buffer.split("\n");
                        buffer = lines.pop();
                        
                        for (const line of lines) {
                            if (line.startsWith("data:")) {
                                const data = line.substring(5).trim();
                                if (data) {
                                    callback(data);
                                }
                            }
                        }
                    }
                } catch (error) {
                    if (error.name === 'AbortError') {
                        console.log('Connection aborted');
                        return;
                    }
                    
                    console.error("SSE Error:", error);
                    this.updateStatus(`连接错误: ${error.message}`);
                    
                    // 自动重连
                    if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
                        this.reconnectAttempts++;
                        this.updateStatus(`重连中... (${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
                        
                        setTimeout(() => {
                            this.connect(callback);
                        }, this.options.reconnectInterval);
                    } else {
                        this.updateStatus('连接失败,已达到最大重试次数');
                    }
                } finally {
                    this.isConnected = false;
                }
            }
            
            disconnect() {
                if (this.abortController) {
                    this.abortController.abort();
                }
                this.isConnected = false;
                this.updateStatus('已断开');
            }
            
            updateStatus(status) {
                const statusEl = document.getElementById('status');
                if (statusEl) {
                    statusEl.textContent = `连接状态: ${status}`;
                }
            }
        }
        
        // 使用示例
        const sseClient = new SSEClient("http://localhost:3000/events");
        
        function handleMessage(data) {
            console.log("Received:", data);
            
            // 显示消息
            const messagesEl = document.getElementById('messages');
            const messageEl = document.createElement('div');
            messageEl.textContent = `${new Date().toLocaleTimeString()}: ${data}`;
            messagesEl.appendChild(messageEl);
        }
        
        // 按钮事件
        document.getElementById('connect').addEventListener('click', () => {
            sseClient.connect(handleMessage);
        });
        
        document.getElementById('disconnect').addEventListener('click', () => {
            sseClient.disconnect();
        });
    </script>
</body>
</html>

 

对比汇总

  特性EventSourceFetch + StreamXMLHttpRequest
  设计目的专为 SSE 设计通用网络请求传统网络请求
  API 风格事件监听Promise + 流回调式
  自动重连✅ 内置❌ 需手动实现❌ 需手动实现
  事件解析✅ 自动❌ 手动实现❌ 手动实现
  自定义头部❌ 不可用✅ 完全支持✅ 支持
  HTTP 方法❌ 仅 GET✅ 所有方法✅ 所有方法
  二进制数据❌ 不支持✅ 支持✅ 支持
  进度事件❌ 不支持❌ 不支持✅ 支持
  超时控制❌ 不支持✅ 通过 Abort✅ 内置
  CORS 灵活性一般一般
  内存管理✅ 自动❌ 手动管理❌ 手动管理
  浏览器支持IE ❌ 其他 ✅较新浏览器 ✅所有浏览器 ✅
  实现复杂度非常简单中等复杂
  推荐指数⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

 

微信小程序中接入示例

const requestTask = wx.request({
    enableChunked: true,
    url: `${baseUrl}${apiPath}`,
    header: {
        ...headers,
        "content-type": "application/json",
    },
    responseType: "arraybuffer",
    method: "POST",
    data: {},
    complete: function (res) {
        console.log(res);
    },
});

requestTask.onChunkReceived((r) => {
    const arrayBuffer = r.data;
    const uint8Array = new Uint8Array(arrayBuffer);
    const data = new TextDecoder("utf-8").decode(uint8Array);
    
    data.split("data:").forEach((ele) => {
        console.log("ele", ele);
    });
});