0
点赞
收藏
分享

微信扫一扫

MySQL Binlog 组提交实现

1.背景

MySQL 提交流程有两个问题需要解决:

1.1\. 提交写两份日志的性能问题

为了保证事务的持久性和原子性,事务提交完成前,其日志(WAL)必须持久化。对于 MySQL 来说,需要保证事务提交前,redo log 落盘。虽然日志顺序写的性能,已经高于数据文件随机写的性能,但是如果每次事务提交,都需将 redo log 刷盘,效率较低。同时 MySQL 还要写 binlog,相当于每次事务提交需要两次 IO,很容易成为性能瓶颈。

为了解决上述性能问题,经过 MySQL 5.6/5.7/8.0 的不断优化,引入组提交技术和流水线技术。

1.2\. redo log/binlog 的原子性和一致性

原子性比较好解决,MySQL 利用一个内部 2PC 机制实现 redo log 和 binlog 的原子提交,其中2PC 的协调者由 binlog 承担。

// mysqld.cc, init_server_components
if (total_ha_2pc > 1 || (1 == total_ha_2pc && opt_bin_log)) {
if (opt_bin_log)
// tc means transaction coordinator
tc_log = &mysql_bin_log;
else
tc_log = &tc_log_mmap;
}

内部两阶段提交的流程简单描述为:

  • Prepare 阶段:(1)InnoDB 将回滚段上的事务状态设置为 PREPARED;(2)将 redolog 写文件并刷盘;
  • Commit 阶段:(1)Binlog 写入文件;(2)binlog 刷盘;(3)InnoDB commit;

两阶段提交的 commit point 是 binlog 刷盘成功(因为此时两个日志都持久化成功了)。Recovery 流程会比较 binlog xid 和 redo xid,判断事务是否达到 commit point,以此来决定提交还是回滚:

  • 如果 binlog 还未刷盘,即使 redo log 已经刷盘了也要回滚。
  • 如果 binlog 已经刷盘,即使 InnoDB commit 还未完成,也要重新写入 commit 标志,完成提交。

解决完原子性的问题,还有一致性问题。事务binlog 提交的顺序应该和 redo log 保持一致,否则可能物理备份(不拷贝 binlog)丢数据的问题)。但是 Xtrabackup 在这次提交后 [https:// ​​jira.percona.com/browse​​ /PXB-1770 ],通过备份 binlog 避免了这种问题。

MySQL5.6以前,为保证 binlog 和 redo log 提交顺序一致,MySQL 使用了prepare_commit_mutex 锁,事务在两阶段提交流程中持有它,这样确实可以保证两份日志顺序一致,但它也会导致事务串行执行,效率很低。后来组提交和流水线提交的引入,不仅减少了 IO 次数,还提高了事务执行的并发度,减小了加锁的粒度。

2\. 提交流水线

为解决上节提到的两个问题,经过 5.6/5.7/8.0 的逐步优化,两阶段提交的逻辑优化为:

  • Prepare 阶段基本不变,只是写 redolog 时并不刷盘。
  • Commit 阶段按步骤做流水线批处理,将锁粒度进一步拆细。

Commit 阶段又拆为三个主要步骤:

  • flush stage:按事务进入的顺序将 binlog 从 cache 写入文件(不刷盘),redo log 刷盘(多个事务 redo 合并刷盘)。
  • sync stage:对 binlog 文件做 fsync 操作(多个事务的 binlog 合并刷盘)。
  • commit stage:各个线程按顺序做 InnoDB commit 操作。

每个 stage 一个队列,第一个进入该队列的线程成为 leader,后续进入的线程会阻塞直至完成提交。leader 线程会领导队列中的所有线程执行该 stage 的任务,并带领所有 follower 进入到下一个 stage 去执行,当遇到下一个 stage 为非空队列时,leader 会变成 follower 注册到此队列中。

而 redo log 刷盘从 Prepare 阶段移动到 flush stage,这样 leader 也可以将多个事务的 redo log 合并刷盘。同样 sync stage 的 leader 可以将多个事务的 binlog 合并刷盘。

每一个 stage 都是加锁的,保证 binlog 与 redo log 写入顺序是一致的。

总结下来,这套优化主要带来了两个好处:

  1. Commit 阶段流水化作业,stage 内批处理,stage 之间可以并发,大大提升了写的并发度,进而提高吞吐与资源利用率。
  2. redo log / binlog 合并刷盘,大幅减少 IO 次数。

3\. 代码实现

3.1 Prepare

协调者的 Prepare 调用存储引擎的 ha_prepare_low 即可,下面这段注释说的很清楚,此时不持久化 InnoDB redo log。

int MYSQL_BIN_LOG::prepare(THD *thd, bool all) {
/*
Set HA_IGNORE_DURABILITY to not flush the prepared record of the
transaction to the log of storage engine (for example, InnoDB
redo log) during the prepare phase. So that we can flush prepared
records of transactions to the log of storage engine in a group
right before flushing them to binary log during binlog group
commit flush stage. Reset to HA_REGULAR_DURABILITY at the
beginning of parsing next command.
*/
thd->durability_property = HA_IGNORE_DURABILITY;
int error = ha_prepare_low(thd, all);
return error;
}<

3.2 Commit

组提交的代码主要位于 MYSQL_BIN_LOG::ordered_commit

MySQL 8.0.29 后将原来 slave 并行回放过程抽象成新的 stage0(原来这个流程也是有的,只是没有抽象为 stage0),其工作是协调多个回放线程的回放顺序,让事务提交顺序与主库一致。以下代码只有备库回放会走到。

/*
Stage #0: ensure slave threads commit order as they appear in the slave's
relay log for transactions flushing to binary log.

This will make thread wait until its turn to commit.
Commit_order_manager maintains it own queue and its own order for the
commit. So Stage#0 doesn't maintain separate StageID.
*/
if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||
ending_trans(thd, all) ||
Commit_order_manager::get_rollback_status(thd)) {
if (Commit_order_manager::wait(thd)) {
return thd->commit_error;
}
}

