[ Team LiB ] |
Recipe 13.13 Using Named Pipes to Communicate

Problem

You need a way to use named pipes to communicate with another application across the network.

Solution

Create a P/Invoke wrapper class for the named pipe APIs in Kernel32.dll to allow for managed access, and then create a managed client and managed server class to work with named pipes. Here are the named pipe interop wrappers in a class called NamedPipeInterop:

using System;
using System.Runtime.InteropServices;
using System.Security;

namespace NamedPipes
{
    /// <summary>
    /// Imported named pipe entry points for P/Invoke into native code.
    /// </summary>
    /// <remarks>
    /// NOTE(review): handles are declared as int and the overlapped
    /// parameters of ReadFile/WriteFile as uint. The Win32 headers declare
    /// these as pointer-sized values, so confirm the signatures before
    /// using this class from a 64-bit process.
    /// </remarks>
    [SuppressUnmanagedCodeSecurity]
    public class NamedPipeInterop
    {
        // #defines related to named pipe processing

        // pipe open modes
        public const uint PIPE_ACCESS_OUTBOUND = 0x00000002;
        public const uint PIPE_ACCESS_DUPLEX = 0x00000003;
        public const uint PIPE_ACCESS_INBOUND = 0x00000001;

        // pipe wait, read, and type modes
        public const uint PIPE_WAIT = 0x00000000;
        public const uint PIPE_NOWAIT = 0x00000001;
        public const uint PIPE_READMODE_BYTE = 0x00000000;
        public const uint PIPE_READMODE_MESSAGE = 0x00000002;
        public const uint PIPE_TYPE_BYTE = 0x00000000;
        public const uint PIPE_TYPE_MESSAGE = 0x00000004;

        public const uint PIPE_CLIENT_END = 0x00000000;
        public const uint PIPE_SERVER_END = 0x00000001;

        public const uint PIPE_UNLIMITED_INSTANCES = 255;

        // time-out values
        public const uint NMPWAIT_WAIT_FOREVER = 0xffffffff;
        public const uint NMPWAIT_NOWAIT = 0x00000001;
        public const uint NMPWAIT_USE_DEFAULT_WAIT = 0x00000000;

        // access rights
        public const uint GENERIC_READ = 0x80000000;
        public const uint GENERIC_WRITE = 0x40000000;
        public const uint GENERIC_EXECUTE = 0x20000000;
        public const uint GENERIC_ALL = 0x10000000;

        // creation dispositions
        public const uint CREATE_NEW = 1;
        public const uint CREATE_ALWAYS = 2;
        public const uint OPEN_EXISTING = 3;
        public const uint OPEN_ALWAYS = 4;
        public const uint TRUNCATE_EXISTING = 5;

        public const int INVALID_HANDLE_VALUE = -1;

        // error codes
        public const uint ERROR_PIPE_BUSY = 231;
        public const uint ERROR_NO_DATA = 232;
        public const uint ERROR_PIPE_NOT_CONNECTED = 233;
        public const uint ERROR_MORE_DATA = 234;
        public const uint ERROR_PIPE_CONNECTED = 535;
        public const uint ERROR_PIPE_LISTENING = 536;

        /// <summary>
        /// Returns the last Win32 error recorded by a P/Invoke call made
        /// with SetLastError=true.
        /// </summary>
        public static int GetLastError()
        {
            return Marshal.GetLastWin32Error();
        }

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool CallNamedPipe(
            string lpNamedPipeName,
            byte[] lpInBuffer,
            uint nInBufferSize,
            byte[] lpOutBuffer,
            uint nOutBufferSize,
            byte[] lpBytesRead,
            uint nTimeOut);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool CloseHandle(int hObject);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool ConnectNamedPipe(
            int hNamedPipe,         // handle to named pipe
            IntPtr lpOverlapped     // overlapped structure
            );

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern int CreateNamedPipe(
            String lpName,                  // pipe name
            uint dwOpenMode,                // pipe open mode
            uint dwPipeMode,                // pipe-specific modes
            uint nMaxInstances,             // maximum number of instances
            uint nOutBufferSize,            // output buffer size
            uint nInBufferSize,             // input buffer size
            uint nDefaultTimeOut,           // time-out interval
            IntPtr pipeSecurityDescriptor   // security descriptor
            );

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern int CreatePipe(
            int hReadPipe,
            int hWritePipe,
            IntPtr lpPipeAttributes,
            uint nSize);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern int CreateFile(
            String lpFileName,              // filename
            uint dwDesiredAccess,           // access mode
            uint dwShareMode,               // share mode
            IntPtr attr,                    // security descriptor
            uint dwCreationDisposition,     // how to create
            uint dwFlagsAndAttributes,      // file attributes
            uint hTemplateFile);            // handle to template file

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool DisconnectNamedPipe(int hNamedPipe);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool FlushFileBuffers(int hFile);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool GetNamedPipeHandleState(
            int hNamedPipe,
            IntPtr lpState,
            IntPtr lpCurInstances,
            IntPtr lpMaxCollectionCount,
            IntPtr lpCollectDataTimeout,
            string lpUserName,
            uint nMaxUserNameSize);

