系统教程 · 2024年6月5日

websocket+redis动态订阅和动态取消订阅的实现示例

websocket+redis动态订阅和动态取消订阅的实现示例

收藏

怎么入门数据库编程?需要学习哪些知识点?这是新手们刚接触编程时常见的问题;下面golang学习网就来给大家整理分享一些知识点,希望能够给初学者一些帮助。本篇文章就来介绍《websocket+redis动态订阅和动态取消订阅的实现示例》,涉及到websocketredis、动态订阅,有需要的可以收藏一下

原理

websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。

订阅频道消息格式:

{
    "cmd":"subscribe",
    "topic":[
        "topic_name"
    ]
}

模糊订阅格式

{
    "cmd":"psubscribe",
    "topic":[
        "topic_name"
    ]
}

取消订阅格式

{
    "cmd":"unsubscribe",
    "topic":[
        "topic_name"
    ]
}

两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。

redis订阅监听类

package com.curtain.core;

import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Arrays;

/**
 * @Author Curtain
 * @Date 2021/6/7 14:27
 * @Description
 */
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
    private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
    private Jedis jedis;

    //订阅
    public void subscribe(String... channels) {
        jedis = jedisPool.getResource();
        try {
            jedis.subscribe(this, channels);
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到异常后关闭连接重新订阅
            log.info("监听遇到异常,四秒后重新订阅频道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            subscribe(channels);
        }
    }

    //模糊订阅
    public void psubscribe(String... channels) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.psubscribe(this, channels);
        } catch (ArithmeticException e) {//取消订阅故意造成的异常
            if (jedis != null)
                jedis.close();
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到异常后关闭连接重新订阅
            log.info("监听遇到异常,四秒后重新订阅频道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            psubscribe(channels);
        }
    }

    public void unsubscribeAndClose(String... channels){
        unsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }

    public void punsubscribeAndClose(String... channels){
        punsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, pattern);
        WebSocketServer.publish(message, channel);

    }

    @Override
    public void onMessage(String channel, String message) {
        log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, channel);
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        log.info("unsubscribe redis channel:" + channel);
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        log.info("punsubscribe redis channel:" + pattern);
    }
}

1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。

webSocket订阅推送类

这个类会有两个ConcurrentHashMap >类型类变量,分别存储订阅和模糊订阅的信息。

外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。

还有个ConcurrentHashMap 类型的变量,存储的是事件-RedisPubSub,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。

信息进行增加或者删除;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。

package com.curtain.core;

