I'm trying out some leader election logic in one of my microservices using the fabric8 Kubernetes Java client. Here is what I came up with:
def testAndAssertSingleLeader(client: KubernetesClient, id: String, lock: Lock): Assertion = {
  // Given
  val leaderLatch = new CountDownLatch(1)
  val newLeaderRecord = new AtomicReference[String]
  val stoppedLeading = new CountDownLatch(1)
  val executorService = Executors.newSingleThreadExecutor
  val leaderCallback = new LeaderCallbacks(
    () => {
      println("I am the leader now, performing the batch job.")
      leaderLatch.countDown()
    },
    () => {
      println("I am no longer the leader.")
      stoppedLeading.countDown()
    },
    (newLeader: String) => {
      println(s"A new leader is elected: $newLeader")
      newLeaderRecord.set(newLeader)
    })
  val leaderElectionConfig = new LeaderElectionConfig(
    lock,
    Duration.ofSeconds(15L),
    Duration.ofSeconds(10L),
    Duration.ofSeconds(2L),
    leaderCallback,
    true,
    "Integration test leader election configuration"
  )
  // When
  val leaderElectorTask = executorService.submit(
    () => assertThrows(
      classOf[InterruptedException],
      () => client.leaderElector.withConfig(leaderElectionConfig).build().run()
    )
  )
  // Then
  println(leaderLatch.getCount)
  assert(leaderLatch.await(10, TimeUnit.SECONDS))
  assert(id == newLeaderRecord.get)
  assert(0 == leaderLatch.getCount)
  leaderElectorTask.cancel(true)
  executorService.shutdownNow
  assert(executorService.awaitTermination(10, TimeUnit.SECONDS))
  assert(stoppedLeading.await(10, TimeUnit.SECONDS))
}

"K8sLeaderElection" should "create a lease lock" in {
  // Given
  server.expect
    .post
    .withPath("/apis/coordination.k8s.io/v1/namespaces/namespace/leases")
    .andReturn(200, null)
    .once
  // When - Then
  testAndAssertSingleLeader(client, "lead-lease", new LeaseLock("namespace", "name", "lead-lease"))
}
I adapted the code that I found in the fabric8 GitHub repository and converted it to Scala, as can be seen above. When I run my unit test, I see the following:
Testing started at 22:39 ...
Oct 24, 2023 10:39:31 PM okhttp3.mockwebserver.MockWebServer$2 execute
INFO: MockWebServer[51585] starting to accept connections
22:39:31.833 [ScalaTest-run] DEBUG io.fabric8.kubernetes.client.utils.HttpClientUtils - Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
22:39:42.108 [ScalaTest-run] DEBUG io.fabric8.kubernetes.client.impl.BaseClient - The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
1
Oct 24, 2023 10:39:42 PM okhttp3.mockwebserver.MockWebServer$2 acceptConnections
INFO: MockWebServer[51585] done accepting connections: Socket closed
leaderLatch.await(10L, SECONDS) was false
ScalaTestFailureLocation: com.openelectrons.cpo.k8s.K8sLeaderElectionSpec at (K8sLeaderElectionSpec.scala:137)
org.scalatest.exceptions.TestFailedException: leaderLatch.await(10L, SECONDS) was false
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
at com.openelectrons.cpo.k8s.K8sLeaderElectionSpec.testAndAssertSingleLeader(K8sLeaderElectionSpec.scala:137)
at com.openelectrons.cpo.k8s.K8sLeaderElectionSpec.$anonfun$new$2(K8sLeaderElectionSpec.scala:154)
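As far as I can tell, the failure only says that the first callback never ran: `CountDownLatch.await` returns false when the timeout elapses before `countDown()` is called. Here is a minimal sketch of that behaviour, using only the JDK and no Kubernetes client. The object name `LatchTimeoutSketch` and the helper `firedWithin` are hypothetical, made up for illustration, and the timeouts are shortened:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LatchTimeoutSketch {
  // Hypothetical helper: runs `task` on a background thread and reports whether
  // it counted the latch down within `timeoutMs` milliseconds.
  def firedWithin(timeoutMs: Long)(task: CountDownLatch => Unit): Boolean = {
    val latch = new CountDownLatch(1)
    val executor = Executors.newSingleThreadExecutor()
    try {
      // Explicit Runnable avoids the Runnable/Callable overload ambiguity of submit.
      executor.submit(new Runnable { def run(): Unit = task(latch) })
      latch.await(timeoutMs, TimeUnit.MILLISECONDS)
    } finally {
      executor.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    // The "callback" fires: await returns true.
    println(firedWithin(500)(latch => latch.countDown()))
    // The "callback" never fires: await times out and returns false.
    println(firedWithin(500)(_ => ()))
  }
}
```

Run as written, this should print `true` followed by `false`. So the latch handling in my test looks fine on its own, and the open question is why the elector never invokes the start-leading callback.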
I have the following dependencies in my build.sbt:
"org.scalatest" %% "scalatest" % "3.2.9" % Test,
"org.mockito" %% "mockito-scala" % "1.16.46" % Test,
"org.bouncycastle" % "bcpkix-jdk15on" % "1.68" % Test,
"io.fabric8" % "kubernetes-server-mock" % "6.9.0" % Test exclude("com.fasterxml.jackson.core", "jackson-databind")
What am I doing wrong? I expected to see the println output showing that a leader has been elected and, after the lease time, another message from the callback saying that it is no longer the leader.