stage 转换函数

事务提交三个 stage 之间的转换,都用的是 MYSQL_BIN_LOG::change_stage 函数,其主要逻辑是调用了 Commit_stage_manager::enroll_for。该函数在 8.0.29 版本里,加了很多WL#7846 处理逻辑,帮助备库在不开 binlog,但是并行回放的情况下,依旧可以和主库保持相同的提交序,这一部分我会从下面的核心代码里删除,感兴趣的朋友可以看下WL#7846 。

enroll_for 主要做了以下几件事:

1.判断自己是不是入队的第一个,如果是则为 leader,否则为 follower,enroll_for 的返回值为 true 则为 leader。

2.释放上个阶段持有的锁,先入队新的 stage,再释放上一个 stage 的锁,保证事务执行的顺序在每个 stage 相同,保证事务的正确性。注意:BINLOG_FLUSH_STAGE 没有上一个阶段的锁,入参 stage_mutex 为 nullptr。

3.follower 会阻塞等待在 m_stage_cond_binlog 条件变量上。

4.Leader 持有本阶段的锁(enter_mutex)。

bool MYSQL_BIN_LOG::change_stage(THD *thd [[maybe_unused]],
Commit_stage_manager::StageID stage,
THD *queue, mysql_mutex_t *leave_mutex,
mysql_mutex_t *enter_mutex) {
if (!Commit_stage_manager::get_instance().enroll_for(
stage, queue, leave_mutex, enter_mutex)) {
return true;
}
return false;
}

bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,
mysql_mutex_t *stage_mutex,
mysql_mutex_t *enter_mutex) {

// 1.判断自己是不是入队的第一个,如果是则为 leader,
// 否则为 follower,enroll_for 的返回值为 true 则为 leader。
lock_queue(stage);
bool leader = m_queue[stage].append(thd);
unlock_queue(stage);

// 2.先入队新的 stage,再释放上一个 stage 的锁,
// 保证事务执行的顺序在每个 stage 相同,保证事务的正确性。
// 注意:BINLOG_FLUSH_STAGE 没有上一个阶段的锁,入参 stage_mutex 为 nullptr。

// 特殊情况:当前持有的是 LOCK_log,且正在进行 rotating,就不用释放当前 stage 的锁了
// 因为 rotating 需要 LOCK_log
bool need_unlock_stage_mutex =
!(mysql_bin_log.is_rotating_caused_by_incident &&
stage_mutex == mysql_bin_log.get_log_lock());

if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);

