Remove duplicates based on a column in the Apache Beam Java SDK


How do I remove rows that share the same SessionId in the Apache Beam Java SDK? I have tried Distinct as well as Deduplicate, but both compare the entire row when removing duplicates.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.values.PCollection;

public class DistinctExample {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Read the CSV file; each line becomes one String element.
        PCollection<String> input = p.apply(TextIO.read().from("C:\\ApacheBeam\\Beam\\distinct\\user.csv"));
        // Distinct compares whole elements, so only fully identical lines are removed.
        PCollection<String> unique = input.apply(Distinct.<String>create());
        unique.apply(TextIO.write().to("C:\\ApacheBeam\\Beam\\distinct\\dis_user.csv").withNumShards(1).withSuffix(".csv"));
        // Block until the pipeline has finished.
        p.run().waitUntilFinish();
    }

}

user.csv is shown below:

SessionId,UserId,UserName,VideoId,Duration,StartTime,Gender,Phone
1001,1,James,1,500,2011-11-11T10:00:00Z,M,1111111111
1002,2,Leena,2,500,2011-11-12T10:00:00Z,F,1111111112
1003,3,James,2,1500,2011-12-12T10:00:00Z,M,11111113
1004,3,James,2,1500,2011-12-12T10:00:00Z,,1111111114
1005,3,James,2,1500,2011-12-12T10:00:00Z,O,1111111115
1005,3,abcd,2,1500,2011-12-12T10:00:00Z,O,1111111116
1004,3,def,2,1500,2011-12-12T10:00:00Z,,1111111117


Expected output:
SessionId,UserId,UserName,VideoId,Duration,StartTime,Gender,Phone
1001,1,James,1,500,2011-11-11T10:00:00Z,M,1111111111
1002,2,Leena,2,500,2011-11-12T10:00:00Z,F,1111111112
1003,3,James,2,1500,2011-12-12T10:00:00Z,M,11111113

There is 1 answer

Alexey Romanenko

By default, the Distinct transform compares whole input elements (rows). To filter duplicates based on a specific column only, use Distinct.withRepresentativeValueFn(SerializableFunction) and provide a function that maps each element to a representative value; elements with the same representative value are treated as duplicates.
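
The suggestion above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the class name DistinctBySessionId and the helper sessionId are hypothetical names introduced for illustration, and the helper assumes that SessionId is always the first comma-separated field and that no field contains a quoted comma. The file paths are the ones from the question.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DistinctBySessionId {

    // Hypothetical helper: returns the first CSV field of a row (assumed to be SessionId).
    public static String sessionId(String row) {
        int comma = row.indexOf(',');
        return comma < 0 ? row : row.substring(0, comma);
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        PCollection<String> input =
            p.apply(TextIO.read().from("C:\\ApacheBeam\\Beam\\distinct\\user.csv"));

        // Maps each row to the value used for comparison instead of the whole row.
        SerializableFunction<String, String> bySessionId = DistinctBySessionId::sessionId;

        // Rows with the same SessionId count as duplicates; one row per SessionId is kept.
        PCollection<String> unique = input.apply(
            Distinct.withRepresentativeValueFn(bySessionId)
                .withRepresentativeType(TypeDescriptors.strings()));

        unique.apply(TextIO.write()
            .to("C:\\ApacheBeam\\Beam\\distinct\\dis_user.csv")
            .withNumShards(1)
            .withSuffix(".csv"));

        p.run().waitUntilFinish();
    }
}
```

With this approach one row survives for each SessionId, so a single 1004 row and a single 1005 row would still be present, and which of the duplicates is kept should not be relied on. The header line passes through because its first field is unique, but Beam does not guarantee output order, so it may not come first. Dropping every SessionId that occurs more than once, as the expected output in the question suggests, would need a different approach, for example grouping rows by SessionId and keeping only groups that contain a single row.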