Unique ID (UID) generation using pyspark across different data sources

389 views Asked by At

We are working on a use case to generate a unique ID (UID) for the Customers spanning across different systems/data sources. The unique ID will be generated using PII information such as email & phone no.

Problem Statement:

For example a Customer named as John Smith is doing a multiple transaction to purchase an item.

1st Transaction:

In the first transaction he has filled email as [email protected] and phone no. as 1234567890

Email Id=  [email protected] & phone no.  = 1234567890

2nd Transaction:

In the second transaction he has changed the email as [email protected] and phone no. is same i.e. 1234567890

Email Id=  [email protected] & phone no.  = 1234567890

3rd Transaction:

In the third transaction he has used his first email id i.e. [email protected] and phone no. has been changed to 2234567890

Email Id=  [email protected] & phone no.  = 2234567890

As we know from above 3 transaction John Smith is a single person doing all 3 purchases. Our requirement is to generate a unique ID (UID) for John Smith. In addition this will be continuous process for example lets say after a week, month or 6 month John Smith does a another transaction (4th Transaction)

4th Transaction:

In the fourth transaction he has changed the email to [email protected] and kept the same phone no. as 1234567890

Email Id=  [email protected] & phone no.  = 1234567890

In this case the 4th transaction should be tagged with the same unique (UID) generated on the basis of first 3 transaction because he is a same person but different email id this time.

There is also a possibility here that in Transaction 5 John Smith has purchased the item using altogether a new email id and phone no. as shown below. In this case system will generate a new unique UID as email and phone no. is different.

Transaction 5

Email Id= [email protected] and phone no. =3123456789

and in 6th transaction John has used a new phone no. 3123456789 but old email Id i.e. [email protected]

Transaction 6

Email Id= [email protected] and phone no. = 3123456789

Now in this case transaction 5 should not generate a new UID as he is the same person "John Smith" we got to know from transaction 6 by his old email id.

Data Volume:

The data volume we have for such Customers are in 100's GB.

Current Solution:

we are using Azure Databricks pyspark graph frame to solve this problem. Below is our solution approach.

Code Snippet:

from graphframes import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

customer_df = spark.table('default.customer_details').select(col('email').alias('id'),'phonenumber')
phone_df = customer_df.select(col('phonenumber').alias('id'),col('id').alias('phonenumber'))
final_df = customer_df.union(phone_df).dropDuplicates()
vertex = final_df.select('id')
edge = final_df.select(col('id').alias('src'),col('phonenumber').alias('dst'))
graph = GraphFrame(vertex, edge)
result = graph.stronglyConnectedComponents(maxIter=5)
newDf1 = result.withColumn("component",result.component.cast(StringType()))
hashdf1 = newDf1.withColumn("unique_id_gen", md5(newDf1.component)).dropDuplicates() 

ResultComponent from the above snippet will become our unique UID.

Challenges with the current solution:

As we get daily incremental data on all the new purchases made by the customers, we need to process the entire dataset, i.e. History + incremental dataset to generate a unique UID for new customers and assign the previously generated unique UID for the existing customers that have made a new transactions. Pipeline is currently unpredictable every time it fails with new exceptions

We need your recommendation & suggestions to fix this problem statement, we are also open to new design / architecture changes if required.

1

There are 1 answers

3
JarroVGIT On

If you are able to replay all transactions, you could consider building multiple K/V stores. In case of your first transaction, you will lookup the e-mail address (as key) and phone number (also as key). If none return, then create an UID and insert them into both K/V stores.

With the second transaction, do the same (check for existing email or phone number in KV stores). Only UID with corresponding phone number returns (as he changed email) and you can insert the new email address with the UID found with the phone number. Note that the Email K/V store now contains two keys with the same value (e.g. two email addresses with the same UID).

You will need to build in a check for when both email and phone number return a UID that is not the same. This represents that user A is using either an email address or phone number from another user B.

Using this approach, you will only need to process the incremental dataset you get in, rather than all of it. You do however, need to process in order of transactions (no parallelism).

EDIT:

