PostgreSQL FDW: A Use Case for Reading Parquet Files

ITWYY

2023-01-14


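The code below, which opens a Parquet file stored on S3, comes from the parquetGetFileReader function defined in parquet_s3_fdw_connection.cpp (that function is called by the open overload of the DefaultParquetReader class that takes arguments). It matches the Parquet-reading code in the official example from the Apache Arrow User Guide, "Reading and writing Parquet files". The only difference is that the input arrow::io::RandomAccessFile is an instance of S3RandomAccessFile (a class that inherits from arrow::io::RandomAccessFile). For its implementation, see the source excerpted in the article "PostgreSQL FDW: Parquet S3 DefaultParquetReader Class".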

std::unique_ptr<parquet::arrow::FileReader> reader;
entry->pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input(new S3RandomAccessFile(s3client, dname, fname)); // build the arrow::io::RandomAccessFile from the S3RandomAccessFile class described above
arrow::Status status = parquet::arrow::OpenFile(input, entry->pool, &reader);

if (!status.ok()) throw Error("failed to open Parquet file %s", status.message().c_str());
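The substitution described above works because the reader depends only on an abstract random-access interface. The following is a minimal standard-library sketch of that pattern, not Arrow's actual API: `RandomAccessSource`, `MemorySource`, and `read_footer_magic` are hypothetical names invented for illustration, with `MemorySource` standing in for a storage backend such as S3RandomAccessFile. The one Parquet fact it relies on is that a Parquet file ends with the 4-byte magic `PAR1`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for an abstract random-access input
// (the role arrow::io::RandomAccessFile plays in the article).
class RandomAccessSource {
public:
    virtual ~RandomAccessSource() = default;
    virtual int64_t size() const = 0;
    // Reads up to nbytes starting at offset; returns the number of bytes read.
    virtual int64_t read_at(int64_t offset, int64_t nbytes, char *out) const = 0;
};

// Hypothetical backend that serves bytes from memory. A real backend
// (local file, S3 object) would override the same two methods.
class MemorySource : public RandomAccessSource {
public:
    explicit MemorySource(std::string bytes) : bytes_(std::move(bytes)) {}

    int64_t size() const override { return static_cast<int64_t>(bytes_.size()); }

    int64_t read_at(int64_t offset, int64_t nbytes, char *out) const override {
        assert(out != nullptr);
        if (offset < 0 || nbytes < 0 || offset > size())
            throw std::out_of_range("read_at: offset outside the source");
        const int64_t n = std::min(nbytes, size() - offset);
        std::memcpy(out, bytes_.data() + offset, static_cast<std::size_t>(n));
        return n;
    }

private:
    std::string bytes_;
};

// Reader-side code sees only the interface, so it is backend-agnostic.
// Returns the last 4 bytes of the source.
std::string read_footer_magic(const RandomAccessSource &src) {
    if (src.size() < 4) throw std::runtime_error("source too small");
    char buf[4];
    const int64_t n = src.read_at(src.size() - 4, 4, buf);
    return std::string(buf, static_cast<std::size_t>(n));
}
```

Passing a `MemorySource` that holds the bytes `PAR1....PAR1` to `read_footer_magic` yields `PAR1`. Swapping in another backend would leave `read_footer_magic` unchanged, which is the property that lets the S3 reader reuse the stock Arrow opening code.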

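The following code opens a Parquet file stored on the local file system and is called by the no-argument open function of the DefaultParquetReader class. This is also how the parquet fdw extension creates its parquet::arrow::FileReader instance.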

arrow::Status   status;
std::unique_ptr<parquet::arrow::FileReader> reader;
status = parquet::arrow::FileReader::Make( arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(filename, use_mmap), &reader);
if (!status.ok()) throw Error("parquet_s3_fdw: failed to open Parquet file %s", status.message().c_str());
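Both opening snippets turn a failed arrow::Status into a C++ exception through a printf-style Error(...) call. The class below is a minimal sketch of how such an exception type can be written with the standard library. `FormatError`, `SimpleStatus`, and `check_open` are hypothetical names, and this is not the Error class from the parquet_s3_fdw source.

```cpp
#include <cassert>
#include <cstdarg>
#include <cstdio>
#include <exception>
#include <string>

// Hypothetical printf-style exception, for illustration only.
class FormatError : public std::exception {
public:
    FormatError(const char *fmt, ...) {
        assert(fmt != nullptr);
        va_list args;
        va_start(args, fmt);
        // vsnprintf truncates if needed and always null-terminates the buffer.
        std::vsnprintf(buf_, sizeof(buf_), fmt, args);
        va_end(args);
    }

    const char *what() const noexcept override { return buf_; }

private:
    char buf_[256];
};

// Hypothetical stand-in for a status object exposing ok() and message().
struct SimpleStatus {
    bool success;
    std::string text;
    bool ok() const { return success; }
    const std::string &message() const { return text; }
};

// Same check-then-throw shape as the snippets above.
void check_open(const SimpleStatus &status) {
    if (!status.ok())
        throw FormatError("failed to open Parquet file %s", status.message().c_str());
}
```

Note that in the original message the `%s` is filled with the status text rather than the file name, so the sketch does the same.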

Reading data into chunks: the arrow::FileReader class (parquet::arrow::FileReader) reads the data of an entire file or of a single row group into an ::arrow::Table. The reading code below likewise matches the Parquet-reading code in the official example from the Apache Arrow User Guide, "Reading and writing Parquet files", and then stores the data in chunks.

// std::shared_ptr<arrow::Table>   table;
// std::vector<arrow::Array *> chunks;
int rowgroup = this->rowgroups[this->row_group];
status = this->reader->RowGroup(rowgroup)->ReadTable(this->indices, &this->table);
if (!status.ok()) throw Error("parquet_s3_fdw: failed to read rowgroup #%i: %s", rowgroup, status.message().c_str());
if (!this->table) throw std::runtime_error("parquet_s3_fdw: got empty table");

/* TODO: don't clear each time */
this->chunk_info.clear(); this->chunks.clear();
for (uint64_t i = 0; i < types.size(); ++i) { // iterate over the columns projected by the query
    const auto &column = this->table->column(i);
    int64 len = column->chunk(0)->length();
    this->chunk_info.emplace_back(len); // length of the column data
    this->chunks.push_back(column->chunk(0).get()); // the column data itself
}
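The loop above stores raw, non-owning pointers taken from the table, so they remain valid only while this->table is kept alive, and it reads only chunk(0) of each column, apparently on the assumption that a row group's column arrives as a single chunk. A minimal standard-library sketch of that bookkeeping follows. `Chunk`, `Column`, `Table`, `ChunkView`, and `collect_first_chunks` are hypothetical stand-ins, not Arrow types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

using Chunk = std::vector<int32_t>;                  // stand-in for one arrow::Array
using Column = std::vector<std::shared_ptr<Chunk>>;  // stand-in for arrow::ChunkedArray

struct Table {
    std::vector<Column> columns;
};

struct ChunkView {
    std::vector<int64_t> lengths;       // plays the role of chunk_info
    std::vector<const Chunk *> chunks;  // plays the role of chunks (non-owning)
};

// Records the length of, and a raw pointer to, the first chunk of every column.
// The returned pointers are valid only while `table` is alive.
ChunkView collect_first_chunks(const Table &table) {
    ChunkView view;
    for (const Column &column : table.columns) {
        assert(!column.empty());  // the sketch assumes at least one chunk per column
        const std::shared_ptr<Chunk> &first = column[0];
        view.lengths.push_back(static_cast<int64_t>(first->size()));
        view.chunks.push_back(first.get());
    }
    return view;
}
```

Holding the table in a shared_ptr member, as the original code does with this->table, is what keeps the collected pointers from dangling between calls.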

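Reading data into column_data: read_column also uses an arrow::ChunkedArray to hold the column data it fetches. For each column it allocates storage through allocator according to the column's data type, copies the column data from the arrow::ChunkedArray into the newly allocated storage, and finally stores that buffer in this->column_data[col]. See the read_column source listed in the article "PostgreSQL FDW: Parquet S3 CachingParquetReader".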

// std::vector<void *>             column_data;
// std::vector<std::vector<bool> > column_nulls;
int rowgroup = this->rowgroups[this->row_group];
auto rowgroup_meta = this->reader->parquet_reader()->metadata()->RowGroup(rowgroup);
status = this->reader->RowGroup(rowgroup)->ReadTable(this->indices, &table);
if (!status.ok()) throw Error("parquet_s3_fdw: failed to read rowgroup #%i: %s", rowgroup, status.message().c_str());

allocator->recycle(); /* Release resources acquired in the previous iteration */
/* Read columns data and store it into column_data vector */
for (std::vector<TypeInfo>::size_type col = 0; col < types.size(); ++col) {
    std::shared_ptr<parquet::Statistics> stats;
    if (types[col].index >= 0) stats = rowgroup_meta->ColumnChunk(types[col].index)->statistics();
    bool has_nulls = stats ? stats->null_count() > 0 : true;

    this->num_rows = table->num_rows();
    this->column_nulls[col].resize(this->num_rows);

    this->read_column(table, col, has_nulls); // read the column data
}
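The null handling and the copy step described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the read_column implementation: `Stats`, `Chunk`, `decide_has_nulls`, and `copy_column` are hypothetical names, missing statistics are modeled as a null pointer, and nulls are tracked with one boolean flag per value.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for row-group column statistics.
struct Stats {
    int64_t null_count;
};

// Mirrors the rule in the loop above: without statistics, assume nulls may exist.
bool decide_has_nulls(const Stats *stats) {
    return stats ? stats->null_count > 0 : true;
}

// Hypothetical chunk: values plus a parallel validity vector (true = not null).
struct Chunk {
    std::vector<int64_t> values;
    std::vector<bool> valid;
};

// Copies all chunks of one column into a contiguous buffer and fills the
// null flags. When has_nulls is false, the per-value validity check is skipped.
void copy_column(const std::vector<Chunk> &chunks, bool has_nulls,
                 std::vector<int64_t> &data, std::vector<bool> &nulls) {
    std::size_t total = 0;
    for (const Chunk &c : chunks) total += c.values.size();
    data.assign(total, 0);
    nulls.assign(total, false);

    std::size_t row = 0;
    for (const Chunk &c : chunks) {
        assert(c.valid.size() == c.values.size());
        for (std::size_t i = 0; i < c.values.size(); ++i, ++row) {
            if (has_nulls && !c.valid[i])
                nulls[row] = true;        // null slot: leave the value zeroed
            else
                data[row] = c.values[i];  // copy the value into the new buffer
        }
    }
}
```

The design point is that the statistics are consulted once per column, so a column known to contain no nulls can be copied without testing every value.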

