系统教程 · 2024年6月17日

关于使用Redisson订阅数问题

关于使用Redisson订阅数问题

收藏

本篇文章给大家分享《关于使用Redisson订阅数问题》,覆盖了数据库的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握它。

一、前提

最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。

二、源码分析

下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

1、RedissonLock#lock() 方法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // 尝试获取,如果ttl == null,则表示获取锁成功
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
        RFuture
   
     future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        // 后面代码忽略
        try {
            // 无限循环获取锁,直到获取锁成功
            // ...
        } finally {
            // 取消订阅锁释放事件
            unsubscribe(future, threadId);
        }
}
   

总结下主要逻辑:

  • 获取当前线程的线程id;
  • tryAquire尝试获取锁,并返回ttl
  • 如果ttl为空,则结束流程;否则进入后续逻辑;
  • this.subscribe(threadId)订阅当前线程,返回一个RFuture;
  • 如果在指定时间没有监听到,则会产生如上异常。
  • 订阅成功后, 通过while(true)循环,一直尝试获取锁
  • fially代码块,会解除订阅

所以上述这情况问题应该出现在subscribe()方法中

2、详细看下subscribe()方法

protected RFuture
   
     subscribe(long threadId) {
    // entryName 格式:“id:name”;
    // channelName 格式:“redisson_lock__channel:name”;
    return pubSub.subscribe(getEntryName(), getChannelName());
}
   

RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    // ....
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的

public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
    this(config, id);
    this.config = cfg;

    // 初始化
    initTimer(cfg);
    initSingleEntry();
}

protected void initTimer(MasterSlaveServersConfig config) {
    int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
    Arrays.sort(timeouts);
    int minTimeout = timeouts[0];
    if (minTimeout % 100 != 0) {
        minTimeout = (minTimeout % 100) / 2;
    } else if (minTimeout == 100) {
        minTimeout = 50;
    } else {
        minTimeout = 100;
    }

    timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);

    connectionWatcher = new IdleConnectionWatcher(this, config);

    // 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:
    subscribeService = new PublishSubscribeService(this, config);
}

PublishSubscribeService构造函数

