Detect when a text editor is run in xterm such as nano,vim etc

177 views Asked by At

I am currently using 5.2.1 version of xterm.js and django in backend. I want to detect when a text editor is run in terminal and log it. My current approach is to filter it using the keywords such as nano,vim etc. The only problem is that the user may use alias so i want to run "alias" command in the background constantly and check if there is any alias of nano or vim and add them to blacklist too. The "alias" command must be run in the bash process that is run from connect function otherwise it would be another terminal instance. Also the user must not see the command in browser so using os.write() is not an option. How can i achieve this in the most foolproof way?

EDIT:

I made some researches and it is very hard to implement current approach. Is there a way to block user from using alias entirely?

index.html:

<script type="module">

        var socket = io.connect({ transports: ["websocket", "polling"] });

        const status = document.getElementById("status")
        const button = document.getElementById("button")
        const fit = new FitAddon.FitAddon();

        var term = new Terminal({
            cursorBlink: true,
        });

        term.loadAddon(fit);
        term.open(document.getElementById('terminal'));
        fit.fit();

        var terminal_line = '';

        term.onKey(e => {
            if (e.key == "\r") {
                terminal_line = term.buffer.active.getLine(term._core.buffer.y)?.translateToString();
                console.log("terminal line: ", terminal_line);
                socket.emit("log_input", { "user_input": terminal_line });
            }

            socket.emit("pty_input", { "input": e.key });
        })

        socket.on("pty_output", function (output) {
            console.log("output: ", output["output"]);
            term.write(output["output"]);
        })

        socket.on("connect", () => {
            status.innerHTML = '<span style="background-color: lightgreen;">connected</span>'
            button.innerHTML = 'Disconnect'
        })

        socket.on("disconnect", () => {
            status.innerHTML = '<span style="background-color: #ff8383;">disconnected</span>'
            button.innerHTML = 'Connect'

        })

        function myFunction() {
            if (button.innerHTML == 'Connect') {
                location.reload();
            }

            else if (button.innerHTML == "Disconnect") {
                socket.emit("disconnect_request")
            }
        }

        function resize() {
            console.log("resized")
            fit.fit()
            socket.emit("resize", { "cols": term.cols, "rows": term.rows })
        }

        window.onresize = resize
        window.onload = resize

</script>

views.py:

import os
from django.shortcuts import render
import socketio
import pty
import select
import subprocess
import struct
import fcntl
import termios
import signal
import time

async_mode = "eventlet"
sio = socketio.Server(async_mode=async_mode)

fd = None
child_pid = None

def index(request):
    return render(request, "index.html")

def set_winsize(fd, row, col, xpix=0, ypix=0):
    winsize = struct.pack("HHHH", row, col, xpix, ypix)
    fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)


def read_and_forward_pty_output():
    global fd

    max_read_bytes = 1024 * 20
    
    while True:
        sio.sleep(0.01)
        if fd:
            timeout_sec = 0
            (data_ready, _, _) = select.select([fd], [], [], timeout_sec)
            
            if data_ready:
                output = os.read(fd, max_read_bytes).decode()
                sio.emit("pty_output", {"output": output})
        else:
            print("process killed")
            return


@sio.event
def resize(sid, message):
    print("entered resize")
    if fd:
        set_winsize(fd, message["rows"], message["cols"])


@sio.event
def pty_input(sid, message):
    if fd:
        os.write(fd, message["input"].encode())

@sio.event
def log_input(sid, user_input):    
    with open("userinput.log", "a") as f:
        f.write(user_input["user_input"]+"\n")

def extract_path(raw_input):
    if "/" not in raw_input: 
        filename = raw_input.split()[-1]
    else:
        filename = raw_input.split()[-1][raw_input.split()[-1].rfind("/")+1:]
    
    path = os.getcwd()+"/"+filename
    print("Filename: ",filename)
    print(os.path.abspath(path))

@sio.event
def disconnect_request(sid):
    sio.disconnect(sid)


@sio.event
def connect(sid, environ):
    global fd
    global child_pid
    
    if child_pid:
        os.write(fd, "\n".encode())
        return

    (child_pid, fd) = pty.fork()

    if child_pid == 0:
        subprocess.run("bash")
        subprocess.run("clear")
    else:
        sio.start_background_task(target=read_and_forward_pty_output)


@sio.event
def disconnect(sid):
    global fd
    global child_pid
    
    os.kill(child_pid, signal.SIGKILL)
    os.wait()

    fd = None
    child_pid = None
    print("Client disconnected")
2

There are 2 answers

4
ivenoidea On BEST ANSWER

At first I was trying to get and save the aliases in the terminal but it is a specific bash process that is run from subprocess module. It means using subprocess.run("alias") is not an option because it is run in a separate process. Also copying the environment variables and starting another bash process is not viable because this is an interactive terminal that can constantly change. After a lot of researches and trials i decided to block the user from using alias entirely. This approach basicly detects "alias" prompt in the frontend and sends kill(ctrl+c) signal to django. It is possible by sending the special character that is equivalent of ctrl+c in the pty_input method. I used that instead of just sending enter or new line because this program mimics the terminal so after typing alias sending those characters is no different than running it.

TLDR:

These changes in the views.py and index.html blocks the usage of "alias" and prints warning logs when user attempts to use text editors.

views.py:

