2015-03-13 27 views
8

Używam kafka 0.8.1.1 na maszynie Red Hat VM z wtyczką kafka-net. Jak mogę skonfigurować mojego klienta, aby przestał otrzymywać wcześniejsze wiadomości od kafka?Konfiguracja kafka-net, aby przestać wysyłać najnowsze wiadomości

Mój kod konsument:

var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092")); 

Stopwatch sp = new Stopwatch(); 
var router = new BrokerRouter(options); 
var consumer = new Consumer(new ConsumerOptions("Test", router)); 

ThreadStart start2 =() => 
{ 
    while (true) 
    { 
     sp.Start(); 
     foreach (var message in consumer.Consume()) 
     { 
      if (MessageDecoderReceiver.MessageBase(message.Value) != null) 
      { 
       PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString()); 
      } 
      else 
      { 
       Console.WriteLine(message.Value); 
      } 
     } 
     sp.Stop(); 
    } 
}; 
var thread2 = new Thread(start2); 
thread2.Start(); 
+0

Dodałem tag do języka, w którym wydaje się być używany i poprawiono treść i tytuł, aby uzyskać lepszą czytelność. Również [usunąłem tag z tytułu] (http://meta.stackexchange.com/questions/19190/should-questions-include-tags-in-their-titles). –

Odpowiedz

11

Konsument w Kafka-net aktualnie nie automatycznego śledzenia przesunięcia konsumowane. Będziesz musiał ręcznie zaimplementować śledzenie przesunięcia.

aby zapisać przesunięcie w wersji 0.8.1: kafka

var commit = new OffsetCommitRequest 
      { 
       ConsumerGroup = consumerGroup, 
       OffsetCommits = new List<OffsetCommit> 
          { 
           new OffsetCommit 
            { 
             PartitionId = partitionId, 
             Topic = IntegrationConfig.IntegrationTopic, 
             Offset = offset, 
             Metadata = metadata 
            } 
          } 
      }; 

var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault(); 

Aby ustawić konsumenta, aby rozpocząć importowanie w określonym przesunięciem punktu:

var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result 
        .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray(); 

var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets); 

Uwaga powyższy kod będzie ustawiony konsumenta aby rozpocząć konsumpcję na samym końcu dziennika, skutecznie tylko odbierając nowe wiadomości.

+0

Awesome, to jest to, czego potrzebuję, dziękuję bardzo! –

+0

Czy możesz rzucić trochę światła na sposób spożywania przesunięcia, które popełniłeś? Uzyskanie klienta GetTopicOffsetAsync nie zwraca zatwierdzonych przesunięć. Zwraca tylko 2 wartości: 0 (prawdopodobnie pierwsze) i ostatnie przesunięcie. Również dla innych czytelników, można uzyskać połączenia za pomocą KafkaOptions' KafkaConnectionFactory metody: '_options.KafkaConnectionFactory.Create (URI, _options.ResponseTimeoutMs, _options.Log)' – arviman

+0

NVM kopany przez źródło i znaleźć OffsetFetchRequest \ Klasa odpowiedzi. Wygląda więc na to, że musimy pobrać wszystkie partycje za pomocą Twojego GetTopicOffset, a następnie pobrać zapisane przesunięcie dla każdej z partycji za pomocą OffsetFetchRequest, którego odpowiedź jest następnie przekazywana do konsumenta w jego ConsumerOptions. – arviman