I have a Django Celery task that performs long-running operations and sometimes loses its connection to the database (Postgres). The task looks something like this:
    from django.db import transaction

    @app.task(name='my_name_of_the_task')
    def my_long_running_task(params):
        with transaction.atomic():
            object_list = get_object_list_from_params(params)
            for batch in batches(object_list):
                some_calculations()
                MyObject.objects.bulk_create(objs=batch)  # <- the connection could be lost here
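For context, batches is just a small chunking helper, roughly like this (the batch size is illustrative):

    def batches(object_list, batch_size=500):
        # Yield successive fixed-size slices of the list.
        for i in range(0, len(object_list), batch_size):
            yield object_list[i:i + batch_size]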
I want to make sure the connection is alive before each batch, while still being able to unit test this code. For example:
    from django.db import connection, transaction

    @app.task(name='my_name_of_the_task')
    def my_long_running_task(params):
        with transaction.atomic():
            object_list = get_object_list_from_params(params)
            for batch in batches(object_list):
                connection.connect()  # force a fresh connection for every batch
                some_calculations()
                MyObject.objects.bulk_create(objs=batch)
This works, because it always opens a fresh connection, but my unit tests then fail on teardown with an error saying the transaction can't be rolled back. I assume this is because Django's TestCase wraps each test in a transaction, and connect() replaces the connection that transaction lives on.
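One variant I've considered is to reconnect only when the connection is actually broken, so a healthy test connection is left alone. A sketch, assuming a helper called before each batch (is_usable() runs a cheap check query against the backend):

    from django.db import connection

    def ensure_usable_connection():
        # Reconnect only if we have a connection object and it no longer responds;
        # a healthy connection (e.g. the one a TestCase holds) is left untouched.
        if connection.connection is not None and not connection.is_usable():
            connection.close()    # drop the dead connection
            connection.connect()  # open a fresh one

As far as I can tell, django.db.close_old_connections() does something similar, closing connections that have errored or outlived CONN_MAX_AGE.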
I am now thinking about something like this:
    @app.task(name='my_name_of_the_task')
    def my_long_running_task(params):
        with transaction.atomic():
            object_list = get_object_list_from_params(params)
            for batch in batches(object_list):
                try:
                    some_calculations()
                    MyObject.objects.bulk_create(objs=batch)  # <- the connection could be lost here
                except Exception:
                    connection.connect()  # reopen the connection
                    MyObject.objects.bulk_create(objs=batch)  # retry this insert
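Since the surrounding atomic() block dies with the connection anyway, nothing from a failed attempt gets committed, so another option I'm weighing is dropping the per-batch retry and letting Celery retry the whole task on a database error. A sketch (autoretry_for, retry_kwargs and retry_backoff are standard Celery task options; the retry numbers are made up):

    from django.db import transaction
    from django.db.utils import OperationalError

    @app.task(
        name='my_name_of_the_task',
        autoretry_for=(OperationalError,),  # retry the whole task when the DB connection drops
        retry_kwargs={'max_retries': 3},    # illustrative limit
        retry_backoff=True,                 # exponential backoff between attempts
    )
    def my_long_running_task(params):
        with transaction.atomic():
            object_list = get_object_list_from_params(params)
            for batch in batches(object_list):
                some_calculations()
                MyObject.objects.bulk_create(objs=batch)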
Is there a better way to handle this?