How to apply multithreading to Bio::SeqIO translate code (Bioperl)?


I am translating a FASTA nucleotide file into protein sequences with this code:

use Bio::SeqIO;
use Getopt::Long;

my ($format,$outfile) = 'fasta';

GetOptions(
    'f|format:s'  => \$format,
    'o|out|outfile:s' => \$outfile,
    );

my $oformat = 'fasta';
$file=$ARGV[0];
chomp $file;

# this implicitly uses the <> file stream
my $seqin = Bio::SeqIO->new( -format => $format, -fh => \*ARGV);
my $seqout;
if( $outfile ) {
    $seqout = Bio::SeqIO->new( -format => $oformat, -file => ">$outfile" );
} else {
# defaults to writing to STDOUT
    $seqout = Bio::SeqIO->new( -format => $oformat );
}

while ( my $seq = $seqin->next_seq() ) {
    my $pseq = $seq->translate();
    $seqout->write_seq($pseq);
}

I have used the threads and threads::shared Perl modules for parallelism in other cases, and I want to apply the following code to the task above:

use threads;
use threads::shared;
use List::Util qw( sum );
use YAML;
use constant NUM_THREADS =>100;

my @output :shared;

my $chunk_size = @data / NUM_THREADS;

my @threads;
for my $chunk ( 1 .. NUM_THREADS ) {
    my $start = ($chunk - 1) * $chunk_size;
    push @threads, threads->create(
        \&doOperation,
        \@data,
        $start,
        ($start + $chunk_size - 1),
        \@output,
    );
}
$_->join for @threads;

sub doOperation{
    my ($data, $start, $end, $output) = @_;

    my $id = threads->tid;

    print "$id ";

    for my $i ($start .. $end) {
        print "Thread [$id] processing row $i\n";

#THIS WHILE SHOULD BE MULTITHREADED

    while( (my $seq = $seqin->next_seq()) ) {
            my $pseq = $seq->translate();
            $seqout->write_seq($pseq);
    }

#THIS WHILE SHOULD BE MULTITHREADED

        sleep 1 if 0.2 > rand;
    }
    print "Thread done.\n";
    return;
}
print "\n$time\n";
my $time = localtime;
print "$time\n";

Threads are being created, but somehow they cannot process the FASTA file. The first code works fine without multithreading.


Answer by Sobrique

I'm afraid I'm not going to rewrite your code for you, but I can give you some pointers on how to accomplish threading.

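The thing you need to understand about Perl threading is that threads are not lightweight: each new thread gets its own copy of the interpreter's data, so creating one is expensive. You should spawn a fixed number of worker threads equal to the parallelism you want, run them off a Thread::Queue, and go from there.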

You also need to avoid any non-thread-safe modules - you can use them if you're careful but that usually means instantiating them within the thread with require and import instead of use at the start of the program.
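As a minimal sketch of loading a module inside the thread with require and import: List::Util is used here only as a stand-in for whichever module needs this treatment, and the worker sub and its arguments are made up for illustration. It assumes a Perl built with thread support.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use threads;

# Hypothetical worker: the module is loaded at run time, inside the
# thread, rather than with `use` at the top of the program.
sub worker {
    my @numbers = @_;
    require List::Util;            # stand-in for the module in question
    List::Util->import('sum');
    # Parentheses are needed: sum() had not been imported yet when this
    # line was compiled.
    return sum(@numbers);
}

# Scalar context here means the thread returns a scalar from join.
my $thr   = threads->create( \&worker, 1, 2, 3 );
my $total = $thr->join;
print "total: $total\n";
```

The main thread never loads the module itself; each worker pulls it in when it starts.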

I would also suggest avoiding trying to do your output IO in parallel - return the thread results and coalesce them (sorting if necessary) in the 'main' thread (or spin off a single writer).

So I'd go with something like;

#!/usr/bin/env perl

use strict;
use warnings;

use threads;

use Thread::Queue;
use Storable qw ( freeze thaw );

my $NUM_THREADS = 16;    #approx number of cores.

my $translate_q         = Thread::Queue->new;
my $translate_results_q = Thread::Queue->new;


sub translate_thread {
   while ( my $item = $translate_q->dequeue ) {
      my $seq  = thaw $item;
      my $pseq = $seq->translate();
      $translate_results_q->enqueue( freeze $pseq );

   }

}

threads->create( \&translate_thread ) for 1 .. $NUM_THREADS;

# $seqin is the Bio::SeqIO input object created as in the question's code
while ( my $seq = $seqin->next_seq ) {
   $translate_q->enqueue( freeze($seq) );
}
$translate_q->end;

$_->join for threads->list;
$translate_results_q->end;

while ( my $result = $translate_results_q->dequeue ) {
   my $pseq = thaw($result);
}

Note - this won't work as is, because it's missing merging with the rest of your code. But hopefully it illustrates how the queue and threading can work to get parallelism.

You pass around your objects using freeze and thaw from Storable, and use the parallelism to unpack them.
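The freeze/thaw round trip can be tried on its own, without any threads. This is only a sketch: My::Record is a made-up class standing in for a sequence object, and its fields are invented for illustration.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use Storable qw( freeze thaw );

# Hypothetical class standing in for a sequence object.
{
    package My::Record;
    sub new        { my ( $class, %args ) = @_; return bless {%args}, $class }
    sub id         { return $_[0]{id} }
    sub seq_length { return length $_[0]{seq} }
}

my $record = My::Record->new( id => 'seq1', seq => 'ATGGCC' );

my $frozen = freeze($record);    # a plain byte string that can be enqueued
my $copy   = thaw($frozen);      # a new, independent object of the same class

print ref($copy), " ", $copy->id, " ", $copy->seq_length, "\n";
```

thaw re-blesses the data into the original package name, so the class has to be loaded in whichever thread calls methods on the thawed object.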

Don't go too mad on the number of threads - for primarily compute workloads (i.e. no IO), a number of threads equal to the number of cores is about right. If they'll be blocking on IO, you can increase this number, but going past about double isn't going to do very much.

You can't really parallelise disk IO efficiently - it just doesn't work like that. So do that in the 'main' thread.
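One way to keep output in input order while the work runs in parallel is to tag each item with its position and sort in the main thread before writing. The sketch below is an assumption-laden illustration, not the exact method from the answer: uc() stands in for translate(), the input list and thread count are invented, and it needs a Perl built with thread support and a Thread::Queue recent enough to have end().

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $NUM_THREADS = 4;    # hypothetical worker count

my $work_q    = Thread::Queue->new;
my $results_q = Thread::Queue->new;

sub worker {
    while ( defined( my $job = $work_q->dequeue ) ) {
        my ( $index, $text ) = @$job;
        # uc() stands in for the real per-item work, e.g. translate()
        $results_q->enqueue( [ $index, uc $text ] );
    }
}

threads->create( \&worker ) for 1 .. $NUM_THREADS;

# Tag every item with its position in the input.
my @input = qw( atg gcc tga aaa );
$work_q->enqueue( [ $_, $input[$_] ] ) for 0 .. $#input;
$work_q->end;

$_->join for threads->list;
$results_q->end;

# Only the main thread collects, sorts and writes.
my @results;
while ( defined( my $r = $results_q->dequeue ) ) {
    push @results, $r;
}
my @ordered = map { $_->[1] } sort { $a->[0] <=> $b->[0] } @results;
print "$_\n" for @ordered;
```

Results arrive in whatever order the workers finish, so the index is what restores the original order before the single writer prints them.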