WebSocket

@Slf4j
@Component
public class MyWebSocketHandler implements WebSocketHandler {
    // 存储 sessionId
    private static final Map<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

    /**
     * 接收 WebSocketSession 对象,即获取客户端信息、发送消息和接收消息的操作对象。
     *
     * @param session WebSocketSession 对象
     * @return Mono<Void>
     */
    @Override
    @NonNull
    public Mono<Void> handle(@NonNull WebSocketSession session) {
        log.info("客户端连接成功,sessionId为:" + session.getId());
        HandshakeInfo handshakeInfo = session.getHandshakeInfo();
        if (handshakeInfo.getUri().getPath().equals("/ws")) {
            log.info("路径符合要求,允许连接");
            session.send(Mono.just(session.textMessage("路径符合要求,允许连接"))).subscribeOn(Schedulers.boundedElastic()).subscribe();
        } else {
            log.info("路径不符合要求,拒绝连接");
            session.send(Mono.just(session.textMessage("路径不符合要求,拒绝连接"))).subscribeOn(Schedulers.immediate()).subscribe();
            return session.close();
        }

        SESSION_POOL.put(session.getId(), session);

        Flux<WebSocketMessage> output = session
                .receive()   // 接收消息
                .doOnNext(message -> {
                    log.info("接收到的消息为:" + message.getPayloadAsText());
                })
                .map(value -> session.textMessage("发送成功:" + value.getPayloadAsText()));
        return session.send(output);
    }

    /**
     * 发送消息
     *
     * @param sessionId 客户端唯一标识
     * @param message   消息
     */
    public void send(String sessionId, String message) {
        WebSocketSession session = SESSION_POOL.get(sessionId);
        if (session != null) {
            session.send(Mono.just(session.textMessage(message))).subscribeOn(Schedulers.boundedElastic()).subscribe();
        }
    }

    /**
     * 广播消息
     */
    public void broadcast(String message) {
        SESSION_POOL.forEach((sessionId, session) -> {
            session.send(Mono.just(session.textMessage(message))).subscribeOn(Schedulers.boundedElastic()).subscribe();
        });
    }
}

Server

Client

Spring WebFlux为WebSocketClient抽象提供了Reactor Netty,Tomcat,Jetty,Undertow和标准Java(即JSR-356)的实现。

要启动WebSocket会话,可以通过创建客户端的实例并使用其execute方法:

URI url = URI.create("wss://wetools.cc/s/websocket");
Mono<Void> execute = client.execute(url, session -> {
    return session.receive()
            .doOnNext(message -> {
                System.out.println("服务端: " + message.getPayloadAsText());
            })
            .then();
});
execute.subscribe();

最后更新于

这有帮助吗?