How to use Swift's AsyncThrowingStream with Firestore Listeners

258 views Asked by At

How can I convert this code to use AsyncThrowingStream

    private var listenerRegistration: ListenerRegistration?

    func unsubscribe() {
        if listenerRegistration != nil {
            listenerRegistration?.remove()
            listenerRegistration = nil
        }
    }
    
    func subscribe() {
        if listenerRegistration != nil {
            unsubscribe()
        }
        guard let userID = Auth.auth().currentUser?.uid else { return }
        let docRef = Firestore.db.collection("mdates")
        let query = docRef.whereField(MeetDateKeys.selectedId.rawValue, isEqualTo: userID)
        
        listenerRegistration = query
            .addSnapshotListener { [weak self] querySnapshot, error in
                guard let documents = querySnapshot?.documents else {
                    print("No documents in 'reminders' collection")
                    return
                }
                
                self?.reminders = documents.compactMap { queryDocumentSnapshot in
                    let result = Result { try? queryDocumentSnapshot.data(as: MeetDate.self, decoder: Firestore.Decoder()) }
                    
                    switch result {
                    case .success(let reminder):
                        // A `Reminder` value was successfully initialized from the DocumentSnapshot.
                        return reminder
                    case .failure(let error):
                        // A `Reminder` value could not be initialized from the DocumentSnapshot.
                        switch error {
                        case DecodingError.typeMismatch(_, let context):
                            print("\(error.localizedDescription): \(context.debugDescription)")
                        case DecodingError.valueNotFound(_, let context):
                            print("\(error.localizedDescription): \(context.debugDescription)")
                        case DecodingError.keyNotFound(_, let context):
                            print("\(error.localizedDescription): \(context.debugDescription)")
                        case DecodingError.dataCorrupted(let key):
                            print("\(error.localizedDescription): \(key.debugDescription)")
                        default:
                            print("Error decoding document: \(error.localizedDescription)")
                        }
                        return nil
                    }
                }
                
                self?.thisWeek = self?.dateThisWeekFilter(dates: self?.reminders ?? [])
                self?.nextWeek = self?.dateNextWeekFilter(dates: self?.reminders ?? [])
                self?.laterWeek = self?.dateLaterFilter(dates: self?.reminders ?? [])
            }
    }

using this extension

extension Query {
    func addSnapshotListener1<T>(
        includeMetadataChanges: Bool = false
    ) -> AsyncThrowingStream<[T], Error> where T: Decodable{
        .init { continuation in
            let listener = addSnapshotListener(includeMetadataChanges: includeMetadataChanges) { result in
                do {
                    let snapshot = try result.get()
                    continuation.yield(try snapshot.documents.map { try $0.data(as: T.self) })
                } catch {
                    continuation.finish(throwing: error)
                }
            }

            continuation.onTermination = { @Sendable _ in
                listener.remove()
            }
        }
    }
}
1

There are 1 answers

0
lorem ipsum On BEST ANSWER

You are almost there.

extension Query {
    func addSnapshotListener1<T>(
        includeMetadataChanges: Bool = false
    ) -> AsyncThrowingStream<[T], Error> where T : Decodable{
        .init { continuation in
            let listener = addSnapshotListener(includeMetadataChanges: includeMetadataChanges) { snapshot, error in //Add Error
                if let error {
                    continuation.finish(throwing: error)
                } else{
                    continuation.yield(snapshot?.documents
                        .compactMap { //Will ignore any nil
                            do {
                                return try $0.data(as: T.self)
                            } catch {
                                print(" Error \n\(error)")// Expose in the console any decoding errors, there are other more graceful ways to handle this. But this give you some objects if there is a decoding issue.
                                
                                // The other way give you no objects if there is a decoding issue
                                return nil
                            }
                        } ?? [])
                }
            }
            continuation.onTermination = { @Sendable _ in
                listener.remove()
            }
        }
    }
}

Then you can use it something like

func listen<T>(query: Query, type element: T) async throws where T : Decodable {
    let stream: AsyncThrowingStream<[T], Error> = query.addSnapshotListener1()
    
    for try await documents in stream {
        print("document count = \(documents.count)")
    }
}