Jak korzystać z KafkaUtils.createDirectStream
z przesunięciami dla określonego Topic
w Pyspark?Jak utworzyć InputDStream z przesunięciami w PySpark (używając KafkaUtils.createDirectStream)?
7
A
Odpowiedz
7
Jeśli chcesz utworzyć RDD z rekordów w temacie Kafki, użyj statycznego zestawu krotek. dostępne
dokonać wszystkich import
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
Następnie należy utworzyć słownik Kafki Brokers
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
Następnie utworzeniu przesunięcia obiektu
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
Wreszcie Ci stworzyć RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
Aby utworzyć Stream z przesunięciami trzeba wykonać następujące czynności:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
Następnie należy utworzyć kontekst sparkstreaming używając sparkcontext
ssc = StreamingContext(sc, 1)
Następny Założyliśmy wszystkie nasze parametry
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
Następnie tworzymy naszą fromOffset słownik
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
//notice that we must cast the int to long
końcu tworzymy Stream
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,
fromOffsets=fromOffset)
1
można zrobić:
from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = {topicpartion: int(start)}
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
{"metadata.broker.list": brokers}, fromOffsets = fromoffset)
Uwaga: Spark 2.2.0, Python 3.6
ale pojawia się błąd "TypeError: unhashable type:" TopicAndPartition "" – pangpang
To jest nieaktualne dla K afka 0.8 i Spark 2.0+ :( – rjurney