Speed up XML processing with Python and lxml

81 views Asked by At

I've implemented the following function, that processes xml file and extracts specific data from the RcrdSts nodes:

import logging
import pandas as pd
from lxml import etree

logging.basicConfig(filename="log.log",
                    filemode="w",
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    level=logging.INFO)
                    
def fca_feedback_read_record_status(working_dir, response_file):
    # initialize variables
    ns3 = "urn:iso:std:iso:20022:tech:xsd:auth.031.001.01"
    column_names = ['record_id', 'status', 'error_code', 'error_description']
    record_list = []
    local_start = 0
    local_end = 0

    # open response file and initialize tree
    logging.info("Extracting rejected records from file '%s' ... " % response_file)
    tree = etree.parse(working_dir + response_file)
    root = tree.getroot()

    # count the number of RcrdSts nodes in the responce file
    record_status_nodes_count = len(root.xpath("//ns3:RcrdSts",
                                               namespaces={"ns3": ns3}))

    if record_status_nodes_count > 0:
        logging.info("| --> Number of record status nodes: '%s'" % record_status_nodes_count)

        # iterate over the tree
        for i in range(record_status_nodes_count):
            # get record id
            record_id = root.xpath("//ns3:RcrdSts/ns3:OrgnlRcrdId",
                                   namespaces={"ns3": ns3})[i].text

            # get record status
            record_status = root.xpath("//ns3:RcrdSts/ns3:Sts",
                                       namespaces={"ns3": ns3})[i].text

            # get number of validation errors for the current records
            local_validation_rule_count = len(root.xpath("//ns3:RcrdSts",
                                                         namespaces={"ns3": ns3})[i]) - 2

            
            # LOOP section
            local_end = local_end + local_validation_rule_count

            for k in range(local_start, local_end):
                # get error code
                validation_rule_id = root.xpath("//ns3:RcrdSts/ns3:VldtnRule",
                                                namespaces={"ns3": ns3})[k][0].text
                # get error description
                validation_rule_desc = root.xpath("//ns3:RcrdSts/ns3:VldtnRule",
                                                  namespaces={"ns3": ns3})[k][1].text
                # append values to list
                record_list.append([record_id, record_status, validation_rule_id, validation_rule_desc])

            local_start = local_start + local_validation_rule_count

        logging.info("Creating output file ... ")
        export_df = pd.DataFrame(record_list, columns=column_names)
        logging.info("Exporting file ... ")
        export_filename = response_file.replace(".xml", ".csv")
        export_df.to_csv("D:\\Personal\\" + export_filename, index=None)

        logging.info("Processing completed. ")

        return True

    else:

        logging.info("Rejected records not found in file '%s' ... " % response_file)

        return False
        
        
if __name__ == "__main__":
    workdir = "D:\\Personal\\"
    response_xml = "NCAGB_FDBTRA_I213800H4ECWD5Z7JJ680_004665_23.xml"

    result = process_fca_feedback_file(workdir, response_xml)

The source xml file looks like this:

