CQRS + ES - Where to query Data needed for business logic?

1.9k views Asked by At

I'm using CQRS + ES and I have a modeling problem that can't find a solution for.
You can skip the below and answer the generic question in the title: Where would you query data needed for business logic?
Sorry of it turned out to be a complex question, my mind is twisted at the moment!!!
Here's the problem:
I have users that are members of teams. It's a many to many relationship. Each user has an availability status per team.
Teams receive tickets, each with a certain load factor, that should be assigned to one of the team's members depending on their availability and total load.
First Issue, I need to query the list of users that are available in a team and select the one with the least load since he's the eligible for assignment.(to note that this is one of the cases, it might be a different query to run)
Second Issue, load factor of a ticket might change so i have to take that into consideration when calculating the total load per user . Noting that although ticket can belong to 1 team, the assignment should be based on the user total load and not his load per that team.
Currently a TicketReceivedEvent is received by this bounded context and i should trigger a workflow to assign that ticket to a user.
Possible Solutions:

  1. The easiest way would be to queue events and sequentially send a command AssignTicketToUser and have a service query the read model for the user id, get the user and user.assignTicket(Ticket). Once TicketAssignedEvent is received, send the next assignment command. But it seems to be a red flag to query the read model from within the command handler! and a hassle to queue all these tickets!
  2. Have a process manager per user with his availability/team and tickets assigned to that user. In that case we replace the query to the read side by a "process manager lookup" query and the command handler would call Ticket.AssignTo(User). The con is that i think too much business logic leaked outside the domain model specifically that we're pulling all the info/model from the User aggregate to make it available for querying

I'm inclined to go with the first solution, it seems easier to maintain, modify/extend and locate in code but maybe there's something i'm missing.

3

There are 3 answers

7
MikeSW On

Always (well, 99.99% of cases) in the business/domain layer i.e in your "Command" part of CQRS. This means that your repositories should have methods for the specific queries and your persistence model should be 'queryable' enough for this purpose. This means you have to know more about the use cases of your Domain before deciding how to implement persistence.

Using a document db (mongodb, raven db or postgres) might make work easier. If you're stuck with a rdbms or a key value store, create querying tables i.e a read model for the write model, acting as an index :) (this assumes you're serializing objects). If you're storing things relationally with specific table schema for each entity type (huge overhead, you're complicating your life) then the information is easily queryable automatically.

6
Alexey Zimarev On

You can query the data you need in your application service. This seems to be similar to your first solution.

Usually, you keep your aggregates cross-referenced, so I am not quite sure where the first issue comes from. Each user should have a list of teams it belongs to and each group has the list of users. You can complement this data with any attributes you want, including, for example, availability. So, when you read your aggregate, you have the data directly available. Surely, you will have lots of data duplication, but this is very common.

In the event sourced model never domain repositories are able to provide any querying ability. AggregateSource by Yves Reynhout is a good reference, here is the IRepository interface there. You can easily see there is no "Query" method in this interface whatsoever.

There is also a similar question Domain queries in CQRS

4
Thomas Eyde On

Why can't you query the aggregates involved?

I took the liberty to rewrite the objective:

Assign team-ticket to user with the lowest total load.

Here we have a Ticket which should be able to calculate a standard load factor, a Team which knows its users, and a User which knows its total load and can accept new tickets:

Update: If it doesn't feel right to pass a repository to an aggregate, it can be wrapped in a service, in this case a locator. Doing it this way makes it easier to enforce that only one aggregate is updated at a time.

public void AssignTicketToUser(int teamId, int ticketId)
{
    var ticket = repository.Get<Ticket>(ticketId);
    var team = repository.Get<Team>(teamId);
    var users = new UserLocator(repository);
    var tickets = new TicketLocator(repository);
    var user = team.GetUserWithLowestLoad(users, tickets);

    user.AssignTicket(ticket);

    repository.Save(user);
}

The idea is that the User is the only aggregate we update.

The Team will know its users:

public User GetGetUserWithLowestLoad(ILocateUsers users, ILocateTickets tickets)
{
    User lowest = null;

    foreach(var id in userIds)
    {
        var user = users.GetById(id);
        if(user.IsLoadedLowerThan(lowest, tickets))
        {
            lowest = user;
        }
    }
    return lowest;
}

Update: As a ticket may change load over time, the User needs to calculate its current load.

public bool IsLoadedLowerThan(User other, ILocateTickets tickets)
{
    var load = CalculateLoad(tickets);
    var otherLoad = other.CalculateLoad(tickets);

    return load < otherLoad;
}

public int CalculateLoad(ILocateTickets tickets)
{
    return assignedTicketIds
        .Select(id => tickets.GetById(id))
        .Sum(ticket.CalculateLoad());
}

The User then accepts the Ticket:

public void AssignTicket(Ticket ticket)
{
    if(ticketIds.Contains(ticket.Id)) return;

    Publish(new TicketAssignedToUser
        {
            UserId = id,
            Ticket = new TicketLoad
                {
                    Id = ticket.Id,
                    Load = ticket.CalculateLoad() 
                }
        });
}

public void When(TicketAssignedToUser e)
{
    ticketIds.Add(e.Ticket.Id);
    totalLoad += e.Ticket.Load;
}

I would use a process manager / saga to update any other aggregate.