TMVar, but without the buffer?

87 views Asked by At

I'm trying to do communication between Haskell lightweight threads. Threads want to send each other messages for communication and synchronisation.

I was originally using TMVar for this, but I've just realised that the semantics are wrong: a TMVar will store one message in it internally, so positing a message to an empty TMVar won't block. It'll only block if you post a message to a full TMVar.

Can anyone suggest a similar STM IPC construct which:

  • will cause all writes to block until the message is consumed;
  • will cause all reads to block until a message is provided?

i.e. a zero-length pipe would be ideal; but I don't think BoundedChan would be happy if I gave it a capacity of 0. (Also, it's not STM.)

3

There are 3 answers

10
Fraser On

If I understand your problem correctly, I don't think you can, since the transactional guarantees mean that transaction A can't read from transaction B's write until transaction B is committed, at which point it can no longer block.

TMVar is the closest you're going to get if you're using STM. With IO, you may be able to build a structure which only completes a write when a reader is available (this structure may already exist, but I'm not aware of it).

0
Petr On

I'd suggest to reformulate the two requirements:

  • will cause all writes to block until the message is consumed;
  • will cause all reads to block until a message is provided.

The problem is with terms block and consumed/provided. With STM there is no notion of block, there is just retry, which has a different semantics: It restarts the current transaction - it doesn't wait until something happens (this could cause deadlocks). So we can't say "block until ...", we can only say something like "the transaction succeeds only when ...".

Similarly, what does "until a message is consumed/provided" mean? Since transactions are atomic, it can only be "until the transaction that consumed/provided a message succeeded".

So let's try to reformulate:

  • will cause all writes to retry until a transaction that consumes the message succeeds;
  • will cause all reads to retry until a transaction that provides a message succeeds.

But now the first point doesn't make sense: If a write retries, there is no message to be consumed, the transaction didn't pause, it's been discarded and started over - possibly producing a different message!

In other words: Any data can ever leave a STM transaction only when it succeeds (completes). This is by design - the transactions are always atomic from the point of view of the outside world / other transactions - you can never observe results of only a part of a transaction. You can never observe two transactions interacting.

So a 0-length queue is a bad analogy - it will never allow to pass any data though. At the end of any transaction, it'll have to have to be empty, so no data will ever pass through.

Nevertheless I believe it'll be possible to reformulate the requirements according to your goals and subsequently find a solution.

0
Daniel Wagner On

You say you would be happy with one side or the other being in IO rather than STM. So then it is not too hard to code this up. Let's start with the version that has receiving in IO. To make this happen, the receiver will have to initiate the handshake.

type SynchronousVar a = TChan (TMVar a)

send :: SynchronousVar a -> a -> STM a
receive :: SynchronousVar a -> IO a

send svar a = do
    tmvar <- readTChan svar
    putTMVar tmvar a

receive svar = do
    tmvar <- newEmptyTMVarIO
    atomically $ writeTChan svar tmvar
    atomically $ takeTMVar tmvar

A similar protocol can be written that has sending start the handshake.

type SynchronousVar a = TChan (a, TMVar ())

send :: SynchronousVar a -> a -> IO a
receive :: SynchronousVar a -> STM a

send svar a = do
    tmvar <- newEmptyTMVarIO
    atomically $ writeTChan svar (a, tmvar)
    atomically $ takeTMVar tmvar

receive svar = do
    (a, tmvar) <- readTChan svar
    putTMvar tmvar ()
    return a

Probably, if you really need synchronous communication, this is because you want two-way communication (i.e. the action that's running in IO wants to know something about the thread it's synchronizing with). It is not hard to extend the above protocol to pass off a tad more information about the synchronization (by adding it to the one-tuple in the former case or to the TMVar in the latter case).