0
点赞
收藏
分享

微信扫一扫

分布式事务 Seata(四) 多数据源事务

扒皮狼 2021-09-21 阅读 77

项目搭建

我搭建的是一个最基础的用户下单,减库存,减用户金额,创建订单的一个微服务框架。因为后面需要测试微服务下的分布式事务,这里测试的是 多数据源 下的分布式事务。

项目结构如下:

|-- demo
|-- entity 实体对象(为了让其他服务拥有所有服务对象)
|-- order 订单 (pom导入了 entity )
|-- stock 库存 (pom导入了 entity )
|-- user 用户 (pom导入了 entity )

这里使用了mybatis-plus的相关技术,不懂得请自行百度,数据源配置如下:

spring:
main:
allow-bean-definition-overriding: true
autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource:
druid:
stat-view-servlet:
enabled: true
url-pattern: "/druid/*"
allow: 127.0.0.1
deny: 192.168.1.73
reset-enable: true
login-username: admin
login-password: admin@2020
web-stat-filter:
enabled: true
url-pattern: "/*"
exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
dynamic:
druid:
filters: stat,wall
initial-size: 10
min-idle: 10
maxActive: 200
maxWait: 10000
useUnfairLock: true
validation-query: 'select 1'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
primary: user
datasource:
user:
url: jdbc:mysql://0.0.0.0:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
order:
url: jdbc:mysql://0.0.0.0:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
stock:
url: jdbc:mysql://0.0.0.0:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020

表结构创建

所建的表都不在同一个库下,分为order(表为orders)、user、stock三个库,各自的库建各自的表,每个库都要建一个相同的 undo_log 表,如下:

/*表: orders*/--------------
/*列信息*/-----------
Field Type Collation Null Key Default Extra Privileges Comment
------ -------- --------- ------ ------ ------- ------ ------------------------------- ---------
id int (NULL) NO PRI (NULL) select,insert,update,references
uid int (NULL) YES (NULL) select,insert,update,references
count int (NULL) YES (NULL) select,insert,update,references
time datetime (NULL) YES (NULL) select,insert,update,references
money int (NULL) YES (NULL) select,insert,update,references
/*表: stock*/--------------
/*列信息*/-----------
Field Type Collation Null Key Default Extra Privileges Comment
------ ------ --------- ------ ------ ------- ------ ------------------------------- ---------
id int (NULL) NO PRI (NULL) select,insert,update,references
num int (NULL) YES (NULL) select,insert,update,references
price int (NULL) YES (NULL) select,insert,update,references
/*表: account*/----------------
/*列信息*/-----------
Field Type Collation Null Key Default Extra Privileges Comment
------ ------ --------- ------ ------ ------- -------------- ------------------------------- ---------
id int (NULL) NO PRI (NULL) auto_increment select,insert,update,references
amount int (NULL) YES (NULL) select,insert,update,references

表数据如下:

stock: id = 1,num = 500,price = 5
account: id = 1,amount = 2000

undo_log 表结构

编写过程

我把所有的接口和实现均写在 user 服务下进行测试,如下图:

AccountServer

public interface AccountServer {
/**
* 从胡账户中指支出
**/

void debit(Integer uid, Integer money);

}
@Service
public class AccountServerImpl implements AccountServer {


@Autowired
private AccountMapper accountMapper;

@DS("user")
@Override
public void debit(Integer uid, Integer money) {
Account account = accountMapper.selectById(uid);
account.setAmount(account.getAmount() - money);
accountMapper.updateById(account);
}
}

StockServer

public interface StockServer {
/**
* 扣除库存数量,返回金额
**/

Integer deduct(Integer stockId, Integer count);
}
@Service
public class StockServerImpl implements StockServer {

@Autowired
private StockMapper stockMapper;

// 库存这里需要乐观锁,但是这里我就不做了
@DS("stock")
@Override
public Integer deduct(Integer stockId, Integer count) {
Stock stock = stockMapper.selectById(stockId);
stock.setNum(stock.getNum() - count);
stockMapper.updateById(stock);
return stock.getPrice().intValue() * count.intValue();
}
}

OrderServer

public interface OrderServer {

/**
* 创建订单
**/

void create(Integer uid, Integer stockId, Integer count,Integer money);
}
@Service
public class OrderServerImpl implements OrderServer {

@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountServer accountServer;

@DS("order")
@Override
public void create(Integer uid, Integer stockId, Integer count,Integer money) {

accountServer.debit(uid,money);
Order order = new Order();
order.setId(uid);
order.setTime(new Date());
order.setCount(count);
order.setMoney(money);

orderMapper.insert(order);
}
}

BusinessService

public interface BusinessService {
/**
* 采购
**/

void purchase(Integer uid, Integer stockId, Integer count);
}
@Service
public class BusinessServiceImpl implements BusinessService {

@Autowired
private StockServer stockServer;
@Autowired
private OrderServer orderServer;

@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
Integer money = stockServer.deduct(stockId, count);
orderServer.create(uid, stockId, count,money);
}
}

