Usage of ReadFile(Ex) with BindIoCompletionCallback

1.2k views Asked by At

I'm trying to write an IPC with named pipe.

The server code : http://pastebin.com/tHyAv0e0

The client code : http://pastebin.com/Qd0yGBca

My question is about the server. Following a SO user, i'm trying to use BindIoCompletionCallback() in the server code. The server consists of the following functions:

  • print_last_error : print a readable message of the last error
  • IocpThreadProc : the callback passed to BindIoCompletionCallback(), it calls ConnectNamedPipe() and ReadFile()
  • server_new : creates the named pipe and try to connect to the other end of the pipe (that is, the client), creates an exit event and call BindIoCompletionCallback()
  • server_del : should release the resources
  • the main function which has an infinite loop which wait on the exit event to be signaled.

When the client connects, it send the message "salut, c'est le client !". I have set the buffer of ReadFile() to 5, to test the case where I have to call ReadFile() several times. I have the following output:

connection pending...
waiting for client...
 ** 0, 0
reading data
 * ReadFile : 0
 ** 0, 5
msg:
reading data
 ** 0, 5
 * ReadFile : 5
reading data
msg: , c'e
 * ReadFile : 5
 ** 0, 5
msg: st le
reading data
 * ReadFile : 5
 ** 0, 5
msg:  clie
reading data
 * ReadFile : 5
 ** 0, 4
msg: nt !~
reading data
IO_PENDING
 ** -1073741493, 0
reading data
unexpected error failed with error 109: Le canal de communication a ÚtÚ fermÚ.
WaitForSingleObject : 0

the lines beginning with **: it prints the arguments of the callback

the lines beginning with 'msg' : it prints the message of the buffer filled by Readfile

As the length of the message sent by the client is 24, I should normally get these 5 messages (each of them being of 5 char, except the last one, being of 4 char) :

salut
, c'e
st le 
 clie
nt !

but I can't have the first part of the messge (that is : "salut"). The callback is called when an I/O operation is complete, maybe for this first part. But I have not succeded in calling ReadFile() in a way to get the first part of the message. I have tried to call ReadFile() in the main loop of the main function, in a thread, in server_new(), etc... Everything except the correct way.

Does someone know what to do to fix this issue ?

thank you

1

There are 1 answers

34
RbMm On

your code containing huge count of fundamental errors. more exactly all code - one complete error

look at code snippet (in IocpThreadProc and server_new)

    char buf[READ_BUFSIZE];
    ret = ReadFileEx(svr->pipe, buf, sizeof(buf), &svr->ol, IocpThreadProc);

char buf[READ_BUFSIZE] - this is local variable in function. after you exit from function - this become arbitrary address in stack. so when read operation complete - this faster of all corrupt your stack or will be undefinded result. so this is error. you must pass not stack memory as read buffer or not exit from function until read operation complete

you pass IocpThreadProc as argument to ReadFileEx

lpCompletionRoutine

A pointer to the completion routine to be called when the read operation is complete and the calling thread is in an alertable wait state.

but you never wait in alertable state !

later you use

BindIoCompletionCallback(svr->pipe, IocpThreadProc, 0);

but bind file to IOCP and use APC completion (lpCompletionRoutine ) is mutually exclusive. if say you call BindIoCompletionCallback before ReadFileEx(.., IocpThreadProc) - you will got error ERROR_INVALID_PARAMETER

from NtReadFile source code:

        //
        // If this file has an I/O completion port associated w/it, then
        // ensure that the caller did not supply an APC routine, as the
        // two are mutually exclusive methods for I/O completion
        // notification.
        //

        if (fileObject->CompletionContext && IopApcRoutinePresent( ApcRoutine )) {
            ObDereferenceObject( fileObject );
            return STATUS_INVALID_PARAMETER;
        }

your code "work" ony because you bind IOCP after call ReadFileEx(.., IocpThreadProc). but what happens when read operation is completed ? the APC (for IocpThreadProc) will be inserted to thread and packet queued to IOCP. so IocpThreadProc will be called twice with same data for single operation. it called once only because you never wait in alertable state and not pop APC from thread.

