0
点赞
收藏
分享

微信扫一扫

Redis(四):集群模式

静鸡鸡的JC 2021-09-21 阅读 65

集群模式

作为缓存数据库,肯定要考虑缓存服务器稳定性相关的保障机制。

持久化机制就是一种保障方式,持久化机制保证了Redis服务器重启的情况下也不会损失(或少量损失)数据,因为持久化会把内存中的数据保存到硬盘上,重启会从硬盘上加载数据

随着Redis使用场景越来越多,技术发展越来越完善,在Redis整体服务上的容错、扩容、稳定各个方面都需要不断优化,因此在Redis的集群模式上也有不同的搭建方式来应对各种需求。

总结来说,Redis集群模式有三种:

  • 主从模式
  • 哨兵模式
  • Cluster集群模式

1、主从模式

为了Redis服务避免单点故障,通常的做法是将redis的数据复制到多个副本以部署在不同的服务器上,这样即使有一台服务器出现故障,其他服务器依然可以继续提供服务,为此,Redis提供了复制(replication)功能,可以实现当一台数据库的数据更新后,自动将更新的数据同步到其他数据库上。

Redis服务器分为两类:一类是主数据库(Master),另一类是从数据库(Slave);
主数据库可以进行读写操作,当写操作导致数据变化时,会自动将数据同步给从数据库。
从数据库一般是只读的,并接受主数据库同步过来的数据。
一个主数据库可以拥有多个从数据库,而一个从数据库却只能拥有一个主数据库。

  • 优点
    1、一个主,可以有多个从,并以非阻塞的方式完成数据同步;
    2、从服务器提供读服务,分散主服务的压力,实现读写分离;
    3、从服务器之间可以彼此连接和同步请求,减少主服务同步压力;

  • 缺点
    1、不具备容错和恢复功能,主服务存在单点风险;
    2、Redis的主从复制采用全量复制,需要服务器有足够的空余内存;
    3、主从模式较难支持在线扩容;

2、哨兵模式——Sentinel 集群

Redis提供的sentinel(哨兵)机制,通过sentinel模式启动redis后,自动监控Master/Slave的运行状态,基本原理是:心跳机制+投票裁决。

简单来说,哨兵的作用就是监控redis系统的运行状况,它的功能包括以下两个:

  • 1、监控主数据库和从数据库是否正常运行
  • 2、主数据库出现故障时,自动将从数据库转换为主数据库

哨兵模式主要以下几个内容:

  • 监控(Monitoring):Sentinel会定期检查主从服务器是否处于正常工作状态
  • 提醒(Notification):当被监控的某个Redis服务器出现异常时,Sentinel可以通过API向管理员或者其他应用程序发送通知
  • 自动故障迁移(Antomatic failover):当一个主服务器不能正常工作时,Sentinel会开始一次自动故障迁移操作,它会将失效主服务器的其中一个从服务器升级为新的主服务器,并让失效主服务器的其他从服务器改为复制新的主服务器;当客户端试图连接失效的主服务器时,集群也会向客户端返回新主服务器的地址,使得集群可以使用新主服务器代替失效服务器

Redis Sentinel 是一个分布式系统,你可以在一个架构中运行多个Sentinel进程(progress)

  • 优点
    1、哨兵模式主从可以切换,具备基本的故障转移能力;
    2、哨兵模式具备主从模式的所有优点

  • 缺点
    1、哨兵模式也很难支持在线扩容操作
    2、集群的配置信息管理比较复杂

3、集群模式

3.1 Redis Cluster

Redis Cluster是一种服务器Sharding技术,采用CRC16算法来实现数据的分片,3.0版本开始正式提供,采用无中心架构,每个节点保存数据和整个集群状态,每个节点都和其他所有节点连接。

