So, i need to make something like botnet but for educational purposes, just to demonstrate how botnet works, i need it for video, so i was trying to write my own but cant make work both websocket server for communication and take input from the user, the code just freezes on the websocket and wont continue to take input from user. Theres my code:
from colorama import Fore, Style
import socket
import title
import os
import asyncio
import websockets
from aioconsole import ainput
connections = 0
started = False
started_lock = asyncio.Lock()
# ----------- TITLE ----------- #
print(title.generate_title())
# ----------- COMMANDS ----------- #
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
async def handler(websocket, path):
global connections
connections += 1
try:
async for message in websocket:
pass
except websockets.exceptions.ConnectionClosed:
connections -= 1
async def start_websocket_server():
global started
try:
async with started_lock:
started = True
start_server = await websockets.serve(handler, "localhost", 5914)
print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}Websocket server started!")
await start_server.wait_closed()
except Exception as e:
print(f"{Fore.RED}[ERR]: {Style.RESET_ALL}An error occurred while starting websocket server: {e}")
def clear():
if os.name == 'nt':
os.system('cls')
else:
os.system('clear')
async def user_input():
while True:
if started:
try:
user_input = await ainput(f"{ip}:{hostname}> ")
if user_input.strip() == "help":
print("""List of available commands:
connections - number of connected bots.
sessions - get all bots and ids
getinfo {id} - get info about bot by id
exit - exit the program""")
elif user_input.strip() == "cls" or user_input.strip() == "clear":
clear()
elif user_input.strip() == "connections":
print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}There are currently connected {connections} bots.")
elif user_input.strip() == "exit":
print("Exiting...")
return
else:
print(f"Unknown command '{user_input}'\nView all commands using 'help'")
except EOFError:
print("EOFError. Exiting...")
break
except KeyboardInterrupt:
print("User interrupted. Exiting...")
break
except Exception as e:
print(f"An error occurred: {e}")
async def main():
print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}Starting websocket server...")
# Start WebSocket server
await start_websocket_server()
# Start user input
await user_input()
if __name__ == "__main__":
asyncio.run(main())
And theres the title library:
from pyfiglet import Figlet
from colorama import Fore, Style
import random
title_fonts = ['cybermedium', 'rectangles', 'cyberlarge', '3-d', 'banner', 'banner3', 'banner4', 'chunky', 'colossal', 'computer', 'cosmic', 'crawford', 'cricket', 'doom', 'epic', 'poison']
try:
import pyfiglet
except ImportError:
import subprocess
subprocess.run(['pip', 'install', 'pyfiglet'])
import pyfiglet
# Generate and return the title with a random readable font
def generate_title():
selected_font = random.choice(title_fonts)
if selected_font == 'cosmic':
fig = Figlet(font=selected_font, width=120)
title = fig.renderText('Control Hub').strip()
else:
fig = Figlet(font=selected_font, width=100)
title = fig.renderText('ControlHub').strip()
return Fore.MAGENTA + title + f"\n" + Style.RESET_ALL
Tried adding multithreads and it helped, but not much cause I have to every time kill python via task manager cause it wont exit using KeyboardInterrupt