0
点赞
收藏
分享

微信扫一扫

编写代码创建UDTF函数


1)创建UDTF函数——编写代码

(1)创建一个maven工程:hivefunction

(2)创建包名:com.atguigu.hive.udtf

(3)引入如下依赖

​pom.xml​

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Maven coordinates of the module that holds the custom Hive UDTF. -->
<groupId>com.qc</groupId>
<artifactId>gmall-udtf</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<!-- Hive dependency: the only library declared for this module. -->
<!-- NOTE(review): the UDTF source also imports org.json.JSONArray, which is not declared here; -->
<!-- presumably it is supplied through hive-exec - confirm before changing this dependency. -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>

</dependencies>

</project>

(4)编码

  • 参考写法

package com.atguigu.hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;

import java.util.ArrayList;
import java.util.List;

/**
 * Hive UDTF that turns a string containing a JSON array into one output row per array element.
 *
 * <p>The function accepts exactly one argument of Hive type STRING and produces rows with a
 * single string column named {@code items}.
 */
public class ExplodeJSONArray extends GenericUDTF {

    /**
     * Validates the argument list and describes the shape of the rows this UDTF emits.
     *
     * @param argOIs object inspectors for the arguments supplied in the query
     * @return a struct inspector with one string field named {@code items}
     * @throws UDFArgumentException if the argument count is not one, or the argument is not a
     *     primitive of category STRING
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

        // 1. Exactly one argument is accepted.
        if (argOIs.length != 1) {
            throw new UDFArgumentException("explode_json_array 只需要一个参数");
        }

        // 2. The argument must be a primitive type ...
        if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("explode_json_array 只接受基础类型参数");
        }

        // ... and, once narrowed to a primitive inspector, specifically a STRING.
        PrimitiveObjectInspector argumentOI = (PrimitiveObjectInspector) argOIs[0];
        if (argumentOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("explode_json_array 只接受string类型的参数");
        }

        // 3. Declare the output row: a single string column.
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("items");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Parses the incoming value as a JSON array and forwards each element as its own row.
     *
     * <p>A null argument produces no rows instead of failing with a
     * {@link NullPointerException}.
     *
     * @param objects the argument values for the current input row; only index 0 is read
     * @throws HiveException declared by the overridden method
     */
    @Override
    public void process(Object[] objects) throws HiveException {

        // 1. Fetch the incoming value; a null has nothing to explode, so emit nothing.
        Object input = objects[0];
        if (input == null) {
            return;
        }

        // 2. Parse the string form of the value as a JSON array.
        JSONArray actions = new JSONArray(input.toString());

        // 3. Emit one single-column row per array element.
        for (int i = 0; i < actions.length(); i++) {
            String[] result = new String[1];
            result[0] = actions.getString(i);
            forward(result);
        }
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() throws HiveException {
        // Intentionally empty.
    }

}

  • 我的写法

​ExplodeJSONArray.java​

package com.qc.gmall.hive.udtf;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.json.JSONArray;

/**
 * Hive UDTF that turns a string containing a JSON array into one output row per array element.
 *
 * <p>The function accepts exactly one argument of Hive type STRING and produces rows with a
 * single string column named {@code item}.
 */
public class ExplodeJSONArray extends GenericUDTF {

    /** Inspector for the single argument; set by {@link #initialize} and read by {@link #process}. */
    private PrimitiveObjectInspector inputOI;

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() throws HiveException {
        // Intentionally empty.
    }

    /**
     * Validates the argument list, remembers the argument's inspector, and describes the rows
     * this UDTF emits.
     *
     * @param argOIs object inspectors for the arguments supplied in the query
     * @return a struct inspector with one string field named {@code item}
     * @throws UDFArgumentException if the argument count is not one, or the argument is not a
     *     primitive of category STRING
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

        if (argOIs.length != 1) {
            throw new UDFArgumentException("explode_json_array函数只能接收1个参数");
        }

        ObjectInspector argOI = argOIs[0];

        if (argOI.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("explode_json_array函数只能接收基本数据类型的参数");
        }

        // Narrow to a primitive inspector so the concrete primitive category can be checked.
        PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) argOI;

        if (primitiveOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("explode_json_array函数只能接收STRING类型的参数");
        }

        // Keep the inspector only once the argument is known to be valid.
        inputOI = primitiveOI;

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("item");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Reads the argument as a string, parses it as a JSON array and forwards each element as
     * its own row.
     *
     * <p>When the argument resolves to {@code null} no rows are emitted, rather than handing
     * {@code null} to the JSON parser.
     *
     * @param args the argument values for the current input row; only index 0 is read
     * @throws HiveException declared by the overridden method
     */
    @Override
    public void process(Object[] args) throws HiveException {

        Object arg = args[0];
        String jsonArrayStr = PrimitiveObjectInspectorUtils.getString(arg, inputOI);

        // Nothing to explode for a null value: emit no rows instead of crashing the task.
        if (jsonArrayStr == null) {
            return;
        }

        // Parse the text as a JSON array.
        JSONArray jsonArray = new JSONArray(jsonArrayStr);

        for (int i = 0; i < jsonArray.length(); i++) {
            String json = jsonArray.getString(i);
            // Even a single output column has to be wrapped in an array before forwarding.
            String[] result = {json};
            forward(result);
        }

    }

}

2)创建函数

(1)打包

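(2)将hivefunction-1.0-SNAPSHOT.jar上传到hadoop102的/opt/module,然后再将该jar包上传到HDFS的/user/hive/jars路径下(注意:jar包文件名由pom.xml中的artifactId和version决定;若按上文的pom.xml打包,实际文件名为gmall-udtf-1.0-SNAPSHOT.jar,后续命令中的文件名需相应替换)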

[atguigu@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
[atguigu@hadoop102 module]$ hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars

(3)创建永久函数与开发好的java class关联(注意:as 后面的全类名必须与代码中的package一致;若使用上文"我的写法",应写为 com.qc.gmall.hive.udtf.ExplodeJSONArray)

create function explode_json_array as 'com.atguigu.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';

(4)注意:如果修改了自定义函数重新生成jar包怎么处理?

只需要替换HDFS路径上的旧jar包,然后重启Hive客户端即可。

参考官方

​​DeveloperGuide UDTF​​

GenericUDTF Interface
A custom UDTF can be created by extending the GenericUDTF abstract class and then implementing the initialize, process, and possibly close methods. The initialize method is called by Hive to notify the UDTF of the argument types to expect. The UDTF must then return an object inspector corresponding to the row objects that the UDTF will generate. Once initialize() has been called, Hive will give rows to the UDTF using the process() method. While in process(), the UDTF can produce and forward rows to other operators by calling forward(). Lastly, Hive will call the close() method when all the rows have been passed to the UDTF.


举报

相关推荐

0 条评论