2016-01-15 17 views
5

Witam Chciałbym wiedzieć, czy jest coś podobnego do oświadczenia await, które jest używane z zadaniami, które mogę wdrożyć z wątków w C#?Czy jest oczekiwane oświadczenie dotyczące wątków?

Co chcę zrobić, to:

Rozpocznij wątek A, obliczyć pewne dane i umieścić wynik na zmiennej x. Po przeniesieniu tej zmiennej x do innego wątku B i jednocześnie Gwint A rozpoczyna się ponownie w celu obliczenia niektórych danych, podczas gdy wątek B rozpoczyna inne obliczenie z wynikiem x.

UPDATE: Ok wydaje się pewne zamieszanie więc będę bardziej dokładny w moim opisie:

używam dwa czujniki, które wytwarzają dane. Dane muszą być pobierane w taki sposób, aby dane z SensorA były pobierane (co zajmuje dużo czasu) i zaraz po tym dane z SensorB muszą zostać pobrane w innym wątku, podczas gdy SensorA kontynuuje pobieranie innego bloku danych. Problem polega na tym, że nie mogę umieścić w kolejce danych obu czujników w tej samej kolejce, ale muszę przechowywać dane obu czujników w JEDNEJ strukturze danych/obiekcie.

Mój pomysł był taki:

  1. Pobierz dane z czujnika A w wątku A.
  2. rezultacie dać wątek B i ponownie wątku A.
  3. Podczas nawlec uruchamia ponownie wątek B pobiera dane z czujnika B i oblicza dane z czujników a i B

można założyć, że nawlec zawsze potrzebuje więcej czasu niż nici B

+3

Nie jest jasne, co masz oczekują 'await' zrobić ... Po tym wszystkim, jesteś nie czekając na wątek * ukończ *. Być może powinieneś użyć 'Task ' dla 'x' za pośrednictwem' TaskCompletionSource', o którym wie wątek A? Następnie wątek B może właśnie czekać na to zadanie. Możesz jednak spojrzeć na Dataflow: https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx –

+2

To brzmi jak klasyczny producent/konsument. –

+0

Dlaczego chcesz użyć wycofanego wątku zamiast "zadania"? –

Odpowiedz

6

Jak powiedział w komentarzu. To wygląda jak klasyczny Producent/Konsument, do którego możemy użyć np. a BlockingCollection.

Jest to niewielka modyfikacja próbki z tej strony:

BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100); 

// "Thread B" 
Task.Run(() => 
{ 
    while (!dataItems.IsCompleted) 
    { 
     Data dataA = null; 
     try 
     { 
      dataA = dataItems.Take(); 
     } 
     catch (InvalidOperationException) { } 

     if (dataA != null) 
     { 
      var dataB = ReadSensorB(); 
      Process(dataA,dataB); 
     } 
    } 
    Console.WriteLine("\r\nNo more items to take."); 
}); 

// "Thread A" 
Task.Run(() => 
{ 
    while (moreItemsToAdd) 
    { 
     Data dataA = ReadSensorA(); 
     dataItems.Add(dataA); 
    } 
    // Let consumer know we are done. 
    dataItems.CompleteAdding(); 
}); 

A potem moreItemsToAdd tylko cokolwiek kod trzeba mieć, aby poradzić sobie z konieczności zamknąć ten proces w dół.

3

Nie jestem pewien, dlaczego unikasz korzystania z zadań? Może masz starszą wersję .net? Jeśli tak, BlockingCollection jako sugerowane przez Damien również nie jest opcją. Jeśli używasz "normalnych" wątków, możesz użyć polecenia waithandle do sygnalizowania wyników między wątkami. Na przykład: AutoResetEvent.

private int a; 
private AutoResetEvent newResult = new AutoResetEvent(false); 

private void ThreadA() 
{ 
    while (true) 
    { 
     a = GetSensorA(); 
     newResult.Set(); 
    } 
} 

private void ThreadB() 
{ 
    int b; 

    while (true) 
    { 
     newResult.WaitOne(); 
     b = GetSensorB();   // or before "waitone" 
     Console.WriteLine(a + b); // do something 
    } 
} 

edit: miał niewielki błąd w tam z resetem, dzięki za wskazanie Damien - Aktualizacja

+0

