How can I set a schema for a streaming DataFrame in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from the connection to 192.168.0.113:5560
lines = spark\
.readStream\
.format('socket')\
.option('host', '192.168.0.113')\
.option('port', 5560)\
.load()
For example, I need a table like:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
How can I set the header/schema to ['Name', 'lastName', 'PhoneNumber'] with their data types?
Also, is it possible to display this table continuously, or, say, the top 20 rows of the DataFrame? When I tried it I got the error
"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
TextSocketSource doesn't provide any integrated parsing options. It is only possible to use one of two formats:

- timestamp and text, if includeTimestamp is set to true, with the first schema below;
- text only, if includeTimestamp is set to false, with the second schema below.
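A rough sketch of those two schemas (value and timestamp are the column names the socket source itself produces; you don't pass a schema in, the source fixes it):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# includeTimestamp = true: the text line plus its arrival timestamp
schema_with_timestamp = StructType([
    StructField("value", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# includeTimestamp = false (the default): a single text column
schema_text_only = StructType([
    StructField("value", StringType(), True)
])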
If you want to change this format you'll have to transform the stream to extract the fields of interest, for example with regular expressions:
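A minimal sketch of such a transformation for input lines shaped like your example (Bob, Dylan, 123456); the regex pattern and the column names Name/lastName/PhoneNumber are assumptions taken from your sample data, not anything built into Spark:

from functools import partial
from pyspark.sql.functions import regexp_extract

# Capture groups: 1 = first name, 2 = last name, 3 = phone number
fields = partial(
    regexp_extract, str="value", pattern=r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

parsed = lines.select(
    fields(idx=1).alias("Name"),
    fields(idx=2).alias("lastName"),
    fields(idx=3).cast("long").alias("PhoneNumber")  # cast only if you want a numeric type
)

While testing, you can then display the result with something like parsed.writeStream.format("console").outputMode("append").start(). The complete output mode from your error message is only supported once the query contains a streaming aggregation, which a plain projection like this doesn't have.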