        [DllImport("KERNEL32.DLL", SetLastError=true)]
        public static extern bool GetNamedPipeInfo(
            int hNamedPipe,
            out uint lpFlags,
            out uint lpOutBufferSize,
            out uint lpInBufferSize,
            out uint lpMaxInstances);

        [DllImport("KERNEL32.DLL", SetLastError=true)]
        public static extern bool PeekNamedPipe(
            int hNamedPipe,
            byte[] lpBuffer,
            uint nBufferSize,
            byte[] lpBytesRead,
            out uint lpTotalBytesAvail,
            out uint lpBytesLeftThisMessage);

        [DllImport("KERNEL32.DLL", SetLastError=true)]
        public static extern bool SetNamedPipeHandleState(
            int hNamedPipe,
            ref int lpMode,
            IntPtr lpMaxCollectionCount,
            IntPtr lpCollectDataTimeout);

        [DllImport("KERNEL32.DLL", SetLastError=true)]
        public static extern bool TransactNamedPipe(
            int hNamedPipe,
            byte [] lpInBuffer,
            uint nInBufferSize,
            [Out] byte [] lpOutBuffer,
            uint nOutBufferSize,
            IntPtr lpBytesRead,
            IntPtr lpOverlapped);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool WaitNamedPipe(
            string name,
            uint timeout);

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool ReadFile(
            int hFile,                      // handle to file
            byte[] lpBuffer,                // data buffer
            uint nNumberOfBytesToRead,      // number of bytes to read
            byte[] lpNumberOfBytesRead,     // number of bytes read
            uint lpOverlapped               // overlapped buffer
            );

        [DllImport("kernel32.dll", SetLastError=true)]
        public static extern bool WriteFile(
            int hFile,                      // handle to file
            byte[] lpBuffer,                // data buffer
            uint nNumberOfBytesToWrite,     // number of bytes to write
            byte[] lpNumberOfBytesWritten,  // number of bytes written
            uint lpOverlapped               // overlapped buffer
            );
    }
} // end namespace NamedPipes

Now, using the interop wrappers, we can create a named pipe client class named NamedPipeClient:

using System;
using System.Diagnostics;
using System.IO;

namespace NamedPipes
{
    /// <summary>
    /// NamedPipeClient - An implementation of a synchronous,
    /// message-based, named pipe client
    /// </summary>
    public class NamedPipeClient : IDisposable
    {
        /// <summary>
        /// the full name of the pipe being connected to
        /// </summary>
        string _pipeName = "";

        /// <summary>
        /// the pipe handle once connected
        /// </summary>
        int _handle = NamedPipeInterop.INVALID_HANDLE_VALUE;

        /// <summary>
        /// default response buffer size (1K)
        /// </summary>
        uint _responseBufferSize = 1024;

        /// <summary>
        /// indicates if this has been closed once, which calls
        /// for us to re-register for finalization on subsequent
        /// connect calls
        /// </summary>
        bool disposedOnce = false;

        /// <summary>
        /// WriteMessageResponseDelegate - callback invoked with the
        /// server's response to a WriteMessage call
        /// </summary>
        public delegate void WriteMessageResponseDelegate(MemoryStream responseStream);

        /// <summary>
        /// CTOR
        /// </summary>
        /// <param name="pipeName">name of the pipe</param>
        public NamedPipeClient(string pipeName)
        {
            _pipeName = pipeName;
        }

        /// <summary>
        /// Finalizer
        /// </summary>
        ~NamedPipeClient()
        {
            Dispose();
        }