you embedded OVERLAPPED to Server - this is error. you must have unique OVERLAPPED per every asynchronous I/O. more exactly you must define own class, which inherit from OVERLAPPED. have in this class pointer to Server, operation code, may be some additional data. you need allocate this struct before every I/O operation and free it in completion.

GetLastError() in IocpThreadProc !!!

you need use DWORD dwErrorCode here, GetLastError() no sense because here on another thread called, absolte unrelated to operation. and becase this is callback from kernel here really NTSTATUS values is in dwErrorCode, but not win32 errors. say for example on read complete you can got STATUS_PIPE_BROKEN but not ERROR_BROKEN_PIPE but this already big defect in MSDN docs

code example:

class __declspec(novtable) IoObject
{
    friend struct UIRP;

    LONG _dwRef;

public:

    ULONG AddRef()
    {
        return InterlockedIncrement(&_dwRef);
    }

    ULONG Release()
    {
        ULONG dwRef = InterlockedDecrement(&_dwRef);

        if (!dwRef)
        {
            delete this;
        }

        return dwRef;
    }

protected:

    IoObject()
    {
        _dwRef = 1;
    }

    virtual ~IoObject() 
    {
    };

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered) = 0;
};

struct UIRP : OVERLAPPED
{
    IoObject* _obj;
    PVOID _buf;
    ULONG _op;

    UIRP(IoObject* obj, ULONG op, PVOID buf = 0)
    {
        RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
        _obj = obj;
        obj->AddRef();
        _op = op;
        _buf = buf;
    }

    void CheckError(BOOL f)
    {
        if (!f)
        {
            DWORD dwErrorCode = RtlGetLastNtStatus();

            if (dwErrorCode != STATUS_PENDING)
            {
                OnComplete(dwErrorCode, 0);
            }
        }
    }

    ~UIRP()
    {
        _obj->Release();
    }

    static BOOL BindIoCompletion(HANDLE hObject)
    {
        return BindIoCompletionCallback(hObject, _OnComplete, 0);
    }

private:

    static void WINAPI _OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
    {
        static_cast<UIRP*>(lpOverlapped)->OnComplete(dwErrorCode, dwNumberOfBytesTransfered);
    }

    void OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered)
    {
        _obj->OnComplete(dwErrorCode, _op, _buf, dwNumberOfBytesTransfered);
        delete this;
    }
};

class __declspec(novtable) CPipe : public IoObject
{
    enum {
        pipe_connect, pipe_read, pipe_write
    };
protected:
    HANDLE _pipe;
    PBYTE _buf;
    ULONG _dataSize;
    ULONG _bufferSize;

public:

    CPipe()
    {
        _pipe = INVALID_HANDLE_VALUE;
        _buf = 0;
        _dataSize = 0;
        _bufferSize = 0;
    }

    BOOL Create(ULONG bufferSize, PCWSTR name);

    BOOL Listen();

    BOOL Write(const void* data, ULONG cb);

    BOOL Disconnect()
    {
        if (IsServer())
        {
            return DisconnectNamedPipe(_pipe);
        }

        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        return TRUE;
    }

protected:

    BOOL Read();// usually never call direct

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred) = 0;

    virtual BOOL OnConnect() = 0;   

    virtual void OnDisconnect() = 0;

    virtual BOOL IsServer() = 0;

    virtual void OnWrite(DWORD /*dwErrorCode*/)
    {
    }

    virtual ~CPipe()
    {
        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        if (_buf)
        {
            delete _buf;
        }
    }

private:

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered);
};

