0
点赞
收藏
分享

微信扫一扫

分布式前修课:Etcd锁实现方式

前言

这里就是分布式锁的最后一个系列,关于Etcd的方式。很多人可能并没有接触过Etcd,也不知道这是个什么东西。所以我们先来介绍一下关于Etcd的基本概念

搞定“Etcd”

What Is Etcd

熟悉Linux的大家都知道,在Linux下存在一个目录:/etc,该目录是一个全局的配置存储目录。

而我们要介绍到的Etcd的最初的主要目的也是来解决集群管理系统中OS升级时的分布式并发控制,配置文件的存储和分发等问题。在云的领域应用最为广泛,由于其特性逐渐为人所知。更多的使用场景在k8s上。

那么,Etcd基于Go语言实现,CoreOS公司开源的一款高可用,一致性的小型Key-Value存储数据库,并且热度不断上升

image20220109201255603.png
Etcd是通过Raft共识算法来达到数据一致性的

关于Raft共识算法这里不会过多介绍,大家根据 The Secret Lives of Data 这个网站来进行相关的学习

通常情况下,我们可以使用Etcd来做如下事情:

  • 服务注册和发现
  • 配置中心
  • 集群监控
  • 分布式锁,分布式ID

Etcd架构

gRPC

当客户端发送操作请求之后,先会到达gRPC层面,然后gRPC才会将操作的具体指定向后分发到其他组件。

除了接收客户端请求之外,gRPC还需要处理各个节点之间的心跳请求和同步请求

wal【Write Ahead Log】

预写式日志,是实现事务的标准方法,跟MySQL中的redo log类似。

Etcd在操作的时候会先进行写日志的操作,但是此时日志状态为prepare,等待某一个时刻将日志提交落盘并且修改操作数据

wal在日志落盘的时候属于顺序写入,这样能够提高IO性能

snapshot

快照,Leader节点用来向其他节点进行数据同步从而达到主数据一致性的关键

boltdb

boltdb是一个单机的支持事务的kv存储,而etcd的事务就是基于boltdb的事务来实现的。

boltdb为每一个key都创建了一个索引,该索引通过B+Tree来维护。其中该B+Tree存储了key所对应的版本数据。

也就是说每操作一次,etcd都会记录一个版本号,并且会存储对应版本号所对应的数据

Etcd,你过来呀

工欲善其事必先利其器,说的再多,不如实际上手来试一试,接下来我们开始搭建etcd的环境吧

环境规划

一定要记住一句话:好记性不如烂笔头。

拿到一台机器之后,不要盲目上手就开始装各种东西,一定要做好整个环境的规划,不慌不乱

node ip port
etcd 192.168.10.200 2379,2380

Etcd安装

etcd属于一款开源产品,在github我们就能看到其源码。

如果你本地环境有GO版本的话,那么可以通过编译安装的形式来安装,我不是Go Coder,所以我这里就采用最简单的安装方式

来,跟着我一起操作

yum install -y etcd

等待完成之后,etcd的安装也就已经完毕,接下来我们来验证一下

etcd --version
etcdctl -v

image20220102091502805.png

说明已经安装成功了

接下来我们来看一看etcd的配置

基本配置

默认情况下,yum的安装方式会在/etc/etcd下存在配置文件,所以cd /etc/etcd我们进入到这个目录下, 会发现存在etcd.conf

这里最好先备份一下,然后我们再调整配置

# 单机
#[Member]
# 监听etcd 各个节点间通信,设置为自己的服务器的IP地址,当前最好能够指定hostname
ETCD_LISTEN_PEER_URLS="http://192.168.10.200:2380"
# 监听客户端通信
ETCD_LISTEN_CLIENT_URLS="http://192.168.10.200:2379"
#[Clustering]
# 对外公告的该节点客户端监听地址
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.10.200:2379"

这是单机版的最小配置,其他的如鉴权等操作在对应位置配置

然后我们启动etcd

systemctl start etcd

实际生产环境下单机版可用性不高,我们接下来介绍一下集群操作

集群配置

