文章目录
1. 概述
官方文档
Lua 语言是一种小巧的、高效的、可嵌入的脚本编程语言,由巴西里约热内卢天主教大学的一个研究小组于 1993 年开发。设计目的是为了通过灵活嵌入应用程序中,为应用程序提供灵活的扩展和定制功能。
参考资料:
- Lua 官网
- 菜鸟教程
Redis 2.6 版本开始支持用户在服务器上传和执行 Lua 脚本,从而实现复杂的操作逻辑。脚本在 Redis 中是以字符串的形式传递给服务器的,然后服务器会执行这些脚本并返回结果。脚本在 Redis 中由内置的执行引擎执行。目前,仅支持一个脚本引擎,即 Lua 5.1 解释器。
Lua 脚本的优势:
- 原子性:保证一个
Lua脚本在执行期间是原子性的,即脚本执行期间不会被其他命令打断。在执行脚本期间,服务器的所有活动都会被阻塞,直到脚本运行结束。 - 减少网络开销:可以将多个命令打包成一个
Lua脚本执行,减少与Redis服务器的网络交互次数。 - 复用性:可以将复杂的逻辑编写成
Lua脚本,然后在多个地方重复使用。 - 高效率:于脚本在服务器上执行,因此从脚本读取和写入数据非常高效。
注意事项:脚本被视为客户端应用程序的一部分,因此它们没有名称、版本或持久性。因此,如果脚本丢失(例如在服务器重新启动、故障切换到副本后等),所有脚本可能需要随时由应用程序重新加载。
2. 常用命令
2.1 EVAL
在服务端执行 Lua 脚本。
语法格式:
EVAL script numkeys [key [key ...]] [arg [arg ...]]
参数说明:
script:Lua编写的脚本源代码numkeys:脚本中用到的key的数量[key [key ...]]:key键名称列表,在脚本中可以使用KEYS[]进行占位,例如EVAL的第三个参数会被赋值给KEYS[1],多个依次类推[arg [arg ...]]:参数列表,在脚本中可以使用ARGV[]进行占位,例如EVAL的第四个参数会被赋值给ARGV[1],多个依次类推
注意事项:
- 禁止滥用
Lua EVAL,例如,每次调用EVAL时生成不同的脚本。这些脚本将被添加到Lua解释器并缓存到服务端,随着时间的推移会消耗大量内存。 - 从
Redis 7.4开始,使用EVAL或EVAL_RO加载的脚本将根据一定数量(按最近最少使用顺序)从Redis中删除。可以通过INFO命令查看被驱逐的脚本数量。
为了确保脚本在单机和集群环境中正确执行,还需要注意:
Lua脚本的操作应该基于传递给脚本的参数进行,而不应该直接访问存储中的数据结构内容,例如哈希表、列表或集合。- 不支持动态键名访问或基于数据内容的键名计算,因此所有访问的键必须在执行脚本时已经确定。这些输入键的名称作为
KEYS全局运行时变量提供给Lua脚本使用。
简单示例:
localhost:0>EVAL "return 'Hello, scripting!'" 0
"Hello, scripting!"
示例说明:
return 'Hello, scripting!':使用双引号包含的脚本源代码,返回一个字符串0:没有使用到键
包含参数示例:
localhost:0>EVAL "return ARGV[1]" 0 Hello
"Hello"
示例说明:
0:没有使用到键Hello:将Hello传递给ARGV[1]
包含键、参数示例:
redis> EVAL "return { KEYS[1], KEYS[2], ARGV[1], ARGV[2], ARGV[3] }" 2 key1 key2 arg1 arg2 arg3
1) "key1"
2) "key2"
3) "arg1"
4) "arg2"
5) "arg3"
示例说明:
2:包含两个键key1:将Hello传递给KEYS[1],按照位置顺序依次类推arg1:将arg1传递给ARGV[1],按照位置顺序依次类推
可以通过 redis.call() 或 redis.pcall() 在 Lua 脚本中调用 Redis 命令,区别在于处理运行时错误(例如语法错误)的方式:
call():如果发生错误,错误会直接返回给执行它的客户端pcall():遇到的错误会返回到脚本的执行环境
示例:
EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 foo bar
OK
2.2 SCRIPT LOAD
每次调用 EVAL 命令时,请求中都会包括脚本的源代码,重复调用来执行相同的脚本集会浪费网络带宽,并且在 Redis 中也会产生一些额外开销,因此 Redis 提供了脚本的缓存机制。
使用 SCRIPT LOAD 可以将 Lua 脚本加载到 Redis 服务器,服务器不会执行脚本,而是仅编译并加载到服务器的缓存中。加载后返回一个 SHA1 摘要,唯一标识了它在缓存中的位置,该摘要可以用于执行已加载的脚本。
语法格式:
SCRIPT LOAD script
注意事项:
- 脚本缓存始终是易失性的,它不被视为数据库的一部分,也不会持久化存储。当服务器重新启动、在故障转移时从副本切换为主服务器时,或者通过调用
SCRIPT FLUSH命令时,缓存可能会被清空。 - 应用程序应首先用
SCRIPT LOAD加载脚本,然后再次调用EVALSHA运行缓存的脚本。大多数Redis客户端已经提供了自动执行此过程的API。
示例:
redis> SCRIPT LOAD "return 'Immabe a cached script'"
"c664a3bf70bd1d45c4284ffebb65a6f2299bfc9f"
redis> EVALSHA c664a3bf70bd1d45c4284ffebb65a6f2299bfc9f 0
"Immabe a cached script
2.3 EVALSHA
用于执行已加载的 Lua 脚本。
语法格式:
EVALSHA <sha1> <numkeys> <key> [key ...] <arg> [arg ...]
参数说明:
<sha1>:之前已加载缓存的Lua脚本的SHA1值。
示例:
127.0.0.1:6379> SCRIPT LOAD "local key = KEYS[1]\nlocal value = ARGV[1]\n\nredis.call('SET', key, value)"
"5b405e1d1f5c91b27e7e2b091380a848d38a99d6"
127.0.0.1:6379> EVALSHA 5b405e1d1f5c91b27e7e2b091380a848d38a99d6 1 mykey "myvalue"
OK
注意事项:
SHA1对应的脚本不存在时,会报错- 在使用管道时,尽量不要使用
EVALSHA,管道请求中的命令按发送顺序执行,但其他客户端的命令可能会在这些命令之间交错执行。因此,可能会从管道请求中返回NOSCRIPT错误,但无法进行处理。因此,在管道中应当使用普通的EVAL或带参数的EVAL。
2.4 SCRIPT FLUSH
清空 Lua 脚本缓存。默认情况下,SCRIPT FLUSH 命令会同步清空缓存。从 Redis 6.2 开始,设置 lazyfree-lazy-user-flush 配置指令为 “yes” 将改变默认的清空模式为异步。
语法格式:
SCRIPT FLUSH [ASYNC | SYNC]
参数说明:
ASYNC:异步清空缓存SYNC:同步清空缓存
注意事项:
- 运行命令将完全清空脚本缓存,删除到目前为止执行的所有脚本。
- 在正常操作期间,脚本应该在缓存中无限期地保留。
示例:
localhost:0>SCRIPT FLUSH
"OK"
2.5 其他
其他脚本相关的命令:
EVAL_RO:EVAL命令的只读变体,不能执行修改数据的命令。EVALSHA_RO:EVALSHA命令的只读变体,不能执行修改数据的命令。SCRIPT DEBUG:控制内置的Redis Lua脚本调试器。SCRIPT EXISTS:判断脚本已存在于缓存中,一个或多个SHA1摘要作为参数。返回1表示存在,0不存在。SCRIPT KILL:中断长时间运行脚本(即慢脚本)的唯一方式,除非关闭服务器。脚本在执行时间超过配置的最大执行时间阈值后被视为慢脚本。只能用于在执行期间未修改数据集的脚本(因为停止只读脚本不会违反脚本引擎的原子性保证)。
3. 脚本复制
集群部署环境下,至少有三个主节点,每个主节点有一个或多个从节点,主从之间使用完全复制保持数据一致性。由于脚本可以修改数据,Redis 确保脚本执行的所有写操作也会被发送到从节点以保持一致性。
脚本复制有以下两种方式:
- 逐字复制:主节点将脚本的源代码发送到副本,副本然后执行脚本并应用写入效果。
- 效果复制:只复制脚本的数据修改命令,副本随后运行这些命令而不执行任何脚本。
逐字复制模式意味着副本会重新执行主节点已完成的工作,这是一种浪费。更重要的是,它还要求所有写入脚本都是确定性的。效果复制模式虽然在网络流量方面可能更为冗长,但这种复制模式是确定性的,因此不需要特别考虑其他情况。
直到 Redis 3.2 版本之前,逐字脚本复制是唯一支持的模式,在 Redis 3.2 中添加了效果复制。在 Redis 5.0 中,效果复制成为默认模式,截至 Redis 7.0 ,不再支持逐字复制。
在效果复制模式下,当 Lua 脚本执行时, Redis 会收集 Lua 脚本引擎实际修改数据集的所有命令。当脚本执行结束时,脚本生成的命令序列会被封装成一个 MULTI/EXEC 事务,并发送到副本和 AOF 。
效果复制的优势:
- 当脚本计算速度较慢,可以由少数写入命令替换时,重新在副本或重新加载
AOF上计算脚本是一种浪费。在这种情况下,仅复制脚本的效果要好得多。 - 解除了非确定性函数的限制。例如,您可以在脚本中任意使用
TIME或SRANDMEMBER命令。 Lua伪随机数生成器在每次调用时都是随机种子。
4. 案例演示
Redis 命令的执行是单线程的,Lua 脚本在执行期间是原子性的,并且可以同时执行多个命令。特别适用于多线程环境下,需要保证多个复杂操作的原子性的场景。
4.1 限流
演示需求: 基于固定时间窗口进行 IP 限流, 同一 IP 在一分钟内,只允许访问固定的次数。
在 resources 目录下添加限流脚本文件:
local ipKey = KEYS[1]; -- IP地址
local rate = tonumber(ARGV[1]); -- 允许的请求次数
local requestCount = redis.call('incr', ipKey); -- 每次请求 + 1
if (requestCount == 1) then -- 第一次请求
redis.call('expire', ipKey, 60); -- 设置过期时间(60S)
return true ; -- 返回是否允许访问
else -- 不是第一次请求
if (requestCount > rate) then -- 如果当前请求次数大于允许的请求次数
return false ;
end
end
使用 Lettuce 客户端执行脚本:
public static void main(String[] args) {
// 创建客户端
RedisClient redisClient = RedisClient.create("redis://:123456@127.0.0.1:6379/0");
// 获取连接
try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
// 执行命令
RedisCommands<String, String> sync = connection.sync();
// 获取脚本流
byte[] lua = getResourceAsByteArray("limit.lua");
// 执行脚本
boolean eval = sync.eval(lua, ScriptOutputType.BOOLEAN, new String[]{"127.0.0.1"}, "1");
// 结果判断
if (!eval) {
System.out.println("当前访问过于频繁,请稍后重试");
} else {
System.out.println("允许访问");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
redisClient.shutdown();
}
}
public static byte[] getResourceAsByteArray(String resourceName) throws IOException {
// 使用ClassLoader获取资源作为输入流
try (InputStream inputStream = LuaTest.class.getClassLoader().getResourceAsStream(resourceName)) {
if (inputStream == null) {
return null;
}
// 将输入流转换为字节数组
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, length);
}
return outputStream.toByteArray();
}
}
4.2 分布式锁
演示需求: 商品秒杀防止超卖,使用 Redis +Lua 实现分布式锁(生产环境请使用成熟的框架)。
获取锁的命令:
SET key value NX EX expire_time
简单的实现原理:
- 使用
SET NX...命令设置值,设置成功表示获取到的锁,反之未获取(直接返回失败或循环重试),并设置一个过期时间(避免死锁) - 获取到锁后,执行业务逻辑
- 执行完成后释放锁
在 resources 目录下添加尝试获取锁脚本文件 tryLock.lua:
local lockKey = KEYS[1] -- 锁的key
local lockValue = ARGV[1] -- 锁的值
local lockExpireTime = tonumber(ARGV[2]) -- 锁的过期时间(秒)
local acquiredLock = redis.call('SET', lockKey, lockValue, 'NX', 'EX', lockExpireTime) -- 获取锁
if acquiredLock then
return true -- 获取锁成功
else
return false -- 获取锁失败
end
添加删除锁脚本文件 unlock.lua:
local lockKey = KEYS[1] -- 锁的key
redis.call('DEL', lockKey) -- 删除锁
创建工具类封装相关分布式锁逻辑:
public class RedisLockUtils {
/**
* 尝试获取锁
*
* @param sync 连接
* @param key 锁的 Key
* @param value 锁的值
* @param second 过期时间
* @return 是否获取到锁
* @throws IOException
*/
public static boolean tryLock(RedisCommands<String, String> sync, String key, String value, String second) throws IOException {
byte[] lua = RedisLockUtils.getResourceAsByteArray("tryLock.lua"); // 获取脚本流
return sync.eval(lua, ScriptOutputType.BOOLEAN, new String[]{key}, value, second); // 执行脚本
}
/**
* 删除锁
*
* @param sync 连接
* @param key 锁的 Key
* @return 是否获取到锁
* @throws IOException
*/
public static void unlock(RedisCommands<String, String> sync, String key) throws IOException {
byte[] lua = RedisLockUtils.getResourceAsByteArray("unlock.lua");
boolean eval = sync.eval(lua, ScriptOutputType.BOOLEAN, new String[]{key});
}
public static byte[] getResourceAsByteArray(String resourceName) throws IOException {
// 使用ClassLoader获取资源作为输入流
try (InputStream inputStream = LuaLimitTest.class.getClassLoader().getResourceAsStream(resourceName)) {
if (inputStream == null) {
return null;
}
// 将输入流转换为字节数组
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, length);
}
return outputStream.toByteArray();
}
}
}
模拟秒杀,简单测试:
public class LuaLockTest {
private static Integer stockCount = 100; // 商品库存数量
private static String goodsId = "899632563356632489"; // 商品库存数量
public static void main(String[] args) {
// 创建客户端
RedisClient redisClient = RedisClient.create("redis://:123456@127.0.0.1:6379/0");
// 获取连接
try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
// 秒杀逻辑
try {
// 获取到锁
if (RedisLockUtils.tryLock(connection.sync(), goodsId, "123456", "10")) {
// 减库存
if (stockCount > 0) {
stockCount = stockCount - 1;
System.out.println("减库存成功,剩余库存:" + stockCount);
} else {
System.out.println("库存不足");
}
} else {
System.out.println("库存不足");
}
} finally {
// 释放锁
RedisLockUtils.unlock(connection.sync(), goodsId);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
redisClient.shutdown();
}
}
}










