2017-12-15 161 views
6

Mam aplikację do przesyłania strumieniowego Spark napisaną w Javie i używanie Sparka 2.1. Używam KafkaUtils.createDirectStream do czytania wiadomości od Kafki. Używam kodera/dekodera kryo dla wiadomości kafka. Podałem to we właściwościach Kafki-> key.deserializer, value.deserializer, key.serializer, value.deserializer

Kiedy Spark pobiera wiadomości w mikroprocesie, wiadomości są dekodowane z powodzeniem za pomocą dekodera kryo. Zauważyłem jednak, że Executor Spark tworzy nową instancję dekodera kryo do dekodowania każdej wiadomości odczytanej z kafka. Sprawdziłem to, umieszczając logi wewnątrz konstruktora dekodera.

Wydaje mi się to dziwne. Czy nie powinno się używać tego samego wystąpienia dekodera dla każdej wiadomości i każdej partii?Dlaczego Kafka Direct Stream tworzy nowy dekoder dla każdej wiadomości?

Kod gdzie Czytam od Kafki:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams)); 

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> { 
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value()); 
}); 

Odpowiedz

3

Jeśli chcemy zobaczyć, jak Spark pobiera dane z Kafki wewnętrznie, musimy spojrzeć na KafkaRDD.compute, który realizowany jest metodą, która dla każdego RDD opowiada ramy, jak dobrze, że RDD obliczyć:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { 
    val part = thePart.asInstanceOf[KafkaRDDPartition] 
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) 
    if (part.fromOffset == part.untilOffset) { 
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + 
    s"skipping ${part.topic} ${part.partition}") 
    Iterator.empty 
    } else { 
    new KafkaRDDIterator(part, context) 
    } 
} 

Co ważne tu jest klauzula else, który tworzy KafkaRDDIterator. Ma to wewnętrznie:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[K]] 

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[V]] 

Które, jak widać, tworzy instancję zarówno klucz dekodera oraz dekodera wartości poprzez odbicie, dla każdej partycjibazowego. Oznacza to, że nie jest generowany na wiadomość, ale na partycję Kafki.

Dlaczego jest realizowany w ten sposób? Nie wiem Zakładam, że dekoder klucza i wartości powinien mieć nieodwracalne uderzenie wydajności w porównaniu do wszystkich innych przydziałów występujących w Sparku.

Jeśli profilujesz aplikację i uznałeś ją za ścieżkę dostępu do alokacji, możesz otworzyć problem. W przeciwnym razie nie martwiłbym się o to.

+0

Bardzo dobrze zbadane! #impressed –

+0

@ Yuval: Używam Kafki 0.10.x. Spark używa zbuforowanych konsumentów kafka (na executor), w których klucz pamięci podręcznej jest identyfikowany przez identyfikator klienta, identyfikator tematu, identyfikator partycji. Ma sens mieć dekoder na partycję kafka lub jak inaczej Spark będzie dekodować wiadomości równolegle. Oczekuję, że nowy dekoder musi zostać utworzony raz na partycję wewnątrz pamięci podręcznej konsumenta i to wszystko! Nie widzę tego problemu przy małym obciążeniu, ale tylko wtedy, gdy pompuję 1000 wiadomości na sekundę. Prawdopodobnie uruchamiam cykl "GC". Czy masz pojęcie, jak włączyć rejestrację w klasie KafkaRDD? – scorpio

+0

@scorpio Kafka 0.10.x nie wymaga wcale dekodera. Zwraca bazową 'ConsumerRecord' i wybierasz, co z nią zrobić. Czy tworzysz instancję dekodera wewnątrz "mapy"? –