// 3.follower 会阻塞等待在 m_stage_cond_binlog 条件变量上。
if (!leader) {
mysql_mutex_lock(&m_lock_done);
while (thd->tx_commit_pending) {
mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);
}
mysql_mutex_unlock(&m_lock_done);
return false;
}

// 4.leader 持有本阶段的锁(enter_mutex)。
bool need_lock_enter_mutex = false;
if (leader && enter_mutex != nullptr) {
// 特殊情况:enter_mutex 是 LOCK_log,且正在进行 rotating,就不用再去加锁了,
// 因为已经加上了。
need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&
enter_mutex == mysql_bin_log.get_log_lock());
if (need_lock_enter_mutex)
mysql_mutex_lock(enter_mutex);
else
mysql_mutex_assert_owner(enter_mutex);
}
return leader;
}

Stage 1 -- BINLOG_FLUSH_STAGE

事务 flush 到 binlog (不 sync) ,代码中的解释:

/*
Stage #1: flushing transactions to binary log

While flushing, we allow new threads to enter and will process
them in due time. Once the queue was empty, we cannot reap
anything more since it is possible that a thread entered and
appointed itself leader for the flush phase.
*/

BINLOG_FLUSH_STAGE leader 的主要工作如下(代码依旧位于MYSQL_BIN_LOG::ordered_commit)

1.change_stage,进入 BINLOG_FLUSH_STAGE 状态。

2.如果发现 binlog 被关了,直接跳到(goto)commit stage。

(3,4,5 在 process_flush_stage_queue 完成)

3.拿到 flush queue 的 head,清空 flush queue,以便新的线程进入作为 leader。调用 ha_flush_logs(true) 批量刷 redo log。

4.依次调用 MYSQL_BIN_LOG::flush_thread_caches 将每个事务缓存在 binlog_cache_mngr 里的信息 flush 到 binlog(cache)。调用路径:

MYSQL_BIN_LOG::flush_thread_caches
|--binlog_cache_mngr::flush
|----binlog_stmt_cache_data::flush
|----binlog_trx_cache_data::flush
|------binlog_cache_data::flush
|--------MYSQL_BIN_LOG::write_transaction

5.判断是否需要 rotate。

6.将 binlog 写到 binlog 文件(不 sync),flush_cache_to_file

// 1\. change_stage,进入 BINLOG_FLUSH_STAGE 状态.
if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,
&LOCK_log)) {
return finish_commit(thd);
}

// 2.如果 binlog 被关了,直接跳到(goto)COMMIT_STAGE。
// leave_mutex_before_commit_stage 表示需要在 COMMIT_STAGE 释放的锁。
if (unlikely(!is_open())) {
final_queue = fetch_and_process_flush_stage_queue(true);
leave_mutex_before_commit_stage = &LOCK_log;
goto commit_stage;
}

// 3/4/5 步在该函数执行
flush_error = process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);

if (flush_error == 0 && total_bytes > 0) {
// 6.将 binlog 写到 binlog 文件
flush_error = flush_cache_to_file(&flush_end_pos);
}

process_flush_stage_queue 执行 3-5 步,事务 redo 刷盘,将事务的信息写到 binary log

int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
THD **out_queue_var) {

// 3\. 该函数会调用 ha_flush_logs 持久化 redo log
THD *first_seen = fetch_and_process_flush_stage_queue(should_return, term, true);

// 4.依次将所有所有事务从各自的 cache 里 flush 到 binlog
for (THD *head = first_seen; head; head = head->next_to_commit) {
std::pair<int, my_off_t> result = flush_thread_caches(head);;
}

// 5.判断是否需要 rotate。刚写完 binlog,是判断的恰当时期。
if (total_bytes > 0 &&
(m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||
DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))
*rotate_var = true;
return flush_error;
}

Stage 2 -- SYNC_STAGE

