2016-02-16 9 views
10

Używam wysokiej klasy klienta Python dla Kafki i chcę poznać najnowsze korekty dla każdej partycji tematu. Jednak nie mogę go uruchomić.Jak uzyskać najnowsze przesunięcie partycji dla tematu kafka?

from kafka import TopicPartition 
from kafka.consumer import KafkaConsumer 

con = KafkaConsumer(bootstrap_servers = brokers) 
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] 

con.assign(ps) 
for p in ps: 
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) 

print "Subscription = %s"%con.subscription() 
print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 

Ale wyjście mogę to

For partition 0 highwater is None 
For partition 1 highwater is None 
For partition 2 highwater is None 
For partition 3 highwater is None 
For partition 4 highwater is None 
For partition 5 highwater is None 
.... 
For partition 96 highwater is None 
For partition 97 highwater is None 
For partition 98 highwater is None 
For partition 99 highwater is None 
Subscription = None 
con.seek_to_beginning() = None 
con.seek_to_end() = None 

mam alternatywnego podejścia z wykorzystaniem assign ale wynik jest taki sam

con = KafkaConsumer(bootstrap_servers = brokers) 
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] 

con.assign(ps) 
for p in ps: 
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) 

print "Subscription = %s"%con.subscription() 
print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 
print "con.seek_to_end() = %s"%con.seek_to_end() 

Wydaje niektórych z dokumentacji, że może uzyskać to zachowanie, jeśli nie zostało wydane fetch. Ale nie mogę znaleźć sposobu, żeby to wymusić. Co ja robię źle?

Czy istnieje inny/prostszy sposób na uzyskanie najnowszych przesunięć tematów?

+0

Nie 100% pozytywne, ale myślę, że kod jest zwrócenie wartości highwater przed 'Kafka-python' rzeczywiście podłączony do maklera . Ponieważ 'KafkaConsumer' jest asynchroniczne, myślę, że musisz rzeczywiście zużywać wiadomość, aby zapełnić wartość highwater: https://github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 –

Odpowiedz

23

W końcu po spędzeniu jednego dnia na tym i kilku fałszywych początkach udało mi się znaleźć rozwiązanie i sprawić, by działało. Umieszczanie jej tak, aby inni mogli się do niej odwoływać.

from kafka import SimpleClient 
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy 
from kafka.common import OffsetRequestPayload 

client = SimpleClient(brokers) 

partitions = client.topic_partitions[topic] 
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()] 

offsets_responses = client.send_offset_request(offset_requests) 

for r in offsets_responses: 
    print "partition = %s, offset = %s"%(r.partition, r.offsets[0]) 
+1

Czy istnieje sposób uzyskać bieżący/następny offset na konsumenta/grupę na partycję? – GreenThumb

+0

Niestety, SimpleClient został uznany za przestarzały, a powyższe wartości parametru offsets_response powodują błąd FailedPayloadsError: FailedPayloadsError – dreynold

9

Jeśli chcesz korzystać Kafka skrypty powłoki obecne w kafka/bin, a następnie można uzyskać najnowsze i najmniejsze przesunięcia za pomocą kafka-run-class.sh.

Aby uzyskać najnowsze polecenie przesunięcia będzie wyglądać następująco

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname 

Aby uzyskać najmniejszą polecenie przesunięcia będzie wyglądać następująco

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname 

można znaleźć więcej informacji na temat Get offsety Shell z następujących link

Mam nadzieję, że to pomoże!

4
from kafka import KafkaConsumer, TopicPartition 

TOPIC = 'MYTOPIC' 
GROUP = 'MYGROUP' 
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092'] 

consumer = KafkaConsumer(
     bootstrap_servers=BOOTSTRAP_SERVERS, 
     group_id=GROUP, 
     enable_auto_commit=False 
    ) 


for p in consumer.partitions_for_topic(TOPIC): 
    tp = TopicPartition(TOPIC, p) 
    consumer.assign([tp]) 
    committed = consumer.committed(tp) 
    consumer.seek_to_end(tp) 
    last_offset = consumer.position(tp) 
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed))) 

consumer.close(autocommit=False) 
0

Innym sposobem osiągnięcia tego celu jest przez odpytywanie konsument musi otrzymać ostatnią zużywanej offsetu, a następnie za pomocą metody seek_to_end celu uzyskania najnowszej dostępnej partycji offset.

from kafka import KafkaConsumer 
consumer = KafkaConsumer('my-topic', 
        group_id='my-group', 
        bootstrap_servers=['localhost:9092']) 
consumer.poll() 
consumer.seek_to_end() 

Ta metoda szczególnie przydaje się podczas korzystania z grup konsumenckich.

Źródła:

  1. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll
  2. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end
+0

Mój serwer ma setki wiadomości, ale funkcja consumer.poll() zwróciła {} – Nick