Why does mpd_connection_clear_error () fail after MPD_ERROR_TIMEOUT from mpd_recv_idle ()?

320 views Asked by At

I'm trying to run an idle loop using libmpdclient, but already on the first idle call, I get to an apparently unrecoverable error state.

I'm passing false to the disable_timeout parameter of mpd_recv_idle, so that I can stop the loop from the outside (it will be run in a background thread), to ensure a clean shutdown procedure.

Here is my test code:

#include <string>
#include <stdexcept>
#include <memory>

#include <mpd/client.h>

// Owning handle for an mpd_connection; mpd_connection_free is the deleter.
using mpd_connection_ptr =
  std::unique_ptr<mpd_connection, decltype(&mpd_connection_free)>;

// Throws std::runtime_error carrying the message s whenever the
// connection reports an error state other than MPD_ERROR_SUCCESS.
void
check_error (const mpd_connection_ptr &c, const std::string &s)
{
  const bool ok = mpd_connection_get_error (c.get ()) == MPD_ERROR_SUCCESS;
  if (ok)
    {
      return;
    }
  throw std::runtime_error (s);
}

int
main (void)
{
  const std::string host = "127.0.0.1";
  const uint16_t port = 7701;
  const int timeout = 1 * 1000;
  mpd_connection_ptr c {mpd_connection_new (host.c_str (), port, timeout),
                        &mpd_connection_free
                       };
  if (c == nullptr)
    {
      throw std::runtime_error ("connection_new returned nullptr");
    }
  if (!mpd_send_idle (c.get ()))
    {
      throw std::runtime_error ("mpd_send_idle returned false");
    }
  check_error (c, "mpd_send_idle caused error status");
  auto idle = mpd_recv_idle (c.get (), false);
  if (idle == 0)
    {
      if (mpd_connection_get_error (c.get ()) == MPD_ERROR_TIMEOUT)
        {
          if (!mpd_connection_clear_error (c.get ()))
            {
              throw std::runtime_error ("mpd_connection_clear_error returned false");
            }
        }
      else
        {
          check_error (c, "mpd_recv_idle caused error status");
        }
    }
  return 0;
}

When I run this code (mpd is running on 127.0.0.1:7701, I checked with netstat), I get this result:

terminate called after throwing an instance of 'std::runtime_error'
  what():  mpd_connection_clear_error returned false

Why can't I clear the timeout error here? It seems like a recoverable situation, or is it not?

1

There is 1 answer

0
Promi On BEST ANSWER

From studying the libmpdclient source code I think I can answer that myself.

A timeout is an unrecoverable error in the library design. That's why the disable_timeout parameter for mpd_recv_idle () is there in the first place.

Synchronous idle requests are expected to block "forever" (until MPD answers the request). This is incompatible with what I want, so I will probably have to use the low-level async interface to achieve it.

And here is my solution (with minimal error checking).

The program waits for the user to press ENTER and processes MPD idle messages in the background which can be interrupted every 200 ms.

What's missing:

  • return code parsing
  • idle message response parsing

Here is the code:

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>

#include <netinet/in.h>
#include <netdb.h>
#include <strings.h>
#include <unistd.h>

#include <mpd/async.h>
// #include <mpd/client.h>

// Owning handle for an mpd_async object; mpd_async_free is the deleter.
using mpd_async_ptr =
  std::unique_ptr<mpd_async, decltype(&mpd_async_free)>;

// Throws std::runtime_error carrying the message s whenever the async
// object reports an error state other than MPD_ERROR_SUCCESS.
void
check_error (const mpd_async_ptr &c, const std::string &s)
{
  const bool ok = mpd_async_get_error (c.get ()) == MPD_ERROR_SUCCESS;
  if (ok)
    {
      return;
    }
  throw std::runtime_error (s);
}

