zoukankan      html  css  js  c++  java
  • RedisTemplate详解

    1 概述

    (1)板块含义

    • 红色部分:Redis相关(Redis的抽象使用相关)
    • 橙色部分:Jedis相关(Redis的具体实现相关)
    • 蓝色部分:Jedis的连接池的底层依赖(使用了Apache的基础工具类)

    (2)核心讲解

    • RedisTemplate:Spring中用于操作Redis工具类。

      • 根据配置切换Redis客户端;
      • 根据配置切换单点、sentinel、cluster模式;
      • 通过配置不同的RedisConnectionFactory,返回不同的RedisConnection。
    • RedisConnectionFactory:RedisConnection工厂类。

      • RedisConnection:单点模式或者说通用连接类;

      • RedisSentinelConnection:Sentinel模式连接类;

      • RedisClusterConnection:Cluster模式连接类。

    • Redis客户端:实际连接Redis干活的底层工具人,类比MySQL数据库的hikari、druid、c3p0、dbcp。

      • Jedis:当多线程使用同一个连接时,是线程不安全的。所以要使用连接池,为每个jedis实例分配一个连接。
      • lettuce:当多线程使用同一连接实例时,是线程安全的。

    注:单点模式即单台Redis;Sentinel和Cluster模式都为集群:不同的是Sentinel模式集群每个节点存全量数据;Cluster模式集群每个节点存部分数据,合计为全量且至少需要6个节点,3台机器(3主3备)。

    2 RedisTemplate执行命令过程(Jedis客户端为例)

    • RedisTemplate的使用
    /**
     * Read-through cache for dictionary entries: Redis is consulted first; on a miss the
     * value is loaded through {@code DictService} and then stored in Redis.
     */
    @Service
    public class DictCacheService {
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;

        @Autowired
        private DictService dictService;

        /**
         * Returns the dictionary value for {@code key}.
         *
         * @param key dictionary key, also used as the Redis key
         * @return the cached value, or the value loaded from {@code DictService};
         *         {@code null} when neither source has one
         */
        public Object get(String key) {
            Object result = redisTemplate.opsForValue().get(key);
            if (result == null) {
                result = dictService.get(key);
                // Store what was actually loaded (the original passed an undefined `value`).
                // A null is not written: the check above treats null as a miss anyway,
                // so caching it would gain nothing.
                if (result != null) {
                    redisTemplate.opsForValue().set(key, result);
                }
            }
            return result;
        }
    }
    
    • RedisTemplate继承关系
    // Excerpt of RedisTemplate showing how opsForValue() hands out the value-operations helper.
    public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>{
        // cache singleton objects (where possible)
    	private ValueOperations<K, V> valueOps;
       
        // 1. Obtain the ValueOperations: created on the first call (bound to this template) and reused afterwards
        public ValueOperations<K, V> opsForValue() {
    		if (valueOps == null) {
    			valueOps = new DefaultValueOperations<K, V>(this);
    		}
    		return valueOps;
    	}
    
        // ...
    }
    
    // 2. RedisTemplate implements the RedisOperations interface, which declares opsForValue()
    public interface RedisOperations<K, V> {
        ValueOperations<K, V> opsForValue();
        
        // ...
    }
    
    
    // 3. The interface type returned by RedisTemplate.opsForValue()
    public interface ValueOperations<K, V> {
        // Stores value under key
        void set(K key, V value);
        
        // Reads the value stored under key
        V get(Object key);
        
        // ...
    }
    
    // 4. The concrete type returned by RedisTemplate.opsForValue()
    class DefaultValueOperations<K, V> extends AbstractOperations<K, V> implements ValueOperations<K, V> {
        
        // 5. The method that actually performs the read
    	public V get(final Object key) {
            // 6. Call execute(...) inherited from the parent class (AbstractOperations);
            //    the callback issues connection.get on the serialized key
    		return execute(new ValueDeserializingRedisCallback(key) {
    			protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
    				return connection.get(rawKey);
    			}
    		}, true);
    	}
        
        // Write path: the value is serialized up front, then written through the same execute(...) route
        public void set(K key, V value) {
    		final byte[] rawValue = rawValue(value);
    		execute(new ValueDeserializingRedisCallback(key) {
    			protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
    				connection.set(rawKey, rawValue);
    				// nothing to hand back for deserialization after a write
    				return null;
    			}
    		}, true);
    	}
    }
    
    // 7. Parent class of DefaultValueOperations: AbstractOperations
    abstract class AbstractOperations<K, V> {
        // The template every operation is ultimately executed through
        RedisTemplate<K, V> template;
        
        // Used by DefaultValueOperations.get(): wraps a single-key command and deserializes its raw result
        abstract class ValueDeserializingRedisCallback implements RedisCallback<V> {
    		private Object key;
    		public ValueDeserializingRedisCallback(Object key) {
    			this.key = key;
    		}
    
    		// Serializes the key, runs the subclass-supplied command, then deserializes the returned bytes
    		public final V doInRedis(RedisConnection connection) {
    			byte[] result = inRedis(rawKey(key), connection);
    			return deserializeValue(result);
    		}
    
    		// The actual Redis command, implemented by each anonymous subclass
    		protected abstract byte[] inRedis(byte[] rawKey, RedisConnection connection);
    	}
        
        // 8. AbstractOperations.execute
        <T> T execute(RedisCallback<T> callback, boolean b) {
            // 9. Delegates to RedisTemplate.execute; continue reading in the RedisTemplate class.
    		return template.execute(callback, b);
    	}
    
    }
    
    // Back in the RedisTemplate class
    public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>{
        // 10. RedisTemplate.execute: the two-argument form that AbstractOperations.execute(callback, b)
        //     calls. It forwards to the three-argument form below with pipelining switched off.
        public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
            return execute(action, exposeConnection, false);
        }

        /**
         * Runs {@code action} against a connection obtained from the configured factory and
         * releases that connection afterwards.
         *
         * @param action           callback holding the Redis command(s) to run
         * @param exposeConnection when true the callback receives the connection itself,
         *                         otherwise a proxy created by createRedisConnectionProxy
         * @param pipeline         pipelining flag (its handling is elided from this excerpt)
         * @return the callback's result after postProcessResult
         */
        public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
            // 11. Obtain the ConnectionFactory (defined in the parent class RedisAccessor, step 12)
            RedisConnectionFactory factory = getConnectionFactory();

            // Obtain a RedisConnection through the factory (newly created or an existing one reused)
            RedisConnection conn = null;
            try {
                if (enableTransactionSupport) {
                    conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
                } else {
                    conn = RedisConnectionUtils.getConnection(factory);
                }
                boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

                RedisConnection connToUse = preProcessConnection(conn, existingConnection);

                // ...

                RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
                // 13. Invoke the concrete action's doInRedis method, e.g. the one built by set()
                T result = action.doInRedis(connToExpose);

                // ...
                return postProcessResult(result, connToUse, existingConnection);
            } finally {
                // Always hand the connection back, whether the callback succeeded or threw
                RedisConnectionUtils.releaseConnection(conn, factory);
            }
        }

    }
    
    // 12. Parent class of RedisTemplate: RedisAccessor, which holds the connection factory
    public class RedisAccessor implements InitializingBean {
    	private RedisConnectionFactory connectionFactory;
    
    	// Returns the factory that RedisTemplate.execute obtains its connections from
    	public RedisConnectionFactory getConnectionFactory() {
    		return connectionFactory;
    	}
        
        // ...
    }
    
    • 以JedisClusterConnection为例
    // Cluster-mode connection: every command is forwarded to the JedisCluster it wraps.
    public class JedisClusterConnection implements RedisClusterConnection {
        // Supplied by the RedisConnectionFactory (see JedisConnectionFactory.getClusterConnection)
        private final JedisCluster cluster;
        // ...
        
        // RedisConnection.get() actually executes JedisCluster.get()
    	@Override
    	public byte[] get(byte[] key) {
    		return cluster.get(key);
    	}
        
        // ...
    }
    
    // Cluster connection contract: a RedisConnection that additionally offers the cluster commands
    public interface RedisClusterConnection extends RedisConnection, RedisClusterCommands {
        // ...
    }
    
    // When is the JedisCluster created? During factory initialization, in afterPropertiesSet().
    public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
        private JedisCluster cluster;
        private RedisClusterConfiguration clusterConfig;
        
        // Initialization callback: prepares shard info, then either a plain pool or the cluster client
        public void afterPropertiesSet() {
    		if (shardInfo == null) {
    			shardInfo = new JedisShardInfo(hostName, port);
    			if (StringUtils.hasLength(password)) {
    				shardInfo.setPassword(password);
    			}
    			if (timeout > 0) {
    				setTimeoutOn(shardInfo, timeout);
    			}
    		}
    
    		// A stand-alone pool is only built when no cluster configuration is present
    		if (usePool && clusterConfig == null) {
    			this.pool = createPool();
    		}
    
    		if (clusterConfig != null) {
                // 1. Initialize the cluster client
    			this.cluster = createCluster();
    		}
    	}
        
        private JedisCluster createCluster() {
            // 2. Create the cluster client, then the command executor built on top of it
    		JedisCluster cluster = createCluster(this.clusterConfig, getPoolConfig());
    		JedisClusterConnection.JedisClusterTopologyProvider topologyProvider = new JedisClusterConnection.JedisClusterTopologyProvider(cluster);
    		this.clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider,
    				new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider), EXCEPTION_TRANSLATION);
    		return cluster;
    	}
        
        // Translates the Spring cluster configuration into the host/port set and redirect limit JedisCluster expects
        protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) {
    		Set<HostAndPort> hostAndPort = new HashSet<HostAndPort>();
    		for (RedisNode node : clusterConfig.getClusterNodes()) {
    			hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
    		}
    		// Falls back to 5 redirects when the configuration does not specify a maximum
    		int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects().intValue() : 5;
    
            // 3. Create it through the constructor; the password-taking variant is used only when a password is set
    		return StringUtils.hasText(getPassword())
    				? new JedisCluster(hostAndPort, timeout, timeout, redirects, password, poolConfig)
    				: new JedisCluster(hostAndPort, timeout, redirects, poolConfig);
    	}
        
        // 4. The JedisCluster reference is passed along when a connection is obtained
        @Override
    	public RedisClusterConnection getClusterConnection() {
    		return new JedisClusterConnection(cluster, clusterCommandExecutor);
    	}
        
    }
    
    // 5. JedisCluster.get() actually ends up calling Jedis.get()
    // NOTE(review): JedisClusterConnection.get passes a byte[] key, while this excerpt shows the
    // String overload; presumably the byte[] overload lives in BinaryJedisCluster and has the same shape - confirm.
    public class JedisCluster extends BinaryJedisCluster implements JedisCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
    
    	public String get(final String key) {
            // connectionHandler is a field of JedisCluster's parent class
            return (String)(new JedisClusterCommand<String>(this.connectionHandler, this.maxAttempts) {
                public String execute(Jedis connection) {
                    return connection.get(key);
                }
            }).run(key);
        }
    }
    
    // 6. Parent class of JedisCluster; it owns the connection handler used by every command
    public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands, MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
        protected JedisClusterConnectionHandler connectionHandler;
        
         // Builds a slot-based connection handler from the seed nodes and records the retry limit
         // (the maxAttempts field declaration is omitted from this excerpt)
         public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {
            this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout);
            this.maxAttempts = maxAttempts;
        }
    }
    
    // 7. The JedisClusterCommand class: runs one command against the cluster, retrying on
    //    connection failures and following redirections.
    public abstract class JedisClusterCommand<T> {
        private JedisClusterConnectionHandler connectionHandler;
        private int maxAttempts;
        // Connection to the node named by an ASK redirection, handed over to the next attempt
        private ThreadLocal<Jedis> askConnection = new ThreadLocal<Jedis>();
        
        // The concrete command, supplied by the anonymous subclass (e.g. in JedisCluster.get)
        public abstract T execute(Jedis connection);
    
        // Entry point: encodes the key and starts with the full attempt budget, slot-based routing, no ASK
        public T run(String key) {
          return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
        }
        
        // Picks a connection, executes the command, and recurses with attempts - 1 on a
        // connection failure or a redirection.
        private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
        Jedis connection = null;
        try {
          if (asking) {
            // Previous attempt received an ASK redirection: reuse the connection stored for it
            connection = askConnection.get();
            connection.asking();
            asking = false;
          } else {
            if (tryRandomNode) {
              // Obtain a connection without regard to the key's slot.
              // NOTE(review): the original note tied this branch to Sentinel mode, but this class is the
              // Cluster client; in the code visible here run() passes false and no retry path sets
              // the flag to true - confirm against the full Jedis source.
              connection = connectionHandler.getConnection();
            } else {
              // Normal cluster routing: compute the hash slot of the key and use the node that serves it.
              connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
            }
          }
          return execute(connection);
        } catch (JedisNoReachableClusterNodeException jnrcne) {
          // Rethrown as is, without a retry
          throw jnrcne;
        } catch (JedisConnectionException jce) {
          // Release now and null the reference so the finally block does not release it a second time
          releaseConnection(connection);
          connection = null;
    
          if (attempts <= 1) {
            // Last attempt used up: refresh the slot cache, then surface the failure
            this.connectionHandler.renewSlotCache();
            throw jce;
          }
    
          return runWithRetries(key, attempts - 1, tryRandomNode, asking);
        } catch (JedisRedirectionException jre) {
          if (jre instanceof JedisMovedDataException) {
            // MOVED redirection: refresh the slot cache, using the current connection
            this.connectionHandler.renewSlotCache(connection);
          }
    
          releaseConnection(connection);
          connection = null;
    
          if (jre instanceof JedisAskDataException) {
            // ASK redirection: the next attempt goes to the target node named in the exception
            asking = true;
            askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
          } 
    
          return runWithRetries(key, attempts - 1, false, asking);
        } finally {
          releaseConnection(connection);
        }
      }
    }
    
    // Connection handler that routes by hash slot using the cached slot-to-pool mapping.
    public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
        
        // Returns a Jedis from the pool cached for the given slot
        public Jedis getConnectionFromSlot(int slot) {
            JedisPool connectionPool = this.cache.getSlotPool(slot);
            if (connectionPool != null) {
                return connectionPool.getResource();
            } else {
                // No pool cached for this slot: refresh the slot cache and look again;
                // if there is still none, fall back to getConnection()
                this.renewSlotCache();
                connectionPool = this.cache.getSlotPool(slot);
                return connectionPool != null ? connectionPool.getResource() : this.getConnection();
            }
        }
    }
    
    // When are the Jedis objects cached? In the constructor of this handler.
    public abstract class JedisClusterConnectionHandler implements Closeable {
     	protected final JedisClusterInfoCache cache;
        
        // The constructor creates the cache object and initializes it from the given start nodes
        public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
                                           final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
           this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
           initializeSlotsCache(nodes, poolConfig, password);
      	}
        
        // Fills the cache by asking one start node for the cluster's nodes and slots
        private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
            for (HostAndPort hostAndPort : startNodes) {
                // Connect to the node and let the cache discover nodes and slots through it
                // NOTE(review): this excerpt never closes the Jedis created here - confirm against the full source.
                Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
                cache.discoverClusterNodesAndSlots(jedis);
                // One cluster node can report the whole layout, so stop after the first start node.
                break;
            }
      	}
    }
    
    // Information cache used in Redis Cluster mode: maps each hash slot to the JedisPool of
    // the node that serves it.
    public class JedisClusterInfoCache {
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final Lock r = rwl.readLock();
        // Write lock paired with r. The methods below call w.lock()/w.unlock(), but the
        // excerpt had dropped this declaration.
        private final Lock w = rwl.writeLock();
        private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

        // Looks up the pool cached for a slot under the read lock; null when none is cached
        public JedisPool getSlotPool(int slot) {
            r.lock();
            try {
                return slots.get(slot);
            } finally {
                r.unlock();
            }
        }

        // Initialization of slots: rebuilds the cache from the CLUSTER SLOTS reply of the given node
        public void discoverClusterNodesAndSlots(Jedis jedis) {
            w.lock();
            try {
                reset();
                // Local list of slot-range entries; it shadows the `slots` field inside this method
                List<Object> slots = jedis.clusterSlots();

                for (Object slotInfoObj : slots) {
                    List<Object> slotInfo = (List<Object>) slotInfoObj;

                    // Entry too short to contain a master node: skip it
                    if (slotInfo.size() <= MASTER_NODE_INDEX) {
                        continue;
                    }
                    List<Integer> slotNums = getAssignedSlotArray(slotInfo);

                    // hostInfos
                    int size = slotInfo.size();
                    for (int i = MASTER_NODE_INDEX; i < size; i++) {
                        List<Object> hostInfos = (List<Object>) slotInfo.get(i);
                        if (hostInfos.size() <= 0) {
                            continue;
                        }

                        HostAndPort targetNode = generateHostAndPort(hostInfos);
                        setupNodeIfNotExist(targetNode);
                        // Only the entry at the master index gets the slots assigned to it
                        if (i == MASTER_NODE_INDEX) {
                            assignSlotsToNode(slotNums, targetNode);
                        }
                    }
                }
            } finally {
                w.unlock();
            }
        }

        // Points every slot in targetSlots at the pool of targetNode. It takes the write lock
        // again; a ReentrantReadWriteLock write lock is reentrant, so the nested acquisition
        // from discoverClusterNodesAndSlots does not block.
        public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
            w.lock();
            try {
                JedisPool targetPool = setupNodeIfNotExist(targetNode);
                for (Integer slot : targetSlots) {
                    slots.put(slot, targetPool);
                }
            } finally {
                w.unlock();
            }
        }
    }
    
    // Pool of Jedis connections; borrowing is delegated to the generic Pool parent class.
    public class JedisPool extends Pool<Jedis> {
        @Override
        public Jedis getResource() {
          Jedis jedis = super.getResource();
          // Record this pool on the borrowed instance
          jedis.setDataSource(this);
          return jedis;
        }
    }
    
    
    // Stopping here; more may be added later. Pool itself still ships with Jedis, but the
    // pooling it delegates to (GenericObjectPool) comes from the Apache commons-pool2
    // package org.apache.commons.pool2.impl rather than from a Redis-related package.
    package redis.clients.util;
    public abstract class Pool<T> implements Closeable {
        // The commons-pool2 object pool that actually holds the pooled instances
        protected GenericObjectPool<T> internalPool;

        // Borrows an instance from the underlying pool.
        // NOTE(review): borrowObject() declares a checked exception; the exception handling
        // around this call appears to have been elided from the excerpt - confirm against the Jedis source.
        public T getResource() {
           return internalPool.borrowObject();
        }
    }
    

    3 RedisTemplate执行操作代理问题

    (1)重写RedisTemplate

    • 存放路径

    DefaultValueOperations、DefaultZSetOperations等XxxOperations实现类都是非public(包级私有)访问级别,所以无法在项目包中访问。

    放在org.springframework.data.redis.core包下,绕过XxxOperations实现类的访问问题。

    src/main/java
      └- org.springframework.data.redis.core
      		└- TracingRedisTemplate
    
    • TracingRedisTemplate
    package org.springframework.data.redis.core;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    
    import javax.annotation.PostConstruct;
    
    /**
     * RedisTemplate for link tracing.
     *
     * <p>Every operations helper created here is bound to the RedisTemplate bean fetched
     * from the ApplicationContext instead of to {@code this}, so that {@code execute(..)}
     * calls made by those helpers go through the container-managed (possibly AOP-proxied)
     * instance. The class sits in {@code org.springframework.data.redis.core} because the
     * {@code DefaultXxxOperations} constructors it calls are not accessible from other packages.
     *
     * @author 80273435
     * @date 2020-05-19 14-01
     */
    public class TracingRedisTemplate<K, V> extends RedisTemplate<K, V> {
        @Autowired
        private ApplicationContext context;

        /** Container-managed template that the operations helpers execute through. */
        private RedisTemplate<K, V> proxy;

        /**
         * Looks up the RedisTemplate bean once the context has been injected.
         * NOTE(review): a by-type lookup fails when more than one RedisTemplate bean is
         * registered - confirm this matches the application's bean setup.
         */
        @PostConstruct
        @SuppressWarnings("unchecked") // the context only exposes the raw bean type
        public void init() {
            proxy = context.getBean(RedisTemplate.class);
        }

        // cache singleton objects (where possible)
        private ValueOperations<K, V> valueOps;
        private ListOperations<K, V> listOps;
        private SetOperations<K, V> setOps;
        private ZSetOperations<K, V> zSetOps;
        private GeoOperations<K, V> geoOps;
        private HyperLogLogOperations<K, V> hllOps;


        @Override
        public BoundValueOperations<K, V> boundValueOps(K key) {
            return new DefaultBoundValueOperations<K, V>(key, proxy);
        }

        @Override
        public ValueOperations<K, V> opsForValue() {
            if (valueOps == null) {
                valueOps = new DefaultValueOperations<K, V>(proxy);
            }
            return valueOps;
        }

        @Override
        public ListOperations<K, V> opsForList() {
            if (listOps == null) {
                listOps = new DefaultListOperations<K, V>(proxy);
            }
            return listOps;
        }

        @Override
        public BoundListOperations<K, V> boundListOps(K key) {
            return new DefaultBoundListOperations<K, V>(key, proxy);
        }

        @Override
        public BoundSetOperations<K, V> boundSetOps(K key) {
            return new DefaultBoundSetOperations<K, V>(key, proxy);
        }

        @Override
        public SetOperations<K, V> opsForSet() {
            if (setOps == null) {
                setOps = new DefaultSetOperations<K, V>(proxy);
            }
            return setOps;
        }

        @Override
        public BoundZSetOperations<K, V> boundZSetOps(K key) {
            return new DefaultBoundZSetOperations<K, V>(key, proxy);
        }

        @Override
        public ZSetOperations<K, V> opsForZSet() {
            if (zSetOps == null) {
                zSetOps = new DefaultZSetOperations<K, V>(proxy);
            }
            return zSetOps;
        }

        @Override
        public GeoOperations<K, V> opsForGeo() {
            if (geoOps == null) {
                geoOps = new DefaultGeoOperations<K, V>(proxy);
            }
            return geoOps;
        }

        @Override
        public BoundGeoOperations<K, V> boundGeoOps(K key) {
            return new DefaultBoundGeoOperations<K, V>(key, proxy);
        }

        @Override
        public HyperLogLogOperations<K, V> opsForHyperLogLog() {
            if (hllOps == null) {
                hllOps = new DefaultHyperLogLogOperations<K, V>(proxy);
            }
            return hllOps;
        }

        @Override
        public <HK, HV> BoundHashOperations<K, HK, HV> boundHashOps(K key) {
            return new DefaultBoundHashOperations<K, HK, HV>(key, proxy);
        }

        @Override
        public <HK, HV> HashOperations<K, HK, HV> opsForHash() {
            return new DefaultHashOperations<K, HK, HV>(proxy);
        }

        @Override
        public ClusterOperations<K, V> opsForCluster() {
            return new DefaultClusterOperations<K, V>(proxy);
        }
    }
    

    (2) 切面

    package com.littleevil.test;
    
    import lombok.extern.slf4j.Slf4j;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * RedisTemplate.execute的切面
     *
     * @author 80273435
     * @date 2020-05-19 10-53
     */
    @Aspect
    @Component
    @Slf4j
    public class RedisTemplateAspect {
    
        @Around("execution(* org.springframework.data.redis.core.RedisTemplate.execute(..))")
        private Object executeAround(ProceedingJoinPoint joinPoint) throws Throwable {
            log.info("before execute");
            Object proceed = joinPoint.proceed();
            log.info("after execute");
            return proceed;
        }
    
    }w
    

    4 参考

    Lettuce相较于Jedis有哪些优缺点?

  • 相关阅读:
    ArrayList排序Sort()方法(转)
    sqlserver2008 insert语句性能
    Installing TensorFlow on Ubuntu
    自动下载和安装 MNIST 到 TensorFlow 的 python 源码 (转)
    c# BackGroundWorker 多线程操作的小例子 (转)
    c# 修改winform中app.config的配置值
    ffmpeg 编译graph2dot
    ffmpeg常用命令
    live555例子程序编译连接时发现函数未定义问题
    编译代码是提示某些类型错误问题
  • 原文地址:https://www.cnblogs.com/linzhanfly/p/12991415.html
Copyright © 2011-2022 走看看