...
    @sio.event
    def log_input(sid, user_input):
        if "root" in user_input["user_input"][:4]:
            prompt = user_input["user_input"][user_input["user_input"].find('#'):].rstrip()
            logger.info(f"User run '{prompt[2:]}' command")
        else:
            prompt = user_input["user_input"][user_input["user_input"].find('$'):].rstrip()
            logger.info(f"User run '{prompt[2:]}' command")
        
        
        if any(keyword in prompt for keyword in command_filter):
            extract_path(prompt[2:])
        
    
    def extract_path(raw_input):
        if "/" not in raw_input: 
            filename = raw_input.split()[-1]
        else:
            filename = raw_input.split()[-1][raw_input.split()[-1].rfind("/")+1:]
    
        path = os.getcwd()+"/"+filename
        
        path = os.path.abspath(path)
        if os.path.exists(path):
            logger.warning(f"User edited file named '{filename}' in {path}")
        else:
            logger.warning(f"User trying to create file named '{filename}' in {path}")
        return path
...

index.html:

...
term.onKey(e => {
            if (e.key == "\r") {
                terminal_line = term.buffer.active.getLine(term._core.buffer.y)?.translateToString();
                console.log("terminal line: ", terminal_line);

                socket.emit("log_input", { "user_input": terminal_line });
            }
            else if (e.key == "\x04"){
                console.log("Exited gracefully")
            }
            
            if(terminal_line.includes("alias")){
                term.write("\r\nalias not allowed!\r\n");
                socket.emit("pty_input", { "input": ""});
                term.write("\r");
                terminal_line = "";
            }
            else{
                socket.emit("pty_input", { "input": e.key });
            }
        })
...
1
Statistics Tutorial On

So I modified your code a bit here. Try the following:

Create a Function to Check for Aliases: Write a function named check_aliases that will send the alias command to the terminal, read the output, and look for aliases of nano or vim.

Update the pty_input Event Handler: Update the pty_input event handler to check if the entered command is in the list of blacklisted aliases and log it if found.

Run the check_aliases Function Periodically: Start a background task to periodically run the check_aliases function to keep the list of blacklisted aliases updated.

import os
import re
import socketio
import pty
import select
import subprocess
import struct
import fcntl
import termios
import signal

# ... other code remains unchanged ...

# Global variable to store blacklisted aliases
blacklist_aliases = set()

def check_aliases():
    global fd
    global blacklist_aliases
    while True:
        sio.sleep(60)  # sleep for 60 seconds
        if fd:
            os.write(fd, "alias\n".encode())
            output = os.read(fd, 1024).decode()
            # Regular expression to match aliases for nano, vim, etc.
            matches = re.findall(r"alias (\w+)='(nano|vim)'", output)
            for match in matches:
                alias_name, _ = match
                blacklist_aliases.add(alias_name)

@sio.event
def pty_input(sid, message):
    global blacklist_aliases
    if fd:
        os.write(fd, message["input"].encode())
        input_command = message["input"].strip()
        # Check if the input command is in the blacklist_aliases
        if input_command in blacklist_aliases:
            with open("userinput.log", "a") as f:
                f.write(f"Alias used for text editor: {input_command}\n")

# ... other code remains unchanged ...

# Start the check_aliases function as a background task
sio.start_background_task(target=check_aliases)

Here is a bit more explaination: The check_aliases function is continuously running in the background, sleeping for 60 seconds between each check. It sends the alias command to the terminal, reads the output, and uses a regular expression to find aliases for nano and vim. If found, these aliases are added to the blacklist_aliases set. The pty_input event handler is updated to check if the entered command is in the blacklist_aliases set. If it is, the alias usage is logged in the userinput.log file. The check_aliases function is started as a background task when the server starts, ensuring that the blacklist is continuously updated. Note: You might need to adjust the sleep time in the check_aliases function based on your preference. Also, consider handling potential conflicts between os.read in different functions.

new edits:

Capture Environment Variables: When spawning the shell, capture its environment variables and store them.

Create a Function to Check for Aliases: This function will run the alias command in the background, capturing its output and updating the blacklist.

Run the check_aliases Function Periodically: Start a background task to periodically run the check_aliases function. import os import re import socketio import pty import select import subprocess import struct import fcntl import termios import signal

# ... other code remains unchanged ...

# Global variable to store blacklisted aliases and environment
blacklist_aliases = set()
shell_env = None

def check_aliases():
    global shell_env
    global blacklist_aliases
    while True:
        sio.sleep(60)  # sleep for 60 seconds
        if shell_env:
            result = subprocess.run(['bash', '-i', '-c', 'alias'], capture_output=True, text=True, env=shell_env)
            output = result.stdout
            # Regular expression to match aliases for nano, vim, etc.
            matches = re.findall(r"alias (\w+)='(nano|vim)'", output)
            for match in matches:
                alias_name, _ = match
                blacklist_aliases.add(alias_name)

@sio.event
def connect(sid, environ):
    global fd
    global child_pid
    global shell_env
    
    if child_pid:
        os.write(fd, "\n".encode())
        return

    (child_pid, fd) = pty.fork()

    if child_pid == 0:
        subprocess.run("bash")
        subprocess.run("clear")
    else:
        # Capture the shell environment
        shell_env = os.environ.copy()
        sio.start_background_task(target=read_and_forward_pty_output)

# ... other code remains unchanged ...

# Start the check_aliases function as a background task
sio.start_background_task(target=check_aliases)

The shell_env variable captures the environment variables when spawning the shell. The check_aliases function uses subprocess.run() to execute the alias command in the background. The capture_output=True argument ensures the output is captured, while the env=shell_env argument ensures the command runs in the context of the shell's environment. The rest of the logic remains the same.