事务的相关概念就不介绍了,本篇博客先给大家演示本地事务与分布式事务,然后介绍Seata
的基本使用,以后会介绍Seata
的集群、注册中心、配置中心以及各种事务模式。
- 版本说明
搭建工程
一个父module
和一个子module
,父module
用于管理依赖版本,子module
用于演示本地事务与分布式事务(通过一个简易版的用户购买商品案例来演示),最后还需要集成Seata
分布式事务解决方案。
父module
的pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>alibaba</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<description>Spring Cloud Alibaba</description>
<modules>
<module>seata</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring-cloud-version>Hoxton.SR9</spring-cloud-version>
<spring-cloud-alibaba-version>2.2.6.RELEASE</spring-cloud-alibaba-version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
seata module
seata module
通过一个简易版的用户购买商品案例来演示本地事务与分布式事务,seata module
的结构如下图所示:
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.kaven</groupId>
<artifactId>alibaba</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
application.yml
:
server:
port: 9000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: ITkaven@666
url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
jpa:
show-sql: true
Entity
Order
实体类:
package com.kaven.alibaba.entity;
import lombok.Data;
import javax.persistence.*;
@Entity
@Table(name = "`order`") // 不能将`order`改成order(Mysql数据库关键字),不然与数据库交互时会出错
@Data
public class Order {
// 订单id
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
public Integer id;
// 用户id
public Integer userId;
// 商品id
public Integer productId;
// 商品购买数量
public Integer count;
// 订单金额
public Integer money;
}
Storage
实体类:
package com.kaven.alibaba.entity;
import lombok.Data;
import javax.persistence.*;
@Entity
@Table(name = "storage")
@Data
public class Storage {
// 商品id
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
public Integer id;
// 库存
public Integer count;
}
User
实体类:
package com.kaven.alibaba.entity;
import lombok.Data;
import javax.persistence.*;
@Entity
@Table(name = "user")
@Data
public class User {
// 用户id
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
public Integer id;
// 用户余额
public Integer money;
}
Repository
OrderRepository
:
package com.kaven.alibaba.repository;
import com.kaven.alibaba.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface OrderRepository extends JpaRepository<Order, Integer> {}
StorageRepository
:
package com.kaven.alibaba.repository;
import com.kaven.alibaba.entity.Storage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface StorageRepository extends JpaRepository<Storage, Integer> {}
UserRepository
:
package com.kaven.alibaba.repository;
import com.kaven.alibaba.entity.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface UserRepository extends JpaRepository<User, Integer> {}
Service
IOrderService
:
package com.kaven.alibaba.service;
public interface IOrderService {
void create(int userId, int productId, int count, int money);
}
OrderServiceImpl
:
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.entity.Order;
import com.kaven.alibaba.repository.OrderRepository;
import com.kaven.alibaba.service.IOrderService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class OrderServiceImpl implements IOrderService {
@Resource
private OrderRepository orderRepository;
@Override
public void create(int userId, int productId, int count, int money) {
// 生成订单
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setCount(count);
order.setMoney(money);
orderRepository.save(order);
}
}
IStorageService
:
package com.kaven.alibaba.service;
public interface IStorageService {
void deduct(int productId, int count);
}
StorageServiceImpl
:
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.entity.Storage;
import com.kaven.alibaba.repository.StorageRepository;
import com.kaven.alibaba.service.IStorageService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Optional;
@Service
public class StorageServiceImpl implements IStorageService {
@Resource
private StorageRepository storageRepository;
@Override
public void deduct(int productId, int count) {
Optional<Storage> byId = storageRepository.findById(productId);
if(byId.isPresent()) {
Storage storage = byId.get();
if(storage.getCount() >= count) {
// 减库存
storage.setCount(storage.getCount() - count);
storageRepository.save(storage);
}
else {
throw new RuntimeException("该商品库存不足!");
}
}
else {
throw new RuntimeException("该商品不存在!");
}
}
}
IUserService
:
package com.kaven.alibaba.service;
public interface IUserService {
void debit(int userId, int money);
}
UserServiceImpl
:
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.entity.User;
import com.kaven.alibaba.repository.UserRepository;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Optional;
@Service
public class UserServiceImpl implements IUserService {
@Resource
private UserRepository userRepository;
@Override
public void debit(int userId, int money) {
Optional<User> byId = userRepository.findById(userId);
if(byId.isPresent()) {
User user = byId.get();
if(user.getMoney() >= money) {
// 减余额
user.setMoney(user.getMoney() - money);
userRepository.save(user);
}
else {
throw new RuntimeException("该用户余额不足!");
}
}
else {
throw new RuntimeException("没有该用户!");
}
}
}
IBuyService
:
package com.kaven.alibaba.service;
public interface IBuyService {
void buy(int userId, int productId, int count);
}
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.service.IBuyService;
import com.kaven.alibaba.service.IOrderService;
import com.kaven.alibaba.service.IStorageService;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class BuyServiceImpl implements IBuyService {
@Resource
private IOrderService orderService;
@Resource
private IStorageService storageService;
@Resource
private IUserService userService;
@Override
public void buy(int userId, int productId, int count) {
int money = count;
// 生成订单
orderService.create(userId, productId, count, money);
// 减库存
storageService.deduct(productId, count);
// 减余额
userService.debit(userId, money);
}
}
Controller
BuyController
:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IBuyService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class BuyController {
@Resource
private IBuyService buyService;
@PostMapping("/buy")
public String buy(@RequestParam("userId") Integer userId,
@RequestParam("productId") Integer productId,
@RequestParam("count") Integer count) {
buyService.buy(userId, productId, count);
return "success";
}
}
启动类:
package com.kaven.alibaba;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
数据库
进入数据库中,执行下面的sql
(为了简单,没有创建相关索引)。
CREATE DATABASE seata;
USE seata;
DROP TABLE IF EXISTS `storage`;
CREATE TABLE `storage` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`product_id` int(11) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
插入几条数据,如下图所示:
启动应用。
本地事务
使用Postman
向应用发送请求。
很显然用户的金额不够。
商品的库存也被扣减了。
订单已经创建了。
减用户余额的操作由于没有被执行,因此用户余额不变,这就导致了数据不一致的问题。
这种只在一个服务下的事务比较好解决,可以使用Spring
的@Transactional
注解。
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.service.IBuyService;
import com.kaven.alibaba.service.IOrderService;
import com.kaven.alibaba.service.IStorageService;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
public class BuyServiceImpl implements IBuyService {
@Resource
private IOrderService orderService;
@Resource
private IStorageService storageService;
@Resource
private IUserService userService;
@Override
@Transactional
public void buy(int userId, int productId, int count) {
int money = count;
// 生成订单
orderService.create(userId, productId, count, money);
// 减库存
storageService.deduct(productId, count);
// 减余额
userService.debit(userId, money);
}
}
重启应用,将数据库中的数据手动回滚。再使用Postman
向应用发送请求。
异常链信息和之前不一样了,其实就是@Transactional
注解的作用,该注解的原理博主以后会在Spring
源码阅读系列中进行介绍。
此时数据是一致的。
分布式事务
现在将每个操作变成一个接口来模拟分布式环境。
Controller
OrderController
:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IOrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/order")
public class OrderController {
@Resource
private IOrderService orderService;
@PostMapping("/create")
public void create(@RequestParam("userId") Integer userId,
@RequestParam("productId") Integer productId,
@RequestParam("count") Integer count,
@RequestParam("money") Integer money) {
orderService.create(userId, productId, count, money);
}
}
StorageController
:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IStorageService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/storage")
public class StorageController {
@Resource
private IStorageService storageService;
@PostMapping("/deduct")
public void deduct(@RequestParam("productId") Integer productId,
@RequestParam("count") Integer count) {
storageService.deduct(productId, count);
}
}
UserController
:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IUserService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/user")
public class UserController {
@Resource
private IUserService userService;
@PostMapping("/debit")
public void debit(@RequestParam("userId") Integer userId,
@RequestParam("money") Integer money) {
userService.debit(userId, money);
}
}
修改BuyController
:
package com.kaven.alibaba.controller;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@RestController
public class BuyController {
@Resource
private RestTemplate restTemplate;
@PostMapping("/buy")
@Transactional
public String buy(@RequestParam("userId") Integer userId,
@RequestParam("productId") Integer productId,
@RequestParam("count") Integer count) {
// 请求参数
MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
queryParams.add("userId", userId.toString());
queryParams.add("productId", productId.toString());
queryParams.add("count", count.toString());
queryParams.add("money", count.toString());
// 构造请求
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/order/create").queryParams(queryParams);
restTemplate.postForObject(builder.toUriString(), null, Void.class);
// 构造请求
builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/storage/deduct").queryParams(queryParams);
restTemplate.postForObject(builder.toUriString(), null, Void.class);
// 构造请求
builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/user/debit").queryParams(queryParams);
restTemplate.postForObject(builder.toUriString(), null, Void.class);
return "success";
}
}
重启应用,再使用Postman
向应用发送请求。
数据不一致。
分布式环境下使用@Transactional
注解就不会起作用了(为了简单,这里只是模拟分布式环境下的用户购买商品案例),接下来就需要使用Seata
来解决该问题。
Seata
Seata
是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata
将为用户提供了 AT
、TCC
、SAGA
和XA
事务模式,为用户打造一站式的分布式解决方案(这些关于Seata
的介绍来自官网)。
-
TC
(Transaction Coordinator
,事务协调者):维护全局和分支事务的状态,驱动全局事务提交或回滚。 -
TM
(Transaction Manager
, 事务管理器):定义全局事务的范围,开始全局事务、提交或回滚全局事务。 -
RM
(Resource Manager
,资源管理器):管理分支事务处理的资源,与TC
交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
很显然TM
和RM
是在业务代码中的,而TC
是一个独立的应用,由Seata
提供。
- 下载地址
博主下载的是1.3.0
版本(要和Spring Cloud Alibaba
版本兼容)。
简单使用不需要修改配置(默认是基于本地file
的形式,为了简单就使用默认形式,分布式环境下需要使用其他形式,其他形式博主以后会介绍),启动Seata
:
C:\Users\Dell>f:
F:\>cd F:\tools\seata-server-1.3.0\seata\bin
F:\tools\seata-server-1.3.0\seata\bin>dir
驱动器 F 中的卷是 WorkSpace
卷的序列号是 D671-D29B
F:\tools\seata-server-1.3.0\seata\bin 的目录
2021/12/29 10:50 <DIR> .
2020/07/16 00:35 <DIR> ..
2020/07/16 00:35 3,648 seata-server.bat
2020/07/16 00:35 4,175 seata-server.sh
2021/12/29 10:51 <DIR> sessionStore
2 个文件 7,823 字节
3 个目录 241,159,860,224 可用字节
F:\tools\seata-server-1.3.0\seata\bin>seata-server.bat -p 8080
修改配置文件:
server:
port: 9000
spring:
application:
name: seata
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: ITkaven@666
url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
jpa:
show-sql: true
seata:
application-id: buy
tx-service-group: kaven_seata_tx_group
service:
vgroup-mapping:
kaven_seata_tx_group: default
grouplist:
- "127.0.0.1:8080"
file.conf
(这里的service
相关配置要和application.yml
配置文件中的一致):
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.kaven_seata_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8080"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
registry.conf
:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}
给buy
接口加上@GlobalTransactional
注解:
@PostMapping("/buy")
@GlobalTransactional
public String buy(@RequestParam("userId") Integer userId,
@RequestParam("productId") Integer productId,
@RequestParam("count") Integer count) {
// 请求参数
MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
queryParams.add("userId", userId.toString());
queryParams.add("productId", productId.toString());
queryParams.add("count", count.toString());
queryParams.add("money", count.toString());
// 构造请求
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/order/create").queryParams(queryParams);
restTemplate.postForObject(builder.toUriString(), null, Void.class);
// 构造请求
builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/storage/deduct").queryParams(queryParams);
restTemplate.postForObject(builder.toUriString(), null, Void.class);
// 构造请求
builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/user/debit").queryParams(queryParams);
restTemplate.postForObject(builder.toUriString(), null, Void.class);
return "success";
}
在数据库中创建undo_log
表。
USE seata;
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
启动应用。
手动回滚数据库中的数据,再使用Postman
进行测试。
客户端的日志:
TC
的日志:
数据库中的数据也是一致的。
通过这些日志信息也能看出来Seata
分布式事务解决方案应用成功了。本地事务与分布式事务演示以及Seata
的使用就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。