0
点赞
收藏
分享

微信扫一扫

MySQL 8.0.22执行器源码分析HashJoin —— 一些初始化函数的细节步骤


目录

  • ​​InitRowBuffer(101行~126行)​​
  • ​​InitProbeIterator(142行~153行)​​
  • ​​*HashJoinIterator* 的Init(155行~240行)​​
  • ​​InitializeChunkFiles(364行~401行)​​
  • ​​InitWritingToProbeRowSavingFile(1115~1119)​​
  • ​​InitReadingFromProbeRowSavingFile(1121~1126)​​


让我们接着上一篇分析BuildHashTable函数细节步骤​继续吧,这些函数都在​​hash_join_iterator.cc​​文件中。

InitRowBuffer(101行~126行)

需要注意的两个步骤

1、初始化row buffer,使用到一个seed。具体哈希计算算法我并没有特定去了解,这里也就不深入了

kHashTableSeed是个没有意义的数字,是用于计算hash的key。一个种子在内存哈希表中进行哈希运算,一个种子计算用于确定chunk 文件应该放置的行。如果两个操作使用同一个种子,将会把块文件加载到哈希表中,这样是错误的。

2、初始化row buffer后,需要调整迭代器的指向,将其头尾均指向row buffer的尾部。

......
if (m_row_buffer.Init(kHashTableSeed)) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}

m_hash_map_iterator = m_row_buffer.end();
m_hash_map_end = m_row_buffer.end();
return false;

InitProbeIterator(142行~153行)

1、初始化m_probe_input迭代器(为一个RowIterator)

2、判断m_probe_input_batch_mode是否为真(默认为false,指的是不开启批处理模式),为真的话就调用StartPSIBatchMode

if (m_probe_input->Init()) {
return true;
}

if (m_probe_input_batch_mode) {
m_probe_input->StartPSIBatchMode();
}
return false;

HashJoinIterator 的Init(155行~240行)

1、准备从build(驱动表)输入中读取行数据到哈希表中,用于构建哈希表

PrepareForRequestRowId(m_build_input_tables.tables(),
m_tables_to_get_rowid_for);

函数调用栈:

|PrepareForRequestRowId
||prepare_for_position

涉及到使用主键去找rows,必须用主键字段扩展读取位图

然后初始化build的迭代器(为一个RowIterator)

2、初始化一些乱七八糟的变量

//默认在内存中做所有操作,并且没有任何对哈希表的重新填充操作。每个输入都只读一次,不会对磁盘写入任何数据。 
m_hash_join_type = HashJoinType::IN_MEMORY;
//不需要把被驱动表读出来的行数据写到saving files中。因为只有当哈希连接生成块不能完全放到内存中 或者 哈希连接不能延伸到磁盘时,即probe输入行需要多次Read才需要开启probe行保存
m_write_to_probe_row_saving = false;
//很显然此时build迭代器读取的buffer中是有行数据的,当build input数据被消耗掉,停止hash join 迭代器请求更多行
m_build_iterator_has_more_rows = true;
//开启批处理模式
m_probe_input->EndPSIBatchModeIfStarted();
//如过外连接溢出到磁盘上操作,那么probe行可以和我们未看到的build input中的row匹配,若匹配为true。这里默认是内存里操作,所以初始化为false
m_probe_row_match_flag = false;

3、计算build row、probe row 占的内存大小

计算给定表单行数据需要占多大的byte,记录上界,这样之后pack的数据长度总是会比这个长度短。

upper_row_size 为build row、probe row两者的最大值。

size_t upper_row_size = 0;
if (!m_build_input_tables.has_blob_column()) {
upper_row_size =
hash_join_buffer::ComputeRowSizeUpperBound(m_build_input_tables);
}

if (!m_probe_input_tables.has_blob_column()) {
upper_row_size = std::max(
upper_row_size,
hash_join_buffer::ComputeRowSizeUpperBound(m_probe_input_tables));
}

4、一个看不懂的操作,将一个string类型的变量reverse了一下

if (m_temporary_row_and_join_key_buffer.reserve(upper_row_size)) {
my_error(ER_OUTOFMEMORY, MYF(0), upper_row_size);
return true; // oom
}

5、如果一个表包含一个geometry列,确保该数据复制到行缓冲区,而不是只设置指向数据的指针。因为当hash join溢出到磁盘上,需要从块文件读回一行,行数据存储在一个临时缓冲区中,当临时缓冲区用于其他用途时,字段指向的数据将变为无效。

MarkCopyBlobsIfTableContainsGeometry(m_probe_input_tables);
MarkCopyBlobsIfTableContainsGeometry(m_build_input_tables);

6、chunk 数组、标志clear操作

