Service Fabric, Akka.net and Persistent actor integration


I have a snippet of code from a working example. The example exposes a REST endpoint (Web API) that enqueues import requests, and a poller in the Service Fabric service that dequeues them. There are four actors: (1) FileImportValidator validates the file name, (2) FileParser parses the file, (3) AgeValidator validates the age, and (4) FilePersistor persists the name and age as an event.

Please share whether this design is what is expected of actor modeling with Akka.NET for an event-sourced system.

PS: The file to parse has already been uploaded; the REST call provides only the file name. I have purposely left out some validation logic.

//WebAPI:

        [HttpPost]
        [Route("import")]
        public async Task<IHttpActionResult> Import(FileImportRequest request)
        {
            IReliableQueue<string> queue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

            using (ITransaction tx = stateManager.CreateTransaction())
            {
                await queue.EnqueueAsync(tx, request.FileName);

                await tx.CommitAsync();
            }

            return Ok();
        }

  // Poller in Microsoft Service Fabric MicroService:

    public class FileImportMicroService : StatefulService
    {
        public FileImportMicroService()
        {
            domainActorSystem = ActorSystem.Create("DomainActorSystem");

            fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
        }

        protected override ICommunicationListener CreateCommunicationListener()
        {
            ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);

            return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
        }

        protected override async Task RunAsync(CancellationToken cancellationToken)
        {
            var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

            while (!cancellationToken.IsCancellationRequested)
            {
                using (ITransaction tx = this.StateManager.CreateTransaction())
                {
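                    ConditionalResult<string> dequeueReply = await queue.TryDequeueAsync(tx);

                    if (dequeueReply.HasValue)
                    {
                        // Send to the IActorRef field, not the FileImportValidator type.
                        fileImportValidator.Tell(new ValidateFileCommand(dequeueReply.Value));

                        // Log only when an item was actually dequeued.
                        ServiceEventSource.Current.Message(dequeueReply.Value);
                    }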

                    await tx.CommitAsync();
                }

                await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
            }
        }

        ActorSystem domainActorSystem;

        IActorRef fileImportValidator;
    }


//FileImportValidator Actor

    public class FileImportValidator : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Handle((dynamic) message);
        }

        public void Handle(ValidateFileCommand command)
        {
            _fileParser = Context.ActorOf(Props.Create(() => new FileParser()));

            ...

            _fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
        }

        private IActorRef _fileParser;
    }

//FileParser Actor:

    public class FileParser : UntypedActor
    {
        private IActorRef _ageValidator;

        protected override void OnReceive(object message)
        {
            Handle((dynamic) message);
        }

        public void Handle(ValidationSuccessfulEvent message)
        {
            var lines = File.ReadLines(message.FileName);

            foreach(var line in lines)
            {
                var cols = line.Split(',');

                // Named "record" so the local does not shadow System.IO.File.
                var record = new { Name = cols[0], Age = cols[1] };

                _ageValidator.Tell(new ValidateAge(record.Name, record.Age));
            }
        }

        protected override void PreStart()
        {
            _ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));

            base.PreStart();
        }
    }

//AgeValidator Actor:

    public class AgeValidator : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if (message is ValidateAge)
            {
                _filePersistor.Tell(new SaveNameAndAge(message));
            }
        }

        protected override void PreStart()
        {
            _filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");

            base.PreStart();
        }

        private IActorRef _filePersistor;
    }

//Persistent Actor:

    public class FilePersistor : PersistentActor
    {
...
        protected override bool ReceiveCommand(object message)
        {
            Persist(/* Handler to persist name and age */);

            return true;
        }
...
    }

There is 1 answer

Answer by clca (best answer)

Another approach you may consider is to use a ReliableDictionary in a service to 'persist' the state of the system (the files that are being processed). When a new file is uploaded, you create a new actor and pass it a FileId so the actor can retrieve the data and process it. When it is done, it calls the service so the item can be removed from the list. This way you can parallelize file processing.
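The approach described above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a definitive implementation: the message types `ProcessFile` and `FileProcessed`, the actors `FileProcessor` and `FileCoordinator`, the helper `FileDispatcher.RegisterAndDispatchAsync`, and the dictionary name "pendingFiles" are all hypothetical. It also assumes that each worker reports back to a coordinator actor, which removes the entry, instead of calling the service directly. It needs the Akka.NET and Service Fabric packages and a running stateful service to execute.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

// Hypothetical messages; they are not part of the original code.
public sealed class ProcessFile
{
    public ProcessFile(string fileId) { FileId = fileId; }
    public string FileId { get; }
}

public sealed class FileProcessed
{
    public FileProcessed(string fileId) { FileId = fileId; }
    public string FileId { get; }
}

// One instance per file, so several files can be processed in parallel.
public class FileProcessor : ReceiveActor
{
    public FileProcessor()
    {
        Receive<ProcessFile>(msg =>
        {
            // Validate, parse and persist the file identified by msg.FileId here.
            Sender.Tell(new FileProcessed(msg.FileId));
            Context.Stop(Self); // The worker is no longer needed once the file is done.
        });
    }
}

// Spawns one worker per file and clears the dictionary entry on completion.
public class FileCoordinator : ReceiveActor
{
    public FileCoordinator(IReliableStateManager stateManager)
    {
        Receive<ProcessFile>(msg =>
        {
            var worker = Context.ActorOf(Props.Create(() => new FileProcessor()));
            worker.Tell(msg); // Sent from inside the actor, so the reply comes back here.
        });

        ReceiveAsync<FileProcessed>(async msg =>
        {
            var pending = await stateManager
                .GetOrAddAsync<IReliableDictionary<string, DateTime>>("pendingFiles");

            using (var tx = stateManager.CreateTransaction())
            {
                await pending.TryRemoveAsync(tx, msg.FileId);
                await tx.CommitAsync();
            }
        });
    }
}

public static class FileDispatcher
{
    // Records the file as pending, then hands it to the coordinator.
    public static async Task RegisterAndDispatchAsync(
        IReliableStateManager stateManager, IActorRef coordinator, string fileId)
    {
        var pending = await stateManager
            .GetOrAddAsync<IReliableDictionary<string, DateTime>>("pendingFiles");

        using (var tx = stateManager.CreateTransaction())
        {
            await pending.SetAsync(tx, fileId, DateTime.UtcNow);
            await tx.CommitAsync();
        }

        coordinator.Tell(new ProcessFile(fileId));
    }
}
```

Because the entry is written before the message is sent and removed only after the worker reports completion, any entries still in the dictionary after a restart identify files whose processing did not finish, and the service could re-dispatch them.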