2017-02-21 53 views
9

Chciałbym, aby aktor konsumencki zasubskrybował temat Kafki i przesyłał dane do dalszego przetwarzania za pomocą Spark Streaming poza konsumentem. Dlaczego aktor? Ponieważ przeczytałem, że jego strategia supervisora ​​byłaby świetnym sposobem na poradzenie sobie z awariami Kafki (np. Ponowne uruchomienie po awarii).Iskrzenie strumieniowe z aktora

znalazłem dwie opcje:

  • klasy Javy KafkaConsumer: jego metoda poll() Zwraca Map[String, Object]. Chciałbym, aby DStream został zwrócony podobnie jak KafkaUtils.createDirectStream i nie wiem, jak pobrać strumień spoza aktora.
  • Przedłużenie cechy ActorHelper i użyj actorStream(), jak pokazano w tym example. Ta ostatnia opcja nie wyświetla połączenia z tematem, ale z gniazdem.

Czy ktoś może wskazać mi właściwy kierunek?

Odpowiedz

2

Dla obsługi awarii Kafka, użyłem ram Apache kurator i następujące rozwiązania:

val client: CuratorFramework = ... // see docs 
val zk: CuratorZookeeperClient = client.getZookeeperClient 

/** 
    * This method returns false if kafka or zookeeper is down. 
    */ 
def isKafkaAvailable:Boolean = 
    Try { 
     if (zk.isConnected) { 
     val xs = client.getChildren.forPath("/brokers/ids") 
     xs.size() > 0 
     } 
     else false 
    }.getOrElse(false) 

Do spożywania tematów Kafka, użyłem biblioteki com.softwaremill.reactivekafka. Na przykład:

class KafkaConsumerActor extends Actor { 
    val kafka = new ReactiveKafka() 
    val config: ConsumerProperties[Array[Byte], Any] = ... // see docs 

    override def preStart(): Unit = { 
     super.preStart() 

     val publisher = kafka.consume(config) 
     Source.fromPublisher(publisher) 
      .map(handleKafkaRecord) 
      .to(Sink.ignore).run() 
    } 

    /** 
    * This method will be invoked when any kafka records will happen. 
    */ 
    def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = { 
     // handle record 
    } 
}