0
点赞
收藏
分享

微信扫一扫

电脑自带录屏在哪?电脑录屏,4个详细方法

A邱凌 2024-06-21 阅读 24

概述

  • 随着云计算和大数据技术的飞速发展,分布式系统已经成为现代IT架构的重要组成部分
  • 在分布式系统中,数据的一致性是一个至关重要的挑战,特别是在并发访问和修改共享资源的场景下
  • 分布式锁是一种跨进程、跨机器节点的互斥锁,用于控制多个节点对共享资源的访问。
  • 其核心目标是确保在分布式系统中,同一时刻只有一个节点能够访问特定的共享资源,从而实现数据的一致性。
  • 分布式锁的实现方式多种多样,包括基于数据库、缓存(如Redis)、分布式协调服务(如Zookeeper)等

场景示例

  • 在多携程或者多线程的情况下,这个数据竞争是避免不了的, 参考下图

  • 一开始我们有一个Redis图标代表着之前用到的分布式的锁
  • 两边有两个grorouting,在实际工作当中有很多grorouting
  • 让左边的 grorouting 进行 getKey 和 setKey,就是说对这个 redis 进行读写
  • 让右边的 grorouting 也进行读写,这里面就会产生一个问题
  • 多个gorouting在同时写的时候会不会产生数据一致性的问题
  • 这个场景就是我们经常说的中断,举个例子
    • 写代码,听歌上网写文档,感觉这个计算机同时在做几件事情一样
    • 但是要在计算机看来,它在不停的切换,只是说计算机执行的足够快
    • 人类是无法感觉到它在极短的时间内进行时间的这个切换
  • 这里有两个命令,一个是上面的 getKey和下面这一行的 setKey
  • 在高并发的时候就会导致切换出现数据竞争, 也就是数据不一致的这种情况发生
  • REDIS 里提供了一个命令就是这个 Setnx,全称就是 Set if Not Exists
  • 中文意思就是说设置它,如果它不存在,如果设置成功,就是1,设置失败就返回0
  • 那就是说不存在这个key的时候,我就可以设置成功,如果存在了,那就设置是失败的
  • 那这就保证了第一个gorouting是可以设置成功的,在这个后面的gorouting,它就是设置不成功的

源码示例

  • 分布式锁 redSync 是如何结合这个setNX解决这个数据竞争的问题的
  • 我们来看下之前的业务代码
    pool := goredis.NewPool(client)
    rs := redsync.New(pool)
    mutexname := "my-global-mutex"
    mutex := rs.NewMutex(mutexname, redsync.WithExpiry(30*time.Second))
    fmt.Println("Lock()....")
    if err := mutex.Lock(); err != nil {
        panic(err)
    }
    
  • 上面这里 mutex.Lock() 进入源码
    // Lock locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) Lock() error {
        return m.LockContext(context.Background())
    }
    
  • 这里看到就一个 LockContext 方法,再次进入,其实这里有2个重载函数
    // LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) LockContext(ctx context.Context) error {
        return m.lockContext(ctx, m.tries)
    }
    
    
    // lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) lockContext(ctx context.Context, tries int) error {
        // 如果 ctx 不存在则新建
        if ctx == nil {
            ctx = context.Background()
        }
    
        // 获取 value
        value, err := m.genValueFunc()
        if err != nil {
            return err
        }
    
        var timer *time.Timer
        // 循环 tries 次
        for i := 0; i < tries; i++ {
            if i != 0 {
                if timer == nil {
                    timer = time.NewTimer(m.delayFunc(i))
                } else {
                    timer.Reset(m.delayFunc(i))
                }
    
                select {
                case <-ctx.Done():
                    timer.Stop()
                    // Exit early if the context is done.
                    return ErrFailed
                case <-timer.C:
                    // Fall-through when the delay timer completes.
                }
            }
    
            start := time.Now()
    
            n, err := func() (int, error) {
                ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
                defer cancel()
                return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                    // 在分配锁的时候会加 nx, 进入这里
                    return m.acquire(ctx, pool, value)
                })
            }()
    
            now := time.Now()
            until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
            if n >= m.quorum && now.Before(until) {
                m.value = value
                m.until = until
                return nil
            }
            _, _ = func() (int, error) {
                ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
                defer cancel()
                return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                    return m.release(ctx, pool, value)
                })
            }()
            if i == tries-1 && err != nil {
                return err
            }
        }
    
        return ErrFailed
    }
    
  • 再次进入 m.acquire
    func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
        conn, err := pool.Get(ctx)
        if err != nil {
            return false, err
        }
        defer conn.Close()
        // 这里 SetNX 就是 redSync 封装起来的 nx 特性
        reply, err := conn.SetNX(m.name, value, m.expiry)
        if err != nil {
            return false, err
        }
        return reply, nil
    }
    
  • 进入 conn.SetNX,可见 key, value 和 过期时间 expiry
    // Conn is a single Redis connection.
    type Conn interface {
        Get(name string) (string, error)
        Set(name string, value string) (bool, error)
        SetNX(name string, value string, expiry time.Duration) (bool, error) // 注意这里
        Eval(script *Script, keysAndArgs ...interface{}) (interface{}, error)
        PTTL(name string) (time.Duration, error)
        Close() error
    }
    
    • 不传过期时间,它会有死锁的风险
    • 因为一旦处于某种异常状况,那你这个锁就永远不会释放了,就会出现死锁
    • 过期时间比如说五秒之后自动释放,那我无论是宕机还是崩溃等其他问题,它都会五秒之后,释放出资源
    • 释放之后其他 gorouting 就可以继续抢锁和业务逻辑操作
举报

相关推荐

0 条评论