2016-02-29 24 views
5

Obecnie używam Kafki 0.9.0.1. Zgodnie z niektórymi źródłami, które znalazłem, sposobem na ustawienie rozmiarów komunikatów jest zmodyfikowanie następujących kluczowych wartości w server.properties.Jak ustawić rozmiar wiadomości w Kafce?

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

Mój plik server.properties faktycznie ma te ustawienia.

message.max.bytes=10485760 
replica.fetch.max.bytes=20971520 
fetch.message.max.bytes=10485760 

Inne ustawienia, które mogą być istotne, znajdują się poniżej.

socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 

Jednak przy próbie wysłania wiadomości o ładowności od 4 do 6 MB klient nigdy nie otrzymuje żadnych wiadomości. Wydaje się, że producent wysyła wiadomości bez żadnych wyjątków. Jeśli wyślę mniejsze ładunki (np. < 1 MB), to odbiorca faktycznie odbierze wiadomości.

Masz pojęcie o tym, co robię źle pod względem ustawień konfiguracyjnych?

Oto przykładowy kod do wysłania wiadomości.

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps()); 
File dir = new File("/path/to/dir"); 
for(String s : dir.list()) { 
    File f = new File(dir, s); 
    byte[] data = Files.readAllBytes(f.toPath()); 
    Payload payload = new Payload(data); //a simple pojo to store payload 
    String key = String.valueOf(System.currentTimeMillis()); 
    byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[] 
    producer.send(new ProducerRecord<>("test", key, val)); 
} 
producer.close(); 

Oto przykładowy kod do otrzymania wiadomości.

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps()); 
consumer.subscribe(Arrays.asList("test")); 
while(true) { 
    ConsumerRecord<String, byte[]> records = consumer.poll(100); 
    for(ConsumerRecord<String, byte[]> record : records) { 
    long offset = record.offset(); 
    String key = record.key(); 
    byte[] val = record.value(); 
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object 
    System.out.println(
     System.format("offset=%d, key=%s", offset, key)); 
    } 
} 

Oto metody wypełniania plików właściwości dla producenta i konsumenta.

public static Properties getProducerProps() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("compression.type", "snappy"); 
    props.put("max.request.size", 10485760); //need this 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    return props; 
} 

public static Properties getConsumerProps() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("max.partition.fetch.bytes", 10485760); //need this too 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    return props; 
} 

Odpowiedz

7

Jane, Nie używaj fetch.message.max.bytes Przede wszystkim dlatego, że jest to właściwość, która jest od konsumenta, a nie iść w pliku server.properties i drugie, bo to dla starej wersji konsumenta, zamiast tego użyj max.partition.fetch.bytes podczas tworzenia konsumenta jako części właściwości, których używasz do utworzenia instancji.

+0

Po prostu próbowałem, ale mam ten sam efekt. Pliki "duże" nie są odbierane. Zastanawiam się, czy są one nawet wysyłane, ponieważ kiedy konsument zaczyna czytać od tematu, przesunięcia są ciągłe (np. 1, 2, 3 itd.). Wydaje mi się, że producent może nawet nie wysyłać dużych plików? –

+0

Okazuje się, że muszę ustawić zarówno 'max.request.size' dla producenta, jak i' max.partition.fetch.bytes' dla konsumenta. Będę trochę majsterkować z kodem, aby zobaczyć, czy 'max.partition.fetch.bytes' jest naprawdę potrzebny. –

+0

Tak, okazało się, że potrzebuję obu ustawień. Jeśli nie ustawię 'max.partition.fetch.bytes', otrzymam' RecordTooLargeException'. –