Akka Durable State Postgres Exception

136 views Asked by At

I am using the Play Framework 2.8.18 and I am trying to use akka's Durable State Feature. This is my implementation so far:

public class CompanyActorTest extends DurableStateBehavior<CompanyActorTest.CompanyCommand, CompanyActorTest.CompanyState> {

    public interface CompanyCommand {
    }

    public static class GetCompany implements CompanyCommand {

        public final ActorRef<Object> replyTo;
        public final String userEmail;

        public GetCompany(ActorRef<Object> replyTo, String userEmail) {
            this.replyTo = replyTo;
            this.userEmail = userEmail;
        }
    }

    public static class CompanyState {
        private final Set<Company> companies;

        public CompanyState(Set<Company> companies) {
            this.companies = companies;
        }

        public Set<Company> getCompanies() {
            return companies;
        }
    }

    public static class AddCompany implements CompanyCommand {
        public final Company company;

        public AddCompany(Company company) {
            this.company = company;
        }
    }

    private final ActorContext<CompanyCommand> ctx;

    public static Behavior<CompanyCommand> create(PersistenceId persistenceId) {
        return Behaviors.setup(ctx -> new CompanyActorTest(persistenceId, ctx));
    }

    private CompanyActorTest(PersistenceId persistenceId, ActorContext<CompanyCommand> ctx) {
        super(persistenceId);
        this.ctx = ctx;
    }

    @Override
    public CompanyState emptyState() {
        return new CompanyState(Collections.emptySet());
    }

    @Override
    public CommandHandler<CompanyCommand, CompanyState> commandHandler() {
        return newCommandHandlerBuilder()
                .forAnyState()
                .onCommand(AddCompany.class, (state, command) -> {
                    state.companies.add(command.company);
                    return Effect().persist(new CompanyState(state.companies));
                })
                .onCommand(
                        GetCompany.class, (state, command) -> {
                            ActorRef<GetCompanyActor.GetCompany> ref = this.ctx.getSystem().systemActorOf(GetCompanyActor.create(), "Get-Company-Actor", Props.empty());
                            ref.tell(new GetCompanyActor.GetCompany(command.replyTo, command.userEmail));
                            return Effect().none();
                        })
                .build();
    }
}

Since I can't use my MySQL Database (that's what I think at least because I didn't find any docu for it) I am now using Postgres.

In my application.conf file I set

akka.persistence.state.plugin = "akka.persistence.r2dbc.state"

akka.persistence.r2dbc {

  # postgres or yugabyte
  dialect = "postgres"

  # set this to your database schema if applicable, empty by default
  schema = ""

  connection-factory {
    driver = "postgres"

    # the connection can be configured with a url, eg: "r2dbc:postgresql://<host>:5432/<database>"
    url = ""

    # The connection options to be used. Ignored if 'url' is non-empty
    host = "localhost"
    port = 5432
    database = "postgres"
    user = "postgres"
    password = "postgres"

    ssl {
      enabled = off
      # See PostgresqlConnectionFactoryProvider.SSL_MODE
      # Possible values:
      #  allow - encryption if the server insists on it
      #  prefer - encryption if the server supports it
      #  require - encryption enabled and required, but trust network to connect to the right server
      #  verify-ca - encryption enabled and required, and verify server certificate
      #  verify-full - encryption enabled and required, and verify server certificate and hostname
      #  tunnel - use a SSL tunnel instead of following Postgres SSL handshake protocol
      mode = ""

      # Server root certificate. Can point to either a resource within the classpath or a file.
      root-cert = ""

      # Client certificate. Can point to either a resource within the classpath or a file.
      cert = ""

      # Key for client certificate. Can point to either a resource within the classpath or a file.
      key = ""

      # Password for client key.
      password = ""
    }

    # Initial pool size.
    initial-size = 5
    # Maximum pool size.
    max-size = 20
    # Maximum time to create a new connection.
    connect-timeout = 3 seconds
    # Maximum time to acquire connection from pool.
    acquire-timeout = 5 seconds
    # Number of retries if the connection acquisition attempt fails.
    # In the case the database server was restarted all connections in the pool will
    # be invalid. To recover from that without failed acquire you can use the same number
    # of retries as max-size of the pool
    acquire-retry = 1

    # Maximum idle time of the connection in the pool.
    # Background eviction interval of idle connections is derived from this property
    # and max-life-time.
    max-idle-time = 30 minutes

    # Maximum lifetime of the connection in the pool.
    # Background eviction interval of connections is derived from this property
    # and max-idle-time.
    max-life-time = 60 minutes

    # Configures the statement cache size.
    # 0 means no cache, negative values will select an unbounded cache
    # a positive value will configure a bounded cache with the passed size.
    statement-cache-size = 5000

    # Validate the connection when acquired with this SQL.
    # Enabling this has some performance overhead.
    # A fast query for Postgres is "SELECT 1"
    validation-query = ""
  }

  # If database timestamp is guaranteed to not move backwards for two subsequent
  # updates of the same persistenceId there might be a performance gain to
  # set this to `on`. Note that many databases use the system clock and that can
  # move backwards when the system clock is adjusted.
  db-timestamp-monotonic-increasing = off

  # Enable this for testing or workaround of https://github.com/yugabyte/yugabyte-db/issues/10995
  # FIXME: This property will be removed when the Yugabyte issue has been resolved.
  use-app-timestamp = off

  # Logs database calls that take longer than this duration at INFO level.
  # Set to "off" to disable this logging.
  # Set to 0 to log all calls.
  log-db-calls-exceeding = 300 ms

}

