2015-06-17 9 views
20

Jestem nowym studentem studiującym Kafkę i natknąłem się na pewne zasadnicze problemy, które polegały na zrozumieniu wielu konsumentów, że artykuły, dokumentacje itp. Nie były dotychczas zbyt pomocne.Jak korzystać z wielu klientów w Kafce?

Jedną z rzeczy, które próbowałem zrobić, to napisanie własnego, wysokiego poziomu producenta i konsumenta Kafki, i jednoczesne ich uruchamianie, publikując 100 prostych wiadomości do tematu i umożliwiając ich odzyskanie przez konsumenta. Udało mi się to zrobić z powodzeniem, ale kiedy próbuję wprowadzić drugiego konsumenta do konsumpcji z tego samego tematu, do którego właśnie zostały opublikowane wiadomości, nie otrzymuje on żadnych wiadomości.

To było moje zrozumienie, że dla każdego tematu można mieć konsumentów z oddzielnych grup konsumenckich, a każda z tych grup konsumentów otrzymywałaby pełną kopię wiadomości wyprodukowanych na dany temat. Czy to jest poprawne? Jeśli nie, jaki byłby właściwy sposób na ustawienie wielu klientów? Jest to klasa konsumentów, które pisałem do tej pory:

public class AlternateConsumer extends Thread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final Boolean isAsync = false; 

    public AlternateConsumer(String topic, String consumerGroup) { 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers", "localhost:9092"); 
     properties.put("group.id", consumerGroup); 
     properties.put("partition.assignment.strategy", "roundrobin"); 
     properties.put("enable.auto.commit", "true"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     properties.put("session.timeout.ms", "30000"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumer = new KafkaConsumer<Integer, String>(properties); 
     consumer.subscribe(topic); 
     this.topic = topic; 
    } 


    public void run() { 
     while (true) { 
      ConsumerRecords<Integer, String> records = consumer.poll(0); 
      for (ConsumerRecord<Integer, String> record : records) { 
       System.out.println("We received message: " + record.value() + " from topic: " + record.topic()); 
      } 
     } 

    } 
} 

Ponadto zauważyłem, że początkowo byłem badania nad konsumpcję na temat „test” z tylko jednej partycji. Kiedy dodałem kolejnego konsumenta do istniejącej grupy konsumenckiej, powiedzmy "testGroup", spowodowało to ponowne zrównoważenie Kafki, które spowolniło opóźnienie mojego zużycia o znaczną ilość, w wielkości sekund. Pomyślałem, że jest to problem z przywracaniem równowagi, ponieważ miałem tylko jedną partycję, ale kiedy stworzyłem nowy temat "wielopoziomowe" z powiedzeniem 6 partycji, pojawiły się podobne problemy, w których dodanie większej liczby konsumentów do tej samej grupy konsumentów spowodowało problemy z opóźnieniami. Rozejrzałem się i ludzie mówią mi, że powinienem używać konsumenta wielowątkowego - czy ktoś może rzucić na to światło?

+0

Istnieje doskonały przykład konsumenta wysokiego poziomu [tutaj] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) dla kafka '0.8.1'. – chrsblck

+0

@chrsblck dzięki za link.Sprawdziłem to wcześniej i prawdopodobnie nie rozumiałem tego tak dobrze jak mogłem - czy mógłbyś wyjaśnić nieco, w jaki sposób ten przykład wykorzystuje wątki? Nie w pełni rozumiem, co robią w tej chwili. –

+0

Jednym ze sposobów jest posiadanie takiej samej liczby wątków jak partycji dla danego tematu. Z artykułu - Pobierz listę strumieni 'Lista > strumienie = consumerMap.get (topic);' ... Następnie przypisz każdemu wątkowi partycję 'executor.submit (new ConsumerTest (stream, threadNumber)) '. – chrsblck

Odpowiedz

17

Myślę, że twój problem leży w właściwości auto.offset.reset. Gdy nowy konsument czyta z partycji i nie ma wcześniej zatwierdzonego przesunięcia, właściwość auto.offset.reset służy do określenia, jakie powinno być przesunięcie początkowe. Jeśli ustawisz go na "największy" (domyślny), rozpoczniesz czytanie od najnowszego (ostatniego) komunikatu. Jeśli ustawisz go na "najmniejszy", otrzymasz pierwszą dostępną wiadomość.

