05 08

Writing a Hive UDF

(How to convert an array<struct<target:bigint,quantity:int,price:float>> into an array<struct<target:bigint,quantity:int,price:float,externalid:string>>)

If you’ve been reading my blog posts over the last few months, you will have noticed that they’ve focused more on the engineering side of Baynote and less on technical details. This blog post is an exception. It’s going to get very technical very quickly. Sorry.

The back-end component of our platform is being built on top of a Hadoop cluster, and we do a lot of our model building and analytics using Hive. This insulates us from having to write low-level map-reduce jobs, and makes our algorithm development more efficient. We find it to be an extremely useful tool, but sometimes lacking in documentation.

One of the data items that we capture from our customers is the details of the purchases that their website users make. The tags (see Why Amazon and Netflix Have it Easy) send back the quantity purchased, the item’s price, and the customer’s identifier for the item. We translate the customer identifier into a Baynote-internal identifier, and store the purchase in our Hive table as an array<struct<target:bigint,quantity:int,price:float>> where target is the internal identifier.

Now it turns out that sometimes customers put new items on their web sites before we get the new catalog that tells us about them. In these cases, when creating the entries in the Hive table, we have no mapping from the customer’s identifier to the internal Baynote identifier. So we decided to change the schema in the table to array<struct<target:bigint,quantity:int,price:float,externalid:string>>, so that if we have an externalid with no current target, we could perform the lookup later.
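Concretely, the before and after schemas look something like this (the table and column names other than the purchase struct are hypothetical; only the struct types come from the description above):

```sql
-- Old table: the purchase struct has no externalid field
CREATE TABLE purchases_v1 (
  session_id STRING,
  purchase_details ARRAY<STRUCT<target:BIGINT, quantity:INT, price:FLOAT>>
);

-- New table: the purchase struct gains an externalid field
CREATE TABLE purchases_v2 (
  session_id STRING,
  purchase_details ARRAY<STRUCT<target:BIGINT, quantity:INT, price:FLOAT, externalid:STRING>>
);
```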

With the change in table schema, our data is necessarily split over two tables: one before the change, and one after. It’s very inconvenient to have the basic data that all models and analytics are based on split over two tables. But it is not a major problem, because Hive supports views, and it’s easy to create a view that uses a UNION ALL to merge the two – but only if you can translate the old schema into the new one.

So, how to convert an array<struct<target:bigint,quantity:int,price:float>> into an array<struct<target:bigint,quantity:int,price:float,externalid:string>> ? Unfortunately HiveQL has limited support for creating structs and arrays, so it’s time to turn to a UDF.
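The view that motivates all this would be a sketch along these lines (table and view names are hypothetical; AddExternalIdToPurchaseDetails is the UDF developed in this post, applied to the old-schema table so both branches of the UNION ALL have the same type):

```sql
CREATE VIEW all_purchases AS
SELECT session_id,
       AddExternalIdToPurchaseDetails(purchase_details) AS purchase_details
FROM purchases_v1
UNION ALL
SELECT session_id, purchase_details
FROM purchases_v2;
```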

Hive supports two types of UDF: simple UDFs [https://cwiki.apache.org/Hive/hiveplugins.html] and GenericUDFs. Simple UDFs are, well, simple: easy to write, but limited in the types of data that can be input to them. In particular, they don’t deal with structs. GenericUDFs are the swiss-army knife of UDFs, but the documentation for them is very scant, especially when it comes to dealing with complex inputs. There’s a tutorial for writing UDAFs at https://cwiki.apache.org/Hive/genericudafcasestudy.html, and a post describing the development of a SerDe (which uses a lot of the same techniques as a GenericUDF when it comes to accessing the data) at http://www.congiu.com/a-json-readwrite-serde-for-hive/. But, as evidenced by the unanswered stackoverflow question http://stackoverflow.com/questions/10895933/how-to-handle-nested-struct-in-hive-udf, there’s a lack of readily available information regarding UDFs and complex data structures.
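For completeness, once the UDF below is compiled and packaged, it gets registered in Hive like any other GenericUDF (the jar path here is hypothetical; the class in this post has no package, so the bare class name is used):

```sql
ADD JAR /path/to/purchase-udfs.jar;

CREATE TEMPORARY FUNCTION AddExternalIdToPurchaseDetails
  AS 'GenericUDFAddExternalIdToPurchaseDetails';
```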

Here’s what goes into a GenericUDF that takes as input an array of structs and outputs an array of a different type of struct.

Before we start, a few notes:

  1. A UDF gets access to the data passed to it via ObjectInspectors, so the ObjectInspector concept is very important.
  2. Many of the functions that are used in GenericUDFs, in particular the various subclasses of ObjectInspector, return Objects. This means that
    1. the compiler is of very little help when it comes to type-checking during development
    2. it’s sometimes hard to find out what the returned type really is. The hadoop task logs can be very useful here, as we’ll see later.

All user-defined GenericUDFs inherit from the GenericUDF class, and so must implement the following three methods:

  1. public ObjectInspector initialize(ObjectInspector[] arguments)
  2. public Object evaluate(DeferredObject[] arguments)
  3. public String getDisplayString(String[] children)

initialize() is called the first time the UDF is invoked. It is typically used to do four things, only the first two of which are obvious.

  1. Verify that the input is of the type expected (in our case array<struct<target:bigint,quantity:int,price:float>> )
  2. Set up and return an ObjectInspector for the type of the output of the UDF. For a UDF that deals with struct<>s this is where the names of the elements of the structs in the output are defined.
  3. Store in global variables the ObjectInspectors for the elements of the input
  4. Set up the storage variable for the output.

Step 3 isn’t strictly necessary, as the ObjectInspectors could be set up in the call to evaluate(), but setting them up in the call to initialize() means that this is only done once.

evaluate() gets passed the input, does whatever it wants to it, and then returns the output. The input is accessed using the ObjectInspectors that were saved into global variables in the call to initialize(). How to store and return the output is also important – the signature for the function says that it returns an Object, so we need to know what sorts of Object Hive will not object to when it gets them. What’s important for this particular UDF is the way in which array<> and struct<> are represented so that they can be passed back to hive.

  1. array<> is represented by java ArrayList<>
  2. struct<> is represented by java Object[], and only the values are stored in the Object array (the names are defined in the ObjectInspector for the output that was set up in initialize() ). The elements must be stored in the same order as the fields were added to the output ObjectInspector.

Other than that, use hadoop’s Writable and Text classes rather than java primitives.

getDisplayString() returns the string that is shown when explain is used on a query containing the UDF.

So, that said, here’s the code, to which I’ve added a lot of comments explaining what’s going on and how the code fits with what I’ve written above.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.lazy.LazyLong;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.hive.serde2.lazy.LazyFloat;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;

import java.util.ArrayList;

public class GenericUDFAddExternalIdToPurchaseDetails extends GenericUDF {

    // The return variable. A Java Object[] becomes a hive struct<>; a Java ArrayList<> becomes a hive array<>.
    // The return variable only holds the values that are in the struct<>. The field names
    // are defined in the ObjectInspector that is returned by the initialize() method.
    private ArrayList<Object[]> ret;

    // Global variables that inspect the input.
    // These are set up during the initialize() call, and are then used during the
    // calls to evaluate():
    //   the ObjectInspector for the list (the input array<>),
    //   the ObjectInspector for the struct<>,
    //   and the ObjectInspectors for the elements of the struct<>: target, quantity and price.
    private ListObjectInspector loi;
    private StructObjectInspector soi;
    private ObjectInspector toi, qoi, poi;

    @Override
    // This is what we do in the initialize() method:
    //   Verify that the input is of the type expected
    //   Set up the ObjectInspectors for the input in global variables
    //   Initialize the output ArrayList
    //   Return the ObjectInspector for the output
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        // Verify the input is of the required type.
        // Set the global variables (the various ObjectInspectors) while we're doing this.

        // Exactly one input argument
        if( arguments.length != 1 )
            throw new UDFArgumentLengthException("AddExternalIdToPurchaseDetails() accepts exactly one argument.");

        // Is the input an array<>?
        if( arguments[0].getCategory() != ObjectInspector.Category.LIST )
            throw new UDFArgumentTypeException(0, "The single argument to AddExternalIdToPurchaseDetails should be "
                + "Array<Struct>" + " but " + arguments[0].getTypeName() + " is found");

        // Is the input an array<struct<>>?
        // Get the object inspector for the list (array) elements; this should be a StructObjectInspector.
        // Then check that the struct has the correct fields.
        // Also, store the ObjectInspectors for use later in the evaluate() method.
        loi = ((ListObjectInspector)arguments[0]);
        soi = ((StructObjectInspector)loi.getListElementObjectInspector());

        // Are there the correct number of fields?
        if( soi.getAllStructFieldRefs().size() != 3 )
            throw new UDFArgumentTypeException(0, "Incorrect number of fields in the struct. "
                + "The single argument to AddExternalIdToPurchaseDetails should be "
                + "Array<Struct>" + " but " + arguments[0].getTypeName() + " is found");

        // Are the fields the ones we want?
        StructField target = soi.getStructFieldRef("target");
        StructField quantity = soi.getStructFieldRef("quantity");
        StructField price = soi.getStructFieldRef("price");
        if( target==null )
            throw new UDFArgumentTypeException(0, "No \"target\" field in input structure " + arguments[0].getTypeName());
        if( quantity==null )
            throw new UDFArgumentTypeException(0, "No \"quantity\" field in input structure " + arguments[0].getTypeName());
        if( price==null )
            throw new UDFArgumentTypeException(0, "No \"price\" field in input structure " + arguments[0].getTypeName());

        // Are they of the correct types? (primitives LongWritable, IntWritable, FloatWritable)
        // We store these ObjectInspectors for use in the evaluate() method.
        toi = target.getFieldObjectInspector();
        qoi = quantity.getFieldObjectInspector();
        poi = price.getFieldObjectInspector();

        // First, are they primitives?
        if( toi.getCategory() != ObjectInspector.Category.PRIMITIVE )
            throw new UDFArgumentTypeException(0, "target field must be a bigint; found " + toi.getTypeName());
        if( qoi.getCategory() != ObjectInspector.Category.PRIMITIVE )
            throw new UDFArgumentTypeException(0, "quantity field must be an int; found " + qoi.getTypeName());
        if( poi.getCategory() != ObjectInspector.Category.PRIMITIVE )
            throw new UDFArgumentTypeException(0, "price field must be a float; found " + poi.getTypeName());

        // Second, are they the correct type of primitive?
        if( ((PrimitiveObjectInspector)toi).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG )
            throw new UDFArgumentTypeException(0, "target field must be a bigint; found " + toi.getTypeName());
        if( ((PrimitiveObjectInspector)qoi).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT )
            throw new UDFArgumentTypeException(0, "quantity field must be an int; found " + qoi.getTypeName());
        if( ((PrimitiveObjectInspector)poi).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.FLOAT )
            throw new UDFArgumentTypeException(0, "price field must be a float; found " + poi.getTypeName());

        // If we get to here, the input is an array<struct> of the expected shape.

        // HOW TO RETURN THE OUTPUT?
        // A struct<> is stored as an Object[], with the elements being the field values in order.
        // See GenericUDFNamedStruct.
        // The object inspector that we set up below and return at the end of initialize() takes care of the names,
        // so the Object[] only holds the values.
        // A Java ArrayList is converted to a hive array<>, so the output is an ArrayList of Object[].
        ret = new ArrayList<Object[]>();

        // Now set up and return the object inspector for the output of the UDF.

        // Define the field names for the struct<> and their types.
        ArrayList<String> structFieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
        structFieldNames.add("target");
        structFieldNames.add("quantity");
        structFieldNames.add("price");
        structFieldNames.add("externalId");
        // To get instances of PrimitiveObjectInspector, we use the PrimitiveObjectInspectorFactory.
        structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableLongObjectInspector );
        structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableIntObjectInspector );
        structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableFloatObjectInspector );
        structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableStringObjectInspector );

        // Set up the object inspector for the struct<> for the output.
        StructObjectInspector si2;
        si2 = ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);

        // Set up the list object inspector for the output, and return it.
        ListObjectInspector li2;
        li2 = ObjectInspectorFactory.getStandardListObjectInspector( si2 );
        return li2;
    }

    @Override
    // The evaluate() method. The input is passed in as an array of DeferredObjects, so that
    // computation is not wasted on deserializing them if they're not actually used.
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        // Empty the return array (it is re-used between calls).
        ret.clear();

        // Should be exactly one argument.
        if( arguments.length!=1 )
            return null;

        // If passed a null, return a null.
        if( arguments[0].get()==null )
            return null;

        // Iterate over the elements of the input array,
        // convert the struct<>'s to the new format,
        // put them into the output array,
        // and return the output array.
        int nelements = loi.getListLength(arguments[0].get());
        for( int i=0; i<nelements; i++ ) {
            // getStructFieldData() returns an Object; however, it's actually a LazyLong.
            // (How do we know it's a LazyLong, as the documentation says that getStructFieldData() returns an Object?
            //  We know, because during development, the error in the hadoop task log was "can't cast LazyLong to ...")
            // How do you get the data out of a LazyLong? Using a LongObjectInspector...
            LazyLong LLtarget = (LazyLong)(soi.getStructFieldData(
                loi.getListElement(arguments[0].get(), i),
                soi.getStructFieldRef("target")));
            long tt = ((LongObjectInspector)toi).get( LLtarget );

            LazyInteger LIquantity = (LazyInteger)(soi.getStructFieldData(
                loi.getListElement(arguments[0].get(), i),
                soi.getStructFieldRef("quantity")));
            int qq = ((IntObjectInspector)qoi).get( LIquantity );

            LazyFloat LFprice = (LazyFloat)(soi.getStructFieldData(
                loi.getListElement(arguments[0].get(), i),
                soi.getStructFieldRef("price")));
            float pp = ((FloatObjectInspector)poi).get( LFprice );

            // The struct<> we're returning is stored as an Object[] of length 4 (it has 4 fields).
            Object[] e;
            e = new Object[4];
            // The field values must be inserted in the same order as defined in the ObjectInspector for the output.
            // The fields must also be hadoop writable/text classes.
            e[0] = new LongWritable(tt);
            e[1] = new IntWritable(qq);
            e[2] = new FloatWritable(pp);
            e[3] = new Text();
            ret.add(e);
        }
        return ret;
    }

    @Override
    public String getDisplayString(String[] children) {
        assert( children.length>0 );
        StringBuilder sb = new StringBuilder();
        sb.append("AddExternalIdToPurchaseDetails(");
        sb.append(children[0]);
        sb.append(")");
        return sb.toString();
    }
}
```

So there you have it. A Hive GenericUDF that does stuff with array<struct<>> Hive data. Hope it’s interesting/useful.

[And I’ll send a Baynote t-shirt to the first person who emails me (rdm at baynote.com) pointing out where the input validation is incomplete.]