void CPipe::OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered)
{
    DbgPrint("%u>%s<%p>(%x, %x, %x)\n", IsServer(), __FUNCTION__, this, dwErrorCode, op, dwNumberOfBytesTransfered);

    switch (op)
    {
    case pipe_read:

        switch(dwErrorCode) 
        {
        case STATUS_SUCCESS:
            if (OnRead(buf, dwNumberOfBytesTransfered)) Read();
            break;

        case STATUS_PIPE_BROKEN:        // pipe handle has been closed, server must call DisconnectNamedPipe
        case STATUS_CANCELLED:          // CancelIo[Ex] called
            Disconnect();

        case STATUS_PIPE_DISCONNECTED:  // server call DisconnectNamedPipe
        case STATUS_INVALID_HANDLE:     // we close handle
            OnDisconnect();
            break;

        default:__debugbreak();
        }
        break;

    case pipe_connect:

        switch(dwErrorCode) 
        {
        case STATUS_SUCCESS:            // ERROR_SUCCESS 
        case STATUS_PIPE_CONNECTED:     // ERROR_PIPE_CONNECTED
        case STATUS_PIPE_CLOSING:       // ERROR_NO_DATA (really client can send data before disconnect, exist sense do read)
            if (OnConnect()) Read();
            break;
        case STATUS_PIPE_BROKEN:        // server call CloseHandle before ConnectNamedPipe complete
        case STATUS_PIPE_DISCONNECTED:  // server call DisconnectNamedPipe before ConnectNamedPipe
        case STATUS_CANCELLED:          // server call CancelIo[Ex]
            break;
        default: __debugbreak();
        }
        break;

    case pipe_write:
        OnWrite(dwErrorCode);
        LocalFree(buf);
        break;

    default: __debugbreak();
    }
}

BOOL CPipe::Create(ULONG bufferSize, PCWSTR name)
{
    if (_buf = new UCHAR[bufferSize])
    {
        _bufferSize = bufferSize;
    }
    else
    {
        return FALSE;
    }

    static WCHAR pipeprefix[] = L"\\\\?\\pipe\\";
    PWSTR path = (PWSTR)alloca(wcslen(name) * sizeof(WCHAR) + sizeof(pipeprefix));
    wcscat(wcscpy(path, pipeprefix), name);

    BOOL bServer = IsServer();

    _pipe = bServer 
        ?
    CreateNamedPipeW(path,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
        PIPE_UNLIMITED_INSTANCES,
        PAGE_SIZE, PAGE_SIZE, INFINITE, NULL)
        :
    CreateFile(path, FILE_READ_ATTRIBUTES|FILE_READ_DATA|
        FILE_WRITE_ATTRIBUTES|FILE_WRITE_DATA, FILE_SHARE_READ|FILE_SHARE_WRITE, 0, OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED, 0);

    if (_pipe == INVALID_HANDLE_VALUE || !UIRP::BindIoCompletion(_pipe))
    {
        return FALSE;
    }

    return bServer ? Listen() : OnComplete(0, pipe_connect, 0, 0), TRUE;
}