// Waits with select () until the MPD socket is ready for one of the events
// the async object currently asks for, or until the timeout in tv expires.
//
// Returns the subset of the requested events that select () reported, or 0
// when select () returned without any ready descriptor.
// Throws std::runtime_error when mpd_async_events () reports no events.
mpd_async_event
async_poll (const mpd_async *async, timeval *tv)
{
  const int wanted = mpd_async_events (async);
  if (wanted == 0)
    {
      throw std::runtime_error ("mpd_async_events failed");
    }

  const int sock = mpd_async_get_fd (async);
  const int error_bits = MPD_ASYNC_EVENT_HUP | MPD_ASYNC_EVENT_ERROR;

  fd_set readable, writable, exceptional;
  FD_ZERO(&readable);
  FD_ZERO(&writable);
  FD_ZERO(&exceptional);

  // Register the socket only in the sets matching the requested events.
  if (wanted & MPD_ASYNC_EVENT_READ)
    {
      FD_SET(sock, &readable);
    }
  if (wanted & MPD_ASYNC_EVENT_WRITE)
    {
      FD_SET(sock, &writable);
    }
  if (wanted & error_bits)
    {
      FD_SET(sock, &exceptional);
    }

  if (select (sock + 1, &readable, &writable, &exceptional, tv) <= 0)
    {
      return static_cast<mpd_async_event> (0);
    }

  // Collect every event class that select () did not report, then strip
  // those bits from the requested mask in one step.
  int not_ready = 0;
  if (!FD_ISSET(sock, &readable))
    {
      not_ready |= MPD_ASYNC_EVENT_READ;
    }
  if (!FD_ISSET(sock, &writable))
    {
      not_ready |= MPD_ASYNC_EVENT_WRITE;
    }
  if (!FD_ISSET(sock, &exceptional))
    {
      not_ready |= error_bits;
    }
  return static_cast<mpd_async_event> (wanted & ~not_ready);
}

// Opens a TCP/IPv4 connection to host:port.
//
// host  host name or dotted-quad address, resolved with gethostbyname ()
// port  TCP port in host byte order
//
// Returns the connected socket descriptor; the caller owns it.
// Throws std::string (the exception type this function has always used)
// when the socket cannot be created, the host does not resolve to an IPv4
// address, or connect () fails. The socket is closed before throwing.
int
socket_connect (const std::string &host, uint16_t port)
{
  const int sockfd = socket (AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0)
    {
      throw std::string ("ERROR opening socket");
    }

  sockaddr_in server_addr {};
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons (port);

  // gethostbyname () returns a null pointer when the lookup fails, so the
  // result must be validated before any field is read.
  const hostent *server = gethostbyname (host.c_str ());
  const bool resolved = server != nullptr
                        && server->h_addrtype == AF_INET
                        && server->h_addr_list[0] != nullptr
                        && server->h_length > 0
                        && static_cast<size_t> (server->h_length)
                           <= sizeof (server_addr.sin_addr);
  if (!resolved)
    {
      close (sockfd);
      throw std::string ("ERROR, no such host");
    }
  bcopy (server->h_addr_list[0], &server_addr.sin_addr, server->h_length);

  if (::connect (sockfd, reinterpret_cast<const sockaddr *> (&server_addr),
                 sizeof (server_addr)) < 0)
    {
      // Do not leak the descriptor when the connection attempt fails.
      close (sockfd);
      throw std::string ("ERROR connecting");
    }
  return sockfd;
}

void
mpd_notify_thread_proc (bool &app_is_running)
{
  const std::string host = "127.0.0.1";
  const uint16_t port = 7701;

  auto sockfd = socket_connect (host, port);
  mpd_async_ptr async_ptr {mpd_async_new (sockfd), mpd_async_free};
  auto async = async_ptr.get ();
  if (async == nullptr)
    {
      throw std::runtime_error ("mpd_async_new failed");
    }

  while (app_is_running)
    {
      timeval tv;
      tv.tv_sec = 0;
      tv.tv_usec = 200 * 1000;
      auto events = async_poll (async, &tv);
      if (events != 0)
        {
          if (!mpd_async_io (async, (mpd_async_event) events))
            {
              throw std::runtime_error ("connection was closed");
            }

          char* line_ptr;
          while ((line_ptr = mpd_async_recv_line (async)) != nullptr)
            {
              std::cout << "recv: " << line_ptr << "\n";
              std::string line {line_ptr};
              if (line.find ("OK") == 0)
                {
                  if (!mpd_async_send_command (async, "idle", nullptr))
                    {
                      throw std::runtime_error ("mpd_async_send_command failed");
                    }
                }
            }
        }
    }
}

int
main(void)
{
  bool app_is_running = true;
  std::thread mpd_notify_thread =
    std::thread (
      [&] ()
  {
    mpd_notify_thread_proc (app_is_running);
  });


  std::string response;
  getline (std::cin, response);
  std::cout << "shutting down...\n";
  app_is_running = false;
  mpd_notify_thread.join ();
}

An improved version can be interrupted "as fast as possible", by calling select () without a timeval and watching for a shutdown pipe () instead:

#include <string>
#include <stdexcept>
#include <memory>
#include <iostream>
#include <thread>
#include <chrono>