        /// <summary>
        /// Dispose - close the pipe handle if one is open
        /// </summary>
        public void Dispose()
        {
            if (_handle != NamedPipeInterop.INVALID_HANDLE_VALUE)
            {
                NamedPipeInterop.CloseHandle(_handle);
                _handle = NamedPipeInterop.INVALID_HANDLE_VALUE;
            }

            // Suppress finalization since we have now cleaned up our handle
            System.GC.SuppressFinalize(this);

            // indicate we have disposed at least once
            disposedOnce = true;
        }

        /// <summary>
        /// Close - because it is more intuitive than Dispose... :)
        /// </summary>
        public void Close()
        {
            Dispose();
        }

        /// <summary>
        /// ResponseBufferSize Property - the size used to create response
        /// buffers for messages written using WriteMessage
        /// </summary>
        public uint ResponseBufferSize
        {
            get { return _responseBufferSize; }
            set { _responseBufferSize = value; }
        }

        /// <summary>
        /// Connect - connect to an existing pipe
        /// </summary>
        /// <returns>true if connected</returns>
        /// <exception cref="InvalidOperationException">
        /// thrown if the pipe is already connected
        /// </exception>
        public bool Connect()
        {
            // Dispose suppressed finalization, so turn it back on now that
            // we may be holding a handle again
            if (disposedOnce)
            {
                System.GC.ReRegisterForFinalize(this);
            }

            if (_handle != NamedPipeInterop.INVALID_HANDLE_VALUE)
            {
                throw new InvalidOperationException("Pipe is already connected!");
            }

            // keep trying to connect
            while (true)
            {
                // connect to existing pipe
                _handle = NamedPipeInterop.CreateFile(_pipeName,
                    NamedPipeInterop.GENERIC_READ |
                    NamedPipeInterop.GENERIC_WRITE,
                    0,
                    IntPtr.Zero,
                    NamedPipeInterop.OPEN_EXISTING,
                    0,
                    0);

                // check to see if we connected
                if (_handle != NamedPipeInterop.INVALID_HANDLE_VALUE)
                {
                    break;
                }

                // the pipe could not be opened as all instances are busy;
                // any other error we bail for
                if (NamedPipeInterop.GetLastError() !=
                    NamedPipeInterop.ERROR_PIPE_BUSY)
                {
                    Debug.WriteLine("Could not open pipe: " + _pipeName);
                    return false;
                }

                // if it was busy, see if we can wait it out for 20 seconds
                if (!NamedPipeInterop.WaitNamedPipe(_pipeName, 20000))
                {
                    Debug.WriteLine("Specified pipe was over-burdened: " +
                        _pipeName);
                    return false;
                }
            }

            // indicate connection in debug
            Debug.WriteLine("Connected to pipe: " + _pipeName);

            // The pipe connected; change to message-read mode.
            int mode = (int) NamedPipeInterop.PIPE_READMODE_MESSAGE;
            bool success = NamedPipeInterop.SetNamedPipeHandleState(
                _handle,        // pipe handle
                ref mode,       // new pipe mode
                IntPtr.Zero,    // don't set maximum bytes
                IntPtr.Zero);   // don't set maximum time

            // currently implemented for just synchronous, message-based
            // pipes, so bail if we couldn't set the client up properly
            if (!success)
            {
                Debug.WriteLine("Could not change pipe mode to message," +
                    " shutting client down.");
                Dispose();
                return false;
            }

            return true;
        }

        /// <summary>
        /// WriteMessage - write an array of bytes and return the response
        /// from the server
        /// </summary>
        /// <param name="buffer">bytes to write</param>
        /// <param name="bytesToWrite">number of bytes to write</param>
        /// <param name="ResponseDelegate">callback with the message
        /// response; only invoked when the whole response was read</param>
        /// <returns>true if the message was written and its response was
        /// read successfully</returns>
        public bool WriteMessage(byte [] buffer,
            uint bytesToWrite,
            WriteMessageResponseDelegate ResponseDelegate)
        {
            // receives the number of bytes read/written as a 4-byte integer
            byte[] numReadWritten = new byte[4];

            // Write the byte buffer to the pipe
            bool success = NamedPipeInterop.WriteFile(_handle,
                buffer,
                bytesToWrite,
                numReadWritten,
                0);
            if (!success)
            {
                return false;
            }

            byte[] responseBuffer = new byte[_responseBufferSize];
            int size = Convert.ToInt32(_responseBufferSize);
            MemoryStream fullBuffer = new MemoryStream(size);

            do
            {
                // Read the response from the pipe.
                success = NamedPipeInterop.ReadFile(
                    _handle,                // pipe handle
                    responseBuffer,         // buffer to receive reply
                    _responseBufferSize,    // size of buffer
                    numReadWritten,         // number of bytes read
                    0);                     // not overlapped

                // failed, not just more data to come
                if (!success &&
                    NamedPipeInterop.GetLastError() !=
                        NamedPipeInterop.ERROR_MORE_DATA)
                {
                    break;
                }

                // append only the bytes this read produced; the rest of the
                // buffer holds stale data from earlier reads (or zeros)
                int bytesRead = BitConverter.ToInt32(numReadWritten, 0);
                fullBuffer.Write(responseBuffer, 0, bytesRead);
            } while (!success); // repeat loop if ERROR_MORE_DATA

            // Callback the caller with the complete response
            if (success && ResponseDelegate != null)
            {
                ResponseDelegate(fullBuffer);
            }

            return success;
        }
    }
} // end namespace NamedPipes