BOOL CPipe::Listen()
{
    if (UIRP* irp = new UIRP(this, pipe_connect))
    {
        irp->CheckError(ConnectNamedPipe(_pipe, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Read()
{
    ULONG NumberOfBytesToRead = _bufferSize - _dataSize;

    if (!NumberOfBytesToRead)
    {
        return FALSE;
    }

    PVOID buf = _buf + _dataSize;

    if (UIRP* irp = new UIRP(this, pipe_read, buf))
    {
        irp->CheckError(ReadFile(_pipe, buf, NumberOfBytesToRead, 0, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Write(const void* data, ULONG cb)
{
    if (PVOID buf = LocalAlloc(0, cb))
    {
        if (UIRP* irp = new UIRP(this, pipe_write, buf))
        {
            memcpy(buf, data, cb);

            irp->CheckError(WriteFile(_pipe, buf, cb, 0, irp));

            return TRUE;
        }
    }

    return FALSE;
}

class ServerPipe : public CPipe
{
    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        char sz[256];
        Write(sz, 1 + sprintf(sz, "response from %p server\n", this));

        return TRUE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
        Listen();//
    }

    virtual BOOL IsServer()
    {
        return TRUE;
    }

    virtual ~ServerPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

class ClientPipe : public CPipe
{
    int _n;

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        if (--_n)
        {
            char sz[256];
            Write(sz, 1 + sprintf(sz, "request[%u] from %p client\n", _n, this));
            return TRUE;
        }
        return FALSE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        _n = 3;

        char sz[256];
        Write(sz, 1 + sprintf(sz, "hello from %p client\n", this));

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }

    virtual BOOL IsServer()
    {
        return FALSE;
    }

    virtual ~ClientPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

DWORD CALLBACK ClientThread(void* name)
{
    int n = 2;
    do 
    {
        MessageBox(0,0,L"client",MB_ICONWARNING);
        if (ClientPipe* p = new ClientPipe)
        {
            p->Create(PAGE_SIZE, (PCWSTR)name);
            p->Release();
        }
    } while (--n);

    return 0;
}

void pipeTest()
{
    static WCHAR sname[] = L"__test_pipe__";

    if (HANDLE hThread = CreateThread(0, 0, ClientThread, sname, 0, 0))
    {
        CloseHandle(hThread);
    }

    if (ServerPipe* p = new ServerPipe)
    {
        p->Create(PAGE_SIZE, sname);
        p->Release();
    }

    MessageBox(0,0,0,0);
}

and about DWORD dwErrorCode in

VOID CALLBACK FileIOCompletionRoutine(
  __in  DWORD dwErrorCode,
  __in  DWORD dwNumberOfBytesTransfered,
  __in  LPOVERLAPPED lpOverlapped
);

in BindIoCompletionCallback documentation exist unclarity

Return value

If the function succeeds, the return value is nonzero.

If the function fails, the return value is zero. To get extended error information, call the GetLastError function. The value returned is an NTSTATUS error code. To retrieve the corresponding system error code, use the RtlNtStatusToDosError function.

what is mean under The value returned is an NTSTATUS error code ? what return value ?

this is DWORD dwErrorCode in FileIOCompletionRoutine

really we pass to kernel mode pointer to IO_STATUS_BLOCK (first 2 members of OVERLAPPED is IO_STATUS_BLOCK actually). when asynchronous operation complete - kernel fill IO_STATUS_BLOCK and queue packet to IOCP (or APC to Thread). ntdll extract PIO_STATUS_BLOCK from IOCP (so we got back pointer to our OVERLAPPED passed to I/O api), and fill

dwErrorCode = Iosb->Status, 
dwNumberOfBytesTransfered = (ULONG)Iosb->Information, 
lpOverlapped = (LPOVERLAPPED)Iosb; 

system not do conversion

dwErrorCode = RtlNtStatusToDosError(Iosb->Status)

but direct assign NTSTATUS to DWORD dwErrorCode - so in FileIOCompletionRoutine we must compare dwErrorCode not with wi32 error codes but with NTSTATUS codes (from "ntstatus.h" )

so we never seen ERROR_BROKEN_PIPE or ERROR_PIPE_NOT_CONNECTED in FileIOCompletionRoutine, but STATUS_PIPE_BROKEN or STATUS_PIPE_DISCONNECTED


and code example by using new Thread Pool API instead BindIoCompletionCallback. here big advantage that in IoCompletionCallback (PTP_WIN32_IO_CALLBACK) callback function in place ULONG IoResult already used win32 error, but not raw NTSTATUS ( IoResult = RtlNtStatusToDosError(Iosb->Status) and note ULONG_PTR NumberOfBytesTransferred (vs ULONG dwNumberOfBytesTransfered from FileIOCompletionRoutine (LPOVERLAPPED_COMPLETION_ROUTINE) callback function and compare this with ULONG_PTR Information from IO_STATUS_BLOCK. )

#define StartIo(irp, pio, f) StartThreadpoolIo(_pio); irp->CheckError(f, _pio);

class __declspec(novtable) IoObject
{
    friend struct UIRP;

    LONG _dwRef;

public:

    ULONG AddRef()
    {
        return InterlockedIncrement(&_dwRef);
    }

    ULONG Release()
    {
        ULONG dwRef = InterlockedDecrement(&_dwRef);

        if (!dwRef)
        {
            delete this;
        }

        return dwRef;
    }

protected:

    IoObject()
    {
        _dwRef = 1;
    }

    virtual ~IoObject() 
    {
    };

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered) = 0;
};

struct UIRP : OVERLAPPED
{
    IoObject* _obj;
    PVOID _buf;
    ULONG _op;

    UIRP(IoObject* obj, ULONG op, PVOID buf = 0)
    {
        RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
        _obj = obj;
        obj->AddRef();
        _op = op;
        _buf = buf;
    }

    void CheckError(BOOL f, PTP_IO pio)
    {
        if (!f)
        {
            DWORD dwErrorCode = GetLastError();

            if (dwErrorCode != ERROR_IO_PENDING)
            {
                CancelThreadpoolIo(pio);
                OnComplete(dwErrorCode, 0);
            }
        }
    }

    ~UIRP()
    {
        _obj->Release();
    }

    static PTP_IO BindIoCompletion(HANDLE hObject)
    {
        return CreateThreadpoolIo(hObject, _IoCompletionCallback, 0, 0);
    }

private:

    static VOID CALLBACK _IoCompletionCallback(
        __inout      PTP_CALLBACK_INSTANCE /*Instance*/,
        __inout_opt  PVOID /*Context*/,
        __inout_opt  PVOID Overlapped,
        __in         ULONG IoResult,
        __in         ULONG_PTR NumberOfBytesTransferred,
        __inout      PTP_IO /*Io*/
        )
    {
        static_cast<UIRP*>(Overlapped)->OnComplete(IoResult, (ULONG)NumberOfBytesTransferred);
    }

    void OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered)
    {
        _obj->OnComplete(dwErrorCode, _op, _buf, dwNumberOfBytesTransfered);
        delete this;
    }
};

class __declspec(novtable) CPipe : public IoObject
{
    enum {
        pipe_connect, pipe_read, pipe_write
    };
protected:
    HANDLE _pipe;
    PTP_IO _pio;
    PBYTE _buf;
    ULONG _dataSize;
    ULONG _bufferSize;

public:

    CPipe()
    {
        _pipe = INVALID_HANDLE_VALUE;
        _buf = 0;
        _dataSize = 0;
        _bufferSize = 0;
        _pio = 0;
    }

    BOOL Create(ULONG bufferSize, PCWSTR name);

    BOOL Listen();

    BOOL Write(const void* data, ULONG cb);

    BOOL Disconnect()
    {
        if (IsServer())
        {
            return DisconnectNamedPipe(_pipe);
        }

        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        return TRUE;
    }

protected:

    BOOL Read();// usually never call direct

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred) = 0;

    virtual BOOL OnConnect() = 0;   

    virtual void OnDisconnect() = 0;

    virtual BOOL IsServer() = 0;

    virtual void OnWrite(DWORD /*dwErrorCode*/)
    {
    }

    virtual ~CPipe()
    {
        if (_pio)
        {
            CloseThreadpoolIo(_pio);
        }

        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        if (_buf)
        {
            delete _buf;
        }
    }

private:

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered);
};

void CPipe::OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered)
{
    DbgPrint("%u>%s<%p>(%x, %x, %x)\n", IsServer(), __FUNCTION__, this, dwErrorCode, op, dwNumberOfBytesTransfered);

    switch (op)
    {
    case pipe_read:

        switch(dwErrorCode) 
        {
        case ERROR_SUCCESS:
            if (OnRead(buf, dwNumberOfBytesTransfered)) Read();
            break;

        case ERROR_BROKEN_PIPE:         // pipe handle has been closed , server must call DisconnectNamedPipe
        case ERROR_OPERATION_ABORTED:   // CancelIo[Ex] called
            Disconnect();

        case ERROR_PIPE_NOT_CONNECTED:  // server call DisconnectNamedPipe
        case ERROR_INVALID_HANDLE:      // we close handle
            OnDisconnect();
            break;

        default:__debugbreak();
        }
        break;

    case pipe_connect:

        switch(dwErrorCode) 
        {
        case ERROR_SUCCESS:             // client just connected 
        case ERROR_PIPE_CONNECTED:      // client already connected
        case ERROR_NO_DATA:             // client already connected and disconnected (really client can send data before disconnect, exist sense do read)
            if (OnConnect()) Read();
            break;
        case ERROR_BROKEN_PIPE:         // server call CloseHandle before ConnectNamedPipe complete
        case ERROR_PIPE_NOT_CONNECTED:  // server call DisconnectNamedPipe before ConnectNamedPipe
        case ERROR_OPERATION_ABORTED:   // server call CancelIo[Ex]
            break;
        default: __debugbreak();
        }
        break;

    case pipe_write:
        OnWrite(dwErrorCode);
        LocalFree(buf);
        break;

    default: __debugbreak();
    }
}

BOOL CPipe::Create(ULONG bufferSize, PCWSTR name)
{
    if (_buf = new UCHAR[bufferSize])
    {
        _bufferSize = bufferSize;
    }
    else
    {
        return FALSE;
    }

    static WCHAR pipeprefix[] = L"\\\\?\\pipe\\";
    PWSTR path = (PWSTR)alloca(wcslen(name) * sizeof(WCHAR) + sizeof(pipeprefix));
    wcscat(wcscpy(path, pipeprefix), name);

    BOOL bServer = IsServer();

    _pipe = bServer 
        ?
    CreateNamedPipeW(path,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
        PIPE_UNLIMITED_INSTANCES,
        PAGE_SIZE, PAGE_SIZE, INFINITE, NULL)
        :
    CreateFile(path, FILE_READ_ATTRIBUTES|FILE_READ_DATA|
        FILE_WRITE_ATTRIBUTES|FILE_WRITE_DATA, FILE_SHARE_READ|FILE_SHARE_WRITE, 0, OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED, 0);

    if (_pipe == INVALID_HANDLE_VALUE || !(_pio = UIRP::BindIoCompletion(_pipe)))
    {
        return FALSE;
    }

    return bServer ? Listen() : OnComplete(0, pipe_connect, 0, 0), TRUE;
}

BOOL CPipe::Listen()
{
    if (UIRP* irp = new UIRP(this, pipe_connect))
    {
        StartIo(irp, _pio, ConnectNamedPipe(_pipe, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Read()
{
    ULONG NumberOfBytesToRead = _bufferSize - _dataSize;

    if (!NumberOfBytesToRead)
    {
        return FALSE;
    }

    PVOID buf = _buf + _dataSize;

    if (UIRP* irp = new UIRP(this, pipe_read, buf))
    {
        StartIo(irp, _pio, ReadFile(_pipe, buf, NumberOfBytesToRead, 0, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Write(const void* data, ULONG cb)
{
    if (PVOID buf = LocalAlloc(0, cb))
    {
        if (UIRP* irp = new UIRP(this, pipe_write, buf))
        {
            memcpy(buf, data, cb);

            StartIo(irp, _pio, WriteFile(_pipe, buf, cb, 0, irp));

            return TRUE;
        }
    }

    return FALSE;
}

class ServerPipe : public CPipe
{
    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        char sz[256];
        Write(sz, 1 + sprintf(sz, "response from %p server\n", this));

        return TRUE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
        Listen();//
    }

    virtual BOOL IsServer()
    {
        return TRUE;
    }

    virtual ~ServerPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

class ClientPipe : public CPipe
{
    int _n;

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        if (--_n)
        {
            char sz[256];
            Write(sz, 1 + sprintf(sz, "request[%u] from %p client\n", _n, this));
            return TRUE;
        }

        return FALSE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        _n = 3;

        char sz[256];
        Write(sz, 1 + sprintf(sz, "hello from %p client\n", this));

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }

    virtual BOOL IsServer()
    {
        return FALSE;
    }

    virtual ~ClientPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

DWORD CALLBACK ClientThread(void* name)
{
    int n = 2;
    do 
    {
        MessageBox(0,0,L"client",MB_ICONWARNING);
        if (ClientPipe* p = new ClientPipe)
        {
            p->Create(PAGE_SIZE, (PCWSTR)name);
            p->Release();
        }
    } while (--n);

    return 0;
}

void pipeTest()
{
    static WCHAR sname[] = L"__test_pipe__";

    if (HANDLE hThread = CreateThread(0, 0, ClientThread, sname, 0, 0))
    {
        CloseHandle(hThread);
    }

    if (ServerPipe* p = new ServerPipe)
    {
        p->Create(PAGE_SIZE, sname);
        p->Release();
    }

    MessageBox(0,0,0,0);
}