Reactive Extensions (Rx) and asynchronous class

1.6k views Asked by At

I've read in this post: "The joy of Rx: The event-based asynchronous pattern vs IObservable" that the use of EBAP is discourage. What is the best way to design an asynchronous component with the new Rx extensions (something like the PrimeNumberCalculator example of msdn)?

Thank you in advance.

Update I managed to write my own prime number calculator, I would like to hear your opinions:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApplication13
{
    public class PrimeNumberCalculator
    {
        private readonly Subject<int> primeSubject;
        private IDisposable currentSubscription;

        public PrimeNumberCalculator()
        {
            primeSubject = new Subject<int>();
            Primes = primeSubject.Hide();
        }

        public IObservable<int> Primes{ get; private set; }

        /// <summary>
        /// Determine if n is prime.
        /// </summary>
        private static bool IsPrime(ArrayList primes, int n, out int firstDivisor)
        {
            bool foundDivisor = false;
            bool exceedsSquareRoot = false;

            int i = 0;
            firstDivisor = 1;

            // Stop the search if:
            // there are no more primes in the list,
            // there is a divisor of n in the list, or
            // there is a prime that is larger than 
            // the square root of n.
            while ( (i < primes.Count) && !foundDivisor && !exceedsSquareRoot)
            {
                // The divisor variable will be the smallest 
                // prime number not yet tried.
                int divisor = (int)primes[i++];

                // Determine whether the divisor is greater
                // than the square root of n.
                if (divisor * divisor > n)
                {
                    exceedsSquareRoot = true;
                }
                // Determine whether the divisor is a factor of n.
                else if (n % divisor == 0)
                {
                    firstDivisor = divisor;
                    foundDivisor = true;
                }
            }

            return !foundDivisor;
        }

        /// <summary>
        /// Itereates from 1 to numberToTest and returns all primes.
        /// </summary>
        private IEnumerable<int> PrimeNumberIterator(int numberToTest)
        {
            var primes = new ArrayList();
            var n = 5;

            // Add the first prime numbers.
            primes.Add(2);
            primes.Add(3);

            // Do the work.
            while (n < numberToTest)
            {
                int firstDivisor;
                if (IsPrime(primes, n, out firstDivisor))
                {
                    // Report to the client that a prime was found.
                    yield return n;

                    Thread.Sleep(5000); //simulate long running task.
                    primes.Add(n);

                }
                // Skip even numbers.
                n += 2;
            }
        }

        /// <summary>
        /// Begin a prime number exploration.
        /// If there is some exploration in progress unsubscribe.
        /// </summary>
        public void IsPrime(int numberToTest)
        {
            if (currentSubscription != null) currentSubscription.Dispose();
            currentSubscription = PrimeNumberIterator(numberToTest)
                                        .ToObservable()
                                        .Subscribe(primeSubject.OnNext);
        }

        /// <summary>
        /// Cancel a prime number exploration
        /// </summary>
        public void Cancel()
        {
            if (currentSubscription != null) currentSubscription.Dispose();
        }
    }

    internal class Program
    {


        private static void Main(string[] args)
        {
            var primeNumberCalculator = new PrimeNumberCalculator();
            primeNumberCalculator.Primes.Subscribe(p => Console.WriteLine("Is prime {0}", p));

            var exit = false;
            do
            {
                Console.WriteLine("Write a number to explore and press enter: ");
                var input = Console.ReadLine();
                int primeToExplore;
                if(int.TryParse(input, out primeToExplore))
                {
                    primeNumberCalculator.IsPrime(primeToExplore);
                }
                else {
                    primeNumberCalculator.Cancel();
                    exit = true;
                }
            } while (!exit);
        }
    }
}
1

There are 1 answers

3
Scott Weinstein On BEST ANSWER

Here's an approach, using mutable state:

int lastPrime = 0; // or starting prime
IObservable<int> Primes = 
                Observable.Defer(() =>
{
    do
    {
        lastPrime++;
    } while (!IsPrime(lastPrime));
    return Observable.Return(lastPrime);
}).Repeat();



var disp = Primes.Where(p => p < 1000000).Subscribe(Console.WriteLine);
///    ...
disp.Dispose();