Obecnie używam Kafki 0.9.0.1. Zgodnie z niektórymi źródłami, które znalazłem, sposobem na ustawienie rozmiarów komunikatów jest zmodyfikowanie następujących kluczowych wartości w server.properties
.Jak ustawić rozmiar wiadomości w Kafce?
- message.max.bytes
- replica.fetch.max.bytes
- fetch.message.max.bytes
Mój plik server.properties
faktycznie ma te ustawienia.
message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760
Inne ustawienia, które mogą być istotne, znajdują się poniżej.
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
Jednak przy próbie wysłania wiadomości o ładowności od 4 do 6 MB klient nigdy nie otrzymuje żadnych wiadomości. Wydaje się, że producent wysyła wiadomości bez żadnych wyjątków. Jeśli wyślę mniejsze ładunki (np. < 1 MB), to odbiorca faktycznie odbierze wiadomości.
Masz pojęcie o tym, co robię źle pod względem ustawień konfiguracyjnych?
Oto przykładowy kod do wysłania wiadomości.
Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
File f = new File(dir, s);
byte[] data = Files.readAllBytes(f.toPath());
Payload payload = new Payload(data); //a simple pojo to store payload
String key = String.valueOf(System.currentTimeMillis());
byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();
Oto przykładowy kod do otrzymania wiadomości.
KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
ConsumerRecord<String, byte[]> records = consumer.poll(100);
for(ConsumerRecord<String, byte[]> record : records) {
long offset = record.offset();
String key = record.key();
byte[] val = record.value();
Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
System.out.println(
System.format("offset=%d, key=%s", offset, key));
}
}
Oto metody wypełniania plików właściwości dla producenta i konsumenta.
public static Properties getProducerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("compression.type", "snappy");
props.put("max.request.size", 10485760); //need this
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.partition.fetch.bytes", 10485760); //need this too
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return props;
}
Po prostu próbowałem, ale mam ten sam efekt. Pliki "duże" nie są odbierane. Zastanawiam się, czy są one nawet wysyłane, ponieważ kiedy konsument zaczyna czytać od tematu, przesunięcia są ciągłe (np. 1, 2, 3 itd.). Wydaje mi się, że producent może nawet nie wysyłać dużych plików? –
Okazuje się, że muszę ustawić zarówno 'max.request.size' dla producenta, jak i' max.partition.fetch.bytes' dla konsumenta. Będę trochę majsterkować z kodem, aby zobaczyć, czy 'max.partition.fetch.bytes' jest naprawdę potrzebny. –
Tak, okazało się, że potrzebuję obu ustawień. Jeśli nie ustawię 'max.partition.fetch.bytes', otrzymam' RecordTooLargeException'. –