Basically, I am trying to make an asynchronous GUI. I have made 2 classes that use the pytube package. The Video class represents a video and when it's constructor runs it searches for a video. The _Playlist class represents a Playlist (a list of videos):
class Video:
def __init__(self, url) -> None:
try:
self.yt = YouTube(url)
except exceptions.VideoPrivate:
raise CustomVideoException(ExceptionCode.PRIVATE_VIDEO)
except exceptions.VideoRegionBlocked:
raise CustomVideoException(ExceptionCode.REGION_BLOCKED)
except exceptions.VideoUnavailable:
raise CustomVideoException(ExceptionCode.VIDEO_UNAVAILABLE)
except exceptions.PytubeError:
raise CustomVideoException(ExceptionCode.PY_TUBE_ERROR)
except Exception:
raise CustomVideoException(ExceptionCode.MAJOR_ERROR)
video_streams = self.yt.streams.filter(file_extension='mp4')
audio_streams = self.yt.streams.filter(only_audio=True)
self.video_resolutions = list()
self.audio_resolutions = list()
self.title = self.yt.title
self.author = self.yt.author
self.views = self.yt.views
for stream in video_streams:
self.video_resolutions.append({"_type": stream.mime_type, "res": stream.resolution})
for stream in audio_streams:
self.audio_resolutions.append({"_type": stream.mime_type, "abr": stream.abr})
def _download(self, selected_res: str, on_progress_callback):
# return self.yt.streams.get_by_resolution(selected_res)
self.yt.register_on_progress_callback(on_progress_callback)
self.yt.streams.get_by_resolution(selected_res).download()
def on_callback(self, stream, chunk, bytes_remaining):
total_size = stream.filesize
bytes_downloaded = total_size - bytes_remaining
percentage_of_completion = bytes_downloaded / total_size * 100
print(round(percentage_of_completion))
def __str__(self) -> str:
return f'VIDEO:\nTitle: {self.title}, Author: {self.author}, Views: {self.views}\nVideo Resolutions: ' \
f'{self.video_resolutions}\nAudio Resolutions: {self.audio_resolutions}'
class _Playlist:
def __init__(self) -> None:
self.videos = list()
def read_playlist(self, url: str):
temp_playlist = Playlist(url)
for video_url in temp_playlist.video_urls:
self.videos.append({"code": video_url.split("=")[1], "video": Video(video_url)})
def add_video(self, url: str):
if "youtube.com" not in url:
raise CustomVideoException(ExceptionCode.WRONG_URL)
if "watch?v=" not in url:
raise CustomVideoException(ExceptionCode.WRONG_URL)
self.videos.append({"code": url.split("=")[1], "video": Video(url)})
def print_video_urls(self):
for v in self.videos:
print(v["video"])
The thing is that I want to be able to create an object of the Video class and run the _download function in video class asynchronously via a ctk.Button.
After reading this question I tried to apply it to my code. Currently I have make a class "App" that represents my main application and I added a list with tasks self.tasks where all my to do tasks are stored. The problem is that when I press run the function do_search_video() it does nothing.
The task is created but apparently never runs?
My current code is this:
class App(ctk.CTk):
def __init__(self, _loop: asyncio.AbstractEventLoop, interval: float = (1 / 120)):
super().__init__()
ctk.set_default_color_theme("green")
# center window to the screen
app_width = 900
app_height = 500
screen_width = self.winfo_screenwidth()
screen_height = self.winfo_screenheight()
x = (screen_width / 2) - (app_width/2)
y = (screen_height / 2) - (app_height/2)
self.geometry(f"{app_width}x{app_height}+{int(x)}+{int(y)}")
self.i = 0
self.loop = _loop
self.protocol("WM_DELETE_WINDOW", self.close)
self.tasks = list()
self.tasks.append(self.loop.create_task(self.updater(interval)))
self.url_entry = ctk.CTkEntry(self, width=int(app_width*0.8))
self.url_entry.bind(sequence="<Return>", command=self.do_search_video)
self.url_entry.pack(pady=20, padx=20)
self.playlist_frame = PlaylistFrame(self)
self.playlist_frame.add_video_frame("Limmy's Show - Water", 4868637, "LANman247")
self.playlist_frame.pack(padx=20, pady=20)
self.mainloop()
# self.btn = ctk.CTkButton(self, text='Change Label', command=self.do_on_click)
def do_search_video(self, *args):
self.tasks.append(self.loop.create_task(self.search_video()))
async def search_video(self):
try:
self.playlist_frame.playlist.add_video(self.url_entry.get())
self.playlist_frame.add_video_frame(
title=self.playlist_frame.playlist.videos[-1]["video"].title,
author=self.playlist_frame.playlist.videos[-1]["video"].author,
views=self.playlist_frame.playlist.videos[-1]["video"].views,
)
except Exception as e:
print(e)
async def updater(self, interval):
while True:
self.update()
await asyncio.sleep(interval)
I have also tried running the search_video function like a coroutine (this is how I understand it):
def do_search_video(self, *args):
self.tasks.append(self.loop.create_task(asyncio.run(self.search_video())))
@asyncio.coroutine
async def search_video(self):
try:
self.playlist_frame.playlist.add_video(self.url_entry.get())
self.playlist_frame.add_video_frame(
title=self.playlist_frame.playlist.videos[-1]["video"].title,
author=self.playlist_frame.playlist.videos[-1]["video"].author,
views=self.playlist_frame.playlist.videos[-1]["video"].views,
)
except Exception as e:
print(e)
When I run it this way I get the actual outcome I want but I also get an error:
TypeError: a coroutine was expected, got None
Probably cause I don't initialize a coroutine correctly?
The question is how could this be implied? Am I thinking correctly? I am fairly new to asyncio and asynchronous programming as it seems.
Thank you for your help in advance
UPDATE I added 2 more elements in my GUI, "test_label" and c_button, and 2 more functions just to see if the label would change asynchronously:
self.label_stuck = ctk.StringVar()
self.label_stuck.set("a label")
test_label = ctk.CTkLabel(self, width=int(app_width * 0.8), textvariable=self.label_stuck)
test_label.pack()
c_button = ctk.CTkButton(self, width=int(app_width * 0.2), command=self.do_on_click)
c_button.pack()
self.counter = 0
def do_on_click(self):
self.tasks.append(self.loop.create_task(self.on_click()))
async def on_click(self):
self.label_stuck.set(f"{self.counter}")
await asyncio.sleep(5)
self.counter += 1
self.label_stuck.set(f"{self.counter}")
self.counter += 1
If I remove the line self.mainloop() this implementation works. When I press the c_button the label's text changes and after 5 seconds it changes again and GUI remains responsive.
Still though I cannot do it with my functions. It there a way to implement the same concept with the Video class?
Maybe by using threading?
Plus, there is this bizarre error when I close the app:
while executing
"3080665250440update"
("after" script)
invalid command name "3080636189768check_dpi_scaling"
while executing
"3080636189768check_dpi_scaling"
("after" script)