Read XML and insert data into tables in parallel. Is it possible to achieve this using Oracle's DBMS_PARALLEL_EXECUTE?

99 views Asked by At

We have two procedures , as below,

1.UPLOADXML

create or replace PROCEDURE             "UPLOADXMLFILE"(v_schemaName IN VARCHAR2)
    AS
    v_documentId NUMBER;
  v_file_name VARCHAR2(100);
  v_xml_data XMLTYPE;
--  v_xml_filesize NUMBER;
  v_docCount NUMBER;
  v_temp SYS_REFCURSOR;
  v_error_message VARCHAR2(4000);
    v_error_line    NUMBER;
    v_error_info  VARCHAR2(4000);
BEGIN
 -- Start a transaction
 v_docCount := 0;
    SAVEPOINT start_savepoint;

     OPEN v_temp FOR
      SELECT DOCUMENTID, FILENAME FROM COMMON.XMLDOCUMENTQUEUE@BFCENTRALDATA WHERE STATUSID = 1110 AND FLOWTYPEID = 2 
                                    AND processedByDatabase=v_schemaName AND ROWNUM<=20;

    -- Fetch the data
    FETCH v_temp INTO v_documentId,v_file_name;
    -- Loop through the records
    LOOP
    EXIT WHEN v_temp%NOTFOUND ;
        BEGIN
        v_xml_data := XMLTYPE(BFILENAME('MEDIA_DIR', v_file_name),NLS_CHARSET_ID ('AL32UTF8'));
            DBMS_OUTPUT.PUT_LINE('DocumentID: ' || v_documentid ||' fileName: '||v_file_name ||' INSTANCENAME: '||v_schemaName);
--      v_xml_filesize := DBMS_LOB.GETLENGTH(BFILENAME('MEDIA_DIR', v_file_name)) ;

        -- Check if the document already exists
            BEGIN
                SELECT COUNT(documentid) INTO v_docCount FROM XMLFILETABLE WHERE DOCUMENTID=v_documentid;
            EXCEPTION
                    WHEN NO_DATA_FOUND THEN
                        v_docCount:=0;
            END;
            IF(v_docCount>0) THEN
                BEGIN 
                    DELETE FROM XMLFILETABLE WHERE DOCUMENTID=v_documentid;
                END;
            END IF;
                DBMS_OUTPUT.PUT_LINE('V_doccount: '||v_docCount||'Upload file start');
                  -- Insert the XML data into the table
                    INSERT INTO XMLFILETABLE (documentid, File_name, xml_data)
                    VALUES (v_documentId,v_file_name, v_xml_data);

                    UPDATE COMMON.XMLDOCUMENTQUEUE@BFCENTRALDATA
                             SET  STATUSID =1115,
                             dateModified = SYSDATE
                             WHERE documentid=v_documentId;

            EXCEPTION
            WHEN OTHERS THEN
                --DBMS_OUTPUT.PUT_LINE('Error during XML parsing: ' || SQLERRM);

                --Log XML Preprocess Error
                SSIS.ADDOPERATIONALLOG(v_documentid, 'ERROR', 'XML Failed Parsing Error', 'BF1', v_schemaName);
                UPDATE COMMON.XMLDOCUMENTQUEUE@BFCENTRALDATA
                     SET  STATUSID =11150,
                     dateModified = SYSDATE
                     WHERE documentid=v_documentId;

            END;
        -- Fetch the next row
        FETCH v_temp INTO v_documentId, v_file_name;
        END LOOP;
    -- Close the cursor
    CLOSE v_temp;
-- Commit the transaction
    COMMIT;

    EXCEPTION
            WHEN OTHERS THEN
                -- Handle any exceptions, such as constraint violations
                -- Capture and store error information
            v_error_message := SQLERRM;
            v_error_line := DBMS_UTILITY.FORMAT_ERROR_STACK;
-- Concatenate error message and line number into a single string
            v_error_info := v_error_message || ' (Line ' || v_error_line || ')';
            SSIS.ADDOPERATIONALLOG(v_documentid, 'ERROR',v_error_info, 'BF1', v_schemaName);
        ROLLBACK TO start_savepoint;
 END;

2.PUSHXMLFILEDATATOTABLE

create or replace PROCEDURE                                     "PUSHXMLFILEDATATOTABLE" (
    p_documentid IN NUMBER,
    p_result OUT NUMBER
    
) AS

BEGIN