Then we need to create a server class for testing, which we will call NamedPipeServer:

using System;
using System.Diagnostics;
using System.IO;

namespace NamedPipes
{
    /// <summary>
    /// NamedPipeServer - An implementation of a synchronous, message-based,
    /// named pipe server
    /// </summary>
    public class NamedPipeServer : IDisposable
    {
        /// <summary>
        /// the pipe handle
        /// </summary>
        int _handle = NamedPipeInterop.INVALID_HANDLE_VALUE;

        /// <summary>
        /// the name of the pipe
        /// </summary>
        string _pipeName = "";

        /// <summary>
        /// the name of the machine the server pipe is on
        /// </summary>
        string _machineName = "";

        /// <summary>
        /// default size of message buffer to read
        /// </summary>
        uint _receiveBufferSize = 1024;

        /// <summary>
        /// indicates if this has been closed once, which calls
        /// for us to re-register for finalization on subsequent
        /// connect calls
        /// </summary>
        bool disposedOnce = false;

        /// <summary>
        /// the internal delegate holder for the callback on message receipt
        /// from clients
        /// </summary>
        MessageReceivedDelegate _messageReceivedDelegate;

        /// <summary>
        /// PIPE_SERVER_BUFFER_SIZE set to 8192 by default
        /// </summary>
        const int PIPE_SERVER_BUFFER_SIZE = 8192;

        /// <summary>
        /// MessageReceivedDelegate - callback for message received from
        /// client
        /// </summary>
        public delegate void MessageReceivedDelegate(MemoryStream message,
            out MemoryStream response);

        /// <summary>
        /// CTOR
        /// </summary>
        /// <param name="machineName">name of the machine the pipe is on,
        /// use null for local machine</param>
        /// <param name="pipeBaseName">the base name of the pipe</param>
        /// <param name="msgReceivedDelegate">delegate to be notified when
        /// a message is received</param>
        public NamedPipeServer(string machineName,
            string pipeBaseName,
            MessageReceivedDelegate msgReceivedDelegate)
        {
            // hook up the delegate
            _messageReceivedDelegate = msgReceivedDelegate;

            // "." addresses the local machine
            if (machineName == null)
            {
                _machineName = ".";
            }
            else
            {
                _machineName = machineName;
            }

            // assemble the pipe name
            _pipeName = "\\\\" + _machineName + "\\PIPE\\" + pipeBaseName;
        }

        /// <summary>
        /// Finalizer
        /// </summary>
        ~NamedPipeServer()
        {
            Dispose();
        }

        /// <summary>
        /// Dispose - clean up handle
        /// </summary>
        public void Dispose()
        {
            // if we have a pipe handle, disconnect and clean up; use the
            // same "no handle" sentinel as the rest of the class
            if (_handle != NamedPipeInterop.INVALID_HANDLE_VALUE)
            {
                NamedPipeInterop.DisconnectNamedPipe(_handle);
                NamedPipeInterop.CloseHandle(_handle);
                _handle = NamedPipeInterop.INVALID_HANDLE_VALUE;
            }

            // Suppress finalization since we have now cleaned up our handle
            System.GC.SuppressFinalize(this);

            // indicate we have disposed at least once
            disposedOnce = true;
        }

        /// <summary>
        /// Close - because it is more intuitive than Dispose...
        /// </summary>
        public void Close()
        {
            Dispose();
        }

        /// <summary>
        /// PipeName
        /// </summary>
        /// <returns>the composed pipe name</returns>
        public string PipeName
        {
            get { return _pipeName; }
        }