<?xml version='1.0' encoding='UTF-8'?>
<BizData xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ns3="urn:iso:std:iso:20022:tech:xsd:auth.031.001.01" xmlns:ns2="urn:iso:std:iso:20022:tech:xsd:head.001.001.01" xmlns="urn:iso:std:iso:20022:tech:xsd:head.003.001.01" xsi:schemaLocation="urn:iso:std:iso:20022:tech:xsd:head.003.001.01 head.003.001.01.xsd urn:iso:std:iso:20022:tech:xsd:head.001.001.01 head.001.001.01_ESMAUG_1.0.0.xsd urn:iso:std:iso:20022:tech:xsd:auth.031.001.01 auth.031.001.01_ESMAUG_Reporting_1.1.0.xsd">
  <Hdr>
    <ns2:AppHdr>
      <ns2:Fr>
        <ns2:OrgId>
          <ns2:Id>
            <ns2:OrgId>
              <ns2:Othr>
                <ns2:Id>GB</ns2:Id>
              </ns2:Othr>
            </ns2:OrgId>
          </ns2:Id>
        </ns2:OrgId>
      </ns2:Fr>
      <ns2:To>
        <ns2:OrgId>
          <ns2:Id>
            <ns2:OrgId>
              <ns2:Othr>
                <ns2:Id>253800H4ECWD5Z7JJ890</ns2:Id>
              </ns2:Othr>
            </ns2:OrgId>
          </ns2:Id>
        </ns2:OrgId>
      </ns2:To>
      <ns2:BizMsgIdr>005469-0_23</ns2:BizMsgIdr>
      <ns2:MsgDefIdr>auth.031.001.01</ns2:MsgDefIdr>
      <ns2:CreDt>2023-05-10T16:43:26Z</ns2:CreDt>
      <ns2:Rltd>
        <ns2:Fr>
          <ns2:OrgId>
            <ns2:Id>
              <ns2:OrgId>
                <ns2:Othr>
                  <ns2:Id>213800H4ECWD5Z7JJ680</ns2:Id>
                  <ns2:SchmeNm>
                    <ns2:Prtry>LEI</ns2:Prtry>
                  </ns2:SchmeNm>
                </ns2:Othr>
              </ns2:OrgId>
            </ns2:Id>
          </ns2:OrgId>
        </ns2:Fr>
        <ns2:To>
          <ns2:OrgId>
            <ns2:Id>
              <ns2:OrgId>
                <ns2:Othr>
                  <ns2:Id>GB</ns2:Id>
                  <ns2:SchmeNm>
                    <ns2:Prtry>FCA</ns2:Prtry>
                  </ns2:SchmeNm>
                </ns2:Othr>
              </ns2:OrgId>
            </ns2:Id>
          </ns2:OrgId>
        </ns2:To>
        <ns2:BizMsgIdr>10052023REP09</ns2:BizMsgIdr>
        <ns2:MsgDefIdr>auth.016.001.01</ns2:MsgDefIdr>
        <ns2:CreDt>2023-05-10T12:01:09Z</ns2:CreDt>
      </ns2:Rltd>
    </ns2:AppHdr>
  </Hdr>
  <Pyld>
    <ns3:Document>
      <ns3:FinInstrmRptgStsAdvc>
        <ns3:StsAdvc>
          <ns3:MsgRptIdr>000175-0_23</ns3:MsgRptIdr>
          <ns3:MsgSts>
            <ns3:Sts>RJCT</ns3:Sts>
            <ns3:Sttstcs>
              <ns3:TtlNbOfRcrds>1</ns3:TtlNbOfRcrds>
              <ns3:NbOfRcrdsPerSts>
                <ns3:DtldNbOfRcrds>1</ns3:DtldNbOfRcrds>
                <ns3:DtldSts>RJCT</ns3:DtldSts>
              </ns3:NbOfRcrdsPerSts>
            </ns3:Sttstcs>
          </ns3:MsgSts>
          <ns3:RcrdSts>
            <ns3:OrgnlRcrdId>213800H4ECWD5Z7JJ68009052023D00009</ns3:OrgnlRcrdId>
            <ns3:Sts>RJCT</ns3:Sts>
            <ns3:VldtnRule>
              <ns3:Id>CON-610</ns3:Id>
              <ns3:Desc>Waiver indicator is inconsistent with trading venue</ns3:Desc>
            </ns3:VldtnRule>
            <ns3:VldtnRule>
              <ns3:Id>CON-640</ns3:Id>
              <ns3:Desc>Commodity derivative indicator is missing</ns3:Desc>
            </ns3:VldtnRule>
          </ns3:RcrdSts>
        </ns3:StsAdvc>
      </ns3:FinInstrmRptgStsAdvc>
    </ns3:Document>
  </Pyld>
</BizData>

The code works fine and it does extract the content from multiple RcrdSts nodes. The loop is needed, because sometimes there are multiple validation errors and descriptions per record.

The issue I'm facing is that when the number of nodes increases - like 1 million nodes, the process takes forever. For large files the code processes some 4.5 records per second, which is slow IMO.

I would appreciate ideas how to optimize the code for speed.

Thanks in advance for everyone's time and efforts.

1

There are 1 answers

1
mrxra On BEST ANSWER

each time you are accessing a particular value in your file, you are searching again the entire file (xpath). so basically if you have N RcrdSts records in a file and you need like M of its properties, you currently search the file NxM times...instead of only once.

I've changed your code so it finds all RcrdSts nodes, then iterates over the found nodes and accesses its child nodes directly to generate your dataframe. this processes the entire file only once.

another way to further speed it up (if necessary) would be to use a SAX parser instead of a DOM parser...which is particularly useful if your files tend to be large...that would be a complete rewrite though. So try this one and see if it works for you:

    import logging
    import pathlib
    import pandas as pd
    from lxml import etree
    
    logging.basicConfig(filename="log.log",
                        filemode="w",
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        level=logging.INFO)
    
    def fca_feedback_read_record_status(working_dir, response_file):
        # initialize variables
        ns3 = "urn:iso:std:iso:20022:tech:xsd:auth.031.001.01"
    
        logging.info("Extracting rejected records from file '%s' ... " % response_file)
        root = etree.parse(working_dir / response_file).getroot()
    
        if (record_status_nodes := root.xpath("//ns3:RcrdSts", namespaces={"ns3": ns3})):
            logging.info("| --> Number of record status nodes: '%s'" % len(record_status_nodes))
            record_list = []
            for i, node in enumerate(record_status_nodes):
                for c in node:
                    if etree.QName(c).localname == "OrgnlRcrdId":
                        record_id = c.text
                    elif etree.QName(c).localname == "Sts":
                        record_status = c.text
                    elif etree.QName(c).localname == 'VldtnRule':
                        record_list.append(dict(
                            record_id = record_id,
                            record_status = record_status,
                            error_code=c[0].text,
                            error_description=c[1].text
                        ))
    
            logging.info("Creating output file ... ")
            export_df = pd.DataFrame.from_records(record_list)
            logging.info("Exporting file ... ")
            export_filename = response_file.replace(".xml", ".csv")
            export_df.to_csv(working_dir / export_filename, index=None)
            logging.info("Processing completed. ")
            return True
        else:
            logging.info("Rejected records not found in file '%s' ... " % response_file)
            return False
    
    
    if __name__ == "__main__":
        workdir = pathlib.Path(__file__).parent
        response_xml = "NCAGB_FDBTRA_I213800H4ECWD5Z7JJ680_004665_23.xml"
        result = fca_feedback_read_record_status(workdir, response_xml)
        print(result)