F# computation expression transparent state passing with Bind


I have the following code that tries to read possibly incomplete data (image data, for example) from a network stream using the usual MaybeBuilder:

let image = maybe {
    let pos = 2 //Initial position skips 2 bytes of packet ID
    let! width, pos = readStreamAsInt 2 pos
    let! height, pos = readStreamAsInt 2 pos
    let! data, pos = readStream (width*height) pos
    advanceInStream pos
    return {width = width; height = height; pixels = data}
}

So, the readStream[AsInt] [numBytes] [offset] function returns Some with the data and the new position, or None if the data has not yet arrived on the NetworkStream. The advanceInStream function is executed once the whole network packet has been read.

I wonder whether a custom computation expression builder could hide the pos passing from its user, since the pattern is always the same: I read some data together with the new position in the stream and pass that position to the next read function as its last parameter.

P.S. MaybeBuilder used:

type MaybeBuilder() =    
    member x.Bind(d,f) = Option.bind f d
    member x.Return d = Some d
    member x.ReturnFrom d = d
    member x.Zero() = None
let maybe = new MaybeBuilder()

P.P.S

On second thought, it seems I have to make pos mutable because of possible "for" or "while" loops when reading. A simple let! works fine with pos being shadowed on each Bind, but you can't hold onto immutability once you add reading in a loop, right? The task becomes trivial then.

Just another metaprogrammer On BEST ANSWER

@bytebuster makes good points about the maintainability of custom computation expressions, but I still thought I would demonstrate how to combine the State and Maybe monads into one.

In "traditional" languages we have good support for composing values such as integers, but we run into problems when developing parsers (producing values from a binary stream is essentially parsing). For parsers we would like to compose simple parser functions into more complex ones, but here "traditional" languages often lack good support.

In functional languages functions are as ordinary as values, and since values can be composed, functions can be composed as well.

First let's define a StreamReader function. A StreamReader takes a StreamPosition (stream + position) and produces an updated StreamPosition and a StreamReaderResult (the read value or a failure).

type StreamReader<'T> = 
  StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)

(This is the most important step.)
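
To make the type concrete, here is a minimal sketch of one primitive reader being run by hand. The record and union below are simplified stand-ins for the types in the full example, and readByte, run and start are hypothetical names used only for illustration.

```fsharp
// Simplified stand-ins for the types used in the full example (assumed shapes).
type StreamPosition = { Stream : byte[]; Position : int }

type StreamReaderResult<'T> =
  | Success of 'T
  | Failure of (string*StreamPosition) list

type StreamReader<'T> =
  StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)

// Hypothetical primitive: read one byte and advance the position by one.
// On end of stream the position is returned unchanged together with a failure.
let readByte : StreamReader<byte> =
  StreamReader <| fun sp ->
    if sp.Position < sp.Stream.Length then
      { sp with Position = sp.Position + 1 }, Success (sp.Stream.[sp.Position])
    else
      sp, Failure ["EOS", sp]

// Running a reader by hand: unwrap the function and apply it to a position.
let run (StreamReader f) sp = f sp

let start       = { Stream = [| 7uy; 9uy |]; Position = 0 }
let sp1, first  = run readByte start  // position 1, Success 7uy
let sp2, second = run readByte sp1    // position 2, Success 9uy
let sp3, third  = run readByte sp2    // position stays 2, Failure (end of stream)
```

Each call hands the updated position to the next one explicitly. That hand-off is exactly the bookkeeping the computation expression is meant to hide.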

We would like to be able to compose simple StreamReader functions into more complex ones. A very important property we want to maintain is that the compose operation is "closed" under StreamReader, meaning that the result of a composition is a new StreamReader, which in turn can be composed further without limit.

In order to read an image we need to read the width & height, compute the product and read the bytes. Something like this:

let readImage = 
  reader {
    let! width  = readInt32 
    let! height = readInt32 
    let! bytes  = readBytes (width*height)

    return width, height, bytes
  }

Because composition is closed, readImage is itself a StreamReader<int*int*byte[]>.

To compose StreamReader values like above we need to define a computation expression, but before we can do that we need to define the operations Return and Bind for StreamReader. It turns out Yield is good to have as well.

module StreamReader =
  let Return v : StreamReader<'T> =
    StreamReader <| fun sp -> 
      sp, (Success v)

  let Bind (StreamReader t) (fu : 'T -> StreamReader<'U>) : StreamReader<'U> =
    StreamReader <| fun sp -> 
      let tsp, tr = t sp
      match tr with
      | Success tv ->
        let (StreamReader u) = fu tv
        u tsp
      | Failure tfs -> tsp, Failure tfs

  let Yield (ft : unit -> StreamReader<'T>) : StreamReader<'T> =
    StreamReader <| fun sp -> 
      let (StreamReader t) = ft ()
      t sp

Return is trivial: the StreamReader returns the given value and does not update the StreamPosition.

Bind is a bit more challenging, as it describes how to compose two StreamReader functions into a new one. Bind runs the first StreamReader function and checks the result. If it is a failure, Bind returns that failure; otherwise it uses the result to compute the second StreamReader and runs that on the updated stream position.
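
As a rough illustration of that short-circuiting, the sketch below composes two reads by calling Bind directly, without any computation expression. The types are simplified stand-ins for those in the full example, and readByte, readSum and run are hypothetical names.

```fsharp
// Simplified stand-ins for the types used in the full example (assumed shapes).
type StreamPosition = { Stream : byte[]; Position : int }

type StreamReaderResult<'T> =
  | Success of 'T
  | Failure of (string*StreamPosition) list

type StreamReader<'T> =
  StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)

let Return v : StreamReader<'T> =
  StreamReader <| fun sp -> sp, Success v

let Bind (StreamReader t) (fu : 'T -> StreamReader<'U>) : StreamReader<'U> =
  StreamReader <| fun sp ->
    let tsp, tr = t sp
    match tr with
    | Success tv ->
      let (StreamReader u) = fu tv
      u tsp
    | Failure tfs -> tsp, Failure tfs

// Hypothetical primitive: read one byte or fail at end of stream.
let readByte : StreamReader<byte> =
  StreamReader <| fun sp ->
    if sp.Position < sp.Stream.Length then
      { sp with Position = sp.Position + 1 }, Success (sp.Stream.[sp.Position])
    else
      sp, Failure ["EOS", sp]

// Compose by hand: read two bytes and add them.
let readSum : StreamReader<int> =
  Bind readByte (fun a ->
    Bind readByte (fun b ->
      Return (int a + int b)))

let run (StreamReader f) bytes = f { Stream = bytes; Position = 0 }

let okPos, ok       = run readSum [| 3uy; 4uy |]  // both reads succeed: Success 7
let failPos, failed = run readSum [| 3uy |]       // second read fails at position 1
```

In the failing case the addition never runs, because the continuation passed to the inner Bind is not called once readByte reports a failure.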

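Yield takes a function that creates a StreamReader and wraps it so that the inner StreamReader is only created when the outer one is run. Note that F# computation expressions call an operation of this shape (unit -> StreamReader<'T>) Delay; a builder method named Yield is what the yield keyword translates to, so readImage above compiles without it.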
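
To see what this deferral buys, the following sketch registers the same function body as the Yield above under the builder method name Delay, which is the method the F# compiler wraps a computation expression body in when the builder defines it. DelayedReaderBuilder, delayed, evaluations and counted are hypothetical names, the types are simplified stand-ins, and Bind is left out to keep the sketch short.

```fsharp
// Simplified stand-ins for the types used in the full example (assumed shapes).
type StreamPosition = { Stream : byte[]; Position : int }

type StreamReaderResult<'T> =
  | Success of 'T
  | Failure of (string*StreamPosition) list

type StreamReader<'T> =
  StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)

let Return v : StreamReader<'T> =
  StreamReader <| fun sp -> sp, Success v

// Same body as Yield in the text, under the name the compiler looks for.
let Delay (ft : unit -> StreamReader<'T>) : StreamReader<'T> =
  StreamReader <| fun sp ->
    let (StreamReader t) = ft ()
    t sp

type DelayedReaderBuilder() =
  member x.Return v = Return v
  member x.Delay ft = Delay ft

let delayed = DelayedReaderBuilder ()

// Counts how often the body of the computation expression is evaluated.
let evaluations = ref 0

let counted : StreamReader<int> =
  delayed {
    evaluations.Value <- evaluations.Value + 1
    return 42
  }

let run (StreamReader f) = f { Stream = [||]; Position = 0 }

let before = evaluations.Value  // defining the reader has not run the body yet
let _, r1  = run counted
let _, r2  = run counted
let after  = evaluations.Value  // the body ran once per run
```

Without a Delay method the side effect would run once, when counted is defined, rather than each time the reader is run.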

Finally, let's create the computation expression builder:

type StreamReaderBuilder() =
  member x.Return v   = StreamReader.Return v
  member x.Bind(t,fu) = StreamReader.Bind t fu
  member x.Yield(ft)  = StreamReader.Yield ft

let reader = StreamReaderBuilder ()

Now we have built the basic framework for combining StreamReader functions. In addition, we need to define the primitive StreamReader functions.

Full example:

open System
open System.IO

// The result of a stream reader operation is either
//  Success of value
//  Failure of list of failures
type StreamReaderResult<'T> =
  | Success of 'T
  | Failure of (string*StreamPosition) list

and StreamPosition =
  {
    Stream    : byte[]
    Position  : int
  }

  member x.Remaining = max 0 (x.Stream.Length - x.Position)

  member x.ReadBytes (size : int) : StreamPosition*StreamReaderResult<byte[]> =
    if x.Remaining < size then
      x, Failure ["EOS", x]
    else
      let nsp = StreamPosition.New x.Stream (x.Position + size)
      nsp, Success (x.Stream.[x.Position..(x.Position + size - 1)])

  member x.Read (converter : byte[]*int -> 'T) : StreamPosition*StreamReaderResult<'T> =
    let size = sizeof<'T>
    if x.Remaining < size then
      x, Failure ["EOS", x]
    else
      let nsp = StreamPosition.New x.Stream (x.Position + size)
      nsp, Success (converter (x.Stream, x.Position))

  static member New s p = {Stream = s; Position = p;}

// Defining the StreamReader<'T> function is the most important decision
//   In this case a stream reader is a function that takes a StreamPosition 
//   and produces a (potentially) new StreamPosition and a StreamReaderResult
type StreamReader<'T> = StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)

// Defining the StreamReader CE
module StreamReader =
  let Return v : StreamReader<'T> =
    StreamReader <| fun sp -> 
      sp, (Success v)

  let Bind (StreamReader t) (fu : 'T -> StreamReader<'U>) : StreamReader<'U> =
    StreamReader <| fun sp -> 
      let tsp, tr = t sp
      match tr with
      | Success tv ->
        let (StreamReader u) = fu tv
        u tsp
      | Failure tfs -> tsp, Failure tfs

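  // Defers creation of the inner reader until the outer reader is run.
  // (This is the shape F# builders expose as Delay; it is named Yield here.)
  let Yield (ft : unit -> StreamReader<'T>) : StreamReader<'T> =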
    StreamReader <| fun sp -> 
      let (StreamReader t) = ft ()
      t sp

type StreamReaderBuilder() =
  member x.Return v   = StreamReader.Return v
  member x.Bind(t,fu) = StreamReader.Bind t fu
  member x.Yield(ft)  = StreamReader.Yield ft

let reader = StreamReaderBuilder ()

let read (StreamReader sr) (bytes : byte[]) (pos : int) : StreamReaderResult<'T> =
  let sp    = StreamPosition.New bytes pos
  let _, sr = sr sp
  sr

// Defining various stream reader functions
let readValue (converter : byte[]*int -> 'T) : StreamReader<'T> =
  StreamReader <| fun sp -> sp.Read converter

let readInt32 = readValue BitConverter.ToInt32
let readInt16 = readValue BitConverter.ToInt16
let readBytes size : StreamReader<byte[]> = 
  StreamReader <| fun sp -> 
    sp.ReadBytes size

let readImage = 
  reader {
    let! width  = readInt32 
    let! height = readInt32 
    let! bytes  = readBytes (width*height)

    return width, height, bytes
  }

[<EntryPoint>]
let main argv = 
  // Sample byte stream
  let bytes   = [|2;0;0;0;3;0;0;0;1;2;3;4;5;6|] |> Array.map byte
  let result  = read readImage bytes 0

  printfn "%A" result

  0
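
Regarding the loops mentioned in the question's P.P.S.: because the position travels inside the reader, repetition can be written as a recursive reader instead of a mutable pos. The sketch below is one possible way to do that; the types are simplified stand-ins for those above, and readByte, readMany and run are hypothetical helpers rather than part of the code above.

```fsharp
// Simplified stand-ins for the types used in the full example (assumed shapes).
type StreamPosition = { Stream : byte[]; Position : int }

type StreamReaderResult<'T> =
  | Success of 'T
  | Failure of (string*StreamPosition) list

type StreamReader<'T> =
  StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)

module StreamReader =
  let Return v : StreamReader<'T> =
    StreamReader <| fun sp -> sp, Success v

  let Bind (StreamReader t) (fu : 'T -> StreamReader<'U>) : StreamReader<'U> =
    StreamReader <| fun sp ->
      let tsp, tr = t sp
      match tr with
      | Success tv ->
        let (StreamReader u) = fu tv
        u tsp
      | Failure tfs -> tsp, Failure tfs

type StreamReaderBuilder() =
  member x.Return v    = StreamReader.Return v
  member x.Bind(t, fu) = StreamReader.Bind t fu

let reader = StreamReaderBuilder ()

// Hypothetical primitive: read one byte or fail at end of stream.
let readByte : StreamReader<byte> =
  StreamReader <| fun sp ->
    if sp.Position < sp.Stream.Length then
      { sp with Position = sp.Position + 1 }, Success (sp.Stream.[sp.Position])
    else
      sp, Failure ["EOS", sp]

// Hypothetical combinator: run `r` `count` times and collect the results in order.
// The recursion replaces a for/while loop over a mutable position.
let rec readMany count (r : StreamReader<'T>) : StreamReader<'T list> =
  if count <= 0 then
    StreamReader.Return []
  else
    reader {
      let! head = r
      let! tail = readMany (count - 1) r
      return head :: tail
    }

let run (StreamReader f) bytes = f { Stream = bytes; Position = 0 }

let endPos, firstThree = run (readMany 3 readByte) [| 10uy; 20uy; 30uy; 40uy |]
let _, tooShort        = run (readMany 5 readByte) [| 1uy; 2uy |]
```

Here firstThree holds the first three bytes and endPos points at index 3, while tooShort is a Failure because the stream ends before five bytes are read. The recursion is not tail-recursive, so very long repetitions would need an accumulator-based variant.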