Convert IEnumerable to IObservable with variable Period


I am consuming 3-axis accelerometer data using Rx, and I need to set up some unit tests. The data frames arrive quickly: the median interval between frames is 80 ms, but occasionally it is 120 ms. The interval is also never exactly 80 ms; it only hovers around that value.

So I have created a class that subscribes to the IObservable and records the data frames sequentially in a .csv file. One of the fields in the .csv file is the start time of the frame, with the first frame having start time = 0.0.

Now I want to read that file and stream it again for the purpose of testing and development. I want to use the StartTime field as the schedule for when I will fire any given Accelerometer frame when testing.

I looked over the answers to the question "Scheduling a IEnumerable periodically with .NET reactive extensions", but they seem to address only constant timespans.

Question: Is there already a canonical and preferred way to schedule frame pushes at irregular (but known) intervals in the Rx framework, or should I just roll my own somehow?

Edit 2: I am interested in something which could be as simple as:

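// Illustrative only: ColdObservableVaryingTime and AddScheduled are made-up names.
IObservable<T> AsObservable<T>(
   IEnumerable<T> source, Func<T, TimeSpan> getTimeDelta)
{
   var retVal = new ColdObservableVaryingTime<T>();
   foreach (var frame in source)
   {
       // Schedule each frame at the delay computed from the frame itself.
       retVal.AddScheduled(getTimeDelta(frame), frame);
   }
   return retVal;
}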

Edit 1: what I call "frames" in this question, Rx documentation calls TState.


There are 3 answers

0
philologon (best answer)

The answer provided by James Lucas is a good pointer in the right direction, but the solution that succeeded is more involved.

After I populate my IEnumerable with values from the csv file, I have to populate an array of

Recorded<Notification<AccelerometerFrame_raw>>

I then pass the array as a parameter to the scheduler's CreateColdObservable method. Once that is done, the cold observable sits there waiting to be started. In my particular situation, the code looks like this:

private TestScheduler sched { get; set; }
public IObservable<AccelerometerFrame_raw> DataStream { get; protected set; }

ctor()
{
   DataStream = SetupDeviceStream();
}

private IObservable<AccelerometerFrame_raw> SetupDeviceStream()
{
   // One slot per frame, plus one for the final OnCompleted notification.
   var framesArray =
      new Recorded<Notification<AccelerometerFrame_raw>>[allFrames.Count + 1];
   int i = 0;
   long timeStamp = 0;
   foreach (var item in allFrames)
   {
      // Accumulates each frame's TimeStampSeconds (converted to ms) into a virtual time.
      // TestScheduler counts in ticks, so 1 ms is represented here as 1 tick.
      timeStamp += (long) (item.TimeStampSeconds * 1000.0);
      framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
         timeStamp,
         Notification.CreateOnNext(item));
      i++;
   }
   // Complete the sequence shortly after the last frame.
   framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
      timeStamp + 10,
      Notification.CreateOnCompleted<AccelerometerFrame_raw>());

   sched = new TestScheduler();
   var stream = sched.CreateColdObservable(framesArray);
   return stream;
}

// When I am ready to start the cold observable (after subscribing to DataStream), I call

sched.Start();

Help on putting this all together comes from

(Phil Haack) http://haacked.com/archive/2014/03/10/master-time-with-reactive-extensions/

and https://msdn.microsoft.com/en-us/library/hh229343(v=vs.103).aspx

It is also necessary to install another NuGet package:

http://www.nuget.org/packages/reactiveui-testing/

As Phil Haack says in the linked blog post, "Unfortunately, [TestScheduler] is a bit of a pain to use as-is which is why Paul Betts took it upon himself to write some useful TestScheduler extension methods".

2
James Lucas

For this you can use the TestScheduler (from the Rx-Testing NuGet package). If you use it as the IScheduler implementation in your tests, you can create observable sequences by stating the time at which each item should be pushed. You can then 'play' the scheduler, or advance it to a given time, and see what happened. Note that time is virtualised in this setting and, for simplicity, the TestScheduler uses ticks as its unit of time.

The Intro to Rx site has a nice section on Rx testing here: http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html
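To make the description above concrete, here is a minimal sketch of scheduling items at irregular virtual times with TestScheduler. The class name TestSchedulerSketch, the string payloads and the 80 ms / 200 ms / 280 ms times are assumptions for illustration only; it also assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using Microsoft.Reactive.Testing;

public static class TestSchedulerSketch
{
    public static List<string> Run()
    {
        var scheduler = new TestScheduler();

        // Virtual times are expressed in ticks, so convert from TimeSpan explicitly.
        var source = scheduler.CreateColdObservable(
            new Recorded<Notification<string>>(
                TimeSpan.FromMilliseconds(80).Ticks, Notification.CreateOnNext("A")),
            new Recorded<Notification<string>>(
                TimeSpan.FromMilliseconds(200).Ticks, Notification.CreateOnNext("B")),
            new Recorded<Notification<string>>(
                TimeSpan.FromMilliseconds(280).Ticks, Notification.CreateOnCompleted<string>()));

        // Subscribe before running the scheduler; a cold observable schedules
        // its notifications relative to the moment of subscription.
        var log = new List<string>();
        source.Subscribe(
            x => log.Add($"{scheduler.Clock}: {x}"),
            () => log.Add($"{scheduler.Clock}: completed"));

        // Advance virtual time to 100 ms: only "A" is due by then.
        scheduler.AdvanceTo(TimeSpan.FromMilliseconds(100).Ticks);

        // Run everything that is still scheduled.
        scheduler.Start();
        return log;
    }
}
```

Because the clock is virtual, this runs instantly no matter how far apart the recorded times are, which is what makes it convenient for unit tests.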

16
Enigmativity

Here's another option for you. The Observable.Generate method is very powerful and it can be used to produce quite sophisticated sequences of values.

Here's how you could do it.

So, starting with a CSV file like this:

var csvLines = new []
{
    "A,0",
    "B,3",
    "C,4",
    "D,6",
};

(You could read these from a file with var csvLines = File.ReadAllLines(@"...");)

You could then parse the lines:

var parsed =
(
    from line in csvLines
    let parts = line.Split(',')
    select new
    {
        item = parts[0],
        seconds = int.Parse(parts[1])
    }
).ToArray();

And then determine the seconds offset between each line in the CSV:

var data =
    parsed
        .Zip(parsed.Skip(1), (p0, p1) => new
        {
            p1.item,
            offset = p1.seconds - p0.seconds,
        })
        .ToArray();

Now you can create the observable:

var observable =
    Observable
        .Generate(
            0,
            n => n < data.Length,
            n => n + 1,
            n => data[n].item,
            n => TimeSpan.FromSeconds(data[n].offset))
        .StartWith(parsed[0].item);

When I ran through this code I got the correct timings from my source file.


I've updated my code given your class definition in the comments below:

IEnumerable<AccelerometerFrame_raw> frames = ...;

var data =
    frames
        .Zip(frames.Skip(1), (f0, f1) => new
        {
            f1,
            offset = f1.TimeStampSeconds - f0.TimeStampSeconds,
        })
        .ToArray();

IObservable<AccelerometerFrame_raw> observable =
    Observable
        .Generate(
            0,
            n => n < data.Length,
            n => n + 1,
            n => data[n].f1,
            n => TimeSpan.FromSeconds(data[n].offset))
        .StartWith(frames.First());
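If you want something closer to the shape asked for in the question, the same Observable.Generate idea can be wrapped in a reusable extension method. This is only a sketch: ReplayWithTimestamps and its getStartTime parameter are hypothetical names, not part of Rx, and it assumes each item carries an absolute start time, as the StartTime field in the CSV does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ReplayExtensions
{
    // Hypothetical helper (not part of Rx): replays items with the gaps
    // implied by their absolute start times.
    public static IObservable<T> ReplayWithTimestamps<T>(
        this IEnumerable<T> source,
        Func<T, TimeSpan> getStartTime,
        IScheduler scheduler)
    {
        // Materialise once so the source is not enumerated repeatedly.
        var items = source.ToArray();

        return Observable.Generate(
            0,                          // initial state: index of the first item
            n => n < items.Length,      // keep going while items remain
            n => n + 1,                 // advance to the next index
            n => items[n],              // value to emit for this index
            n => n == 0                 // delay before emitting this item
                ? getStartTime(items[0])
                : getStartTime(items[n]) - getStartTime(items[n - 1]),
            scheduler);
    }
}
```

With the frame class from the question, usage would look something like frames.ReplayWithTimestamps(f => TimeSpan.FromSeconds(f.TimeStampSeconds), Scheduler.Default). Passing a TestScheduler instead lets the same code run in virtual time. Because each delay is relative to the previous emission, small scheduling overheads may accumulate on a real scheduler over long recordings.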