I have the following setup (simplified version) using Test Containers for my integration tests:
public class IntegrationTestFactory : WebApplicationFactory<Program>, IAsyncLifetime
{
private static readonly string NetworkName = Guid.NewGuid().ToString();
private readonly INetwork _network = new NetworkBuilder()
.WithName(NetworkName)
.Build();
private readonly PostgreSqlContainer _postgreSqlContainer = new PostgreSqlBuilder()
.WithImage("postgres:14.7")
.WithName("psql")
.WithHostname("psql")
.WithCleanUp(true)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(5432))
.WithPortBinding(5432, 5432)
.WithEnvironment(
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
{
{ "POSTGRES_DB", “test-db” },
{ "POSTGRES_USER", "test-db-user“ },
{ "POSTGRES_PASSWORD", "test-db-password“ }
}))
.WithNetwork(NetworkName)
.Build();
private readonly IContainer _zookeeperContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-zookeeper:5.5.0")
.WithHostname("zookeeper")
.WithName("zookeeper")
.WithCleanUp(true)
.WithEnvironment(
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
{
{ "ZOOKEEPER_CLIENT_PORT", "2181" },
{ "ZOOKEEPER_TICK_TIME", "2000" }
}))
.WithPortBinding(2181, 2181)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(2181))
.WithNetwork(NetworkName)
.Build();
private readonly IContainer _kafkaContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-kafka:5.5.0")
.WithHostname("broker")
.WithName("broker")
.WithNetwork(NetworkName)
.WithEnvironment(
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
{
{ "KAFKA_BROKER_ID", "1" },
{ "KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181" },
{ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" },
{ "KAFKA_LISTENERS", "PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092" },
{ "KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092" },
{ "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1" },
{ "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0" },
{ "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1" },
{ "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1" },
{ "KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true" }
}))
.WithPortBinding(9092, 9092)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092))
.WithCleanUp(true)
.Build();
private readonly IContainer _schemaRegistryContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-schema-registry:5.5.0")
.WithHostname("schema-registry")
.WithName("schema-registry")
.WithCleanUp(true)
.WithEnvironment(
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
{
{ "SCHEMA_REGISTRY_HOST_NAME", "schema-registry" },
{ "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:29092" },
{ "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081" }
}))
.WithPortBinding(8081, 8081)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8081))
.WithNetwork(NetworkName)
.Build();
private readonly IContainer _restProxyContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-kafka-rest:6.2.0")
.WithHostname("rest-proxy")
.WithName("rest-proxy")
.WithCleanUp(true)
.WithEnvironment(
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
{
{ "KAFKA_REST_HOST_NAME", "rest-proxy" },
{ "KAFKA_REST_BOOTSTRAP_SERVERS", "broker:29092" },
{ "KAFKA_REST_LISTENERS", "http://0.0.0.0:8082" },
{ "KAFKA_REST_SCHEMA_REGISTRY_URL", "http://schema-registry:8081" }
}))
.WithPortBinding(8082, 8082)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8082))
.WithNetwork(NetworkName)
.Build();
protected override void ConfigureWebHost(IWebHostBuilder builder)
{
builder.ConfigureTestServices(services =>
{
var isBuildkite = Environment.GetEnvironmentVariable("BUILDKITE") == "true";
var host = isBuildkite ? "172.17.0.1" : "localhost";
// Replace DbContext
AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true);
services.RemoveDbContext<ApplicationDbContext>();
var appConnectionString =
$"Host={host};Port=5432;Database=test-db;Username=test-db-user;Password=test-db-password;”;
services.AddDbContext<ApplicationDbContext>(options => options.UseNpgsql(appConnectionString, sqlOptions =>
{
sqlOptions.EnableRetryOnFailure();
sqlOptions.UseNodaTime();
}));
// Override Kafka config
services.AddKafkaProducer(new ConfigurationBuilder().Build(), options =>
{
options.TopicName = "MyTopicName";
options.RetryCount = 1;
options.RetryTimeoutInSeconds = 1;
options.BootstrapServers = $"{host}:9092";
options.SchemaRegistry = $"http://{host}:8081";
});
});
}
}
I can run my integration tests on my local machine with the host value localhost. When I try to run these tests on Buildkite pipeline, I confirm that I can connect to PostgreSql instance by using the host 172.17.0.1 but can not access to Kafka broker. I tried multiple host values as the BootstrapServers like 172.17.0.1:9092, 172.17.0.1:29092, broker:9092, broker: 29092, localhost:9092, localhost:29092 but no luck.
When I try to run it with 172.17.0.1:9092, I get the following error:
localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
What should be my BootstrapServers value to access to my Kafka container in Buildkite pipeline?