Is there an existing way to serialize invocations of a boost::signals2 signal?

1.2k views Asked by At

I would like to serialize multithreaded invocations of a boost::signals2 signal in order to make sure that notifications about state changes from an object arrive at slots in a well defined order.

Background

I have an object with an internal state in a multithreaded program. Some parts of the internal state are interesting for other parts of the program, and the object exposes state changes by using a boost::signals2 signal, similar to this:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
        }
        m_OnStateChanged(newState);
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
};

Problem

If there are multiple concurrent invocations of the OnEvent handler, this can potentially lead to listeners being notified about state changes in another order than the changes actually took place. The state itself is protected by a mutex like above, so the actual state is welldefined. However, the mutex cannot be held across the call to the signal, as this could lead to deadlocks. This means that the actual invocations of the signal might happen in any order, whereas I would require them to be called in the same order as the state changes have actually taken place.

One way to handle this problem would be to remove the State from the signal and just notify listeners that the state has changed. They could then query the object for its state and would get the state that the object had when the signal was fired or a later state. In my scenario, the listeners need to be informed about all state changes so this method won't work here.

My next approach would be something like the following:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        boost::unique_future<void> waitForPrevious;
        boost::shared_ptr<boost::promise<void> > releaseNext;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
            waitForPrevious = m_CurrentInvocation->get_future();
            m_CurrentInvocation.reset(new boost::promise<void>());
            releaseNext = m_CurrentInvocation;
        }
        // Wait for all previous invocations of the signal to finish
        waitForPrevious.get();

        // Now it is our turn to invoke the signal
        // TODO: use try-catch / scoped object to release next if an exception is thrown
        OnStateChanged(newState);

        // Allow the next state change to use the signal
        releaseNext->set_value();
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
    // Initialized with a "fulfilled" promise in the constructor
    // or do special handling of initially empty promise above
    boost::shared_ptr<boost::promise<void> > m_CurrentInvocation;
};

I haven't tried the above code, so it might be littered with bugs and compilation errors, but it should be possible to deduce what I am after. My gut feeling tells me I'm not the first one to encounter this type of problem and I prefer using tried and tested code to my own... :) So my question is really:

Is there a preexisting way to achieve serialized invocations of a boost::signals2 signal (such as built into the signals2 library or a common pattern)?

1

There are 1 answers

3
user1202136 On

I propose the following solution. Create a queue of pending signals and have a separate thread dispatch them. The code would roughly look as follows:

class ObjectWithState {
private:
    bool running;
    std::queue<State> pendingSignals;
    boost::condition_variable cond;
    boost::mutex mut;

    void dispatcherThread()
    {
        while (running)
        {
            /* local copy, so we don't need to hold a lock */
            std::vector<State> pendingSignalsCopy;

            /* wait for new signals, then copy them locally */
            {
                boost::unique_lock<boost::mutex> lock(mut);
                cond.wait(mut);
                pendingSignalsCopy = pendingSignals;
                pendingSignals.clear();
            }

            /* dispatch */
            while (!pendingSignalsCopy.empty())
            {
                State newState = pendingSignalsCopy.front();
                OnStateChanged(newState);
                pendingSignalsCopy.pop();
            }
        }
    }

public:
    void OnEvent()
    {
        State newState;
        ...

        /* add signal to queue of pending signals and wake up dispatcher thread */
        {
            boost::unique_lock<boost::mutex> lock(mut);
            pendingSignals.push(state);
            cond.notify_all();
        }
    }
};