WebSocket
@Slf4j
@Component
public class MyWebSocketHandler implements WebSocketHandler {
// 存储 sessionId
private static final Map<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
/**
* 接收 WebSocketSession 对象,即获取客户端信息、发送消息和接收消息的操作对象。
*
* @param session WebSocketSession 对象
* @return Mono<Void>
*/
@Override
@NonNull
public Mono<Void> handle(@NonNull WebSocketSession session) {
log.info("客户端连接成功,sessionId为:" + session.getId());
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
if (handshakeInfo.getUri().getPath().equals("/ws")) {
log.info("路径符合要求,允许连接");
session.send(Mono.just(session.textMessage("路径符合要求,允许连接"))).subscribeOn(Schedulers.boundedElastic()).subscribe();
} else {
log.info("路径不符合要求,拒绝连接");
session.send(Mono.just(session.textMessage("路径不符合要求,拒绝连接"))).subscribeOn(Schedulers.immediate()).subscribe();
return session.close();
}
SESSION_POOL.put(session.getId(), session);
Flux<WebSocketMessage> output = session
.receive() // 接收消息
.doOnNext(message -> {
log.info("接收到的消息为:" + message.getPayloadAsText());
})
.map(value -> session.textMessage("发送成功:" + value.getPayloadAsText()));
return session.send(output);
}
/**
* 发送消息
*
* @param sessionId 客户端唯一标识
* @param message 消息
*/
public void send(String sessionId, String message) {
WebSocketSession session = SESSION_POOL.get(sessionId);
if (session != null) {
session.send(Mono.just(session.textMessage(message))).subscribeOn(Schedulers.boundedElastic()).subscribe();
}
}
/**
* 广播消息
*/
public void broadcast(String message) {
SESSION_POOL.forEach((sessionId, session) -> {
session.send(Mono.just(session.textMessage(message))).subscribeOn(Schedulers.boundedElastic()).subscribe();
});
}
}
Server
Client
Spring WebFlux为WebSocketClient
抽象提供了Reactor Netty,Tomcat,Jetty,Undertow和标准Java(即JSR-356)的实现。
要启动WebSocket
会话,可以通过创建客户端的实例并使用其execute
方法:
URI url = URI.create("wss://wetools.cc/s/websocket");
Mono<Void> execute = client.execute(url, session -> {
return session.receive()
.doOnNext(message -> {
System.out.println("服务端: " + message.getPayloadAsText());
})
.then();
});
execute.subscribe();
最后更新于
这有帮助吗?