Background: I am using Azure Synapse to pipe data from an API to a JSON file. I am then using a PySpark notebook to flatten that complex JSON so that I can load the data into a SQL database.
Comments: I have spent hours trying to modify this PySpark code to flatten the JSON appropriately and have had zero luck; any help would be appreciated.
Structure of Complex JSON
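The payload is shaped roughly as follows (a sketch inferred from the select paths in the code and the error below; the stray string element "Done" inside the reportData array is what causes the problem):

{
  "response": {
    "requestId": "...",
    "code": "...",
    "result": {
      "reportData": [
        {
          "current_residents": [
            {
              "property_name": "...",
              "lookup_code": "...",
              "unit_address": ["..."],
              "market_rent": "..."
            }
          ]
        },
        "Done"
      ]
    }
  }
}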
My PySpark Code
from pyspark.sql.functions import explode, col
dfGetReportData = spark.read.option("multiline", "true").option("mode", "PERMISSIVE").json('xxxxxxxxxxxxx')
#dfGetReportData.printSchema()
#dfGetReportData.show()
dfReportData = dfGetReportData.select("response.requestId", "response.code",explode("response.result.reportData").alias("reportData"))
#dfReportData.printSchema()
#dfReportData.show()
dfRentRoll = dfReportData.select(explode("reportData.current_residents").alias("current_residents"))
#dfRentRoll.printSchema()
dfCurrent_Residents = dfRentRoll.select("current_residents.property_name",
"current_residents.property",
"current_residents.lookup_code",
"current_residents.bldg_unit",
"current_residents.bldg",
"current_residents.unit",
"current_residents.floorplan_name",
"current_residents.unit_type",
"current_residents.space_option",
"current_residents.sqft",
"current_residents.unit_status",
"current_residents.unit_occupancy_type",
col("current_residents.unit_address").getItem(0).alias("unit_address"),
"current_residents.bed",
"current_residents.bath",
"current_residents.resident_name",
"current_residents.phone_number",
"current_residents.email",
"current_residents.occupants",
"current_residents.original_lease_start",
"current_residents.lease_id",
"current_residents.lease_status",
"current_residents.lease_occupancy_type",
"current_residents.move_in_date",
"current_residents.lease_start_date",
"current_residents.lease_end_date",
"current_residents.previous_lease_end_date",
"current_residents.move_out_date",
"current_residents.lease_term_name",
"current_residents.lease_term",
"current_residents.contract_length_months",
"current_residents.occupied_length_months",
"current_residents.market_rent"
)
#dfCurrent_Residents.show()
#dfCurrent_Residents.printSchema()
dfCurrent_Residents.write.mode('overwrite').parquet("abfss://[email protected]/Silver/entities/Reporting/RentRoll")
Error Message
AnalysisException Traceback (most recent call last)
Cell In [17], line 12
8 dfReportData = dfGetReportData.select("response.requestId", "response.code",explode("response.result.reportData").alias("reportData"))
9 #dfData.printSchema()
10 #dfData.show()
---> 12 dfRentRoll = dfReportData.select(explode("reportData.current_residents").alias("current_residents"))
13 #dfRentRoll.printSchema()
15 dfCurrent_Residents = dfRentRoll.select("current_residents.property_name",
16 "current_residents.property",
17 "current_residents.lookup_code",
(...)
91 "current_residents.military_rank_band"
92 )
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:2023, in DataFrame.select(self, *cols)
2002 def select(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc]
2003 """Projects a set of expressions and returns a new :class:`DataFrame`.
2004
2005 .. versionadded:: 1.3.0
(...)
2021 [Row(name='Alice', age=12), Row(name='Bob', age=15)]
2022 """
-> 2023 jdf = self._jdf.select(self._jcols(*cols))
2024 return DataFrame(jdf, self.sparkSession)
File ~/cluster-env/env/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
192 converted = convert_exception(e.java_exception)
193 if not isinstance(converted, UnknownException):
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: Can't extract value from reportData#191: need struct type but got string
The reason you're getting this error is that some of the records inside reportData are plain strings. Because the second record is the string "Done", Spark cannot infer a single struct type for the array's elements, so it falls back to string and every record is stored as a string; extracting struct fields from a string column then fails. To avoid this, parse the data with the from_json function, providing an explicit schema for the reportData column. Below is a schema created for reportData.
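The sketch below shows one way to apply it, assuming reportData was inferred as an array of strings (explode first, then parse each element). The field list and types are reconstructed from the select list in the question and are only a subset; extend and adjust them against the real payload.

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Schema for one resident record; only a few of the question's
# fields are shown -- add the remaining columns with matching types.
current_resident_schema = StructType([
    StructField("property_name", StringType(), True),
    StructField("property", StringType(), True),
    StructField("lookup_code", StringType(), True),
    StructField("bldg_unit", StringType(), True),
    StructField("sqft", StringType(), True),
    StructField("unit_address", ArrayType(StringType()), True),
    StructField("market_rent", StringType(), True),
])

# Schema for one element of the reportData array.
report_data_schema = StructType([
    StructField("current_residents", ArrayType(current_resident_schema), True)
])

# reportData was inferred as array<string>, so explode first, then
# parse each element; non-JSON elements such as "Done" parse to null.
dfReportData = dfGetReportData.select(
    "response.requestId",
    "response.code",
    explode("response.result.reportData").alias("reportDataRaw"),
).select(
    "requestId",
    "code",
    from_json(col("reportDataRaw"), report_data_schema).alias("reportData"),
)

# Drop the elements that failed to parse, then flatten the residents.
dfRentRoll = dfReportData.where(col("reportData").isNotNull()) \
    .select(explode("reportData.current_residents").alias("current_residents"))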
Now you can run the rest of your code to build dfCurrent_Residents and write to Parquet. Note: some of the records in reportData become null if they don't match the schema, so check them properly before use.
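A quick sanity check before writing (a sketch reusing the names from the snippet above) is to count how many elements failed to parse, so you know how much data the schema is dropping:

dfReportData.where(col("reportData").isNull()).count()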