PySpark OpenLineage configuration

451 views Asked by At

I am trying to get OpenLineage information from a pyspark program. As an MVP I'm trying to run spark locally on my machine (this works) and somehow log the openlineage messages that would be sent to an openlineage server. However, I don't seem to get this right. In the logs when running the spark-submit job nothing openlineage related is ever mentioned.

I have the following file openlineage_spark.py:

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("PySpark OpenLineage Example") \
        .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:log4j-openlineage.xml") \
        .config("openlineage.url", "http://localhost:5000")  \
        .config("openlineage.backend", "io.openlineage.spark.agent.backends.ConsoleLoggingBackend") \
        .config("openlineage.namespace", "test") \
        .getOrCreate()

    # Example PySpark job
    df = spark.read.text("some_input_file.txt")
    df.show()

    # Perform some transformation
    transformed_df = df.selectExpr("length(value) as length")
    transformed_df.show()

    # Write data
    transformed_df.write.mode('overwrite').csv("some_output_file.csv")

    spark.stop()

And I call this with:

spark-submit --jars openlineage-spark-1.4.1.jar --files log4j-openlineage.xml openlineage_spark.py

The result of this spark-submit is all the logging you would normally expect, the dataframes are printed etc. However, nothing is mentioned about the lineage.

The content of the xml files looks as follows:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Logger name="io.openlineage.spark.agent" level="info">
            <AppenderRef ref="Console"/>
        </Logger>
        <Root level="error">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

Which should log the openlineage stuff to the console?

Can anyone help me to set up an MVP in which I can clearly see something openlineage related being logged or send?

1

There are 1 answers

2
user238607 On BEST ANSWER

Here are the steps you need to follow to get OpenLineage working. It took me sometime to get it working properly.

Following this guide :

https://openlineage.io/docs/development/ol-proxy

  1. git clone https://github.com/OpenLineage/OpenLineage.git
  2. cd OpenLineage/proxy
  3. cd backend/
  4. ./gradlew build
  5. cp proxy.example.yml proxy.yml
  6. ./gradlew runShadow

This will start your proxy local OpenLineage server whose output you can see in the console. Make sure you have Java 11 installed and no other java version. Because it works only with Java 11.

Once the above server is started without any error. Run the following pyspark program. The job events will be sent to the above server through the http api backend and the server will output the events in its console output which you can see on the terminal.

from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd


spark = SparkSession.builder \
    .appName("OpenLineageExample") \
    .config("spark.jars.packages", "io.openlineage:openlineage-spark:1.4.1") \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config('spark.openlineage.transport.type', 'http') \
    .config('spark.openlineage.transport.url', 'http://localhost:8080') \
    .config('spark.openlineage.transport.endpoint', '/api/v1/lineage') \
    .config('spark.openlineage.namespace', 'spark_integration') \
    .getOrCreate()

spark.sparkContext.setLogLevel("INFO")

sc = spark.sparkContext
sqlContext = SQLContext(sc)

data1 = [

["00045b1b-0ac9-4dce", "2023-07-26T16:35:34Z", "1f036bb8-cac4-43c1-b29e-a7646884fe2e"],
["00045b1b-0ac9-4dce", "2023-07-26T17:27:38Z", "134b785e-e013-41b1-aabc-094a90b95482"],
["00045b1b-0ac9-4dce", "2023-07-26T18:04:16Z", "51fb0e53-2938-431c-8825-7f461849dfe3"],
["00045b1b-0ac9-4dce", "2023-07-26T18:32:46Z", "954a4f96-2c51-403b-9fd5-d07a7cdc35dd"],
["00045b1b-0ac9-4dce", "2023-07-26T18:40:18Z", "811a1336-27f3-4e8c-99cc-22f5debe21a3"],
["8eba-55a058fb4dd0f", "2023-07-20T10:35:34Z", "1f036bff-cac4-dddd-ddsa-a7646884fe2e"],
["8eba-55a058fb4dd0f", "2023-07-20T10:55:34Z", "23226bff-cac4-dddd-ddsa-a7646884fe2e"],

]
columns1 =["vehicleId", "tripStartDateTime", "correlationId"]

df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
df1 = df1.withColumn("tripStartDateTime", F.to_timestamp(F.col("tripStartDateTime")))
print("Given dataframe")
df1.show(n=100, truncate=False)

print("schema here")
print(df1.schema)



