Batching a PySpark job in Python causes crash/out of memory

I am trying to batch up my PySpark job that processes data (though the question is not really specific to PySpark). Here is what I tried:

from datetime import datetime, timedelta

from pyspark.sql import Row
from pyspark.sql.functions import regexp_replace, to_timestamp
from pyspark.sql.types import ArrayType, DateType

def batch(iterable, n=1):
    # Yield successive chunks of size n from iterable
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]

for items in batch(arr, 1):
    flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "...", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
        querydatetime BETWEEN '2019-01-22' AND '2019-01-31' 
        AND querydestinationplace IN (%s)
    """ % (",".join(items)))

    flightsDf = flightsGDF.toDF()
    flightsDf.createOrReplaceTempView("flights")

    resultDf = spark.sql("""
        SELECT 
            f.*, countryName, cityName, airportName, a.name AS agentName,
            CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
        FROM flights f
        LEFT JOIN agents a
        ON cast(f.agent as bigint) = a.id
        LEFT JOIN airports p
        ON cast(f.querydestinationplace as bigint) = p.airportId
    """)
    df = resultDf.withColumn("querydatetime", regexp_replace(resultDf["querydatetime"], "-", "").cast("int"))
    df = resultDf.withColumn("queryoutbounddate", regexp_replace(resultDf["queryoutbounddate"], "-", "").cast("int"))
    df = resultDf.withColumn("queryinbounddate", regexp_replace(resultDf["queryinbounddate"], "-", "").cast("int"))
    df = resultDf.withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss"))
    df = resultDf.withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-ddTHH:mm:ss"))
    df = resultDf.withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-ddTHH:mm:ss"))
    df = resultDf.withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-ddTHH:mm:ss"))

    print("===LOG:WRITING_RAW===")
    df \
        .write \
        .mode("append") \
        .partitionBy(["countryName", "querydatetime"]) \
        .parquet("s3://...-glue/rawFlights")
    print("===LOG:DONE_WRITING_RAW===")


    df.createOrReplaceTempView("flights")

    # GET DISTINCT DATASET
    distinctKeysDf = resultDf.select(resultDf['key']).distinct()
    distinctKeysDf.createOrReplaceTempView("distinctKeys")

    def generate_date_series(start, stop):
        return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]    

    spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))


    # GET RELEVANT DATES DATASET
    # CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
    today = datetime.utcnow().date()
    start = today - timedelta(days = 25) # TODO: CHANGE TO 90
    sevenDaysAgo = today - timedelta(days = 7)
    print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
    relaventDatesDf = spark.createDataFrame([
        Row(start=start, stop=sevenDaysAgo)
    ])
    relaventDatesDf.createOrReplaceTempView("relaventDates")
    spark.sql("SELECT explode(generate_date_series(start, stop)) FROM relaventDates").show()

    print("===LOG:WRITING_EXPANDED===")
    expandedKeyDatesDf = spark.sql("""
        SELECT key, querydatetime
        FROM relaventDates
        CROSS JOIN distinctKeys
    """)
    print("===LOG:DONE_WRITING_EXPANDED===")


    expandedKeyDatesDf \
        .coalesce(1) \
        .write \
        .mode("append") \
        .parquet("s3://...-glue/expanded")

    expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")

    cleanedFlightsDf = spark.sql("""
        SELECT e.key AS master_key, e.querydatetime AS master_querydatetime, f.*
        FROM expandedKeyDates e
        LEFT JOIN flights f
        ON e.key = f.key
        AND e.querydatetime = f.querydatetime
        ORDER BY e.key, e.querydatetime
    """)
    print("===LOG:WRITING_CLEANED===")
    cleanedFlightsDf \
        .write \
        .mode("append") \
        .partitionBy(["countryName", "querydatetime"]) \
        .parquet("s3://...-glue/cleanedFlights")
    print("===LOG:DONE_WRITING_CLEANED===")

    print("===LOG:DONE BATCH %s" % (batch))

This always seems to cause Python to crash, I think because it runs out of memory. I am guessing the batches are all run in parallel, and that is what causes it? I am not that familiar with Python, but doesn't a for loop over a generator (the yield) wait for the previous iteration to finish before continuing with the next?

I am guessing they are all run in parallel or asynchronously, because the logs appear interleaved.
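For reference, here is a minimal, Spark-free sketch of how I understand the generator to behave (time.sleep is just a stand-in for the real Spark work in my job). It suggests each chunk should be processed to completion before the next one is even produced:

import time

def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

for items in batch([13301, 12929, 14511], 1):
    print("start", items)
    time.sleep(1)  # stand-in for the per-batch Spark work
    print("done", items)

# Expected output (strictly sequential, one chunk at a time):
# start [13301]
# done [13301]
# start [12929]
# done [12929]
# start [14511]
# done [14511]

If that understanding is correct, I would expect the batch loop in my job to also run one chunk at a time.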
