I'm trying to understand how to reorganize a program which I would previously have written as a sequence of state transitions.
I have some business logic:
type In = Long
type Count = Int
type Out = Count
type S = Map[In, Count]
val inputToIn: String => Option[In]
  = s => try Some(s.toLong) catch { case _: NumberFormatException => None }
def transition(in: In): S => (S, Out)
  = s => { val n = s.getOrElse(in, 0); (s + (in -> (n + 1)), n + 1) }
val ZeroOut: Out = 0
val InitialState: S = Map.empty
With these I wish to construct a program to pass in some initial State (an empty Map), read input from stdin, convert it to In, run the state transition and output the current state S and the output Out to stdout.
Previously, I would have done something like this:
val runOnce = StateT[IO, S, Out](s => IO.readLn.map(inputToIn) flatMap {
case None => IO((s, ZeroOut))
case Some(in) => val (t, o) = transition(in)(s)
IO.putStrLn(t.toString) |+| IO.putStrLn(o.toString) >| IO((t, o))
})
Stream.continually(runOnce).sequenceU.eval(InitialState)
However, I'm really struggling to see how to connect this approach (a stream of state transitions) with scalaz-stream. I started with this:
type Transition = S => (S, Out)
val NoTransition: Transition = s => (s, 0)
io.stdInLines.map(inputToIn).map(_.fold(NoTransition)(transition))
This is of type: Process[Task, Transition]. I don't really know where to go from there.
- How do I "pass in" my InitialState and run the program, threading the output S at each step in as the input S to the next one?
- How do I get the values of S and Out at each step and print them to stdout (assuming I can convert them to strings)?
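For reference, the threading asked about in the first bullet corresponds to a left scan in plain-collections terms. The following is only a minimal sketch using the standard library's `List.scanLeft`, not scalaz-stream; the sample input and the object name `ScanDemo` are made up for illustration.

```scala
object ScanDemo {
  type In = Long
  type Count = Int
  type Out = Count
  type S = Map[In, Count]
  type Transition = S => (S, Out)

  def transition(in: In): Transition =
    s => { val n = s.getOrElse(in, 0); (s + (in -> (n + 1)), n + 1) }

  val NoTransition: Transition = s => (s, 0)
  val InitialState: S = Map.empty
  val ZeroOut: Out = 0

  // Hypothetical stand-in for the parsed stdin lines.
  val parsed: List[Option[In]] = List(Some(7L), None, Some(7L))

  // Same mapping as in the question: each input becomes a Transition.
  val transitions: List[Transition] = parsed.map(_.fold(NoTransition)(transition))

  // Thread S from one step to the next; the first element is the seed itself.
  val steps: List[(S, Out)] =
    transitions.scanLeft((InitialState, ZeroOut)) { case ((s, _), t) => t(s) }

  def main(args: Array[String]): Unit = steps.foreach(println)
}
```

Note that `scanLeft` also yields the seed as its first element, so there is one more result than there are inputs.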
In trying to use a single for-comprehension, I get similarly stuck:
for {
i <- Process.eval(Task.now(InitialState))
l <- io.stdInLines.map(inputToIn)
...
Any help is greatly appreciated!
I've got a bit further now.
type In_ = (S, Option[In])
type Out_ = (S, Out)
val input: Process[Task, In_]
= for {
i <- Process.emit(InitialState)
o <- io.stdInLines.map(inputToIn)
} yield (i, o)
val prog =
  input.pipe(process1.collect[In_, Out_] {
    case (s, Some(in)) => transition(in)(s)
  }).to(io.stdOutLines.contramap[Out_](_.toString))
Then
prog.run.run
This doesn't work: the state is not being threaded through the stream. Instead, the initial state is passed in at every step.
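The same behaviour can be reproduced without scalaz-stream. The sketch below uses plain `List` purely as an illustration (it is an assumption that `Process` behaves analogously here, and the sample data is made up): a for-comprehension whose first generator yields a single value pairs that one value with every element of the second generator, so nothing is carried from one step to the next.

```scala
object ForComprehensionDemo {
  type In = Long
  type Count = Int
  type Out = Count
  type S = Map[In, Count]

  def transition(in: In): S => (S, Out) =
    s => { val n = s.getOrElse(in, 0); (s + (in -> (n + 1)), n + 1) }

  val InitialState: S = Map.empty

  // Hypothetical stand-in for the parsed stdin lines.
  val parsed: List[Option[In]] = List(Some(1L), Some(1L), None, Some(2L))

  // Same shape as `input` above: one initial state, many inputs.
  val paired: List[(S, Option[In])] =
    for {
      i <- List(InitialState)
      o <- parsed
    } yield (i, o)

  // Every step starts from the empty map, so every count comes out as 1.
  val outputs: List[(S, Out)] =
    paired.collect { case (s, Some(in)) => transition(in)(s) }

  def main(args: Array[String]): Unit = outputs.foreach(println)
}
```

Even though the input `1` appears twice, its count never reaches 2, because each `transition` call receives the untouched `InitialState`.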
Paul Chiusano suggested using process1.scan. So now I do this:
type In_ = In
type Out_ = (S, Out)
val InitialOut_ = (InitialState, ZeroOut)
val program =
io.stdInLines.collect(Function.unlift(inputToIn)).pipe(
process1.scan[In_, Out_](InitialOut_) {
case ((s, _), in) => transition(in)(s)
}).to(io.stdOutLines.contramap[Out_](_.shows))
There's a problem here: in this specific example my Out type is a monoid, so the initial value can be built from its identity, but this may not be the case in general. What would I do then? (I guess I could use Option, but that seems unnecessary.)
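One possible way around the seed problem, sketched here with the standard library only and not with scalaz-stream, is to thread just `S` and emit `(S, Out)` per input, so that no initial `Out` is ever needed. `mapAccum` below is a hypothetical helper written for this sketch, not a library function, and the sample inputs are made up.

```scala
object SeedFreeDemo {
  type In = Long
  type Count = Int
  type Out = Count
  type S = Map[In, Count]

  def transition(in: In): S => (S, Out) =
    s => { val n = s.getOrElse(in, 0); (s + (in -> (n + 1)), n + 1) }

  val InitialState: S = Map.empty

  // Hypothetical helper: threads a state through `as`, emitting one B per element.
  // Only the state needs an initial value; no seed of type B is required.
  def mapAccum[A, St, B](as: List[A], init: St)(f: (St, A) => (St, B)): List[B] = {
    val (_, out) = as.foldLeft((init, List.empty[B])) {
      case ((s, acc), a) =>
        val (s2, b) = f(s, a)
        (s2, b :: acc)
    }
    out.reverse
  }

  // Hypothetical stand-in for the successfully parsed stdin lines.
  val inputs: List[In] = List(1L, 1L, 2L, 1L)

  // Carry S forward, and emit the pair (S, Out) at each step.
  val steps: List[(S, Out)] =
    mapAccum(inputs, InitialState) { (s, in) =>
      val (t, o) = transition(in)(s)
      (t, (t, o))
    }

  def main(args: Array[String]): Unit = steps.foreach(println)
}
```

Here the number of results equals the number of inputs, since the seed is never emitted. Whether scalaz-stream offers an equivalent combinator is not something this sketch assumes.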