@pandas_udf(ArrayType(ArrayType(StringType())), PandasUDFType.GROUPED_AGG)
def custom_sum_udf(col1_series: pd.Series, col2_series: pd.Series, col3_series: pd.Series) -> ArrayType(ArrayType(StringType())):
    concat_df = pd.concat([col1_series, col2_series, col3_series], axis=1)
    concat_df.columns = columns1 # declared before
    print("Process the vehicle Id")  # Do complex processing since concat_df is now a pandas dataframe.
    print(concat_df)

    max_column = concat_df["tripStartDateTime"].max()
    min_column = concat_df["tripStartDateTime"].min()

    print("max_column", max_column)
    print("min_column", min_column)
    all_result = [ [concat_df.iloc[0,0]], [str(max_column)], [str(min_column)]]

    return all_result

df_new = df1.groupby(F.col("vehicleId")).agg(custom_sum_udf(F.col("vehicleId"), F.col("tripStartDateTime"), F.col("correlationId")).alias("reduced_columns")).cache()
print("Printing the column sum, max, min")
df_new.show(n=100, truncate=False)

df_new_sep = df_new.withColumn("id_recieved", F.col("reduced_columns").getItem(0))
df_new_sep = df_new_sep.withColumn("max_over_timestamp", F.col("reduced_columns").getItem(1))
df_new_sep = df_new_sep.withColumn("min_over_timestamp", F.col("reduced_columns").getItem(2)).drop(F.col("reduced_columns"))
print("Printing the column sum, max, min")
df_new_sep.show(n=100, truncate=False)

Console output :

Tail end of the output on my terminal.

              } ], [ {
                "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
                "num-children" : 0,
                "name" : "correlationId",
                "dataType" : "string",
                "nullable" : true,
                "metadata" : { },
                "exprId" : {
                  "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
                  "id" : 2,
                  "jvmId" : "4dd5e48c-44ff-493a-bc78-9007f17cb957"
                },
                "qualifier" : [ ]
              } ] ],
              "child" : 0
            }, {
              "class" : "org.apache.spark.sql.execution.RDDScanExec",
              "num-children" : 0,
              "output" : [ [ {
                "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
                "num-children" : 0,
                "name" : "vehicleId",
                "dataType" : "string",
                "nullable" : true,
                "metadata" : { },
                "exprId" : {
                  "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
                  "id" : 0,
                  "jvmId" : "4dd5e48c-44ff-493a-bc78-9007f17cb957"
                },
                "qualifier" : [ ]
              } ], [ {
                "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
                "num-children" : 0,
                "name" : "tripStartDateTime",
                "dataType" : "string",
                "nullable" : true,
                "metadata" : { },
                "exprId" : {
                  "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
                  "id" : 1,
                  "jvmId" : "4dd5e48c-44ff-493a-bc78-9007f17cb957"
                },
                "qualifier" : [ ]
              } ], [ {
                "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
                "num-children" : 0,
                "name" : "correlationId",
                "dataType" : "string",
                "nullable" : true,
                "metadata" : { },
                "exprId" : {
                  "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
                  "id" : 2,
                  "jvmId" : "4dd5e48c-44ff-493a-bc78-9007f17cb957"
                },
                "qualifier" : [ ]
              } ] ],
              "rdd" : null,
              "name" : "ExistingRDD",
              "outputPartitioning" : {
                "product-class" : "org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning",
                "numPartitions" : 0
              },
              "outputOrdering" : [ ]
            } ]
          },
          "outputOrdering" : [ ]
        } ]
      },
      "spark_version" : {
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/1.4.1/integration/spark",
        "_schemaURL" : "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
        "spark-version" : "3.4.1",
        "openlineage-spark-version" : "1.4.1"
      },
      "processing_engine" : {
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/1.4.1/integration/spark",
        "_schemaURL" : "https://openlineage.io/spec/facets/1-1-0/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
        "version" : "3.4.1",
        "name" : "spark",
        "openlineageAdapterVersion" : "1.4.1"
      },
      "environment-properties" : {
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/1.4.1/integration/spark",
        "_schemaURL" : "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
        "environment-properties" : { }
      }
    }
  },
  "job" : {
    "namespace" : "spark_integration",
    "name" : "open_lineage_example.collect_limit",
    "facets" : { }
  },
  "inputs" : [ ],
  "outputs" : [ ]
}
127.0.0.1 - - [27/Oct/2023:10:33:13 +0000] "POST /api/v1/lineage HTTP/1.1" 200 0 "-" "Apache-HttpClient/4.5.14 (Java/11.0.20.1)" 4