When I try this:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()
lines = spark.readStream.load(format='socket', host='localhost', port=9999,
                              schema=StructType(StructField('value', StringType, True)))
words = lines.groupBy('value').count()
query = words.writeStream.format('console').outputMode("complete").start()
query.awaitTermination()
Then I get this error:
AssertionError: dataType should be DataType
And I searched the source code in ./pyspark/sql/types.py, at line 403:
assert isinstance(dataType, DataType), "dataType should be DataType"
But StringType is based on AtomicType, not DataType:
class StringType(AtomicType):
"""String data type.
"""
__metaclass__ = DataTypeSingleton
So is there a mistake?
In Python, DataTypes are not used as singletons. When creating a StructField you have to use an instance, not the class itself. Also, StructType requires a sequence of StructFields, not a single one. (There is no mistake in the assert: AtomicType is itself a subclass of DataType, so an instance of StringType passes the isinstance check.)

Nevertheless, this is completely pointless here: the schema of TextSocketSource is fixed and cannot be modified with the schema argument.