Fuzzy join with Levenshtein distance


I have a table of user names called "potential_users" (~1 000 rows) and another called "actual_users" (~10 million rows). All records consist exclusively of [a-z] characters, with no white space. Additionally, I know that none of the potential_users appear in the actual_users table.

For each row in potential_users, I would like to find the closest record in actual_users based on the Levenshtein distance. For example:

| potential_users |
|----------------|
| user1          |
| kajd           |
| bbbbb          |

and

| actual_users |
|--------------|
| kaj          |
| bbbbbbb      |
| user         |

Would return:

| potential_users | actual_users | levenshtein_distance |
|-----------------|--------------|----------------------|
| user1           | user         | 1                    |
| kajd            | kaj          | 1                    |
| bbbbb           | bbbbbbb      | 2                    |

If the tables were short, I could use a cross join that calculates, for each record in potential_users, the Levenshtein distance to every record in actual_users and then returns the one with the lowest value. However, in my case this would create an intermediate table of 1 000 x 10 000 000 rows, which is a little impractical.

Is there a cleaner way to perform this operation without creating a cross join?


There are 2 answers

Martin Traverso (best answer)

Unfortunately, there's no way to do it without a cross join. At the end of the day, every potential user needs to be tested against every actual user.

However, Trino (formerly known as Presto SQL) will execute the join in parallel across many threads and machines, so it can run very quickly given enough hardware. Note that Trino streams intermediate results from operator to operator, so this query never materializes an "intermediate table" of 10M x 1k rows.

For a query like

SELECT potential, min_by(actual, distance), min(distance)
FROM (
    SELECT *, levenshtein_distance(potential, actual) distance
    FROM actual_users, potential_users
)
GROUP BY potential

This is the query plan:

                                                   Query Plan                                                   
----------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                                                            
     Output layout: [potential, min_by, min]                                                                    
     Output partitioning: SINGLE []                                                                             
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                              
     Output[potential, _col1, _col2]                                                                            
     │   Layout: [potential:varchar(5), min_by:varchar(7), min:bigint]                                          
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                
     │   _col1 := min_by                                                                                        
     │   _col2 := min                                                                                           
     └─ RemoteSource[1]                                                                                         
            Layout: [potential:varchar(5), min_by:varchar(7), min:bigint]                                       
                                                                                                                
 Fragment 1 [HASH]                                                                                              
     Output layout: [potential, min_by, min]                                                                    
     Output partitioning: SINGLE []                                                                             
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                              
     Aggregate(FINAL)[potential]                                                                                
     │   Layout: [potential:varchar(5), min:bigint, min_by:varchar(7)]                                          
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                
     │   min := min("min_1")                                                                                    
     │   min_by := min_by("min_by_0")                                                                           
     └─ LocalExchange[HASH] ("potential")                                                                       
        │   Layout: [potential:varchar(5), min_1:bigint, min_by_0:row(boolean, boolean, bigint, varchar(7))]    
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                             
        └─ RemoteSource[2]                                                                                      
               Layout: [potential:varchar(5), min_1:bigint, min_by_0:row(boolean, boolean, bigint, varchar(7))] 
                                                                                                                
 Fragment 2 [SOURCE]                                                                                            
     Output layout: [potential, min_1, min_by_0]                                                                
     Output partitioning: HASH [potential]                                                                      
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                              
     Aggregate(PARTIAL)[potential]                                                                              
     │   Layout: [potential:varchar(5), min_1:bigint, min_by_0:row(boolean, boolean, bigint, varchar(7))]       
     │   min_1 := min("levenshtein_distance")                                                                   
     │   min_by_0 := min_by("actual", "levenshtein_distance")                                                   
     └─ Project[]                                                                                               
        │   Layout: [actual:varchar(7), potential:varchar(5), levenshtein_distance:bigint]                      
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                             
        │   levenshtein_distance := levenshtein_distance("potential", "actual")                                 
        └─ CrossJoin                                                                                            
           │   Layout: [actual:varchar(7), potential:varchar(5)]                                                
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                          
           │   Distribution: REPLICATED                                                                         
           ├─ TableScan[memory:9, grouped = false]                                                              
           │      Layout: [actual:varchar(7)]                                                                   
           │      Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                     
           │      actual := 0                                                                                   
           └─ LocalExchange[SINGLE] ()                                                                          
              │   Layout: [potential:varchar(5)]                                                                
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}                                      
              └─ RemoteSource[3]                                                                                
                     Layout: [potential:varchar(5)]                                                             
                                                                                                                
 Fragment 3 [SOURCE]                                                                                            
     Output layout: [potential]                                                                                 
     Output partitioning: BROADCAST []                                                                          
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                              
     TableScan[memory:8, grouped = false]                                                                       
         Layout: [potential:varchar(5)]                                                                         
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                              
         potential := 0                                                                                         
                                                                                                                
                                                                                                                
(1 row)
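
A plan of this shape can presumably be reproduced by prefixing the query with EXPLAIN. The sketch below assumes a Trino version that accepts the (TYPE DISTRIBUTED) option, and it reuses the table and column names from the query above. The exact fragments, connector names, and estimates will differ from cluster to cluster.

```sql
-- Ask Trino for the distributed (fragmented) plan instead of running the query.
EXPLAIN (TYPE DISTRIBUTED)
SELECT potential, min_by(actual, distance), min(distance)
FROM (
    SELECT *, levenshtein_distance(potential, actual) distance
    FROM actual_users, potential_users
)
GROUP BY potential
```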

In particular, in the section of the plan reproduced below, each row produced by the cross join is fed straight into the projection operator, which calculates the Levenshtein distance between the two values, and then into the aggregation, which keeps only one group per "potential" user. The memory required by this query should therefore be low.

     Aggregate(PARTIAL)[potential]                                                                              
     │   Layout: [potential:varchar(5), min_1:bigint, min_by_0:row(boolean, boolean, bigint, varchar(7))]       
     │   min_1 := min("levenshtein_distance")                                                                   
     │   min_by_0 := min_by("actual", "levenshtein_distance")                                                   
     └─ Project[]                                                                                               
        │   Layout: [actual:varchar(7), potential:varchar(5), levenshtein_distance:bigint]                      
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                             
        │   levenshtein_distance := levenshtein_distance("potential", "actual")                                 
        └─ CrossJoin                                                                                            
           │   Layout: [actual:varchar(7), potential:varchar(5)]                                                
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                          
           │   Distribution: REPLICATED 
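
To try the approach without the real tables, the sample rows from the question can be inlined with VALUES. This is only a sketch: the CTE names mirror the question's tables, the column names potential and actual are taken from the plan, and the output aliases closest_actual and min_distance are made up for illustration.

```sql
WITH
    -- Sample rows from the question, inlined so that no real tables are needed.
    potential_users (potential) AS (VALUES 'user1', 'kajd', 'bbbbb'),
    actual_users (actual) AS (VALUES 'kaj', 'bbbbbbb', 'user')
SELECT
    potential,
    min_by(actual, distance) AS closest_actual,  -- actual user with the smallest distance
    min(distance) AS min_distance                -- that smallest distance
FROM (
    -- Every potential user is paired with every actual user.
    SELECT potential, actual, levenshtein_distance(potential, actual) AS distance
    FROM actual_users CROSS JOIN potential_users
)
GROUP BY potential
```

With these three rows per table, the result should match the expected table in the question, though the row order is not guaranteed. If several actual users tie for the smallest distance, which of them min_by returns is best treated as unspecified.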
eshirvana

I don't think you can do it with a simple join; there is a whole algorithm for calculating that. This article shows an implementation of the Levenshtein distance algorithm in SQL:

https://www.sqlteam.com/forums/topic.asp?TOPIC_ID=51540&whichpage=1