2015-12-14 27 views
8

Po prostu zaczynam od Apache Kafka/Zookeeper i mam problemy z konfiguracją klastra na AWS. Obecnie mam trzy serwery:Apache Kafka: Nie udało się zaktualizować metadanych/java.nio.channels.ClosedChannelException

Jeden prowadzony Zookeeper i dwa działające Kafka.

Mogę uruchamiać serwery Kafka bez problemu i tworzyć tematy na oba. Jednak problem pojawia się, gdy próbuję uruchomić producenta na jednym komputerze a konsumentem na drugiej:

na producenta Kafka:

kafka-console-producer.sh --broker-list <kafka server 1 aws public dns>:9092,<kafka server 2 aws public dns>:9092 --topic samsa 

konsumenta Kafka:

kafka-console-consumer.sh --zookeeper <zookeeper server ip>:2181 --topic samsa 

Piszę wiadomość na producenta ("cześć") i nic się nie dzieje przez jakiś czas. Potem dostaję komunikat:

ERROR Error when sending message to topic samsa with key: null, value: 2 bytes 
with error: Failed to update metadata after 60000 ms. 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 

Po stronie konsumenta i ten komunikat, który okresowo powtarza:

WARN Fetching topic metadata with correlation id # for topics [Set(samsa)] from broker [BrokerEndPoint(<broker.id>,<producer's advertised.host.name>,9092)] failed (kafka.client.ClientUtils$) 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) 
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) 
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) 
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) 

po jakimś czasie, producent będzie następnie szybko zacząć rzucać ten komunikat o błędzie z # zwiększania przyrostowo:

WARN Error while fetching metadata with correlation id # : {samsa=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 

Nie wiesz, dokąd się udać. Daj mi znać, jeśli potrzebujesz więcej informacji o plikach konfiguracyjnych.

Odpowiedz

10

To był problem z konfiguracją.

Aby uzyskać to działa kilka zmian config plików musiało się stać:

W config/server.properties na każdym serwerze Kafka:

  • host.name: <Public IP>
  • advertised.host.name: <AWS Public DNS Address>

W config/producer.properties na każdy serwer Kafka:

  • metadata.broker.list: <Producer Server advertised.host.name>:<Producer Server port>,<Consumer Server advertised.host.name>:<Consumer Server port>

W/etc/hosts na każdym serwerze Kafka, zmień 127.0.0.1 localhost localhost.localdomain do:

<Public IP> localhost localhost.localdomain 
+0

@kellanburker powinienem ponownie uruchomić każdą usługę po edycji '/ etc/hosts'? –