#include <netinet/in.h>
#include <netdb.h>
#include <strings.h>
#include <unistd.h>

#include <mpd/async.h>
// #include <mpd/client.h>

// Owning handle for an mpd_async object; mpd_async_free is the deleter.
using mpd_async_ptr =
  std::unique_ptr<mpd_async, decltype(&mpd_async_free)>;

// Throws std::runtime_error carrying the message s whenever the async
// object reports an error state other than MPD_ERROR_SUCCESS.
void
check_error (const mpd_async_ptr &c, const std::string &s)
{
  const bool ok = mpd_async_get_error (c.get ()) == MPD_ERROR_SUCCESS;
  if (ok)
    {
      return;
    }
  throw std::runtime_error (s);
}

// Blocks in select () until the MPD socket is ready for one of the events
// the async object currently asks for, or until the shutdown descriptor
// becomes readable.
//
// async        the MPD async object whose socket is polled
// shutdown_fd  in: read end of the shutdown pipe; out: set to 0 once that
//              descriptor is reported readable (0 is the "stop" marker the
//              caller tests for)
//
// Returns the subset of the requested events that select () reported, or 0
// when select () did not report any ready descriptor.
// Throws std::runtime_error when mpd_async_events () reports no events.
mpd_async_event
async_poll (const mpd_async *async, int *shutdown_fd)
{
  const int wanted = mpd_async_events (async);
  if (wanted == 0)
    {
      throw std::runtime_error ("mpd_async_events failed");
    }

  const int fd = mpd_async_get_fd (async);
  // Snapshot the descriptor: *shutdown_fd is overwritten below and must not
  // be used for FD_ISSET afterwards (it would then test descriptor 0).
  const int wake_fd = *shutdown_fd;
  const int error_bits = MPD_ASYNC_EVENT_HUP | MPD_ASYNC_EVENT_ERROR;

  fd_set rfds, wfds, efds;
  FD_ZERO(&rfds);
  FD_ZERO(&wfds);
  FD_ZERO(&efds);

  if (wanted & MPD_ASYNC_EVENT_READ)
    {
      FD_SET(fd, &rfds);
    }
  if (wanted & MPD_ASYNC_EVENT_WRITE)
    {
      FD_SET(fd, &wfds);
    }
  if (wanted & error_bits)
    {
      FD_SET(fd, &efds);
    }

  // Closing the write end makes the read end readable (end of file), so the
  // read set is all that is needed. Watching the read end for writability
  // is avoided because a platform may report it ready at once.
  FD_SET(wake_fd, &rfds);

  const int nfds = (fd > wake_fd ? fd : wake_fd) + 1;
  if (select (nfds, &rfds, &wfds, &efds, nullptr) <= 0)
    {
      return static_cast<mpd_async_event> (0);
    }

  if (FD_ISSET(wake_fd, &rfds))
    {
      *shutdown_fd = 0;
    }

  // Strip every requested event class that select () did not report.
  int not_ready = 0;
  if (!FD_ISSET(fd, &rfds))
    {
      not_ready |= MPD_ASYNC_EVENT_READ;
    }
  if (!FD_ISSET(fd, &wfds))
    {
      not_ready |= MPD_ASYNC_EVENT_WRITE;
    }
  if (!FD_ISSET(fd, &efds))
    {
      not_ready |= error_bits;
    }
  return static_cast<mpd_async_event> (wanted & ~not_ready);
}

// Opens a TCP/IPv4 connection to host:port.
//
// host  host name or dotted-quad address, resolved with gethostbyname ()
// port  TCP port in host byte order
//
// Returns the connected socket descriptor; the caller owns it.
// Throws std::string (the exception type this function has always used)
// when the socket cannot be created, the host does not resolve to an IPv4
// address, or connect () fails. The socket is closed before throwing.
int
socket_connect (const std::string &host, uint16_t port)
{
  const int sockfd = socket (AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0)
    {
      throw std::string ("ERROR opening socket");
    }

  sockaddr_in server_addr {};
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons (port);

  // gethostbyname () returns a null pointer when the lookup fails, so the
  // result must be validated before any field is read.
  const hostent *server = gethostbyname (host.c_str ());
  const bool resolved = server != nullptr
                        && server->h_addrtype == AF_INET
                        && server->h_addr_list[0] != nullptr
                        && server->h_length > 0
                        && static_cast<size_t> (server->h_length)
                           <= sizeof (server_addr.sin_addr);
  if (!resolved)
    {
      close (sockfd);
      throw std::string ("ERROR, no such host");
    }
  bcopy (server->h_addr_list[0], &server_addr.sin_addr, server->h_length);

  if (::connect (sockfd, reinterpret_cast<const sockaddr *> (&server_addr),
                 sizeof (server_addr)) < 0)
    {
      // Do not leak the descriptor when the connection attempt fails.
      close (sockfd);
      throw std::string ("ERROR connecting");
    }
  return sockfd;
}

