I am trying to batch up my PySpark job (but this is not specific to PySpark) that processes data. Here is what I tried:
def batch(iterable, n=1):
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]
arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]
for items in batch(arr, 1):
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "...", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
querydatetime BETWEEN '2019-01-22' AND '2019-01-31'
AND querydestinationplace IN (%s)
""" % (",".join(items)))
flightsDf = flightsGDF.toDF()
flightsDf.createOrReplaceTempView("flights")
resultDf = spark.sql("""
SELECT
f.*, countryName, cityName, airportName, a.name AS agentName,
CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
FROM flights f
LEFT JOIN agents a
ON cast(f.agent as bigint) = a.id
LEFT JOIN airports p
ON cast(f.querydestinationplace as bigint) = p.airportId
""")
df = resultDf.withColumn("querydatetime", regexp_replace(resultDf["querydatetime"], "-", "").cast("int"))
df = resultDf.withColumn("queryoutbounddate", regexp_replace(resultDf["queryoutbounddate"], "-", "").cast("int"))
df = resultDf.withColumn("queryinbounddate", regexp_replace(resultDf["queryinbounddate"], "-", "").cast("int"))
df = resultDf.withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = resultDf.withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-ddTHH:mm:ss"))
df = resultDf.withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = resultDf.withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-ddTHH:mm:ss"))
print("===LOG:WRITING_RAW===")
df \
.write \
.mode("append") \
.partitionBy(["countryName", "querydatetime"]) \
.parquet("s3://...-glue/rawFlights")
print("===LOG:DONE_WRITING_RAW===")
df.createOrReplaceTempView("flights")
# GET DISTINCT DATASET
distinctKeysDf = resultDf.select(resultDf['key']).distinct
df.createOrReplaceTempView("distinctKeys")
def generate_date_series(start, stop):
return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))
# GET RELAVENT DATES DATASET
# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime.utcnow().date()
start = today - timedelta(days = 25) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")
spark.sql("SELECT explode(generate_date_series(start, stop)) FROM relaventDates").show()
print("===LOG:WRITING_EXPANDED===")
expandedKeyDatesDf = spark.sql("""
SELECT key, querydatetime
FROM relaventDates
CROSS JOIN distinctKeys
""")
print("===LOG:DONE_WRITING_EXPANDED===")
expandedKeyDatesDf \
.coalesce(1) \
.write \
.mode("append") \
.parquet("s3://...-glue/expanded")
expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")
cleanedFlightsDf = spark.sql("""
SELECT e.key AS master_key, e.querydatetime AS master_querydatetime, f.*
FROM expandedKeyDates e
LEFT JOIN flights f
ON e.key = f.key
AND e.querydatetime = f.querydatetime
ORDER BY e.key, e.querydatetime
""")
print("===LOG:WRITING_CLEANED===")
cleanedFlightsDf \
.write \
.mode("append") \
.partitionBy(["countryName", "querydatetime"]) \
.parquet("s3://...-glue/cleanedFlights")
print("===LOG:DONE_WRITING_CLEANED===")
print("===LOG:DONE BATCH %s" % (batch))
This always seem to cause Python to crash, I think due to it running out of memory. I am guessing the batches are all run in parallel thus causing this to happen? I am not familiar with Python but does the for
loop and yield
not wait until the previous iteration finishes before continuing with the next?
I am guessing they are all run in parallel or async because the logs seem interleaved