        /// <summary>
        /// CreatePipe - create the named pipe
        /// </summary>
        /// <returns>true if pipe created</returns>
        public bool CreatePipe()
        {
            // Dispose suppressed finalization, so turn it back on now that
            // we may be holding a handle again
            if (disposedOnce)
            {
                System.GC.ReRegisterForFinalize(this);
            }

            // make a named pipe in message mode
            _handle = NamedPipeInterop.CreateNamedPipe(_pipeName,
                NamedPipeInterop.PIPE_ACCESS_DUPLEX,
                NamedPipeInterop.PIPE_TYPE_MESSAGE |
                NamedPipeInterop.PIPE_READMODE_MESSAGE |
                NamedPipeInterop.PIPE_WAIT,
                NamedPipeInterop.PIPE_UNLIMITED_INSTANCES,
                PIPE_SERVER_BUFFER_SIZE,
                PIPE_SERVER_BUFFER_SIZE,
                NamedPipeInterop.NMPWAIT_WAIT_FOREVER,
                IntPtr.Zero);

            // make sure we got a good one
            if (_handle == NamedPipeInterop.INVALID_HANDLE_VALUE)
            {
                Debug.WriteLine("Could not create the pipe (" +
                    _pipeName + ") - os returned " +
                    NamedPipeInterop.GetLastError());
                return false;
            }

            return true;
        }

        /// <summary>
        /// WaitForClientConnect - wait for a client to connect to this
        /// pipe, then process its messages until a read fails
        /// </summary>
        /// <returns>true if a client connected, false otherwise</returns>
        public bool WaitForClientConnect()
        {
            // wait for someone to talk to us
            bool connected = NamedPipeInterop.ConnectNamedPipe(_handle,
                IntPtr.Zero);

            // a client that connected between CreatePipe and this call
            // makes ConnectNamedPipe return false with
            // ERROR_PIPE_CONNECTED; that is still a good connection
            if (!connected &&
                NamedPipeInterop.GetLastError() ==
                    NamedPipeInterop.ERROR_PIPE_CONNECTED)
            {
                connected = true;
            }

            if (connected)
            {
                // process messages until a read fails
                while (WaitForMessage())
                {
                }
            }

            return connected;
        }

        /// <summary>
        /// WaitForMessage - have the server wait for a message, hand it to
        /// the message-received delegate, and write back the delegate's
        /// response (if any)
        /// </summary>
        /// <returns>true if got a message, false if the read failed</returns>
        public bool WaitForMessage()
        {
            bool success = false;

            // they want to talk to us, read their messages and write
            // replies
            int size = Convert.ToInt32(_receiveBufferSize);
            MemoryStream fullMessageStream = new MemoryStream(size);
            byte [] buffer = new byte[_receiveBufferSize];

            // receives the number of bytes read/written as a 4-byte integer
            byte [] numReadWritten = new byte[4];

            // need to read the whole message and put it in one message
            // byte buffer
            do
            {
                // Read the message from the pipe.
                success = NamedPipeInterop.ReadFile(
                    _handle,                // pipe handle
                    buffer,                 // buffer to receive message
                    _receiveBufferSize,     // size of buffer
                    numReadWritten,         // number of bytes read
                    0);                     // not overlapped

                // failed, not just more data to come
                if (!success &&
                    (NamedPipeInterop.GetLastError() !=
                        NamedPipeInterop.ERROR_MORE_DATA))
                {
                    break;
                }

                // append only the bytes this read produced; the rest of the
                // buffer holds stale data from earlier reads (or zeros)
                int bytesRead = BitConverter.ToInt32(numReadWritten, 0);
                fullMessageStream.Write(buffer, 0, bytesRead);
            } while (!success); // repeat loop if ERROR_MORE_DATA

            // we read a message from a client
            if (success && _messageReceivedDelegate != null)
            {
                // call delegate for message processing
                MemoryStream responseStream;
                _messageReceivedDelegate(fullMessageStream,
                    out responseStream);

                if (responseStream != null)
                {
                    // get raw byte array from stream
                    byte [] responseBytes = responseStream.ToArray();
                    uint len = Convert.ToUInt32(responseBytes.Length);

                    // write the response message provided
                    // by the delegate
                    bool written = NamedPipeInterop.WriteFile(_handle,
                        responseBytes,
                        len,
                        numReadWritten,
                        0);
                    if (!written)
                    {
                        Debug.WriteLine("Could not write response to pipe (" +
                            _pipeName + ") - os returned " +
                            NamedPipeInterop.GetLastError());
                    }
                }
            }

            return success;
        }
    }
} // end namespace NamedPipes

