Próbuję uzyskać i zapisać przesunięcie dla konkretnej wiadomości w Kafce, używając strumienia bezpośredniego Spark. Przeglądanie dokumentacji Spark to prosty sposób na uzyskanie przesunięć zakresu dla każdej partycji, ale potrzebuję zapisać offset początkowy dla każdej wiadomości tematu po pełnym skanowaniu kolejki.Czy możliwe jest uzyskanie określonego przesunięcia komunikatu w Kafce + SparkStreaming?
5
A
Odpowiedz
6
Tak, można użyć wersji MessageAndMetadata w wersji createDirectStream
, która umożliwia dostęp do message metadata
.
Możesz znaleźć przykład tutaj, który zwraca Dstream z tuple3
.
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
W powyższym przykładzie tuple3._1
będzie miał topic
, tuple3._2
będzie miał offset
i tuple3._3
będzie miał message
.
Mam nadzieję, że to pomoże!
Jeśli mam rację, będę mógł czytać z określonego offsetu. Nadal zastanawiam się, czy istnieje prosty sposób obliczenia przesunięcia początkowego każdej wiadomości w obrębie partycji. Potrzebuję zapisać offset dla każdej wiadomości, a następnie użyć tego kodu do odczytania konkretnej wiadomości. Dziękuję Ci! –
Tak, masz rację, ale z powyższym kodem otrzymasz także offset związany z każdą wiadomością w 'messagesDStream'. Mam na myśli 'createDirectStream' daje' Dstream' 'Tuple3', aw każdej krotce otrzymasz' topic-name' oraz 'message' oraz związane z nim' offset'. – avr
Cześć, przepraszam za opóźnioną odpowiedź .. Działa. Zakładam jednak, że "odOffset" jest przesunięciem początkowym, z którego skanuje partycję. Dziękuję bardzo avr. –