SpringBoot实现SSE
本文将使用SpringBoot实现SSE(Server-Sent Events)。SSE是一种服务器到客户端的单向通信技术,适用于需要服务器主动推送消息的场景。与WebSocket相比,SSE更加简单轻量,但仅支持文本格式。
特性/技术 | SSE(Server-Sent Events) | WebSocket |
---|---|---|
通信方式 | 单向(服务器到客户端) | 双向(全双工) |
基于的协议 | HTTP | 初始握手后使用WebSocket协议 |
复杂性 | 简单 | 相对复杂 |
持久连接 | 是 | 是 |
自动重连 | 是 | 需要手动实现 |
轻量级 | 是 | 否(需要额外的握手过程) |
文本/二进制支持 | 文本(可使用JSON) | 文本和二进制 |
适用场景 | 服务器到客户端的单向通知 | 需要频繁双向通信的场景 |
扩展性 | 有限(基于HTTP) | 较好(支持跨域,多服务器) |
控制粒度 | 较低(服务器推送为主) | 较高(双向通信控制) |
环境准备
- Java版本:Java 17
- SpringBoot版本:3.2.5
- 所需依赖:
spring-boot-starter-web
、lombok
spring-boot-starter-web
依赖包含了实现SSE所需的所有基础功能。
项目结构
查看代码
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── cengxuyuan
│ │ │ ├── controller
│ │ │ │ └── SseController.java
│ │ │ ├── server
│ │ │ │ └── SseServer.java
│ │ ├── resources
│ │ │ └── templates
│ │ │ └── index.html
└── pom.xml
创建SSE服务器
首先,我们需要创建一个SseServer
类,用于管理SSE连接和消息发送。
java
@Component
@Slf4j
public class SseServer {
private static final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
private static final long DEFAULT_TIMEOUT = 0L; // 设置为0L则永不超时
private static final long RECONNECT_TIME = 5000L; // 重连时间
private static final long MESSAGE_EXPIRATION = 1 * 60 * 1000L; // 消息过期时间
// ... 类的其余部分
}
创建SSE连接
在SseServer
类中,创建一个方法createSse
,用于创建与指定用户的SSE连接:
查看代码
java
/**
* 创建与指定用户的SSE连接。
*
* @param uid 用户标识符
* @return SseEmitter对象,用于发送事件到客户端
*/
public SseEmitter createSse(String uid) {
// 创建SseEmitter实例,设置默认的超时时间为永不超时
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
// 设置连接完成时的回调,当连接结束时执行
sseEmitter.onCompletion(() -> {
log.info("[{}] 结束连接", uid);
// 从映射中移除结束的连接
sseEmitterMap.remove(uid);
});
// 设置连接超时时的回调,当连接超时执行
sseEmitter.onTimeout(() -> log.info("[{}] 连接超时", uid));
// 设置连接发生错误时的回调,当连接出现异常时执行
sseEmitter.onError(throwable -> {
log.info("[{}] 连接异常,{}", uid, throwable.toString());
try {
// 发送一个异常事件到客户端,包含重连时间
sseEmitter.send(SseEmitter.event()
.id(uid)
.name("发生异常!")
.data("发生异常请重试!")
.reconnectTime(RECONNECT_TIME));
// 将异常的SseEmitter重新放入映射中
sseEmitterMap.put(uid, sseEmitter);
} catch (IOException e) {
log.error("发送异常事件失败", e);
}
});
try {
// 初始化连接时发送一个事件,设置客户端的重连时间
sseEmitter.send(SseEmitter.event().reconnectTime(RECONNECT_TIME));
} catch (IOException e) {
log.error("初始化连接失败", e);
}
// 将新的SseEmitter放入映射中,以便后续可以通过用户标识符找到
sseEmitterMap.put(uid, sseEmitter);
log.info("[{}] 创建sse连接成功!", uid);
return sseEmitter;
}
发送消息
创建一个方法sendMessage
,用于给指定用户发送消息:
查看代码
java
/**
* 给指定用户发送消息。
*
* @param uid 用户标识符
* @param messageId 消息ID,用于标识消息
* @param message 消息内容
* @return 是否成功发送消息
*/
public boolean sendMessage(String uid, String messageId, String message) {
if (message == null || message.isEmpty()) {
// 如果消息为空,则记录日志并返回失败
log.info("参数异常,msg为null", uid);
return false;
}
// 从映射中获取与用户UID关联的SseEmitter
SseEmitter sseEmitter = sseEmitterMap.get(uid);
if (sseEmitter == null) {
// 如果未找到与用户UID关联的SseEmitter,则记录日志并返回失败
log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
return false;
}
try {
// 使用SseEmitter发送消息,设置消息ID和重连时间
sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(MESSAGE_EXPIRATION).data(message));
log.info("[{}] 消息id:{},推送成功:{}", uid, messageId, message);
return true;
} catch (IOException e) {
// 如果发送消息时发生异常,则从映射中移除该SseEmitter
sseEmitterMap.remove(uid);
log.info("[{}],消息id:{},推送异常:{}", uid, messageId, e.getMessage());
sseEmitter.complete();
return false;
}
}
断开连接
创建一个方法closeSse
,用于断开与指定用户的SSE连接:
查看代码
java
/**
* 断开与指定用户的SSE连接。
*
* @param uid 用户标识符
*/
public void closeSse(String uid) {
if (sseEmitterMap.containsKey(uid)) {
// 如果映射中包含与用户UID关联的SseEmitter,则关闭该SseEmitter并从映射中移除
SseEmitter sseEmitter = sseEmitterMap.get(uid);
sseEmitter.complete();
sseEmitterMap.remove(uid);
log.info("[{}] 连接已关闭", uid);
} else {
// 如果映射中未找到与用户UID关联的SseEmitter,则记录日志
log.info("[{}] 连接未找到", uid);
}
}
创建SSE控制器
接下来,创建SseController
类,用于处理前端的HTTP请求。
java
@RestController
public class SseController {
@Autowired
private SseServer sseServer;
// ... 控制器实现
}
创建SSE连接
在SseController
类中,创建一个方法createSseConnection
,用于创建与指定用户的SSE连接:
java
/**
* @return 需要返回SseEmitter
*/
@GetMapping("/createSse")
public SseEmitter createSseConnection(String uid) {
return sseServer.createSse(uid);
}
发送消息
创建一个方法getMessage
,用于给指定用户发送消息:
查看代码
java
@GetMapping("/getMessage")
public void getMessage(String uid) throws InterruptedException {
String text = "曾续缘这个名字富含深意。在中文里,“曾”通常表示过去的意思,“续缘”则意味着继续前缘,这里可以理解为继续某段关系或者追求某个目标。这个名字给人一种持之以恒、不断追求技术精进和人际联结的印象。从字面上看,曾续缘暗示着这位程序员可能对于技术有着不懈的追求,并且愿意与人建立并维持深厚的联系。在互联网行业中,技术更新迭代迅速,而“续缘”则表达了持续学习、不断进步的态度。同时,这个名字也体现了这位程序员可能重视团队合作,愿意与他人共同进步,创造更多的可能性。";
List<String> reply = new ArrayList<>();
int length = text.length();
for (int i = 0; i < length; i += 5) {
int end = Math.min(i + 5, length);
reply.add(text.substring(i, end));
}
for (int i = 0; i < reply.size(); i++) {
Thread.sleep(100);
sseServer.sendMessage(uid, "message" + i, reply.get(i));
}
}
断开连接
创建一个方法closeSseConnection
,用于断开与指定用户的SSE连接:
java
@GetMapping("/closeSse")
@ResponseBody
public void closeSseConnection(String uid) {
sseServer.closeSse(uid);
}
前端页面
创建一个简单的HTML页面,使用JavaScript的EventSource
与后端SSE服务器交互。
功能
- 创建SSE连接:
- 用户可以输入一个UID(用户标识符),或者让页面自动生成一个UID。
- 点击“创建SSE连接”按钮后,页面会使用JavaScript的
EventSource
对象尝试与服务器建立连接。 - 如果浏览器支持
EventSource
,则连接成功,否则会弹出一个警告框提示用户浏览器不支持SSE。
- 接收消息:
- 当服务器通过SSE连接发送消息时,客户端会接收这些消息。
- 接收到的消息会显示在页面上的“消息”区域。
- 关闭SSE连接:
- 用户可以点击“关闭SSE连接”按钮来关闭与服务器的连接。
- 连接关闭后,会弹出一个警告框确认连接已关闭。
查看代码
html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>SSE 示例</title>
</head>
<body>
<label for="uidInput">UID:</label>
<input type="text" id="uidInput" placeholder="请输入UID">
<button id="createSseConnectionBtn">创建SSE连接</button>
<button id="getMessageBtn">获取消息</button>
<button id="closeSseConnectionBtn">关闭SSE连接</button>
<div id="messages"></div>
<script>
let sseEmitter;
function createSseConnection() {
let uid = document.getElementById('uidInput').value.trim();
if (!uid) {
uid = UUID();
document.getElementById('uidInput').value = uid;
}
if (!!window.EventSource) {
sseEmitter = new EventSource(`/createSse?uid=${uid}`);
sseEmitter.onmessage = function(event) {
const messagesArea = document.getElementById('messages');
messagesArea.innerHTML += event.data;
};
sseEmitter.onerror = function(event) {
alert("EventSource failed: " + event);
sseEmitter.close();
};
} else {
alert('您的浏览器不支持SSE');
}
}
function getMessage() {
let uid = document.getElementById('uidInput').value.trim();
if (!uid) {
alert('请输入或生成UID');
return;
}
fetch(`/getMessage?uid=${uid}`, {
method: 'GET',
}).catch(error => {
alert('获取消息时出错');
console.log('获取消息时出错: ' + error);
});
}
function closeSseConnection() {
let uid = document.getElementById('uidInput').value.trim();
if (!uid) {
alert('请输入或生成UID');
return;
}
fetch(`/closeSse?uid=${uid}`, {
method: 'GET',
}).then(() => {
sseEmitter.close();
alert('SSE连接已关闭');
}).catch(error => {
alert('关闭SSE连接时出错');
console.log('关闭SSE连接时出错: ' + error);
});
}
function UUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
document.getElementById('createSseConnectionBtn').onclick = createSseConnection;
document.getElementById('getMessageBtn').onclick = getMessage;
document.getElementById('closeSseConnectionBtn').onclick = closeSseConnection;
</script>
</body>
</html>
测试
主要页面如下
在UID框中可以输入一个标识符,如果不输入任何内容,系统将自动生成一个随机的标识符。
点击创建SSE连接
,建立与服务器的连接。
点击获取消息
后,服务器将开始分段发送消息。
前端将以打字机效果显示服务器传来的信息,类似于ChatGPT的实时回复。
在服务器端,日志将记录每次消息发送的详细信息。