Sqoop 2 Connector Development
本文档描述了如何实现Sqoop2 中的一个connector。
What is Connector?
Connector提供了与外部数据库的交互。Connector从数据库中读取数据导入数据,写数据到数据库导出数据。Sqoop2 框架管理与Hadoop交互的公共模块。
Connector Implementation
SqoopConnector类定义了一个Connector必须提供的功能。每一个Connector必须继承SqoopConnector然后重载下面的方法。
public abstract String getVersion();
public abstract ResourceBundle getBundle(Locale locale);
public abstract Class getConnectionConfigurationClass();
public abstract Class getJobConfigurationClass(MJob.TypejobType);
public abstract Importer getImporter();
public abstract Exporter getExporter();
public abstract Validator getValidator();
public abstract MetadataUpgrader getMetadataUpgrader();
getImporter方法返回一个Importer类的实例,这个实例是需要导入的模块的占位符。
getExporter方法返回一个Exporter类的实例,这个实例是需要导出的模块的占位符。
getBundle,getConnectionConfigurationClass,getJobConfigurationClass和getValidator方法与Connector的配置相关。
Importer
Connector#getImporter返回一个Importer实例,这是一个像Partitioner和Extractor这样的需要导出的模块的占位符。内置的GenericJdbcConnector定义了一个像下面这样的Importer:
private static final Importer IMPORTER = new Importer(
GenericJdbcImportInitializer.class,
GenericJdbcImportPartitioner.class,
GenericJdbcImportExtractor.class,
GenericJdbcImportDestroyer.class);
...
@Override
public Importer getImporter() {
return IMPORTER;
}
Extractor
Extractor(E或者ETL)从外部数据库提取数据,将其写入到Sqoop框架来导入数据。
Extractor必须重载extract方法。
public abstract void extract(ExtractorContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration,
Partition partition);
extract方法从数据库中提取数据,然后将数据写入到DataWriter(context提供)作为中间数据(Intermediaterepresentation)。
Extractor必须在extract方法中迭代直到来自数据库的数据耗尽。
while (resultSet.next()) {
...
context.getDataWriter().writeArrayRecord(array);
...
}
Partitioner
Partitioner根据配置创建一个Partition实例。Partition实例的数量就是Map任务的数量。Partition实例被传入到Extractor对象的extract方法中。Extractor来判定数据的哪一个部分被Partition提取。
对于Partition类没有特别的约定,除了重载定义的3个方法。
public abstract class Partition {
public abstract void readFields(DataInput in) throws IOException;
public abstract void write(DataOutput out) throws IOException;
public abstract String toString();
}
Connector可以自定义Partition的设计。
Initializer and Destroyer
Initializer在MR Job提交之前被实例化,做一些像增加依赖Jar文件这样的准备工作。
Destroy在MR Job完成之后被实例化,做些清理工作。
Exporter
Connector#getExporter方法返回一个Exporter实例,这个一个像Loader这样的导出数据需要的模块的占位符。内置的GenericJdbcConnector方法定义了像下面这样的Exporter。
private static final Exporter EXPORTER = new Exporter(
GenericJdbcExportInitializer.class,
GenericJdbcExportLoader.class,
GenericJdbcExportDestroyer.class);
...
@Override
public Exporter getExporter() {
return EXPORTER;
}
Loader
Loader(L或者ETL)从Sqoop框架接收数据然后将数据加载到外部数据库。
Loader必须重载下面的方法。
public abstract void load(LoaderContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration) throws Exception;
Load方法从DataReader(context提供)读取中间数据,然后将其加载到数据库中。
Loader的load方法必须迭代直到DataReader中的数据耗尽。
while ((array = context.getDataReader().readArrayRecord()) != null) {
...
}
Initializer and Destroyer
Initializer在MR Job提交之前被实例化,做一些像增加依赖Jar文件这样的准备工作。
Destroy在MR Job完成之后被实例化,做些清理工作。
Connector Configuration
Configuration
org.apache.sqoop.model包中定义的是代表配置的实体。
ConnectionConfigurationClass
JobConfigurationClass
ResourceBundle
配置数据存放在property文件中,可被Connector#getBundle方法获取。
Validator
Validator验证用户设置的配置。
Internal of Sqoop2 MapReduce Job
Sqoop2提供了一个通用的MR模块,像数据导入导出使用的SqoopMapper和SqoopReducer。
1. InputFormat使用Partitioner创建split
2. SqoopMapper调用Extractor对象的extract方法
3. SqoopReducer不做实际的工作
4. OutputFormat调用Loader的load方法(通过SqoopOutputFormatLoadExecutor)
对于数据导入,Connector提供的Extractor从数据库中提取数据,Sqoop2提供的Loader将数据加载到Hadoop。
对于数据导出,Sqoop2提供的Extractor从Hadoop中提取数据,然后Connector提供的Loader将数据加载到数据库。