How to prevent overlapping tasks, that executing to schedule, in django celery beat?

55 views Asked by At

I'm constructing some weather reminder API, where users can register, and take JWT token, for using api endpoints, such create subscriptions, with period of notifications, and adding cities to that subscription. app connects to third party API, and gets weather data related to city, that in subscription. After that, weather data sends to user email, according to subscriptions period of notification. So my problem is, when i have for example subscription that will sends email every hour(sub_A), and subscription that sends emails every 3 hour(sub_B), and so on...emails sends twice, for example on third hour i have two emails from sub_A, and sub_B in the same time. I need that my app sending one email, that will contain weather data for cities from sub_A and sub_B. I am using DRF, django_celery_beat, redis, all my app containerized, and starts from docker compose.

i have tried turn off tasks, like task.enabled = False, task.save(), for tasks, with period notification less than execute now. And after function logic execute, in the end of, i activate tasks, using task.enabled = True, task.save() but when i am enabling tasks, they executed instanly, without shedule. here is my models.py:

import json
from django.utils import timezone
from django.contrib.auth import get_user_model
# from django.contrib.auth.models import User
from django.db import models
from django_celery_beat.models import IntervalSchedule, PeriodicTask


class Subscription(models.Model):

    class Period(models.IntegerChoices):
        ONE = 1
        THREE = 3
        SIX = 6
        TWELVE = 12

    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    period_notifications = models.IntegerField(choices=Period.choices)
    date_of_subscription = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return f'{self.user} has a subscription since {self.date_of_subscription} ' \
               f'with period of notifications - {self.period_notifications} hours.'


class CityInSubscription(models.Model):
    subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE, related_name='cities')
    name = models.CharField(max_length=64)

    def __str__(self):
        return f'{self.name}'


def create_task(subscription):
    task_name = f'Send email to {subscription.user.email} (subscription {subscription.id})'
    schedule, created = IntervalSchedule.objects.get_or_create(
        every=subscription.period_notifications,
        period=IntervalSchedule.MINUTES
    )
    task = PeriodicTask.objects.create(
        name=task_name,
        task='send_email_task',
        interval=schedule,
        args=json.dumps([subscription.id]),
        start_time=timezone.now()
    )
    task.save()
    return


def edit_task(subscription):
    task_name = f'Send email to {subscription.user.email} (subscription {subscription.id})'
    existing_task = PeriodicTask.objects.filter(name=task_name).first()
    existing_task.interval.every = subscription.period_notifications
    existing_task.interval.save()
    existing_task.save()
    return


def delete_task(subscription):
    task_name = f'Send email to {subscription.user.email} (subscription {subscription.id})'
    existing_task = PeriodicTask.objects.filter(name=task_name).first()
    existing_task.delete()
    return

here is my tasks.py:

import itertools
import time
import requests
from celery import shared_task
from celery.utils.functional import memoize
from django_celery_beat.models import PeriodicTask

from weatherreminder.settings import OPEN_WEATHER_API_URL, OPEN_WEATHER_API_KEY
from .models import Subscription, CityInSubscription

from django.core.mail import EmailMessage
from django.core.cache import cache


def unpack_nested_list(nested_list):
    unpacked_list = list(itertools.chain(*nested_list))
    return unpacked_list


def get_weather(city_name):
    url = OPEN_WEATHER_API_URL + f'?q={city_name}&units=metric&appid={OPEN_WEATHER_API_KEY}'
    data = requests.get(url).json()
    weather_data = {
        'city': data['name'],
        'temperature': f"{data['main']['temp']}°C",
        'feels like': f"{data['main']['feels_like']}°C",
        'description': data['weather'][0]['description'],
        'wind speed': f'{data["wind"]["speed"]} m/s',
    }
    return weather_data


def get_cached_weather(city_name):
    cached_weather = cache.get(city_name)
    if cached_weather is not None:
        return cached_weather

    weather_data = get_weather(city_name)
    cache.set(city_name, weather_data, timeout=60 * 60)  # Cache for 1 hour
    return weather_data


@shared_task(name="send_email_task")
def send_email_task(sub_id):
    subscription = Subscription.objects.get(id=sub_id)

    tasks = PeriodicTask.objects.filter(name__startswith=f'Send email to {subscription.user.email}')
    for task in tasks:
        if task.interval.every <= subscription.period_notifications:
            task.enabled = False
            task.save()


    user_subscriptions = []
    query_user_subscriptions = Subscription.objects.filter(user=subscription.user).all()
    for sub in query_user_subscriptions:
        if sub.period_notifications <= subscription.period_notifications:
            user_subscriptions.append(sub)

    subscription_cities = [subscription.cities for subscription in user_subscriptions]
    unpacked_cities = [city.all() for city in subscription_cities]
    cities = [city.name for city in unpack_nested_list(unpacked_cities)]

    if len(cities) != 0:
        html_content = ''
        for city in cities:
            weather = get_cached_weather(city)
            html_content += f'''<p>
                                    <strong>{weather["city"]}</strong><br>
                                    Temperature {weather["temperature"]}<br>
                                    Feels like {weather["feels like"]}<br>
                                    {weather["description"]}<br>
                                    Wind speed {weather["wind speed"]}<br>
                                </p>'''
        message = EmailMessage('Weather notification', html_content, to=[subscription.user.email])
        message.content_subtype = 'html'
        if message.send():
            print("Message was send.")
        else:
            print("Message didn't send.")

        for task in tasks:
            task.enabled = True
            task.save()

        return


