Delete data from an Elasticsearch index with an async fixture

20 views Asked by At

I'm trying to create a fixture that deletes the data in an Elasticsearch index before each test, so that every test starts with a clean index.

This is my conftest file:

import asyncio
import pytest
import os
from elasticsearch import AsyncElasticsearch

@pytest.fixture
def elasticsearch_client(shared_elasticsearch_client, clear_data):
    """Provide the shared Elasticsearch client for a single test.

    ``clear_data`` is requested only for its side effect; its value is
    not used here.
    """
    client = shared_elasticsearch_client
    return client


@pytest.fixture
def clear_data(shared_elasticsearch_client):
    """Run the index cleanup before the requesting test.

    Nothing follows the ``yield``, so the fixture has no teardown step.
    """
    # NOTE(review): asyncio.run() starts a fresh event loop and closes it
    # once the coroutine finishes, so the cleanup does not run on the loop
    # that later executes the async test.
    cleanup = clean_elasticsearch_index(shared_elasticsearch_client)
    asyncio.run(cleanup)
    yield


@pytest.fixture(scope="session")
def shared_elasticsearch_client():
    """Create one ``AsyncElasticsearch`` client for the whole test session.

    The endpoint is assembled from the ``DB_PROTOCOL``, ``DB_HOST`` and
    ``DB_PORT`` environment variables, defaulting to
    ``http://localhost:9200``.

    Yields:
        The client instance shared by every test in the session.
    """
    protocol = os.getenv("DB_PROTOCOL", "http")
    host = os.getenv("DB_HOST", "localhost")
    port = os.getenv("DB_PORT", "9200")
    es = AsyncElasticsearch(hosts=f"{protocol}://{host}:{port}")
    yield es
    # AsyncElasticsearch.close() is a coroutine function: calling it without
    # awaiting only creates a coroutine object and never closes the client.
    # This fixture is synchronous, so the coroutine is driven to completion
    # with asyncio.run() instead.
    asyncio.run(es.close())


async def clean_elasticsearch_index(shared_elasticsearch_client: AsyncElasticsearch):
    """Delete every document from the ``company`` index.

    The index is refreshed afterwards so the deletions are visible to the
    next search. The client is deliberately left open: it is shared, so
    closing it here would hand a closed client to every later caller.
    Closing it is the responsibility of whoever created it.

    Args:
        shared_elasticsearch_client: Client used to issue the requests.
    """
    await shared_elasticsearch_client.delete_by_query(
        index="company", body={"query": {"match_all": {}}}
    )
    await shared_elasticsearch_client.indices.refresh(index="company")

And my test is:

from file import main


@pytest.mark.asyncio
async def test_function(elasticsearch_client):
    """Run ``main`` and then query the ``company`` index.

    No assertions are made; the test fails only if one of the calls raises.
    """
    await main()
    empty_query = {}
    await elasticsearch_client.search(index="company", body=empty_query)

The data in the index is cleaned, but when we try to use the client in the test, we receive the following exception:

RuntimeError: Session and connector has to use same event loop

I also tried getting the current event loop instead of calling asyncio.run, but that does not work either:

@pytest.fixture
def clear_data(shared_elasticsearch_client):
    """Run the index cleanup on the loop from ``asyncio.get_event_loop()``.

    Nothing follows the ``yield``, so the fixture has no teardown step.
    """
    asyncio.get_event_loop().run_until_complete(
        clean_elasticsearch_index(shared_elasticsearch_client)
    )
    yield
0

There are 0 answers