环境规划就不说了,先按照单机版本安装成功,然后直接看配置

首先,需要注意的是,Etcd集群组成最少需要三台节点,需要用于选取Leader节点,多的话最好是奇数台,那么配置如下

一定要注意ETCD_NAME,否则会启动失败

#[Member]
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"
TCD_LISTEN_PEER_URLS="http://192.168.10.200:2380"
ETCD_LISTEN_CLIENT_URLS="http://192.168.10.200:2379,http://127.0.0.1:2379"
ETCD_NAME="slave01"

#[Clustering]
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.10.200:2380"
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.10.200:2379"
ETCD_INITIAL_CLUSTER="master=http://192.168.10.201:2380,slave01=http://192.168.10.200:2380,slave02=http://192.168.10.202:2380"
ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
ETCD_INITIAL_CLUSTER_STATE="new"

以上配置为集群的最小配置,配置完之后启动etcd就好。而且需要注意的是:

  • ETCD_INITIAL_CLUSTER中,名称必须和自身节点的ETCD_NAME保持一致
  • ETCD_INITIAL_CLUSTER中,规划的主节点一定要写在前面
问题:etcd cluster is unavailable or misconfigured

在集群过程中可能会遇到如下问题:这是因为etcd本身无法找到127.0.0.1的原因

Error:  client: etcd cluster is unavailable or misconfigured; error #0: dial tcp 127.0.0.1:4001: connect: connection refused
; error #1: dial tcp 127.0.0.1:2379: connect: connection refused

error #0: dial tcp 127.0.0.1:4001: connect: connection refused
error #1: dial tcp 127.0.0.1:2379: connect: connection refused

所以:

  • ETCD_LISTEN_CLIENT_URLS="http://192.168.10.200:2379,http://127.0.0.1:2379"该配置是重点

Etcd相关操作

相关操作不是本节重点,这里就简单列出一些

这里需要注意一下:etcdctl 默认使用V2版本的API,如果想要换成V3的话,进行如下操作

echo 'export ETCDCTL_API=3' >> /etc/profile
source /etc/profile

接下来就是具体的操作命令

# 会列出相对的帮助列表
etcdctl

# 列出集群节点的信息
etcdctl member list

# 插入,读取
etcdctl put key value
etcdctl get key 
# 列出一个key的详细信息
etcdctl get key -w json

# 监控指定key,包括增删改动作都能监控到
etcdctl watch key

# 删除
etcdctl del key

就列出这些吧,感兴趣的大家下来自己搭建一下,亲自动手感受一下

别动我的“蛋糕”

知其然

Lease机制

Etcd作为一款Key-Value形式的存储数据库,类似于Redis,支持对存储的K-V设置租约,当租约到期时key-value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,避免因为K-V对过期失效而导致锁被删除,也保证分布式的安全性。

同时对锁设置租约,即使锁持有者因故障无法主动释放,锁也能因为租约到期而自动释放

Revision机制

定义的每一个key都带有一个Revision版本号,这个版本号是全局唯一的。每进行一次事务操作,该版本号就会加一。

通过该Revision版本号就能清楚写操作的顺序,在实现分布式锁的时候,多个客户端同时抢锁,根据Revision版本号的大小依次获得锁就可以实现公平锁

Prefix机制

前缀机制,在后面代码实现的时候可能会更清晰一点。

比如我们定义的lockPath = /etcd/lock,多个客户端争抢进行写操作,而此时实际写入到Etcd的key为

  • /etcd/lock/{UUID}

这里的UUID表示全局唯一的ID,确保了多个客户端key的唯一性。

而上面我们也说过,基于Revision机制,返回的Revision号不一样。那么我们就可以通过如下方式来判断自己是否可以获取到锁:

  • 通过前缀/etcd/lock查询,返回包含客户端Key-Value对列表,同时也会包含各自的Revision。通过判断Revision大小,客户端就可以判断自己是否获取锁

这里很像Zookeeper中的有序节点

Watch机制

监听,Etcd中Watch机制支持监听某个固定的key,也支持监听某个前缀路径,当被监听者发生变化时客户端将会受到回调通知。