-- Initialize the result to a failure status
    p_result := 0;
                    --XML NODE READING FOR INFO TABLE
                   EXECUTE IMMEDIATE 'TRUNCATE TABLE XWDDSB100.INVOICE_INFO';
                INSERT INTO XWDDSB100.INVOICE_INFO
                SELECT INVOICE.INVOICEID,
                 --INVOICE.BILL_NO as BILL_NO, INVOICE.NAME as NAME
                                    INVOICE.BILL_NO AS BILL_NO, INVOICE.CURRENT_TOTAL AS CURRENT_TOTAL, INVOICE.TOTAL_DUE AS TOTAL_DUE, INVOICE.CREATED_T AS CREATED_T,
                                    INVOICE.START_T AS START_T, INVOICE.END_T AS END_T, INVOICE.DUE_T AS DUE_T, INVOICE.BI_BILL_TYPE AS BI_BILL_TYPE, INVOICE.PAY_TYPE AS PAY_TYPE,
                                    INVOICE.TYPE_STR AS TYPE_STR,INVOICE.MESSAGE AS MESSAGE, INVOICE.POID AS POID, 
                                     REGEXP_SUBSTR(INVOICE.POID, '([^ ]+)', 1, 3) AS POID_3,REGEXP_SUBSTR(INVOICE.POID, '([^ ]+)', 1, 4) AS POID_4
                                    --ACCOUNT INFO
                                    ,INVOICE.ACCTINFOID AS ACCTINFOID, INVOICE.ACCOUNT_NO AS ACCOUNT_NO, INVOICE.BUSINESS_TYPE AS BUSINESS_TYPE,
                                                INVOICE.CUSTOMER_SEGMENT_LIST AS CUSTOMER_SEGMENT_LIST, INVOICE.AAC_SERIAL_NUM AS AAC_SERIAL_NUM
                                    --NAME INFO
                                    ,INVOICE.ELEM AS ELEM, INVOICE.SALUTATION AS SALUTATION, INVOICE.FIRST_NAME AS FIRST_NAME, 
                                        INVOICE.MIDDLE_NAME AS MIDDLE_NAME, INVOICE.LAST_NAME AS LAST_NAME, INVOICE.COMPANY AS COMPANY
                                    --INV INFO
                                    ,INVOICE.ELEM AS INVELEM, INVOICE.BILL_TYPE AS INV_BILL_TYPE, INVOICE.INV_TYPE AS INV_TYPE, 
                                    INVOICE.DELIVERY_DESCR AS DELIVERY_DESCR, INVOICE.DELIVERY_PREFER AS DELIVERY_PREFER, INVOICE.EMAIL_ADDR AS EMAIL_ADDR,
                                    INVOICE.ADDRESS AS ADDRESS, INVOICE.CITY AS CITY, INVOICE.COUNTRY AS COUNTRY
                FROM XWDDSB100.XMLFILETABLE
                   , xmltable('/invoice'  
                           passing XMLFILETABLE.XML_DATA
                           columns 
                            INVOICEID  for ordinality,
                      --BILL INFO
                              "BILL_NO" NVARCHAR2(100) PATH 'BILLINFO/BILL_NO',
                              "CURRENT_TOTAL" FLOAT PATH 'BILLINFO/CURRENT_TOTAL',
                              "TOTAL_DUE" FLOAT PATH 'BILLINFO/TOTAL_DUE',
                              "CREATED_T" NUMBER(10,0) PATH 'BILLINFO/CREATED_T', 
                              "START_T" NUMBER(10) PATH 'BILLINFO/START_T', 
                              "END_T" NUMBER(10) PATH 'BILLINFO/END_T', 
                              "DUE_T" NUMBER(10) PATH 'BILLINFO/DUE_T', 
                              "BI_BILL_TYPE" NUMBER(10) PATH 'BILLINFO/BILL_TYPE',
                              "PAY_TYPE" NUMBER(10) PATH 'BILLINFO/PAY_TYPE', 
                              "TYPE_STR" NVARCHAR2(100) PATH 'BILLINFO/TYPE_STR',
                              "POID" NVARCHAR2(100) PATH 'BILLINFO/POID',
                              "MESSAGE" NVARCHAR2(255) PATH 'BILLINFO/MESSAGE'
                            --ACCT INFO
                            ,"ACCTINFOID" NUMBER(4) PATH 'ACCTINFO/ACCTINFOID',
                            "ACCOUNT_NO" NVARCHAR2(50) PATH 'ACCTINFO/ACCOUNT_NO',
                            "BUSINESS_TYPE" NUMBER(10) PATH 'ACCTINFO/BUSINESS_TYPE',
                            "CUSTOMER_SEGMENT_LIST" NVARCHAR2(20) PATH 'ACCTINFO/CUSTOMER_SEGMENT_LIST', 
                            "AAC_SERIAL_NUM" NVARCHAR2(255) PATH 'ACCTINFO/AAC_SERIAL_NUM'
                           --NAME INFO
                            ,"ELEM" NUMBER(10) PATH 'NAMEINFO[@elem="1"]/@elem', "SALUTATION" NVARCHAR2(255) PATH 'NAMEINFO[@elem="1"]/SALUTATION',
                            "FIRST_NAME" NVARCHAR2(255) PATH 'NAMEINFO[@elem="1"]/FIRST_NAME', "MIDDLE_NAME" NVARCHAR2(255) PATH 'NAMEINFO[@elem="1"]/MIDDLE_NAME', 
                            "LAST_NAME" NVARCHAR2(255) PATH 'NAMEINFO[@elem="1"]/LAST_NAME', "COMPANY" NVARCHAR2(255) PATH 'NAMEINFO[@elem="1"]/COMPANY'
                            --INV INFO                      
                            ,"INVELEM" NUMBER(10) PATH 'INV_INFO/@elem', "BILL_TYPE" NUMBER(10) PATH 'INV_INFO/BILL_TYPE', "INV_TYPE" NUMBER(10) PATH 'INV_INFO/INV_TYPE', 
                            "DELIVERY_DESCR" NVARCHAR2(255) PATH 'INV_INFO/DELIVERY_DESCR', "DELIVERY_PREFER" NUMBER(10) PATH 'INV_INFO/DELIVERY_PREFER', 
                            "EMAIL_ADDR" NVARCHAR2(255) PATH 'INV_INFO/EMAIL_ADDR', "ADDRESS" NVARCHAR2(255) PATH 'INV_INFO/ADDRESS', "CITY" NVARCHAR2(255) PATH 'INV_INFO/CITY',
                            "COUNTRY" NVARCHAR2(510) PATH 'INV_INFO/COUNTRY'
                            ) INVOICE
                        WHERE XMLFILETABLE.documentid=p_documentid  ;

                

                COMMIT;
                p_result := p_documentid;
        EXCEPTION
            WHEN OTHERS THEN
               p_result:=0;