UserApplication

@SpringBootApplication
public class UserApplication implements CommandLineRunner {

@Autowired
private BusinessService businessService;

public static void main(String[] args) {
SpringApplication.run(UserApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
/**
* 执行采购,用户id=1,库存id=1,采购数量count=3
**/

businessService.purchase(1,1,3);
}
}

无seata事务处理测试

测试加 @Transactional 是否有效

    @Transactional(rollbackFor = Exception.class)
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
Integer money = stockServer.deduct(stockId, count);
orderServer.create(uid, stockId, count,money);
}

测试结果:

java.sql.SQLSyntaxErrorException: Table 'user.stock' doesn't exist;

什么原因呢?其实就是 @Transactional 的事务传播策略默认为 Propagation.REQUIRED,如果当前没有事务,就新建一个事务,如果已经存在一个事务,就加入到这个事务中。也就是说不同之间的服务调用使用的是同一个库的事务,所以他就查同一个库下的这张表。

避免这种情况可以在子服务的方法上加 @Transactional(propagation = Propagation.REQUIRES_NEW),新建事务,如果当前存在事务,把当前事务挂起。意思就是,A调B的过程中,A方法用的是A库的事务,B方法用的是B库的事务,相互独立不受影响。

代码修改如下:

    // 库存这里需要乐观锁,但是这里我就不做了
@DS("stock")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Integer deduct(Integer stockId, Integer count) {
Stock stock = stockMapper.selectById(stockId);
stock.setNum(stock.getNum() - count);
stockMapper.updateById(stock);
return stock.getPrice().intValue() * count.intValue();
}
    @DS("order")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void create(Integer uid, Integer stockId, Integer count,Integer money) {
accountServer.debit(uid,money);
Order order = new Order();
order.setUid(uid);
order.setTime(new Date());
order.setCount(count);
order.setMoney(money);
if(1==1){
throw new RuntimeException("下单异常");
}
orderMapper.insert(order);
}
    @DS("user")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void debit(Integer uid, Integer money) {
Account account = accountMapper.selectById(uid);
account.setAmount(account.getAmount() - money);
accountMapper.updateById(account);
}
测试下单异常
@Service
public class OrderServerImpl implements OrderServer {

@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountServer accountServer;

@DS("order")
@Override
public void create(Integer uid, Integer stockId, Integer count,Integer money) {
accountServer.debit(uid,money);
Order order = new Order();
order.setUid(uid);
order.setTime(new Date());
order.setCount(count);
order.setMoney(money);
if(1==1){
throw new RuntimeException("下单异常");
}
orderMapper.insert(order);
}
}

测试结果:

stock:id=1,num=497,price=5
account:id=1,amount=1985
order:

用户账户扣除,库存扣除,下单失败。
也就是说 @Transactional 处理不了分布式事务,只能处理同一个库的事务。

添加 Seata 分布式事务

添加依赖
        <dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
<version>2.2.1.RELEASE</version>
</dependency>

<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>

<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
修改配置
server:
port: 8082

spring:
main:
allow-bean-definition-overriding: true
autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource:
druid:
stat-view-servlet:
enabled: true
url-pattern: "/druid/*"
allow: 127.0.0.1
deny: 192.168.1.73
reset-enable: true
login-username: admin
login-password: admin@2020
web-stat-filter:
enabled: true
url-pattern: "/*"
exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
dynamic:
druid:
filters: stat,wall
initial-size: 30
min-idle: 20
maxActive: 200
maxWait: 10000
useUnfairLock: true
validation-query: 'select 1'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
primary: user
# 启用严格模式
strict: true
# 开启分布式事务
seata: true
# 事务模式 为AT
seata-mode: AT
datasource:
user:
url: jdbc:mysql://10.240.30.100:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
# 建表脚本,启动时会运行
# schema: classpath:db/schema-account.sql
order:
url: jdbc:mysql://10.240.30.100:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
stock:
url: jdbc:mysql://10.240.30.100:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020

# 事务配置
seata:
enabled: true
# 启用自动代理数据源
enable-auto-data-source-proxy: false
# 随便起个名字,但最好与服务名称一致
application-id: ${spring.application.name}
# 此处的名称一定要与 service.vgroupMapping 下配置的参数保持一致
tx-service-group: my_test_tx_group
# 目的是从nacos 获取配置信息
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos@root@2020
namespace:
group: SEATA_GROUP
# registry 目的是从nacos找 seata-server 服务
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
username: nacos
password: nacos@root@2020
namespace:

关于 nacos 和 seata-server的配置请看 分布式事务 Seata(三) Seata搭建

添加全局事务
    @Transactional
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
Integer money = stockServer.deduct(stockId, count);
orderServer.create(uid, stockId, count,money);
}
测试
stock:id=1,num=500,price=5
account:id=1,amount=2000
order:

证明多数据源事务处理成功。

举报

相关推荐

0 条评论