I want to use PyQt5 to access flows in mitmproxy. Since mitmproxy is asynchronous, I use asyncqt.QEventLoop to create a loop to share with mitmproxy. I can start the mitmproxy master service now, but I am unable to stop the master service. How can I correctly stop this task?
import asyncio
from PyQt5 import QtWidgets
from asyncqt import QEventLoop
from mitmproxy import addons
from mitmproxy.options import Options
from mitmproxy.master import Master
class MitmProxy:
def __init__(self, options: Options) -> None:
self.loop = asyncio.get_event_loop()
print(id(loop), loop)
self.master = Master(options, event_loop=self.loop)
self.master.addons.add(*addons.default_addons())
def run(self) -> None:
self.task = self.loop.create_task(self.master.run())
def stop(self) -> None:
self.task.cancel()
if self.task.cancelled():
print("task cancelled, shutdown")
self.master.shutdown()
print('task cancel over')
class MyWidget(QtWidgets.QWidget):
def __init__(self):
super().__init__()
self.loop = asyncio.get_event_loop()
self.mitm = None
self.lyt = QtWidgets.QHBoxLayout(self)
self.btn_start = QtWidgets.QPushButton("Start")
self.btn_stop = QtWidgets.QPushButton("Stop")
self.btn_start.clicked.connect(self.add_task)
self.btn_stop.clicked.connect(self.stop_task)
self.lyt.addWidget(self.btn_start)
self.lyt.addWidget(self.btn_stop)
def add_task(self):
self.mitm = MitmProxy(
Options(listen_host="0.0.0.0", listen_port=8080, http2=True)
)
self.mitm.run()
def stop_task(self):
self.mitm.stop()
if __name__ == "__main__":
app = QtWidgets.QApplication([])
loop = QEventLoop(app)
print(id(loop), loop)
asyncio.set_event_loop(loop)
label = MyWidget()
label.show()
with loop:
loop.run_forever()