In order to use the NamedPipeClient class, we need some code like the following:

using System;
using System.Diagnostics;
using System.Text;
using System.IO;

namespace NamedPipes
{
    class NamedPipesClientTest
    {
        static void Main(string[] args)
        {
            // create our pipe client
            NamedPipeClient _pc =
                new NamedPipeClient("\\\\.\\PIPE\\mypipe");

            // connect to the server
            if (_pc.Connect())
            {
                // set up a dummy message
                string testString = "This is my message!";
                UnicodeEncoding UEncoder = new UnicodeEncoding();

                // turn it into a byte array
                byte[] writebuffer = UEncoder.GetBytes(testString);
                uint len = Convert.ToUInt32(writebuffer.Length);

                // write the message ten times
                for (int i = 0; i < 10; i++)
                {
                    if (!_pc.WriteMessage(writebuffer,
                        len,
                        new NamedPipeClient.WriteMessageResponseDelegate(
                            WriteMessageResponse)))
                    {
                        Debug.Assert(false, "Failed to write message!");
                    }
                }

                // close up shop
                _pc.Close();
            }

            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }

        static void WriteMessageResponse(MemoryStream responseStream)
        {
            UnicodeEncoding UEncoder = new UnicodeEncoding();
            string response = UEncoder.GetString(responseStream.ToArray());
            Console.WriteLine("Received response: {0}", response);
        }
    }
}

Then, to set up a server for the client to talk to, we would use the NamedPipeServer class, like this:

using System;
using System.IO;
using System.Text;

namespace NamedPipes
{
    class NamedPipesServerTest
    {
        //
        // MessageReceived - This is the method used in the delegate for the
        // server that gets called after every message is received and before
        // it is replied to
        //
        static void MessageReceived(MemoryStream message,
            out MemoryStream response)
        {
            // get the bytes of the message from the stream
            byte [] msgBytes = message.ToArray();
            string messageText;

            // I know in the client I used Unicode encoding for the string to
            // turn it into a series of bytes for transmission so just
            // reverse that
            UnicodeEncoding UEncoder = new UnicodeEncoding();
            messageText = UEncoder.GetString(msgBytes);

            // write out our string message from the client
            Console.WriteLine(messageText);

            // now set up response with a polite response using the same
            // Unicode string protocol
            string reply = "Thanks for the message!";
            msgBytes = UEncoder.GetBytes(reply);
            response = new MemoryStream(msgBytes, 0, msgBytes.Length);
        }

        //
        // Main - nuff said
        //
        static void Main(string[] args)
        {
            // create pipe server
            NamedPipeServer _ps =
                new NamedPipeServer(null,
                    "mypipe",
                    new NamedPipeServer.MessageReceivedDelegate(
                        MessageReceived));

            // create pipe
            if (_ps.CreatePipe())
            {
                // I get the name of the pipe here just to show you can.
                // Normally we would then have to get this name to the client
                // so it knows the name of the pipe to open but hey, I wrote
                // the client too so for now I'm just hard-coding it in the
                // client so we can ignore it :)
                string pipeName = _ps.PipeName;

                // wait for clients to connect and process their messages
                if (_ps.WaitForClientConnect())
                {
                    // process messages until the read fails
                    // (client goes away...)
                    bool success = true;
                    while (success)
                    {
                        success = _ps.WaitForMessage();
                    }
                }

                // done; bail and clean up the server
                _ps.Close();
            }

            // make our server hang around so you can see the messages sent
            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }
    }
}

Discussion

Named pipes are a mechanism to allow interprocess or intermachine communications in Windows. As of v1.1, the .NET Framework has not provided managed access to named pipes, so the first thing we need to do is to wrap the functions in Kernel32.dll for direct access from managed code in our NamedPipeInterop class. Once we have this foundation, we can build a client that uses named pipes to talk to a server exposing a pipe, which is what we did in the NamedPipeClient class. The methods on the NamedPipeClient are listed here with a description:
We then create the NamedPipeServer class so that the NamedPipeClient has something to connect to. The methods on the NamedPipeServer are listed here with a description as well:
Finally, we created some code to use NamedPipeClient and NamedPipeServer. The interaction between these two goes like this:
See Also: See the "Named Pipes," "DllImport Attribute," "IDisposable Interface," and "GC.SuppressFinalize Method" topics in the MSDN documentation. |
[ Team LiB ] |