Pig: merge multiple lines of a log entry into one record


I am transforming a WebLogic log into CSV format so that Hive can run further jobs on it. However, I have run into a problem with the log below:

####<Mar 16, 2015 12:27:27 AM HKT> <Info> <WebLogicServer> <hklp141p.xxxx.com> <> <[ACTIVE] ExecuteThread: '0' for queue: 'weblogic.kernel.Default (self-tuning)'> <> <> <> <1426436849796> <BEA-000000> <Initializing self-tuning thread pool> 
####<Mar 16, 2015 12:27:28 AM HKT> <Info> <Management> <hklp141p.xxxx.com> <> <[ACTIVE] ExecuteThread: '0' for queue: 'weblogic.kernel.Default (self-tuning)'> <> <> <> <1426436850227> <BEA-000000> <WebLogic Server "WLS_DOM_CMN1" version:
WebLogic Server 10.3.6.0.8 PSU Patch for BUG18040640 THU MARCH 27 15:54:42 IST 2014
WebLogic Server 10.3.6.0  Tue Nov 15 08:52:36 PST 2011 1441050  Copyright (c) 1995, 2011, Oracle and/or its affiliates. All rights reserved.>

I can use the Pig script below to extract the first two lines:

A = LOAD '/user/hdfs/csv/log/flume/*';
B = FOREACH A GENERATE REPLACE($0,',','');
C = FOREACH B GENERATE FLATTEN(REGEX_EXTRACT_ALL($0, '####<([^<>]+)> <([^<>]+)> <([^<>]+)> <([^<>]+)> <([^<>]*?)> <([^<>]+)> <<?([^<>]*?)>?> <([^<>]*?)> <([^<>]*?)> <([^<>]*?)> <([^<>]+)> <([^<>]+)>? ?'));
dump C;

The result would be like this:

(Mar 16 2015 12:27:27 AM HKT,Info,WebLogicServer,hklp141p.xxxx.com,,[ACTIVE] ExecuteThread: '0' for queue: 'weblogic.kernel.Default (self-tuning)',,,,1426436847404,BEA-000000,Starting WebLogic Server with Oracle JRockit(R) Version R28.3.2-14-160877-1.6.0_75-20140321-2359-linux-x86_64 from Oracle Corporation)
(Mar 16 2015 12:27:28 AM HKT,Info,Management,hklp141p.xxxx.com,,[ACTIVE] ExecuteThread: '0' for queue: 'weblogic.kernel.Default (self-tuning)',,,,1426436848329,BEA-000000,Version: WebLogic Server 10.3.6.0.8 PSU Patch for BUG18040640 THU MARCH 27 15:54:42 IST 2014)
()
()

However, the last two lines of the log belong to the same message as the second record, so the expected result should look like this:

(Mar 16 2015 12:27:27 AM HKT,Info,WebLogicServer,hklp141p.xxxx.com,,[ACTIVE] ExecuteThread: '0' for queue: 'weblogic.kernel.Default (self-tuning)',,,,1426436847404,BEA-000000,Starting WebLogic Server with Oracle JRockit(R) Version R28.3.2-14-160877-1.6.0_75-20140321-2359-linux-x86_64 from Oracle Corporation)
(Mar 16 2015 12:27:28 AM HKT,Info,Management,hklp141p.xxxx.com,,[ACTIVE] ExecuteThread: '0' for queue: 'weblogic.kernel.Default (self-tuning)',,,,1426436848329,BEA-000000,Version: WebLogic Server 10.3.6.0.8 PSU Patch for BUG18040640 THU MARCH 27 15:54:42 IST 2014 WebLogic Server 10.3.6.0.8 PSU Patch for BUG18040640 THU MARCH 27 15:54:42 IST 2014 WebLogic Server 10.3.6.0  Tue Nov 15 08:52:36 PST 2011 1441050  Copyright (c) 1995, 2011, Oracle and/or its affiliates. All rights reserved.)

How can I get that result set from a Pig script?
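One approach (a sketch, not something tested against the asker's exact environment) is to merge continuation lines before applying the regex: every line that does not start with `####<` is part of the previous record's message. The hypothetical `mergeRecords` helper below shows that core merge logic in plain Java, independent of Pig and Hadoop:

```java
import java.util.ArrayList;
import java.util.List;

public class LogMerger {
    // Merge continuation lines into the preceding record:
    // a new record starts only when a line begins with "####<".
    public static List<String> mergeRecords(List<String> lines) {
        List<String> records = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            if (line.startsWith("####<")) {
                if (current != null) {
                    records.add(current.toString());
                }
                current = new StringBuilder(line);
            } else if (current != null) {
                // Continuation line: join with a space so the
                // multi-line message becomes one field.
                current.append(' ').append(line);
            }
        }
        if (current != null) {
            records.add(current.toString());
        }
        return records;
    }
}
```

Once records are merged this way, the existing `REGEX_EXTRACT_ALL` call should match each record, because no `<...>` field is split across input tuples any more.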


UPDATE:

I am trying to write a UDF for the LOAD function. I found that the returned line relies on this call: Text value = (Text) recordReader.getCurrentValue();

However, I still cannot customize how lines are read, and I am not sure which part of the code I should modify. Should it be inside the prepareToRead function?

Here is the sample code:

package com.weblogic.pig;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.*;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.*;
import org.apache.pig.data.*;

import java.io.IOException;
import java.util.*;


public class MyLoader extends LoadFunc {
  protected RecordReader recordReader = null;

  @Override
  public void setLocation(String s, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, s);
  }

  @Override
  public InputFormat getInputFormat() throws IOException {
    return new PigTextInputFormat();
  }

  @Override
  public void prepareToRead(RecordReader recordReader, PigSplit pigSplit) throws IOException {
    this.recordReader = recordReader;
  }

  @Override
  public Tuple getNext() throws IOException {
    try {
      boolean flag = recordReader.nextKeyValue();
      if (!flag) {
        return null;
      }
      Text value = (Text) recordReader.getCurrentValue();
      // Split the current line on commas and emit one tuple per line.
      String[] strArray = value.toString().split(",");
      List<String> lst = new ArrayList<String>();
      for (String singleItem : strArray) {
        lst.add(singleItem);
      }
      return TupleFactory.getInstance().newTuple(lst);
    } catch (InterruptedException e) {
      throw new ExecException("Read data error", PigException.REMOTE_ENVIRONMENT, e);
    }
  }
}
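For what it's worth, the merging logic belongs in getNext() (or in a custom RecordReader returned by getInputFormat()), not in prepareToRead(), which only hands you the reader. getNext() would need to keep consuming lines until it sees the header of the next record (a line starting with `####<`), holding that lookahead line for the following call. The Hadoop classes are hard to run in isolation, so here is a self-contained sketch of that buffering pattern over a plain BufferedReader; adapting it to the LoadFunc would mean replacing readLine() with recordReader.nextKeyValue() / getCurrentValue(). The class name and structure are illustrative, not from any library:

```java
import java.io.BufferedReader;
import java.io.IOException;

public class MergingReader {
    private final BufferedReader in;
    private String pushedBack;  // lookahead line saved from the previous call

    public MergingReader(BufferedReader in) {
        this.in = in;
    }

    // Returns one complete log record (header line plus any
    // continuation lines, joined with spaces), or null at end of input.
    public String nextRecord() throws IOException {
        String line = (pushedBack != null) ? pushedBack : in.readLine();
        pushedBack = null;
        // Skip anything before the first record header.
        while (line != null && !line.startsWith("####<")) {
            line = in.readLine();
        }
        if (line == null) {
            return null;
        }
        StringBuilder record = new StringBuilder(line);
        String next;
        while ((next = in.readLine()) != null) {
            if (next.startsWith("####<")) {
                pushedBack = next;  // start of the next record: save it
                break;
            }
            record.append(' ').append(next);
        }
        return record.toString();
    }
}
```

In the real LoadFunc, getNext() would call this once per invocation, run the field-splitting on the merged record, and return null when nextRecord() returns null.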

Thanks a lot!!

Best Regards, Johnson
