1、发送数据携带用户ID
携带的用户ID可以直接拿到给MessageMapping注解的函数注入,后端可以使用这个ID双向通信,需要定义一个实体实现Principal,实现getName()方法。
@Getter@Setterpublic class User implements Principal { private String username; private String password; private String role; private List<Url> urls; @Override public String getName() { return username; }}
定义用户拦截器做认证,并生成User,注入StompHeaderAccessor
/** *用户拦截器 **/public class UserInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if (raw instanceof Map) { //这里就是token Object name = ((Map) raw).get(Constants.TOKEN_KEY); if (name instanceof LinkedList) { // 设置当前访问器的认证用户 String token = ((LinkedList) name).get(0).toString(); String username = null; try { Map<String, Claim> claimMap = JWTUtils.verifyToken(token); username = claimMap.get("username").asString(); if(username == null){ throw new RuntimeException("websocket认证失败"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); throw new RuntimeException("websocket认证失败", e); } catch (ValidTokenException e) { e.printStackTrace(); throw new RuntimeException("websocket认证失败", e); } User user = new User(); user.setUsername(username); accessor.setUser(user);// User user = new User();// user.setUsername("lalala");// accessor.setUser(user); } } } return message; } @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) { } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { } @Override public boolean preReceive(MessageChannel channel) { return false; } @Override public Message<?> postReceive(Message<?> message, MessageChannel channel) { return null; } @Override public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { }}
/*将客户端渠道拦截器加入spring ioc容器*/ @Bean public UserInterceptor createUserInterceptor() { return new UserInterceptor(); }
服务端:
/** * 接收用户信息 * */ @MessageMapping(value = "/principal") public void test(Principal principal) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); }
客户端:
/** * 发送用户信息 * */ function send0() { stompClient.send("/app/principal", {}, {}); }
2、发送JSON数据体
服务端可以直接在函数中注入JavaBean或者Map,List或者String接收
服务端:
/*点对点通信*/ @MessageMapping(value = "/P2P") public void templateTest(Principal principal, Map<String,String> data) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); }
客户端:
/** * 发送JSON数据体 * */ function send() { stompClient.send("/app/P2P", {}, JSON.stringify({ 'name': 'test' })); }
3、将参数携带到发送请求的URL路径中
使用@DestinationVariable注解,类似SpringMVC的@PathVirable
服务端:
/** * 接收路径参数 * */ @MessageMapping(value = "/path/{name}/{company}") public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); }
客户端:
/** * 发送路径参数 * */ function send2() { stompClient.send("/app/path/zhangsan/XXX公司", {}, {}); }
4、发送header
使用@Header注解
服务端:
/** * 接收header参数 * */ @MessageMapping(value = "/header") public void headerTest(Principal principal, @Header String one, @Header String two) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); }
客户端:
/** * 发送header参数 * */ function send3() { stompClient.send("/app/header", {"one":"lalala", "two":"中国"}, {}); }
5、发送Httpsession中的数据
这里有一点儿小问题,我理解的是只能发送握手连接时的HttpSession中的数据
注册HttpSessionHandshakeIntercepror
/** * 注册stomp的端点 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域 // 在网页上我们就可以通过这个链接 // http://localhost:8080/webSocketServer // 来和服务器的WebSocket连接 registry.addEndpoint("/webSocketServer") .addInterceptors(new HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") .withSockJS(); }
服务端:
/** * 接收HttpSession数据 * */ @MessageMapping(value = "/httpsession") public void httpsession( StompHeaderAccessor accessor) { String name = (String) accessor.getSessionAttributes().get("name"); System.out.println(1111); }
客户端:
/** * 发送httpsession * */ function send4() { stompClient.send("/app/httpsession", {}, {}); }
6、所有代码
前端JS:
<!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>Title</title> <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script> <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script> <script> var socket = new SockJS("http://192.168.100.88:7601/demo/webSocketServer"); var stompClient = Stomp.over(socket); window.onload = function () { connect(); } //订阅消息 function subscribe() { stompClient.subscribe('/user/queue/message', function (response) { console.log("/user/queue/message 你接收到的消息为:" + response); }); } /** * 发送用户信息 * */ function send0() { stompClient.send("/app/principal", {}, {}); } /** * 发送JSON数据体 * */ function send() { stompClient.send("/app/P2P", {}, JSON.stringify({ 'name': 'test' })); } /** * 发送路径参数 * */ function send2() { stompClient.send("/app/path/zhangsan/XXX公司", {}, {}); } /** * 发送header参数 * */ function send3() { stompClient.send("/app/header", {"one":"lalala", "two":"中国"}, {}); } /** * 发送httpsession * */ function send4() { stompClient.send("/app/httpsession", {}, {}); } // /** // * 发送URL中?&参数 // * */ // function send5() { // stompClient.send("/app/param?name=张三", {}, // {}); // } function connect() { stompClient.connect({ Authorization:"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIxOTg1NjQxNjAsImlhdCI6MTUzMTg5NzUwMCwidXNlcm5hbWUiOiJ6cXcxMSJ9.VFR2EKUx5BTYLDkDogiLA9LfNVoPjOzQ3rTWoEy7He4" //这里可以改成token // name: 'admin' // 携带客户端信息 }, function connectCallback(frame) { // 连接成功时(服务器响应 CONNECTED 帧)的回调方法 alert("success"); subscribe(); }, function errorCallBack(error) { // 连接失败时(服务器响应 ERROR 帧)的回调方法 alert("error"); }); } function disconnect() { if (stompClient != null) { stompClient.disconnect(); }// setConnected(false); console.log("Disconnected"); } </script></head><body> <input type="text" id="info"/><button onclick="send5();">发送</button></body></html>
后端MessaeMapping处:
package com.iscas.biz.test.controller;import com.iscas.templet.common.ResponseEntity;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.*;import org.springframework.messaging.simp.SimpMessagingTemplate;import org.springframework.messaging.simp.stomp.StompHeaderAccessor;import org.springframework.messaging.simp.user.SimpUser;import org.springframework.messaging.simp.user.SimpUserRegistry;import org.springframework.web.bind.annotation.RestController;import java.security.Principal;import java.util.Map;/** * 如有要看例子,请打开注释 * **/@RestController@Slf4jpublic class WebSoketDemoController { //spring提供的发送消息模板 @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private SimpUserRegistry userRegistry; /** * 接收用户信息 * */ @MessageMapping(value = "/principal") public void test(Principal principal) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); } /** * 接收数据体 * */ @MessageMapping(value = "/P2P") public void templateTest(Principal principal, Map<String,String> data) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); } /** * 接收路径参数 * */ @MessageMapping(value = "/path/{name}/{company}") public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); } /** * 接收header参数 * */ @MessageMapping(value = "/header") public void headerTest(Principal principal, @Header String one, @Header String two) { log.info("当前在线人数:" + userRegistry.getUserCount()); int i = 1; for (SimpUser user : userRegistry.getUsers()) { log.info("用户" + i++ + "---" + user); } //发送消息给指定用户 messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据"); } /** * 接收HttpSession数据 * */ @MessageMapping(value = "/httpsession") public void httpsession( StompHeaderAccessor accessor) { String name = (String) accessor.getSessionAttributes().get("name"); System.out.println(1111); }// /**// * 接收param数据// * */// @MessageMapping(value = "/param")// public void param(String name) {// System.out.println(1111);// } /*广播*/ @MessageMapping("/broadcast") @SendTo("/topic/getResponse") public ResponseEntity topic() throws Exception { return new ResponseEntity(200,"success"); }}
Websocket配置类:
package com.iscas.base.biz.config.stomp;import org.springframework.context.annotation.Bean;import org.springframework.messaging.simp.config.ChannelRegistration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.*;import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;/** * webscoket配置 ** @auth zhuquanwen * **///@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketStompConfig /*extends AbstractWebSocketMessageBrokerConfigurer*/ implements WebSocketMessageBrokerConfigurer { /** * 注册stomp的端点 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域 // 在网页上我们就可以通过这个链接 // http://localhost:8080/webSocketServer // 来和服务器的WebSocket连接 registry.addEndpoint("/webSocketServer") .addInterceptors(new HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") .withSockJS(); } /** * 配置信息代理 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // 订阅Broker名称 registry.enableSimpleBroker("/queue", "/topic"); // 全局使用的消息前缀(客户端订阅路径上会体现出来) registry.setApplicationDestinationPrefixes("/app"); // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/ registry.setUserDestinationPrefix("/user/"); } /** * 配置客户端入站通道拦截器 */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(createUserInterceptor()); } /*将客户端渠道拦截器加入spring ioc容器*/ @Bean public UserInterceptor createUserInterceptor() { return new UserInterceptor(); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.setMessageSizeLimit(500 * 1024 * 1024); registration.setSendBufferSizeLimit(1024 * 1024 * 1024); registration.setSendTimeLimit(200000); }}
用户拦截器
package com.iscas.base.biz.config.stomp;import com.auth0.jwt.interfaces.Claim;import com.iscas.base.biz.config.Constants;import com.iscas.base.biz.util.SpringUtils;import com.iscas.templet.exception.ValidTokenException;import com.iscas.base.biz.model.auth.User;import com.iscas.base.biz.util.JWTUtils;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.simp.SimpMessageHeaderAccessor;import org.springframework.messaging.simp.stomp.StompCommand;import org.springframework.messaging.simp.stomp.StompHeaderAccessor;import org.springframework.messaging.support.ChannelInterceptor;import org.springframework.messaging.support.MessageHeaderAccessor;import javax.servlet.http.HttpSession;import java.io.UnsupportedEncodingException;import java.util.LinkedList;import java.util.Map;/** *用户拦截器 **/public class UserInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if (raw instanceof Map) { //这里就是token Object name = ((Map) raw).get(Constants.TOKEN_KEY); if (name instanceof LinkedList) { // 设置当前访问器的认证用户// String token = ((LinkedList) name).get(0).toString();// String username = null;// try {// Map<String, Claim> claimMap = JWTUtils.verifyToken(token);// username = claimMap.get("username").asString();// if(username == null){// throw new RuntimeException("websocket认证失败");// }// } catch (UnsupportedEncodingException e) {// e.printStackTrace();// throw new RuntimeException("websocket认证失败", e);// } catch (ValidTokenException e) {// e.printStackTrace();// throw new RuntimeException("websocket认证失败", e);// }// User user = new User();// user.setUsername(username);// accessor.setUser(user); User user = new User(); user.setUsername("lalala"); accessor.setUser(user); } } } else if (StompCommand.SEND.equals(accessor.getCommand())) { //发送数据 } return message; } @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) { } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { } @Override public boolean preReceive(MessageChannel channel) { return false; } @Override public Message<?> postReceive(Message<?> message, MessageChannel channel) { return null; } @Override public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { }}
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/75774.html