Więc dodać:

properties.put("auto.offset.reset", "smallest"); 

i spróbuj ponownie.

+1

To jest późna odpowiedź, ale dzięki Chris! Twoje rozwiązania są poprawne i po dokładniejszym przyjrzeniu się jakiejś dokumentacji, powinienem był zauważyć, że po uruchomieniu nowego konsumenta ustawiane jest używanie tylko najnowszych wysłanych wiadomości - NIE wcześniej istniejących, chyba że powyższe właściwości są ustawione. –

4

W dokumentacji here jest napisane: "jeśli podasz więcej wątków niż partycji na temat, niektóre wątki nigdy nie zobaczą komunikatu". Czy możesz dodać partycje do swojego tematu? Mam liczbę wątków grupy konsumenckiej równą liczbie partycji w moim temacie, a każdy wątek pobiera komunikaty.

Oto mój temat config:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins 
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

A moja konsument:

package com.cie.dispatcher.services; 

import com.cie.dispatcher.model.WinNotification; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.inject.Inject; 
import io.dropwizard.lifecycle.Managed; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* This will create three threads, assign them to a "group" and listen for notifications on a topic. 
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by 
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the 
* lifecycle manager in dropwizard. 
* <p/> 
* Created by aakture on 6/15/15. 
*/ 
public class KafkaTopicListener implements Managed { 
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); 
private final ConsumerConnector consumer; 
private final String topic; 
private ExecutorService executor; 
private int threadCount; 
private WinNotificationWorkflow winNotificationWorkflow; 
private ObjectMapper objectMapper; 

@Inject 
public KafkaTopicListener(String a_zookeeper, 
          String a_groupId, String a_topic, 
          int threadCount, 
          WinNotificationWorkflow winNotificationWorkflow, 
          ObjectMapper objectMapper) { 
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig(a_zookeeper, a_groupId)); 
    this.topic = a_topic; 
    this.threadCount = threadCount; 
    this.winNotificationWorkflow = winNotificationWorkflow; 
    this.objectMapper = objectMapper; 
} 

/** 
* Creates the config for a connection 
* 
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example. 
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. 
* @return the config props 
*/ 
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public void stop() { 
    if (consumer != null) consumer.shutdown(); 
    if (executor != null) executor.shutdown(); 
    try { 
     if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
      LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     } 
    } catch (InterruptedException e) { 
     LOG.info("Interrupted during shutdown, exiting uncleanly"); 
    } 
    LOG.info("{} shutdown successfully", this.getClass().getName()); 
} 
/** 
* Starts the listener 
*/ 
public void start() { 
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put(topic, new Integer(threadCount)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
    executor = Executors.newFixedThreadPool(threadCount); 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ListenerThread(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

private class ListenerThread implements Runnable { 
    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     try { 
      String message = null; 
      LOG.info("started listener thread: {}", m_threadNumber); 
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
      while (it.hasNext()) { 
       try { 
        message = new String(it.next().message()); 
        LOG.info("receive message by " + m_threadNumber + " : " + message); 
        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); 
        winNotificationWorkflow.process(winNotification); 
       } catch (Exception ex) { 
        LOG.error("error processing queue for message: " + message, ex); 
       } 
      } 
      LOG.info("Shutting down listener thread: " + m_threadNumber); 
     } catch (Exception ex) { 
      LOG.error("error:", ex); 
     } 
    } 
    } 
} 
+0

Czy możesz udostępnić przykład dla wersji Kafki 1.0, ponieważ większość klas używanych w powyższym przykładzie jest przestarzała. –

+0

Nie wierzę, że to było w tym czasie, może niedługo niedługo uaktualnię swój kod, przepraszam. –

4

Jeśli chcesz wielu konsumentów do spożywania same komunikaty (np transmisji), można tarło je z innej grupy konsumentów a także ustawienie auto.offset.reset na najmniejszą w konfiguracji klienta. Jeśli chcesz, aby wielu użytkowników zakończyło sporządzanie równoległe (dzielenie pracy między nimi), powinieneś utworzyć liczbę partycji> = liczbę odbiorców. Jedna partycja może być zużywana tylko przez maksymalnie jeden proces konsumencki. Ale jeden konsument może spożywać więcej niż jedną partycję.