2016-08-04 23 views
5

Próbuję wdrożyć konsumenta kafka w scala. Widziałem milion samouczków, jak to zrobić w Javie, a nawet niektóre (like this one), które mówią, że jest to scala, ale jest napisane w Javie.Jak wdrożyć Kafka Consumer w Scala

Czy ktoś wie, gdzie mogę znaleźć przykład, jak go napisać w Scali? Dopiero zacząłem się uczyć Scali, więc może ten połączony przykład może być użyty w Scali, mimo że jest napisany na Jawie lub czymś podobnym, ale szczerze mówiąc nie mam pojęcia, co robię w tej chwili. Wszystko, co google, po prostu łączy mnie z tym, jak to zrobić w Javie.

+0

Możesz używać całego kodu Java w Scali z bardzo małymi zmianami. –

+0

Czy mogę po prostu zrobić klasę w Javie, a następnie po prostu zaimportować ją do klasy, w której chcę go użyć? Czy będę musiał przepisać wszystkie zmienne i rzeczy na scala? – annedroiid

+0

Nieważne, mój test scala nie rozpozna klasy java. Oto klasa Java (http://pastebin.com/tnS9Amie), po prostu nie wiem wystarczająco dużo o scala, aby ją przekonwertować. Czy wygląda na to, że będzie to możliwe? – annedroiid

Odpowiedz

4

Powód, dla którego większość przykładów znajduje się w Javie, polega na tym, że nowy KafkaProducer począwszy od 0.8.2.2 jest napisany w Javie.

Zakładając, że używasz SBT jako system kompilacji, a zakładając, że pracuje z Kafki 0.8.2.2 (można zmienić wersję w razie potrzeby), musisz:

libraryDependencies ++= { 
    Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2", 
    "org.apache.kafka" % "kafka-clients" % "0.8.2.2", 
) 
} 

Prostym przykładem powinny Ci zacząć:

import scala.collection.JavaConverters._ 
import org.apache.kafka.clients.consumer.KafkaConsumer 
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaExample { 
    def main(args: Array[String]): Unit = { 
    val properties = new Properties() 
    properties.put("bootstrap.servers", "localhost:9092") 
    properties.put("group.id", "consumer-tutorial") 
    properties.put("key.deserializer", classOf[StringDeserializer]) 
    properties.put("value.deserializer", classOf[StringDeserializer]) 

    val kafkaConsumer = new KafkaConsumer[String, String](properties) 
    kafkaConsumer.subscribe("firstTopic", "secondTopic") 

    while (true) { 
     val results = kafkaConsumer.poll(2000).asScala 
     for ((topic, data) <- results) { 
     // Do stuff 
     } 
    } 
} 
+0

Czy konsument nie powinien rozmawiać z klientami zookeeperem, a nie z brokerami? –

+1

@AvihooMamka Kafka nie "wymaga" od ZooKeepera śledzenia przesunięć. Od Ciebie zależy, jak to zrobisz. I na ogół konsument rozmawia z pośrednikami o konsumpcji. –

+0

'wyniki' zawsze mają wartość' null'. Co przegapiłem? – ItayB