BINLOG_FLUSH_STAGE 阶段的 leader 带着一个链表进入 SYNC_STAGE 阶段,首先依旧调用 change_state 函数,可能成为该阶段的 leader,也可能成为 follower,因为此时 LOCK_sync 可能正在被做 sync 的线程持有。多个 flush queue 会因为等待锁而合并成一个 sync queue。

Sync 的后续流程:

1.判断本次要不要 sync

一个 SYNC_STAGE 的 leader 通过参数判断,本次是否需要 sync。sync_counter 变量代表进入 SYNC_STAGE 但是没有真正 sync 的 leader 的个数。当 MySQL 配置参数 sync_binlog 设置大于 1 时,并不是每个 leader 执行到这里都会 sync。

get_sync_period() 获得的值,即是 sync_binlog 参数的值。

因此,判断 sync_counter + 1 >= get_sync_period(),表示当前的 leader 可以 sync 了,那么该线程继续等一会儿,等待更多的线程进入 sync queue 在一起提交,具体等多久,由 opt_binlog_group_commit_sync_no_delay_count 和 opt_binlog_group_commit_sync_delay 决定。如果本 leader 不 sync,则不用等待。

注意,当 sync_binlog == 0 时,每个 leader 线程都要等待。当 sync_binlog == 1 时,同样每个 leader 线程都要等待,因为每个 leader 都要 sync。当 sync_binlog > 1 时,一部分 leader 线程就不用等待,接着执行,反正也不会 sync。

2.调用 sync_binlog_file 去 sync binlog。sync_binlog_file 中实现只有当 sync_period > 0 && ++sync_counter >= sync_period 时才真正 sync。

/*
Stage #2: Syncing binary log file to disk
*/

if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,
&LOCK_sync)) {
return finish_commit(thd);
}

// 1.判断本次要不要真正 sync,真正 sync 需要等一会,详见上文的说明
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
Commit_stage_manager::get_instance().wait_count_or_timeout(
opt_binlog_group_commit_sync_no_delay_count,
opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);

// 仅是后面更新 binlog end 位点用
final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
Commit_stage_manager::SYNC_STAGE);

// 2\. sync
if (flush_error == 0 && total_bytes > 0) {
std::pair<bool, bool> result = sync_binlog_file(false);
sync_error = result.first;
}

Stage 3 -- COMMIT_STAGE

依次将 redolog 中已经 prepare 的事务在引擎层提交,该阶段不用刷盘,因为 flush 阶段中的 redolog 刷盘已经足够保证数据库崩溃时的数据安全了。

COMMIT_STAGE 的主要工作包括:

1.达成多数派后,调用 ha_commit_low 提交,提交完成后还需减少 prepared XID counter

2.唤醒所有等待的 follower,完成提交。

if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&
(sync_error == 0 || binlog_error_action != ABORT_SERVER)) {
if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,
leave_mutex_before_commit_stage, &LOCK_commit)) {
return finish_commit(thd);
}
THD *commit_queue =
Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
Commit_stage_manager::COMMIT_STAGE);

// 运行 after sync hook(如果有的话)
if (flush_error == 0 && sync_error == 0)
sync_error = call_after_sync_hook(commit_queue);

// 1.在该函数内完成,调用 ha_commit_low 提交引擎
process_commit_stage_queue(thd, commit_queue);
mysql_mutex_unlock(&LOCK_commit);

// 运行 after commit hook(如果有的话)
process_after_commit_stage_queue(thd, commit_queue);
final_queue = commit_queue;
} else {
// 如果因为 opt_binlog_order_commits 为 false 进入这里。
// 不 ordered commit,那么就等 follower 被通知后,自己去提交
if (leave_mutex_before_commit_stage)
mysql_mutex_unlock(leave_mutex_before_commit_stage);
if (flush_error == 0 && sync_error == 0)
sync_error = call_after_sync_hook(final_queue);
}

// 3\. 通知队列中所有等待的线程
// follower 如果发现事务没有提交,会调用 ha_commit_low, 此时就不能保证 commit 的顺序了。
/* Commit done so signal all waiting threads */
Commit_stage_manager::get_instance().signal_done(final_queue);

举报

相关推荐

0 条评论