private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
    super();
    this.connectionManager = connectionManager;
    this.config = config;
    for (int i = 0; i 

3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面

private final ConcurrentMap
    
      entries = new ConcurrentHashMap();

public RFuture
     
       subscribe(String entryName, String channelName) {       // 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量      // public AsyncSemaphore getSemaphore(ChannelName channelName) {     //    return locks[Math.abs(channelName.hashCode() % locks.length)];     // }     AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));     AtomicReference
      
        listenerHolder = new AtomicReference
       
        ();         RPromise
        
          newPromise = new RedissonPromise
         
          () {         @Override         public boolean cancel(boolean mayInterruptIfRunning) {             return semaphore.remove(listenerHolder.get());         }     };     Runnable listener = new Runnable() {         @Override         public void run() {             //  如果存在RedissonLockEntry, 则直接利用已有的监听             E entry = entries.get(entryName);             if (entry != null) {                 entry.acquire();                 semaphore.release();                 entry.getPromise().onComplete(new TransferListener
          
           (newPromise));                 return;             }             E value = createEntry(newPromise);             value.acquire();             E oldValue = entries.putIfAbsent(entryName, value);             if (oldValue != null) {                 oldValue.acquire();                 semaphore.release();                 oldValue.getPromise().onComplete(new TransferListener
           
            (newPromise));                 return;             }             // 创建监听,             RedisPubSubListener
             listener = createListener(channelName, value);             // 订阅监听             service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);         }     };     // 最终会执行listener.run方法     semaphore.acquire(listener);     listenerHolder.set(listener);     return newPromise; }
           
          
         
        
       
      
     
    

AsyncSemaphore#acquire()方法

public void acquire(Runnable listener) {
    acquire(listener, 1);
}

public void acquire(Runnable listener, int permits) {
    boolean run = false;

    synchronized (this) {
        // counter初始化值为1
        if (counter 

梳理上述逻辑:

1、从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。

从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法

4、PublishSubscribeService#subscribe逻辑如下:

private final ConcurrentMap
     
       name2PubSubConnection = new ConcurrentHashMap();
private final Queue
      
        freePubSubConnections = new ConcurrentLinkedQueue(); public RFuture
       
         subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener>... listeners) {     RPromise
        
          promise = new RedissonPromise
         
          ();     // 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。     subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);     return promise; } private void subscribe(Codec codec, ChannelName channelName,  RPromise
          
            promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener>... listeners) {     PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);     if (connEntry != null) {         // 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中         addListeners(channelName, promise, type, lock, connEntry, listeners);         return;     }     // 没有时,才是最重要的逻辑     freePubSubLock.acquire(new Runnable() {         @Override         public void run() {             if (promise.isDone()) {                 lock.release();                 freePubSubLock.release();                 return;             }             // 从队列中取头部元素             PubSubConnectionEntry freeEntry = freePubSubConnections.peek();             if (freeEntry == null) {                 // 第一次肯定是没有的需要建立                 connect(codec, channelName, promise, type, lock, listeners);                 return;             }             // 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。             int remainFreeAmount = freeEntry.tryAcquire();             if (remainFreeAmount == -1) {                 throw new IllegalStateException();             }             PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);             if (oldEntry != null) {                 freeEntry.release();                 freePubSubLock.release();                 addListeners(channelName, promise, type, lock, oldEntry, listeners);                 return;             }             // 如果remainFreeAmount=0, 则从队列中移除             if (remainFreeAmount == 0) {                 freePubSubConnections.poll();             }             freePubSubLock.release();             // 增加监听             RFuture
           
             subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);             ChannelFuture future;             if (PubSubType.PSUBSCRIBE == type) {                 future = freeEntry.psubscribe(codec, channelName);             } else {                 future = freeEntry.subscribe(codec, channelName);             }             future.addListener(new ChannelFutureListener() {                 @Override                 public void operationComplete(ChannelFuture future) throws Exception {                     if (!future.isSuccess()) {                         if (!promise.isDone()) {                             subscribeFuture.cancel(false);                         }                         return;                     }                     connectionManager.newTimeout(new TimerTask() {                         @Override                         public void run(Timeout timeout) throws Exception {                             subscribeFuture.cancel(false);                         }                     }, config.getTimeout(), TimeUnit.MILLISECONDS);                 }             });         }     }); } private void connect(Codec codec, ChannelName channelName, RPromise
            
              promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener>... listeners) {     // 根据channelName计算出slot获取PubSubConnection     int slot = connectionManager.calcSlot(channelName.getName());     RFuture
             
               connFuture = nextPubSubConnection(slot);     promise.onComplete((res, e) -> {         if (e != null) {             ((RPromise
              
               ) connFuture).tryFailure(e);         }     });     connFuture.onComplete((conn, e) -> {         if (e != null) {             freePubSubLock.release();             lock.release();             promise.tryFailure(e);             return;         }         // 这里会从配置中读取subscriptionsPerConnection         PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());         // 每获取一次,subscriptionsPerConnection就会减直到为0         int remainFreeAmount = entry.tryAcquire();         // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中         PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);         if (oldEntry != null) {             releaseSubscribeConnection(slot, entry);             freePubSubLock.release();             addListeners(channelName, promise, type, lock, oldEntry, listeners);             return;         }         if (remainFreeAmount > 0) {             // 加入到队列中             freePubSubConnections.add(entry);         }         freePubSubLock.release();         RFuture
               
                 subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);         // 这里真正的进行订阅(底层与redis交互)         ChannelFuture future;         if (PubSubType.PSUBSCRIBE == type) {             future = entry.psubscribe(codec, channelName);         } else {             future = entry.subscribe(codec, channelName);         }         future.addListener(new ChannelFutureListener() {             @Override             public void operationComplete(ChannelFuture future) throws Exception {                 if (!future.isSuccess()) {                     if (!promise.isDone()) {                         subscribeFuture.cancel(false);                     }                     return;                 }                 connectionManager.newTimeout(new TimerTask() {                     @Override                     public void run(Timeout timeout) throws Exception {                         subscribeFuture.cancel(false);                     }                 }, config.getTimeout(), TimeUnit.MILLISECONDS);             }         });     }); }
               
              
             
            
           
          
         
        
       
      
     

PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:

 public int tryAcquire() {
    while (true) {
        int value = subscribedChannelsAmount.get();
        if (value == 0) {
            return -1;
        }

        if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
            return value - 1;
        }
    }
}

梳理上述逻辑:

1、还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法

2.1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是进行底层的subscribe和addListener

3、如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
4、如果remainFreeAmount 5、最后也是进行底层的subscribe和addListener;

三 总结

根因: 从上面代码分析, 导致问题的根因是因为PublishSubscribeService 会使用公共队列中的freePubSubConnections, 如果同一个key一次性请求超过subscriptionsPerConnection它的默认值5时,remainFreeAmount就可能出现-1的情况, 那么就会导致commandExecutor.syncSubscription(future)中等待超时,也就抛出如上异常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.

解决方法: 在初始化Redisson可以可指定这个配置项的值。

相关参数的解释以及默认值请参考官网: