2015-12-27 24 views
5

Próbuję uzyskać schemat komunikacji typu klient-serwer w pełnym dupleksie, na dwóch różnych komputerach (tylko), gdzie każdy punkt końcowy (klient lub serwer) może wysyłać rzeczy w dowolnym momencie , asynchronicznie (bez blokowania rury), a drugi koniec podniesie go i odczyta.C# Asymetryczne monitory z pełnym dupleksem .NET

Nie chcę odpowiedzi odnoszących mnie do jakiejkolwiek innej technologii poza nazwanymi rurami, wiem o innych technologiach, ale chcę odpowiedzi na to konkretne pytanie. (Widziałem to pytanie tak wiele razy na różnych forach i ciągle widzę odpowiedzi sugerujące użycie innej technologii. Myślę, że to graniczy z niegrzecznością?)

Czytałem, że potoki nazwane muszą być jednym tylko w kierunku lub zamykają się, ale domyślam się, że to prawdopodobnie błąd. Sądzę, że potoki są oparte na gniazdach i nie wyobrażam sobie, że bazowe gniazdo będzie tylko w jedną stronę.

Wszelkie odpowiedzi na to pytanie, trzeba rozwiązać te problemy, aby była ona naprawdę przydatne:

  1. Odpowiedzi muszą zająć rur asynchronicznych, nie mogę użyć synchronicznego rozwiązanie.
  2. odpowiedzi muszą zademonstrować lub pozwolić, aby rury pozostały OTWARTE. Jestem zmęczony czytaniem przykładów, gdzie rura jest otwarta, ciąg jest przesyłany, a następnie rura jest natychmiast zamknięta. Chciałbym uzyskać odpowiedź, która zakłada, że ​​rury pozostają otwarte i przesyłają dużo śmieci w przypadkowych czasach i powtarzają. bez wisi.
  3. C# -na rozwiązanie

przykro mi dźwięk wymagający i zasmarkany, ale po kilku dniach szorowania internet nadal nie znalazłem dobry przykład, a nie chcę używać WFC. Jeśli znasz szczegóły tej odpowiedzi i dobrze na nią odpowiesz, jestem pewien, że ten temat będzie prawdziwym zwycięzcą na nadchodzące wieki. Sam opublikuję odpowiedź, jeśli się zorientuję.

Jeśli masz zamiar napisać i powiedzieć "Musisz użyć dwóch rur", wytłumacz dlaczego, i jak wiesz, że to prawda, ponieważ nic o czym nie przeczytałem, nie wyjaśnia, dlaczego tak się dzieje. .

dziękuję!

+0

Dlaczego nie chcesz używać WCF z powiązaniem nazwanych potoków? Widziałeś takie rozwiązania http://stackoverflow.com/questions/16432813/async-two-way-communication-with-windows-named-pipes-net – admax

+0

Hej, dlaczego tak zły, właśnie zapytałem ** dlaczego ** nie chcesz wcf. I to był komentarz, a nie sugerowane rozwiązanie. Jeśli zdajesz sobie sprawę z pewnych wad wcf, proszę podziel się, może to być przydatne dla społeczności. – admax

+1

Nie jestem zły, tylko konkretny. Powodem jest to, że próbuję oderwać się od technologii, które są uważane za "przestarzałe", nawet jeśli są nadal w użyciu, szczególnie jeśli nie dodają dużo wartości dodanej za grosze. WCF nie dodaje dużo do Named Pipes + JSON, więc wolałbym go nie używać. Lubię pozostać "minimal". Przesyłam tylko kilka bajtów w jedną i drugą stronę, aby system rozproszony działał, potrzebuję tylko nazwanego potoku! –

Odpowiedz

0

Myślę, że kiedy używasz asynchronicznej komunikacji, musisz użyć dwóch potoków.

Jednym z nich jest inny jest rura recv wysłać Rura

Bo nie wiem kiedy recv dane.

Gdy wysyłasz dane za pomocą jednej rury, dane recv nie mogą pisać na rurze.

W przeciwieństwie do tego nie można pisać danych wysyłanych na rurze.

dlatego trzeba dwie rury do komunikacji asynchronicznej.

+1

Rury są z gniazdami. każde normalne gniazdo unixowe i wysyłać i odbierać na obu końcach, jeśli jest otwarte w ten sposób. Myślę, że dawno temu, rury Unix były tylko w jedną stronę, ale to już przeszłość. Sztuczka polegająca na używaniu pełno-dupleksowych rur w oknach polega na upewnieniu się, w jaki sposób wykonywane są czynności, i wtedy rura się nie myli. Jeśli używasz dwóch potoków, jestem pewien, że to jest prostsze, ale marnujesz zasoby i alokację portów. –

+0

oh .. Rozumiem. Nie wiedziałem o tym. dziękuję za twoją wiedzę. Muszę się więcej uczyć. – lucidmaj7

6

Nie musisz używać dwóch rur. Znalazłem dużo odpowiedzi w sieci, że stan trzeba użyć dwóch rur. Kręciłem się, nie spałem przez całą noc, próbowałem i próbowałem jeszcze raz, i zorientowałem się, jak to zrobić, to jest super proste, ale musisz wszystko naprawić (zwłaszcza, aby uzyskać porządek w odpowiedniej kolejce), albo po prostu nie zadziała. . Inną sztuczką jest, aby zawsze upewnić się, że masz wyjątkowe wezwanie do przeczytania, lub też się zablokuje. Nie pisz, zanim się dowiesz, że ktoś czyta. Nie rozpoczynaj połączenia odczytywania, chyba że wcześniej skonfigurowano zdarzenie. Tego typu rzeczy.

Oto klasa potoku, której używam. Prawdopodobnie nie jest wystarczająco odporny na błędy związane z rurami, zamknięcia i przepełnienia.

OK Nie mam pojęcia, co jest nie tak, ale formatowanie jest nieznaczne! vvvv

namespace Squall 
{ 
    public interface PipeSender 
    { 
     Task SendCommandAsync(PipeCommandPlusString pCmd); 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 
    public class ClientPipe : BasicPipe 
    { 
     NamedPipeClientStream m_pPipe; 

     public ClientPipe(string szServerName, string szPipeName) 
      : base("Client") 
     { 
      m_szPipeName = szPipeName; // debugging 
      m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous); 
      base.SetPipeStream(m_pPipe); // inform base class what to read/write from 
     } 

     public void Connect() 
     { 
      Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server"); 
      m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout. 
      StartReadingAsync(); 
     } 

     // the client's pipe index is always 0 
     internal override int PipeId() { return 0; } 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 
    public class ServerPipe : BasicPipe 
    { 
     public event EventHandler<EventArgs> GotConnectionEvent; 

     NamedPipeServerStream m_pPipe; 
     int m_nPipeId; 

     public ServerPipe(string szPipeName, int nPipeId) 
      : base("Server") 
     { 
      m_szPipeName = szPipeName; 
      m_nPipeId = nPipeId; 
      m_pPipe = new NamedPipeServerStream(
       szPipeName, 
       PipeDirection.InOut, 
       NamedPipeServerStream.MaxAllowedServerInstances, 
       PipeTransmissionMode.Message, 
       PipeOptions.Asynchronous); 
      base.SetPipeStream(m_pPipe); 
      m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this); 
     } 

     static void StaticGotPipeConnection(IAsyncResult pAsyncResult) 
     { 
      ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe; 
      pThis.GotPipeConnection(pAsyncResult); 
     } 

     void GotPipeConnection(IAsyncResult pAsyncResult) 
     { 
      m_pPipe.EndWaitForConnection(pAsyncResult); 

      Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection"); 

      if (GotConnectionEvent != null) 
      { 
       GotConnectionEvent(this, new EventArgs()); 
      } 

      // lodge the first read request to get us going 
      // 
      StartReadingAsync(); 
     } 

     internal override int PipeId() { return m_nPipeId; } 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 

    public abstract class BasicPipe : PipeSender 
    { 
     public static int MaxLen = 1024 * 1024; // why not 
     protected string m_szPipeName; 
     protected string m_szDebugPipeName; 

     public event EventHandler<PipeEventArgs> ReadDataEvent; 
     public event EventHandler<EventArgs> PipeClosedEvent; 

     protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen]; 

     PipeStream m_pPipeStream; 

     public BasicPipe(string szDebugPipeName) 
     { 
      m_szDebugPipeName = szDebugPipeName; 
     } 

     protected void SetPipeStream(PipeStream p) 
     { 
      m_pPipeStream = p; 
     } 

     protected string FullPipeNameDebug() 
     { 
      return m_szDebugPipeName + "-" + m_szPipeName; 
     } 

     internal abstract int PipeId(); 

     public void Close() 
     { 
      m_pPipeStream.WaitForPipeDrain(); 
      m_pPipeStream.Close(); 
      m_pPipeStream.Dispose(); 
      m_pPipeStream = null; 
     } 

     // called when Server pipe gets a connection, or when Client pipe is created 
     public void StartReadingAsync() 
     { 
      Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync"); 

      // okay we're connected, now immediately listen for incoming buffers 
      // 
      byte[] pBuffer = new byte[MaxLen]; 
      m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t => 
      { 
       Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request"); 

       int ReadLen = t.Result; 
       if (ReadLen == 0) 
       { 
        Debug.WriteLine("Got a null read length, remote pipe was closed"); 
        if (PipeClosedEvent != null) 
        { 
         PipeClosedEvent(this, new EventArgs()); 
        } 
        return; 
       } 

       if (ReadDataEvent != null) 
       { 
        ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen)); 
       } 
       else 
       { 
        Debug.Assert(false, "something happened"); 
       } 

       // lodge ANOTHER read request 
       // 
       StartReadingAsync(); 

      }); 
     } 

     protected Task WriteByteArray(byte[] pBytes) 
     { 
      // this will start writing, but does it copy the memory before returning? 
      return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length); 
     } 

     public Task SendCommandAsync(PipeCommandPlusString pCmd) 
     { 
      Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString()); 
      string szSerializedCmd = JsonConvert.SerializeObject(pCmd); 
      byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd); 
      Task t = WriteByteArray(pSerializedCmd); 
      return t; 
     } 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 

    public class PipeEventArgs 
    { 
     public byte[] m_pData; 
     public int m_nDataLen; 

     public PipeEventArgs(byte[] pData, int nDataLen) 
     { 
      // is this a copy, or an alias copy? I can't remember right now. 
      m_pData = pData; 
      m_nDataLen = nDataLen; 
     } 
    } 

    /****************************************************************************** 
    * if we're just going to send a string back and forth, then we can use this 
    * class. It it allows us to get the bytes as a string. sort of silly. 
    ******************************************************************************/ 

    [Serializable] 
    public class PipeCommandPlusString 
    { 
     public string m_szCommand; // must be public to be serialized 
     public string m_szString; // ditto 

     public PipeCommandPlusString(string sz, string szString) 
     { 
      m_szCommand = sz; 
      m_szString = szString; 
     } 

     public string GetCommand() 
     { 
      return m_szCommand; 
     } 

     public string GetTransmittedString() 
     { 
      return m_szString; 
     } 
    } 
} 

i tu jest moja próba rura, działa na jednym procesie. To działa na dwóch procesów też sprawdziłem

namespace NamedPipeTest 
{ 
    public partial class Form1 : Form 
    { 
     SynchronizationContext _context; 
     Thread m_pThread = null; 
     volatile bool m_bDieThreadDie; 
     ServerPipe m_pServerPipe; 
     ClientPipe m_pClientPipe; 

     public Form1() 
     { 
      InitializeComponent(); 
     } 

     private void Form1_Load(object sender, EventArgs e) 
     { 
      _context = SynchronizationContext.Current; 

      m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0); 
      m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent; 
      m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent; 

      // m_pThread = new Thread(StaticThreadProc); 
      // m_pThread.Start(this); 
     } 

     private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e) 
     { 
      Debug.WriteLine("Server: Pipe was closed, shutting down"); 

      // have to post this on the main thread 
      _context.Post(delegate 
      { 
       Close(); 
      }, null); 
     } 

     private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e) 
     { 
      // this gets called on an anonymous thread 

      byte[] pBytes = e.m_pData; 
      string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length); 
      PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes); 
      string szValue = pCmd.GetTransmittedString(); 

      if (szValue == "CONNECT") 
      { 
       Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client"); 
       PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED"); 
       // fire off an async write 
       Task t = m_pServerPipe.SendCommandAsync(pCmdToSend); 
      } 
     } 

     static void StaticThreadProc(Object o) 
     { 
      Form1 pThis = o as Form1; 
      pThis.ThreadProc(); 
     } 

     void ThreadProc() 
     { 
      m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE"); 
      m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent; 
      m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent; 
      m_pClientPipe.Connect(); 

      PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT"); 
      int Counter = 1; 
      while (Counter++ < 10) 
      { 
       Debug.WriteLine("Counter = " + Counter); 
       m_pClientPipe.SendCommandAsync(pCmd); 
       Thread.Sleep(3000); 
      } 

      while (!m_bDieThreadDie) 
      { 
       Thread.Sleep(1000); 
      } 

      m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent; 
      m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent; 
      m_pClientPipe.Close(); 
      m_pClientPipe = null; 
     } 

     private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e) 
     { 
      // wait around for server to shut us down 
     } 

     private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e) 
     { 
      byte[] pBytes = e.m_pData; 
      string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen); 
      PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes); 
      string szValue = pCmd.GetTransmittedString(); 

      Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString()); 

      if (szValue == "CONNECTED") 
      { 
       PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA"); 
       m_pClientPipe.SendCommandAsync(pCmdToSend); 
      } 
     } 

     private void Form1_FormClosing(object sender, FormClosingEventArgs e) 
     { 
      if (m_pThread != null) 
      { 
       m_bDieThreadDie = true; 
       m_pThread.Join(); 
       m_bDieThreadDie = false; 
      } 

      m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent; 
      m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent; 
      m_pServerPipe.Close(); 
      m_pServerPipe = null; 

     } 
    } 
} 
0

Wystarczy utworzyć rurę jako kryciu i kod może zablokować na odczycie w jednym wątku pisząc do rury od drugiego.

void StartServer() 
    { 
     Task.Factory.StartNew(() => 
     { 
      var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); 
      server.WaitForConnection(); 
      reader = new StreamReader(server); 
      writer = new StreamWriter(server); 
     }); 
    } 

    private async void timer1_Tick(object sender, EventArgs e) 
    { 
     timer1.Stop(); 
     if (null != reader) 
     { 
      char[] buf = new char[50]; 

      int count = await reader.ReadAsync(buf, 0, 50); 

      if (0 < count) 
      { 
       m_textBox_from.Text = new string(buf, 0, count); 
      } 
     } 
     timer1.Start(); 
    }