Spark Structured Streaming using sockets, set SCHEMA, Display DATAFRAME in console


How can I set a schema for a streaming DataFrame in PySpark?

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to 192.168.0.113:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()

For example, I need a table like:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....

How can I set the header/schema to ['Name','lastName','PhoneNumber'] with their data types?

Also, is it possible to display this table continuously, or, say, the top 20 rows of the DataFrame? When I tried, I got the error:

"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"

Best answer, by zero323:

TextSocketSource doesn't provide any built-in parsing options. It can only produce one of two fixed formats:

  • timestamp and text if includeTimestamp is set to true with the following schema:

    StructType([
        StructField("value", StringType()),
        StructField("timestamp", TimestampType())
    ])
    
  • text only if includeTimestamp is set to false with the schema as shown below:

    StructType([StructField("value", StringType())])
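
As a rough sketch of how the two formats above are selected, the reader below passes includeTimestamp through as an option. The helper name read_socket_lines and its parameters are illustrative, not part of PySpark; it assumes an existing SparkSession like the one in the question.

```python
def read_socket_lines(spark, host, port, include_timestamp=False):
    """Return a streaming DataFrame of raw text lines read from a TCP socket.

    `read_socket_lines` is a hypothetical helper; `spark` is assumed to be
    an existing SparkSession.
    """
    return (
        spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port)
        # True  -> columns: value (string), timestamp (timestamp)
        # False -> column:  value (string)
        .option("includeTimestamp", include_timestamp)
        .load()
    )
```

With the question's setup this would be called as read_socket_lines(spark, "192.168.0.113", 5560). Either way, each input line arrives unparsed in the value column.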
    

If you want to change this format you'll have to transform the stream to extract fields of interest, for example with regular expressions:

from pyspark.sql.functions import regexp_extract
from functools import partial

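# Bind the input column and the pattern; only the capture-group index varies.
# The raw string keeps the backslashes in \w and \s intact.
fields = partial(
    regexp_extract, str="value", pattern=r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)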

# Keep the result: select() returns a new streaming DataFrame.
parsed = lines.select(
    fields(idx=1).alias("name"),
    fields(idx=2).alias("last_name"),
    fields(idx=3).alias("phone_number")
)
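
On the second part of the question: the AnalysisException is raised because the complete output mode is only supported for queries that contain a streaming aggregation. A plain select like the one above has none, so append mode should be used instead. The sketch below shows one way to print rows to the console as they arrive. The helper name show_on_console and its num_rows parameter are illustrative, not part of PySpark.

```python
def show_on_console(parsed, num_rows=20):
    """Start a query that prints each micro-batch of `parsed` to the console.

    `show_on_console` is a hypothetical helper; `parsed` is assumed to be a
    streaming DataFrame without aggregations, such as the result of the
    select(...) above.
    """
    return (
        parsed.writeStream
        .outputMode("append")          # no aggregation, so "complete" is rejected
        .format("console")
        .option("numRows", num_rows)   # rows printed per trigger
        .option("truncate", "false")   # do not shorten long values
        .start()
    )
```

It would be used by passing in the result of lines.select(...) and then calling awaitTermination() on the returned query to keep the application running. The console sink prints each new batch rather than a continuously updated table.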