Duplicate Key in index of Deedle Series


I have a list of events that occur in a system. My goal is to take the list of events and create a sliding window over the series to determine the rate of event occurrences. The events are loaded into the events list by an application that is outside the scope of this issue.

Because the system can receive events from multiple sources at the same time, some of the event occurrence timestamps (the value I am using as the key for the series) are the same. What is the proper way to build the series when timestamps can repeat?

This is the error I get:

An unhandled exception of type 'System.ArgumentException' occurred in Deedle.dll

Additional information: Duplicate key '6/12/2015 3:14:43 AM'. Duplicate keys are not allowed in the index.

My code:

let mutable events = new ResizeArray<StreamEvent>()
let getSeries =
    let eventsKvp = events |> Seq.map (fun event -> KeyValuePair<DateTime, StreamEvent>(event.OccuredAt, event))
    let series = Series(eventsKvp)
    series |> Series.windowDist (TimeSpan(0, 0, 0, 30))

Update #1

What isn't depicted here is some C# code which instantiates some of the F# Stream objects and adds events via the Stream.ProcessEvent method. That code is unimportant to the issue I am experiencing here.

I am no longer getting the duplicate key error, but I am now getting a different one: Floating window aggregation and chunking is only supported on ordered indices.

Update #2

I needed to use sortByKey instead of sort.

Here is my F# code:

namespace Storck.Data
open System
open System.Collections.Generic
open Deedle

type EventType =
    | ClientConnected
    | ClientDisconnect

type Edge(id:string,streamId:string) = 
    member this.Id = id
    member this.StreamId = streamId
    member this.Edges =  new ResizeArray<Edge>() 

type StreamEvent(id:string,originStreamId:string,eventType:EventType,ocurredAt:DateTime) = 
    member this.Id = id
    member this.Origin = originStreamId
    member this.EventType = eventType
    member this.OccuredAt = ocurredAt
    override this.Equals(o) =
        match o with
        | :? StreamEvent as sc -> this.Id = sc.Id
        | _ -> false
    override this.GetHashCode() =
        id.GetHashCode()
    interface System.IComparable with
        member this.CompareTo(o) =
            match o with
            | :? StreamEvent as sc -> compare this.Id sc.Id
            | _ -> -1

type Client(id:string) = 
    member this.Id = id

type Key = 
  | Key of DateTime * string
  static member (-) (Key(a, _), Key(b, _)) = a - b
  override x.ToString() = let (Key(d, s)) = x in d.ToString() + ", " + s

type Stream(id:string, origin:string) = 
    let mutable clients = new ResizeArray<Client>()
    let mutable events = new ResizeArray<StreamEvent>()

    member this.Events = events.AsReadOnly()
    member this.Clients = clients.AsReadOnly()
    member this.Id = id
    member this.Origin = origin
    member this.Edges =  new ResizeArray<Edge>() 
    member this.ProcessEvent(client:Client,event:StreamEvent)  =  
        match event.EventType with
            |EventType.ClientConnected -> 
                events.Add(event)
                clients.Add(client)
                true
            |EventType.ClientDisconnect -> 
                events.Add(event)
                let clientToRemove = clients |> Seq.find(fun(f)-> f.Id = client.Id)
                clients.Remove(clientToRemove)
    member this.GetSeries() =       
        let ts = series [ for e in events -> Key(e.OccuredAt, e.Id) => e ]
        ts |> Series.sortByKey |> Series.windowDist (TimeSpan(0, 0, 0,30))

There is 1 answer

Tomas Petricek (best answer)

One of the design decisions we made in Deedle is that a series can be treated as a continuous series (rather than a sequence of events) and so Deedle does not allow duplicate keys (which make sense for events but not for time series).

I wish there were nicer support for scenarios like yours. It is something we are thinking about for the next version, but I'm not sure how best to do it.

As Fyodor suggests in the comments, you can use a unique index that combines the date with something else (either the source or just an ordinal index).
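The ordinal-index variant might look like the sketch below. It uses only plain F# (no Deedle) to build the keys. The Event record and the withOrdinalKeys helper are hypothetical names introduced for illustration. Each event is paired with its position in the input, so two events with the same timestamp still get distinct keys.

```fsharp
open System

// Hypothetical event type for illustration; not part of Deedle.
type Event = { OccuredAt : DateTime; Source : string }

/// Pairs each event with a (timestamp, ordinal) key. The ordinal is the
/// event's position in the input, so keys stay unique even when
/// timestamps collide.
let withOrdinalKeys (events : Event list) =
    events |> List.mapi (fun i e -> (e.OccuredAt, i), e)

let sample =
    [ { OccuredAt = DateTime(2015, 1, 1, 12, 0, 0); Source = "one" }
      { OccuredAt = DateTime(2015, 1, 1, 12, 0, 0); Source = "two" }
      { OccuredAt = DateTime(2015, 1, 1, 13, 0, 0); Source = "one" } ]

let keyed = withOrdinalKeys sample

// All keys are distinct although two timestamps are equal.
let keysAreUnique =
    (keyed |> List.map fst |> List.distinct |> List.length) = List.length keyed
```

A plain tuple key like this does not define the - operator, so for distance-based windowing it would still need to be wrapped in a type that does.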

If you define the - operator on your key, then you can even use the windowDist function:

type StreamEvent = { OccuredAt : DateTime; Source : string; Value : int }

/// A key combines date with the source and defines the 
/// (-) operator which subtracts the dates returning TimeSpan
type Key = 
  | Key of DateTime * string
  static member (-) (Key(a, _), Key(b, _)) = a - b
  override x.ToString() = let (Key(d, s)) = x in d.ToString() + ", " + s

Now we can create a bunch of sample events:

let events = 
  [ { OccuredAt = DateTime(2015,1,1,12,0,0); Source = "one"; Value = 1 }
    { OccuredAt = DateTime(2015,1,1,12,0,0); Source = "two"; Value = 2 }
    { OccuredAt = DateTime(2015,1,1,13,0,0); Source = "one"; Value = 3 } ]

Here, I'll use the built-in series function with the Deedle => operator to create a series that maps the keys to values:

let ts = series [ for e in events -> Key(e.OccuredAt, e.Source) => e ]

And we can even use the windowDist function because the key type supports -!

ts |> Series.windowDist (TimeSpan(0, 0, 0,30))
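
Since the goal in the question is a rate of event occurrences, each window would then be reduced to a count. The sketch below is not Deedle code and makes no claim about how windowDist places its window boundaries. It is a plain F# illustration, under the assumption that a window starts at each event and covers the following 30 seconds. The countsPerWindow name is hypothetical.

```fsharp
open System

/// For each timestamp, counts the timestamps (itself included) that fall
/// at or after it and no more than `span` later.
let countsPerWindow (span : TimeSpan) (times : DateTime list) =
    times
    |> List.map (fun start ->
        let count =
            times
            |> List.filter (fun t -> t >= start && t - start <= span)
            |> List.length
        start, count)

// Two events share the 12:00:00 timestamp; duplicates are simply counted.
let times =
    [ DateTime(2015, 1, 1, 12, 0, 0)
      DateTime(2015, 1, 1, 12, 0, 0)
      DateTime(2015, 1, 1, 12, 0, 20)
      DateTime(2015, 1, 1, 13, 0, 0) ]

let counts = countsPerWindow (TimeSpan(0, 0, 0, 30)) times
```

Dividing each count by the window length in seconds would give events per second.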