Many times I have typical ETL code that looks like this

./call_some_api.py | ./extract_transform | ./load_to_some_sql

What if either of the first two scripts stops sending bytes because some internal error causes it to stall? I wish there were another program I could put before ./load_to_some_sql that would detect that 0 bytes have been sent in 5 minutes and exit with a non-zero exit code.

curl has --speed-limit and --speed-time, which do exactly this, but that feature isn't available as a general tool for other CLI apps.

Is there a way to do this, i.e. to make a pipeline fail if the throughput on a pipe drops below a certain level? I know state machines and other orchestration tools can do this, but in general, if there's a way to do it from bash, that would be helpful!

2

There are 2 answers

1
niry On BEST ANSWER

If you are interested in an inactivity timeout, these can do it.

One-liner (mentioned in the comments). Adjust $t to set the inactivity timeout in seconds:

# sysread (not <>) re-arms the alarm on every chunk of bytes, not every line; $|=1 forwards output immediately
perl -e'$|=1;$t=300;$SIG{ALRM}=sub{die"Timeout\n"};alarm$t;while(sysread STDIN,$d,65536){print $d;alarm$t}'

More robust:

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Inactivity timeout in seconds; overridden by the -s option. */
long secs = 300;

/* stdin/stdout file-status flags captured at startup so that my_exit()
 * can put them back. */
int stdin_mode;
int stdout_mode;

/*
 * Restore the file-status flags saved at startup on stdin and then stdout
 * (undoing the O_NONBLOCK set in main()), then terminate with `code`.
 * Presumably done because O_NONBLOCK lives on the open file description,
 * which other processes sharing the pipe ends also see.
 */
void my_exit(int code) {
    const int fds[2] = { STDIN_FILENO, STDOUT_FILENO };
    const int saved[2] = { stdin_mode, stdout_mode };

    for (int i = 0; i < 2; i++)
        fcntl(fds[i], F_SETFL, saved[i]);
    exit(code);
}

/* Prefix for diagnostics; main() sets it to argv[0]. Never written
 * through, so it is const (it initially points at a string literal). */
const char *progname = "";

/*
 * Print "<progname>: <msg>: <error text for errno>" to stderr, then
 * restore stdin/stdout flags and exit with EXIT_FAILURE via my_exit().
 * %m is a GNU printf extension that expands to strerror(errno).
 */
void perror_die(const char *msg) {
    dprintf(STDERR_FILENO, "%s: %s: %m\n", progname, msg);
    my_exit(EXIT_FAILURE);
}

/*
 * Guard for syscall results: any value other than -1 is returned
 * unchanged; -1 is fatal and is reported with `msg` via perror_die().
 */
int check(int ret, char *msg) {
    if (ret != -1)
        return ret;
    perror_die(msg);
    return ret;
}

/* Print the option summary (including the current default timeout) to
 * stderr and exit with EXIT_SUCCESS. */
void usage() {
    dprintf(STDERR_FILENO,
            "Usage:\n"
            "-s <secs> inactivity timeout in seconds (default: %ld seconds)\n",
            secs);
    exit(EXIT_SUCCESS);
}

/*
 * Relay stdin to stdout unchanged, failing if the pipe is idle too long.
 *
 * Options:
 *   -s <secs>  inactivity timeout in seconds (default 300); must be a
 *              positive integer small enough that secs*1000 fits in an int
 *   -h         print usage and exit
 *
 * Both fds are switched to O_NONBLOCK and multiplexed with epoll. The
 * timeout restarts after every epoll event (stdin readable, or stdout
 * writable again).
 * Exit status: EXIT_SUCCESS when stdin reaches EOF; EXIT_FAILURE on
 * timeout or any error.
 */
int main(int argc, char* argv[]) {
    progname = argv[0];
    signal(SIGHUP, SIG_IGN);
    /* Remember the original flags so my_exit() can undo O_NONBLOCK. */
    stdin_mode = fcntl(STDIN_FILENO, F_GETFL);
    stdout_mode = fcntl(STDOUT_FILENO, F_GETFL);

    int opt;
    while ((opt = getopt(argc, argv, "s:h")) != -1) {
        switch (opt) {
            case 's': {
                /* Plain strtol() turns garbage into 0 (an instant timeout).
                 * The millisecond value given to epoll_wait() is an int,
                 * so validate both the syntax and the range. */
                char *end;
                errno = 0;
                long val = strtol(optarg, &end, 10);
                if (end == optarg || *end != '\0' || errno == ERANGE ||
                    val <= 0 || val > INT_MAX / 1000) {
                    dprintf(STDERR_FILENO, "%s: invalid -s value: %s\n",
                            progname, optarg);
                    exit(EXIT_FAILURE);
                }
                secs = val;
                break;
            }
            case 'h':
            case '?':
            default:
                usage();
        }
    }
    if (optind < argc) usage();

    int epfd = check(epoll_create(3), "epoll_create");

    /* Edge-triggered read interest on stdin. epoll_ctl() fails (EPERM)
     * if stdin is a regular file, which epoll cannot watch. */
    struct epoll_event ev = {
        .events = EPOLLIN | EPOLLET,
        .data.fd = STDIN_FILENO
    };
    check(epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev), "epoll_ctl");
    check(fcntl(STDIN_FILENO, F_SETFL, stdin_mode | O_NONBLOCK), "fcntl stdin");

    /* Register stdout with no requested events; EPOLLERR is still reported.
     * If stdout cannot be polled, it stays blocking and never returns EAGAIN. */
    ev.events = 0;
    ev.data.fd = STDOUT_FILENO;
    int epout = !epoll_ctl(epfd, EPOLL_CTL_ADD, STDOUT_FILENO, &ev);
    if (epout)
        check(fcntl(STDOUT_FILENO, F_SETFL, stdout_mode | O_NONBLOCK), "fcntl stdout");

    char buf[4096];
    /* b_read: bytes currently held in buf; b_wrote: how many are written. */
    ssize_t b_read = 0, b_wrote = 0;

    /* Range-checked during option parsing, so this cannot overflow. */
    int timeout_ms = (int)(secs * 1000);
    int epres;
    while ((epres = epoll_wait(epfd, &ev, 1, timeout_ms)) != 0) {
        if (epres == -1) {
            /* Linux can fail epoll_wait() with EINTR after the process is
             * stopped and continued (Ctrl-Z, fg), even with no signal
             * handlers. That is not an I/O error, so wait again; the
             * timeout restarts. */
            if (errno == EINTR) continue;
            break;
        }
        if (ev.events & EPOLLERR) {
            errno = EPIPE;
            break;
        }
        /* Pump stdin into stdout until one side would block. */
        for (;;) {
            if (b_read == 0) {
                b_read = read(STDIN_FILENO, buf, sizeof(buf));
                if (b_read == -1) {
                    if (errno == EAGAIN) {
                        b_read = 0;
                        break;
                    }
                    perror_die("read");
                } else if (b_read == 0) {
                    my_exit(EXIT_SUCCESS);  /* EOF on stdin */
                }
            }
            if (b_read) {
                ssize_t w = write(STDOUT_FILENO, buf + b_wrote, b_read - b_wrote);
                if (w != -1) {
                    b_wrote += w;
                } else if (errno == EAGAIN && epout) {
                    /* Ask for one notification when stdout can take more. */
                    ev.events = EPOLLOUT | EPOLLONESHOT;
                    ev.data.fd = STDOUT_FILENO;
                    check(epoll_ctl(epfd, EPOLL_CTL_MOD, STDOUT_FILENO, &ev), "epoll_ctl");
                    break;
                } else {
                    perror_die("write");
                }
                if (b_wrote == b_read) b_read = b_wrote = 0;
            }
        }
    }
    if (epres) perror_die("event loop");
    dprintf(STDERR_FILENO, "%s: Timeout reached\n", progname);
    my_exit(EXIT_FAILURE);
}

If you are interested in measuring throughput (bytes per second), this program checks it once per fixed interval (the calculation could be improved to track the bandwidth over the last n seconds instead of per interval):

#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/timerfd.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <signal.h>

/* Minimum number of bytes that must be read per interval; set by -b. */
ssize_t min_bw = 1000;
/* Interval length in seconds; set by -s. */
long secs = 5;

/* stdin/stdout file-status flags saved at startup, restored by my_exit(). */
int stdin_mode;
int stdout_mode;

/* Put back the stdin/stdout flags recorded at startup (removing the
 * O_NONBLOCK that main() added), then exit with status `code`. */
void my_exit(int code) {
    const int in_fd = STDIN_FILENO;
    const int out_fd = STDOUT_FILENO;

    fcntl(in_fd, F_SETFL, stdin_mode);
    fcntl(out_fd, F_SETFL, stdout_mode);
    exit(code);
}

/* Prefix for diagnostics; main() sets it to argv[0]. Never written
 * through, so it is const (it initially points at a string literal). */
const char *progname = "";

/*
 * Print "<progname>: <msg>: <error text for errno>" to stderr, then
 * restore stdin/stdout flags and exit with EXIT_FAILURE via my_exit().
 * %m is a GNU printf extension that expands to strerror(errno).
 */
void perror_die(const char *msg) {
    dprintf(STDERR_FILENO, "%s: %s: %m\n", progname, msg);
    my_exit(EXIT_FAILURE);
}

/* Return `ret` untouched unless it is -1, the conventional syscall
 * failure value; in that case report `msg` and errno via perror_die(),
 * which exits. */
int check(int ret, char *msg) {
    const int failed = (ret == -1);
    if (failed)
        perror_die(msg);
    return ret;
}

/*
 * Print the option summary, showing the current defaults, to stderr and
 * exit with EXIT_SUCCESS. min_bw is an ssize_t, so it is formatted with
 * %zd; %ld is only correct where ssize_t happens to be long.
 */
void usage() {
    dprintf(STDERR_FILENO,
            "Usage:\n"
            "-b <bytes> minimum bytes per interval (default: %zd bytes)\n"
            "-s <secs> interval length in seconds (default: %ld seconds)\n",
            min_bw, secs);
    exit(EXIT_SUCCESS);
}

/*
 * Relay stdin to stdout unchanged, failing if fewer than min_bw bytes are
 * read from stdin during any `secs`-second interval.
 *
 * Options:
 *   -b <bytes>  minimum bytes per interval (default 1000; must be >= 0)
 *   -s <secs>   interval length in seconds (default 5; must be > 0)
 *   -h          print usage and exit
 *
 * A periodic timerfd drives the check. stdin/stdout are switched to
 * O_NONBLOCK and multiplexed with epoll.
 * Exit status: EXIT_SUCCESS when stdin reaches EOF; EXIT_FAILURE when an
 * interval is too slow or on any error.
 */
int main(int argc, char* argv[]) {
    progname = argv[0];
    signal(SIGHUP, SIG_IGN);
    /* Remember the original flags so my_exit() can undo O_NONBLOCK. */
    stdin_mode = fcntl(STDIN_FILENO, F_GETFL);
    stdout_mode = fcntl(STDOUT_FILENO, F_GETFL);

    int opt;
    while ((opt = getopt(argc, argv, "b:s:h")) != -1) {
        char *end;
        long val;
        switch (opt) {
            case 'b':
            case 's':
                /* Plain strtol() maps garbage to 0. For -s that means a
                 * zero timer value, which disarms the timerfd and silently
                 * disables the throughput check. */
                errno = 0;
                val = strtol(optarg, &end, 10);
                if (end == optarg || *end != '\0' || errno == ERANGE ||
                    val < 0 || (opt == 's' && val == 0)) {
                    dprintf(STDERR_FILENO, "%s: invalid -%c value: %s\n",
                            progname, opt, optarg);
                    exit(EXIT_FAILURE);
                }
                if (opt == 'b')
                    min_bw = val;
                else
                    secs = val;
                break;
            case 'h':
            case '?':
            default:
                usage();
        }
    }
    if (optind < argc) usage();

    int epfd = check(epoll_create(3), "epoll_create");

    /* Periodic timer: first expiry after `secs` seconds, then every `secs`. */
    int tmfd = check(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK), "timerfd_create");
    struct itimerspec when = {{secs, 0}, {secs, 0}};
    check(timerfd_settime(tmfd, 0, &when, NULL), "timerfd_settime");

    struct epoll_event ev = {
        .events = EPOLLIN | EPOLLET,
        .data.fd = tmfd
    };
    check(epoll_ctl(epfd, EPOLL_CTL_ADD, tmfd, &ev), "epoll_ctl");

    /* Same edge-triggered read interest for stdin. */
    ev.data.fd = STDIN_FILENO;
    check(epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev), "epoll_ctl");
    check(fcntl(STDIN_FILENO, F_SETFL, stdin_mode | O_NONBLOCK), "fcntl stdin");

    /* Register stdout with no requested events; EPOLLERR is still reported.
     * If stdout cannot be polled, it stays blocking and never returns EAGAIN. */
    ev.events = 0;
    ev.data.fd = STDOUT_FILENO;
    int epout = !epoll_ctl(epfd, EPOLL_CTL_ADD, STDOUT_FILENO, &ev);
    if (epout)
        check(fcntl(STDOUT_FILENO, F_SETFL, stdout_mode | O_NONBLOCK), "fcntl stdout");

    char tmbuf[64];
    char buf[4096];

    /* b_read/b_wrote track the pending chunk in buf; bytes counts what
     * was read from stdin during the current interval. */
    ssize_t b_read = 0, b_wrote = 0, bytes = 0;

    for (;;) {
        int epres = epoll_wait(epfd, &ev, 1, -1);
        if (epres == -1) {
            /* Linux can fail epoll_wait() with EINTR after the process is
             * stopped and continued (Ctrl-Z, fg), even with no signal
             * handlers. That is not an I/O error, so wait again. */
            if (errno == EINTR) continue;
            break;
        }
        if (ev.events & EPOLLERR) {
            errno = EPIPE;
            break;
        }
        if (ev.data.fd == tmfd) {
            /* An interval ended: enforce the minimum, then start a new count. */
            if (bytes < min_bw) {
                dprintf(STDERR_FILENO, "Too slow\n");
                my_exit(EXIT_FAILURE);
            }
            check(read(tmfd, tmbuf, sizeof(tmbuf)), "read timerfd");
            bytes = 0;
            continue;
        }
        /* Pump stdin into stdout until one side would block. */
        for (;;) {
            if (b_read == 0) {
                b_read = read(STDIN_FILENO, buf, sizeof(buf));
                if (b_read > 0) {
                    bytes += b_read;
                } else if (b_read == -1) {
                    if (errno == EAGAIN) {
                        b_read = 0;
                        break;
                    }
                    perror_die("read");
                } else {
                    my_exit(EXIT_SUCCESS);  /* EOF on stdin */
                }
            }
            if (b_read) {
                ssize_t w = write(STDOUT_FILENO, buf + b_wrote, b_read - b_wrote);
                if (w != -1) {
                    b_wrote += w;
                } else if (errno == EAGAIN && epout) {
                    /* Ask for one notification when stdout can take more. */
                    ev.events = EPOLLOUT | EPOLLONESHOT;
                    ev.data.fd = STDOUT_FILENO;
                    check(epoll_ctl(epfd, EPOLL_CTL_MOD, STDOUT_FILENO, &ev), "epoll_ctl");
                    break;
                } else {
                    perror_die("write");
                }
                if (b_wrote == b_read) b_read = b_wrote = 0;
            }
        }
    }
    perror_die("event loop");
}

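Both programs are Linux-only (they rely on epoll; the second also uses timerfd). Because epoll cannot monitor regular files, stdin must be a pipe, FIFO, socket, or terminal, not a file redirected with `<`.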

0
ruakh On

Provided you have a newish version of Bash, the read builtin can optionally enforce a timeout [docs], which means that it's actually not hard to write a bit of pure Bash code that does exactly what you describe:

... upstream command ... \
| {
  # Relay stdin to stdout one character at a time, exiting with status 1
  # if no character arrives within 300 seconds.
  # IFS= and -d '' make this binary-safe. Unless NUL is the delimiter,
  # bash's read drops NUL bytes. With -d '', a NUL ends the read and
  # leaves $char empty, which the branch below turns back into a NUL.
  while true ; do
    IFS= read -r -d '' -n 1 -t 300 char
    result=$?
    if (( result > 128 )) ; then
      echo 'Read timed out.' >&2
      exit 1
    elif (( result > 0 )) ; then
      exit 0   # EOF
    elif [[ "$char" == '' ]] ; then
      printf '\0'
    else
      printf %s "$char"
    fi
  done
} \
| ... downstream command ...

(Note: the above reads one character at a time for simplicity's sake; if you expect a high volume of data to pass through this, then you may need to adjust it for better performance.)

But even though the above does exactly what you describe, I'm not sure whether it will really accomplish your goal, because the upstream command won't actually die until it tries to write something and gets SIGPIPE. So even after the above prints Read timed out, Bash will just keep hanging indefinitely, probably until the user hits Ctrl-C. (The same goes for niry's answer, of course.) That's a bit harder to fix; I guess when your command detects a timeout, it will need to find the process-IDs of the upstream commands, and use kill to proactively send them SIGPIPE (or some other signal of your choosing)?