Cluster集群结构特点:

  • 1、Redis Cluster所有物理节点都映射到[0-16383]slot上(不一定均匀分布),Cluster负责维护节点、桶(slot)、值之间的关系;

  • 2、在Redis集群中放置一个key-value时,根据CRC16(16) mod 16384的值,从之前划分的16384个桶中选择一个;

  • 3、所有的Redis节点彼此互联(PING_PONG机制),内部使用二进制协议优化传输效率;

  • 4、超过半数的节点检测到某个几点失效时,则判定该节点失效;

  • 5、使用端与Redis节点连接,不需要中间proxy层,直接可以操作,使用端不需要连接集群所有节点,连接集群中任意一个可用节点即可。

  • 优点
    1、无中心架构,节点间数据共享,可动态调整数据分布;
    2、节点可动态添加删除,扩张性比较灵活;
    3、部分节点异常,不影响整体集群的可用性;

  • 缺点
    1、集群实现比较复杂;
    2、批量操作指令(mget、mset等)支持有限;
    3、事务操作支持有限

Jedis客户端实现:JedisCluster

3.2 Redis Sharding

Redis Sharding 属于客户端sharding分片技术,采用一致性Hash算法来实现数据的分片,3.0版本以前基本上使用分片实现集群。

Redis Sharding特点:

  • 各个Redis节点独立,之间无关系
  • 某个Redis节点挂了,整个集群不可用,所以需要对每个节点做主从备份
  • 主从备份方案一般通过读写分离设置,每个master至少两个slaver,只有这样master挂掉后,才能选举其中一个Slaver成为新的master,原来master节点加入集群后成为新master的slaver节点
  • redis主从切换对客户端jedis使用时透明的,即redis发生了主从切换并不影响jedis的使用

缺点:
节点扩展和收缩不友好

Jedis客户端实现:ShardedJedis

4、哨兵Sentinel Sharding集群模式

如果既想要哨兵模式提供的自动监控和故障转移机制,又想要Sharding集群的分片机制,那么该怎么办呢?

在服务端,以Sharding集群启动,同时,使得Redis Sentinel分布式系统监听多个Master节点;
在客户端,自定义一个类,继承redis.clients.util.Pool,实现redis线程池;

以jedis为例,自定义线程池实现如下,参考jedis源码redis.clients.jedis.JedisSentinelPool,redis.clients.jedis.ShardedJedisPool

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

/**
 * Jedis不能同时支持Shareded和Sentinel。
 *
 * 这里是把单master改成多master,同时把Jedis改成ShardedJedis。
 * 支持多主机集群
 */
public class ShardedJedisSentinelPoolExt extends Pool<ShardedJedis> {

    public static final int MAX_RETRY_SENTINEL = 10;

    private static final Logger logger = LoggerFactory.getLogger(LoggerType.COMMON);
    
    protected GenericObjectPoolConfig poolConfig;

    protected int timeout = Protocol.DEFAULT_TIMEOUT;
    
    private int sentinelRetry = 0;

    protected String password;

    protected int database = Protocol.DEFAULT_DATABASE;

    protected Set<MasterListener> masterListeners = new HashSet<>();
    
