Jestem całkowicie nowym użytkownikiem Kafki i Avro i próbuję użyć pakietu konfluentującego. Mamy istniejące POJO, których używamy do JPA i chciałbym móc po prostu utworzyć instancję moich POJO bez konieczności ręcznego odzwierciedlania każdej wartości w generycznym rekordzie. Wygląda na to, że brakuje mi tego w dokumentacji.Konwertowanie danych na generyczne zapisy w pliku confluent.io i przesyłanie za pośrednictwem produktu KafkaProducer
Przykłady użyć rodzajowe rekord i ustawić każdą wartość jeden po drugim tak:
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(SerializationException e) {
// may need to do something with it
}
Istnieje kilka przykładów dla uzyskania schematu z klasy i znalazłem adnotacje do zmiany tego schematu jest to konieczne. Teraz jak wziąć instancję POJO i po prostu wysłać ją do serializera tak jak jest i czy biblioteka wykonuje pracę polegającą na dopasowywaniu schematu do klasy, a następnie kopiowaniu wartości do ogólnego rekordu? Czy to wszystko źle? To, co chcę zrobić, to coś takiego:
String key = "key1";
Schema schema = ReflectData.get().getSchema(myObject.getClass());
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema);
record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(SerializationException e) {
// may need to do something with it
}
Dzięki!