** Auto ** ResetEvent - 'newResult.Reset();'? Ponadto, z ich narracji, chcą wartości czujnika B mniej więcej w tym samym czasie, kiedy wartości czujnika A stały się dostępne, a nie od pewnego czasu w przeszłości - co sugerowałoby, że wywołanie 'GetSensorB()' powinno być pomiędzy 'WaitOne 'i' // zrób coś'. –

+0

Dzięki. Zaktualizowano dla resetowania. Opuszczę, kiedy czujnik b stanie się ćwiczeniem dla plakatu. +1 przy twoim rozwiązaniu, przy okazji, zakładam, że użytkownik jest na .net <4, ponieważ nie może używać zadań, a BlockingCollection również nie jest dostępny. – user1970723

3

Jeśli możesz używać .Net 4.5 lub nowszej wersji, najlepszym sposobem podejścia jest użycie składnika DataFlow na licencji TPL.

(Do instalacji DataFlow należy użyć NuGet, domyślnie nie jest częścią CLR.)

Oto próbka aplikacji konsoli compilable który pokazuje, jak używać DataFlow to zrobić:

using System; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace SensorDemo 
{ 
    public sealed class SensorAData 
    { 
     public int Data; 
    } 

    public sealed class SensorBData 
    { 
     public double Data; 
    } 

    public sealed class SensorData 
    { 
     public SensorAData SensorAData; 
     public SensorBData SensorBData; 

     public override string ToString() 
     { 
      return $"SensorAData = {SensorAData.Data}, SensorBData = {SensorBData.Data}"; 
     } 
    } 

    class Program 
    { 
     static void Main() 
     { 
      var sensorADataSource = new TransformBlock<SensorAData, SensorData>(
       sensorAData => addSensorBData(sensorAData), 
       dataflowOptions()); 

      var combinedSensorProcessor = new ActionBlock<SensorData>(
       data => process(data), 
       dataflowOptions()); 

      sensorADataSource.LinkTo(combinedSensorProcessor, new DataflowLinkOptions { PropagateCompletion = true }); 

      // Create a cancellation source that will cancel after a few seconds. 
      var cancellationSource = new CancellationTokenSource(delay:TimeSpan.FromSeconds(20)); 

      Task.Run(() => continuouslyReadFromSensorA(sensorADataSource, cancellationSource.Token)); 

      Console.WriteLine("Started reading from SensorA"); 

      sensorADataSource.Completion.Wait(); // Wait for reading from SensorA to complete. 
      Console.WriteLine("Completed reading from SensorA."); 

      combinedSensorProcessor.Completion.Wait(); 
      Console.WriteLine("Completed processing of combined sensor data."); 
     } 

     static async Task continuouslyReadFromSensorA(TransformBlock<SensorAData, SensorData> queue, CancellationToken cancellation) 
     { 
      while (!cancellation.IsCancellationRequested) 
       await queue.SendAsync(readSensorAData()); 

      queue.Complete(); 
     } 

     static SensorData addSensorBData(SensorAData sensorAData) 
     { 
      return new SensorData 
      { 
       SensorAData = sensorAData, 
       SensorBData = readSensorBData() 
      }; 
     } 

     static SensorAData readSensorAData() 
     { 
      Console.WriteLine("Reading from Sensor A"); 
      Thread.Sleep(1000); // Simulate reading sensor A data taking some time. 
      int value = Interlocked.Increment(ref sensorValue); 
      Console.WriteLine("Read Sensor A value = " + value); 
      return new SensorAData {Data = value}; 
     } 

     static SensorBData readSensorBData() 
     { 
      Console.WriteLine("Reading from Sensor B"); 
      Thread.Sleep(100); // Simulate reading sensor B data being much quicker. 
      int value = Interlocked.Increment(ref sensorValue); 
      Console.WriteLine("Read Sensor B value = " + value); 
      return new SensorBData {Data = value}; 
     } 

     static void process(SensorData value) 
     { 
      Console.WriteLine("Processing sensor data: " + value); 
      Thread.Sleep(1000); // Simulate slow processing of combined sensor values. 
     } 

     static ExecutionDataflowBlockOptions dataflowOptions() 
     { 
      return new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 1, 
       BoundedCapacity  = 1 
      }; 
     } 

     static int sensorValue; 
    } 
}