//m_chunk_files_on_disk数组来保存磁盘上的块文件列表,以防我们降级为磁盘上的散列联接.此时需要clear
m_chunk_files_on_disk.clear();
//build 和 probe 当前从hash join chunk中读取的是第0行
m_build_chunk_current_row = 0;
m_probe_chunk_current_row = 0;
//现在不使用任何一种hash join chunk
m_current_chunk = -1;

7、对于每个给定的表,如果需要,请求填写行ID(相当于调用file->position())

PrepareForRequestRowId(m_probe_input_tables.tables(),
m_tables_to_get_rowid_for);

8、构建哈希表

// Build the hash table
if (BuildHashTable()) {
DBUG_ASSERT(thd()->is_error() ||
thd()->killed); // my_error should have been called.
return true;
}

9、当build 和 probe 都没有row数据了,返回。

if (m_state == State::END_OF_ROWS) {
// BuildHashTable() decided that the join is done (the build input is
// empty, and we are in an inner-/semijoin. Anti-/outer join must output
// NULL-complemented rows from the probe input).
return false;
}

10、如果是反连接并且join情况数组为空 并且 没有其他情况并且此时row buffer中仍然有数据。说明此时不需要输出任何东西,因为所有数据都在哈希表中。

if (m_join_type == JoinType::ANTI && m_join_conditions.empty() &&
m_extra_condition == nullptr && !m_row_buffer.empty()) {
// For degenerate antijoins, we know we will never output anything
// if there's anything in the hash table, so we can end right away.
// (We also don't need to read more than one row, but
// CreateHashJoinAccessPath() has already added a LIMIT 1 for us
// in this case.)
m_state = State::END_OF_ROWS;
return false;
}

11、初始化probe迭代器

return InitProbeIterator();

InitializeChunkFiles(364行~401行)

初始化两个input的hashjoinchunk。

先估计需要多少块,有一个来源是planner估计的。此外可以假设当前行缓冲区代表了总体的行密度,将估计的剩余row数量/目前读到的row数量,就能得到chunk数。

1、缩减后的哈希表中的行 = 缩减因子* 哈希表中行数 这是一种保护措施,因为我们宁愿得到一个或两个额外的块,而不必多次重新读取探测输入。

constexpr double kReductionFactor = 0.9;
const size_t reduced_rows_in_hash_table =
std::max<size_t>(1, rows_in_hash_table * kReductionFactor)

2、剩余行数 = max(哈希表中行数,planner估计的join的行数) - 哈希表中的行数

const size_t remaining_rows =
std::max(rows_in_hash_table, estimated_rows_produced_by_join) -
rows_in_hash_table;

3、 需要的chunk数 = max(剩余行数 / 缩减后的哈希表中的行数)

const size_t chunks_needed = std::max<size_t>(
1, std::ceil(remaining_rows / reduced_rows_in_hash_table));

4、真正的chunk数 = min(最大chunk文件数,需要的chunk数) 限制每个输入的块数,这样就不会冒着达到服务器对打开文件数的限制的风险。

const size_t num_chunks = std::min(max_chunk_files, chunks_needed);

5、确保chunk数目是偶数,因为我们join的时候是按照probe和build的一对chunk进行join的

const size_t num_chunks_pow_2 = my_round_up_to_next_power(num_chunks);

6、调整chunk数目到偶数;然后对每对chunk进行初始化,一个是buildchunk一个是probechunk

chunk_pairs->resize(num_chunks_pow_2);
for (ChunkPair &chunk_pair : *chunk_pairs) {
if (chunk_pair.build_chunk.Init(build_tables, /*uses_match_flags=*/false) ||
chunk_pair.probe_chunk.Init(probe_tables,
include_match_flag_for_probe)) {
my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
return true;
}
}

对于Init的两个参数解释:

tables  行数据存储在哪个input
uses_match_flags 是否应在每行前面加上匹配标志,表示该行是否有匹配行。

InitWritingToProbeRowSavingFile(1115~1119)

标记已经启用probe row saving,并准备probe row saving file以供写入

m_write_to_probe_row_saving = true;
return m_probe_row_saving_write_file.Init(m_probe_input_tables,
m_join_type == JoinType::OUTER);

至于HashJoinChunk::Init函数我们就不去细究了,它涉及到了IOcache操作。

InitReadingFromProbeRowSavingFile(1121~1126)

标记我们从probe row saving files中读取数据。然后将saving files 倒回开头

m_probe_row_saving_read_file = std::move(m_probe_row_saving_write_file);
m_probe_row_saving_read_file_current_row = 0;
m_read_from_probe_row_saving = true;
return m_probe_row_saving_read_file.Rewind();

1、将write file内容传给read file,同时使用move,避免拷贝赋值,仅仅改动指针指向

2、初始化当前应该读的行数为0,表示还没开始读

3、确定我们应当从probe row saving read files中读取数据

4、Rewind函数将file buffer 清除,准备读的新数据腾出空间


举报

相关推荐

0 条评论