Jestem nowym studentem studiującym Kafkę i natknąłem się na pewne zasadnicze problemy, które polegały na zrozumieniu wielu konsumentów, że artykuły, dokumentacje itp. Nie były dotychczas zbyt pomocne.Jak korzystać z wielu klientów w Kafce?
Jedną z rzeczy, które próbowałem zrobić, to napisanie własnego, wysokiego poziomu producenta i konsumenta Kafki, i jednoczesne ich uruchamianie, publikując 100 prostych wiadomości do tematu i umożliwiając ich odzyskanie przez konsumenta. Udało mi się to zrobić z powodzeniem, ale kiedy próbuję wprowadzić drugiego konsumenta do konsumpcji z tego samego tematu, do którego właśnie zostały opublikowane wiadomości, nie otrzymuje on żadnych wiadomości.
To było moje zrozumienie, że dla każdego tematu można mieć konsumentów z oddzielnych grup konsumenckich, a każda z tych grup konsumentów otrzymywałaby pełną kopię wiadomości wyprodukowanych na dany temat. Czy to jest poprawne? Jeśli nie, jaki byłby właściwy sposób na ustawienie wielu klientów? Jest to klasa konsumentów, które pisałem do tej pory:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
Ponadto zauważyłem, że początkowo byłem badania nad konsumpcję na temat „test” z tylko jednej partycji. Kiedy dodałem kolejnego konsumenta do istniejącej grupy konsumenckiej, powiedzmy "testGroup", spowodowało to ponowne zrównoważenie Kafki, które spowolniło opóźnienie mojego zużycia o znaczną ilość, w wielkości sekund. Pomyślałem, że jest to problem z przywracaniem równowagi, ponieważ miałem tylko jedną partycję, ale kiedy stworzyłem nowy temat "wielopoziomowe" z powiedzeniem 6 partycji, pojawiły się podobne problemy, w których dodanie większej liczby konsumentów do tej samej grupy konsumentów spowodowało problemy z opóźnieniami. Rozejrzałem się i ludzie mówią mi, że powinienem używać konsumenta wielowątkowego - czy ktoś może rzucić na to światło?
Istnieje doskonały przykład konsumenta wysokiego poziomu [tutaj] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) dla kafka '0.8.1'. – chrsblck
@chrsblck dzięki za link.Sprawdziłem to wcześniej i prawdopodobnie nie rozumiałem tego tak dobrze jak mogłem - czy mógłbyś wyjaśnić nieco, w jaki sposób ten przykład wykorzystuje wątki? Nie w pełni rozumiem, co robią w tej chwili. –
Jednym ze sposobów jest posiadanie takiej samej liczby wątków jak partycji dla danego tematu. Z artykułu - Pobierz listę strumieni 'Lista> strumienie = consumerMap.get (topic);' ... Następnie przypisz każdemu wątkowi partycję 'executor.submit (new ConsumerTest (stream, threadNumber)) '. –
chrsblck