Python NtQueryDirectoryFile (File information structure)

1.1k views Asked by At

I've written a simple (test) script to list files in a selected directory. Not using FindFirstFile; only native API. When I execute the script and watch, Win32API monitor tells me STATUS_SUCCESS. My File Information buffer is c_buffer(1024), not using a Unicode buffer to see the raw data.

So after call NtQueryDirectoryFile all is ok. When I write c_buffer in raw mode to console to see the files in the directory, the output is not structured. I created a FILE_DIRECTORY_INFORMATION structure but either it does not work Windows 7 X86 or there's a problem in my code.

My Question: Please tell me which FILE_DIRECTORY_INFORMATION structure use on Windows 7 X86 or any variants

from ctypes import *

hFile = windll.kernel32.CreateFileW("C:\\a",0x80000000,0,0,3,0x02000000,0)

class Info(Union):
    _fields_ = [('STATUS',c_long),
                ('Pointer',c_ulong),]


class io_stat(Structure):
    _fields_ = [('Stat',Info),
                ('Information',c_ulong),]


class FILE_OBJECT(Structure):
    _fields_ = [('Next',c_ulong),
                ('FileIndex',c_ulong),
                ('ctime',c_longlong),
                ('lat',c_longlong),
                ('wtime',c_longlong),
                ('ch',c_longlong),
                ('Endogfile',c_longlong),
                ('allo',c_longlong),
                ('Fileattr',c_ulong),
                ('Filenalen',c_ulong),
                ('Filename',c_wchar * 2),]

b = io_stat()
a = c_buffer(1024)

windll.ntdll.NtQueryDirectoryFile(hFile,0,0,0,byref(b),byref(a),sizeof(a), 1,0,None,0)

print(a.raw)

Not optimized.

1

There are 1 answers

1
Eryk Sun On BEST ANSWER

NtQueryDirectoryFile should be called in a loop until it returns STATUS_NO_MORE_FILES. If either the returned status is STATUS_BUFFER_OVERFLOW or the status is successful (non-negative) with the status block Information as 0, then double the buffer size and try again. For each successful pass, copy the FILE_DIRECTORY_INFORMATION records out of the buffer. Each record has to be sized to include the FileName. You've reached the end when the Next field is 0.

The following example subclasses FILE_DIRECTORY_INFORMATION as a DirEntry class that has a listbuf class method to list the records in a queried buffer. It skips the "." and ".." entries. It uses this class in an ntlistdir function that lists the DirEntry records for a given directory via NtQueryDirectoryFile. It supports passing an open file descriptor as the path argument, which is like how os.listdir works on POSIX systems.

ctypes definitions

import os
import msvcrt
import ctypes

from ctypes import wintypes

ntdll = ctypes.WinDLL('ntdll')
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

def NtError(status):
    err = ntdll.RtlNtStatusToDosError(status)
    return ctypes.WinError(err)

NTSTATUS = wintypes.LONG
STATUS_BUFFER_OVERFLOW = NTSTATUS(0x80000005).value
STATUS_NO_MORE_FILES = NTSTATUS(0x80000006).value
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value

ERROR_DIRECTORY = 0x010B
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 1
OPEN_EXISTING = 3
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_ATTRIBUTE_DIRECTORY = 0x0010

FILE_INFORMATION_CLASS = wintypes.ULONG
FileDirectoryInformation = 1
FileBasicInformation = 4

LPSECURITY_ATTRIBUTES = wintypes.LPVOID
PIO_APC_ROUTINE = wintypes.LPVOID
ULONG_PTR = wintypes.WPARAM

class UNICODE_STRING(ctypes.Structure):
    _fields_ = (('Length',        wintypes.USHORT),
                ('MaximumLength', wintypes.USHORT),
                ('Buffer',        wintypes.LPWSTR))

PUNICODE_STRING = ctypes.POINTER(UNICODE_STRING)

class IO_STATUS_BLOCK(ctypes.Structure):
    class _STATUS(ctypes.Union):
        _fields_ = (('Status',  NTSTATUS),
                    ('Pointer', wintypes.LPVOID))
    _anonymous_ = '_Status',
    _fields_ = (('_Status',     _STATUS),
                ('Information', ULONG_PTR))

PIO_STATUS_BLOCK = ctypes.POINTER(IO_STATUS_BLOCK)

ntdll.NtQueryInformationFile.restype = NTSTATUS
ntdll.NtQueryInformationFile.argtypes = (
    wintypes.HANDLE,        # In  FileHandle
    PIO_STATUS_BLOCK,       # Out IoStatusBlock
    wintypes.LPVOID,        # Out FileInformation
    wintypes.ULONG,         # In  Length
    FILE_INFORMATION_CLASS) # In  FileInformationClass

ntdll.NtQueryDirectoryFile.restype = NTSTATUS
ntdll.NtQueryDirectoryFile.argtypes = (
    wintypes.HANDLE,        # In     FileHandle
    wintypes.HANDLE,        # In_opt Event
    PIO_APC_ROUTINE,        # In_opt ApcRoutine
    wintypes.LPVOID,        # In_opt ApcContext
    PIO_STATUS_BLOCK,       # Out    IoStatusBlock
    wintypes.LPVOID,        # Out    FileInformation
    wintypes.ULONG,         # In     Length
    FILE_INFORMATION_CLASS, # In     FileInformationClass
    wintypes.BOOLEAN,       # In     ReturnSingleEntry
    PUNICODE_STRING,        # In_opt FileName
    wintypes.BOOLEAN)       # In     RestartScan