--                RAISE;
--                 IF SQLCODE <> 0 THEN
--                    DBMS_OUTPUT.PUT_LINE('Error: ' || SQLCODE || ' - ' || SQLERRM);
--                END IF;
    END PUSHXMLFILEDATATOTABLE;

The procedure UPLOADXML gets the xml file name from COMMON.XMLDOCUMENTQUEUE@BFCENTRALDATA table and insert it as xmltype into the table XMLFILETABLE. This procedure has been scheduled in a job.

Another procedure named PUSHXMLFILEDATATOTABLE read the xml from XMLFILETABLE and insert the data into a table named INVOICE_INFO.

Our front end app is inserting xml file name into the table COMMON.XMLDOCUMENTQUEUE@BFCENTRALDATA.

We need to run the procedure PUSHXMLFILEDATATOTABLE in parallel using DBMS_PARALLEL_EXECUTE. To achieve this , I guess , we need to use DBMS_PARALLEL_EXECUTE.run_task(..., parallel_level => 60) . As per my understanding DBMS_PARALLEL_EXECUTE.run_task(..., parallel_level => 60) creates 60 different sessions and the procedure PUSHXMLFILEDATATOTABLE executes in each of these 60 sessions separately. Right?

Also , if we set parallel_level => 60 that means that we do need to create 60 scheduled jobs. If so how to do that?

EDIT: documentid is the primary key of the table XMLFILETABLE used in the procedure PUSHXMLFILEDATATOTABLE. Currently PUSHXMLFILEDATATOTABLE procedure reads only one xml using a documentid at a time. In one parallel execution server I need to process 60 xmls from XMLFILETABLE.Can someone help me write PL/SQL code so that the procedure PUSHXMLFILEDATATOTABLE runs in parallel using DBMS_PARALLEL_EXECUTE.

We are using version 19c.

1

There are 1 answers

5
Connor McDonald On

DBMS_PARALLEL_EXECUTE is a simple 'divide-and-conquer' approach, so its up to you to ensure that the jobs that will be created (and run) do not have an overlap.

In terms of a simple example, if I had 100 employees to process, and each employee was numbered 1 to 100, and I currently have a program which does:

process_ALL_employees 
- cursor loops through all rows

I would need to alter that procedure to so process a subset of rows, eg

process_SOME_employees(start, end)
- cursor loops through rows
  where empno between start and end

Then use DBMS_PARALLEL_EXECUTE to build ranges to be processed, using the various CREATE_CHUNKS routines.

You might end up with something like

job1, start=1, end=10
job2, start=11, end=20
job3, start=21, end=30
...
job1, start=91, end=100

So DBMS_PARALLEL_EXECUTE will take care of

  • carving up the desired work into chunks
  • submitting them through the scheduler

but you'll need to the programs in place to process a range of rows based on some sort of start/end range.