Python custom tkinter and Asyncio and Threading with custom functions

101 views Asked by At

Basically, I am trying to make an asynchronous GUI. I have made 2 classes that use the pytube package. The Video class represents a video and when it's constructor runs it searches for a video. The _Playlist class represents a Playlist (a list of videos):

class Video:
    def __init__(self, url) -> None:

        try:
            self.yt = YouTube(url)
        except exceptions.VideoPrivate:
            raise CustomVideoException(ExceptionCode.PRIVATE_VIDEO)
        except exceptions.VideoRegionBlocked:
            raise CustomVideoException(ExceptionCode.REGION_BLOCKED)
        except exceptions.VideoUnavailable:
            raise CustomVideoException(ExceptionCode.VIDEO_UNAVAILABLE)
        except exceptions.PytubeError:
            raise CustomVideoException(ExceptionCode.PY_TUBE_ERROR)
        except Exception:
            raise CustomVideoException(ExceptionCode.MAJOR_ERROR)

        video_streams = self.yt.streams.filter(file_extension='mp4')
        audio_streams = self.yt.streams.filter(only_audio=True)
        self.video_resolutions = list()
        self.audio_resolutions = list()

        self.title = self.yt.title
        self.author = self.yt.author
        self.views = self.yt.views

        for stream in video_streams:
            self.video_resolutions.append({"_type": stream.mime_type, "res": stream.resolution})
        for stream in audio_streams:
            self.audio_resolutions.append({"_type": stream.mime_type, "abr": stream.abr})

    def _download(self, selected_res: str, on_progress_callback):
        # return self.yt.streams.get_by_resolution(selected_res)
        self.yt.register_on_progress_callback(on_progress_callback)
        self.yt.streams.get_by_resolution(selected_res).download()

    def on_callback(self, stream, chunk, bytes_remaining):
        total_size = stream.filesize
        bytes_downloaded = total_size - bytes_remaining
        percentage_of_completion = bytes_downloaded / total_size * 100

        print(round(percentage_of_completion))

    def __str__(self) -> str:
        return f'VIDEO:\nTitle: {self.title}, Author: {self.author}, Views: {self.views}\nVideo Resolutions: ' \
               f'{self.video_resolutions}\nAudio Resolutions: {self.audio_resolutions}'


class _Playlist:
    def __init__(self) -> None:
        self.videos = list()

    def read_playlist(self, url: str):
        temp_playlist = Playlist(url)
        for video_url in temp_playlist.video_urls:
            self.videos.append({"code": video_url.split("=")[1], "video": Video(video_url)})

    def add_video(self, url: str):
        if "youtube.com" not in url:
            raise CustomVideoException(ExceptionCode.WRONG_URL)
        if "watch?v=" not in url:
            raise CustomVideoException(ExceptionCode.WRONG_URL)
        self.videos.append({"code": url.split("=")[1], "video": Video(url)})

    def print_video_urls(self):
        for v in self.videos:
            print(v["video"])

The thing is that I want to be able to create an object of the Video class and run the _download function in video class asynchronously via a ctk.Button.

After reading this question I tried to apply it to my code. Currently I have make a class "App" that represents my main application and I added a list with tasks self.tasks where all my to do tasks are stored. The problem is that when I press run the function do_search_video() it does nothing. The task is created but apparently never runs?

My current code is this:

class App(ctk.CTk):
    def __init__(self, _loop: asyncio.AbstractEventLoop, interval: float = (1 / 120)):
        super().__init__()
        ctk.set_default_color_theme("green")
        # center window to the screen
        app_width = 900
        app_height = 500
        screen_width = self.winfo_screenwidth()
        screen_height = self.winfo_screenheight()
        x = (screen_width / 2) - (app_width/2)
        y = (screen_height / 2) - (app_height/2)
        self.geometry(f"{app_width}x{app_height}+{int(x)}+{int(y)}")

        self.i = 0
        self.loop = _loop
        self.protocol("WM_DELETE_WINDOW", self.close)
        self.tasks = list()
        self.tasks.append(self.loop.create_task(self.updater(interval)))

        self.url_entry = ctk.CTkEntry(self, width=int(app_width*0.8))
        self.url_entry.bind(sequence="<Return>", command=self.do_search_video)
        self.url_entry.pack(pady=20, padx=20)

        self.playlist_frame = PlaylistFrame(self)
        self.playlist_frame.add_video_frame("Limmy's Show - Water", 4868637, "LANman247")
        self.playlist_frame.pack(padx=20, pady=20)

        self.mainloop()
        # self.btn = ctk.CTkButton(self, text='Change Label', command=self.do_on_click)

    def do_search_video(self, *args):
        self.tasks.append(self.loop.create_task(self.search_video()))

    async def search_video(self):
        try:
            self.playlist_frame.playlist.add_video(self.url_entry.get())

            self.playlist_frame.add_video_frame(
                title=self.playlist_frame.playlist.videos[-1]["video"].title,
                author=self.playlist_frame.playlist.videos[-1]["video"].author,
                views=self.playlist_frame.playlist.videos[-1]["video"].views,
            )
        except Exception as e:
            print(e)

    async def updater(self, interval):
        while True:
            self.update()
            await asyncio.sleep(interval)

I have also tried running the search_video function like a coroutine (this is how I understand it):

def do_search_video(self, *args):
        self.tasks.append(self.loop.create_task(asyncio.run(self.search_video())))

    @asyncio.coroutine
    async def search_video(self):
        try:
            self.playlist_frame.playlist.add_video(self.url_entry.get())

            self.playlist_frame.add_video_frame(
                title=self.playlist_frame.playlist.videos[-1]["video"].title,
                author=self.playlist_frame.playlist.videos[-1]["video"].author,
                views=self.playlist_frame.playlist.videos[-1]["video"].views,
            )
        except Exception as e:
            print(e)

When I run it this way I get the actual outcome I want but I also get an error: TypeError: a coroutine was expected, got None Probably cause I don't initialize a coroutine correctly?

The question is how could this be implied? Am I thinking correctly? I am fairly new to asyncio and asynchronous programming as it seems.

Thank you for your help in advance

UPDATE I added 2 more elements in my GUI, "test_label" and c_button, and 2 more functions just to see if the label would change asynchronously:

self.label_stuck = ctk.StringVar()
        self.label_stuck.set("a label")
        test_label = ctk.CTkLabel(self, width=int(app_width * 0.8), textvariable=self.label_stuck)
        test_label.pack()

        c_button = ctk.CTkButton(self, width=int(app_width * 0.2), command=self.do_on_click)
        c_button.pack()

        self.counter = 0

def do_on_click(self):
        self.tasks.append(self.loop.create_task(self.on_click()))

    async def on_click(self):
        self.label_stuck.set(f"{self.counter}")
        await asyncio.sleep(5)
        self.counter += 1
        self.label_stuck.set(f"{self.counter}")
        self.counter += 1

If I remove the line self.mainloop() this implementation works. When I press the c_button the label's text changes and after 5 seconds it changes again and GUI remains responsive. Still though I cannot do it with my functions. It there a way to implement the same concept with the Video class? Maybe by using threading? Plus, there is this bizarre error when I close the app:

while executing
"3080665250440update"
    ("after" script)
invalid command name "3080636189768check_dpi_scaling"
    while executing
"3080636189768check_dpi_scaling"
    ("after" script)
0

There are 0 answers