void
mpd_notify_thread_proc (int shutdown_fd)
{
  const std::string host = "127.0.0.1";
  const uint16_t port = 7701;

  auto sockfd = socket_connect (host, port);
  mpd_async_ptr async_ptr {mpd_async_new (sockfd), mpd_async_free};
  auto async = async_ptr.get ();
  if (async == nullptr)
    {
      throw std::runtime_error ("mpd_async_new failed");
    }

  while (shutdown_fd != 0)
    {
      auto events = async_poll (async, &shutdown_fd);
      if (shutdown_fd == 0)
    {
      break;
    }
      if (events != 0)
        {
          if (!mpd_async_io (async, (mpd_async_event) events))
            {
              throw std::runtime_error ("connection was closed");
            }

          char* line_ptr;
          while ((line_ptr = mpd_async_recv_line (async)) != nullptr)
            {
              std::cout << "recv: " << line_ptr << "\n";
              std::string line {line_ptr};
              if (line.find ("OK") == 0)
                {
                  if (!mpd_async_send_command (async, "idle", nullptr))
                    {
                      throw std::runtime_error ("mpd_async_send_command failed");
                    }
                }
            }
        }
    }
}

// Starts the worker thread with the read end of a pipe, waits for the user
// to press ENTER, then closes the write end to wake the worker and joins it.
//
// Throws std::runtime_error when the pipe cannot be created; without this
// check the worker would be handed an uninitialised descriptor.
int
main(void)
{
  int shutdown_pipe[2];
  if (pipe (shutdown_pipe) != 0)
    {
      throw std::runtime_error ("pipe failed");
    }

  const int read_end = shutdown_pipe[0];
  std::thread mpd_notify_thread {[read_end] ()
  {
    mpd_notify_thread_proc (read_end);
  }};

  std::string response;
  getline (std::cin, response);
  std::cout << "shutting down...\n";
  // Closing the write end is the shutdown signal for the worker.
  close (shutdown_pipe[1]);
  mpd_notify_thread.join ();
  close (shutdown_pipe[0]);
}

An even better solution, if you're willing to write your code in an event based style (using libuv and uvw):

#include <string>
#include <stdexcept>
#include <memory>
#include <iostream>

#include <uvw.hpp>

// Event-driven variant: connects to MPD on 127.0.0.1:7701 through uvw,
// answers every line starting with "OK" with a new "idle" command and
// prints all other lines.
int
main(void)
{
  auto loop = uvw::Loop::getDefault ();
  if (!loop)
    {
      throw std::runtime_error ("loop init failed");
    }
  auto tcp = loop->resource<uvw::TcpHandle>();
  if (!tcp)
    {
      throw std::runtime_error ("tcp init failed");
    }

  // Start reading as soon as the connection is established.
  tcp->once<uvw::ConnectEvent> ([] (const uvw::ConnectEvent &, uvw::TcpHandle &handle)
  {
    handle.read ();
  });

  tcp->once<uvw::ErrorEvent> ([] (const uvw::ErrorEvent &, uvw::TcpHandle &)
  {
    std::cerr << "Connection error\n";
  });

  // Bytes received so far that do not yet form a complete line.
  std::string pending;
  tcp->on<uvw::DataEvent> ([&pending] (const uvw::DataEvent &event, uvw::TcpHandle &handle)
  {
    pending.append (event.data.get (), event.length);

    for (auto eol = pending.find ('\n');
         eol != std::string::npos;
         eol = pending.find ('\n'))
      {
        const std::string line = pending.substr (0, eol);
        pending.erase (0, eol + 1);

        if (line.compare (0, 2, "OK") != 0)
          {
            std::cout << line << "\n";
            continue;
          }

        // The write call takes ownership of a heap buffer, so copy the
        // command into one.
        const std::string idle = "idle\n";
        std::unique_ptr<char[]> payload {new char[idle.size ()]};
        idle.copy (payload.get (), idle.size ());
        handle.write (std::move (payload), idle.size ());
      }
  });

  tcp->connect ("127.0.0.1", 7701);

  loop->run<uvw::Loop::Mode::DEFAULT> ();
}