Używam wysokiej klasy klienta Python dla Kafki i chcę poznać najnowsze korekty dla każdej partycji tematu. Jednak nie mogę go uruchomić.Jak uzyskać najnowsze przesunięcie partycji dla tematu kafka?
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
Ale wyjście mogę to
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
mam alternatywnego podejścia z wykorzystaniem assign
ale wynik jest taki sam
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
Wydaje niektórych z dokumentacji, że może uzyskać to zachowanie, jeśli nie zostało wydane fetch
. Ale nie mogę znaleźć sposobu, żeby to wymusić. Co ja robię źle?
Czy istnieje inny/prostszy sposób na uzyskanie najnowszych przesunięć tematów?
Nie 100% pozytywne, ale myślę, że kod jest zwrócenie wartości highwater przed 'Kafka-python' rzeczywiście podłączony do maklera . Ponieważ 'KafkaConsumer' jest asynchroniczne, myślę, że musisz rzeczywiście zużywać wiadomość, aby zapełnić wartość highwater: https://github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 –