redis 集群批量操作实现
收藏
怎么入门数据库编程?需要学习哪些知识点?这是新手们刚接触编程时常见的问题;下面golang学习网就来给大家整理分享一些知识点,希望能够给初学者一些帮助。本篇文章就来介绍《redis 集群批量操作实现》,涉及到操作、批量、redis集群,有需要的可以收藏一下
Redis集群是没法执行批量操作命令的,如mget,pipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。具体代码如下:
初始化 JedisCluster类
@Configuration public class JedisClusterConfig { @Value("${spring.redis.cluster.nodes}") private String clusterNodes; @Value("${spring.redis.cache.commandTimeout}") private Integer commandTimeout; @Bean public JedisCluster getJedisCluster() { String[] serverArray = clusterNodes.split(","); Set nodes = new HashSet(); for (String ipPort : serverArray) { String[] ipPortPair = ipPort.split(":"); nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim()))); } return new JedisCluster(nodes, commandTimeout); } }
工具类 JedisClusterUtil
@Component public class JedisClusterUtil { @Autowired private JedisCluster jedisCluster; @Resource(name = "redisTemplate4Json") protected RedisTemplate redisTemplate; /** * ZSet批量查询 * @param keys * @return */ public List batchZRange(List keys) { List resList = new ArrayList(); if (keys == null || keys.size() == 0) { return resList; } if (keys.size() == 1) { BoundZSetOperations operations = redisTemplate.boundZSetOps(keys.get(0)); Set set = operations.reverseRange(0, 0); resList.add(set.iterator().next()); return resList; } Map > jedisPoolMap = getJedisPool(keys); List keyList; JedisPool currentJedisPool = null; Pipeline currentPipeline; List res = new ArrayList(); Map resultMap = new HashMap(); //执行 for (Map.Entry > entry : jedisPoolMap.entrySet()) { Jedis jedis = null; try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : keyList) { currentPipeline.zrevrange(key, 0, 0); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for (int i = 0; i set = (Set ) res.get(i); if (null == set || set.isEmpty()) { resultMap.put(keyList.get(i), null); } else { byte[] byteStr = set.iterator().next().toString().getBytes(); Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr); resultMap.put(keyList.get(i), obj); } } } } catch (Exception e) { e.printStackTrace(); } finally { returnResource(jedis, currentJedisPool); } } resList = sortList(keys, resultMap); return resList; } /** * Value批量查询 * @param keys * @return */ public List batchGet(List keys){ List resList = new ArrayList(); if (keys == null || keys.size() == 0) { return resList; } if (keys.size() == 1) { BoundValueOperations operations = redisTemplate.boundValueOps(keys.get(0)); resList.add(operations.get()); return resList; } Map > jedisPoolMap = getJedisPool(keys); List keyList; JedisPool currentJedisPool = null; Pipeline currentPipeline; List res = new ArrayList(); Map resultMap = new HashMap(); for (Map.Entry > entry : jedisPoolMap.entrySet()) { Jedis jedis = null; try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : keyList) { currentPipeline.get(key); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for (int i = 0; i > getJedisPool(List keys){ //JedisCluster继承了BinaryJedisCluster //BinaryJedisCluster的JedisClusterConnectionHandler属性 //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache //从而获取slot和JedisPool直接的映射 MetaObject metaObject = SystemMetaObject.forObject(jedisCluster); JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); //保存地址+端口和命令的映射 Map > jedisPoolMap = new HashMap(); JedisPool currentJedisPool = null; List keyList; for (String key : keys) { //计算哈希槽 int crc = JedisClusterCRC16.getSlot(key); //通过哈希槽获取节点的连接 currentJedisPool = cache.getSlotPool(crc); //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的 //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样 //的,可以作为map的key if (jedisPoolMap.containsKey(currentJedisPool)) { jedisPoolMap.get(currentJedisPool).add(key); } else { keyList = new ArrayList(); keyList.add(key); jedisPoolMap.put(currentJedisPool, keyList); } } return jedisPoolMap; } private List sortList(List keys, Map params) { List resultList = new ArrayList(); Iterator it = keys.iterator(); while (it.hasNext()) { String key = it.next(); resultList.add(params.get(key)); } return resultList; } /** * 释放jedis资源 * * @param jedis */ public void returnResource(Jedis jedis, JedisPool jedisPool) { if (jedis != null && jedisPool != null) { jedisPool.returnResource(jedis); } }
注意:一定要完成后释放 jedis 资源 不然会造成卡死现象