How to implement a Publisher extension that drops elements until the asynchronous task associated with each element completes

For example, suppose there is a button. Every time the user taps it, it triggers a task (such as a network request, an animation, or I/O).

Until this task completes, the button should ignore further taps so that the task never runs more than once at a time.

In other words, I want a Publisher extension method that drops elements emitted by the publisher while the asynchronous task started by an earlier element is still running.

I implemented the following Publisher extension. Is it sound? Is there a better solution?

import UIKit
import Combine


class SwiftViewController: UIViewController {
    
    let clickedSubject = PassthroughSubject<(), Never>()
    
    var cancellables = Set<AnyCancellable>()
        
    override func viewDidLoad() {
        super.viewDidLoad()
                
//        clickedSubject.flatMapAndDropUntilLastCompleted { () in
//            print("clicked")
//            return Future<Int, Never>.init { promise in
//                DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
//                    promise(Result.success(Int.random(in: 0...100)))
//                }
//            }.eraseToAnyPublisher()
//        }.sink { data in
//            print("finish \(data)")
//        }.store(in: &cancellables)
        
        clickedSubject.dropUntilTaskCompleted({ _ in
            print("clicked")
            // Task.sleep throws only on cancellation; `try?` avoids the crash that `try!` would cause.
            try? await Task.sleep(nanoseconds: 1_000_000_000)
            return Int.random(in: 0...100)
        }).sink { data in
            print("finish \(data)")
        }.store(in: &cancellables)
        
        let button = UIButton(type: .close)
        view.addSubview(button)
        button.center = view.center
        button.addTarget(self, action: #selector(clicked), for: .touchUpInside)
    }


    @objc func clicked() {
        clickedSubject.send(())
    }
}


extension Publisher {
    
    /// Applies a transformation to each element emitted by the publisher, but drops all subsequent elements until the last transformation completes.
    /// - Parameter transform: A closure that takes an element emitted by the publisher and returns a new publisher.
    /// - Returns: A publisher that emits elements from the last completed transformation.
    public func flatMapAndDropUntilLastCompleted<P>(_ transform: @escaping (Self.Output) -> P) -> AnyPublisher<P.Output, P.Failure> where P: Publisher, Self.Failure == P.Failure {
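        // NOTE: `drop` is unsynchronized state; this assumes all events arrive on one thread (e.g. the main thread).
        var drop = false
        return self.flatMap { output in
            if drop {
                // A transformation is still running: ignore this element.
                return Empty<P.Output, P.Failure>().eraseToAnyPublisher()
            } else {
                drop = true
                return transform(output).handleEvents(receiveCompletion: { _ in
                    drop = false
                }, receiveCancel: {
                    // Reopen the gate on cancellation too, so it cannot stay closed forever.
                    drop = false
                }).eraseToAnyPublisher()
            }
        }.eraseToAnyPublisher()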
    }
    
    
    /// Drops elements emitted by the publisher until the asynchronous task associated with each element is completed.
    /// - Parameter task: A closure that takes an element emitted by the publisher and returns an asynchronous task.
    /// - Returns: A publisher that emits the results of the completed tasks.
    public func dropUntilTaskCompleted<T>(_ task: @escaping (Self.Output) async -> T) -> AnyPublisher<T, Self.Failure> {
        flatMapAndDropUntilLastCompleted { output in
            Future { promise in
                Task {
                    let result = await task(output)
                    promise(.success(result))
                }
            }
        }
    }
}

I am asking because this is a common use case: tapping a button reads some data from disk and then navigates to a new page with that data.

Because reading from disk is asynchronous, the push to the new page does not happen immediately.

If the user taps the button rapidly in this situation, the push may happen multiple times.

This is not limited to reading from disk: the issue arises whenever the navigation does not happen synchronously.
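
For comparison with the operator asked about above, here is a minimal sketch of a shorter variant that leans on Combine's built-in `flatMap(maxPublishers:_:)`. It assumes the upstream is a `PassthroughSubject` (as `clickedSubject` is), which is documented to drop values while downstream demand is zero; a buffering upstream would queue the taps instead of dropping them. The names `flatMapDroppingWhileBusy` and `demoDroppedClicks` are hypothetical and exist only for illustration.

```swift
import Combine

extension Publisher {
    /// Hypothetical name; a thin wrapper over Combine's own `flatMap(maxPublishers:_:)`.
    func flatMapDroppingWhileBusy<P: Publisher>(
        _ transform: @escaping (Output) -> P
    ) -> AnyPublisher<P.Output, Failure> where P.Failure == Failure {
        // With `.max(1)`, flatMap requests the next upstream element only
        // after the current inner publisher has completed.
        flatMap(maxPublishers: .max(1), transform).eraseToAnyPublisher()
    }
}

/// Deterministic demo: each "click" starts work that is finished by hand.
/// Returns the ids of the clicks that actually started work.
func demoDroppedClicks() -> [Int] {
    let clicks = PassthroughSubject<Int, Never>()
    var started: [Int] = []
    var current: PassthroughSubject<String, Never>?

    let cancellable = clicks
        .flatMapDroppingWhileBusy { id -> PassthroughSubject<String, Never> in
            started.append(id)
            let work = PassthroughSubject<String, Never>()
            current = work
            return work
        }
        .sink { _ in }

    clicks.send(1)                        // starts work for click 1
    clicks.send(2)                        // arrives while click 1 is still running
    current?.send(completion: .finished)  // work for click 1 completes
    clicks.send(3)                        // starts work for click 3
    cancellable.cancel()
    return started
}
```

The trade-off of this design is that the dropping behaviour lives in the subject rather than in the operator, so it is worth re-checking whenever the upstream publisher changes.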

1 answer

Kamil.S

There is an alternative approach to your problem. Instead of controlling the trigger of the asynchronous task (the button tap, in your case), you can constrain the asynchronous task itself so that at most one instance runs at a time.
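
As a rough illustration of constraining the task rather than the trigger, here is a minimal sketch using Swift concurrency instead of Combine. `SingleFlight` is a hypothetical helper, not a standard library type: callers that arrive while an operation is running await the same in-flight `Task` instead of starting a second one.

```swift
/// Hypothetical helper: runs at most one operation at a time and lets
/// concurrent callers share the result of the operation already in flight.
actor SingleFlight<Value: Sendable> {
    private var inFlight: Task<Value, Never>?

    func run(_ operation: @escaping @Sendable () async -> Value) async -> Value {
        // An operation is already running: join it instead of starting another.
        if let task = inFlight {
            return await task.value
        }
        // No operation is running: start one and remember it before suspending.
        let task = Task { await operation() }
        inFlight = task
        let value = await task.value
        // Clear the slot so that a later call starts fresh work.
        inFlight = nil
        return value
    }
}
```

Note that every caller still receives the result, so a side effect that must happen only once (such as pushing a view controller) probably belongs inside the operation itself rather than after each `run` call.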

For something like a network request, you can additionally use the .share() operator, which Apple's documentation describes as follows:

Shares the output of an upstream publisher with multiple subscribers.

// `url` is assumed to be a URL defined elsewhere.
let request = URLRequest(url: url)
let publisher = URLSession.shared.dataTaskPublisher(for: request)
    .share()

Now, every time you subscribe to the publisher from a button action while the request is still in progress, the new subscriber is "attached" to the existing operation instead of starting another one, thanks to .share().
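
One practical detail when keeping such a shared publisher around: as far as I understand, a shared publisher whose upstream has already finished does not restart the work for later subscribers, so the stored instance needs to be replaced between requests. The sketch below shows one way this might be handled. `SharedOperation` and `makePublisher` are hypothetical names, and the class assumes it is only used from a single thread (for example the main thread), because `inFlight` is not synchronized.

```swift
import Combine

/// Hypothetical helper: hands out one shared publisher per in-flight operation
/// and forgets it once the operation completes or is cancelled.
final class SharedOperation<Output, Failure: Error> {
    private let makePublisher: () -> AnyPublisher<Output, Failure>
    private var inFlight: AnyPublisher<Output, Failure>?

    init(_ makePublisher: @escaping () -> AnyPublisher<Output, Failure>) {
        self.makePublisher = makePublisher
    }

    /// Assumption: always called from the same thread that delivers the events.
    func publisher() -> AnyPublisher<Output, Failure> {
        // Reuse the operation that is already in progress, if any.
        if let inFlight = inFlight {
            return inFlight
        }
        let shared = makePublisher()
            .handleEvents(
                // Forget the shared publisher once the work ends either way,
                // so that the next call creates a fresh one.
                receiveCompletion: { [weak self] _ in self?.inFlight = nil },
                receiveCancel: { [weak self] in self?.inFlight = nil }
            )
            .share()
            .eraseToAnyPublisher()
        inFlight = shared
        return shared
    }
}
```

For the network case, the closure passed to `SharedOperation` could wrap the `dataTaskPublisher` shown earlier, with a `receive(on: DispatchQueue.main)` added so that the completion handler runs on the same thread as the callers.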

This has an added bonus: because the logic is decoupled from the button, the same shared publisher instance can serve the button and any other subscribers (for example, other triggers that are not buttons), so none of them spams redundant requests for that network operation, provided a shared response is valid in your logic.