With the added transactions 5 and 6, we run into a problem where previous transactions must be re-ID-ed based on data of newer transactions. In my opinion, the best way to deal with this is the current approach (e.g. do a full rebuild of the dataset using a graph, and match with previous ID's). I can think of one other approach, enhancing the K/V approach as set out in my original answer.

Rather than only storing raw key/value pairs, we will store keys (e.g. phone numbers or email addresses) with a dictionary value, containing:

  • ID
  • Timestamp of first transaction
  • List of matching counter parts (e.g. email addresses in case of phone KV store)

With every transaction, the steps are:

  • Check if phone or email already exists in respective KV store;
  • If exists in phone KV but not in email KV, insert into the email KV store with corresponding ID from phone KV
  • Update list of matching email in phone KV and matching phone in email KV

This works well for transaction 1 through 4. See below how the KV's look would look like after each transaction:

#After Transaction 1
email = {
    "[email protected]" :    { 'id' : 'A', 'ts' : 1, 'matched_phones' : ['1234567890',]}, #Trans 1, new entry assigned id A
}
phone = {
    "1234567890" :              { 'id' : 'A', 'ts' : 1, 'matched_email' : ['[email protected]',] }, #Trans 1, new entry assigned id A
}


#After Transaction 2:
email = {
    "[email protected]" :    { 'id' : 'A', 'ts' : 1, 'matched_phones' : ['1234567890',] }, 
    "[email protected]" :        { 'id' : 'A', 'ts' : 2, 'matched_phones' : ['1234567890',] }, #Trans 2, ID based on existing phone
}
phone = {
    "1234567890" :              { 'id' : 'A', 'ts' : 1, 'matched_email' : ['[email protected]', '[email protected]',] }, #Trans 2, added new matched email
}


#After Transaction 3:
email = {
    "[email protected]" :    { 'id' : 'A', 'ts' : 1, 'matched_phones' : ['1234567890', '2234567890'] }, #Trans 3: added new matching phone
    "[email protected]" :        { 'id' : 'A', 'ts' : 2, 'matched_phones' : ['1234567890',]} , 
}
phone = {
    "1234567890" :              { 'id' : 'A', 'ts' : 1, 'matched_email' : ['[email protected]', '[email protected]',] }, 
    "2234567890" :              { 'id' : 'A', 'ts' : 3, 'matched_email' : ['[email protected]',] }, #Trans 3, ID based on existing email
}


#After Transaction 4:
email = {
    "[email protected]" :    { 'id' : 'A', 'ts' : 1, 'matched_phones' : ['1234567890', '2234567890'] }, 
    "[email protected]" :        { 'id' : 'A', 'ts' : 2, 'matched_phones' : ['1234567890',]} , 
    "[email protected]" :       { 'id' : 'A', 'ts' : 4, 'matched_phones' : ['1234567890',] }, #Trans 4, ID based on existing phone
}
phone = {
    "1234567890" :              { 'id' : 'A', 'ts' : 1, 'matched_email' : ['[email protected]', '[email protected]', '[email protected]',] }, #Trans 4, added new matched email
    "2234567890" :              { 'id' : 'A', 'ts' : 3, 'matched_email' : ['[email protected]',] }, 
}

Now, we run into transaction 5, which will follow the same steps, ultimately resulting in a newly assigned ID (which is correct, in this point of time):

#After Transaction 5
email = {
    "[email protected]" :    { 'id' : 'A', 'ts' : 1, 'matched_phones' : ['1234567890', '2234567890'] }, 
    "[email protected]" :        { 'id' : 'A', 'ts' : 2, 'matched_phones' : ['1234567890',]} , 
    "[email protected]" :       { 'id' : 'A', 'ts' : 4, 'matched_phones' : ['1234567890',] },
    "[email protected]" :           { 'id' : 'B', 'ts' : 5, 'matched_phones' : ['3123456789',] }, #Trans 5, new ID because not yet used email/phone
}
phone = {
    "1234567890" :              { 'id' : 'A', 'ts' : 1, 'matched_email' : ['[email protected]', '[email protected]', '[email protected]',] }, 
    "2234567890" :              { 'id' : 'A', 'ts' : 3, 'matched_email' : ['[email protected]',] }, 
    "3123456789" :              { 'id' : 'B', 'ts' : 5, 'matched_email' : ['[email protected]',] }, #Trans 5, new ID because not yet used email/phone
}

Now we get to the exception that you've added to your question. When doing the same steps for Transaction 6, we will get two conflicting ID's back from our KV store. This is id A based on email, and id B based on phone. We can check which is the oldest one (based on 'ts') and see that the 'ts' of A is older (ts=1) than of B (ts=5). You might be tempted to change the ID on the phone entry, but before we do that we must be sure what the correct ID is. To do this, we must pull ALL corresponding phone numbers and email addresses (this is a recursive problem!). So, get all phone numbers from all email addresses matching with the 'conflicting' phone number entry and so fort, up to you have an exhaustive list of linked email addresses and phonenumbers. Then you take the ID of the oldest one and update all found entries with that ID. This approach prevents any hiccups with the following sequence of transactions:

T1: [email protected]        1234567890
T2: [email protected]            1234567890
T3: [email protected]        2234567890
T4: [email protected]           1234567890 #upto here it is the same
T5: [email protected]               3123456789 #new email and new phone
T6: [email protected]             3123456789 #new email and prev phone, would match to id B and new entry in email KV
T7: [email protected]         3123456789 #collision ID (A + B) but should be A, but would also need to change [email protected] from T6

Well, my brain is fried now, but now that I've typed it all out I might think this is actually a pretty solid approach. It is testable at least, which is important with these kinds of logic structures!