Mam aplikację do przesyłania strumieniowego Spark napisaną w Javie i używanie Sparka 2.1. Używam KafkaUtils.createDirectStream
do czytania wiadomości od Kafki. Używam kodera/dekodera kryo dla wiadomości kafka. Podałem to we właściwościach Kafki-> key.deserializer, value.deserializer, key.serializer, value.deserializer
Kiedy Spark pobiera wiadomości w mikroprocesie, wiadomości są dekodowane z powodzeniem za pomocą dekodera kryo. Zauważyłem jednak, że Executor Spark tworzy nową instancję dekodera kryo do dekodowania każdej wiadomości odczytanej z kafka. Sprawdziłem to, umieszczając logi wewnątrz konstruktora dekodera.
Wydaje mi się to dziwne. Czy nie powinno się używać tego samego wystąpienia dekodera dla każdej wiadomości i każdej partii?Dlaczego Kafka Direct Stream tworzy nowy dekoder dla każdej wiadomości?
Kod gdzie Czytam od Kafki:
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
Bardzo dobrze zbadane! #impressed –
@ Yuval: Używam Kafki 0.10.x. Spark używa zbuforowanych konsumentów kafka (na executor), w których klucz pamięci podręcznej jest identyfikowany przez identyfikator klienta, identyfikator tematu, identyfikator partycji. Ma sens mieć dekoder na partycję kafka lub jak inaczej Spark będzie dekodować wiadomości równolegle. Oczekuję, że nowy dekoder musi zostać utworzony raz na partycję wewnątrz pamięci podręcznej konsumenta i to wszystko! Nie widzę tego problemu przy małym obciążeniu, ale tylko wtedy, gdy pompuję 1000 wiadomości na sekundę. Prawdopodobnie uruchamiam cykl "GC". Czy masz pojęcie, jak włączyć rejestrację w klasie KafkaRDD? – scorpio
@scorpio Kafka 0.10.x nie wymaga wcale dekodera. Zwraca bazową 'ConsumerRecord' i wybierasz, co z nią zrobić. Czy tworzysz instancję dekodera wewnątrz "mapy"? –