kernel32.CreateFileW.restype = wintypes.HANDLE
kernel32.CreateFileW.argtypes = (
    wintypes.LPCWSTR,      # In     lpFileName
    wintypes.DWORD,        # In     dwDesiredAccess
    wintypes.DWORD,        # In     dwShareMode
    LPSECURITY_ATTRIBUTES, # In_opt lpSecurityAttributes
    wintypes.DWORD,        # In     dwCreationDisposition
    wintypes.DWORD,        # In     dwFlagsAndAttributes
    wintypes.HANDLE)       # In_opt hTemplateFile

class FILE_BASIC_INFORMATION(ctypes.Structure):
    _fields_ = (('CreationTime',   wintypes.LARGE_INTEGER),
                ('LastAccessTime', wintypes.LARGE_INTEGER),
                ('LastWriteTime',  wintypes.LARGE_INTEGER),
                ('ChangeTime',     wintypes.LARGE_INTEGER),
                ('FileAttributes', wintypes.ULONG))

class FILE_DIRECTORY_INFORMATION(ctypes.Structure):
    _fields_ = (('_Next',          wintypes.ULONG),
                ('FileIndex',      wintypes.ULONG),
                ('CreationTime',   wintypes.LARGE_INTEGER),
                ('LastAccessTime', wintypes.LARGE_INTEGER),
                ('LastWriteTime',  wintypes.LARGE_INTEGER),
                ('ChangeTime',     wintypes.LARGE_INTEGER),
                ('EndOfFile',      wintypes.LARGE_INTEGER),
                ('AllocationSize', wintypes.LARGE_INTEGER),
                ('FileAttributes', wintypes.ULONG),
                ('FileNameLength', wintypes.ULONG),
                ('_FileName',      wintypes.WCHAR * 1))

    @property
    def FileName(self):
        addr = ctypes.addressof(self) + type(self)._FileName.offset
        size = self.FileNameLength // ctypes.sizeof(wintypes.WCHAR)
        return (wintypes.WCHAR * size).from_address(addr).value

DirEntry and ntlistdir

class DirEntry(FILE_DIRECTORY_INFORMATION):
    def __repr__(self):
        return '<{} {!r}>'.format(self.__class__.__name__, self.FileName)

    @classmethod
    def listbuf(cls, buf):
        result = []
        base_size = ctypes.sizeof(cls) - ctypes.sizeof(wintypes.WCHAR)
        offset = 0
        while True:
            fdi = cls.from_buffer(buf, offset)
            if fdi.FileNameLength and fdi.FileName not in ('.', '..'):
                cfdi = cls()
                size = base_size + fdi.FileNameLength
                ctypes.resize(cfdi, size)
                ctypes.memmove(ctypes.byref(cfdi), ctypes.byref(fdi), size)
                result.append(cfdi)
            if fdi._Next:
                offset += fdi._Next
            else:
                break
        return result

def isdir(path):
    if not isinstance(path, int):
        return os.path.isdir(path)
    try:
        hFile = msvcrt.get_osfhandle(path)
    except IOError:
        return False
    iosb = IO_STATUS_BLOCK()
    info = FILE_BASIC_INFORMATION()
    status = ntdll.NtQueryInformationFile(hFile, ctypes.byref(iosb),
                ctypes.byref(info), ctypes.sizeof(info),
                FileBasicInformation)
    return bool(status >= 0 and info.FileAttributes & FILE_ATTRIBUTE_DIRECTORY)

def ntlistdir(path=None):
    result = []

    if path is None:
        path = os.getcwd()

    if isinstance(path, int):
        close = False
        fd = path
        hFile = msvcrt.get_osfhandle(fd)
    else:
        close = True
        hFile = kernel32.CreateFileW(path, GENERIC_READ, FILE_SHARE_READ,
                    None, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, None)
        if hFile == INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        fd = msvcrt.open_osfhandle(hFile, os.O_RDONLY)

    try:
        if not isdir(fd):
            raise ctypes.WinError(ERROR_DIRECTORY)
        iosb = IO_STATUS_BLOCK()
        info = (ctypes.c_char * 4096)()
        while True:
            status = ntdll.NtQueryDirectoryFile(hFile, None, None, None,
                        ctypes.byref(iosb), ctypes.byref(info),
                        ctypes.sizeof(info), FileDirectoryInformation,
                        False, None, False)
            if (status == STATUS_BUFFER_OVERFLOW or
                iosb.Information == 0 and status >= 0):
                info = (ctypes.c_char * (ctypes.sizeof(info) * 2))()
            elif status == STATUS_NO_MORE_FILES:
                break
            elif status >= 0:
                sublist = DirEntry.listbuf(info)
                result.extend(sublist)
            else:
                raise NtError(status)
    finally:
        if close:
            os.close(fd)

    return result

Example

if __name__ == '__main__':
    import sys
    for entry in ntlistdir(sys.exec_prefix):
        print(entry)

Output:

<DirEntry 'DLLs'>
<DirEntry 'include'>
<DirEntry 'Lib'>
<DirEntry 'libs'>
<DirEntry 'LICENSE.txt'>
<DirEntry 'NEWS.txt'>
<DirEntry 'python.exe'>
<DirEntry 'python.pdb'>
<DirEntry 'python3.dll'>
<DirEntry 'python36.dll'>
<DirEntry 'python36.pdb'>
<DirEntry 'python36_d.dll'>
<DirEntry 'python36_d.pdb'>
<DirEntry 'python3_d.dll'>
<DirEntry 'pythonw.exe'>
<DirEntry 'pythonw.pdb'>
<DirEntry 'pythonw_d.exe'>
<DirEntry 'pythonw_d.pdb'>
<DirEntry 'python_d.exe'>
<DirEntry 'python_d.pdb'>
<DirEntry 'Scripts'>
<DirEntry 'tcl'>
<DirEntry 'Tools'>
<DirEntry 'vcruntime140.dll'>