Add unique id to rows in batches in Pyspark dataframe


I have a PySpark dataframe to which I need to add a new column containing a unique id per batch of rows. For example, I need to generate one id and assign it to the first 100 rows, then another id to the next batch of 100, and so on.

How can I get this done efficiently?


There are 2 answers

QuantumRifts

I hope I understood your question correctly.

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id", monotonically_increasing_id())

# This creates a new column 'batch_id' which assigns the same id for each batch of 100 rows
df = df.withColumn("batch_id", (df["id"] / 100).cast("integer"))

monotonically_increasing_id() will create a unique, monotonically increasing id for each row. Note, however, that the generated ids are guaranteed to be increasing but not consecutive: they jump between partitions, so dividing them by 100 only yields batches of exactly 100 rows when the dataframe fits in a single partition.
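The caveat matters in practice. A plain-Python sketch of the effect, using illustrative id values rather than real Spark output (monotonically_increasing_id() packs the partition number into the upper 31 bits, so a second partition's ids start at a large offset):

```python
from collections import Counter

# Simulated ids from two partitions of 150 rows each: the second partition's
# ids start at 2**33 instead of continuing from the first partition's last id.
ids = list(range(0, 150)) + list(range(8589934592, 8589934592 + 150))

# Apply the answer's batch formula: batch = id // 100
batch_sizes = Counter(i // 100 for i in ids)

# The first partition produces batches of 100 and 50 rows; the second
# partition restarts with fresh batch ids, so batches of 8, 100 and 42 rows
# appear instead of clean groups of 100 spanning the boundary.
print(batch_sizes)
```

This is why the second answer below switches to row_number() over a window, which produces truly consecutive numbers.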

Vikas Sharma

As you have not shared the schema of your dataframe, I am assuming your df has an "id" column. You can adjust the code to use any other column as per your requirement.

You can simply apply the row_number() window function as follows to get the desired result:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# 100 in the denominator is the batch size
df = df.withColumn(
    "unique_batch_id",
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer")
)
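The batching arithmetic itself can be checked without Spark. For a 1-based row number r and a batch size of 100, the batch id is (r - 1) // 100 (the function name below is illustrative, not part of the answer's code):

```python
BATCH_SIZE = 100  # matches the 100 in the denominator above

def batch_id(row_number: int) -> int:
    """Map a 1-based row number to a 0-based batch id."""
    return (row_number - 1) // BATCH_SIZE

# Rows 1..100 fall in batch 0, rows 101..200 in batch 1, and so on.
print(batch_id(1), batch_id(100), batch_id(101), batch_id(250))
```

Subtracting 1 before dividing is what keeps row 100 in batch 0 rather than starting batch 1 one row early.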

Update 1: As per your reply, you don't have an "id"-like column, so I have added monotonically_increasing_id() to make the above code work properly and fulfil your requirement.

Update 2: As per the comments, you need a UUID rather than an integer as the batch ID, so I came up with the following workaround, extending the previous code:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id, udf
import hashlib
import uuid

df = df.withColumn(
    "batch_id",
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer").cast("string")
)

def generate_uuid(batch_id):
    # Derive a deterministic UUID from the batch id: equal batch ids
    # always hash to the same 16-byte digest, hence the same UUID.
    return str(uuid.UUID(bytes=hashlib.md5(batch_id.encode()).digest()))

uuid_udf = udf(generate_uuid)  # default UDF return type is StringType

df = df.withColumn("uuid", uuid_udf(df["batch_id"])).drop("batch_id")
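The UUID step is pure Python and deterministic, so it can be verified without a Spark session: hashing the same batch id always yields the same UUID, which is what gives every row in a batch an identical value (uuid.uuid5 with a fixed namespace would be an equally valid choice):

```python
import hashlib
import uuid

def generate_uuid(batch_id: str) -> str:
    # Same helper as in the answer: md5 the batch id string and pack the
    # 16-byte digest into a UUID, so equal inputs map to equal UUIDs.
    return str(uuid.UUID(bytes=hashlib.md5(batch_id.encode()).digest()))

# Equal inputs produce equal UUIDs; different inputs produce different ones.
print(generate_uuid("0"))
print(generate_uuid("0") == generate_uuid("0"))
print(generate_uuid("0") != generate_uuid("1"))
```

Because the mapping is deterministic, re-running the job on the same ordering reproduces the same batch UUIDs, which is useful if the column is written out incrementally.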