    private volatile List<HostAndPort> currentHostMasters;
    
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels) {
        this(masters, sentinels, new GenericObjectPoolConfig(),
            Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE);
    }
    
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, String password) {
        this(masters, sentinels, new GenericObjectPoolConfig(),
            Protocol.DEFAULT_TIMEOUT, password);
    }
    
    public ShardedJedisSentinelPoolExt(final GenericObjectPoolConfig poolConfig, Set<String> masters, Set<String> sentinels) {
        this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
            Protocol.DEFAULT_DATABASE);
    }

    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
                                       final GenericObjectPoolConfig poolConfig, int timeout,
                                       final String password) {
        this(masters, sentinels, poolConfig, timeout, password,
            Protocol.DEFAULT_DATABASE);
    }

    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
                                       final GenericObjectPoolConfig poolConfig, final int timeout) {
        this(masters, sentinels, poolConfig, timeout, null,
            Protocol.DEFAULT_DATABASE);
    }

    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
                                       final GenericObjectPoolConfig poolConfig, final String password) {
        this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT,
            password);
    }

    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
                                       final GenericObjectPoolConfig poolConfig, int timeout,
                                       final String password, final int database) {
        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.database = database;

        List<String> convertList = new ArrayList<>(masters);
        List<HostAndPort> masterList = initSentinels(sentinels, convertList);
        initPool(masterList);
    }

    @Override
    public void destroy() {
        for (MasterListener m : masterListeners) {
            m.shutdown();
        }
        
        super.destroy();
    }

    public List<HostAndPort> getCurrentHostMaster() {
        return currentHostMasters;
    }

    private void initPool(List<HostAndPort> masters) {
        if (!equalsObj(currentHostMasters, masters)) {
            StringBuilder sb = new StringBuilder();
            for (HostAndPort master : masters) {
                sb.append(master.toString());
                sb.append(" ");
            }
            logger.info("Created ShardedJedisPool to master at [" + sb.toString() + "]");
            List<JedisShardInfo> shardMasters = makeShardInfoList(masters);
            initPool(poolConfig, new ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));
            currentHostMasters = masters;
        }
    }

    private static boolean equalsObj(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters) {
        if (currentShardMasters != null && shardMasters != null && checkListSize(currentShardMasters,shardMasters)) {
                for (int i = 0; i < currentShardMasters.size(); i++) {
                    if (!currentShardMasters.get(i).equals(shardMasters.get(i)))
                        return false;
                }
                return true;
        }
        return false;
    }

    private static boolean checkListSize(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters){
        return (currentShardMasters.size() == shardMasters.size())? true : false;
    }

    private List<JedisShardInfo> makeShardInfoList(List<HostAndPort> masters) {
        List<JedisShardInfo> shardMasters = new ArrayList<>();
        for (HostAndPort master : masters) {
            JedisShardInfo jedisShardInfo = new JedisShardInfo(master.getHost(), master.getPort(), timeout);
            jedisShardInfo.setPassword(password);
            
            shardMasters.add(jedisShardInfo);
        }
        return shardMasters;
    }

    private List<HostAndPort> initSentinels(Set<String> sentinels, final List<String> masters) {

        Map<String, HostAndPort> masterMap = new HashMap<>();
        List<HostAndPort> shardMasters = new ArrayList<>();

        logger.info("Trying to find all master from available Sentinels...");
        
        for (String masterName : masters) {
            HostAndPort master = null;
            boolean fetched = false;
            
            while (!fetched && sentinelRetry < MAX_RETRY_SENTINEL) {
                for (String sentinel : sentinels) {
                    final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));

                    logger.info("Connecting to Sentinel " + hap);

                    try( Jedis jedis = new Jedis(hap.getHost(), hap.getPort())) {
                        master = masterMap.get(masterName);
                        if (master == null) {
                            List<String> hostAndPort = jedis.sentinelGetMasterAddrByName(masterName);
                            if (hostAndPort != null && ! hostAndPort.isEmpty()) {
                                master = toHostAndPort(hostAndPort);
                                logger.info("Found Redis master at " + master);
                                shardMasters.add(master);
                                masterMap.put(masterName, master);
                                fetched = true;
                                jedis.disconnect();
                                break;
                            }
                        }
                    } catch (JedisConnectionException e) {
                        logger.error("Cannot connect to sentinel running @ " + hap + ". Trying next one.",e);
                    }
                }
                
                if (null == master) {
                    try {
                        logger.info("All sentinels down, cannot determine where is "
                            + masterName + " master is running... sleeping 1000ms, Will try again.");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage());
                    Thread.currentThread().interrupt();
                    }
                    fetched = false;
                    sentinelRetry++;
                }
            }
            
            // Try MAX_RETRY_SENTINEL times.
            if (!fetched && sentinelRetry >= MAX_RETRY_SENTINEL) {
                logger.info("All sentinels down and try " + MAX_RETRY_SENTINEL + " times, Abort.");
                throw new JedisConnectionException("Cannot connect all sentinels, Abort.");
            }
        }

        
        // All shards master must been accessed.
        if (! masters.isEmpty() && masters.size() == shardMasters.size()) {

            logger.info("Starting Sentinel listeners...");
            for (String sentinel : sentinels) {
                final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
                MasterListener masterListener = new MasterListener(masters, hap.getHost(), hap.getPort());
                masterListeners.add(masterListener);
                masterListener.start();
            }
        }
        
        return shardMasters;
    }

    private static HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        String host = getMasterAddrByNameResult.get(0);
        int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
        
        return new HostAndPort(host, port);
    }
    
    /**
     * PoolableObjectFactory custom impl.
     */
    protected static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
        private List<JedisShardInfo> shards;
        private Hashing algo;
        private Pattern keyTagPattern;
    
        public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
            this.shards = shards;
            this.algo = algo;
            this.keyTagPattern = keyTagPattern;
        }

        @Override
        public PooledObject<ShardedJedis> makeObject() throws Exception {
            ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
            return new DefaultPooledObject<>(jedis);
        }

        @Override
        public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws Exception {
            final ShardedJedis shardedJedis = pooledShardedJedis.getObject();
            for (Jedis jedis : shardedJedis.getAllShards()) {
                try {
                    jedis.quit();
                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                }
                try {
                    jedis.disconnect();
                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                }
            }
        }

        @Override
        public boolean validateObject(PooledObject<ShardedJedis> pooledShardedJedis) {
            try {
            ShardedJedis jedis = pooledShardedJedis.getObject();
            for (Jedis shard : jedis.getAllShards()) {
                if (!"PONG".equals(shard.ping())) {
                return false;
                }
            }
            return true;
            } catch (Exception ex) {
                logger.error(ex.getMessage(),ex);
            return false;
            }
        }

        @Override
        public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
            // Do nothing because of X and Y.
        }

        @Override
        public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {
            // Do nothing because of X and Y.
        }
    }

    protected class JedisPubSubAdapter extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            // Do nothing because of X and Y.
        }
    
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            // Do nothing because of X and Y.
        }
    
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
            // Do nothing because of X and Y.
        }
    
        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            // Do nothing because of X and Y.
        }
    
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            // Do nothing because of X and Y.
        }
    
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            // Do nothing because of X and Y.
        }
    }

    protected class MasterListener extends Thread {

        protected List<String> masters;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis = 5000;
        protected Jedis jedis;
        protected AtomicBoolean running = new AtomicBoolean(false);
    
        protected MasterListener() {
        }
    
        public MasterListener(List<String> masters, String host, int port) {
            this.masters = masters;
            this.host = host;
            this.port = port;
        }
    
        public MasterListener(List<String> masters, String host, int port,
            long subscribeRetryWaitTimeMillis) {
            this(masters, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        @Override
        public void run() {
    
            running.set(true);
    
            while (running.get()) {
    
            jedis = new Jedis(host, port);
    
            try {
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        logger.info("Sentinel " + host + ":" + port + " published: " + message + ".");
        
                        String[] switchMasterMsg = message.split(" ");
        
                        if (switchMasterMsg.length > 3) {


                            int index = masters.indexOf(switchMasterMsg[0]);
                            if (index >= 0) {
                                HostAndPort newHostMaster = toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
                                List<HostAndPort> newHostMasters = new ArrayList<>();
                                for (int i = 0; i < masters.size(); i++) {
                                    newHostMasters.add(null);
                                }
                                Collections.copy(newHostMasters, currentHostMasters);
                                newHostMasters.set(index, newHostMaster);
                                
                                initPool(newHostMasters);
                            } else {
                                StringBuilder sb = new StringBuilder();
                                for (String masterName : masters) {
                                    sb.append(masterName);
                                    sb.append(",");
                                }
                                logger.info("Ignoring message on +switch-master for master name "
                                    + switchMasterMsg[0]
                                    + ", our monitor master name are ["
                                    + sb + "]");
                            }
        
                        } else {
                            logger.info("Invalid message received on Sentinel "
                                + host
                                + ":"
                                + port
                                + " on channel +switch-master: "
                                + message);
                        }
                    }
                }, "+switch-master");
    
            } catch (JedisConnectionException e) {
    
                if (running.get()) {
                    logger.info("Lost connection to Sentinel at " + host
                        + ":" + port
                        + ". Sleeping 5000ms and retrying.");
                    try {
                        Thread.sleep(subscribeRetryWaitTimeMillis);
                    } catch (InterruptedException e1) {
                        logger.error(e.getMessage(),e1);
                      Thread.currentThread().interrupt();
                    }
                } else {
                    logger.info("Unsubscribing from Sentinel at " + host + ":"
                        + port);
                }
            }
            }
        }
    
        public void shutdown() {
            try {
                logger.info("Shutting down listener on " + host + ":" + port);
                running.set(false);
                // This isn't good, the Jedis object is not thread safe
                jedis.disconnect();
            } catch (Exception e) {
                logger.error("Caught exception while shutting down: " , e);
            }
        }
    }
}

举报

相关推荐

0 条评论