FP patterns for combining data from different sources (preferably in Kotlin and Arrow)

Disclaimer upfront: Recently, my interest in functional programming has grown and I've been able to apply the most basic approaches (using pure functions as much as my knowledge and working environment permit) in my job. However, I'm still very inexperienced when it comes to more advanced techniques, and I thought asking a question on this site might be a good way to learn some. I've stumbled over similar issues every once in a while, so I think there should be patterns in FP to deal with this type of problem.

Problem description

It boils down to the following. Suppose there is an API somewhere providing a list of all possible pets.

data class Pet(val name: String, val isFavorite: Boolean = false)

fun fetchAllPetsFromApi(): List<Pet> {
    // this would call a real API irl
    return listOf(Pet("Dog"), Pet("Cat"), Pet("Parrot"))
}

This API knows nothing about the "favorite" field and it shouldn't. It's not under my control. It's basically just returning a list of pets. Now I want to allow users to mark pets as their favorite. And I store this flag in a local database.

So after fetching all pets from the API, I have to set the favorite flag according to the persisted data.

class FavoriteRepository {
    fun petsWithUserFavoriteFlag(allPets: List<Pet>): List<Pet> {
        return allPets.map { it.copy(isFavorite = getFavoriteFlagFromDbFor(it)) }
    }

    fun markPetAsFavorite(pet: Pet) {
        // persist to db ...
    }

    fun getFavoriteFlagFromDbFor(pet: Pet): Boolean {...}
}

For some reason, I think this code dealing with the problem of "fetch one part of the information from one data source, then merge it with some information from another" might benefit from the application of FP patterns, but I'm not really sure in which direction to look.

I've read some of the documentation of Arrow (great project btw :)) and am quite a Kotlin enthusiast, so answers utilizing this library would be very appreciated.

There is 1 answer

Jorge Castillo On BEST ANSWER

Here's something I'd potentially do. Your code has a couple of important flaws that make it unsafe from the functional programming perspective:

  • It doesn't flag side effects, so the compiler is not aware of them and cannot track how they are used. That means we could call those effects from anywhere without any sort of control. Examples of effects would be the network query or all the operations using the database.
  • Your operations don't make explicit the fact that they might succeed or fail, so callers are left to try / catch exceptions or the program will blow up. Nothing forces them to handle both scenarios, which could lead to missed exceptions and therefore runtime errors.

Let's try to fix it. Let's start by modeling our domain errors so we have a set of expected errors that our domain understands. Let's also create a mapper so we map all potential exceptions thrown to one of those expected domain errors, so our business logic can react to those accordingly.

sealed class Error {
    object Error1 : Error()
    object Error2 : Error()
    object Error3 : Error()
}

// Stubbed
fun Throwable.toDomainError() = Error.Error1

As you can see, we're stubbing the errors and the mapper. You can spend time designing which errors you'll need for your domain at an architecture level and write a proper pure mapper for those. Let's keep going.
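As a rough idea of what a less stubbed mapper might look like, here's a minimal sketch. The PetError hierarchy, the toPetError name and the choice of which exceptions map to which error are all hypothetical; your real domain would dictate those.

```kotlin
import java.io.IOException
import java.sql.SQLException

// Hypothetical domain errors, named differently from the stubbed Error above on purpose.
sealed class PetError {
    object NetworkUnavailable : PetError()
    object DatabaseFailure : PetError()
    data class Unexpected(val cause: Throwable) : PetError()
}

// Pure mapper: same Throwable type in, same domain error out, no side effects.
// Which exception maps to which error is an assumption made for this example.
fun Throwable.toPetError(): PetError = when (this) {
    is IOException -> PetError.NetworkUnavailable
    is SQLException -> PetError.DatabaseFailure
    else -> PetError.Unexpected(this)
}

fun main() {
    println(IOException("connection reset").toPetError() is PetError.NetworkUnavailable) // true
    println(SQLException("table locked").toPetError() is PetError.DatabaseFailure)       // true
}
```

Keeping a catch-all case like Unexpected means the when stays total, so an exception you did not anticipate still ends up as a typed domain error.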

Time to flag our effects so the compiler is aware of them. To do that we use suspend in Kotlin. suspend enforces a calling context at compile time, so you can never call the effect unless you're within a suspended environment or the integration point (a coroutine). We are going to flag as suspend all operations that would be a side effect here: the network request and all DB operations.

I'm also taking the liberty of pulling all DB operations out into a Database collaborator, just for readability.

suspend fun fetchAllPetsFromApi(): List<Pet> = ...

class FavoriteRepository(private val db: Database = Database()) {
    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>) {
        // ... will delegate to the Database ops
    }
}

class Database {
    // This would flag it as fav on the corresponding table
    suspend fun markPetAsFavorite(pet: Pet): Pet = ...

    // This would get the flag from the corresponding table
    suspend fun getFavoriteFlagFromDbFor(pet: Pet) = ...
}

Our side effects are safe now. They've become descriptions of effects instead, since we can never run them without providing an environment capable of running suspended effects (a coroutine or another suspended function). In functional jargon we'd say our effects are pure now.
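To make the "description of an effect" idea a bit more tangible, here's a small stdlib-only sketch. The readFavoriteFlag function, the dbReads counter and the runNow helper are made up for the illustration; in a real project you'd use a proper coroutine runner rather than this helper, which only works for blocks that never actually suspend.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical counter standing in for a real side effect (a DB read).
var dbReads = 0

// Hypothetical effect, flagged with suspend.
suspend fun readFavoriteFlag(name: String): Boolean {
    dbReads++
    return name == "Dog"
}

// fun notAllowed() = readFavoriteFlag("Dog") // does not compile outside a suspend context

// Minimal runner for the example; assumes the block completes without suspending.
fun <T> runNow(block: suspend () -> T): T {
    val outcomes = mutableListOf<Result<T>>()
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcomes.add(it) })
    return outcomes.single().getOrThrow()
}

fun main() {
    // Building the suspended lambda runs nothing; it only describes the effect.
    val program: suspend () -> Boolean = { readFavoriteFlag("Dog") }
    println(dbReads)         // 0
    println(runNow(program)) // true
    println(dbReads)         // 1
}
```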

Now, let's go for the second issue.

We also said that we were not making explicit the fact that each effect might succeed or fail, so callers might miss potential exceptions and crash the program. We can raise that concern over our data by wrapping it with the functional Either<A, B> data type. Let's combine both ideas together:

suspend fun fetchAllPetsFromApi(): Either<Error, List<Pet>> = ...

class FavoriteRepository(private val db: Database = Database()) {
    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> {
        // ... will delegate to the Database ops
    }
}

class Database {
    // This would flag it as fav on the corresponding table
    suspend fun markPetAsFavorite(pet: Pet): Either<Error, Pet> = ...

    // This would get the flag from the corresponding table
    suspend fun getFavoriteFlagFromDbFor(pet: Pet): Either<Error, Boolean> = ...
}

Now this makes explicit the fact that each one of those computations might succeed or fail, so the caller will be forced to handle both sides and will not forget about handling the potential errors. We're using the types to our benefit here.

Let's add the logic for the effects now:

// Stubbing a list of pets but you'd have your network request within the catch block
suspend fun fetchAllPetsFromApi(): Either<Error, List<Pet>> =
    Either.catch { listOf(Pet("Dog"), Pet("Cat")) }.mapLeft { it.toDomainError() }

We can use Either#catch to wrap any suspended effects that might throw. This automatically wraps the result into Either so we can keep computing over it.

More specifically, it wraps the result of the block in Either.Right in case it succeeds, or the exception into Either.Left in case it throws. We also have mapLeft to map potential exceptions thrown (Left side) to one of our strongly typed domain errors. That is why it returns Either<Error, List<Pet>> instead of Either<Throwable, List<Pet>>.

Note that with Either we always model errors on the left side. This is by convention, since Right represents the happy path and we want our successful data there, so we can keep computing over it with map, flatMap, or whatever.
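If it helps to see the mechanics without the library, here's a minimal sketch of the catch-then-mapLeft idea. The Either type below is a simplified stand-in written for this example, not Arrow's implementation, and catching, loadPetNames and the String error are hypothetical names and choices. One difference I'm aware of: this stand-in catches every Throwable.

```kotlin
// Simplified stand-in for arrow.core.Either, for illustration only.
sealed class Either<out A, out B> {
    data class Left<out A>(val value: A) : Either<A, Nothing>()
    data class Right<out B>(val value: B) : Either<Nothing, B>()
}

// Stand-in for Either.catch: Right on success, Left with the exception on failure.
inline fun <B> catching(block: () -> B): Either<Throwable, B> =
    try {
        Either.Right(block())
    } catch (t: Throwable) {
        Either.Left(t)
    }

// Transforms only the Left side; a Right passes through untouched.
inline fun <A, B, C> Either<A, B>.mapLeft(f: (A) -> C): Either<C, B> = when (this) {
    is Either.Left -> Either.Left(f(this.value))
    is Either.Right -> this
}

// Hypothetical effect; a plain String plays the role of the domain error here.
fun loadPetNames(fail: Boolean): Either<String, List<String>> =
    catching {
        if (fail) throw IllegalStateException("network down")
        listOf("Dog", "Cat")
    }.mapLeft { "API error: ${it.message}" }

fun main() {
    println(loadPetNames(fail = false)) // Right(value=[Dog, Cat])
    println(loadPetNames(fail = true))  // Left(value=API error: network down)
}
```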

We can apply the same idea for our db methods now:

class Database {
    // This would flag it as fav on the corresponding table, I'm stubbing it here for the example.
    suspend fun markPetAsFavorite(pet: Pet): Either<Error, Pet> =
        Either.catch { pet }.mapLeft { it.toDomainError() }

    // This would get the flag from the corresponding table, I'm stubbing it here for the example.
    suspend fun getFavoriteFlagFromDbFor(pet: Pet): Either<Error, Boolean> =
        Either.catch { true }.mapLeft { it.toDomainError() }
}

We're stubbing the results again, but you can imagine we'd have our actual suspended effects loading from or updating the DB tables inside each Either.catch {} block above.

Finally, we can add some logic to the repo:

class FavoriteRepository(private val db: Database = Database()) {

    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> =
        allPets.map { pet ->
            db.getFavoriteFlagFromDbFor(pet).map { isFavInDb ->
                pet.copy(isFavorite = isFavInDb)
            }
        }.sequence(Either.applicative()).fix().map { it.toList() }
}

Ok this one might be a bit more complex due to how our effects are written, but I'll try to make it clear.

We need to map the list so that for each pet loaded from the network we can load its fav state from the Database. Then we copy it as you were doing. But given that getFavoriteFlagFromDbFor(pet) now returns Either<Error, Boolean>, we'd end up with a List<Either<Error, Pet>>. That might make it hard to work with the complete list of pets, since we'd need to iterate and check for each element whether it's Left or Right.

To make it easier to consume the List<Pet> as a whole, we might want to swap the types here, so we'd have Either<Error, List<Pet>> instead.

To do this magic, one option would be sequence. sequence requires the Either applicative in this case, since that's what is used to lift the intermediate results and the final list into Either.

We're also taking the chance to map the ListK into the stdlib List, since ListK is what sequence uses internally. Broadly speaking, you can think of ListK as a functional wrapper over List. Since here we're only interested in the actual list to match our types, we can map the Right<ListK<Pet>> to Right<List<Pet>>.
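Conceptually, the type swap that sequence performs for Either can be sketched by hand like this. Again, the Either below is a simplified stand-in and sequenceEither is a hypothetical helper written for the example, not the library function: the first Left short-circuits, otherwise all Right values are collected into one list.

```kotlin
// Simplified stand-in for arrow.core.Either, for illustration only.
sealed class Either<out A, out B> {
    data class Left<out A>(val value: A) : Either<A, Nothing>()
    data class Right<out B>(val value: B) : Either<Nothing, B>()
}

data class Pet(val name: String, val isFavorite: Boolean = false)

// Hand-written equivalent of the swap: List<Either<A, B>> becomes Either<A, List<B>>.
fun <A, B> List<Either<A, B>>.sequenceEither(): Either<A, List<B>> {
    val collected = mutableListOf<B>()
    for (item in this) {
        when (item) {
            is Either.Left -> return item                   // first error wins
            is Either.Right -> collected.add(item.value)    // keep accumulating
        }
    }
    return Either.Right(collected)
}

fun main() {
    val allGood: List<Either<String, Pet>> =
        listOf(Either.Right(Pet("Dog", true)), Either.Right(Pet("Cat")))
    val oneBad: List<Either<String, Pet>> =
        listOf(Either.Right(Pet("Dog", true)), Either.Left("db unavailable"))

    println(allGood.sequenceEither())
    // Right(value=[Pet(name=Dog, isFavorite=true), Pet(name=Cat, isFavorite=false)])
    println(oneBad.sequenceEither())
    // Left(value=db unavailable)
}
```

Note the trade-off this implies: a single failed lookup turns the whole result into a Left, so you lose the pets that loaded fine. That matches the Either<Error, List<Pet>> signature used here, but it's worth deciding consciously.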

Finally, we can go ahead and consume this suspended program:

suspend fun main() {
    val repo = FavoriteRepository()
    val hydratedPets = fetchAllPetsFromApi().flatMap { pets -> repo.petsWithUserFavoriteFlag(pets) }
    hydratedPets.fold(
        ifLeft = { error -> println(error) },
        ifRight = { pets -> println(pets) }
    )
}

We're going for flatMap since we have sequential ops here.

There are potential optimizations we could do like using parTraverse to load all the fav states from DB for a list of pets in parallel and gather results in the end, but I didn't use it since I'm not sure your database is prepared for concurrent access.

Here's how you could do it:

suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> =
    allPets.parTraverse { pet ->
        db.getFavoriteFlagFromDbFor(pet).map { isFavInDb ->
            pet.copy(isFavorite = isFavInDb)
        }
    }.sequence(Either.applicative()).fix().map { it.toList() }

I think we could also simplify the whole thing a bit more by changing some of the types and how the operations are structured, but I didn't want to refactor your codebase too much since I'm not aware of your current team constraints.

And here's the complete codebase:

import arrow.core.Either
import arrow.core.extensions.either.applicative.applicative
import arrow.core.extensions.list.traverse.sequence
import arrow.core.extensions.listk.foldable.toList
import arrow.core.fix
import arrow.core.flatMap

data class Pet(val name: String, val isFavorite: Boolean = false)

// Our sealed hierarchy of potential errors our domain understands
sealed class Error {
    object Error1 : Error()
    object Error2 : Error()
    object Error3 : Error()
}

// Stubbed, would be a mapper from throwable to any of the expected domain errors used via mapLeft.
fun Throwable.toDomainError() = Error.Error1

// This would call a real API irl, stubbed here for the example.
suspend fun fetchAllPetsFromApi(): Either<Error, List<Pet>> =
    Either.catch { listOf(Pet("Dog"), Pet("Cat")) }.mapLeft { it.toDomainError() }

class FavoriteRepository(private val db: Database = Database()) {

    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> =
        allPets.map { pet ->
            db.getFavoriteFlagFromDbFor(pet).map { isFavInDb ->
                pet.copy(isFavorite = isFavInDb)
            }
        }.sequence(Either.applicative()).fix().map { it.toList() }
}

class Database {
    // This would flag it as fav on the corresponding table, I'm stubbing it here for the example.
    suspend fun markPetAsFavorite(pet: Pet): Either<Error, Pet> =
        Either.catch { pet }.mapLeft { it.toDomainError() }

    // This would get the flag from the corresponding table, I'm stubbing it here for the example.
    suspend fun getFavoriteFlagFromDbFor(pet: Pet): Either<Error, Boolean> =
        Either.catch { true }.mapLeft { it.toDomainError() }
}

suspend fun main() {
    val repo = FavoriteRepository()
    val hydratedPets = fetchAllPetsFromApi().flatMap { pets -> repo.petsWithUserFavoriteFlag(pets) }
    hydratedPets.fold(
        ifLeft = { error -> println(error) },
        ifRight = { pets -> println(pets) }
    )
}