import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @Author Curtain
 * @Date 2021/5/14 16:49
 * @Description
 */
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static volatile ConcurrentHashMap
   
    > webSocketMap = new ConcurrentHashMap();
    /**
     * 存放psub的事件
     **/
    private static volatile ConcurrentHashMap
    
     > pWebSocketMap = new ConcurrentHashMap();     /**      * 存放topic(pattern)-对应的RedisPubsub      */     private static volatile ConcurrentHashMap
     
       redisPubSubMap = new ConcurrentHashMap();     /**      * 与某个客户端的连接会话,需要通过它来给客户端发送数据      */     private Session session;     private String sessionId = "";     //要注入的对象     private static TaskExecuteService executeService;     private static WebsocketProperties properties;     private Cancelable cancelable;     @Autowired     public void setTaskExecuteService(TaskExecuteService taskExecuteService) {         WebSocketServer.executeService = taskExecuteService;     }     @Autowired     public void setWebsocketProperties(WebsocketProperties properties) {         WebSocketServer.properties = properties;     }     /**      * 连接建立成功调用的方法      */     @OnOpen     public void onOpen(Session session) {         this.session = session;         this.sessionId = session.getId();         //构造推送数据         Map pubHeader = new HashMap();         pubHeader.put("name", "connect_status");         pubHeader.put("type", "create");         pubHeader.put("from", "pubsub");         pubHeader.put("time", new Date().getTime() / 1000);         Map pubPayload = new HashMap();         pubPayload.put("status", "success");         Map pubMap = new HashMap();         pubMap.put("header", pubHeader);         pubMap.put("payload", pubPayload);         sendMessage(JSON.toJSONString(pubMap));         cancelable = executeService.runPeriodly(() -> {             try {                 if (cancelable != null && !session.isOpen()) {                     log.info("断开连接,停止发送ping");                     cancelable.cancel();                 } else {                     String data = "ping";                     ByteBuffer payload = ByteBuffer.wrap(data.getBytes());                     session.getBasicRemote().sendPing(payload);                 }             } catch (IOException e) {                 e.printStackTrace();             }         }, properties.getPeriod());     }     @OnMessage     public void onMessage(String message) {         synchronized (session) {             Map msgMap = (Map) JSON.parse(message);             String cmd = (String) msgMap.get("cmd");             //订阅消息             if ("subscribe".equals(cmd)) {                 List
      
        topics = (List
       
        ) msgMap.get("topic");                 //本地记录订阅信息                 for (int i = 0; i map = new ConcurrentHashMap();                         map.put(this.sessionId, this);                         webSocketMap.put(topic, map);                         new Thread(() -> {                             RedisPubSub redisPubSub = new RedisPubSub();                             //存入map                             redisPubSubMap.put(topic, redisPubSub);                             redisPubSub.subscribe(topic);                         }).start();                     }                     log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);                     log();                     log.info("============================subscribe-end============================");                 }             }             //psubscribe             if ("psubscribe".equals(cmd)) {                 List
        
          topics = (List
         
          ) msgMap.get("topic");                 //本地记录订阅信息                 for (int i = 0; i map = new ConcurrentHashMap();                         map.put(this.sessionId, this);                         pWebSocketMap.put(topic, map);                         new Thread(() -> {                             RedisPubSub redisPubSub = new RedisPubSub();                             //存入map                             redisPubSubMap.put(topic, redisPubSub);                             redisPubSub.psubscribe(topic);                         }).start();                     }                     log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic);                     log();                     log.info("============================psubscribe-end============================");                 }             }             //取消订阅             if ("unsubscribe".equals(cmd)) {                 List
          
            topics = (List
           
            ) msgMap.get("topic");                 //删除本地对应的订阅信息                 for (String topic : topics) {                     log.info("============================unsubscribe-start============================");                     log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic);                     if (webSocketMap.containsKey(topic)) {                         ConcurrentHashMap
            
              map = webSocketMap.get(topic);                         map.remove(this.sessionId);                         if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道                             webSocketMap.remove(topic);                             redisPubSubMap.get(topic).unsubscribeAndClose(topic);                             redisPubSubMap.remove(topic);                         }                     }                     if (pWebSocketMap.containsKey(topic)) {                         ConcurrentHashMap
             
               map = pWebSocketMap.get(topic);                         map.remove(this.sessionId);                         if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道                             pWebSocketMap.remove(topic);                             redisPubSubMap.get(topic).punsubscribeAndClose(topic);                             redisPubSubMap.remove(topic);                         }                     }                     log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic);                     log();                     log.info("============================unsubscribe-end============================");                 }             }         }     }     @OnMessage     public void onPong(PongMessage pongMessage) {         try {             log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");         } catch (UnsupportedEncodingException e) {             e.printStackTrace();         }     }     /**      * 连接关闭调用的方法      */     @OnClose     public void onClose() {         synchronized (session) {             log.info("============================onclose-start============================");             //删除订阅             Iterator iterator = webSocketMap.keySet().iterator();             while (iterator.hasNext()) {                 String topic = (String) iterator.next();                 ConcurrentHashMap
              
                map = webSocketMap.get(topic);                 map.remove(this.sessionId);                 if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道                     webSocketMap.remove(topic);                     redisPubSubMap.get(topic).unsubscribeAndClose(topic);                     redisPubSubMap.remove(topic);                 }             }             //删除模糊订阅             Iterator iteratorP = pWebSocketMap.keySet().iterator();             while (iteratorP.hasNext()) {                 String topic = (String) iteratorP.next();                 ConcurrentHashMap
               
                 map = pWebSocketMap.get(topic);                 map.remove(this.sessionId);                 if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道                     pWebSocketMap.remove(topic);                     redisPubSubMap.get(topic).punsubscribeAndClose(topic);                     redisPubSubMap.remove(topic);                 }             }             log.info("sessionId:" + this.sessionId + ",断开连接:");             //debug             log();             log.info("============================onclose-end============================");         }     }     /**      * @param session      * @param error      */     @OnError     public void onError(Session session, Throwable error) {         synchronized (session) {             log.info("============================onError-start============================");             log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage());             error.printStackTrace();             log.info("关闭错误用户对应的连接");             //删除订阅             Iterator iterator = webSocketMap.keySet().iterator();             while (iterator.hasNext()) {                 String topic = (String) iterator.next();                 ConcurrentHashMap
                
                  map = webSocketMap.get(topic);                 map.remove(this.sessionId);                 if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道                     webSocketMap.remove(topic);                     redisPubSubMap.get(topic).unsubscribeAndClose(topic);                     redisPubSubMap.remove(topic);                 }             }             //删除模糊订阅             Iterator iteratorP = pWebSocketMap.keySet().iterator();             while (iteratorP.hasNext()) {                 String topic = (String) iteratorP.next();                 ConcurrentHashMap
                 
                   map = pWebSocketMap.get(topic);                 map.remove(this.sessionId);                 if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道                     pWebSocketMap.remove(topic);                     redisPubSubMap.get(topic).punsubscribeAndClose(topic);                     redisPubSubMap.remove(topic);                 }             }             log.info("完成错误用户对应的连接关闭");             //debug             log();             log.info("============================onError-end============================");         }     }     /**      * 实现服务器主动推送      */     public void sendMessage(String message) {         synchronized (session) {             try {                 this.session.getBasicRemote().sendText(message);             } catch (IOException e) {                 e.printStackTrace();             }         }     }     public static void publish(String msg, String topic) {         ConcurrentHashMap
                  
                    map = webSocketMap.get(topic);         if (map != null && map.values() != null) {             for (WebSocketServer webSocketServer : map.values())                 webSocketServer.sendMessage(msg);         }         map = pWebSocketMap.get(topic);         if (map != null && map.values() != null) {             for (WebSocketServer webSocketServer : map.values())                 webSocketServer.sendMessage(msg);         }     }     private void log() {         log.info(">>>>>>>>>");         Iterator iterator1 = webSocketMap.keySet().iterator();         while (iterator1.hasNext()) {             String topic = (String) iterator1.next();             log.info("topic:" + topic);             Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();             while (iterator2.hasNext()) {                 String session = (String) iterator2.next();                 log.info("订阅" + topic + "的sessionId:" + session);             }         }         log.info(">>>>>>>>>");     } }
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   

项目地址

上面介绍了核心代码,下面是完整代码地址

Update20220415

参考评论区老哥的建议,将redis订阅监听类里面的subscribe和psubscribe方法调整如下:

    //订阅
    @Override
    public void subscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.subscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    //模糊订阅
    @Override
    public void psubscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.psubscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }