Last updated: 2021-08-05 11:42:59
Custom UDF Functions

1. Create a Maven project (named Hive here)

2. Add the dependency
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```
3. Create a class
```java
package xxx.xxx.xxx;

import org.apache.hadoop.hive.ql.exec.UDF;

public class Lower extends UDF {
    public String evaluate(final String s) {
        if (s == null) {
            return null;
        }
        return s.toLowerCase();
    }
}
```
4. Package the project into a jar and upload it to the server at /xxx/xxx/xxx/udf.jar

5. Add the jar to Hive's classpath
hive (default)> add jar /xxx/xxx/xxx/udf.jar;
Alternatively, place the jar in Hive's lib or auxlib directory.
6. Create a temporary function bound to the Java class you developed
hive (default)> create temporary function mylower as "xxx.xxx.xxx.Lower";
Without temporary this creates a permanent function; with it, the function is gone once Hive restarts. An example of the permanent form is sketched below.
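For the permanent form (supported since Hive 0.13) the jar can be referenced directly in the statement; the HDFS path here is a placeholder:

hive (default)> create function mylower as "xxx.xxx.xxx.Lower" using jar 'hdfs:///path/to/udf.jar';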
7. The custom function can now be used in HQL
hive (default)> select ename, mylower(ename) lowername from emp;
UDF

A plain UDF extends org.apache.hadoop.hive.ql.exec.UDF and exposes one or more evaluate methods, which Hive resolves by reflection; the @Description annotation is what desc function prints:

```java
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

@Description(
    name = "hello",
    value = "_FUNC_(str) - from the input string "
          + "returns the value that is \"Hello $str\"",
    extended = "Example:\n"
             + "  > SELECT _FUNC_(str) FROM src;"
)
public class HelloUDF extends UDF {
    public String evaluate(String str) {
        try {
            return "Hello " + str;
        } catch (Exception e) {
            e.printStackTrace();
            return "ERROR";
        }
    }
}
```
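Once registered, desc function prints the @Description value with _FUNC_ replaced by the registered name; roughly (package name is a placeholder):

hive (default)> create temporary function hello as "xxx.xxx.xxx.HelloUDF";
hive (default)> desc function hello;
hello(str) - from the input string returns the value that is "Hello $str"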
GenericUDF

A GenericUDF receives full type information through ObjectInspectors instead of relying on reflection. Hive's built-in array_contains is a good reference:

```java
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BooleanWritable;

@Description(name = "array_contains",
    value = "_FUNC_(array, value) - Returns TRUE if the array contains value.",
    extended = "Example:\n  > SELECT _FUNC_(array(1, 2, 3), 2) FROM src LIMIT 1;\n  true")
public class GenericUDFArrayContains extends GenericUDF {

    private static final int ARRAY_IDX = 0;
    private static final int VALUE_IDX = 1;
    private static final int ARG_COUNT = 2; // number of arguments to this UDF
    private static final String FUNC_NAME = "ARRAY_CONTAINS"; // external function name

    private transient ObjectInspector valueOI;
    private transient ListObjectInspector arrayOI;
    private transient ObjectInspector arrayElementOI;
    private BooleanWritable result;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check that we have exactly two arguments
        if (arguments.length != ARG_COUNT) {
            throw new UDFArgumentException(
                "The function " + FUNC_NAME + " accepts " + ARG_COUNT + " arguments.");
        }

        // The first argument must be a list/array
        if (!arguments[ARRAY_IDX].getCategory().equals(ObjectInspector.Category.LIST)) {
            throw new UDFArgumentTypeException(ARRAY_IDX,
                "\"array\" expected at function ARRAY_CONTAINS, but \""
                + arguments[ARRAY_IDX].getTypeName() + "\" is found");
        }

        arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
        arrayElementOI = arrayOI.getListElementObjectInspector();
        valueOI = arguments[VALUE_IDX];

        // The value must have the same type as the array elements
        if (!ObjectInspectorUtils.compareTypes(arrayElementOI, valueOI)) {
            throw new UDFArgumentTypeException(VALUE_IDX,
                "\"" + arrayElementOI.getTypeName() + "\" expected at function ARRAY_CONTAINS, but \""
                + valueOI.getTypeName() + "\" is found");
        }

        // The element type must support comparison
        if (!ObjectInspectorUtils.compareSupported(valueOI)) {
            throw new UDFArgumentException("The function " + FUNC_NAME
                + " does not support comparison for \"" + valueOI.getTypeName() + "\" types");
        }

        result = new BooleanWritable(false);
        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        result.set(false);

        Object array = arguments[ARRAY_IDX].get();
        Object value = arguments[VALUE_IDX].get();
        int arrayLength = arrayOI.getListLength(array);

        // Nothing to do for a null value or an empty/null array
        if (value == null || arrayLength <= 0) {
            return result;
        }

        // Compare the value against each non-null element of the array
        for (int i = 0; i < arrayLength; ++i) {
            Object listElement = arrayOI.getListElement(array, i);
            if (listElement != null
                    && ObjectInspectorUtils.compare(value, valueOI, listElement, arrayElementOI) == 0) {
                result.set(true);
                break;
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        assert children.length == ARG_COUNT;
        return "array_contains(" + children[0] + ", " + children[1] + ")";
    }
}
```
The built-in nvl is another compact example: it resolves a common return type for its two arguments and falls back to the second when the first is null:

```java
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(name = "nvl",
    value = "_FUNC_(value,default_value) - Returns default value if value is null else returns value",
    extended = "Example:\n  > SELECT _FUNC_(null,'bla') FROM src LIMIT 1;\n  bla")
public class GenericUDFNvl extends GenericUDF {
    // Resolves a common return type for the two arguments and converts values to it
    private transient GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
    private transient ObjectInspector[] argumentOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentOIs = arguments;
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("The operator 'NVL' accepts 2 arguments.");
        }
        returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
        if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
            throw new UDFArgumentTypeException(1,
                "The first and second arguments of function NVL should have the same type, "
                + "but they are different: \"" + arguments[0].getTypeName()
                + "\" and \"" + arguments[1].getTypeName() + "\"");
        }
        return returnOIResolver.get();
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
        if (retVal == null) {
            // First argument is null: fall back to the default value
            retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
        }
        return retVal;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("NVL(");
        sb.append(children[0]);
        sb.append(',');
        sb.append(children[1]);
        sb.append(')');
        return sb.toString();
    }
}
```
UDAF

The classic UDAF interface (deprecated in newer Hive versions, but the simplest to read): the evaluator implements init, iterate, terminatePartial, merge, and terminate. A float max:

```java
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.FloatWritable;

public class MaxNumberUDAF extends UDAF {

    public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator {
        // Running maximum; null until the first non-null value is seen
        private FloatWritable result;

        // Reset the aggregation state
        public void init() {
            result = null;
        }

        // Called once per input row on the map side
        public boolean iterate(FloatWritable value) {
            if (value == null) {
                return false;
            }
            if (result == null) {
                result = new FloatWritable(value.get());
            } else {
                result.set(Math.max(result.get(), value.get()));
            }
            return true;
        }

        // Return the partial result to ship to the reducer
        public FloatWritable terminatePartial() {
            return result;
        }

        // Merge a partial result from another task
        public boolean merge(FloatWritable other) {
            return iterate(other);
        }

        // Return the final result
        public FloatWritable terminate() {
            return result;
        }
    }
}
```
AbstractGenericUDAFResolver

The recommended way to write aggregates is AbstractGenericUDAFResolver plus a GenericUDAFEvaluator, which makes the aggregation stages explicit.
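The stages are described by the evaluator's Mode enum; the sketch below is condensed from the Hive source (comments paraphrased), and the discussion that follows refers to these comments:

```java
public static enum Mode {
    // PARTIAL1: from original data to partial aggregation:
    // iterate() and terminatePartial() will be called
    PARTIAL1,
    // PARTIAL2: from partial aggregation to partial aggregation:
    // merge() and terminatePartial() will be called
    PARTIAL2,
    // FINAL: from partial aggregation to full aggregation:
    // merge() and terminate() will be called
    FINAL,
    // COMPLETE: from original data directly to full aggregation:
    // iterate() and terminate() will be called
    COMPLETE
}
```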
COMPLETE stands apart from the other three stages. As the comment on COMPLETE above indicates, it aggregates directly from the original data to the final result: there is no intermediate step where the map side first produces partial aggregates that the reduce side then combines. COMPLETE occurs only in map-only jobs, which is why it never appears together with the other three stages.
PARTIAL1

iterate(AggregationBuffer agg, Object[] parameters): AggregationBuffer is a data structure you implement to hold in-progress aggregation state, and parameters holds the actual arguments passed to the UDAF for one row. The method's job can be summed up as: take each incoming record (delivered in parameters) and fold it into agg however you see fit; agg could be as simple as a list that collects every iterated value. agg effectively doubles as the return value.

terminatePartial(AggregationBuffer agg): iterate has folded the map-side rows into agg, which is then handed to terminatePartial to finish the partial computation. Its Object return value must be passed on to the next stage, PARTIAL2, but how does PARTIAL2 know what that Object actually is? As mentioned earlier, Hive obtains type information through ObjectInspectors, so where does the ObjectInspector for PARTIAL2's input come from? Only you know the output type of each stage, which is why you must also implement init() (it is called once in every stage). Its Mode parameter m identifies the current stage (here PARTIAL1); based on m you return an ObjectInspector describing the current stage's output, which is exactly the next stage's input type information. A minimal evaluator skeleton illustrating this appears after the FINAL stage below.
PARTIAL2

PARTIAL2's input is PARTIAL1's output, i.e. the return value of terminatePartial.

merge(AggregationBuffer agg, Object partial): agg plays the same dual role as in PARTIAL1, both parameter and return value. partial is the return value of PARTIAL1's terminatePartial; you extract its contents via the ObjectInspector you returned there. merge simply folds the partial value into agg for later computation.

terminatePartial: same as in PARTIAL1.

FINAL

FINAL runs in the reduce stage and computes the final result. The only difference from PARTIAL2 is that terminate is called instead of terminatePartial; it simply emits the final output.
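To make the stage plumbing concrete before the full example, here is a minimal count-style evaluator. This is a sketch, not from the original post; the class and member names (GenericUDAFSimpleCount, SimpleCountEvaluator, CountBuffer) are illustrative. Because the partial result and the final result are both a long, init() can return the same ObjectInspector in every mode:

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class GenericUDAFSimpleCount extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument expected.");
        }
        return new SimpleCountEvaluator();
    }

    public static class SimpleCountEvaluator extends GenericUDAFEvaluator {
        // Input OI for PARTIAL2/FINAL: the partial count produced by terminatePartial()
        private PrimitiveObjectInspector partialOI;

        static class CountBuffer extends AbstractAggregationBuffer {
            long count;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
                // merge() will receive what terminatePartial() emitted: a long
                partialOI = (PrimitiveObjectInspector) parameters[0];
            }
            // Partial and final output are both a long, so every mode returns the same OI
            return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new CountBuffer();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((CountBuffer) agg).count = 0L;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            // PARTIAL1/COMPLETE: one original row at a time
            if (parameters[0] != null) {
                ((CountBuffer) agg).count++;
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // Output of PARTIAL1/PARTIAL2, input of PARTIAL2/FINAL
            return ((CountBuffer) agg).count;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            // PARTIAL2/FINAL: fold another task's partial count into ours
            if (partial != null) {
                ((CountBuffer) agg).count += PrimitiveObjectInspectorUtils.getLong(partial, partialOI);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            // FINAL/COMPLETE: the finished result
            return ((CountBuffer) agg).count;
        }
    }
}
```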
Putting it all together: a col_concat UDAF that collects a column's values and concatenates them as open + v1 + seperator + v2 + ... + close:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class GenericUDAFColConcat extends AbstractGenericUDAFResolver {

    public GenericUDAFColConcat() {
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        // Signature: col_concat(value, open, close, seperator)
        if (parameters.length != 4) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                "COL_CONCAT requires 4 arguments, got " + parameters.length);
        }
        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                "COL_CONCAT can only be used to concat PRIMITIVE type column, got "
                + parameters[0].getTypeName());
        }
        // Arguments 2-4 (open/close/seperator) must be CHAR or STRING
        for (int i = 1; i < parameters.length; ++i) {
            if (parameters[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i,
                    "COL_CONCAT only receives type CHAR/STRING as its 2nd to 4th argument's type, got "
                    + parameters[i].getTypeName());
            }
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector)
                TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[i]);
            if (poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.CHAR
                    && poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                throw new UDFArgumentTypeException(i,
                    "COL_CONCAT only receives type CHAR/STRING as its 2nd to 4th argument's type, got "
                    + parameters[i].getTypeName());
            }
        }
        return new GenericUDAFCOLCONCATEvaluator();
    }

    private static class ColCollectAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {
        private List<String> colValueList;
        private String open;
        private String close;
        private String seperator;
        private boolean isInit;

        public ColCollectAggregationBuffer() {
            colValueList = new LinkedList<>();
            this.isInit = false;
        }

        public void init(String open, String close, String seperator) {
            this.open = open;
            this.close = close;
            this.seperator = seperator;
            this.isInit = true;
        }

        public boolean isInit() {
            return isInit;
        }

        public String concat() {
            String c = StringUtils.join(colValueList, seperator);
            return open + c + close;
        }
    }

    public static class GenericUDAFCOLCONCATEvaluator extends GenericUDAFEvaluator {
        private transient List<ObjectInspector> inputOIs = new LinkedList<>();
        private transient Mode m;
        private transient String pString;
        private transient StructObjectInspector soi;
        private transient ListObjectInspector valueFieldOI;
        private transient PrimitiveObjectInspector openFieldOI;
        private transient PrimitiveObjectInspector closeFieldOI;
        private transient PrimitiveObjectInspector seperatorFieldOI;
        private transient StructField valueField;
        private transient StructField openField;
        private transient StructField closeField;
        private transient StructField seperatorField;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            this.m = m;
            pString = "";
            for (ObjectInspector p : parameters) {
                pString += p.getTypeName();
            }

            // Set up the input OIs depending on what this stage receives
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // Input is the original data: four primitive columns
                inputOIs.clear();
                for (ObjectInspector p : parameters) {
                    inputOIs.add((PrimitiveObjectInspector) p);
                }
            } else {
                // Input is the struct emitted by terminatePartial()
                soi = (StructObjectInspector) parameters[0];
                valueField = soi.getStructFieldRef("values");
                valueFieldOI = (ListObjectInspector) valueField.getFieldObjectInspector();
                openField = soi.getStructFieldRef("open");
                openFieldOI = (PrimitiveObjectInspector) openField.getFieldObjectInspector();
                closeField = soi.getStructFieldRef("close");
                closeFieldOI = (PrimitiveObjectInspector) closeField.getFieldObjectInspector();
                seperatorField = soi.getStructFieldRef("seperator");
                seperatorFieldOI = (PrimitiveObjectInspector) seperatorField.getFieldObjectInspector();
            }

            // Declare the output OI depending on what this stage emits
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                // Partial output: struct<values:array<string>, open:string, close:string, seperator:string>
                ArrayList<ObjectInspector> foi = new ArrayList<>();
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                foi.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                foi.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                ArrayList<String> fname = new ArrayList<String>();
                fname.add("values");
                fname.add("open");
                fname.add("close");
                fname.add("seperator");
                return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
            } else {
                // Final output: the concatenated string
                return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new ColCollectAggregationBuffer();
        }

        @Override
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
            ((ColCollectAggregationBuffer) aggregationBuffer).colValueList.clear();
        }

        @Override
        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
            assert objects.length == 4;
            ColCollectAggregationBuffer ccAggregationBuffer = (ColCollectAggregationBuffer) aggregationBuffer;
            ccAggregationBuffer.colValueList.add(PrimitiveObjectInspectorUtils.getString(
                objects[0], (PrimitiveObjectInspector) inputOIs.get(0)));
            // Remember open/close/seperator from the first row seen
            if (!ccAggregationBuffer.isInit()) {
                ccAggregationBuffer.init(
                    PrimitiveObjectInspectorUtils.getString(objects[1], (PrimitiveObjectInspector) inputOIs.get(1)),
                    PrimitiveObjectInspectorUtils.getString(objects[2], (PrimitiveObjectInspector) inputOIs.get(2)),
                    PrimitiveObjectInspectorUtils.getString(objects[3], (PrimitiveObjectInspector) inputOIs.get(3)));
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            ColCollectAggregationBuffer ccAggregationBuffer = (ColCollectAggregationBuffer) aggregationBuffer;
            List<Object> partialRet = new ArrayList<>();
            partialRet.add(ccAggregationBuffer.colValueList);
            partialRet.add(ccAggregationBuffer.open);
            partialRet.add(ccAggregationBuffer.close);
            partialRet.add(ccAggregationBuffer.seperator);
            return partialRet;
        }

        @Override
        public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
            ColCollectAggregationBuffer ccAggregationBuffer = (ColCollectAggregationBuffer) aggregationBuffer;
            if (partial != null) {
                List<Object> partialList = soi.getStructFieldsDataAsList(partial);
                List<String> values = (List<String>) valueFieldOI.getList(partialList.get(0));
                ccAggregationBuffer.colValueList.addAll(values);
                // Take open/close/seperator from the first partial result
                if (!ccAggregationBuffer.isInit()) {
                    ccAggregationBuffer.init(
                        PrimitiveObjectInspectorUtils.getString(partialList.get(1), openFieldOI),
                        PrimitiveObjectInspectorUtils.getString(partialList.get(2), closeFieldOI),
                        PrimitiveObjectInspectorUtils.getString(partialList.get(3), seperatorFieldOI));
                }
            }
        }

        @Override
        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            return ((ColCollectAggregationBuffer) aggregationBuffer).concat();
        }
    }
}
```
UDTF

A GenericUDTF declares its output schema in initialize and emits zero or more rows per input row via forward. This example flattens a nested-JSON feature string into (name, value) rows, using fastjson for parsing:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class FeatureParseUDTF extends GenericUDTF {
    private PrimitiveObjectInspector stringOI = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (objectInspectors.length != 1) {
            throw new UDFArgumentException("FeatureParseUDTF() takes exactly one argument");
        }
        // The single argument must be a string
        if (objectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) objectInspectors[0]).getPrimitiveCategory()
                   != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("FeatureParseUDTF() takes a string as a parameter");
        }
        stringOI = (PrimitiveObjectInspector) objectInspectors[0];

        // Each input row is exploded into rows of (name, value)
        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
        fieldNames.add("name");
        fieldNames.add("value");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] record) throws HiveException {
        final String feature = stringOI.getPrimitiveJavaObject(record[0]).toString();
        ArrayList<Object[]> results = parseInputRecord(feature);
        Iterator<Object[]> it = results.iterator();
        while (it.hasNext()) {
            Object[] r = it.next();
            forward(r); // emit one output row
        }
    }

    // Flattens {namespace: {dimension: {feature: value}}} into (namespace:dimension:feature, value) pairs
    public ArrayList<Object[]> parseInputRecord(String feature) {
        ArrayList<Object[]> resultList = new ArrayList<Object[]>();
        try {
            JSONObject json = JSON.parseObject(feature);
            for (String nameSpace : json.keySet()) {
                JSONObject dimensionJson = json.getJSONObject(nameSpace);
                for (String dimensionName : dimensionJson.keySet()) {
                    JSONObject featureJson = dimensionJson.getJSONObject(dimensionName);
                    for (String featureName : featureJson.keySet()) {
                        String property_name = nameSpace + ":" + dimensionName + ":" + featureName;
                        Object[] item = new Object[2];
                        item[0] = property_name;
                        item[1] = featureJson.get(featureName);
                        resultList.add(item);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return resultList;
    }

    @Override
    public void close() throws HiveException {
    }
}
```
The built-in explode is another reference; note how it supports both arrays and maps by switching on the input category:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

@Description(name = "explode",
    value = "_FUNC_(a) - separates the elements of array a into multiple rows,"
          + " or the elements of a map into multiple rows and columns ")
public class GenericUDTFExplode extends GenericUDTF {

    private transient ObjectInspector inputOI = null;

    @Override
    public void close() throws HiveException {
    }

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException("explode() takes only one argument");
        }

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        switch (args[0].getCategory()) {
        case LIST:
            // array<T> explodes to one column named col
            inputOI = args[0];
            fieldNames.add("col");
            fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector());
            break;
        case MAP:
            // map<K,V> explodes to two columns named key and value
            inputOI = args[0];
            fieldNames.add("key");
            fieldNames.add("value");
            fieldOIs.add(((MapObjectInspector) inputOI).getMapKeyObjectInspector());
            fieldOIs.add(((MapObjectInspector) inputOI).getMapValueObjectInspector());
            break;
        default:
            throw new UDFArgumentException("explode() takes an array or a map as a parameter");
        }

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // Reused output buffers to avoid per-row allocation
    private transient final Object[] forwardListObj = new Object[1];
    private transient final Object[] forwardMapObj = new Object[2];

    @Override
    public void process(Object[] o) throws HiveException {
        switch (inputOI.getCategory()) {
        case LIST:
            ListObjectInspector listOI = (ListObjectInspector) inputOI;
            List<?> list = listOI.getList(o[0]);
            if (list == null) {
                return;
            }
            for (Object r : list) {
                forwardListObj[0] = r;
                forward(forwardListObj);
            }
            break;
        case MAP:
            MapObjectInspector mapOI = (MapObjectInspector) inputOI;
            Map<?, ?> map = mapOI.getMap(o[0]);
            if (map == null) {
                return;
            }
            for (Entry<?, ?> r : map.entrySet()) {
                forwardMapObj[0] = r.getKey();
                forwardMapObj[1] = r.getValue();
                forward(forwardMapObj);
            }
            break;
        default:
            throw new TaskExecutionException("explode() can only operate on an array or a map");
        }
    }

    @Override
    public String toString() {
        return "explode";
    }
}
```
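Usage mirrors any table-generating function. For instance (the table student, its features column, and the registered name feature_parse are illustrative placeholders):

hive (default)> select explode(array(1, 2, 3)) as col from src limit 3;
hive (default)> select s.name, f.name, f.value from student s lateral view feature_parse(s.features) f as name, value;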