这里和Zookeeper中实现分布式锁的方式非常像:

  • 通过Prefix机制获取到的客户端Key-Value对列表中的Revision,并且监听和自己离得最近的一个key。
  • 当这个key释放锁之后,自己才能获取到锁

是不是和Zookeeper加锁方式非常像,所以说,一定要记住一点:

  • 我们在学习某个知识点的时候,一定要能够对比其中的差异性

知其所以然

那好,了解到锁的原理之后,那么我们就来自己实现一下分布式锁吧

一步一步来,既然是基于Etcd,那么我们就先来获取一下Etcd的客户端

// 注意:客户端使用2379端口,集群的话中间就通过 , 分割
private static final String node = "http://192.168.10.200:2379";

public static Client client() {
    final ClientBuilder builder = Client.builder().endpoints(node);
    // 是否需要用户密码
    // builder.user(ByteSequence.from()).password(ByteSequence.from());
    return builder.build();
}

接下来就是加锁和释放锁的过程了,注意睁大眼睛看清楚了,我只说一遍

// 用来暂存线程和key之间的关系
private final ConcurrentMap<Thread, Long> threadData = Maps.newConcurrentMap();
// 获取etcd下锁客户端
private final Lock lockClient;
// 获取Lease客户端
private final Lease leaseClient;
private final String lockKey;
// etcd获取到的加锁地址
private String lockPath;

// 租约有效期
private final long leaseTTL;

public EtcdLock(Client client, String lockKey, long leaseTTL, TimeUnit unit) {
    this.lockKey = lockKey;
    this.leaseTTL = unit.toNanos(leaseTTL);
    this.lockClient = client.getLockClient();
    this.leaseClient = client.getLeaseClient();
}

//加锁
public void lock() {
    Thread currentThread = Thread.currentThread();

    // 记录租约 ID
    Long leaseId = 0L;
    try {
        leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
        // 续租心跳周期, 续约时间的一半
        long period = leaseTTL >> 1;
        // 这里缺少启动定时任务续约:和Redis中的看门狗机制是一样一样的
        // 续约方式: leaseClient.keepAliveOnce(leaseId);
        LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
        if (lockResponse != null) {
            lockPath = lockResponse.getKey().toString(Charset.forName(StandardCharsets.UTF_8.name()));
            LOGGER.info("获取锁成功,锁路径:{},线程:{}", lockPath, currentThread.getName());
        }
    } catch (InterruptedException | ExecutionException e) {
        LOGGER.error("获取锁失败", e);
        throw new BusException(e);
    }
    // 获取锁成功,锁对象设置
    threadData.put(currentThread, leaseId);
}

// 释放锁
public void unlock() {
    Thread currentThread = Thread.currentThread();
    Long leaseId = threadData.get(currentThread);

    try {
        // 释放锁
        if (lockPath != null) {
            lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
        }

        // 将定时任务关闭
        // 删除租约
        if (leaseId != 0L) {
            leaseClient.revoke(leaseId);
        }
    } catch (InterruptedException | ExecutionException e) {
        LOGGER.error("解锁失败异常:{}", e.getMessage());
        throw new BusException(e);
    } finally {
        // 移除当前线程资源
        threadData.remove(currentThread);
    }
}

那接下来就来验证了

private static ExecutorService es = Executors.newFixedThreadPool(10000);
private static String key = "/etcd/lock";

public static void main(String[] args) throws InterruptedException {
    int[] count = {0};

    Client client = client();

    for (int i = 0; i < 100; i++) {
        es.submit(() -> {
            final EtcdLock lock = new EtcdLock(client, key, 20, TimeUnit.SECONDS);
            try {
                lock.lock();
                count[0]++;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    lock.unlock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    es.shutdown();
    es.awaitTermination(1, TimeUnit.HOURS);
    System.err.println("执行结果: " + count[0]);
}

最后

到这里关于Etcd分布式锁就介绍完了,并且整个关于分布式锁的系列也就全部结束了,

举报

相关推荐

0 条评论