here is my views.py:

import requests
from django.core.mail import EmailMessage
from django.template.loader import render_to_string
from weatherreminder.settings import OPEN_WEATHER_API_URL, OPEN_WEATHER_API_KEY
from .models import Subscription, CityInSubscription, create_task, edit_task, delete_task
from .serializers import SubscriptionSerializer, CityInSubscriptionSerializer
from django.shortcuts import render
from rest_framework import status
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.generics import ListCreateAPIView, RetrieveDestroyAPIView

from django.shortcuts import get_object_or_404


from .tasks import get_weather, get_cached_weather


def get_tokens_for_user(user):
    refresh = RefreshToken.for_user(user)

    return {
        'refresh': str(refresh),
        'access': str(refresh.access_token),
    }


def check_existing_city(city_name):
    url = OPEN_WEATHER_API_URL + f'?q={city_name}&appid={OPEN_WEATHER_API_KEY}'
    r = requests.get(url)
    return r.status_code != 200


def homepage(request):
    tokens = get_tokens_for_user(request.user)
    return render(request=request,
                  template_name='core/home.html',
                  context={
                      "refresh": tokens['refresh'],
                      "access": tokens['access'],
                  }
                  )


class MySubscriptionsView(APIView):
    permission_classes = (IsAuthenticated,)

    def get(self, request):
        subscription = Subscription.objects.filter(user=request.user).all()
        serializer = SubscriptionSerializer(subscription, many=True)
        return Response(serializer.data)

    def post(self, request):
        new_subscription = Subscription.objects.create(
            user=request.user,
            period_notifications=request.data["period_notifications"]
        )
        new_subscription.save()
        create_task(new_subscription)
        serializer = SubscriptionSerializer(new_subscription)
        return Response(serializer.data, status=status.HTTP_201_CREATED)


    def delete(self, request):
        subscriptions = Subscription.objects.all()
        for subscription in subscriptions:
            delete_task(subscription)
            subscription.delete()
        return Response("Subscriptions has been deleted")


class MySubscriptionView(APIView):
    permission_classes = (IsAuthenticated,)

    def get(self, request, pk):
        subscription = Subscription.objects.filter(pk=pk).first()
        serializer = SubscriptionSerializer(subscription)
        return Response(serializer.data)


    def put(self, request, pk):
        subscription = Subscription.objects.filter(pk=pk).first()
        subscription.period_notifications = request.data["period_notifications"]
        subscription.save()
        edit_task(subscription)
        serializer = SubscriptionSerializer(subscription)
        return Response(serializer.data)

    def delete(self, request, pk):
        subscription = Subscription.objects.filter(pk=pk).first()
        delete_task(subscription)
        subscription.delete()
        return Response("Subscription has been deleted")


class MyCitiesListView(ListCreateAPIView):
    permission_classes = (IsAuthenticated,)
    serializer_class = CityInSubscriptionSerializer

    def get_queryset(self):
        subscription_pk = self.kwargs['pk']
        subscription = Subscription.objects.get(pk=subscription_pk, user=self.request.user)
        return CityInSubscription.objects.filter(subscription=subscription)

    def create(self, request, *args, **kwargs):
        input_city = request.data['name']
        subscription_pk = self.kwargs['pk']
        subscription = Subscription.objects.get(pk=subscription_pk, user=self.request.user)
        existing_city = CityInSubscription.objects.filter(subscription=subscription, name=input_city)
        if existing_city:
            return Response("City already added in your subscription")
        if check_existing_city(input_city):
            return Response("City doesn't exist")
        new_city = CityInSubscription.objects.create(
            subscription=subscription,
            name=input_city,
        )
        new_city.save()
        serializer = CityInSubscriptionSerializer(new_city)
        return Response(serializer.data, status=status.HTTP_201_CREATED)


class GetWeatherView(APIView):
    permission_classes = (IsAuthenticated,)

    def get(self, request, pk):
        subscription = Subscription.objects.get(pk=pk, user=request.user)
        response_get_weather = []
        for city in subscription.cities.all():
            response_get_weather.append(get_cached_weather(city.name))
        return Response(response_get_weather)


class GetWeatherOneCityView(APIView):
    permission_classes = (IsAuthenticated,)

    def get(self, request, pk, city_name):
        subscription = Subscription.objects.get(pk=pk, user=request.user)
        city = CityInSubscription.objects.filter(subscription=subscription, name=city_name).first()

        return Response(get_cached_weather(city.name))

0

There are 0 answers