How to create an outputschema which has nested bags in pig

566 views Asked by At

I am trying out Pig UDFs and have been reading about it. While the online content was helpful, I am still not sure if I understand how to create a complex output schema which has nested bags.

Please help.The requirement is as follows. Say for example, I am analyzing e-commerce orders data. An order can have multiple products ordered in them.

I have the product level data grouped at an order level. This is the input to my UDF. So each grouped data at an order level containing information about the products in each order is my input.

InputSchema:

(grouped_at_order, {
    (input_column_values_at_product1_level),
    (input_column_values_at_product2_level)
})

I would be computing metrics both at an order level and at a product level in UDF. For example: sum(products) is an order level metric, color of each product is a product level metric. So, ForEach row grouped at an order level sent to UDF, I want to compute the order level & item level metrics.

Expected OutputSchema:

{
 { (orders, (computed_values_at_order_level)) }, 
  {(productlevel, 
     {
      (computed_values_at_product1_level),
      (computed_values_at_product2_level),
      (computed_values_at_product3_level)
     }
   )
  }
}

The objective then is to persist the data at order level and product level in two separate output tables from pig.

Is there a better way of doing the same?

1

There are 1 answers

5
glefait On

As @maxymoo said, before returning nested data from an UDF, I would check first if I really need it.

Anyway, if you do, the solution is not complicated but painfull. You just create schema, add field, then create a schema for the tuple, add the fields or the subbags into, and so on.

@Override
public Schema outputSchema(Schema input) {

    Schema statsOrderLevel = new Schema();
    statsOrderLevel.add(new FieldSchema("value", DataType.CHARARRAY));

    Schema statsOrderLevelTuple = new Schema();
    statsOrderLevelTuple.add(new FieldSchema(null, statsOrderLevel, DataType.TUPLE);

    Schema statsOrderLevelBag = new Schema();
    statsOrderLevelBag.add(new FieldSchema("stats", statsOrderLevelTuple, DataType.BAG));

    [...]

 }