I am running the database in my docker. I also ran the following script:

CREATE TABLE IF NOT EXISTS public.event_journal(
                                                   ordering BIGSERIAL,
                                                   persistence_id VARCHAR(255) NOT NULL,
    sequence_number BIGINT NOT NULL,
    deleted BOOLEAN DEFAULT FALSE NOT NULL,

    writer VARCHAR(255) NOT NULL,
    write_timestamp BIGINT,
    adapter_manifest VARCHAR(255),

    event_ser_id INTEGER NOT NULL,
    event_ser_manifest VARCHAR(255) NOT NULL,
    event_payload BYTEA NOT NULL,

    meta_ser_id INTEGER,
    meta_ser_manifest VARCHAR(255),
    meta_payload BYTEA,

    PRIMARY KEY(persistence_id, sequence_number)
    );

CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering);

CREATE TABLE IF NOT EXISTS public.event_tag(
                                               event_id BIGINT,
                                               tag VARCHAR(256),
    PRIMARY KEY(event_id, tag),
    CONSTRAINT fk_event_journal
    FOREIGN KEY(event_id)
    REFERENCES event_journal(ordering)
    ON DELETE CASCADE
    );

CREATE TABLE IF NOT EXISTS public.snapshot (
    persistence_id VARCHAR(255) NOT NULL,
    sequence_number BIGINT NOT NULL,
    created BIGINT NOT NULL,

    snapshot_ser_id INTEGER NOT NULL,
    snapshot_ser_manifest VARCHAR(255) NOT NULL,
    snapshot_payload BYTEA NOT NULL,

    meta_ser_id INTEGER,
    meta_ser_manifest VARCHAR(255),
    meta_payload BYTEA,

    PRIMARY KEY(persistence_id, sequence_number)
    );

CREATE TABLE IF NOT EXISTS public.durable_state (
                                                    global_offset BIGSERIAL,
                                                    persistence_id VARCHAR(255) NOT NULL,
    revision BIGINT NOT NULL,
    state_payload BYTEA NOT NULL,
    state_serial_id INTEGER NOT NULL,
    state_serial_manifest VARCHAR(255),
    tag VARCHAR,
    state_timestamp BIGINT NOT NULL,
    PRIMARY KEY(persistence_id)
    );
CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag);
CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset);

Nevertheless, when trying to create that actor I get the following error:

[error] c.k.a.c.CompanyActorTest - Supervisor StopSupervisor saw failure: Exception during recovery. PersistenceId [Company]. Relation »durable_state« existiert nicht
akka.persistence.typed.state.internal.DurableStateStoreException: Exception during recovery. PersistenceId [Company]. Relation »durable_state« existiert nicht
        at akka.persistence.typed.state.internal.Recovering.onRecoveryFailure(Recovering.scala:125)
        at akka.persistence.typed.state.internal.Recovering.onGetFailure(Recovering.scala:188)
        at akka.persistence.typed.state.internal.Recovering.onMessage(Recovering.scala:78)
        at akka.persistence.typed.state.internal.Recovering.onMessage(Recovering.scala:61)
        at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84)
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:281)
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:237)
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
        at akka.persistence.typed.state.internal.DurableStateBehaviorImpl$$anon$1.aroundReceive(DurableStateBehaviorImpl.scala:125)
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlBadGrammarException: Relation »durable_state« existiert nicht
        at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:96)
        at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
        at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)

I am also unsure about these dependencies, I tried increasing and decreasing versions, which resulted in dependency errors:

libraryDependencies += "com.typesafe.akka" %% "akka-serialization-jackson" % "2.8.0"
libraryDependencies += "com.lightbend.akka" %% "akka-persistence-r2dbc" % "1.0.0"
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.5.3"

I tried changing the config and the three dependencies I included. Could it be that I need to add another column in the config? By the way: I am not using Event Sourcing, I would only like to use the durable state feature.

Thank you for your help!

Kind regards,

Alex

0

There are 0 answers