Sliding windows in RxSwift

Coming from an RxJava background, I cannot find a standard way to implement sliding windows in RxSwift. For example, suppose I have the following sequence of events:

1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...

Suppose events are emitted twice per second. I want to transform this sequence into a sequence of buffers, each containing the last three seconds of data, with one buffer emitted every second. The result would look like this:

[1,2,3,4,5,6], [3,4,5,6,7,8], [5,6,7,8,9,10], ...
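
To make the expected windows concrete, here is a minimal plain-Swift sketch (no RxSwift involved) that computes them from timestamped events. The helper name `slidingWindows` and its parameters are hypothetical, and it assumes element n arrives at n × 500 ms and that a window emitted at time T covers events with T − span < t ≤ T:

```swift
// Hypothetical helper, not part of RxSwift: computes the buffers a sliding
// time window would emit for a fixed list of timestamped events.
// Assumption: the window emitted at time T holds events with T - span < t <= T.
func slidingWindows<T>(
    events: [(timeMs: Int, value: T)],
    spanMs: Int,
    shiftMs: Int,
    untilMs: Int
) -> [[T]] {
    precondition(spanMs > 0 && shiftMs > 0, "span and shift must be positive")
    var windows: [[T]] = []
    var emitAt = spanMs  // the first buffer is emitted after one full span
    while emitAt <= untilMs {
        let window = events
            .filter { $0.timeMs > emitAt - spanMs && $0.timeMs <= emitAt }
            .map { $0.value }
        windows.append(window)
        emitAt += shiftMs  // later buffers follow every shift
    }
    return windows
}

// Elements 1...15, two per second (assumed to arrive at 500 ms, 1000 ms, ...).
let events = (1...15).map { (timeMs: $0 * 500, value: $0) }
let windows = slidingWindows(events: events, spanMs: 3000, shiftMs: 1000, untilMs: 5000)
print(windows)
// [[1, 2, 3, 4, 5, 6], [3, 4, 5, 6, 7, 8], [5, 6, 7, 8, 9, 10]]
```

Integer milliseconds are used here only to keep the boundary comparisons exact.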

In RxJava, I would use one of the overloads of the buffer method:

stream.buffer(3000, 1000, TimeUnit.MILLISECONDS)

This produces exactly the result I need: a sequence of buffers, each emitted once per second and containing the last three seconds of data.

I searched the RxSwift docs and did not find any overload of the buffer operator that would allow this. Am I missing some operator that is not obvious to an RxJava user?

1 answer

Daniel T. (best answer)

I initially wrote the solution using a custom operator. I have since figured out how it can be done with the standard operators.

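import Foundation
import RxSwift

// Appears to target the RxSwift 4-era API, where RxTimeInterval is a
// TimeInterval and the element type is named E.
extension ObservableType {

    func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        // Fires first after timeSpan, then every timeShift; stops once the
        // source delivers its final element on completion.
        let trigger = Observable<Int>.timer(timeSpan, period: timeShift, scheduler: scheduler)
            .takeUntil(self.takeLast(1))

        // Accumulates elements keyed by arrival time and, on each new element,
        // drops the entries older than timeSpan.
        let buffer = self
            .scan([Date: E]()) { previous, current in
                var next = previous
                let now = scheduler.now
                next[now] = current
                return next.filter { $0.key > now.addingTimeInterval(-timeSpan) }
            }

        // On each tick, emits the buffered values ordered by arrival time.
        return trigger.withLatestFrom(buffer)
            .map { $0.sorted(by: { $0.key < $1.key }).map { $0.value } }
    }
}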

I'm leaving my original solution below for posterity:


Writing your own operator is the solution here.

import Foundation
import RxSwift

extension ObservableType {

    func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        return Observable.create { observer in
            // Elements keyed by arrival time; the lock guards access from both callbacks.
            var buf: [Date: E] = [:]
            let lock = NSRecursiveLock()
            let elementDisposable = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    buf[Date()] = element
                case .completed:
                    observer.onCompleted()
                case let .error(error):
                    observer.onError(error)
                }
            }
            // First tick after timeSpan, then one tick every timeShift.
            let spanDisposable = scheduler.schedulePeriodic((), startAfter: timeSpan, period: timeShift, action: { state in
                lock.lock(); defer { lock.unlock() }
                let now = Date()
                // Drop entries older than timeSpan, then emit the rest in arrival order.
                buf = buf.filter { $0.key > now.addingTimeInterval(-timeSpan) }
                observer.onNext(buf.sorted(by: { $0.key < $1.key }).map { $0.value })
            })
            return Disposables.create([spanDisposable, elementDisposable])
        }
    }
}
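
One caveat with keying the buffer by Date: two elements that share a timestamp (for instance under a virtual-time scheduler) occupy the same dictionary key, so the later one replaces the earlier. A possible alternative, sketched below in plain Swift, is an array of (time, value) pairs. The `TimedBuffer` type is hypothetical, is not part of RxSwift, and assumes elements are appended in time order:

```swift
import Foundation

// Hypothetical buffer type, not part of RxSwift: stores (time, value) pairs
// in arrival order so that equal timestamps do not collide.
struct TimedBuffer<Value> {
    var entries: [(time: Date, value: Value)] = []

    mutating func append(_ value: Value, at time: Date) {
        entries.append((time: time, value: value))
    }

    // Removes entries at or before `now - span`, i.e. keeps those with
    // time > now - span, the same comparison the operators above use.
    mutating func prune(now: Date, span: TimeInterval) {
        let cutoff = now.addingTimeInterval(-span)
        entries.removeAll { $0.time <= cutoff }
    }

    var values: [Value] { entries.map { $0.value } }
}

let t0 = Date(timeIntervalSince1970: 0)

// Dictionary keyed by Date: the second write replaces the first.
var byDate: [Date: Int] = [:]
byDate[t0] = 1
byDate[t0] = 2
print(byDate.count)  // 1

// Array of pairs: both elements survive.
var buffer = TimedBuffer<Int>()
buffer.append(1, at: t0)
buffer.append(2, at: t0)
buffer.append(3, at: t0.addingTimeInterval(2))

buffer.prune(now: t0.addingTimeInterval(2.5), span: 3)
let afterFirstPrune = buffer.values
print(afterFirstPrune)  // [1, 2, 3]

buffer.prune(now: t0.addingTimeInterval(4), span: 3)
let afterSecondPrune = buffer.values
print(afterSecondPrune)  // [3]
```

Because the pairs stay in arrival order, no sorting step is needed before emitting.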