springboot(二)使用@MessageMapping接收各种类型数据

1、发送数据携带用户ID:携带的用户ID可以直接注入到@MessageMapping注解的函数中,后端可以使用这个ID进行双向通信;需要定义一个实体类实现Principal接口,并实现getName()方法。

1、发送数据携带用户ID
携带的用户ID可以直接注入到@MessageMapping注解的函数中,后端可以使用这个ID进行双向通信;需要定义一个实体类实现Principal接口,并实现getName()方法。

/**
 * User model that also serves as the {@link Principal} attached to a STOMP session.
 *
 * <p>Accessors for all fields are generated by Lombok ({@code @Getter} / {@code @Setter}).
 */
@Getter
@Setter
public class User implements Principal {

    /** Login name; this is the value exposed through {@link #getName()}. */
    private String username;

    private String password;

    private String role;

    private List<Url> urls;

    /**
     * Returns the principal name.
     *
     * @return the current {@code username}
     */
    @Override
    public String getName() {
        return this.username;
    }
}

定义用户拦截器做认证,并生成User,注入StompHeaderAccessor

/** *用户拦截器 **/public class UserInterceptor implements ChannelInterceptor {    @Override    public Message<?> preSend(Message<?> message, MessageChannel channel) {        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);        if (StompCommand.CONNECT.equals(accessor.getCommand())) {            Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);            if (raw instanceof Map) {                //这里就是token                Object name = ((Map) raw).get(Constants.TOKEN_KEY);                if (name instanceof LinkedList) {                    // 设置当前访问器的认证用户                    String token = ((LinkedList) name).get(0).toString();                    String username = null;                    try {                        Map<String, Claim> claimMap = JWTUtils.verifyToken(token);                        username = claimMap.get("username").asString();                        if(username == null){                            throw new RuntimeException("websocket认证失败");                        }                    } catch (UnsupportedEncodingException e) {                        e.printStackTrace();                        throw new RuntimeException("websocket认证失败", e);                    } catch (ValidTokenException e) {                        e.printStackTrace();                        throw new RuntimeException("websocket认证失败", e);                    }                    User user = new User();                    user.setUsername(username);                    accessor.setUser(user);//                    User user = new User();//                    user.setUsername("lalala");//                    accessor.setUser(user);                }            }        }        return message;    }    @Override    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {    }    @Override    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, 
Exception ex) {    }    @Override    public boolean preReceive(MessageChannel channel) {        return false;    }    @Override    public Message<?> postReceive(Message<?> message, MessageChannel channel) {        return null;    }    @Override    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {    }}
/**
 * Registers the client-inbound channel interceptor in the Spring IoC container.
 *
 * @return a new {@link UserInterceptor}
 */
@Bean
public UserInterceptor createUserInterceptor() {
    final UserInterceptor interceptor = new UserInterceptor();
    return interceptor;
}

服务端:

