Perl multithreading: capturing stdio of subthread children with "mixed" results


I wrote a massively multithreaded application in Perl that scans a file or directory structure for changes (using either inotify or polling). When it detects changes, it launches subthreads that execute programs, as specified in a configuration, with the changed files as arguments.

This works quite well so far, except that my application also tries to capture stdout and stderr of the externally executed programs and write them to log files in a structured manner.

However, I am experiencing an occasional but serious mixup of output: every now and then (usually under heavy load, of course, so the normal tests always pass), stdout from a program on thread A ends up in the stdout pipe FH of another program running on thread B at the very same time.

My in-thread code for running the external programs and capturing their output looks like this:

```perl
my $out;
$pid = open($out, "( stdbuf -oL ".$cmd." | stdbuf -oL sed -e 's/^/:::LOG:::/' ) 2>&1 |") or xlog('async execution failed for: '.$cmd, LOG_LEVEL_NORMAL, LOG_ERROR);

# catch all worker output here
while(<$out>)
{
    if( $_ =~ /^:::LOG:::/ )
    {
        push(@log, $wname.':::'.$acnt.':::'.time().$_);
    } else {
        push(@log, $wname.':::'.$acnt.':::'.time().':::ERR:::'.$_);
    }
    if (time() - $last > 1)
    {
       mlog($acnt, @log);
       $last = time();
       @log = ();
    }
}
close($out); 
waitpid($pid, 0);
push(@log, $wname.':::'.$acnt.':::'.time().':::LOG:::--- thread finished ---');
```
push(@log, $wname.':::'.$acnt.':::'.time().':::LOG:::--- thread finished ---');

stdbuf is used here to suppress buffering delays wherever possible, and the sed pipe lets the reader handle a single FD while still separating normal output from errors: stdout passes through sed and gets a :::LOG::: prefix, while stderr bypasses sed and arrives untagged. The while loop collects captured log lines in a local array, and every other second the contents of that array are handed over to a thread-safe global logging method using semaphores, which makes sure nothing gets mixed up.
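A minimal single-threaded sketch of the tagging scheme described above, which can be handy for checking the pipeline in isolation. It assumes a POSIX shell and sed, leaves out stdbuf (it is GNU coreutils only), and chomps each line; `tag_line` and `run_tagged` are hypothetical helper names, not part of the original code.

```perl
use strict;
use warnings;

# Hypothetical helper: tag one raw line the way the reader loop does.
# Lines starting with ":::LOG:::" came through sed (stdout); anything
# else is assumed to be stderr and gets an ":::ERR:::" tag.
sub tag_line {
    my ($wname, $acnt, $ts, $line) = @_;
    my $prefix = join(':::', $wname, $acnt, $ts);
    return $line =~ /^:::LOG:::/ ? $prefix . $line
                                 : $prefix . ':::ERR:::' . $line;
}

# Hypothetical helper: run a shell command through the same
# "( cmd | sed ) 2>&1" pipeline (stdbuf omitted) and return tagged lines.
sub run_tagged {
    my ($wname, $acnt, $cmd) = @_;
    my @log;
    open(my $out, '-|', "( $cmd | sed -e 's/^/:::LOG:::/' ) 2>&1")
        or die "cannot start '$cmd': $!";
    while (my $line = <$out>) {
        chomp $line;
        push @log, tag_line($wname, $acnt, time(), $line);
    }
    close($out);    # for a piped open, close() also waits for the child
    return @log;
}

my @lines = run_tagged('demo', 1, q{echo to-stdout; echo to-stderr >&2});
print "$_\n" for @lines;
```

Because stderr skips sed, the relative order of the two lines is not guaranteed; only the tags are.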

To avoid unnecessary back-and-forth: I have made sure (using debug output) that the output really is mixed up at the thread level already and is not the result of locking mistakes later in the output chain!

My question is: how can the thread-locally defined $out FH in thread A receive output that definitely comes from a different program running in thread B, output that should therefore end up in thread B's own thread-local $out FH? Did I make a grave mistake somewhere, or is Perl threading just a mess? And, finally, what would be a recommended way to separate the data properly (preferably in an elegant way)?
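One commonly used way to keep stdout and stderr apart without the shell and the sed tagging is IPC::Open3 with a separate stderr handle, read through IO::Select. The sketch below is only that, an alternative for separating the two streams; it is not claimed to explain or prevent the cross-thread mixing described above. `run_separated` is a hypothetical name, and passing the command as a list (no shell involved) is an assumption about how commands could be configured.

```perl
use strict;
use warnings;
use IPC::Open3 qw(open3);
use IO::Select;
use Symbol qw(gensym);

# Hypothetical helper: run @cmd without a shell and collect stdout and
# stderr lines separately. Returns (\@stdout, \@stderr, $exit_code).
sub run_separated {
    my @cmd = @_;
    # gensym gives open3 a separate handle for stderr; without it,
    # stderr would be merged into the stdout handle.
    my $pid = open3(my $in, my $out, my $err = gensym, @cmd);
    close($in);    # the child gets no input

    my (@stdout, @stderr);
    my %dest = ($out => \@stdout, $err => \@stderr);
    my %buf  = ($out => '',       $err => '');
    my $sel  = IO::Select->new($out, $err);

    while ($sel->count) {
        for my $fh ($sel->can_read) {
            my $n = sysread($fh, my $chunk, 4096);
            if (!$n) {    # 0 = EOF, undef = error: stop watching this handle
                push @{ $dest{$fh} }, $buf{$fh} if length $buf{$fh};
                $sel->remove($fh);
                next;
            }
            $buf{$fh} .= $chunk;
            # Emit every complete line; keep a trailing partial line buffered.
            while ($buf{$fh} =~ s/^([^\n]*)\n//) {
                push @{ $dest{$fh} }, $1;
            }
        }
    }
    waitpid($pid, 0);
    return (\@stdout, \@stderr, $? >> 8);
}

my ($o, $e, $rc) = run_separated('sh', '-c', 'echo out; echo err >&2; exit 3');
print "stdout: @$o\nstderr: @$e\nexit: $rc\n";
```

Reading with sysread and splitting lines manually avoids mixing buffered readline with select(), which is a common source of stalls.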


Update: by popular demand, here is the full thread method:

```perl
sub async_command {
    my $wname  = shift;
    my $cmd    = shift;
    my $acnt   = shift;
    my $delay  = shift;
    my $errlog = shift;
    my $last   = time();
    my $pid    = 0;
    my @log;
    my $out;

    push(@log, $wname.':::'.$acnt.':::'.$last.':::LOG:::--- thread started ---'.($delay ? ' (sleeping for '.$delay.' seconds)':''));
    push(@log, $wname.':::'.$acnt.':::'.$last.':::ERR:::--- thread started ---') if ($errlog);

    if ($delay) { sleep($delay); }

    # Start worker with output pipe. stdbuf prevents unwanted buffering
    # sed tags stdout vs stderr 
    $pid = open($out, "( stdbuf -oL ".$cmd." | stdbuf -oL sed -e 's/^/:::LOG:::/' ) 2>&1 |") or xlog('async execution failed for: '.$cmd, LOG_LEVEL_NORMAL, LOG_ERROR);

    # catch all worker output here
    while(<$out>)
    {
        if( $_ =~ /^:::LOG:::/ )
        {
            push(@log, $wname.':::'.$acnt.':::'.time().$_);
        } else {
            push(@log, $wname.':::'.$acnt.':::'.time().':::ERR:::'.$_);
        }
        if (time() - $last > 1)
        {
            mlog($acnt, @log);
            $last = time();
            @log = ();
        }
    }
    close($out);
    waitpid($pid, 0);
    push(@log, $wname.':::'.$acnt.':::'.time().':::LOG:::--- thread finished ---');
    push(@log, $wname.':::'.$acnt.':::'.time().':::ERR:::--- thread finished ---') if ($errlog);

    mlog($acnt, @log);

    byebye();
}
```

So here you can see that @log as well as $out are thread-local variables. The xlog (global log) and mlog (worker log) methods actually use Thread::Queue for further processing. I just don't want to call them more than once a second per thread, to avoid too much locking overhead.
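For reference, a sketch of what an mlog() that hands each batch over as a single Thread::Queue item could look like: one enqueue (one lock) per flush, and a consumer never sees another thread's lines interleaved inside a batch. The real xlog()/mlog() are not shown in the question, so `$logq`, `mlog` and `drain` here are hypothetical. The demo calls everything from one thread; in the application the producers would be the worker threads and the consumer a dedicated logging thread.

```perl
use strict;
use warnings;
use Thread::Queue;

# Hypothetical shared queue standing in for the one behind mlog().
my $logq = Thread::Queue->new();

# Hypothetical mlog(): enqueue the whole batch as ONE item.
sub mlog {
    my ($acnt, @lines) = @_;
    return unless @lines;                 # nothing to flush
    $logq->enqueue([ $acnt, @lines ]);    # the queue clones the array ref
}

# Hypothetical consumer: drain whatever is queued right now,
# grouping lines by the worker number they were tagged with.
sub drain {
    my %by_worker;
    while (defined(my $item = $logq->dequeue_nb())) {
        my ($acnt, @lines) = @$item;
        push @{ $by_worker{$acnt} }, @lines;
    }
    return \%by_worker;
}

mlog(1, 'a:::1:::100:::LOG:::first', 'a:::1:::100:::LOG:::second');
mlog(2, 'b:::2:::100:::ERR:::oops');
mlog(1);    # empty batch: nothing is enqueued

my $got = drain();
for my $w (sort keys %$got) {
    print "worker $w: $_\n" for @{ $got->{$w} };
}
```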

For debugging, I have duplicated the push(@log, ...) statements as xlog() calls. Since the worker name $wname is somewhat tied to the $cmd being executed and $acnt is a number unique to each thread, I can clearly see that log output is being read from the $out FH that definitely comes from a different $cmd than the one executed in this thread, while $acnt and $wname remain the ones that actually belong to this thread. I can also see that these log lines then do NOT appear on the $out FH of the other thread, where they should be.
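A hypothetical stress test in the same spirit as the debugging described above: each worker runs a command that prints only its own token, on both stdout and stderr, through the same "( cmd | sed ) 2>&1" pipeline, so any line with a different token that a worker reads has been misrouted. It assumes a Perl built with ithreads (as the application itself requires), a POSIX shell and sed; stdbuf is left out, and the worker and line counts are arbitrary.

```perl
use strict;
use warnings;
use threads;

# Hypothetical stand-in for async_command(): returns how many lines the
# worker read in total and how many of them carried a foreign token.
sub worker {
    my ($id)  = @_;
    my $token = "worker$id";
    my $cmd   = "for i in 1 2 3 4 5; do echo $token; echo $token >&2; done";
    my ($total, $foreign) = (0, 0);

    open(my $out, '-|', "( $cmd | sed -e 's/^/:::LOG:::/' ) 2>&1")
        or die "cannot start worker $id: $!";
    while (my $line = <$out>) {
        chomp $line;
        $total++;
        (my $body = $line) =~ s/^:::LOG:::://;    # strip the stdout tag
        if ($body ne $token) {                     # line from another worker?
            $foreign++;
            warn "worker $id read foreign line: $line\n";
        }
    }
    close($out);
    return ($total, $foreign);
}

# Explicit list context so join() returns both counts.
my @threads = map {
    threads->create({ 'context' => 'list' }, \&worker, $_)
} 1 .. 8;

my ($all_lines, $misrouted) = (0, 0);
for my $thr (@threads) {
    my ($total, $foreign) = $thr->join();
    $all_lines += $total;
    $misrouted += $foreign;
}
print "lines read: $all_lines, misrouted: $misrouted\n";
```

The total across all workers stays the same even if lines are misrouted, so raising the worker count or the lines per worker is the obvious knob for approaching the heavy-load conditions under which the problem shows up.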
