系统教程 · 2024年2月21日

redis 集群批量操作实现

redis 集群批量操作实现

收藏

怎么入门数据库编程?需要学习哪些知识点?这是新手们刚接触编程时常见的问题;下面golang学习网就来给大家整理分享一些知识点,希望能够给初学者一些帮助。本篇文章就来介绍《redis 集群批量操作实现》,涉及到操作、批量、redis集群,有需要的可以收藏一下

 Redis集群是没法执行批量操作命令的,如mget,pipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。具体代码如下:

初始化    JedisCluster类

@Configuration
public class JedisClusterConfig {

    @Value("${spring.redis.cluster.nodes}")
    private String clusterNodes;

    @Value("${spring.redis.cache.commandTimeout}")
    private Integer commandTimeout;

    @Bean
    public JedisCluster getJedisCluster() {

        String[] serverArray = clusterNodes.split(",");
        Set
   
     nodes = new HashSet();
        for (String ipPort : serverArray) {
            String[] ipPortPair = ipPort.split(":");
            nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
        }
        return new JedisCluster(nodes, commandTimeout);
    }
}


   

工具类 JedisClusterUtil

@Component
public class JedisClusterUtil {

    @Autowired
    private JedisCluster jedisCluster;

    @Resource(name = "redisTemplate4Json")
    protected RedisTemplate
   
     redisTemplate;

    /**
     * ZSet批量查询
     * @param keys
     * @return
     */
    public List
     batchZRange(List
     
       keys) { List
       resList = new ArrayList(); if (keys == null || keys.size() == 0) { return resList; } if (keys.size() == 1) { BoundZSetOperations
       
         operations = redisTemplate.boundZSetOps(keys.get(0)); Set
         set = operations.reverseRange(0, 0); resList.add(set.iterator().next()); return resList; } Map
         
          > jedisPoolMap = getJedisPool(keys); List
          
            keyList; JedisPool currentJedisPool = null; Pipeline currentPipeline; List
            res = new ArrayList(); Map
            
              resultMap = new HashMap(); //执行 for (Map.Entry
             
              > entry : jedisPoolMap.entrySet()) { Jedis jedis = null; try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : keyList) { currentPipeline.zrevrange(key, 0, 0); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for (int i = 0; i set = (Set
              ) res.get(i); if (null == set || set.isEmpty()) { resultMap.put(keyList.get(i), null); } else { byte[] byteStr = set.iterator().next().toString().getBytes(); Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr); resultMap.put(keyList.get(i), obj); } } } } catch (Exception e) { e.printStackTrace(); } finally { returnResource(jedis, currentJedisPool); } } resList = sortList(keys, resultMap); return resList; } /** * Value批量查询 * @param keys * @return */ public List batchGet(List
                
                  keys){ List
                  resList = new ArrayList(); if (keys == null || keys.size() == 0) { return resList; } if (keys.size() == 1) { BoundValueOperations
                  
                    operations = redisTemplate.boundValueOps(keys.get(0)); resList.add(operations.get()); return resList; } Map
                   
                    > jedisPoolMap = getJedisPool(keys); List
                    
                      keyList; JedisPool currentJedisPool = null; Pipeline currentPipeline; List
                      res = new ArrayList(); Map
                      
                        resultMap = new HashMap(); for (Map.Entry
                       
                        > entry : jedisPoolMap.entrySet()) { Jedis jedis = null; try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : keyList) { currentPipeline.get(key); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for (int i = 0; i > getJedisPool(List
                        
                          keys){ //JedisCluster继承了BinaryJedisCluster //BinaryJedisCluster的JedisClusterConnectionHandler属性 //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache //从而获取slot和JedisPool直接的映射 MetaObject metaObject = SystemMetaObject.forObject(jedisCluster); JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); //保存地址+端口和命令的映射 Map
                         
                          > jedisPoolMap = new HashMap(); JedisPool currentJedisPool = null; List
                          
                            keyList; for (String key : keys) { //计算哈希槽 int crc = JedisClusterCRC16.getSlot(key); //通过哈希槽获取节点的连接 currentJedisPool = cache.getSlotPool(crc); //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的 //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样 //的,可以作为map的key if (jedisPoolMap.containsKey(currentJedisPool)) { jedisPoolMap.get(currentJedisPool).add(key); } else { keyList = new ArrayList(); keyList.add(key); jedisPoolMap.put(currentJedisPool, keyList); } } return jedisPoolMap; } private List
                            sortList(List
                            
                              keys, Map
                             
                               params) { List
                               resultList = new ArrayList(); Iterator
                               
                                 it = keys.iterator(); while (it.hasNext()) { String key = it.next(); resultList.add(params.get(key)); } return resultList; } /** * 释放jedis资源 * * @param jedis */ public void returnResource(Jedis jedis, JedisPool jedisPool) { if (jedis != null && jedisPool != null) { jedisPool.returnResource(jedis); } }
                               
                             
                            
                          
                         
                        
                       
                      
                    
                   
                  
                
             
            
          
         
       
     
   

 注意:一定要完成后释放 jedis 资源  不然会造成卡死现象