/**     * 接收用户信息     * */    @MessageMapping(value = "/principal")    public void test(Principal principal) {        log.info("当前在线人数:" + userRegistry.getUserCount());        int i = 1;        for (SimpUser user : userRegistry.getUsers()) {            log.info("用户" + i++ + "---" + user);        }        //发送消息给指定用户        messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");    }

客户端:

 /**
  * Sends a message to /app/principal so the server can resolve the current user.
  * The body is an empty string: the STOMP body is sent as text, so an object
  * literal would go out as "[object Object]".
  */
 function send0() {
     stompClient.send("/app/principal", {}, "");
 }

2、发送JSON数据体
服务端可以直接在函数中注入JavaBean或者Map,List或者String接收

服务端:

/*点对点通信*/    @MessageMapping(value = "/P2P")    public void templateTest(Principal principal, Map<String,String> data) {        log.info("当前在线人数:" + userRegistry.getUserCount());        int i = 1;        for (SimpUser user : userRegistry.getUsers()) {            log.info("用户" + i++ + "---" + user);        }        //发送消息给指定用户        messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");    }

客户端:

        /**
         * Sends a JSON body to /app/P2P.
         */
        function send() {
            var payload = JSON.stringify({ 'name': 'test' });
            stompClient.send("/app/P2P", {}, payload);
        }

3、将参数携带到发送请求的URL路径中
使用@DestinationVariable注解,类似SpringMVC的@PathVariable

服务端:

/**     * 接收路径参数     * */    @MessageMapping(value = "/path/{name}/{company}")    public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {        log.info("当前在线人数:" + userRegistry.getUserCount());        int i = 1;        for (SimpUser user : userRegistry.getUsers()) {            log.info("用户" + i++ + "---" + user);        }        //发送消息给指定用户        messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");    }

客户端:

        /**
         * Sends path parameters as part of the destination.
         * The body is an empty string: the STOMP body is sent as text, so an object
         * literal would go out as "[object Object]".
         */
        function send2() {
            stompClient.send("/app/path/zhangsan/XXX公司", {}, "");
        }

4、发送header
使用@Header注解

服务端:

/**     * 接收header参数     * */    @MessageMapping(value = "/header")    public void headerTest(Principal principal, @Header String one, @Header String two) {        log.info("当前在线人数:" + userRegistry.getUserCount());        int i = 1;        for (SimpUser user : userRegistry.getUsers()) {            log.info("用户" + i++ + "---" + user);        }        //发送消息给指定用户        messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");    }

客户端:

        /**
         * Sends custom STOMP headers ("one" and "two") to /app/header.
         * The body is an empty string: the STOMP body is sent as text, so an object
         * literal would go out as "[object Object]".
         */
        function send3() {
            stompClient.send("/app/header", {"one":"lalala", "two":"中国"}, "");
        }

5、发送Httpsession中的数据
这里有一点儿小问题,我理解的是只能发送握手连接时的HttpSession中的数据

注册HttpSessionHandshakeInterceptor

    /**     * 注册stomp的端点     */    @Override    public void registerStompEndpoints(StompEndpointRegistry registry) {        // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域        // 在网页上我们就可以通过这个链接        // http://localhost:8080/webSocketServer        // 来和服务器的WebSocket连接        registry.addEndpoint("/webSocketServer")                .addInterceptors(new HttpSessionHandshakeInterceptor())                .setAllowedOrigins("*")                .withSockJS();    }

服务端:

    /**
     * Reads the {@code name} attribute from the session attributes on {@code /httpsession}
     * and logs it.
     *
     * @param accessor header accessor of the incoming message
     */
    @MessageMapping(value = "/httpsession")
    public void httpsession(StompHeaderAccessor accessor) {
        // The attribute map may be absent, and the stored value is not guaranteed to be a String,
        // so neither is dereferenced or cast blindly.
        Map<String, Object> attributes = accessor.getSessionAttributes();
        Object name = attributes == null ? null : attributes.get("name");
        log.info("Session attribute name={}", name);
    }

客户端:

        /**
         * Triggers the /app/httpsession handler.
         * The body is an empty string: the STOMP body is sent as text, so an object
         * literal would go out as "[object Object]".
         */
        function send4() {
            stompClient.send("/app/httpsession", {}, "");
        }

6、所有代码

前端JS:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <!-- Both libraries are loaded over HTTPS so the page also works from an HTTPS origin. -->
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script>
        var socket = new SockJS("http://192.168.100.88:7601/demo/webSocketServer");
        var stompClient = Stomp.over(socket);

        window.onload = function () {
            connect();
        }

        // Subscribe to the per-user queue.
        function subscribe() {
            stompClient.subscribe('/user/queue/message', function (response) {
                console.log("/user/queue/message 你接收到的消息为:" + response);
            });
        }

        // NOTE: bodies below are strings. The STOMP body is sent as text, so an object
        // literal would go out as "[object Object]".

        /**
         * Sends a message so the server can resolve the current user.
         */
        function send0() {
            stompClient.send("/app/principal", {}, "");
        }

        /**
         * Sends a JSON body.
         */
        function send() {
            stompClient.send("/app/P2P", {},
                JSON.stringify({ 'name': 'test' }));
        }

        /**
         * Sends path parameters as part of the destination.
         */
        function send2() {
            stompClient.send("/app/path/zhangsan/XXX公司", {}, "");
        }

        /**
         * Sends custom STOMP headers.
         */
        function send3() {
            stompClient.send("/app/header", {"one":"lalala", "two":"中国"}, "");
        }

        /**
         * Triggers the handler that reads HttpSession data.
         */
        function send4() {
            stompClient.send("/app/httpsession", {}, "");
        }

        // Disabled experiment: sending "?name=..." style query parameters in the destination.
        // function send5() {
        //     stompClient.send("/app/param?name=张三", {}, "");
        // }

        function connect() {
            stompClient.connect({
                    // Demo token; replace with a real token in practice.
                    Authorization:"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIxOTg1NjQxNjAsImlhdCI6MTUzMTg5NzUwMCwidXNlcm5hbWUiOiJ6cXcxMSJ9.VFR2EKUx5BTYLDkDogiLA9LfNVoPjOzQ3rTWoEy7He4"
                },
                function connectCallback(frame) {
                    // Called when the connection succeeds (server replied with a CONNECTED frame).
                    alert("success");
                    subscribe();
                },
                function errorCallBack(error) {
                    // Called when the connection fails (server replied with an ERROR frame).
                    alert("error");
                });
        }

        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
            }
            console.log("Disconnected");
        }
    </script>
</head>
<body>
    <!-- The button previously called send5(), which is commented out above and therefore undefined. -->
    <input type="text" id="info"/><button onclick="send();">发送</button>
</body>
</html>

后端MessageMapping处:

package com.iscas.biz.test.controller;

import com.iscas.templet.common.ResponseEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.web.bind.annotation.RestController;

import java.security.Principal;
import java.util.Map;

/**
 * Demo controller showing the kinds of data a {@code @MessageMapping} method can receive:
 * the session principal, a JSON body, destination variables, headers and session attributes.
 *
 * <p>Original author's note: uncomment the relevant parts if you want to run the examples.
 */
@RestController
@Slf4j
public class WebSoketDemoController {

    /** Spring-provided template used to push messages to clients. */
    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /**
     * Receives the session user and pushes a reply to that user.
     *
     * @param principal the user attached to the session; may be {@code null}
     */
    @MessageMapping(value = "/principal")
    public void test(Principal principal) {
        logOnlineUsers();
        pushToUser(principal);
    }

    /**
     * Receives a JSON message body and pushes a reply to the sending user.
     *
     * @param principal the user attached to the session; may be {@code null}
     * @param data      the message payload converted to a string map
     */
    @MessageMapping(value = "/P2P")
    public void templateTest(Principal principal, Map<String, String> data) {
        logOnlineUsers();
        log.info("Received payload: {}", data);
        pushToUser(principal);
    }

    /**
     * Receives parameters embedded in the destination path and pushes a reply to the sending user.
     *
     * @param principal the user attached to the session; may be {@code null}
     * @param name      value of the {@code name} destination variable
     * @param company   value of the {@code company} destination variable
     */
    @MessageMapping(value = "/path/{name}/{company}")
    public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
        logOnlineUsers();
        log.info("Received destination variables: name={}, company={}", name, company);
        pushToUser(principal);
    }

    /**
     * Receives message headers and pushes a reply to the sending user.
     *
     * @param principal the user attached to the session; may be {@code null}
     * @param one       value of the {@code one} header
     * @param two       value of the {@code two} header
     */
    @MessageMapping(value = "/header")
    public void headerTest(Principal principal, @Header String one, @Header String two) {
        logOnlineUsers();
        log.info("Received headers: one={}, two={}", one, two);
        pushToUser(principal);
    }

    /**
     * Reads the {@code name} attribute from the session attributes and logs it.
     *
     * @param accessor header accessor of the incoming message
     */
    @MessageMapping(value = "/httpsession")
    public void httpsession(StompHeaderAccessor accessor) {
        // The attribute map may be absent, and the stored value is not guaranteed to be a String.
        Map<String, Object> attributes = accessor.getSessionAttributes();
        Object name = attributes == null ? null : attributes.get("name");
        log.info("Session attribute name={}", name);
    }

    // Disabled experiment: receiving "?name=..." style parameters.
    // @MessageMapping(value = "/param")
    // public void param(String name) {
    //     System.out.println(1111);
    // }

    /**
     * Broadcast example: the return value is sent to {@code /topic/getResponse}.
     *
     * @return a fixed success response
     * @throws Exception declared for compatibility with existing callers
     */
    @MessageMapping("/broadcast")
    @SendTo("/topic/getResponse")
    public ResponseEntity topic() throws Exception {
        return new ResponseEntity(200, "success");
    }

    /** Logs the number of online users followed by one line per registered user. */
    private void logOnlineUsers() {
        log.info("当前在线人数:" + userRegistry.getUserCount());
        int index = 1;
        for (SimpUser user : userRegistry.getUsers()) {
            log.info("用户" + index++ + "---" + user);
        }
    }

    /** Pushes the fixed demo text to the given user's {@code /queue/message}; skipped when there is no principal. */
    private void pushToUser(Principal principal) {
        if (principal == null) {
            // Without a principal there is no user name to address the reply to.
            log.warn("No principal on the session; skipping user-targeted push");
            return;
        }
        messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message", "服务器主动推的数据");
    }
}

Websocket配置类:

package com.iscas.base.biz.config.stomp;import org.springframework.context.annotation.Bean;import org.springframework.messaging.simp.config.ChannelRegistration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.*;import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;/** * webscoket配置 ** @auth zhuquanwen * **///@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketStompConfig /*extends AbstractWebSocketMessageBrokerConfigurer*/ implements WebSocketMessageBrokerConfigurer {    /**     * 注册stomp的端点     */    @Override    public void registerStompEndpoints(StompEndpointRegistry registry) {        // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域        // 在网页上我们就可以通过这个链接        // http://localhost:8080/webSocketServer        // 来和服务器的WebSocket连接        registry.addEndpoint("/webSocketServer")                .addInterceptors(new HttpSessionHandshakeInterceptor())                .setAllowedOrigins("*")                .withSockJS();    }    /**     * 配置信息代理     */    @Override    public void configureMessageBroker(MessageBrokerRegistry registry) {        // 订阅Broker名称        registry.enableSimpleBroker("/queue", "/topic");        // 全局使用的消息前缀(客户端订阅路径上会体现出来)        registry.setApplicationDestinationPrefixes("/app");        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/        registry.setUserDestinationPrefix("/user/");    }    /**     * 配置客户端入站通道拦截器     */    @Override    public void configureClientInboundChannel(ChannelRegistration registration) {        registration.interceptors(createUserInterceptor());    }     /*将客户端渠道拦截器加入spring ioc容器*/    @Bean    public UserInterceptor createUserInterceptor() {        return new UserInterceptor();    }    @Override    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {        registration.setMessageSizeLimit(500 * 1024 * 1024);        registration.setSendBufferSizeLimit(1024 * 1024 * 
1024);        registration.setSendTimeLimit(200000);    }}

用户拦截器

package com.iscas.base.biz.config.stomp;

import com.auth0.jwt.interfaces.Claim;
import com.iscas.base.biz.config.Constants;
import com.iscas.base.biz.util.SpringUtils;
import com.iscas.templet.exception.ValidTokenException;
import com.iscas.base.biz.model.auth.User;
import com.iscas.base.biz.util.JWTUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

import javax.servlet.http.HttpSession;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Map;

/**
 * Channel interceptor that attaches a user to the session on STOMP {@code CONNECT}.
 *
 * <p>NOTE(review): this is the demo variant. Token verification is disabled and every CONNECT
 * that carries the {@code Constants.TOKEN_KEY} header is given the fixed user {@code "lalala"}.
 * The disabled verification code is kept below for reference.
 */
public class UserInterceptor implements ChannelInterceptor {

    /**
     * Attaches the demo user to CONNECT frames that carry a token header.
     *
     * @param message the inbound message
     * @param channel the channel the message is being sent to
     * @return the same message instance
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        // getAccessor yields null when the message does not carry an accessor of the requested type.
        if (accessor == null) {
            return message;
        }
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // Read the token header without depending on the concrete List implementation
            // used for native headers.
            String token = accessor.getFirstNativeHeader(Constants.TOKEN_KEY);
            if (token != null) {
                // Disabled: real token verification.
                // String username = null;
                // try {
                //     Map<String, Claim> claimMap = JWTUtils.verifyToken(token);
                //     username = claimMap.get("username").asString();
                //     if (username == null) {
                //         throw new RuntimeException("websocket认证失败");
                //     }
                // } catch (UnsupportedEncodingException e) {
                //     throw new RuntimeException("websocket认证失败", e);
                // } catch (ValidTokenException e) {
                //     throw new RuntimeException("websocket认证失败", e);
                // }
                // User user = new User();
                // user.setUsername(username);
                // accessor.setUser(user);
                User user = new User();
                user.setUsername("lalala");
                accessor.setUser(user);
            }
        } else if (StompCommand.SEND.equals(accessor.getCommand())) {
            // SEND frame (data being sent): nothing to do.
        }
        return message;
    }

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        // no-op
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        // no-op
    }

    /**
     * @return {@code true} so that receiving from a pollable channel is not vetoed
     */
    @Override
    public boolean preReceive(MessageChannel channel) {
        return true;
    }

    /**
     * @return the received message unchanged (returning {@code null} would discard it)
     */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return message;
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
        // no-op
    }
}

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/75774.html

(0)

相关推荐